1. 서론

 

 

이번 포스팅은 분산 트랜잭션 제어를 위한 Saga 패턴을 구현하겠습니다. 보상 트랜잭션까지 같이 구현하면 내용이 복잡하므로 보상 트랜잭션 및 Deadline 기능은 다음 포스팅에서 다루겠습니다.

 

Saga 패턴 사용을 위한 가상의 시나리오는 다음과 같습니다.

 

테스트 시나리오

 

1. Jeju 모듈에 계좌를 개설한다.(계좌명 : test, 잔고 : 100)

2. Command 모듈에 계정 & 계좌를 개설한다.

3. Jeju 모듈의 계좌에서 30원을 Command 모듈 계좌로 이체한다.

4. Jeju 모델 DB의 계좌와 Query 모델 DB를 통해서 이체금액을 확인한다.

 

이전 포스팅에서 구현한 Jeju 모듈을 활용하여 코드 구현을 진행하겠습니다. Saga 요청을 위한 API 및 Saga 인스턴스는 Command 모듈에 생성하도록 하겠습니다.


2. Common 모듈 구현

 

1. 공통으로 사용할 Command 및 Event 추가를 위해 Common 모듈 Command 패키지 및 Command 클래스를 생성합니다.

 


 

2. 생성한 Command 클래스를 구현합니다.

 

TransferComamndFactory.java

@RequiredArgsConstructor
public class TransferComamndFactory {
    private final AbstractTransferCommand transferCommand;

    public void create(String srcAccountID, String dstAccountID, Long amount, String transferID){
        transferCommand.create(srcAccountID, dstAccountID, amount, transferID);
    }

    public AbstractTransferCommand getTransferCommand(){
        return this.transferCommand;
    }
}

 

위 클래스는 계좌 이체와 연관된 Command를 생성하는 Factory 클래스입니다. 나중에 보상 트랜잭션을 위한 Command를 같이 관리하기 위하여 생성하였습니다.

 

AbstractTransferCommand.java

@ToString
@Getter
public abstract class AbstractTransferCommand {
    @TargetAggregateIdentifier
    protected String srcAccountID;
    protected String dstAccountID;
    protected Long amount;
    protected String transferID;

    public AbstractTransferCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
        this.srcAccountID = srcAccountID;
        this.dstAccountID = dstAccountID;
        this.transferID = transferID;
        this.amount = amount;
        return this;
    }
}

 

계좌 이체 요청을 위한 클래스입니다. srcAccountID는 인출할 계좌 ID이며, dstAccountID는 송금 대상 계좌 ID입니다.

 

JejuBankTransferCommand.java

