1. 서론

 

이번 포스팅은 AxonFramework 관련 마지막 포스팅입니다. Saga 패턴 보상 트랜잭션 구현을 다루겠습니다.


2. Deadline

 

MSA 환경에서는 App이 여러개로 분산되어있으므로 하나의 App이 느려지거나 장애가 발생하면, 장애가 발생한 App을 호출하는 App에도 장애가 전파됩니다.

 

Axon에서 제공하는 Saga 패턴을 사용하면, 요청마다 Saga 인스턴스가 생성됩니다. 따라서 연관된 App의 장애로 인해 전체 트랜잭션에 Hang이 걸리게되면, 요청한 App 또한 안전하지 못합니다.

 

따라서 이를 완화하기 위해 Axon에서는 Deadline 기능을 제공합니다. Deadline은 App에서 지정한 시간동안 반응이 없으면, 이를 처리할 메소드를 Callback 형식으로 등록할 수 있는 기능입니다. 자세한 내용은 공식 문서를 참고 바라며, 데모 프로젝트에서는 CommandGateway 클래스에서 기본적으로 제공하는 sendAndWait 메소드를 통해서 일정 시간동안 응답이 없으면, 보상 트랜잭션을 발동하도록 구현하겠습니다.


 

1. Command 모듈 Saga 패키지내 있는 Saga 클래스를 엽니다.

 


2. Saga 클래스 코드를 수정합니다.

 

TransferManager.java

@Saga
@Slf4j
public class TransferManager {
    (...중략...)
    
    @StartSaga
    @SagaEventHandler(associationProperty = "transferID")
    protected void on(MoneyTransferEvent event) {
       (...중략...)
        try {
            log.info("계좌 이체 시작 : {} ", event);
            commandGateway.sendAndWait(comamndFactory.getTransferCommand(), 10, TimeUnit.SECONDS);
        } catch (CommandExecutionException e) {
            log.error("Failed transfer process. Start cancel transaction");
            //보상 트랜잭션 구현 로직
        }
    }
    (...중략...)

}

 

sendAndWait 두 번째, 세 번째 인자를 통해 TimeOut을 지정하며, 해당 기간동안 응답이 없을 경우 Exception을 통해 보상 트랜잭션을 발동할 수 있습니다.


3. 트랜잭션 프로세스 설계

 

일반적인 상황

 

 

일반적으로 계좌 이체 요청을하면, 해당 은행에서 보유한 잔고보다 요청액수가 클 경우에는 이체 거절을하며, 반대의 경우에는 이체 승인을 합니다. 따라서 요청자인 Command 모듈에서는 Jeju 은행의 승인 혹은 거절 이벤트 발생 여부에 따라서 결과를 처리하면 됩니다.


 

보상 트랜잭션 발동 상황

 

 

 

보상 트랜잭션은 연결된 App 사이에 트랜잭션 문제가 발생하였을 때, 이미 처리된 데이터를 원상복구를 위하여 추가적인 트랜잭션을 발동하는 것입니다. 

 

예제에서는 Timeout이 발생하게 되면, Jeju 은행 App의 응답과 관계없이 트랜잭션 Rollback을 위하여 보상 트랜잭션을 요청하고, Command 모듈에서는 다음 로직을 수행합니다.

 

 

만약 위 그림과 같이 만약 Jeju 은행에 발생한 장애로 인하여 계좌 요청을 진행하였지만 응답이 오지 않는 상황이라고 가정해봅시다. Command 모듈에서는 장애 방지를 위해 Timeout을 설정했기 때문에 일정 시간이 지나면 보상 트랜잭션을 발동할 것입니다.

 

 

이후 Jeju 은행 App이 정상화된다면 이전 요청을 수행할 것입니다. 그 결과 요청이 적절하지 않으면, 이체 거절 이벤트를 발송합니다. 만약 이체 거절 상황에서 요청받은 보상 트랜잭션을 처리한다면, 잔고는 그대로인 상황에서 보상 트랜잭션에 의해 잔고가 늘어나는 기현상이 발생합니다.

 

이를 해결하기 위해 다양한 방법이 있겠지만, 예제 프로젝트에서는 Timeout이 발생된 상황에서 이체 거절 메시지를 받게되면, 보상 트랜잭션 취소 요청하여 정상 처리하겠습니다.

 

 

위 그림은 Command 모듈에서 트랜잭션 요청 이후 처리해야할 과정을 개략적으로 순서도로 나타냈습니다.


4. Common 모듈 구현

 

1. Common 모듈 command 패키지내 command 클래스를 추가합니다.

 


2. command 클래스를 구현합니다.

 

AbstractCancelTransferCommand.java

@ToString
@NoArgsConstructor
@AllArgsConstructor
@Getter
public abstract class AbstractCancelTransferCommand {
    @TargetAggregateIdentifier
    protected String srcAccountID;
    protected String dstAccountID;
    protected Long amount;
    protected String transferID;

    public AbortTransferCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
        this.srcAccountID = srcAccountID;
        this.dstAccountID = dstAccountID;
        this.transferID = srcAccountID;
        this.amount = amount;
        return this;
    }
}

 

JejuBankCancelTransferCommand.java

