1. 서론
이번 포스팅은 AxonFramework 관련 마지막 포스팅입니다. Saga 패턴 보상 트랜잭션 구현을 다루겠습니다.
2. Deadline
MSA 환경에서는 App이 여러개로 분산되어있으므로 하나의 App이 느려지거나 장애가 발생하면, 장애가 발생한 App을 호출하는 App에도 장애가 전파됩니다.
Axon에서 제공하는 Saga 패턴을 사용하면, 요청마다 Saga 인스턴스가 생성됩니다. 따라서 연관된 App의 장애로 인해 전체 트랜잭션에 Hang이 걸리게되면, 요청한 App 또한 안전하지 못합니다.
따라서 이를 완화하기 위해 Axon에서는 Deadline 기능을 제공합니다. Deadline은 App에서 지정한 시간동안 반응이 없으면, 이를 처리할 메소드를 Callback 형식으로 등록할 수 있는 기능입니다. 자세한 내용은 공식 문서를 참고 바라며, 데모 프로젝트에서는 CommandGateway 클래스에서 기본적으로 제공하는 sendAndWait 메소드를 통해서 일정 시간동안 응답이 없으면, 보상 트랜잭션을 발동하도록 구현하겠습니다.
1. Command 모듈 Saga 패키지내 있는 Saga 클래스를 엽니다.
2. Saga 클래스 코드를 수정합니다.
TransferManager.java
@Saga
@Slf4j
public class TransferManager {
(...중략...)
@StartSaga
@SagaEventHandler(associationProperty = "transferID")
protected void on(MoneyTransferEvent event) {
(...중략...)
try {
log.info("계좌 이체 시작 : {} ", event);
commandGateway.sendAndWait(comamndFactory.getTransferCommand(), 10, TimeUnit.SECONDS);
} catch (CommandExecutionException e) {
log.error("Failed transfer process. Start cancel transaction");
//보상 트랜잭션 구현 로직
}
}
(...중략...)
}
sendAndWait 두 번째, 세 번째 인자를 통해 TimeOut을 지정하며, 해당 기간동안 응답이 없을 경우 Exception을 통해 보상 트랜잭션을 발동할 수 있습니다.
3. 트랜잭션 프로세스 설계
일반적인 상황
일반적으로 계좌 이체 요청을하면, 해당 은행에서 보유한 잔고보다 요청액수가 클 경우에는 이체 거절을하며, 반대의 경우에는 이체 승인을 합니다. 따라서 요청자인 Command 모듈에서는 Jeju 은행의 승인 혹은 거절 이벤트 발생 여부에 따라서 결과를 처리하면 됩니다.
보상 트랜잭션 발동 상황
보상 트랜잭션은 연결된 App 사이에 트랜잭션 문제가 발생하였을 때, 이미 처리된 데이터를 원상복구를 위하여 추가적인 트랜잭션을 발동하는 것입니다.
예제에서는 Timeout이 발생하게 되면, Jeju 은행 App의 응답과 관계없이 트랜잭션 Rollback을 위하여 보상 트랜잭션을 요청하고, Command 모듈에서는 다음 로직을 수행합니다.
만약 위 그림과 같이 만약 Jeju 은행에 발생한 장애로 인하여 계좌 요청을 진행하였지만 응답이 오지 않는 상황이라고 가정해봅시다. Command 모듈에서는 장애 방지를 위해 Timeout을 설정했기 때문에 일정 시간이 지나면 보상 트랜잭션을 발동할 것입니다.
이후 Jeju 은행 App이 정상화된다면 이전 요청을 수행할 것입니다. 그 결과 요청이 적절하지 않으면, 이체 거절 이벤트를 발송합니다. 만약 이체 거절 상황에서 요청받은 보상 트랜잭션을 처리한다면, 잔고는 그대로인 상황에서 보상 트랜잭션에 의해 잔고가 늘어나는 기현상이 발생합니다.
이를 해결하기 위해 다양한 방법이 있겠지만, 예제 프로젝트에서는 Timeout이 발생된 상황에서 이체 거절 메시지를 받게되면, 보상 트랜잭션 취소 요청하여 정상 처리하겠습니다.
위 그림은 Command 모듈에서 트랜잭션 요청 이후 처리해야할 과정을 개략적으로 순서도로 나타냈습니다.
4. Common 모듈 구현
1. Common 모듈 command 패키지내 command 클래스를 추가합니다.
2. command 클래스를 구현합니다.
AbstractCancelTransferCommand.java
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Getter
public abstract class AbstractCancelTransferCommand {
@TargetAggregateIdentifier
protected String srcAccountID;
protected String dstAccountID;
protected Long amount;
protected String transferID;
public AbortTransferCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
this.srcAccountID = srcAccountID;
this.dstAccountID = dstAccountID;
this.transferID = srcAccountID;
this.amount = amount;
return this;
}
}
JejuBankCancelTransferCommand.java
public class JejuBankCancelTransferCommand extends AbstractCancelTransferCommand {
@Override
public String toString() {
return "JejuBankCancelTransferCommand{" +
"srcAccountID='" + srcAccountID + '\'' +
", dstAccountID='" + dstAccountID + '\'' +
", amount=" + amount +
", transferID='" + transferID + '\'' +
'}';
}
}
SeoulBankCancelTransferCommand.java
public class SeoulBankCancelTransferCommand extends AbstractCancelTransferCommand {
@Override
public String toString() {
return "SeoulBankCancelTransferCommand{" +
"srcAccountID='" + srcAccountID + '\'' +
", dstAccountID='" + dstAccountID + '\'' +
", amount=" + amount +
", transferID='" + transferID + '\'' +
'}';
}
}
AbstractCompensationCancelCommand.java
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Getter
public abstract class AbstractCompensationCancelCommand {
@TargetAggregateIdentifier
protected String srcAccountID;
protected String dstAccountID;
protected Long amount;
protected String transferID;
public AbstractCompensationCancelCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
this.srcAccountID = srcAccountID;
this.dstAccountID = dstAccountID;
this.transferID = srcAccountID;
this.amount = amount;
return this;
}
}
JejuBankCompensationCancelCommand.java
public class JejuBankCompensationCancelCommand extends AbstractCompensationCancelCommand {
@Override
public String toString() {
return "JejuBankCompensationCancelCommand{" +
"srcAccountID='" + srcAccountID + '\'' +
", dstAccountID='" + dstAccountID + '\'' +
", amount=" + amount +
", transferID='" + transferID + '\'' +
'}';
}
}
SeoulBankCompensationCancelCommand.java
public class SeoulBankCompensationCancelCommand extends AbstractCompensationCancelCommand {
@Override
public String toString() {
return "SeoulBankCompensationCancelCommand{" +
"srcAccountID='" + srcAccountID + '\'' +
", dstAccountID='" + dstAccountID + '\'' +
", amount=" + amount +
", transferID='" + transferID + '\'' +
'}';
}
}
TransferComamndFactory.java
@RequiredArgsConstructor
public class TransferComamndFactory {
private final AbstractTransferCommand transferCommand;
private final AbstractCancelTransferCommand abortTransferCommand;
private final AbstractCompensationCancelCommand compensationAbortCommand;
public void create(String srcAccountID, String dstAccountID, Long amount, String transferID){
transferCommand.create(srcAccountID, dstAccountID, amount, transferID);
abortTransferCommand.create(srcAccountID, dstAccountID, amount, transferID);
compensationAbortCommand.create(srcAccountID, dstAccountID, amount, transferID);
}
public AbstractTransferCommand getTransferCommand(){
return this.transferCommand;
}
public AbstractCancelTransferCommand getAbortTransferCommand(){
return this.abortTransferCommand;
}
public AbstractCompensationCancelCommand getCompensationAbortCommand(){
return this.compensationAbortCommand;
}
}
3. Common 모듈 event 패키지내 event 클래스를 추가합니다.
4. event 클래스 내용을 구현합니다.
CompletedCancelTransferEvent.java
@Builder
@ToString
@Getter
public class CompletedCancelTransferEvent {
private String srcAccountID;
private String dstAccountID;
private Long amount;
private String transferID;
}
CompletedCompensationCancelEvent.java
@Builder
@ToString
@Getter
public class CompletedCompensationCancelEvent {
private String srcAccountID;
private String dstAccountID;
private Long amount;
private String transferID;
}
5. Jeju 모듈 수정
보상 트랜잭션 처리를 위한 Handler 메소드를 추가하기 위해 Aggregate 클래스를 수정합니다.
Account.java
@Entity
@Aggregate
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Account {
@AggregateIdentifier
@Id
private String accountID;
private Long balance;
private final transient Random random = new Random();
@CommandHandler
public Account(AccountCreationCommand command) throws IllegalAccessException {
log.debug("handling {}", command);
if (command.getBalance() <= 0)
throw new IllegalAccessException("유효하지 않은 입력입니다.");
apply(new AccountCreationEvent(command.getAccountID(), command.getBalance()));
}
@EventSourcingHandler
protected void on(AccountCreationEvent event) {
log.debug("event {}", event);
this.accountID = event.getAccountID();
this.balance = event.getBalance();
}
@CommandHandler
protected void on(JejuBankTransferCommand command) throws InterruptedException {
if (random.nextBoolean())
TimeUnit.SECONDS.sleep(15);
log.debug("handling {}", command);
if (this.balance < command.getAmount()) {
apply(TransferDeniedEvent.builder()
.srcAccountID(command.getSrcAccountID())
.dstAccountID(command.getDstAccountID())
.amount(command.getAmount())
.description("잔고가 부족합니다.")
.transferID(command.getTransferID())
.build());
} else {
apply(TransferApprovedEvent.builder()
.srcAccountID(command.getSrcAccountID())
.dstAccountID(command.getDstAccountID())
.transferID(command.getTransferID())
.amount(command.getAmount())
.build());
}
}
@EventSourcingHandler
protected void on(TransferApprovedEvent event) {
log.debug("event {}", event);
this.balance -= event.getAmount();
}
@CommandHandler
protected void on(JejuBankCancelTransferCommand command) {
log.debug("handling {}", command);
apply(CompletedCancelTransferEvent.builder()
.srcAccountID(command.getSrcAccountID())
.dstAccountID(command.getDstAccountID())
.transferID(command.getTransferID())
.amount(command.getAmount())
.build());
}
@EventSourcingHandler
protected void on(CompletedCancelTransferEvent event) {
log.debug("event {}", event);
this.balance += event.getAmount();
}
@CommandHandler
protected void on(JejuBankCompensationCancelCommand command) {
log.debug("handling {}", command);
apply(CompletedCompensationCancelEvent.builder()
.srcAccountID(command.getSrcAccountID())
.dstAccountID(command.getDstAccountID())
.transferID(command.getTransferID())
.amount(command.getAmount())
.build());
}
@EventSourcingHandler
protected void on(CompletedCompensationCancelEvent event) {
log.debug("event {}", event);
this.balance -= event.getAmount();
}
}
위 클래스 구현 내용 중에서 Timeout 테스트를 위해 50% 확률로 Sleep 하도록 임의로 추가하였습니다.
if (random.nextBoolean())
TimeUnit.SECONDS.sleep(15);
6. Command 모듈 수정
1. Command 클래스 수정을 위해 command 패키지 하위 MoneyTransferCommand 클래스 파일을 엽니다.
2. 클래스 파일을 수정합니다.
MoneyTransferCommand.java
@Builder
@ToString
@Getter
public class MoneyTransferCommand {
private String srcAccountID;
@TargetAggregateIdentifier
private String dstAccountID;
private Long amount;
private String transferID;
private BankType bankType;
public enum BankType{
JEJU(command -> new TransferComamndFactory(new JejuBankTransferCommand(),new JejuBankCancelTransferCommand(), new JejuBankCompensationCancelCommand())),
SEOUL(command -> new TransferComamndFactory(new SeoulBankTransferCommand(), new SeoulBankCancelTransferCommand(), new SeoulBankCompensationCancelCommand()));
private Function<MoneyTransferCommand, TransferComamndFactory> expression;
BankType(Function<MoneyTransferCommand, TransferComamndFactory> expression){ this.expression = expression;}
public TransferComamndFactory getCommandFactory(MoneyTransferCommand command){
TransferComamndFactory factory = this.expression.apply(command);
factory.create(command.getSrcAccountID(), command.getDstAccountID(), command.amount, command.getTransferID());
return factory;
}
}
public static MoneyTransferCommand of(TransferDTO dto){
return MoneyTransferCommand.builder()
.srcAccountID(dto.getSrcAccountID())
.dstAccountID(dto.getDstAccountID())
.amount(dto.getAmount())
.bankType(dto.getBankType())
.transferID(UUID.randomUUID().toString())
.build();
}
}
3. 보상 트랜잭션 구현을 위해 Saga 클래스를 엽니다.
4. Saga 클래스를 수정합니다.
TransferManager.java
@Saga
@Slf4j
public class TransferManager {
@Autowired
private transient CommandGateway commandGateway;
private boolean isExecutingCompensation = false;
private boolean isAbortingCompensation = false;
private TransferComamndFactory comamndFactory;
@StartSaga
@SagaEventHandler(associationProperty = "transferID")
protected void on(MoneyTransferEvent event) {
log.debug("Created saga instance");
log.debug("event : {}", event);
comamndFactory = event.getComamndFactory();
SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID());
try {
log.info("계좌 이체 시작 : {} ", event);
commandGateway.sendAndWait(comamndFactory.getTransferCommand(), 10, TimeUnit.SECONDS);
} catch (CommandExecutionException e) {
log.error("Failed transfer process. Start cancel transaction");
cancelTransfer();
}
}
private void cancelTransfer() {
isExecutingCompensation = true;
log.info("보상 트랜잭션 요청");
commandGateway.send(comamndFactory.getAbortTransferCommand());
}
@SagaEventHandler(associationProperty = "srcAccountID")
protected void on(CompletedCancelTransferEvent event) {
isExecutingCompensation = false;
if (!isAbortingCompensation) {
log.info("계좌 이체 취소 완료 : {} ", event);
SagaLifecycle.end();
}
}
@SagaEventHandler(associationProperty = "srcAccountID")
protected void on(TransferDeniedEvent event) {
log.info("계좌 이체 실패 : {}", event);
log.info("실패 사유 : {}", event.getDescription());
if(isExecutingCompensation){
isAbortingCompensation = true;
log.info("보상 트랜잭션 취소 요청 : {}", event);
commandGateway.send(comamndFactory.getCompensationAbortCommand());
}
else {
SagaLifecycle.end();
}
}
@SagaEventHandler(associationProperty = "srcAccountID")
@EndSaga
protected void on(CompletedCompensationCancelEvent event){
isAbortingCompensation = false;
log.info("보상 트랜잭션 취소 완료 : {}",event);
}
@SagaEventHandler(associationProperty = "srcAccountID")
protected void on(TransferApprovedEvent event) {
if (!isExecutingCompensation && !isAbortingCompensation) {
log.info("이체 금액 {} 계좌 반영 요청 : {}",event.getAmount(), event);
SagaLifecycle.associateWith("accountID", event.getDstAccountID());
commandGateway.send(TransferApprovedCommand.builder()
.accountID(event.getDstAccountID())
.amount(event.getAmount())
.transferID(event.getTransferID())
.build());
}
}
@SagaEventHandler(associationProperty = "accountID")
@EndSaga
protected void on(DepositCompletedEvent event){
log.info("계좌 이체 성공 : {}", event);
}
}
5. 테스트 결과
정상
com.cqrs.command.saga.TransferManager : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@d713db5)
com.cqrs.command.saga.TransferManager : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@d713db5)
com.cqrs.command.saga.TransferManager : 이체 금액 1 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, amount=1)
c.c.command.aggregate.AccountAggregate : handling TransferApprovedCommand(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090)
c.c.command.aggregate.AccountAggregate : applying DepositMoneyEvent(holderID=b01fae84-e8a5-427d-a5f4-baa7376b7163, accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1)
c.c.command.aggregate.AccountAggregate : balance 31
com.cqrs.command.saga.TransferManager : 계좌 이체 성공 : DepositCompletedEvent(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090)
Timeout
com.cqrs.command.saga.TransferManager : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3226f9ff)
com.cqrs.command.saga.TransferManager : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3226f9ff)
com.cqrs.command.saga.TransferManager : Failed transfer process. Start cancel transaction
com.cqrs.command.saga.TransferManager : 보상 트랜잭션 요청
com.cqrs.command.saga.TransferManager : 계좌 이체 취소 완료 : CompletedAbortTransferEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791)
잔고 부족
c.c.command.aggregate.AccountAggregate : handling MoneyTransferCommand(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, bankType=JEJU)
com.cqrs.command.saga.TransferManager : Created saga instance
com.cqrs.command.saga.TransferManager : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@66a12a11)
com.cqrs.command.saga.TransferManager : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@66a12a11)
com.cqrs.command.saga.TransferManager : 계좌 이체 실패 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager : 실패 사유 : 잔고가 부족합니다.
잔고 부족 + Timeout
com.cqrs.command.saga.TransferManager : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3d4d8677)
com.cqrs.command.saga.TransferManager : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3d4d8677)
com.cqrs.command.saga.TransferManager : Failed transfer process. Start cancel transaction
com.cqrs.command.saga.TransferManager : 보상 트랜잭션 요청
com.cqrs.command.saga.TransferManager : 계좌 이체 실패 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager : 실패 사유 : 잔고가 부족합니다.
com.cqrs.command.saga.TransferManager : 보상 트랜잭션 취소 요청 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager : 보상 트랜잭션 취소 완료 : CompletedCompensationAbortEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff)
테스트 결과 각기 다른 상황에서 정상적으로 수행되는 것을 확인할 수 있습니다.
6. 마치며
이상으로 AxonFramework에 대한 포스팅을 마치겠습니다. 테스팅에 대해서는 다루지 않았는데, 분산 App 환경에서 테스트 코드 작성은 반드시 필요하다고 생각합니다. 따라서 공식문서를 참고하여 테스트 코드 작성 방법을 익힌다면, 보다 안전한 프로그램이 될 것입니다.
그 외에 쿠버네티스 지원, Tracing에 대해서도 공식문서에 소개되어 있으니 참고바랍니다. 또한, 지금까지 구현한 프로젝트 내용은 깃헙에 업로드 했습니다.
포스팅 내용 중 개선사항에 대해서는 댓글로 남겨주시면, 확인 후 내용 반영하도록 하겠습니다.
'MSA > AxonFramework' 카테고리의 다른 글
19. Saga 패턴을 활용한 트랜잭션 관리 - 2 (1) | 2020.01.25 |
---|---|
18. Saga 패턴을 활용한 트랜잭션 관리 - 1 (2) | 2020.01.23 |
17. Query 어플리케이션 구현(Query) - 3 (3) | 2020.01.20 |
16. Query 어플리케이션 구현(Query) - 2 (3) | 2020.01.19 |
15. Query 어플리케이션 구현(Query) - 1 (0) | 2020.01.14 |