public class JejuBankTransferCommand extends AbstractTransferCommand {
    @Override
    public String toString() {
        return "JejuBankTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

SeoulBankTransferCommand.java

public class SeoulBankTransferCommand extends AbstractTransferCommand {
    @Override
    public String toString() {
        return "SeoulBankTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

 


3. Common 모듈 event 패키지 하위에 Event 클래스를 생성합니다.

 

 


4. Event 클래스를 구현합니다.

 

MoneyTransferEvent.java

@Builder
@ToString
@Getter
public class MoneyTransferEvent {
    private String dstAccountID;
    private String srcAccountID;
    private Long amount;
    private String transferID;
    private TransferComamndFactory comamndFactory;
}

 

위 클래스는 Command 모듈로 계좌이체 요청이 들어오면, 인출할 대상 계좌번호(srcAccountID) 및 입금 계좌번호(dstAccountID), 그리고 이체 금액(amount)와 같은 기본정보와 트랜잭션간 고유 키(transferID) 및 요청 Command 구분 정보를 담고있습니다.

 

TransferApprovedEvent.java

@Builder
@ToString
@Getter
public class TransferApprovedEvent {
    private String srcAccountID;
    private String dstAccountID;
    private String transferID;
    private Long amount;
}

 

위 클래스는 계좌이체가 성공되었을 때, 금액을 반영하기 위한 Event입니다.

 

TransferDeniedEvent.java

@Getter
@Builder
@ToString
public class TransferDeniedEvent {
    private String srcAccountID;
    private String dstAccountID;
    private String transferID;
    private Long amount;
    private String description;
}

 

위 클래스는 계좌이체가 거절될 때, 요청 App에 거절 내역을 전달하기 위한 Event입니다.


3. Jeju 모듈 구현

 

1. Jeju 은행 모듈 build.gradle 파일을 엽니다.

 


2. Jeju 모듈에 State-Stored-Aggregate 구현을 위해 JPA 및 DB 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
    implementation group: 'org.postgresql', name: 'postgresql', version: '42.2.6'
}

3. Jeju 모듈 resources 패키지 하위 application.yml 파일을 엽니다.

 


4. datasource 속성을 추가합니다.

 

application.yml

server:
  port: 9091

spring:
  application:
    name: eventsourcing-cqrs-jejuBank
  datasource:
    platform: postgres
    url: jdbc:postgresql://localhost:5432/jeju
    username: jeju
    password: jeju
  jpa:
    hibernate:
      ddl-auto: update

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

logging.level.com.cqrs.jeju : debug

 

DB는 Command와 Query와 동일하게 Postgre를 사용하였으며, jeju 계정을 별도로 생성하였습니다.


5. 계좌 생성을 위해 Jeju 모듈 하위에 다음과 같은 패키지들을 생성합니다.

 


6. aggregate 패키지 내 aggregate 클래스부터 구현하겠습니다.

 

Account.java


@Entity
@Aggregate
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Account {
    @AggregateIdentifier
    @Id
    private String accountID;
    private Long balance;
    
    @CommandHandler
    public Account(AccountCreationCommand command) throws IllegalAccessException {
        log.debug("handling {}", command);
        if (command.getBalance() <= 0)
            throw new IllegalAccessException("유효하지 않은 입력입니다.");
        apply(new AccountCreationEvent(command.getAccountID(), command.getBalance()));
    }

    @EventSourcingHandler
    protected void on(AccountCreationEvent event) {
        log.debug("event {}", event);
        this.accountID = event.getAccountID();
        this.balance = event.getBalance();
    }

    @CommandHandler
    protected void on(JejuBankTransferCommand command) throws InterruptedException {

        log.debug("handling {}", command);
        if (this.balance < command.getAmount()) {
            apply(TransferDeniedEvent.builder()
                                        .srcAccountID(command.getSrcAccountID())
                                        .dstAccountID(command.getDstAccountID())
                                        .amount(command.getAmount())
                                        .description("잔고가 부족합니다.")
                                        .transferID(command.getTransferID())
                                     .build());
        } else {
            apply(TransferApprovedEvent.builder()
                                            .srcAccountID(command.getSrcAccountID())
                                            .dstAccountID(command.getDstAccountID())
                                            .transferID(command.getTransferID())
                                            .amount(command.getAmount())
                                        .build());
        }
    }

    @EventSourcingHandler
    protected void on(TransferApprovedEvent event) {
        log.debug("event {}", event);
        this.balance -= event.getAmount();
    }
}

 

트랜잭션 테스트 도중 Account 정보를 DB에서 바로 확인하기 위해 State-Stored Aggregate 형태로 구현하였습니다. 위 코드 내용은 Command 어플리케이션 구현파트에서 다룬 내용이 주이기에 별도 설명은 생략하겠습니다.


7. command 패키지내 Command 클래스 생성 및 이를 구현합니다.

 

AccountCreationCommand.java

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Getter
public class AccountCreationCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private Long balance;
}

8. dto 패키지내 dto 클래스 생성 및 이를 구현합니다.

 

AccountDTO.java

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Getter
public class AccountDTO {
    private String accountID;
    private Long balance;
}

9. event 패키지내 event 클래스를 생성 및 이를 구현합니다.

 

AccountCreationEvent.java

@ToString
@RequiredArgsConstructor
@Getter
public class AccountCreationEvent {
    private final String accountID;
    private final Long balance;
}

해당 이벤트는 Jeju 모듈내에서만 유효하기 때문에 공통 모듈에 생성하지 않았습니다.


10. service 패키지내 service 클래스 생성 및 이를 구현합니다.

 

AccountService.java

public interface AccountService {
    String createAccount(AccountDTO accountDTO);
}

 

AccountServiceImpl.java

@Service
@RequiredArgsConstructor
public class AccountServiceImpl implements AccountService {
    private final CommandGateway commandGateway;

    @Override
    public String createAccount(AccountDTO accountDTO) {
        return commandGateway.sendAndWait(new AccountCreationCommand(accountDTO.getAccountID(), accountDTO.getBalance()));
    }
}

11. controller 패키지내 controller 클래스 생성 및 이를 구현합니다.

 

AccountController.java

@RestController
@RequiredArgsConstructor
public class AccountController {
    private final AccountService accountService;

    @PostMapping("/account")
    public ResponseEntity<String> createAccount(@RequestBody AccountDTO accountDTO){
        return ResponseEntity.ok().body(accountService.createAccount(accountDTO));
    }
}

4. Command 모듈 구현

 

1. Command 구현 포스팅 8번 항목에서 AxonServerCommandBus를 SimpleCommandBus로 대체했습니다. Saga 트랜잭션에서는 다른 모듈에 대하여 Command 요청이 필요하므로 기존에 SImpleCommandBus 설정을 해제해야합니다. 이를 위해 config 패키지내에 있는 AxonConfig 클래스를 엽니다.

 


2. AxonConfig 파일에서 commandBus Bean 로직을 주석처리하거나 삭제합니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
	(...중략...)
//    @Bean
//    SimpleCommandBus commandBus(TransactionManager transactionManager){
//        return  SimpleCommandBus.builder().transactionManager(transactionManager).build();
//    }
	(...중략...)
}

3. 계좌이체 요청 및 반영을 위한 Command 요청을 위해 command 패키지 내 command 클래스를 생성합니다.

 


4. Command 클래스 내용을 구현합니다.

 

TransferApprovedCommand.java

@ToString
@Getter
@Builder
public class TransferApprovedCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private Long amount;
    private String transferID;
}