public class JejuBankCancelTransferCommand extends AbstractCancelTransferCommand {
    @Override
    public String toString() {
        return "JejuBankCancelTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

SeoulBankCancelTransferCommand.java

public class SeoulBankCancelTransferCommand extends AbstractCancelTransferCommand {
    @Override
    public String toString() {
        return "SeoulBankCancelTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

AbstractCompensationCancelCommand.java

@ToString
@NoArgsConstructor
@AllArgsConstructor
@Getter
public abstract class AbstractCompensationCancelCommand {
    @TargetAggregateIdentifier
    protected String srcAccountID;
    protected String dstAccountID;
    protected Long amount;
    protected String transferID;
    
    public AbstractCompensationCancelCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
        this.srcAccountID = srcAccountID;
        this.dstAccountID = dstAccountID;
        this.transferID = srcAccountID;
        this.amount = amount;
        return this;
    }
}

 

JejuBankCompensationCancelCommand.java

public class JejuBankCompensationCancelCommand extends AbstractCompensationCancelCommand {
    @Override
    public String toString() {
        return "JejuBankCompensationCancelCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

SeoulBankCompensationCancelCommand.java

public class SeoulBankCompensationCancelCommand extends AbstractCompensationCancelCommand {
    @Override
    public String toString() {
        return "SeoulBankCompensationCancelCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

TransferComamndFactory.java

@RequiredArgsConstructor
public class TransferComamndFactory {
    private final AbstractTransferCommand transferCommand;
    private final AbstractCancelTransferCommand abortTransferCommand;
    private final AbstractCompensationCancelCommand compensationAbortCommand;

    public void create(String srcAccountID, String dstAccountID, Long amount, String transferID){
        transferCommand.create(srcAccountID, dstAccountID, amount, transferID);
        abortTransferCommand.create(srcAccountID, dstAccountID, amount, transferID);
        compensationAbortCommand.create(srcAccountID, dstAccountID, amount, transferID);
    }

    public AbstractTransferCommand getTransferCommand(){
        return this.transferCommand;
    }
    public AbstractCancelTransferCommand getAbortTransferCommand(){
        return this.abortTransferCommand;
    }
    public AbstractCompensationCancelCommand getCompensationAbortCommand(){
        return this.compensationAbortCommand;
    }
}

3. Common 모듈 event 패키지내 event 클래스를 추가합니다.

 


4. event 클래스 내용을 구현합니다.

 

CompletedCancelTransferEvent.java

@Builder
@ToString
@Getter
public class CompletedCancelTransferEvent {
    private String srcAccountID;
    private String dstAccountID;
    private Long amount;
    private String transferID;
}

 

CompletedCompensationCancelEvent.java

@Builder
@ToString
@Getter
public class CompletedCompensationCancelEvent {
    private String srcAccountID;
    private String dstAccountID;
    private Long amount;
    private String transferID;
}

5. Jeju 모듈 수정

 

 

보상 트랜잭션 처리를 위한 Handler 메소드를 추가하기 위해 Aggregate 클래스를 수정합니다.

 

Account.java

@Entity
@Aggregate
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Account {
    @AggregateIdentifier
    @Id
    private String accountID;
    private Long balance;
    private final transient Random random = new Random();

    @CommandHandler
    public Account(AccountCreationCommand command) throws IllegalAccessException {
        log.debug("handling {}", command);
        if (command.getBalance() <= 0)
            throw new IllegalAccessException("유효하지 않은 입력입니다.");
        apply(new AccountCreationEvent(command.getAccountID(), command.getBalance()));
    }

    @EventSourcingHandler
    protected void on(AccountCreationEvent event) {
        log.debug("event {}", event);
        this.accountID = event.getAccountID();
        this.balance = event.getBalance();
    }

    @CommandHandler
    protected void on(JejuBankTransferCommand command) throws InterruptedException {
        if (random.nextBoolean())
            TimeUnit.SECONDS.sleep(15);

        log.debug("handling {}", command);
        if (this.balance < command.getAmount()) {
            apply(TransferDeniedEvent.builder()
                                        .srcAccountID(command.getSrcAccountID())
                                        .dstAccountID(command.getDstAccountID())
                                        .amount(command.getAmount())
                                        .description("잔고가 부족합니다.")
                                        .transferID(command.getTransferID())
                                     .build());
        } else {
            apply(TransferApprovedEvent.builder()
                                            .srcAccountID(command.getSrcAccountID())
                                            .dstAccountID(command.getDstAccountID())
                                            .transferID(command.getTransferID())
                                            .amount(command.getAmount())
                                        .build());
        }
    }

    @EventSourcingHandler
    protected void on(TransferApprovedEvent event) {
        log.debug("event {}", event);
        this.balance -= event.getAmount();
    }

    @CommandHandler
    protected void on(JejuBankCancelTransferCommand command) {
        log.debug("handling {}", command);
        apply(CompletedCancelTransferEvent.builder()
                                            .srcAccountID(command.getSrcAccountID())
                                            .dstAccountID(command.getDstAccountID())
                                            .transferID(command.getTransferID())
                                            .amount(command.getAmount())
                                          .build());
    }

    @EventSourcingHandler
    protected void on(CompletedCancelTransferEvent event) {
        log.debug("event {}", event);
        this.balance += event.getAmount();
    }

    @CommandHandler
    protected void on(JejuBankCompensationCancelCommand command) {
        log.debug("handling {}", command);
        apply(CompletedCompensationCancelEvent.builder()
                                                .srcAccountID(command.getSrcAccountID())
                                                .dstAccountID(command.getDstAccountID())
                                                .transferID(command.getTransferID())
                                                .amount(command.getAmount())
                                              .build());
    }

    @EventSourcingHandler
    protected void on(CompletedCompensationCancelEvent event) {
        log.debug("event {}", event);
        this.balance -= event.getAmount();
    }
}

 

위 클래스 구현 내용 중에서 Timeout 테스트를 위해 50% 확률로 Sleep 하도록 임의로 추가하였습니다.

 

if (random.nextBoolean())
	TimeUnit.SECONDS.sleep(15);

6. Command 모듈 수정

 

1. Command 클래스 수정을 위해 command 패키지 하위 MoneyTransferCommand 클래스 파일을 엽니다.

 


2. 클래스 파일을 수정합니다.

 

MoneyTransferCommand.java

@Builder
@ToString
@Getter
public class MoneyTransferCommand {
    private String srcAccountID;
    @TargetAggregateIdentifier
    private String dstAccountID;
    private Long amount;
    private String transferID;
    private BankType bankType;

    public enum BankType{
        JEJU(command -> new TransferComamndFactory(new JejuBankTransferCommand(),new JejuBankCancelTransferCommand(), new JejuBankCompensationCancelCommand())),
        SEOUL(command -> new TransferComamndFactory(new SeoulBankTransferCommand(), new SeoulBankCancelTransferCommand(), new SeoulBankCompensationCancelCommand()));

        private Function<MoneyTransferCommand, TransferComamndFactory> expression;
        BankType(Function<MoneyTransferCommand, TransferComamndFactory> expression){ this.expression = expression;}
        public TransferComamndFactory getCommandFactory(MoneyTransferCommand command){
            TransferComamndFactory factory = this.expression.apply(command);
            factory.create(command.getSrcAccountID(), command.getDstAccountID(), command.amount, command.getTransferID());
            return factory;
        }

    }

    public static MoneyTransferCommand of(TransferDTO dto){
        return MoneyTransferCommand.builder()
                .srcAccountID(dto.getSrcAccountID())
                .dstAccountID(dto.getDstAccountID())
                .amount(dto.getAmount())
                .bankType(dto.getBankType())
                .transferID(UUID.randomUUID().toString())
                .build();
    }
}

3. 보상 트랜잭션 구현을 위해 Saga 클래스를 엽니다.

 


4. Saga 클래스를 수정합니다.

 

TransferManager.java

@Saga
@Slf4j
public class TransferManager {
    @Autowired
    private transient CommandGateway commandGateway;
    private boolean isExecutingCompensation = false;
    private boolean isAbortingCompensation = false;
    private TransferComamndFactory comamndFactory;

    @StartSaga
    @SagaEventHandler(associationProperty = "transferID")
    protected void on(MoneyTransferEvent event) {

        log.debug("Created saga instance");
        log.debug("event : {}", event);
        comamndFactory = event.getComamndFactory();
        SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID());

        try {
            log.info("계좌 이체 시작 : {} ", event);
            commandGateway.sendAndWait(comamndFactory.getTransferCommand(), 10, TimeUnit.SECONDS);
        } catch (CommandExecutionException e) {
            log.error("Failed transfer process. Start cancel transaction");
            cancelTransfer();
        }
    }

    private void cancelTransfer() {
        isExecutingCompensation = true;
        log.info("보상 트랜잭션 요청");
        commandGateway.send(comamndFactory.getAbortTransferCommand());
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(CompletedCancelTransferEvent event) {
        isExecutingCompensation = false;
        if (!isAbortingCompensation) {
            log.info("계좌 이체 취소 완료 : {} ", event);
            SagaLifecycle.end();
        }
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferDeniedEvent event) {
        log.info("계좌 이체 실패 : {}", event);
        log.info("실패 사유 : {}", event.getDescription());
        if(isExecutingCompensation){
            isAbortingCompensation = true;
            log.info("보상 트랜잭션 취소 요청 : {}", event);
            commandGateway.send(comamndFactory.getCompensationAbortCommand());
        }
        else {
            SagaLifecycle.end();
        }
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    @EndSaga
    protected void on(CompletedCompensationCancelEvent event){
        isAbortingCompensation = false;
        log.info("보상 트랜잭션 취소 완료 : {}",event);
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferApprovedEvent event) {
        if (!isExecutingCompensation && !isAbortingCompensation) {
            log.info("이체 금액 {} 계좌 반영 요청 : {}",event.getAmount(), event);
            SagaLifecycle.associateWith("accountID", event.getDstAccountID());
            commandGateway.send(TransferApprovedCommand.builder()
                                                            .accountID(event.getDstAccountID())
                                                            .amount(event.getAmount())
                                                            .transferID(event.getTransferID())
                                                       .build());
        }
    }

    @SagaEventHandler(associationProperty = "accountID")
    @EndSaga
    protected void on(DepositCompletedEvent event){
        log.info("계좌 이체 성공 : {}", event);
    }
}

5. 테스트 결과

정상

com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@d713db5)
com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@d713db5) 
com.cqrs.command.saga.TransferManager    : 이체 금액 1 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, amount=1)
c.c.command.aggregate.AccountAggregate   : handling TransferApprovedCommand(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090)
c.c.command.aggregate.AccountAggregate   : applying DepositMoneyEvent(holderID=b01fae84-e8a5-427d-a5f4-baa7376b7163, accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 31
com.cqrs.command.saga.TransferManager    : 계좌 이체 성공 : DepositCompletedEvent(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090)

 

Timeout

com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3226f9ff)
com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3226f9ff) 
com.cqrs.command.saga.TransferManager    : Failed transfer process. Start cancel transaction
com.cqrs.command.saga.TransferManager    : 보상 트랜잭션 요청
com.cqrs.command.saga.TransferManager    : 계좌 이체 취소 완료 : CompletedAbortTransferEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791) 

 

잔고 부족

c.c.command.aggregate.AccountAggregate   : handling MoneyTransferCommand(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, bankType=JEJU)
com.cqrs.command.saga.TransferManager    : Created saga instance
com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@66a12a11)
com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@66a12a11) 
com.cqrs.command.saga.TransferManager    : 계좌 이체 실패 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager    : 실패 사유 : 잔고가 부족합니다.

 

잔고 부족 + Timeout

com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3d4d8677)
com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3d4d8677) 
com.cqrs.command.saga.TransferManager    : Failed transfer process. Start cancel transaction
com.cqrs.command.saga.TransferManager    : 보상 트랜잭션 요청
com.cqrs.command.saga.TransferManager    : 계좌 이체 실패 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager    : 실패 사유 : 잔고가 부족합니다.
com.cqrs.command.saga.TransferManager    : 보상 트랜잭션 취소 요청 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager    : 보상 트랜잭션 취소 완료 : CompletedCompensationAbortEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff)

 