위 클래스는 계좌이체가 정상적으로 수행되었을 때, 요청을 위한 Command 클래스입니다.

 

MoneyTransferCommand.java

@Builder
@ToString
@Getter
public class MoneyTransferCommand {
    private String srcAccountID;
    @TargetAggregateIdentifier
    private String dstAccountID;
    private Long amount;
    private String transferID;
    private BankType bankType;

    public enum BankType{
        JEJU(command -> new TransferComamndFactory(new JejuBankTransferCommand()),
        SEOUL(command -> new TransferComamndFactory(new SeoulBankTransferCommand());

        private Function<MoneyTransferCommand, TransferComamndFactory> expression;
        BankType(Function<MoneyTransferCommand, TransferComamndFactory> expression){ this.expression = expression;}
        public TransferComamndFactory getCommandFactory(MoneyTransferCommand command){
            TransferComamndFactory factory = this.expression.apply(command);
            factory.create(command.getSrcAccountID(), command.getDstAccountID(), command.amount, command.getTransferID());
            return factory;
        }

    }

    public static MoneyTransferCommand of(TransferDTO dto){
        return MoneyTransferCommand.builder()
                .srcAccountID(dto.getSrcAccountID())
                .dstAccountID(dto.getDstAccountID())
                .amount(dto.getAmount())
                .bankType(dto.getBankType())
                .transferID(UUID.randomUUID().toString())
                .build();
    }
}

 

계좌 이체 요청시, 은행구분에 따른 Command 생성을 달리 하기 위하여 enum을 사용했습니다. 또한, DTO 클래스를 Command 클래스로 변환하기 위한 Factory 메소드를 추가하였습니다.


5. DTO 추가를 위해 dto 패키지 내 dto 클래스를 생성합니다.

 

 

TransferDTO.java

@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TransferDTO {
    private String srcAccountID;
    private String dstAccountID;
    private Long amount;
    private MoneyTransferCommand.BankType bankType;
}

6. Service 메소드 추가를 위해서 service 패키지 하위 service 클래스를 수정합니다.

 

 

TransactionService.java

public interface TransactionService {
    (...중략...)
    String transferMoney(TransferDTO transferDTO);
}

 

TransactionServiceImpl.java

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;
	(...중략...)