테스트 결과 각기 다른 상황에서 정상적으로 수행되는 것을 확인할 수 있습니다.


6. 마치며

 

이상으로 AxonFramework에 대한 포스팅을 마치겠습니다. 테스팅에 대해서는 다루지 않았는데, 분산 App 환경에서 테스트 코드 작성은 반드시 필요하다고 생각합니다. 따라서 공식문서를 참고하여 테스트 코드 작성 방법을 익힌다면, 보다 안전한 프로그램이 될 것입니다.

 

그 외에 쿠버네티스 지원, Tracing에 대해서도 공식문서에 소개되어 있으니 참고바랍니다. 또한, 지금까지 구현한 프로젝트 내용은 깃헙에 업로드 했습니다.

 

포스팅 내용 중 개선사항에 대해서는 댓글로 남겨주시면, 확인 후 내용 반영하도록 하겠습니다.

1. 서론

 

 

이번 포스팅은 분산 트랜잭션 제어를 위한 Saga 패턴을 구현하겠습니다. 보상 트랜잭션까지 같이 구현하면 내용이 복잡하므로 보상 트랜잭션 및 Deadline 기능은 다음 포스팅에서 다루겠습니다.

 

Saga 패턴 사용을 위한 가상의 시나리오는 다음과 같습니다.

 