    @Override
    public String transferMoney(TransferDTO transferDTO) {
        return commandGateway.sendAndWait(MoneyTransferCommand.of(transferDTO));
    }
}

7. API 추가를 위해서 controller 패키지내 Controller에 메소드를 추가합니다.

 

 

TransactionController.java

@RestController
@RequiredArgsConstructor
public class TransactionController {
    private final TransactionService transactionService;

    (...중략...)

    @PostMapping("/transfer")
    public ResponseEntity<String> transfer(@RequestBody TransferDTO transferDTO){
        return ResponseEntity.ok().body(transactionService.transferMoney(transferDTO));
    }
}

8. Aggregate 수정을 위해 aggregate 패키지내 AccountAggregate 클래스를 엽니다.

 


9. Aggregate에 Command 및 EventSourcing 핸들러 로직을 추가합니다.

 

AccountAggregate.java

@NoArgsConstructor
@AllArgsConstructor
@Slf4j
@Aggregate
@EqualsAndHashCode
public class AccountAggregate {
    (...중략...)

    @CommandHandler
    protected void transferMoney(MoneyTransferCommand command) {
        log.debug("handling {}", command);
        apply(MoneyTransferEvent.builder()
                .srcAccountID(command.getSrcAccountID())
                .dstAccountID(command.getDstAccountID())
                .amount(command.getAmount())
                .comamndFactory(command.getBankType().getCommandFactory(command))
                .transferID(command.getTransferID())
                .build());
    }

    @CommandHandler
    protected void transferMoney(TransferApprovedCommand command) {
        log.debug("handling {}", command);
        apply(new DepositMoneyEvent(this.holderID, command.getAccountID(), command.getAmount()));
        apply(new DepositCompletedEvent(command.getAccountID(), command.getTransferID()));
    }
}

10. Saga 패키지 및 Saga 클래스를 생성합니다.

 


11. Saga 클래스를 구현합니다.

 

TransferManager.java

@Saga
@Slf4j
public class TransferManager {
    @Autowired
    private transient CommandGateway commandGateway;
    private TransferComamndFactory comamndFactory;

    @StartSaga
    @SagaEventHandler(associationProperty = "transferID")
    protected void on(MoneyTransferEvent event) {
        log.debug("Created saga instance");
        log.debug("event : {}", event);
        comamndFactory = event.getComamndFactory();
        SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID());

		log.info("계좌 이체 시작 : {} ", event);
		commandGateway.send(comamndFactory.getTransferCommand());
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferApprovedEvent event) {
		log.info("이체 금액 {} 계좌 반영 요청 : {}", event.getAmount(), event);
        SagaLifecycle.associateWith("accountID", event.getDstAccountID());
        commandGateway.send(TransferApprovedCommand.builder()
                .accountID(event.getDstAccountID())
                .amount(event.getAmount())
                .transferID(event.getTransferID())
                .build());
    }
    
    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferDeniedEvent event) {
        log.info("계좌 이체 실패 : {}", event);
        log.info("실패 사유 : {}", event.getDescription());
		SagaLifecycle.end();
    }

    @SagaEventHandler(associationProperty = "accountID")
    @EndSaga
    protected void on(DepositCompletedEvent event){
        log.info("계좌 이체 성공 : {}", event);
    }
}

 

구현 내용을 간략하게 소개하면 다음과 같습니다. 먼저 @Saga 어노테이션을 통해 해당 클래스가 Saga의 대상임을 지정합니다. 해당 클래스는 NoArgsConstructor로 생성이 되어야하므로 이를 주의합니다.

 