테스트 시나리오

 

1. Jeju 모듈에 계좌를 개설한다.(계좌명 : test, 잔고 : 100)

2. Command 모듈에 계정 & 계좌를 개설한다.

3. Jeju 모듈의 계좌에서 30원을 Command 모듈 계좌로 이체한다.

4. Jeju 모델 DB의 계좌와 Query 모델 DB를 통해서 이체금액을 확인한다.

 

이전 포스팅에서 구현한 Jeju 모듈을 활용하여 코드 구현을 진행하겠습니다. Saga 요청을 위한 API 및 Saga 인스턴스는 Command 모듈에 생성하도록 하겠습니다.


2. Common 모듈 구현

 

1. 공통으로 사용할 Command 및 Event 추가를 위해 Common 모듈 Command 패키지 및 Command 클래스를 생성합니다.

 


 

2. 생성한 Command 클래스를 구현합니다.

 

TransferComamndFactory.java

@RequiredArgsConstructor
public class TransferComamndFactory {
    private final AbstractTransferCommand transferCommand;

    public void create(String srcAccountID, String dstAccountID, Long amount, String transferID){
        transferCommand.create(srcAccountID, dstAccountID, amount, transferID);
    }

    public AbstractTransferCommand getTransferCommand(){
        return this.transferCommand;
    }
}

 

위 클래스는 계좌 이체와 연관된 Command를 생성하는 Factory 클래스입니다. 나중에 보상 트랜잭션을 위한 Command를 같이 관리하기 위하여 생성하였습니다.

 

AbstractTransferCommand.java

@ToString
@Getter
public abstract class AbstractTransferCommand {
    @TargetAggregateIdentifier
    protected String srcAccountID;
    protected String dstAccountID;
    protected Long amount;
    protected String transferID;

    public AbstractTransferCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
        this.srcAccountID = srcAccountID;
        this.dstAccountID = dstAccountID;
        this.transferID = transferID;
        this.amount = amount;
        return this;
    }
}

 

계좌 이체 요청을 위한 클래스입니다. srcAccountID는 인출할 계좌 ID이며, dstAccountID는 송금 대상 계좌 ID입니다.

 

JejuBankTransferCommand.java

public class JejuBankTransferCommand extends AbstractTransferCommand {
    @Override
    public String toString() {
        return "JejuBankTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

SeoulBankTransferCommand.java

public class SeoulBankTransferCommand extends AbstractTransferCommand {
    @Override
    public String toString() {
        return "SeoulBankTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

 


3. Common 모듈 event 패키지 하위에 Event 클래스를 생성합니다.

 

 


4. Event 클래스를 구현합니다.

 

MoneyTransferEvent.java

@Builder
@ToString
@Getter
public class MoneyTransferEvent {
    private String dstAccountID;
    private String srcAccountID;
    private Long amount;
    private String transferID;
    private TransferComamndFactory comamndFactory;
}

 

위 클래스는 Command 모듈로 계좌이체 요청이 들어오면, 인출할 대상 계좌번호(srcAccountID) 및 입금 계좌번호(dstAccountID), 그리고 이체 금액(amount)와 같은 기본정보와 트랜잭션간 고유 키(transferID) 및 요청 Command 구분 정보를 담고있습니다.

 

TransferApprovedEvent.java

@Builder
@ToString
@Getter
public class TransferApprovedEvent {
    private String srcAccountID;
    private String dstAccountID;
    private String transferID;
    private Long amount;
}

 

위 클래스는 계좌이체가 성공되었을 때, 금액을 반영하기 위한 Event입니다.

 

TransferDeniedEvent.java

@Getter
@Builder
@ToString
public class TransferDeniedEvent {
    private String srcAccountID;
    private String dstAccountID;
    private String transferID;
    private Long amount;
    private String description;
}

 

위 클래스는 계좌이체가 거절될 때, 요청 App에 거절 내역을 전달하기 위한 Event입니다.


3. Jeju 모듈 구현

 

1. Jeju 은행 모듈 build.gradle 파일을 엽니다.

 


2. Jeju 모듈에 State-Stored-Aggregate 구현을 위해 JPA 및 DB 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
    implementation group: 'org.postgresql', name: 'postgresql', version: '42.2.6'
}

3. Jeju 모듈 resources 패키지 하위 application.yml 파일을 엽니다.

 


4. datasource 속성을 추가합니다.

 

application.yml

server:
  port: 9091

spring:
  application:
    name: eventsourcing-cqrs-jejuBank
  datasource:
    platform: postgres
    url: jdbc:postgresql://localhost:5432/jeju
    username: jeju
    password: jeju
  jpa:
    hibernate:
      ddl-auto: update

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

logging.level.com.cqrs.jeju : debug

 

DB는 Command와 Query와 동일하게 Postgre를 사용하였으며, jeju 계정을 별도로 생성하였습니다.


5. 계좌 생성을 위해 Jeju 모듈 하위에 다음과 같은 패키지들을 생성합니다.

 


6. aggregate 패키지 내 aggregate 클래스부터 구현하겠습니다.

 

Account.java


@Entity
@Aggregate
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Account {
    @AggregateIdentifier
    @Id
    private String accountID;
    private Long balance;
    
    @CommandHandler
    public Account(AccountCreationCommand command) throws IllegalAccessException {
        log.debug("handling {}", command);
        if (command.getBalance() <= 0)
            throw new IllegalAccessException("유효하지 않은 입력입니다.");
        apply(new AccountCreationEvent(command.getAccountID(), command.getBalance()));
    }

    @EventSourcingHandler
    protected void on(AccountCreationEvent event) {
        log.debug("event {}", event);
        this.accountID = event.getAccountID();
        this.balance = event.getBalance();
    }

    @CommandHandler
    protected void on(JejuBankTransferCommand command) throws InterruptedException {

        log.debug("handling {}", command);
        if (this.balance < command.getAmount()) {
            apply(TransferDeniedEvent.builder()
                                        .srcAccountID(command.getSrcAccountID())
                                        .dstAccountID(command.getDstAccountID())
                                        .amount(command.getAmount())
                                        .description("잔고가 부족합니다.")
                                        .transferID(command.getTransferID())
                                     .build());
        } else {
            apply(TransferApprovedEvent.builder()
                                            .srcAccountID(command.getSrcAccountID())
                                            .dstAccountID(command.getDstAccountID())
                                            .transferID(command.getTransferID())
                                            .amount(command.getAmount())
                                        .build());
        }
    }

    @EventSourcingHandler
    protected void on(TransferApprovedEvent event) {
        log.debug("event {}", event);
        this.balance -= event.getAmount();
    }
}

 

트랜잭션 테스트 도중 Account 정보를 DB에서 바로 확인하기 위해 State-Stored Aggregate 형태로 구현하였습니다. 위 코드 내용은 Command 어플리케이션 구현파트에서 다룬 내용이 주이기에 별도 설명은 생략하겠습니다.


7. command 패키지내 Command 클래스 생성 및 이를 구현합니다.

 

AccountCreationCommand.java

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Getter
public class AccountCreationCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private Long balance;
}

8. dto 패키지내 dto 클래스 생성 및 이를 구현합니다.

 

AccountDTO.java

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Getter
public class AccountDTO {
    private String accountID;
    private Long balance;
}

9. event 패키지내 event 클래스를 생성 및 이를 구현합니다.

 

AccountCreationEvent.java

@ToString
@RequiredArgsConstructor
@Getter
public class AccountCreationEvent {
    private final String accountID;
    private final Long balance;
}

해당 이벤트는 Jeju 모듈내에서만 유효하기 때문에 공통 모듈에 생성하지 않았습니다.


10. service 패키지내 service 클래스 생성 및 이를 구현합니다.

 

AccountService.java

public interface AccountService {
    String createAccount(AccountDTO accountDTO);
}

 

AccountServiceImpl.java

@Service
@RequiredArgsConstructor
public class AccountServiceImpl implements AccountService {
    private final CommandGateway commandGateway;

    @Override
    public String createAccount(AccountDTO accountDTO) {
        return commandGateway.sendAndWait(new AccountCreationCommand(accountDTO.getAccountID(), accountDTO.getBalance()));
    }
}

11. controller 패키지내 controller 클래스 생성 및 이를 구현합니다.

 

AccountController.java

@RestController
@RequiredArgsConstructor
public class AccountController {
    private final AccountService accountService;

    @PostMapping("/account")
    public ResponseEntity<String> createAccount(@RequestBody AccountDTO accountDTO){
        return ResponseEntity.ok().body(accountService.createAccount(accountDTO));
    }
}

4. Command 모듈 구현

 

1. Command 구현 포스팅 8번 항목에서 AxonServerCommandBus를 SimpleCommandBus로 대체했습니다. Saga 트랜잭션에서는 다른 모듈에 대하여 Command 요청이 필요하므로 기존에 SImpleCommandBus 설정을 해제해야합니다. 이를 위해 config 패키지내에 있는 AxonConfig 클래스를 엽니다.

 


2. AxonConfig 파일에서 commandBus Bean 로직을 주석처리하거나 삭제합니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
	(...중략...)
//    @Bean
//    SimpleCommandBus commandBus(TransactionManager transactionManager){
//        return  SimpleCommandBus.builder().transactionManager(transactionManager).build();
//    }
	(...중략...)
}

3. 계좌이체 요청 및 반영을 위한 Command 요청을 위해 command 패키지 내 command 클래스를 생성합니다.

 


4. Command 클래스 내용을 구현합니다.

 

TransferApprovedCommand.java

@ToString
@Getter
@Builder
public class TransferApprovedCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private Long amount;
    private String transferID;
}

위 클래스는 계좌이체가 정상적으로 수행되었을 때, 요청을 위한 Command 클래스입니다.

 

MoneyTransferCommand.java

@Builder
@ToString
@Getter
public class MoneyTransferCommand {
    private String srcAccountID;
    @TargetAggregateIdentifier
    private String dstAccountID;
    private Long amount;
    private String transferID;
    private BankType bankType;