@StartSaga는 Saga 인스턴스의 시작점입니다. 여기에서 associationProperty는 해당 인스턴스를 유일하게 구별할 수 있는 속성이 무엇인지 지정합니다. 자세한 내용은 Axon 공식문서를 참고바랍니다.

 

생성된 Saga는 트랜잭션이 끝나면 종료되어야합니다. 위 예제에서는 DepositCompletedEvent 메시지를 수신받으면, 전체 트랜잭션이 끝난것으로 판단하여 Saga 인스턴스를 종료하도록 되어있습니다. Saga 인스턴스 종료방법은 크게 두가지입니다. 먼저 위 에제와 같이 명시적으로 end 메소드를 기입하거나 @EndSaga 어노테이션 지정할 수 있습니다.


5. 테스트

 

1. Command, Query, Jeju 모듈 App을 기동한 다음, Jeju 은행 계좌를 개설합니다.

POST localhost:9091/account
Content-Type: application/json

{
	"accountID" : "test",
	"balance" : 100
}

2. 생성된 계좌 내역을 DB에서 확인합니다.

 


3. Command 모듈에 계정을 생성합니다.

 

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "Kevin",
	"tel" : "02-2645-5678",
	"address" : "OO시 OO구",
    "company" : "Korea"
}

4. Command 모듈에 계좌를 개설합니다.

 

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "b01fae84-e8a5-427d-a5f4-baa7376b7163"
}

5. Query 모듈 DB에서 계정 및 계좌 생성 내역을 확인합니다.

 


6. Jeju 은행 계좌에서 Command 모듈에 개설된 계좌로 30원 이체합니다.

 

POST http://localhost:8080/transfer
Content-Type: application/json
{
	"srcAccountID" : "test",
	"dstAccountID" : "a31faade-5b57-4435-85da-1de0ed1c55c4",
	"amount" : 30,
	"bankType" : "JEJU"
}

 

AccountAggregate   : handling MoneyTransferCommand(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, bankType=JEJU)
TransferManager    : Created saga instance
TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@5a65bed)
TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@5a65bed) 
TransferManager    : 이체 금액 30 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, amount=30)
AccountAggregate   : handling TransferApprovedCommand(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b)
AccountAggregate   : applying DepositMoneyEvent(holderID=b01fae84-e8a5-427d-a5f4-baa7376b7163, accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30)
AccountAggregate   : balance 30
TransferManager    : 계좌 이체 성공 : DepositCompletedEvent(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=3a6583e3-0288-4e07-87f0-c818644e009b)

성공적으로 계좌이체가 완료되었으면 위와 비슷한 로그가 출력될 것입니다.


7. DB에서 계좌 상태를 확인합니다.

 


6. 마치며

 

 

serialized_sga 내용

<com.cqrs.command.saga.TransferManager>
    <comamndFactory>
        <transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand">
        <srcAccountID>test</srcAccountID>
        <dstAccountID>a31faade-5b57-4435-85da-1de0ed1c55c4</dstAccountID>
        <amount>30</amount>
        <transferID>test</transferID>
        </transferCommand>
        </compensationAbortCommand>
    </comamndFactory>
</com.cqrs.command.saga.TransferManager>

 

이번 포스팅에서는 Jeju, Command 모듈간에 일관성을 위하여 Saga 패턴을 사용했습니다. Command 모듈에서 MoneyTransferEvent Event가 발행되면, Saga 인스턴스가 생성됩니다. 이후 Application의 비즈니스 로직에 따라 모든 트랜잭션이 종료될 때까지 유지됩니다. Saga 인스턴스 정보는 동시에 SAGA_ENTRY 테이블에 적재되며, Saga 인스턴스가 종료되면 해당 데이터 또한 사라집니다.

 

다음 포스팅에서는 Deadline 기능, 보상 트랜잭션 로직이 적용된 Saga 패턴을 구현하겠습니다.

+ Recent posts