    public enum BankType{
        JEJU(command -> new TransferComamndFactory(new JejuBankTransferCommand()),
        SEOUL(command -> new TransferComamndFactory(new SeoulBankTransferCommand());

        private Function<MoneyTransferCommand, TransferComamndFactory> expression;
        BankType(Function<MoneyTransferCommand, TransferComamndFactory> expression){ this.expression = expression;}
        public TransferComamndFactory getCommandFactory(MoneyTransferCommand command){
            TransferComamndFactory factory = this.expression.apply(command);
            factory.create(command.getSrcAccountID(), command.getDstAccountID(), command.amount, command.getTransferID());
            return factory;
        }

    }

    public static MoneyTransferCommand of(TransferDTO dto){
        return MoneyTransferCommand.builder()
                .srcAccountID(dto.getSrcAccountID())
                .dstAccountID(dto.getDstAccountID())
                .amount(dto.getAmount())
                .bankType(dto.getBankType())
                .transferID(UUID.randomUUID().toString())
                .build();
    }
}

 

계좌 이체 요청시, 은행구분에 따른 Command 생성을 달리 하기 위하여 enum을 사용했습니다. 또한, DTO 클래스를 Command 클래스로 변환하기 위한 Factory 메소드를 추가하였습니다.


5. DTO 추가를 위해 dto 패키지 내 dto 클래스를 생성합니다.

 

 

TransferDTO.java

@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TransferDTO {
    private String srcAccountID;
    private String dstAccountID;
    private Long amount;
    private MoneyTransferCommand.BankType bankType;
}

6. Service 메소드 추가를 위해서 service 패키지 하위 service 클래스를 수정합니다.

 

 

TransactionService.java

public interface TransactionService {
    (...중략...)
    String transferMoney(TransferDTO transferDTO);
}

 

TransactionServiceImpl.java

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;
	(...중략...)

    @Override
    public String transferMoney(TransferDTO transferDTO) {
        return commandGateway.sendAndWait(MoneyTransferCommand.of(transferDTO));
    }
}

7. API 추가를 위해서 controller 패키지내 Controller에 메소드를 추가합니다.

 

 

TransactionController.java

@RestController
@RequiredArgsConstructor
public class TransactionController {
    private final TransactionService transactionService;

    (...중략...)

    @PostMapping("/transfer")
    public ResponseEntity<String> transfer(@RequestBody TransferDTO transferDTO){
        return ResponseEntity.ok().body(transactionService.transferMoney(transferDTO));
    }
}

8. Aggregate 수정을 위해 aggregate 패키지내 AccountAggregate 클래스를 엽니다.

 


9. Aggregate에 Command 및 EventSourcing 핸들러 로직을 추가합니다.

 

AccountAggregate.java

@NoArgsConstructor
@AllArgsConstructor
@Slf4j
@Aggregate
@EqualsAndHashCode
public class AccountAggregate {
    (...중략...)

    @CommandHandler
    protected void transferMoney(MoneyTransferCommand command) {
        log.debug("handling {}", command);
        apply(MoneyTransferEvent.builder()
                .srcAccountID(command.getSrcAccountID())
                .dstAccountID(command.getDstAccountID())
                .amount(command.getAmount())
                .comamndFactory(command.getBankType().getCommandFactory(command))
                .transferID(command.getTransferID())
                .build());
    }

    @CommandHandler
    protected void transferMoney(TransferApprovedCommand command) {
        log.debug("handling {}", command);
        apply(new DepositMoneyEvent(this.holderID, command.getAccountID(), command.getAmount()));
        apply(new DepositCompletedEvent(command.getAccountID(), command.getTransferID()));
    }
}

10. Saga 패키지 및 Saga 클래스를 생성합니다.

 


11. Saga 클래스를 구현합니다.

 

TransferManager.java

@Saga
@Slf4j
public class TransferManager {
    @Autowired
    private transient CommandGateway commandGateway;
    private TransferComamndFactory comamndFactory;

    @StartSaga
    @SagaEventHandler(associationProperty = "transferID")
    protected void on(MoneyTransferEvent event) {
        log.debug("Created saga instance");
        log.debug("event : {}", event);
        comamndFactory = event.getComamndFactory();
        SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID());

		log.info("계좌 이체 시작 : {} ", event);
		commandGateway.send(comamndFactory.getTransferCommand());
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferApprovedEvent event) {
		log.info("이체 금액 {} 계좌 반영 요청 : {}", event.getAmount(), event);
        SagaLifecycle.associateWith("accountID", event.getDstAccountID());
        commandGateway.send(TransferApprovedCommand.builder()
                .accountID(event.getDstAccountID())
                .amount(event.getAmount())
                .transferID(event.getTransferID())
                .build());
    }
    
    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferDeniedEvent event) {
        log.info("계좌 이체 실패 : {}", event);
        log.info("실패 사유 : {}", event.getDescription());
		SagaLifecycle.end();
    }

    @SagaEventHandler(associationProperty = "accountID")
    @EndSaga
    protected void on(DepositCompletedEvent event){
        log.info("계좌 이체 성공 : {}", event);
    }
}

 

구현 내용을 간략하게 소개하면 다음과 같습니다. 먼저 @Saga 어노테이션을 통해 해당 클래스가 Saga의 대상임을 지정합니다. 해당 클래스는 NoArgsConstructor로 생성이 되어야하므로 이를 주의합니다.

 

@StartSaga는 Saga 인스턴스의 시작점입니다. 여기에서 associationProperty는 해당 인스턴스를 유일하게 구별할 수 있는 속성이 무엇인지 지정합니다. 자세한 내용은 Axon 공식문서를 참고바랍니다.

 

생성된 Saga는 트랜잭션이 끝나면 종료되어야합니다. 위 예제에서는 DepositCompletedEvent 메시지를 수신받으면, 전체 트랜잭션이 끝난것으로 판단하여 Saga 인스턴스를 종료하도록 되어있습니다. Saga 인스턴스 종료방법은 크게 두가지입니다. 먼저 위 에제와 같이 명시적으로 end 메소드를 기입하거나 @EndSaga 어노테이션 지정할 수 있습니다.


5. 테스트

 

1. Command, Query, Jeju 모듈 App을 기동한 다음, Jeju 은행 계좌를 개설합니다.

POST localhost:9091/account
Content-Type: application/json

{
	"accountID" : "test",
	"balance" : 100
}

2. 생성된 계좌 내역을 DB에서 확인합니다.

 


3. Command 모듈에 계정을 생성합니다.

 

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "Kevin",
	"tel" : "02-2645-5678",
	"address" : "OO시 OO구",
    "company" : "Korea"
}

4. Command 모듈에 계좌를 개설합니다.

 

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "b01fae84-e8a5-427d-a5f4-baa7376b7163"
}

5. Query 모듈 DB에서 계정 및 계좌 생성 내역을 확인합니다.

 


6. Jeju 은행 계좌에서 Command 모듈에 개설된 계좌로 30원 이체합니다.

 

POST http://localhost:8080/transfer
Content-Type: application/json
{
	"srcAccountID" : "test",
	"dstAccountID" : "a31faade-5b57-4435-85da-1de0ed1c55c4",
	"amount" : 30,
	"bankType" : "JEJU"
}

 

AccountAggregate   : handling MoneyTransferCommand(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, bankType=JEJU)
TransferManager    : Created saga instance
TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@5a65bed)
TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@5a65bed) 
TransferManager    : 이체 금액 30 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, amount=30)
AccountAggregate   : handling TransferApprovedCommand(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b)
AccountAggregate   : applying DepositMoneyEvent(holderID=b01fae84-e8a5-427d-a5f4-baa7376b7163, accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30)
AccountAggregate   : balance 30
TransferManager    : 계좌 이체 성공 : DepositCompletedEvent(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=3a6583e3-0288-4e07-87f0-c818644e009b)

성공적으로 계좌이체가 완료되었으면 위와 비슷한 로그가 출력될 것입니다.


7. DB에서 계좌 상태를 확인합니다.

 


6. 마치며

 

 

serialized_sga 내용

<com.cqrs.command.saga.TransferManager>
    <comamndFactory>
        <transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand">
        <srcAccountID>test</srcAccountID>
        <dstAccountID>a31faade-5b57-4435-85da-1de0ed1c55c4</dstAccountID>
        <amount>30</amount>
        <transferID>test</transferID>
        </transferCommand>
        </compensationAbortCommand>
    </comamndFactory>
</com.cqrs.command.saga.TransferManager>

 

이번 포스팅에서는 Jeju, Command 모듈간에 일관성을 위하여 Saga 패턴을 사용했습니다. Command 모듈에서 MoneyTransferEvent Event가 발행되면, Saga 인스턴스가 생성됩니다. 이후 Application의 비즈니스 로직에 따라 모든 트랜잭션이 종료될 때까지 유지됩니다. Saga 인스턴스 정보는 동시에 SAGA_ENTRY 테이블에 적재되며, Saga 인스턴스가 종료되면 해당 데이터 또한 사라집니다.

 

다음 포스팅에서는 Deadline 기능, 보상 트랜잭션 로직이 적용된 Saga 패턴을 구현하겠습니다.

1. 서론

 

MSA 아키텍처를 구성하기 어려운 이유중 가장 큰 문제는 트랜잭션(Transaction)입니다. 기존 모놀로틱(Monolithic) 환경에서 DBMS가 기본적으로 제공해주는 트랜잭션 기능을 통해 데이터 Commit 혹은 Rollback을 통해 데이터를 일관성 있게 관리했습니다. 하지만 Application 및 DB가 분산되면서, 트랜잭션 처리를 단일 DBMS에서 제공하는 기능을 통해서 달성할 수 없습니다.

 

이번 포스팅에서는 분산 트랜잭션의 종류인 Two-Phase Commit, Saga 패턴 및 Axon에서 제공하는 Saga 기능에 대하여 소개하겠습니다.


2. Two Phase Commit

 

 

분산 DB 환경에서 쓰는 방법으로 주요 RDBMS에서 기능을 제공합니다. Two-Phase Commit은 말 그대로 2단계에 거쳐서 데이터를 영속화 하는 작업입니다. 위 그림과 같이 여러 DB가 분산 되었을 때, 트랜잭션을 조율하는 조정자(Coordinator)가 존재합니다. 조정자의 역할은 트랜잭션 요청이 들어왔을 때 두 단계를 거쳐 트랜잭션을 진행을 담당합니다. 이때 첫 번째 단계는 Prepare이며, 이는 쉽게말해 연관된 DB에게 데이터를 저장할 수 있는 상태인지 묻는 과정에 해당합니다.

 

 

메시지를 받은 DB에서는 Commit 작업을 위한 준비를 진행합니다. 이후 데이터를 영속할 수 있는 준비가 완료되면 조정자에게 준비가 완료되었음을 알리고, 반대로 Commit할 수 없다면 불가하다는 메시지를 전달합니다.

 

조정자는 첫 번째 단계에서 전달한 메시지에 대한 응답을 기다립니다. 모든 메시지 수신이 완료되면 두 번째 단계인 Commit을 진행합니다. Commit 단계에서는 조정자가 연관된 DB에게 데이터를 저장하라는 메시지를 송신하며, 수신받은 DB에서는 각자 DB에 데이터를 영속화 합니다.

 

 

모든 DB에서 트랜잭션 처리가 완료되면 전체 트랜잭션을 종료합니다.만약 두 단계를 거치는 과정에서 연관된 DB 중 하나의 DB라도 Commit을 할 수 없는 상황이라면, 모든 DB에게 Rollback을 요구합니다. 트랜잭션을 종료하는 동시에 모든 DB 데이터가 영속화됩니다. 따라서 트랜잭션의 범위는 데이터를 처리하는 DB 전체입니다.


MSA 환경에서 Two-Phase Commit 문제점

 

Two-Phase Commit은 DBMS 간 분산 트랜잭션을 지원해야 적용가능합니다. 하지만 NoSQL 제품군에는 이를 지원하지 않고, 함께 사용되는 DBMS가 동일 제품군(Oracle, MySQL, Postgres)이여야합니다. 따라서 DBMS polyglot 구성은 어렵습니다.

 

또한 Two-Phase Commit은 보통 하나의 API 엔드포인트를 통해 서비스 요청이 들어오고 내부적으로 DB가 분산되어있을 때 사용됩니다. 하지만 MSA 환경에서는 각기 다른 App에서 API간으로 통신을 통해 서비스 요청이 이루어지기 때문에 구현이 쉽지 않습니다.


3. Saga 패턴

 

 

Saga 패턴은 트랜잭션의 관리주체가 DBMS가 아닌 Application에 있습니다. App이 분산되어있을 때, 각 App 하위에 존재하는 DB는 Local 트랜잭션 처리만 담당합니다. 

 

 

따라서 각각의 App에 대한 연속적인 트랜잭션 요청 및 실패할 경우에 Rollback 처리(보상 트랜잭션)를  Application에서 구현해야합니다. 

 

Saga 패턴은 위 그림과 같이 연속적인 업데이트 연산으로 이루어져있으며, 전체가 동시에 데이터가 영속화되는 것이아니라 순차적인 단계로 트랜잭션이 이루어집니다. 따라서 Application 비즈니스 로직에서 요구되는 마지막 트랜잭션이 끝났을 때, 데이터가 완전히 영속되었음을 인지하고 이를 종료합니다.

 

Two-Phase Commit과 다르게 Saga를 활용한 트랜잭션은 데이터 격리성(Isolation)을 보장해주지 않습니다. 하지만 Application의 트랜잭션 관리를 통해 최종 일관성(Eventually Consistency)을 달성할 수 있기 때문에 분산되어있는 DB간에 정합성을 맞출 수 있습니다. 또한 트랜잭션 관리를 Application에서 하기 때문에 DBMS를 다른 제품군으로 구성할 수 있는 장점이 있습니다.

 

하지만 이러한 일관성을 달성하기 위해서는 프로세스 수행 과정상 누락되는 작업이 없는지 면밀히 살펴야하며, 실패할경우 에러 복구를 위한 보상 트랜잭션 처리 누락이 없도록 설계해야합니다.


4. Saga 패턴 종류


1. Choreography-Based Saga

 

 

Choreography-Based Saga는 자신이 보유한 서비스내 Local 트랜잭션을 관리하며, 트랜잭션이 종료되면 완료 Event를 발행합니다. 만약 그 다음에 수행되어야할 트랜잭션이 있다면, 해당 트랜잭션을 수행해야하는 App에서 완료 Event를 수신받고 다음 작업을 처리합니다. 이때 Event는 Kafka와 같은 메시지 큐를 이용해서 비동기 방식으로 전달할 수 있습니다.

 

 

Choreography-Based Saga 방식에서는 각 App별로 트랜잭션을 관리하는 로직이 있습니다. 따라서 중간에 트랜잭션이 실패하면, 해당 트랜잭션 취소처리를 실패한 App에서 보상 Event를 발행하여 Rollback 처리를 시도합니다.

 

위와 같은 구성은 구축하기 쉬운 장점이 있습니다. 하지만 운영자 입장에서 트랜잭션의 현재 상태를 알기 쉽지 않습니다. 


2. Orchestration-Based Saga

 

 

Orchestration-Based Saga는 트랜잭션 처리를 위한 Saga 인스턴스(Manager)가 별도로 존재합니다. 트랜잭션에 관여하는 모든 App은 Manager에 의하여 점진적으로 트랜잭션을 수행하며 결과를 Manager에게 전달합니다. 비즈니스 로직상 마지막 트랜잭션이 끝나면 Manager를 종료하여 전체 트랜잭션 처리를 종료합니다. 만약 중간에 실패하게 되면 Manager에서 보상 트랜잭션을 발동하여 일관성을 유지하도록 합니다.

 

 

모든 관리를 Manager가 호출하기 때문에 분산트랜잭션의 중앙 집중화가 이루어집니다. 따라서 서비스간의 복잡성이 줄어들고 구현 및 테스트가 상대적으로 쉽습니다. 또한 트랜잭션의 현재 상태를 Manager가 알고 있기 때문에 롤백을 쉽게할 수 있는 것 또한 장점입니다. 하지만 이를 관리하기 위한 Orchestrator 서비스가 추가되어야 하기 때문에 인프라 구현의 복잡성이 증가되는 단점이 존재합니다.


5. Axon Saga 기능 소개

 

AxonFramework 에서는 Orchestration 방식의 Saga 패턴을 지원합니다. 즉 트랜잭션을 시작하는 시점에 Saga 인스턴스를 생성하며. Saga 인스턴스에서 트랜잭션을 관리합니다.

 

 

Saga 인스턴스는 Event를 처리하는 UnitOfWork 단계에서 생성되며, 전체 트랜잭션 처리가 완료되면 Saga 인스턴스를 종료합니다. Axon에서는 Annotation 기반으로 Saga 인스턴스를 간편하게 설정할 수 있습니다. 또한 DB에 Saga 정보를 저장하고 있어 복구가 가능합니다. 

 

추가로 생성된 Saga에서 트랜잭션 요청시, Deadline 지정이 가능합니다. 이로인해 트랜잭션 수행 App으로부터 응답이 없을 경우 보상 트랜잭션을 수행할 수 있습니다.


6. 마치며

 

MSA를 구성하는 환경에서 Saga를 도입하기전에 비즈니스 로직상 트랜잭션처리가 반드시 필요한지에 대한 충분한 고려가 필요합니다. 이곳 저곳에 적용했다가는 트랜잭션 처리 지옥을 경험할 수 있기 때문입니다. 반드시 필요한 부분에만 일부 도입하는 것이 좋으며, 가장 좋은 상황은 MSA 환경에서 트랜잭션 처리를 하지 않도록 비즈니스 로직을 설계하는 것입니다.

 

다음 시간에는 예제 구현을 통해 Axon에서 제공하는 Saga 기능을 익혀보겠습니다.

+ Recent posts