1. 서론

 

 

이번 포스팅은 분산 트랜잭션 제어를 위한 Saga 패턴을 구현하겠습니다. 보상 트랜잭션까지 같이 구현하면 내용이 복잡하므로 보상 트랜잭션 및 Deadline 기능은 다음 포스팅에서 다루겠습니다.

 

Saga 패턴 사용을 위한 가상의 시나리오는 다음과 같습니다.

 

테스트 시나리오

 

1. Jeju 모듈에 계좌를 개설한다.(계좌명 : test, 잔고 : 100)

2. Command 모듈에 계정 & 계좌를 개설한다.

3. Jeju 모듈의 계좌에서 30원을 Command 모듈 계좌로 이체한다.

4. Jeju 모델 DB의 계좌와 Query 모델 DB를 통해서 이체금액을 확인한다.

 

이전 포스팅에서 구현한 Jeju 모듈을 활용하여 코드 구현을 진행하겠습니다. Saga 요청을 위한 API 및 Saga 인스턴스는 Command 모듈에 생성하도록 하겠습니다.


2. Common 모듈 구현

 

1. 공통으로 사용할 Command 및 Event 추가를 위해 Common 모듈 Command 패키지 및 Command 클래스를 생성합니다.

 


 

2. 생성한 Command 클래스를 구현합니다.

 

TransferComamndFactory.java

@RequiredArgsConstructor
public class TransferComamndFactory {
    private final AbstractTransferCommand transferCommand;

    public void create(String srcAccountID, String dstAccountID, Long amount, String transferID){
        transferCommand.create(srcAccountID, dstAccountID, amount, transferID);
    }

    public AbstractTransferCommand getTransferCommand(){
        return this.transferCommand;
    }
}

 

위 클래스는 계좌 이체와 연관된 Command를 생성하는 Factory 클래스입니다. 나중에 보상 트랜잭션을 위한 Command를 같이 관리하기 위하여 생성하였습니다.

 

AbstractTransferCommand.java

@ToString
@Getter
public abstract class AbstractTransferCommand {
    @TargetAggregateIdentifier
    protected String srcAccountID;
    protected String dstAccountID;
    protected Long amount;
    protected String transferID;

    public AbstractTransferCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
        this.srcAccountID = srcAccountID;
        this.dstAccountID = dstAccountID;
        this.transferID = transferID;
        this.amount = amount;
        return this;
    }
}

 

계좌 이체 요청을 위한 클래스입니다. srcAccountID는 인출할 계좌 ID이며, dstAccountID는 송금 대상 계좌 ID입니다.

 

JejuBankTransferCommand.java

public class JejuBankTransferCommand extends AbstractTransferCommand {
    @Override
    public String toString() {
        return "JejuBankTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

SeoulBankTransferCommand.java

public class SeoulBankTransferCommand extends AbstractTransferCommand {
    @Override
    public String toString() {
        return "SeoulBankTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

 


3. Common 모듈 event 패키지 하위에 Event 클래스를 생성합니다.

 

 


4. Event 클래스를 구현합니다.

 

MoneyTransferEvent.java

@Builder
@ToString
@Getter
public class MoneyTransferEvent {
    private String dstAccountID;
    private String srcAccountID;
    private Long amount;
    private String transferID;
    private TransferComamndFactory comamndFactory;
}

 

위 클래스는 Command 모듈로 계좌이체 요청이 들어오면, 인출할 대상 계좌번호(srcAccountID) 및 입금 계좌번호(dstAccountID), 그리고 이체 금액(amount)와 같은 기본정보와 트랜잭션간 고유 키(transferID) 및 요청 Command 구분 정보를 담고있습니다.

 

TransferApprovedEvent.java

@Builder
@ToString
@Getter
public class TransferApprovedEvent {
    private String srcAccountID;
    private String dstAccountID;
    private String transferID;
    private Long amount;
}

 

위 클래스는 계좌이체가 성공되었을 때, 금액을 반영하기 위한 Event입니다.

 

TransferDeniedEvent.java

@Getter
@Builder
@ToString
public class TransferDeniedEvent {
    private String srcAccountID;
    private String dstAccountID;
    private String transferID;
    private Long amount;
    private String description;
}

 

위 클래스는 계좌이체가 거절될 때, 요청 App에 거절 내역을 전달하기 위한 Event입니다.


3. Jeju 모듈 구현

 

1. Jeju 은행 모듈 build.gradle 파일을 엽니다.

 


2. Jeju 모듈에 State-Stored-Aggregate 구현을 위해 JPA 및 DB 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
    implementation group: 'org.postgresql', name: 'postgresql', version: '42.2.6'
}

3. Jeju 모듈 resources 패키지 하위 application.yml 파일을 엽니다.

 


4. datasource 속성을 추가합니다.

 

application.yml

server:
  port: 9091

spring:
  application:
    name: eventsourcing-cqrs-jejuBank
  datasource:
    platform: postgres
    url: jdbc:postgresql://localhost:5432/jeju
    username: jeju
    password: jeju
  jpa:
    hibernate:
      ddl-auto: update

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

logging.level.com.cqrs.jeju : debug

 

DB는 Command와 Query와 동일하게 Postgre를 사용하였으며, jeju 계정을 별도로 생성하였습니다.


5. 계좌 생성을 위해 Jeju 모듈 하위에 다음과 같은 패키지들을 생성합니다.

 


6. aggregate 패키지 내 aggregate 클래스부터 구현하겠습니다.

 

Account.java


@Entity
@Aggregate
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Account {
    @AggregateIdentifier
    @Id
    private String accountID;
    private Long balance;
    
    @CommandHandler
    public Account(AccountCreationCommand command) throws IllegalAccessException {
        log.debug("handling {}", command);
        if (command.getBalance() <= 0)
            throw new IllegalAccessException("유효하지 않은 입력입니다.");
        apply(new AccountCreationEvent(command.getAccountID(), command.getBalance()));
    }

    @EventSourcingHandler
    protected void on(AccountCreationEvent event) {
        log.debug("event {}", event);
        this.accountID = event.getAccountID();
        this.balance = event.getBalance();
    }

    @CommandHandler
    protected void on(JejuBankTransferCommand command) throws InterruptedException {

        log.debug("handling {}", command);
        if (this.balance < command.getAmount()) {
            apply(TransferDeniedEvent.builder()
                                        .srcAccountID(command.getSrcAccountID())
                                        .dstAccountID(command.getDstAccountID())
                                        .amount(command.getAmount())
                                        .description("잔고가 부족합니다.")
                                        .transferID(command.getTransferID())
                                     .build());
        } else {
            apply(TransferApprovedEvent.builder()
                                            .srcAccountID(command.getSrcAccountID())
                                            .dstAccountID(command.getDstAccountID())
                                            .transferID(command.getTransferID())
                                            .amount(command.getAmount())
                                        .build());
        }
    }

    @EventSourcingHandler
    protected void on(TransferApprovedEvent event) {
        log.debug("event {}", event);
        this.balance -= event.getAmount();
    }
}

 

트랜잭션 테스트 도중 Account 정보를 DB에서 바로 확인하기 위해 State-Stored Aggregate 형태로 구현하였습니다. 위 코드 내용은 Command 어플리케이션 구현파트에서 다룬 내용이 주이기에 별도 설명은 생략하겠습니다.


7. command 패키지내 Command 클래스 생성 및 이를 구현합니다.

 

AccountCreationCommand.java

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Getter
public class AccountCreationCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private Long balance;
}

8. dto 패키지내 dto 클래스 생성 및 이를 구현합니다.

 

AccountDTO.java

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Getter
public class AccountDTO {
    private String accountID;
    private Long balance;
}

9. event 패키지내 event 클래스를 생성 및 이를 구현합니다.

 

AccountCreationEvent.java

@ToString
@RequiredArgsConstructor
@Getter
public class AccountCreationEvent {
    private final String accountID;
    private final Long balance;
}

해당 이벤트는 Jeju 모듈내에서만 유효하기 때문에 공통 모듈에 생성하지 않았습니다.


10. service 패키지내 service 클래스 생성 및 이를 구현합니다.

 

AccountService.java

public interface AccountService {
    String createAccount(AccountDTO accountDTO);
}

 

AccountServiceImpl.java

@Service
@RequiredArgsConstructor
public class AccountServiceImpl implements AccountService {
    private final CommandGateway commandGateway;

    @Override
    public String createAccount(AccountDTO accountDTO) {
        return commandGateway.sendAndWait(new AccountCreationCommand(accountDTO.getAccountID(), accountDTO.getBalance()));
    }
}

11. controller 패키지내 controller 클래스 생성 및 이를 구현합니다.

 

AccountController.java

@RestController
@RequiredArgsConstructor
public class AccountController {
    private final AccountService accountService;

    @PostMapping("/account")
    public ResponseEntity<String> createAccount(@RequestBody AccountDTO accountDTO){
        return ResponseEntity.ok().body(accountService.createAccount(accountDTO));
    }
}

4. Command 모듈 구현

 

1. Command 구현 포스팅 8번 항목에서 AxonServerCommandBus를 SimpleCommandBus로 대체했습니다. Saga 트랜잭션에서는 다른 모듈에 대하여 Command 요청이 필요하므로 기존에 SImpleCommandBus 설정을 해제해야합니다. 이를 위해 config 패키지내에 있는 AxonConfig 클래스를 엽니다.

 


2. AxonConfig 파일에서 commandBus Bean 로직을 주석처리하거나 삭제합니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
	(...중략...)
//    @Bean
//    SimpleCommandBus commandBus(TransactionManager transactionManager){
//        return  SimpleCommandBus.builder().transactionManager(transactionManager).build();
//    }
	(...중략...)
}

3. 계좌이체 요청 및 반영을 위한 Command 요청을 위해 command 패키지 내 command 클래스를 생성합니다.

 


4. Command 클래스 내용을 구현합니다.

 

TransferApprovedCommand.java

@ToString
@Getter
@Builder
public class TransferApprovedCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private Long amount;
    private String transferID;
}

위 클래스는 계좌이체가 정상적으로 수행되었을 때, 요청을 위한 Command 클래스입니다.

 

MoneyTransferCommand.java

@Builder
@ToString
@Getter
public class MoneyTransferCommand {
    private String srcAccountID;
    @TargetAggregateIdentifier
    private String dstAccountID;
    private Long amount;
    private String transferID;
    private BankType bankType;

    public enum BankType{
        JEJU(command -> new TransferComamndFactory(new JejuBankTransferCommand()),
        SEOUL(command -> new TransferComamndFactory(new SeoulBankTransferCommand());

        private Function<MoneyTransferCommand, TransferComamndFactory> expression;
        BankType(Function<MoneyTransferCommand, TransferComamndFactory> expression){ this.expression = expression;}
        public TransferComamndFactory getCommandFactory(MoneyTransferCommand command){
            TransferComamndFactory factory = this.expression.apply(command);
            factory.create(command.getSrcAccountID(), command.getDstAccountID(), command.amount, command.getTransferID());
            return factory;
        }

    }

    public static MoneyTransferCommand of(TransferDTO dto){
        return MoneyTransferCommand.builder()
                .srcAccountID(dto.getSrcAccountID())
                .dstAccountID(dto.getDstAccountID())
                .amount(dto.getAmount())
                .bankType(dto.getBankType())
                .transferID(UUID.randomUUID().toString())
                .build();
    }
}

 

계좌 이체 요청시, 은행구분에 따른 Command 생성을 달리 하기 위하여 enum을 사용했습니다. 또한, DTO 클래스를 Command 클래스로 변환하기 위한 Factory 메소드를 추가하였습니다.


5. DTO 추가를 위해 dto 패키지 내 dto 클래스를 생성합니다.

 

 

TransferDTO.java

@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TransferDTO {
    private String srcAccountID;
    private String dstAccountID;
    private Long amount;
    private MoneyTransferCommand.BankType bankType;
}

6. Service 메소드 추가를 위해서 service 패키지 하위 service 클래스를 수정합니다.

 

 

TransactionService.java

public interface TransactionService {
    (...중략...)
    String transferMoney(TransferDTO transferDTO);
}

 

TransactionServiceImpl.java

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;
	(...중략...)

    @Override
    public String transferMoney(TransferDTO transferDTO) {
        return commandGateway.sendAndWait(MoneyTransferCommand.of(transferDTO));
    }
}

7. API 추가를 위해서 controller 패키지내 Controller에 메소드를 추가합니다.

 

 

TransactionController.java

@RestController
@RequiredArgsConstructor
public class TransactionController {
    private final TransactionService transactionService;

    (...중략...)

    @PostMapping("/transfer")
    public ResponseEntity<String> transfer(@RequestBody TransferDTO transferDTO){
        return ResponseEntity.ok().body(transactionService.transferMoney(transferDTO));
    }
}

8. Aggregate 수정을 위해 aggregate 패키지내 AccountAggregate 클래스를 엽니다.

 


9. Aggregate에 Command 및 EventSourcing 핸들러 로직을 추가합니다.

 

AccountAggregate.java

@NoArgsConstructor
@AllArgsConstructor
@Slf4j
@Aggregate
@EqualsAndHashCode
public class AccountAggregate {
    (...중략...)

    @CommandHandler
    protected void transferMoney(MoneyTransferCommand command) {
        log.debug("handling {}", command);
        apply(MoneyTransferEvent.builder()
                .srcAccountID(command.getSrcAccountID())
                .dstAccountID(command.getDstAccountID())
                .amount(command.getAmount())
                .comamndFactory(command.getBankType().getCommandFactory(command))
                .transferID(command.getTransferID())
                .build());
    }

    @CommandHandler
    protected void transferMoney(TransferApprovedCommand command) {
        log.debug("handling {}", command);
        apply(new DepositMoneyEvent(this.holderID, command.getAccountID(), command.getAmount()));
        apply(new DepositCompletedEvent(command.getAccountID(), command.getTransferID()));
    }
}

10. Saga 패키지 및 Saga 클래스를 생성합니다.

 


11. Saga 클래스를 구현합니다.

 

TransferManager.java

@Saga
@Slf4j
public class TransferManager {
    @Autowired
    private transient CommandGateway commandGateway;
    private TransferComamndFactory comamndFactory;

    @StartSaga
    @SagaEventHandler(associationProperty = "transferID")
    protected void on(MoneyTransferEvent event) {
        log.debug("Created saga instance");
        log.debug("event : {}", event);
        comamndFactory = event.getComamndFactory();
        SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID());

		log.info("계좌 이체 시작 : {} ", event);
		commandGateway.send(comamndFactory.getTransferCommand());
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferApprovedEvent event) {
		log.info("이체 금액 {} 계좌 반영 요청 : {}", event.getAmount(), event);
        SagaLifecycle.associateWith("accountID", event.getDstAccountID());
        commandGateway.send(TransferApprovedCommand.builder()
                .accountID(event.getDstAccountID())
                .amount(event.getAmount())
                .transferID(event.getTransferID())
                .build());
    }
    
    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferDeniedEvent event) {
        log.info("계좌 이체 실패 : {}", event);
        log.info("실패 사유 : {}", event.getDescription());
		SagaLifecycle.end();
    }

    @SagaEventHandler(associationProperty = "accountID")
    @EndSaga
    protected void on(DepositCompletedEvent event){
        log.info("계좌 이체 성공 : {}", event);
    }
}

 

구현 내용을 간략하게 소개하면 다음과 같습니다. 먼저 @Saga 어노테이션을 통해 해당 클래스가 Saga의 대상임을 지정합니다. 해당 클래스는 NoArgsConstructor로 생성이 되어야하므로 이를 주의합니다.

 

@StartSaga는 Saga 인스턴스의 시작점입니다. 여기에서 associationProperty는 해당 인스턴스를 유일하게 구별할 수 있는 속성이 무엇인지 지정합니다. 자세한 내용은 Axon 공식문서를 참고바랍니다.

 

생성된 Saga는 트랜잭션이 끝나면 종료되어야합니다. 위 예제에서는 DepositCompletedEvent 메시지를 수신받으면, 전체 트랜잭션이 끝난것으로 판단하여 Saga 인스턴스를 종료하도록 되어있습니다. Saga 인스턴스 종료방법은 크게 두가지입니다. 먼저 위 에제와 같이 명시적으로 end 메소드를 기입하거나 @EndSaga 어노테이션 지정할 수 있습니다.


5. 테스트

 

1. Command, Query, Jeju 모듈 App을 기동한 다음, Jeju 은행 계좌를 개설합니다.

POST localhost:9091/account
Content-Type: application/json

{
	"accountID" : "test",
	"balance" : 100
}

2. 생성된 계좌 내역을 DB에서 확인합니다.

 


3. Command 모듈에 계정을 생성합니다.

 

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "Kevin",
	"tel" : "02-2645-5678",
	"address" : "OO시 OO구",
    "company" : "Korea"
}

4. Command 모듈에 계좌를 개설합니다.

 

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "b01fae84-e8a5-427d-a5f4-baa7376b7163"
}

5. Query 모듈 DB에서 계정 및 계좌 생성 내역을 확인합니다.

 


6. Jeju 은행 계좌에서 Command 모듈에 개설된 계좌로 30원 이체합니다.

 

POST http://localhost:8080/transfer
Content-Type: application/json
{
	"srcAccountID" : "test",
	"dstAccountID" : "a31faade-5b57-4435-85da-1de0ed1c55c4",
	"amount" : 30,
	"bankType" : "JEJU"
}

 

AccountAggregate   : handling MoneyTransferCommand(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, bankType=JEJU)
TransferManager    : Created saga instance
TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@5a65bed)
TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@5a65bed) 
TransferManager    : 이체 금액 30 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, amount=30)
AccountAggregate   : handling TransferApprovedCommand(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b)
AccountAggregate   : applying DepositMoneyEvent(holderID=b01fae84-e8a5-427d-a5f4-baa7376b7163, accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30)
AccountAggregate   : balance 30
TransferManager    : 계좌 이체 성공 : DepositCompletedEvent(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=3a6583e3-0288-4e07-87f0-c818644e009b)

성공적으로 계좌이체가 완료되었으면 위와 비슷한 로그가 출력될 것입니다.


7. DB에서 계좌 상태를 확인합니다.

 


6. 마치며

 

 

serialized_sga 내용

<com.cqrs.command.saga.TransferManager>
    <comamndFactory>
        <transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand">
        <srcAccountID>test</srcAccountID>
        <dstAccountID>a31faade-5b57-4435-85da-1de0ed1c55c4</dstAccountID>
        <amount>30</amount>
        <transferID>test</transferID>
        </transferCommand>
        </compensationAbortCommand>
    </comamndFactory>
</com.cqrs.command.saga.TransferManager>

 

이번 포스팅에서는 Jeju, Command 모듈간에 일관성을 위하여 Saga 패턴을 사용했습니다. Command 모듈에서 MoneyTransferEvent Event가 발행되면, Saga 인스턴스가 생성됩니다. 이후 Application의 비즈니스 로직에 따라 모든 트랜잭션이 종료될 때까지 유지됩니다. Saga 인스턴스 정보는 동시에 SAGA_ENTRY 테이블에 적재되며, Saga 인스턴스가 종료되면 해당 데이터 또한 사라집니다.

 

다음 포스팅에서는 Deadline 기능, 보상 트랜잭션 로직이 적용된 Saga 패턴을 구현하겠습니다.

1. 서론

 

MSA 아키텍처를 구성하기 어려운 이유중 가장 큰 문제는 트랜잭션(Transaction)입니다. 기존 모놀로틱(Monolithic) 환경에서 DBMS가 기본적으로 제공해주는 트랜잭션 기능을 통해 데이터 Commit 혹은 Rollback을 통해 데이터를 일관성 있게 관리했습니다. 하지만 Application 및 DB가 분산되면서, 트랜잭션 처리를 단일 DBMS에서 제공하는 기능을 통해서 달성할 수 없습니다.

 

이번 포스팅에서는 분산 트랜잭션의 종류인 Two-Phase Commit, Saga 패턴 및 Axon에서 제공하는 Saga 기능에 대하여 소개하겠습니다.


2. Two Phase Commit

 

 

분산 DB 환경에서 쓰는 방법으로 주요 RDBMS에서 기능을 제공합니다. Two-Phase Commit은 말 그대로 2단계에 거쳐서 데이터를 영속화 하는 작업입니다. 위 그림과 같이 여러 DB가 분산 되었을 때, 트랜잭션을 조율하는 조정자(Coordinator)가 존재합니다. 조정자의 역할은 트랜잭션 요청이 들어왔을 때 두 단계를 거쳐 트랜잭션을 진행을 담당합니다. 이때 첫 번째 단계는 Prepare이며, 이는 쉽게말해 연관된 DB에게 데이터를 저장할 수 있는 상태인지 묻는 과정에 해당합니다.

 

 

메시지를 받은 DB에서는 Commit 작업을 위한 준비를 진행합니다. 이후 데이터를 영속할 수 있는 준비가 완료되면 조정자에게 준비가 완료되었음을 알리고, 반대로 Commit할 수 없다면 불가하다는 메시지를 전달합니다.

 

조정자는 첫 번째 단계에서 전달한 메시지에 대한 응답을 기다립니다. 모든 메시지 수신이 완료되면 두 번째 단계인 Commit을 진행합니다. Commit 단계에서는 조정자가 연관된 DB에게 데이터를 저장하라는 메시지를 송신하며, 수신받은 DB에서는 각자 DB에 데이터를 영속화 합니다.

 

 

모든 DB에서 트랜잭션 처리가 완료되면 전체 트랜잭션을 종료합니다.만약 두 단계를 거치는 과정에서 연관된 DB 중 하나의 DB라도 Commit을 할 수 없는 상황이라면, 모든 DB에게 Rollback을 요구합니다. 트랜잭션을 종료하는 동시에 모든 DB 데이터가 영속화됩니다. 따라서 트랜잭션의 범위는 데이터를 처리하는 DB 전체입니다.


MSA 환경에서 Two-Phase Commit 문제점

 

Two-Phase Commit은 DBMS 간 분산 트랜잭션을 지원해야 적용가능합니다. 하지만 NoSQL 제품군에는 이를 지원하지 않고, 함께 사용되는 DBMS가 동일 제품군(Oracle, MySQL, Postgres)이여야합니다. 따라서 DBMS polyglot 구성은 어렵습니다.

 

또한 Two-Phase Commit은 보통 하나의 API 엔드포인트를 통해 서비스 요청이 들어오고 내부적으로 DB가 분산되어있을 때 사용됩니다. 하지만 MSA 환경에서는 각기 다른 App에서 API간으로 통신을 통해 서비스 요청이 이루어지기 때문에 구현이 쉽지 않습니다.


3. Saga 패턴

 

 

Saga 패턴은 트랜잭션의 관리주체가 DBMS가 아닌 Application에 있습니다. App이 분산되어있을 때, 각 App 하위에 존재하는 DB는 Local 트랜잭션 처리만 담당합니다. 

 

 

따라서 각각의 App에 대한 연속적인 트랜잭션 요청 및 실패할 경우에 Rollback 처리(보상 트랜잭션)를  Application에서 구현해야합니다. 

 

Saga 패턴은 위 그림과 같이 연속적인 업데이트 연산으로 이루어져있으며, 전체가 동시에 데이터가 영속화되는 것이아니라 순차적인 단계로 트랜잭션이 이루어집니다. 따라서 Application 비즈니스 로직에서 요구되는 마지막 트랜잭션이 끝났을 때, 데이터가 완전히 영속되었음을 인지하고 이를 종료합니다.

 

Two-Phase Commit과 다르게 Saga를 활용한 트랜잭션은 데이터 격리성(Isolation)을 보장해주지 않습니다. 하지만 Application의 트랜잭션 관리를 통해 최종 일관성(Eventually Consistency)을 달성할 수 있기 때문에 분산되어있는 DB간에 정합성을 맞출 수 있습니다. 또한 트랜잭션 관리를 Application에서 하기 때문에 DBMS를 다른 제품군으로 구성할 수 있는 장점이 있습니다.

 

하지만 이러한 일관성을 달성하기 위해서는 프로세스 수행 과정상 누락되는 작업이 없는지 면밀히 살펴야하며, 실패할경우 에러 복구를 위한 보상 트랜잭션 처리 누락이 없도록 설계해야합니다.


4. Saga 패턴 종류


1. Choreography-Based Saga

 

 

Choreography-Based Saga는 자신이 보유한 서비스내 Local 트랜잭션을 관리하며, 트랜잭션이 종료되면 완료 Event를 발행합니다. 만약 그 다음에 수행되어야할 트랜잭션이 있다면, 해당 트랜잭션을 수행해야하는 App에서 완료 Event를 수신받고 다음 작업을 처리합니다. 이때 Event는 Kafka와 같은 메시지 큐를 이용해서 비동기 방식으로 전달할 수 있습니다.

 

 

Choreography-Based Saga 방식에서는 각 App별로 트랜잭션을 관리하는 로직이 있습니다. 따라서 중간에 트랜잭션이 실패하면, 해당 트랜잭션 취소처리를 실패한 App에서 보상 Event를 발행하여 Rollback 처리를 시도합니다.

 

위와 같은 구성은 구축하기 쉬운 장점이 있습니다. 하지만 운영자 입장에서 트랜잭션의 현재 상태를 알기 쉽지 않습니다. 


2. Orchestration-Based Saga

 

 

Orchestration-Based Saga는 트랜잭션 처리를 위한 Saga 인스턴스(Manager)가 별도로 존재합니다. 트랜잭션에 관여하는 모든 App은 Manager에 의하여 점진적으로 트랜잭션을 수행하며 결과를 Manager에게 전달합니다. 비즈니스 로직상 마지막 트랜잭션이 끝나면 Manager를 종료하여 전체 트랜잭션 처리를 종료합니다. 만약 중간에 실패하게 되면 Manager에서 보상 트랜잭션을 발동하여 일관성을 유지하도록 합니다.

 

 

모든 관리를 Manager가 호출하기 때문에 분산트랜잭션의 중앙 집중화가 이루어집니다. 따라서 서비스간의 복잡성이 줄어들고 구현 및 테스트가 상대적으로 쉽습니다. 또한 트랜잭션의 현재 상태를 Manager가 알고 있기 때문에 롤백을 쉽게할 수 있는 것 또한 장점입니다. 하지만 이를 관리하기 위한 Orchestrator 서비스가 추가되어야 하기 때문에 인프라 구현의 복잡성이 증가되는 단점이 존재합니다.


5. Axon Saga 기능 소개

 

AxonFramework 에서는 Orchestration 방식의 Saga 패턴을 지원합니다. 즉 트랜잭션을 시작하는 시점에 Saga 인스턴스를 생성하며. Saga 인스턴스에서 트랜잭션을 관리합니다.

 

 

Saga 인스턴스는 Event를 처리하는 UnitOfWork 단계에서 생성되며, 전체 트랜잭션 처리가 완료되면 Saga 인스턴스를 종료합니다. Axon에서는 Annotation 기반으로 Saga 인스턴스를 간편하게 설정할 수 있습니다. 또한 DB에 Saga 정보를 저장하고 있어 복구가 가능합니다. 

 

추가로 생성된 Saga에서 트랜잭션 요청시, Deadline 지정이 가능합니다. 이로인해 트랜잭션 수행 App으로부터 응답이 없을 경우 보상 트랜잭션을 수행할 수 있습니다.


6. 마치며

 

MSA를 구성하는 환경에서 Saga를 도입하기전에 비즈니스 로직상 트랜잭션처리가 반드시 필요한지에 대한 충분한 고려가 필요합니다. 이곳 저곳에 적용했다가는 트랜잭션 처리 지옥을 경험할 수 있기 때문입니다. 반드시 필요한 부분에만 일부 도입하는 것이 좋으며, 가장 좋은 상황은 MSA 환경에서 트랜잭션 처리를 하지 않도록 비즈니스 로직을 설계하는 것입니다.

 

다음 시간에는 예제 구현을 통해 Axon에서 제공하는 Saga 기능을 익혀보겠습니다.

1. 서론

 

이번 포스팅에서는 Scatter-Gather Query를 구현하겠습니다. Scatter-Gather Query는 동일한 Query를 수행하는 Query Handler가 여러 App에 존재할 경우 모든 App에 Query를 요청하여 결과를 취합받아 최초 Query를 요청한 Application에서 결과를 처리합니다.

 

데모 프로젝트에 아래와 같은 요구사항이 추가되었음을 가정하여 Scatter-Gather Query 기능을 구현하도록 하겠습니다.

 

 

 

Axon Server에 Jeju 은행과 Seoul 은행 외부시스템이 연결되어있다고 가정해봅시다. 이때 소유주(HolderID)가 보유한 잔고에 대하여 각 은행에게 대출한도를 Query하면 은행별로 전달받은 답변을 Client 화면에 표시하는 요구사항을 코드를 통해 알아보겠습니다.


2. Jeju 은행 모듈 구현

 

1. Jeju 은행과, Seoul 은행에 Query를 요청하려면, Query 클래스 정보를 공유해야하므로, 공통 모듈(Common)에 Query 클래스를 생성해야합니다. 먼저 Common 모듈에 query > loan 패키지를 생성합니다. 이후 Query 및 결과를 저장할 클래스를 생성합니다.

 


2. 생성한 두 클래스를 구현합니다.

 

LoanLimitQuery.java

@AllArgsConstructor
@ToString
@Getter
public class LoanLimitQuery {
    private String holderID;
    private Long balance;
}

 

LoanLimitResult.java

@AllArgsConstructor
@ToString
@Getter
@Builder
public class LoanLimitResult {
    private String holderID;
    private String bankName;
    private Long balance;
    private Long loanLimit;
}

3. 새로운 은행 모듈 생성을 위하여 프로젝트 root 디렉토리에 위치한 settings.gradle 파일을 연다음 모듈 추가합니다.

 

settings.gradle

rootProject.name = 'demo'
include 'command'
include 'query'
include 'common'
include 'seoulBank'
include 'jejuBank'

4. 추가된 두 묘듈에서 공통 모듈을 사용하기 위해 빌드 설정을 추가해야 합니다. 프로젝트 root 디렉토리에 위치한 build.gradle 파일을 연다음 빌드 스크립트 내용을 추가합니다.

 

build.gradle

(...중략...)
project(':jejuBank') {
    dependencies {
        compile project(':common')
    }
}

project(':seoulBank') {
    dependencies {
        compile project(':common')
    }
}

 

이후 gradle build를 수행하면, jejuBank, seoulBank 모듈이 생성됩니다.


5.  jejuBank 프로젝트 하위에 build.gradle 파일을 생성합니다.

 


6. build.gradle 파일에 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
}

7. resource 패키지 하위에 application.yml 파일을 생성합니다.

 


8. application.yml 파일에 설정 값을 기술합니다.

 

application.yml

server:
  port: 9091

spring:
  application:
    name: eventsourcing-cqrs-jejuBank

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

9. 패키지 구조 설정 한다음 Component 패키지와 Main 클래스를 생성합니다.

 


9. Main 클래스 내용을 구현합니다.

 

jejuBankApp.java

@SpringBootApplication
public class JejuBankApp {
    public static void main(String[] args) {
        SpringApplication.run(JejuBankApp.class, args);
    }
}

10. component 패키지내에 Query를 처리할 Component 클래스를 생성합니다.

 


11. Component 클래스 내용을 구현합니다. 

 

AccountLoanComponent.java

@Component
@Slf4j
public class AccountLoanComponent {

    @QueryHandler
    private LoanLimitResult on(LoanLimitQuery query) {
        log.debug("handling {}",query);
        return LoanLimitResult.builder()
                .holderID(query.getHolderID())
                .balance(query.getBalance())
                .bankName("JejuBank")
                .loanLimit(Double.valueOf(query.getBalance() * 1.2).longValue())
                .build();
    }
}

 

위 코드에서 jeju 은행의 대출한도는 일괄적으로 보유 잔고의 120%만 가능하도록 가정하였습니다.


3. Seoul 은행 모듈 구현

1. seoulBank 프로젝트 하위에 build.gradle 파일을 생성합니다.

 


2. build.gradle 파일에 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
}

3. resource 패키지 하위에 application.yml 파일을 생성합니다.

 


 

4. application.yml 파일에 설정 값을 기술합니다.

 

application.yml

server:
  port: 9092

spring:
  application:
    name: eventsourcing-cqrs-seoulBank

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

5. 패키지 구조 설정 한다음 Component 패키지와 Main 클래스를 생성합니다.

 


6. Main 클래스 내용을 구현합니다.

 

SeoulBankApp.java

@SpringBootApplication
public class SeoulBankApp {
    public static void main(String[] args) {
        SpringApplication.run(SeoulBankApp.class, args);
    }
}

7. component 패키지내에 Query를 처리할 Component 클래스를 생성합니다.

 


8.  Component 클래스 내용을 구현합니다.

 

AccountLoanComponent.java

@Component
@Slf4j
public class AccountLoanComponent {
    @QueryHandler
    private LoanLimitResult on(LoanLimitQuery query) {
        log.debug("handling {}",query);
        return LoanLimitResult.builder()
                .holderID(query.getHolderID())
                .balance(query.getBalance())
                .bankName("SeoulBank")
                .loanLimit(Double.valueOf(query.getBalance() * 1.5).longValue())
                .build();
    }
}

4. Query 인터페이스 구현

 

1. 화면 생성을 위해 Query 모듈 resources > templates 패키지내에 scatter-gather.html 파일을 생성합니다.

 


2. 화면 코드를 구현합니다.

 

scatter-gather.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Scatter-Gather Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (holderId === undefined || holderId === null || holderId ==="") {
                        alert("소유주를 입력하시오.");
                    } else {
                        let xhr = new XMLHttpRequest();
                        xhr.open('GET','http://localhost:9090/account/info/scatter/gather/'+holderId, true);
                        xhr.send();
                        xhr.onload = function(){
                            if(xhr.status === 200){
                                let elem = pElem.cloneNode();
                                elem.innerText = xhr.responseText;
                                appendDiv.appendChild(elem);
                            }
                        }
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

3. Scatter-Query를 요청할 서비스를 구현하기 위하여 먼저 메소드를 정의해야합니다. Query 모듈 service 패키지에 위치한 QueryService 클래스를 엽니다.

 


4. 인터페이스에 추상 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    (...중략...)
    List<LoanLimitResult> getAccountInfoScatterGather(String holderId);
}

5. QueryServiceImpl 클래스를 열어 추가된 추상 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
    (...중략...)
    private final AccountRepository repository;
    (...중략...)
    @Override
    public List<LoanLimitResult> getAccountInfoScatterGather(String holderId) {
        HolderAccountSummary accountSummary = repository.findByHolderId(holderId).orElseThrow();

        return queryGateway.scatterGather(new LoanLimitQuery(accountSummary.getHolderId(), accountSummary.getTotalBalance()),
                ResponseTypes.instanceOf(LoanLimitResult.class),
                30, TimeUnit.SECONDS)
                .collect(Collectors.toList());
    }
}

 

Scatter-Gather 쿼리는 단일 App에 요청하는 것이 아니므로, 만약 Handler 처리 App에 장애가 발생한다면 무한정 대기할 수 있습니다. 따라서 요청시, DeadLine을 정하여 요청시간 만큼만 대기할 수 있도록 지정이 필요합니다.


6. API End Point 지정 및 화면 호출을 위하여 Controller 클래스 수정이 필요합니다. Query 모듈내 Controller 패키지안에 있는 두개의 Controller 클래스에 관련 메소드를 추가합니다.

 

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    (...중략...)

    @GetMapping("account/info/scatter/gather/{id}")
    public ResponseEntity<List<LoanLimitResult>> getAccountInfoScatterGather(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                .body(queryService.getAccountInfoScatterGather(holderId));
    }

}

 

WebController.java

@Controller
public class WebController {
	(...중략...)
    @GetMapping("/scatter-gather")
    public void scatterGatherQueryView(){}
}

5. 테스트

 

1. jeju, seoul 은행 App과 Query App을 기동합니다.

 

2. 웹브라우저(Chrome)에서 http://localhost:9090/scatter-gather URL 입력합니다.

 

3. 임의의 소유주 ID를 입력후 조회 버튼을 눌러 결과를 확인합니다.

 


6. 마치며

 

이번 포스팅을 끝으로 EventSourcing에 필요한 기본적인 Command, Event 처리 및 Query 요청에 대한 필수 기능 구현을 완료했습니다. 각 기능별로 세부적인 기능은 Axon 공식 홈페이지에서 제공하는 DocumentGoogle Groups를 이용하여 검색하시면 많은 자료를 구할 수 있으니 참고 바랍니다.

1. 서론

 

 

이번 시간에는 Query 기능 중 Point to Point, Subscription 기능을 구현합니다. 또한, Query 결과를 보기 위하여 Client 화면을 간략하게 만들겠습니다.

 

Client 화면은 크게 Point to Point Query와 Subsciprtion Query를 조회하는 화면 2개를 분할하였으며, 화면 호출 URL은 다음과 같습니다.

 

Point to Point : http://localhost:9090/p2p

Subscription : http://localhost:9090/subscription

 

 

 

Subscription 에서는 조회를 누르면 Server와의 Connection이 설정되므로 이를 해제하기 위한 종료 버튼을 추가하였습니다. 조회 버튼을 누르게되면, Server API를 호출합니다. 두 API 주소는 다음과 같습니다.

 

 

Point to Point : http://localhost:9090/account/info/{id}

Subscription : http://localhost:9090/account/info/subcription/{id}

 

 

이제 본격적으로 기능 구현을 진행하겠습니다.


2. Point to Point Query

Query를 처리하는 Handler가 하나만 존재하고, 한번만 질의만 하면되는 상황이라면 Point to Point Query가 적합합니다.

해당 기능 구현을 통해 사용방법을 알아보겠습니다.

 

1. Query 모듈 build.gradle 파일을 엽니다.

 

 


2. 화면 구현을 위하여 thymeleaf 의존성을 추가합니다.

 

build.gradle

dependencies{
    (...중략...)
    implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
}

 


3. 화면 호출을 위하여 Query 모듈 Controller 패키지에 WebController 클래스를 추가합니다.

 

 


4. 클래스 내용을 구현합니다.

 

WebController.java

@Controller
public class WebController {
    @GetMapping("/p2p")
    public void pointToPointQueryView(){}
}

 

Controller에서 http://localhost:9090/p2p URL 호출 시 p2p.html 파일을 전달하도록 지정합니다.


5. 화면 구현을 위해서 Query 모듈 resources 패키지 하위에 templates 패키지 및 p2p.html 파일을 생성합니다.

 


6. html 내용을을 구현합니다.

 

p2p.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>PointToPoint Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (holderId === undefined || holderId === null || holderId ==="") {
                        alert("소유주를 입력하시오.");
                    } else {
                        let xhr = new XMLHttpRequest();
                        xhr.open('GET','http://localhost:9090/account/info/'+holderId, true);
                        xhr.send();
                        xhr.onload = function(){
                            if(xhr.status === 200){
                                let elem = pElem.cloneNode();
                                elem.innerText = xhr.responseText;
                                appendDiv.appendChild(elem);
                            }
                        }
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

 

위 코드 내용 중 가장 핵심이 되는 로직은 callback 객체입니다. 구현 내용은 비동기로 Query를 수행하는 API에 소유주 정보를 인자로 요청하면, 해당 내용을 수신받아 화면에 표시합니다.

 


7. 화면이 정상적으로 출력되는지 확인하기 위하여, Query App을 기동합니다. 이후 웹브라우저(Chrome)을 열고 화면 호출 테스트를 수행합니다.(http://localhost:9090/p2p)

 

 


8. 테스트가 완료되었으면, Query를 수행할 API 내용을 구현하겠습니다. 먼저 Query 모듈 service 패키지내 QueryService 인터페이스를 엽니다.

 


9. Query 수행을 위한 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    void reset();
    HolderAccountSummary getAccountInfo(String holderId);
}

10. Service 구현을 위하여 Query 모듈 service 패키지내 QueryServiceImpl 클래스를 엽니다.

 


11. Interface에 정의된 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
	(...중략...)
    @Override
    public HolderAccountSummary getAccountInfo(String holderId) {
        AccountQuery accountQuery = new AccountQuery(holderId);
        log.debug("handling {}", accountQuery);
        return queryGateway.query(accountQuery, ResponseTypes.instanceOf(HolderAccountSummary.class)).join();
    }

}

12. API End Point 설정을 위해 controller 패키지내 위치한 HolderAccountController 클래스를 엽니다.

 

 


13. API End Point를 추가합니다.

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    private final QueryService queryService;

	(...중략...)
    @GetMapping("/account/info/{id}")
    public ResponseEntity<HolderAccountSummary> getAccountInfo(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                             .body(queryService.getAccountInfo(holderId));
    }

}

 

 


14. QueryGateway로 전달된 Query를 처리하는 Handler 작성을 위해 Query 모듈 projection 패키지 하위 HolderAccountProjection 클래스를 엽니다.

 

 


15. HolderAccountProjection 클래스에서 QueryHandler 메소드를 구현합니다.

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
    (...중략...)

    @QueryHandler
    public HolderAccountSummary on(AccountQuery query){
        log.debug("handling {}", query);
        return repository.findByHolderId(query.getHolderId()).orElse(null);
    }
}

16. Query App을 기동합니다. 이후 EventStore에 저장된 HolderID 중 하나를 선택하여 입력창에 기입합니다. 조회 버튼을 눌러 정상적으로 조회되는지 확인합니다.

 

테스트 결과, Read Model에 저장된 데이터가 정상적으로 출력되는 것을 확인할 수 있습니다.


3. Subscription Query

 

 

Subscription Query는 Client로부터 Connection을 연결하면, 이를 해제하지 않고 유지합니다. Query를 처리하는 Hanlder App에서는 초기 결과를 최초에 반환합니다. 이때 Flux 타입으로 반환하며, QueryUpdateEmitter를 통해서 Read Model의 변경이 있을 때마다 수신 받습니다.

 

데모 프로젝트에서는 SSE(Server Sent Event) 방식으로 구현하기 위해 Client 화면에서는 EventSource 객체를 사용하겠습니다.

 


1. Query 모듈 build.gradle 파일을 엽니다.

 

 


2. Flux 사용을 위하여 reactor-core 의존성을 추가합니다.

 

build.gradle

dependencies{
    (...중략...)
    implementation group: 'io.projectreactor', name: 'reactor-core'
}

3. 화면 호출을 위하여 Query 모듈 Controller 패키지에 WebController 클래스를 추가합니다.

 


4. 클래스 내용을 구현합니다.

 

WebController.java

@Controller
public class WebController {
   (...중략...)
    @GetMapping("/subscription")
    public void subscriptionQueryView(){}
}

 

Controller에서 http://localhost:9090/subscription URL 호출 시 subscription.html 파일을 전달하도록 지정합니다.


5. 화면 구현을 위해서 Query 모듈 resources 패키지 하위에 templates 패키지 및 subscription.html 파일을 생성합니다.

 


6. html 내용을을 구현합니다.

 

subscription.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Subscription Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let eventSource = undefined;
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            function closeEventSource() {
                eventSource.close();
                eventSource = undefined;
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (eventSource !== undefined) {
                        closeEventSource();
                    }

                    if (holderId === undefined || holderId === null || holderId === "") {
                        alert("소유주를 입력하시오.");
                    } else {
                        eventSource = new EventSource('/account/info/subscription/' + holderId);
                        eventSource.onopen = function () {
                            console.log("connected");
                        };
                        eventSource.onmessage = function (event) {
                            let elem = pElem.cloneNode();
                            elem.innerText = event.data;
                            appendDiv.appendChild(elem);
                        };
                        eventSource.onerror = function () {
                            console.error("Connection error has occurred");
                            closeEventSource();
                        }
                    }
                }),
                "disconnect": (function () {
                    if (eventSource !== undefined) {
                        console.log("disconnected");
                        closeEventSource();
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
    <input type="button" data-cb="disconnect" value="종료"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

 

조회 버튼을 누르면, EventSource 객체를 생성하여 Server Sent Event를 수신받으며, 메시지가 전달되면 수신된 데이터를 화면에 출력하도록 구현하였습니다.


7. 화면이 정상적으로 출력되는지 확인하기 위하여, Query App을 기동합니다. 이후 웹브라우저(Chrome)을 열고 화면 호출 테스트를 수행합니다.(http://localhost:9090/usbscription)

 

 


8. 테스트가 완료되었으면, Query를 수행할 API 내용을 구현하겠습니다. 먼저 Query 모듈 service 패키지내 QueryService 인터페이스를 엽니다.

 


9. Query 수행을 위한 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    (...중략...)
    Flux<HolderAccountSummary> getAccountInfoSubscription(String holderId);
}

10. Service 구현을 위하여 Query 모듈 service 패키지내 QueryServiceImpl 클래스를 엽니다.

 


11. Interface에 정의된 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
    (...중략...)
    @Override
    public Flux<HolderAccountSummary> getAccountInfoSubscription(String holderId) {
        AccountQuery accountQuery = new AccountQuery(holderId);
        log.debug("handling {}", accountQuery);

        SubscriptionQueryResult<HolderAccountSummary, HolderAccountSummary> queryResult = queryGateway.subscriptionQuery(accountQuery,
                ResponseTypes.instanceOf(HolderAccountSummary.class),
                ResponseTypes.instanceOf(HolderAccountSummary.class)
        );

        return Flux.create(emitter -> {
            queryResult.initialResult().subscribe(emitter::next);
            queryResult.updates()
                    .doOnNext(holder -> {
                        log.debug("doOnNext : {}, isCanceled {}", holder, emitter.isCancelled());
                        if (emitter.isCancelled()) {
                            queryResult.close();
                        }
                    })
                    .doOnComplete(emitter::complete)
                    .subscribe(emitter::next);
        });
    }
}

 

위 코드 구현 내용은 최초에 initalResult 생성 후에, 지속적으로 updates 메소드를 통해 Stream 데이터를 전달받아 Client에게 전달합니다. 만약 중간에 Connection이 실패하게되면, 해당 Flux를 종료하도록 구현하였습니다.


12. API End Point 설정을 위해 controller 패키지내 위치한 HolderAccountController 클래스를 엽니다.

 

 


13. EndPoint를 추가합니다.

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    (...중략...)

    @GetMapping("account/info/subscription/{id}")
    public ResponseEntity<Flux<HolderAccountSummary>> getAccountInfoSubscription(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                             .body(queryService.getAccountInfoSubscription(holderId));
    }
}

14. Subscription에서는 Read Model에 변경이 발생되었을 때 이를 전파해야합니다. 따라서 이를 작성하기 위해  Query 모듈 projection 패키지 하위 HolderAccountProjection 클래스를 엽니다.

 

 


15. EventSourcingHandler를 통해 ReadModel의 변화가 발생하였을 때, QueryUpdateEmitter 클래스를 통해 이벤트 변경 내용을 전파하도록 클래스 내용을 수정합니다.

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
    private final AccountRepository repository;
    private final QueryUpdateEmitter queryUpdateEmitter;

    (...중략...)

    @EventHandler
    @AllowReplay
    protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());

        queryUpdateEmitter.emit(AccountQuery.class,
                query -> query.getHolderId().equals(event.getHolderID()),
                holderAccount);

        repository.save(holderAccount);
    }
    @EventHandler
    @AllowReplay
    protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());

        queryUpdateEmitter.emit(AccountQuery.class,
                query -> query.getHolderId().equals(event.getHolderID()),
                holderAccount);

        repository.save(holderAccount);
    }

	(...중략...)
}

 

각 Handler안에 queryUpdateEmitter를 통해 구독중인 Query와 동일한 ID의 Event가 들어오면, Query 결과에 전달되도록 처리하였습니다.


16. Query App을 기동합니다. 이후 EventStore에 저장된 HolderID 중 하나를 선택하여 입력창에 기입합니다. 조회 버튼을 눌러 정상적으로 조회되는지 확인합니다.

 

위 예제에서 holderId가 924eb0ab-c35f-4d3e-b753-a4ce35bd7c27인 계좌 전체의 잔고는 현재 290입니다. Update가 정상 수신되는지 확인하기 위하여, 해당 소유주가 보유한 계좌에서 5원을 인출하는 API를 호출합니다.

 

 


17. 5원 인출 후 Client 화면에서 변경된 데이터가 정상 수신되었는지 확인합니다.

 

확인 결과, 정상적으로 데이터 수신되었음을 확인할 수 있습니다.


18. 종료 버튼을 눌러 구독을 중지합니다. 이후 924eb0ab-c35f-4d3e-b753-a4ce35bd7c27 소유주가 보유한 계좌에서 추가로 5원 인출하였을 때, Query 결과가 화면에 표시되지 않음을 확인합니다. 화면의 변화가 없으면, 정상적으로 Connection이 종료된 것입니다.


4. 마치며

이번 시간에는 Point to Point, Subscription Query에 대해서 살펴보았습니다. Subscription Query는 반환 형태가 Flux 형태다보니 아무래도 Spring MVC에서는 사용하기 힘든 부분이 있을듯 합니다. 또한, 이를 지원하기 위해서는 Client에서도 SSE를 위한 구현이 필요하며, Server에서는 Client와 Connection 유지를 위해 Subscription Query가 증가할 수록 그에 상응하는 Thread 수가 증가합니다. 따라서 비즈니스 요건에 맞게 적절한 사용이 필요하며, Spring Webflux를 사용한다면 도입을 검토해볼 수 있을 것 같습니다.

 

1. 서론

 

이번 포스팅부터 생성된 Read Model에 대하여 Query하는 방법에 대하여 소개하겠습니다. AxonServer를 통해서 수많은 MicroService App들이 연결되어 있을 수 있습니다. 이러한 환경에서 Query 요청을 했을때, 단순 1:1 요청 응답을 요구할 수 도 있고 때로는 Query 요청에 따라 2개 이상의 다른 App에서 데이터를 수신받는 경우도 있습니다. 따라서 각기 다른 경우에 따라 처리해야하는 방법이 다릅니다.

 

Axon Framework는 총 3가지 타입의 Query 기능을 제공합니다. 이번 포스팅에서는 Axon 에서 제공하는 Query 종류와 동작 원리를 알아보겠습니다.


2. Axon Server 라우팅 기능(Query)

 

Command 명령을 요청할 때는 CommandBus를 이용하였고, Event 발생시에는 EventBus를 이용하였습니다. 마찬가지로 Query는 QueryGateway를 통해 요청을 전달하며, 전달된 Query는 QueryBus를 통해서 해당 Query를 처리하는 Handler로 연결됩니다. 이때 QueryHandler가 속한 App에 Query를 전달하는 역할을 Axon Server가 수행합니다. Query 관련 Axon Server의 라우팅 기능 동작 흐름을 살펴보겠습니다.

 

 

1. Point to Point Query

 

 

Command Handler와 마찬가지로 Application 기동시 AxonServer와 연결을 시도합니다. 연결이 완료되면, 해당 App은 자신이 처리가능한 Query Handler 정보를 Server에 등록합니다. Point to Point Query는 해당 Query를 처리하는 Handler가 단 하나의 Application에만 존재할 경우 해당 처리할 수 있는 App으로 Query를 전달하여 결과를 전달합니다.


2. Scatter & Gatter Query

 

 

 

두번째는 Scatter-Gather Query입니다. 이는 동일한 Query를 처리하는 Handler가 여러 App에 등록되어있을 때, 이를 처리하는 방법입니다. Application 기동시 Query Handler 정보를 Axon Server에 등록하면, Application 정보가 라우팅 테이블에 해당 App 정보를 기록합니다. 이때는 Client 측에서 각기 다른 App에서 수신되는 결과를 수집하여 처리 방법(한쪽 결과만 수집, 둘다 수집 등)을 정해야합니다. Axon Server는 Query 요청이 들어오면 등록된 Application에게 Query를 전달하며, 수신받은 App에서는 결과를 취합하여 전송합니다. 이후 Client 측에서 결과를 수신받아 데이터 결과를 종합합니다.


3. Subscription Query

 

 

Point to Point Query를 요청하였을 때, 만약 Query를 수행하는 Read Model에 대한 변경이 발생한다면, 화면에 출력되는 결과와 Read Model 사이 데이터 정합성 불일치 문제가 발생합니다. 따라서 이를 해결하기 위해서는 주기적으로 Query를 재요청하는 방법이 있습니다. 하지만 데이터 변경이 발생하지 않아도 계속 Query를 요청해야하는 문제점이 있으므로 효율적이지 못합니다.

 

 

 

Subscription Query는 Client측에서 Query를 요청할 때, Query 결과를 전달받고 Connection을 끊는 것이 아니라 계속 지속합니다. 이후 Query Handler가 위치한 App의 Read Model 변경이 발생할 경우 변경분에 대한 데이터를 전달받아 이를 최신화합니다.


3. Query Handler 동작 과정

 

 

AxonFramework 관련 Bean 생성시 사용자가 지정한 QueryBus가 없으면, Default로 AxonServerQueryBus가 생성됩니다. 이때 내부적으로 QueryProcessor가 만들어지고, 해당 생성자 안에서 ExecuterService를 통해 요청시 최대 10개의 Thread를 Default로 생성하도록 Handler에 등록합니다. 

 

ExecuterService에 의해서 만들어지는 Thread는 QueryProcessingTask이며, AxonServer로부터 Query를 전달받으면 해당 클래스의 run 메소드를 통해 QueryHandler 작업이 이어집니다.

 

 

 

Application 구동 이후, Query 요청이 발생되면, 내부적으로는 위와 같은 흐름을 거쳐 메시지가 전달됩니다.

 

  1. QueryGateway로 Query를 전달합니다. 이때 전달하는 Query가 Scatter-Gather, Subsciprtion, Point to Point 중 하나임을 메소드를 통해 AxonServer에게 전달합니다.
  2. 사용자가 전달한 Query를 GenericQueryMessage로 변환한다음 QueryBus로 전달합니다. Default QueryBus는 AxonServerQueryBus이므로 AxonServer에 전달됩니다.
  3. 전달된 Query는 QueryHandler가 존재하는 App으로 라우팅됩니다. 이때 AxonServer로부터 gRPC를 통해 onMessage 메소드가 호출되면, AxonServerQueryBus 내부에 할당된 Handler들의 onNext 메소드를 호출합니다.
  4. Bean 등록 당시 Handler 호출시 ExecutorService로부터 QueryProcessingTask 생성을 요청하였습니다. 따라서 Handler 호출과정에서 Thread 생성을 요청합니다.
  5. QueryProcessingTask 내부에 있는 run 메소드가 수행되면서 QueryProcessor에게 Query 수행을 위임합니다.
  6. QueryProcessor 내부 로직 수행중 Query 수행을 SimpleQueryBus에게 위임합니다.
  7. 내부적으로 UnitOfWork 과정을 거치면서, Reflection을 통해 QueryHandler 메소드를 찾아 수행후 결과를 돌려 받습니다.
  8. 최종 수행된 결과를 QueryProviderOutbound에게 전달합니다.
  9. AxonServer에게 결과를 전달합니다.
  10. Query를 요청한 Client에게 결과를 전달합니다.

4. 마치며

 

Query App 구현을 위해 기본적으로 알아야하는 내부 과정에 대해서 살펴봤습니다. 다음 포스팅에서는 코드 구현을 진행하겠습니다.

1. 서론

 

Software 개발 및 유지보수 단계에서 요구사항에 의하여 데이터 모델은 변하기 마련입니다. 그리고 바뀌는 데이터모델에 맞춰 Event 또한 형태가 변합니다. 이때, 이전 발행된 Event와 앞으로 적재되는 Event의 형태는 다르게 됩니다. 따라서 Replay 과정에서 변경된 Event를 적용하는데 있어 문제가 발생되지 않도록 코드를 통한 중재가 필요합니다. Axon 에서는 이를 위해 Event Upcasting 기능을 제공합니다. 이번 포스팅에서는 코드를 통하여 Event Upcasting을 적용하는 방법을 알아보겠습니다.


2. Versioning

 

 

Application 개발 후 요구사항 변경으로 계정 가입시에 회사명 정보가 추가되며, Read Model MView(Materialized View)에도 회사명 정보가 포함되고, 회사명이 기입되지 않으면 N/A로 표시된다고 가정하겠습니다.

 

요구사항 변경 내용을 코드로 구현하면 다음과 같습니다.


1. Command 수정을 위해서 API 변경이 필요합니다. Command 모듈내 dto 패키지에 위치한 HolderDTO 파일을 엽니다.

 


2. DTO 클래스에 company 정보를 추가합니다.

 

HolderDTO.java

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class HolderDTO {
    private String holderName;
    private String tel;
    private String address;
    private String company;
}

3. Common 모듈 commands 패키지내 HolderCreationCommand 파일을 엽니다.

 


4. Command 클래스에 company 정보를 추가합니다.

 

HolderCreationCommand.java

@AllArgsConstructor
@ToString
@Getter
public class HolderCreationCommand {
    @TargetAggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;
    private String company;
}

5. Command 모듈 service 패키지에 위치한 TransactionServiceImpl 파일을 엽니다.

 


6. Service 클래스에서 계정 생성 로직에 Company 정보를 넘기도록 수정합니다.

 

TransactionServiceImpl.java

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
	(...중략...)
    @Override
    public CompletableFuture<String> createHolder(HolderDTO holderDTO) {
        return commandGateway.send(new HolderCreationCommand(UUID.randomUUID().toString()
                , holderDTO.getHolderName()
                , holderDTO.getTel()
                , holderDTO.getAddress()
                , holderDTO.getCompany())
                );
    }
    (...중략...)
}

 

7. Common 모듈의 build.gradle 파일을 엽니다.

 


8. build.gradle 파일에 axon-messaging 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
bootJar { 
    enabled = false 
}
jar {
    enabled = true
}
dependencies{
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-messaging', version: "$axonVersion"
}

9. Common 모듈 events 패키지 내 HolderCreationEvent 파일을 엽니다.

 


10. Event에 변경된 사항을 추가합니다. 이때 Event에는 변경이 발생하므로, Event의 변경이 발생했음을 알리는 마커가 필요합니다. 이때 사용되는 어노테이션이 @Revision입니다. Revision 표시를 통해서 실제 Event가 발생되었을 때 EventStore에는 해당 Event의 버전이 저장되며, 추후 Event Upcasting 시에 해당 정보가 사용됩니다.

 

HolderCreationEvent.java

@AllArgsConstructor
@ToString
@Getter
@Revision("1.0")
public class HolderCreationEvent {
    private String holderID;
    private String holderName;
    private String tel;
    private String address;
    private String company;
}

11. Command 모듈 aggregate 패키지내에 위치한 HolderAggregate 파일을 엽니다.

 


12. Aggregate 클래스 Event 발행 로직에 company 정보를 전달할 수 있도록 변경합니다.

 

HolderAggregate.java

@AllArgsConstructor
@NoArgsConstructor
@Aggregate
@Slf4j
public class HolderAggregate {
	(...중략...)
    @CommandHandler
    public HolderAggregate(HolderCreationCommand command) {
        log.debug("handling {}", command);

        apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress(), command.getCompany()));
    }
    (...중략...)
}

13. Query 모듈에 version 패키지를 생성한 다음 HolderCreationEventV1 클래스를 만듭니다. 해당 클래스는 변경 이전HolderCreationEvent에 대해서 변경 후에 어떻게 추가된 정보를 처리를 지정 용도로 사용됩니다.

 


14. HolderCreationEventV1 클래스 내용을 구현합니다.

 

HolderCreationEventV1.java

public class HolderCreationEventV1 extends SingleEventUpcaster {
    private static SimpleSerializedType targetType = new SimpleSerializedType(HolderCreationEvent.class.getTypeName(), null);

    @Override
    protected boolean canUpcast(IntermediateEventRepresentation intermediateRepresentation) {
        return intermediateRepresentation.getType().equals(targetType);
    }

    @Override
    protected IntermediateEventRepresentation doUpcast(IntermediateEventRepresentation intermediateRepresentation) {
        return intermediateRepresentation.upcastPayload(
                new SimpleSerializedType(targetType.getName(), "1.0"),
                org.dom4j.Document.class,
                document -> {
                    document.getRootElement()
                            .addElement("company")
                            .setText("N/A");
                    return document;
                }
        );
    }

 

코드 구현내역은 다음과 같습니다. 먼저 targetType에는 대상 Event를 지정합니다. 데모 프로젝트에서는 HolderCreationEvent가 변경되었으므로 해당 Class 타입을 지정합니다. 두번째 인자에 위치한 null 값은 최초에는 @Revision 정보를 명시하지 않았으므로 존재하는 값이 없기 때문에 null로 지정하였습니다.

 

doUpcast 메소드는 실제 Event Version을 확인하고 이전 버전의 Event가 들어왔을 때 행동해야할 내용을 기술합니다.

EventStore에 저장된 Event 내용이 XML로 지정되므로, XML로 되어있는 Payload 에서 신규 추가된 company 정보와 값이 없을 경우 입력될 Default 값 N/A를 setText 메소드를 통해 지정합니다. 또한 해당 작업을 수행할 대상을 Revision 정보가 null인 targetType으로 한정합니다. 따라서 해당 메소드를 통해서 확인할 수 있는 사실은 HolderCreationEvent에 수많은 Revision 정보가 존재하더라도, HolderEventCreationEventV1 클래스에서는 Revision 번호가 1.0과 null인 두 Event간의 속성값에만 영향을 미칩니다.

 

 


15. Query 모듈내 config 패키지에 존재하는 AxonConfig 파일을 엽니다.

 

 


14. AxonConfig 파일에 Event Chain을 등록합니다.

 

AxonConfig.java

@Configuration
public class AxonConfig {
	(...중략...)

    @Bean
    public EventUpcasterChain eventUpcasterChain(){
        return new EventUpcasterChain(
                new HolderCreationEventV1()
        );
    }
}

 

만약 Event의 버전이 여러개 생성되었다면, HolderCreationEventV1이외 여러개의 버전간 핸들러 클래스를 생성한 다음 Event Chain 생성 로직에 핸들러 인스턴스를 등록합니다.


15. 새로운 계정 등록 테스트를 진행한다음 Query Application을 Replay 시킵니다.

 

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "Kane",
	"tel" : "02-2645-5678",
	"address" : "OO시 OO구",
    "company" : "Korea"
}

 

c.c.q.p.HolderAccountProjection          : projecting HolderCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, holderName=kevin, tel=02-1234-5678, address=OO시 OO구, company=N/A) , timestamp : 2020-01-07T12:11:21.047Z
c.c.q.p.HolderAccountProjection          : projecting HolderCreationEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, holderName=bruce, tel=02-5291-5678, address=OO시 OO구, company=N/A) , timestamp : 2020-01-07T12:12:14.238Z
c.c.q.p.HolderAccountProjection          : projecting HolderCreationEvent(holderID=1b77f5bc-73e6-4b7a-a1fd-d5453723b9c7, holderName=Kane, tel=02-2645-5678, address=OO시 OO구, company=Korea) , timestamp : 2020-01-11T13:00:10.487Z

 

위 내용은 Replay 이후 Application 로그 일부를 발췌한 내용입니다. 신규 생성한 Kane 사용자의 company는 입력값으로 정확히 등록이 되었으며, 기존에 발행된 Event는 N/A로 매핑된 것을 확인할 수 있습니다.


3. 마치며

 

이번 포스팅을 끝으로 Query Application에서 Event 처리와 관련된 내용을 마무리하겠습니다. 다음 포스팅에서는 Query Application에 저장된 Read Model을 기반으로 Query하는 방법에 대해서 다루도록 하겠습니다.

1. 서론

 

이전 포스팅에서 Replay에 대해서 학습했습니다. Replay는 신규 Read Model이 추가되거나 기존 모델의 변경이 있을 때, EventStore에서 기존 내역을 전달받아 재수행하는 작업입니다. 따라서 EventSourcing & CQRS 모델을 사용한다면, Replay 성능 고민이 반드시 필요합니다. 이번 포스팅에서는 AxonIQ 블로그를 기반으로 Replay 성능 개선 방법에 대해서 소개하겠습니다.

 

본문 설명에 앞서 AxonIQ 벤치마크 테스트 환경 Spec은 다음과 같습니다.

  • DBMS : Postgres(9.6), MongoDB(3.6)
  • CPU : vCPU 8코어 (GCP)
  • RAM : 30G
  • OS : Ubuntu 18.10
  • DISK : SSD 1T

 


2. Replay 문제점

 

만약 EventStore에 저장된 Event 수가 10억개이고, Read Model에서 Replay 수행 시 초당 1000개의 Event를 재생할 수 있다고 가정한다면, Replay 작업에만 약 11일이 걸립니다. 이는 대부분의 상황에서는 적용하기 힘듭니다. 따라서 Replay 작업  최적화가 반드시 필요합니다. 

 

앞선 포스팅에서 TrackingEventProcessor에 의하여 @EventHandler 메소드가 호출된다고 설명했습니다. 이때 별도 속성을 지정하지 않으면, TrackingEventProcessor는 Default 값으로 설정되며 기본값은 다음과 같습니다.

 

  • Thread 수 : 1개
  • Batch Size : 1
  • 최대 Thread 수 : Segment 개수
  • TokenClaim 주기 : 5000ms

 

이를 통해 알 수 있는 사실은 Axon에서 Event 처리는 Batch 단위로 이루어지는데, 기본 설정 값은 Event 1개씩 단일 Thread로 처리됨을 확인할 수 있습니다.

 

AxonIQ에서 TrackingEventProcessor에 대한 기본값 설정으로 Event 처리에 대한 벤치마크 수행 결과, 초당 260개의 Event를 처리하였습니다. 이는 100만개 Event 기준 10시간의 처리 능력을 보여주어 좋지 않은 처리 능력을 나타냈습니다. 지금부터 Event 처리 능력을 강화하는 방법에 대해서 하나씩 살펴보겠습니다.


3. Replay 개선 전략

 

3-1. Batch Size 조정

 

이전 설명에서 Axon Framework에서 Event를 Batch 단위로 처리한다고 했습니다. Batch 작업 시, 개별적인 Event를 다루는 것 외에 부가적인 기능은 다음과 같습니다.

 

  1. Tracking Token을 갱신하여 최신의 Event Stream 위치를 기억하도록 함.
  2. Transactional Store를 사용했을 때 DB의 Transaction Commit 작업 수행.

 

사용자의 별도 설정이 없다면, 기본 Batch Size는 1입니다. 이는 Replay 작업을 수행하기에는 너무 적은 수치입니다. 따라서 적절한 PoC를 통해 최적의 Size를 맞추어야 합니다. Axon에서는 TrackingEventProcessorConfiguration을 통해서 Size 조정이 가능합니다. 설정 방법은 다음과 같습니다.

 

1. Query 모듈내 config 패키지 생성 후 AxonConfig 파일을 생성합니다.

 


2. AxonConfig 클래스를 구현합니다.

 

AxonConfig.java

@Configuration
public class AxonConfig {
    @Autowired
    public void configure(EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessor(
                "accounts",
                org.axonframework.config.Configuration::eventStore,
                c -> TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                        .andBatchSize(100)
        );
    }
}

 

코드 구현 내용은 accounts ProcessingGroup을 처리하는 TrackingEventProcessor의 Batch Size를 100으로 설정하였습니다.


출처 : https://axoniq.io/blog-overview/cqrs-replay-performance-tuning

 

위 그림은 Batch Size 조정에 따른 초당 Event 처리량을 그래프로 표시한 결과입니다. Size를 1부터 500까지 늘렸을 때 초당 260개의 이벤트 처리량에서 4000개로 15배의 성능 개선이 나타났습니다. 대략 500개 이상부터는 Size 증가에 따른 개선폭이 크지 않으므로 해당 예제에서는 500개가 적정선입니다. Batch Size 크기에 대한 적절한 가이드는 없으며, 이는 각 Application 환경에 맞게 테스트 이후 찾는 것이 좋습니다.

 

Batch Size를 늘려서 Transaction을 처리할 때 주의점이 있습니다. 이는 Read Model에 사용되는 DB가 Non-Transactional 하다면, 만약 Batch 중간에 실패했을 때 데이터가 자동 Rollback되지 않습니다. 따라서 이후 다시 Replay를 시도하게되면, 이미 처리된 Event가 다시 수행되므로 유의해야합니다.


3-2. 병렬 처리

 

출처 : https://axoniq.io/blog-overview/cqrs-replay-performance-tuning

 

TrackingEventProcessor의 Thread 수 기본 값은 1입니다. 즉 하나의 Thread로 모든 작업을 순차처리합니다. 따라서 Event Replay 성능을 높이기 위해서는 병렬도 증가가 필요합니다.

 

위 그림은 Batch Size는 500으로 설정한 상태에서 Thread 개수를 1에서 8개로 점차 늘렸을 때 초당 Event 처리량을 나타낸 것입니다. 병렬도를 8로 지정했을 때 초당 15000개의 Event를 처리할 수 있으므로 단일 Thread 대비 대략 4배정도의 개선이 이루어졌습니다.

 

 

하지만 Thread 개수를 늘릴 때는 고려해야할 사항이 많습니다. 그 이유는 병렬로 처리하게되면 처리되는 Event의 순서가 뒤바뀔 수 있기 때문입니다. 따라서 병렬 처리를 수행시, 순서가 보장될 수 있도록 처리해야합니다. Axon에서는 이러한 문제를 해결 하기 위해 Sequencing 정책을 제공합니다. Sequencing 정책이란 동일 Thread 내에서 Event는 반드시 처리 순서 보장에 대한 결정을 의미합니다.

 

기본적으로 Axon에서는  동일 Aggregate에 속한 Event는 동일 Thread에서 처리될 수 있도록 SequentialPerAggregatePolicy 클래스를 제공합니다. 이를 적용하여 AxonConfig 클래스 수정을 통해 병렬도를 변경하도록 하겠습니다.

 

AxonConfig.java

@Configuration
public class AxonConfig {
    @Autowired
    public void configure(EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessor(
                "accounts",
                org.axonframework.config.Configuration::eventStore,
                c -> TrackingEventProcessorConfiguration.forParallelProcessing(3)
                        .andBatchSize(100)
        );

        configurer.registerSequencingPolicy("accounts",
                configuration -> SequentialPerAggregatePolicy.instance());
    }
}

 

이전 대비 변경된 내용은 TrackingEventProcessor에 대해서 병렬도를 3으로 지정하였습니다. 또한 accounts ProcessingGroup을 대상으로 SequentialPerAggregatePolicy를 적용하여, 단일 Thread 내에서 동일 Aggregate Event가 순서대로 처리되도록 정책 설정하였습니다.


 

코드 변경 후 실제 Application을 구동한다음 Token Store에 TrackingEventProcessor 별로 Token이 생긴 것을 확인할 수 있습니다.


하지만 데모 프로젝트에서 단순히 위와같이 병렬도를 지정하고 Application을 수행하면, 소유주가 존재하지 않습니다 Error가 발생할 수도 있습니다. 이유는 아래와 같습니다.

 

데모 프로젝트 Aggregate 종류

 

 

Command App에서 구현된 Aggregate는 Holder와 Account 두개 입니다. 비즈니스 로직상 Account Aggregate는 반드시 Holder Aggregate가 존재해야지만 생성이 가능하며, Event Stream에도 순차적으로 생성되어 있습니다. 하지만 Read Model을 반영하는 과정에서 Thread를 분리시키면, 근본적으로 두 개의 Aggregate는 다릅니다. 따라서 SequentialPerAggregatePolicy 설정했어도 다른 Thread에 생성되어 처리될 가능성이 높습니다. 

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
	(...중략...)
	private HolderAccountSummary getHolderAccountSummary(String holderID) {
        log.debug("getHolder : {} ",holderID);
        return repository.findByHolderId(holderID)
                .orElseThrow(() -> new NoSuchElementException("소유주가 존재하지 않습니다." + holderID));
    }
}    

 

위 코드는 Account 생성 Event를 처리하기 위해 Repository 에서 Holder를 찾는 로직을 일부 발췌하였습니다. 이때 두 Aggregate가 다르므로 Account  생성 시점 Thread 1번에서 수행중인 Holder Aggregate가 DB에 반영되어있지 않을 수 있습니다. 그 결과 NoSuchElementException이 발생할 수 있습니다.

 

Replay를 수행할 때 의도적으로 @DisallowReplay 어노테이션을 추가하지 않는 이상 EventHandler에서 Event 누락이 발생되어서는 안됩니다. 따라서 문제점 해결을 위해 데모 프로젝트에서는 저장소 검색 과정에서 위와 같은 에러를 만나게 되었을 때 약간의 시차를 두고 다시 시도하게끔 지정하고자 합니다. 이를 위해 spring-retry 기능을 사용하도록 하겠습니다.


1. Query 모듈 build.gradle 파일을 열어 dependencies를 추가합니다.

 

build.gradle

dependencies{
 (...중략...)
 implementation group: 'org.springframework.retry', name: 'spring-retry'
}

2. projection 패키지 HolderAccountProjection 클래스 파일을 엽니다.

 


3. HolderAccountProjection 클래스 상단에 @EnableRetry 어노테이션을 추가합니다. 

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
	(...중략...)
}    

4. Account 생성 이벤트를 처리하는 Event Handler 메소드에 @Retryable 어노테이션을 추가합니다. 

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
	(...중략...)
    
    @EventHandler
    @Retryable(value = {NoSuchElementException.class}, maxAttempts = 5, backoff = @Backoff(delay = 1000))
    @AllowReplay
    protected void on(AccountCreationEvent event, @Timestamp Instant instant)  {
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setAccountCnt(holderAccount.getAccountCnt()+1);
        repository.save(holderAccount);
    }
    
    (...중략...)
}    

 

위 코드의 내용은 Repository로부터 NoSuchElementException이 발생하면 1초 대기후에 다시 시도하며, 최대 5번까지 재수행을 시도하도록 설정하였습니다.

 

위 4단계를 거친 다음 Query App을 기동하여 Reset을 수행하면, 발생하였던 문제가 정상적으로 수행됨을 확인할 수 있습니다.


3-3. Batch 최적화

 

Read Model 구현에 있어 JPA를 사용한다면 Batch Update를 수행할 때 자동으로 최적화를 수행합니다. 가령 동일한 Record에 대하여 Update가 연속 두번 발생하면, 실제로는 Entitiy 로딩하기 위한 1번의 DBMS Call과 JPA 내부 Persistence Context에서 2번의 수정 작업을 거쳐 최종 1번의 Update DBMS Call이 발생합니다.

 

또한 다량의 Insert 작업이 중간에 Update 작업 없이 발생한다면, DBMS에 Insert를 반영할 때 SQL 단건씩 DBMS Call을 발생시키는 것이 아니라 Bulk Insert를 수행하여 최적화를 달성합니다.

 

하지만 JPA를 통한 Model 이외에도 적용할 수 있는 최적화 방법은 여러가지가 있습니다. 그 중 2가지 방법을 소개하겠습니다.

 

  1. Entitiy에 대해서 Update를 수행하기 위해서는 기본적으로 데이터를 Persistence Context에 Load 이후에 변경을 실시하는데, Replay 과정에서는 굳이 데이터를 Load할 필요없이 바로 Update 구문을 수행함으로써 DBMS Call을 줄일 수 있습니다. 
  2. 동일 Aggregate에 대해서 발생하는 Insert, Update, delete Event 순서를 결과에 어긋나지 않게 재배치하여 Bulk 작업을 수행한다면, 획기적으로 DBMS Call을 줄일 수 있습니다. 

 

방금 소개시켜드린 2가지 방법은 단순히 Configuration 설정 변경으로는 적용할 수 없습니다. 따라서 이를 해결 하기위해서는 DBMS 최적화에 대한 기술적인 고민과 이를 적용하는 내용을 프로그래밍해야 합니다. Axon 에서는 이를 보조하기 위하여 다양한 API를 제공합니다. 또한 각각의 Batch 마다 하나씩 UnitOfWork가 존재합니다. 따라서 EventHandler 메소드 파라미터에 UnitOfWork 인자를 추가하면 UnitOfWork에서 처리되는 자원에 접근할 수 있습니다. 이를 통해 메시징 처리를 사용자가 효율적으로 Customizing 처리할 수 있도록 도움을 줍니다.

 

위 소개드린 2가지 기법들을 활용하여 Axon 에서 벤치마크 테스트한 결과, 기존 초당 15000개의 처리량에서 30000개로 2배의 성능 향상을 이룰 수 있었습니다. 이는 최초대비 115배의 성능 개선을 달성한 것입니다. 


4. Mongo DB 테스트 결과

 

Read Model을 Mongo DB를 사용햇을 때 이전 Replay 최적화 방식을 도입함에 있어 AxonIQ 벤치마크 테스트 결과는 다음과 같습니다.

 

  1. 최적화 없음 : 초당 12000개
  2. Batch 최적화 : 초당 30000개

 

AxonIQ 벤치마크 결과 Postgresql과 크게 다르지 않은 처리량을 볼 수 있습니다.


5. 마치며

 

이번 시간에는 Replay 성능 개선에 대하여 다루었습니다. 개인적으로 EventSourcing & CQRS를 도입하는데 있어 기술적으로 가장 고민을 많이해야하는 부분이 이번 포스팅 내용인 것 같습니다. Replay 수행에 있어 Application 개선 뿐만 아니라 DBMS 최적화 기법을 같이 고려한다면 더 큰 성능개선이 이루어질 수 있으므로 Read Model DB에 대한 지식은 큰 도움이 될 것입니다. 다음 시간에는 요구사항 변경에 따른 Event 모델 수정 및 Versioning에 대하여 다루겠습니다.

 

 

 

 

1. 서론

 

이번 시간에는 Query App의 EventHandler 로직을 구현하도록 하겠습니다. Query App의 Read Model은 이전 포스팅에서 도출한 구조를 사용하도록 하겠습니다. 


2. Projection 구현

 

Command에서 발생된 Event를 적용하는 과정을 Projection이라 합니다. 먼저 설계 단계에서 도출한 Entity 구현 이후 Projection을 구현하겠습니다. 

 

 

 

1. Query 모듈내 entity 패키지를 생성후 Entity 클래스를 생성합니다.

 


2. Entity 클래스를 구현합니다.

 

HolderAccountSummary.java

@Entity
@Table(name = "MV_ACCOUNT")
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Getter @Setter
public class HolderAccountSummary {
    @Id
    @Column(name = "holder_id", nullable = false)
    private String holderId;
    @Column(nullable = false)
    private String name;
    @Column(nullable = false)
    private String tel;
    @Column(nullable = false)
    private String address;
    @Column(name = "total_balance", nullable = false)
    private Long totalBalance;
    @Column(name = "account_cnt", nullable = false)
    private Long accountCnt;
}

3. repository 패키지를 생성 후 entitiy repository 인터페이스를 생성합니다.

 


4. repository 인터페이스 메소드를 정의합니다.

 

AccountRepository.java

public interface AccountRepository extends JpaRepository<HolderAccountSummary,String> {
    Optional<HolderAccountSummary> findByHolderId(String holderId);
}

 


5. projection 패키지 생성 후 projection 클래스를 생성합니다.

 


6. Projection 클래스를 구현합니다.

 

HolderAccountProjection.java

@Component
@AllArgsConstructor
@Slf4j
public class HolderAccountProjection {
    private final AccountRepository repository;

    @EventHandler
    protected void on(HolderCreationEvent event, @Timestamp Instant instant) {
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary accountSummary = HolderAccountSummary.builder()
                                                                        .holderId(event.getHolderID())
                                                                        .name(event.getHolderName())
                                                                        .address(event.getAddress())
                                                                        .tel(event.getTel())
                                                                        .totalBalance(0L)
                                                                        .accountCnt(0L)
                                                                    .build();
        repository.save(accountSummary);
    }
    
    @EventHandler
    protected void on(AccountCreationEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setAccountCnt(holderAccount.getAccountCnt()+1);
        repository.save(holderAccount);
    }
    
    @EventHandler
    protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());
        repository.save(holderAccount);
    }
    
    @EventHandler
    protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());
        repository.save(holderAccount);
    }

    private HolderAccountSummary getHolderAccountSummary(String holderID) {
        return repository.findByHolderId(holderID)
                .orElseThrow(() -> new NoSuchElementException("소유주가 존재하지 않습니다."));
    }
}

 

위 코드는 Projection 로직을 담고 있습니다. EventHandler 메소드 파라미터에는 @Timestamp@SequenceNumber, ReplayStatus 등이 추가로 전달될 수 있으며, 자세한 내용은 Axon 공식 문서를 참고 바랍니다.


3. 테스트

 

API 테스트를 통해 발행된 Command가 Read Model에 제대로 반영되는지 테스트 하겠습니다.

 

1. reousrces 하위 application.yml 파일을 오픈 후에 로깅 정보를 입력합니다.

 


2. AxonServer 기동 후에 Query App을 수행합니다. 기동이 완료되면 DB에 테이블이 정상 생성되었는지 확인합니다.

 


3. Command App을 기동합니다. 이후 계정 생성, 계좌 생성, 입금, 출금 Command 명령을 수차례 반복 수행합니다.

 


4. 테스트 이후 Query App Read Model 갱신 여부를 확인합니다. 또한 Application Log를 통해서 Event 정상 처리를 확인합니다.

 

 

c.c.q.p.HolderAccountProjection          : projecting HolderCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, holderName=kevin, tel=02-1234-5678, address=OO시 OO구) , timestamp : 2020-01-07T12:11:21.047Z
c.c.q.p.HolderAccountProjection          : projecting HolderCreationEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, holderName=bruce, tel=02-5291-5678, address=OO시 OO구) , timestamp : 2020-01-07T12:12:14.238Z
c.c.q.p.HolderAccountProjection          : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=0e6d3546-4163-4083-bf2f-50f1289d8c25) , timestamp : 2020-01-07T12:12:27.633Z
c.c.q.p.HolderAccountProjection          : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=042937fb-e658-441d-b23a-fd56be237563) , timestamp : 2020-01-07T12:12:33.961Z
c.c.q.p.HolderAccountProjection          : projecting AccountCreationEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa) , timestamp : 2020-01-07T12:12:45.193Z
c.c.q.p.HolderAccountProjection          : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f) , timestamp : 2020-01-07T12:13:28.542Z
c.c.q.p.HolderAccountProjection          : projecting DepositMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=300) , timestamp : 2020-01-07T12:15:02.059Z
c.c.q.p.HolderAccountProjection          : projecting WithdrawMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=30) , timestamp : 2020-01-07T12:15:08.242Z
c.c.q.p.HolderAccountProjection          : projecting WithdrawMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=20) , timestamp : 2020-01-07T12:15:13.488Z
c.c.q.p.HolderAccountProjection          : projecting DepositMoneyEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa, amount=300) , timestamp : 2020-01-07T12:15:29.451Z
c.c.q.p.HolderAccountProjection          : projecting WithdrawMoneyEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa, amount=150) , timestamp : 2020-01-07T12:15:38.423Z

 

위 내역은 전체 로그중 일부만 발췌한 결과입니다. 확인 결과 Event가 정상적으로 반영된 것을 알 수 있습니다.


4. Replay

 

 

이전 포스팅에서 마지막 수신 Event Token 정보를 토대로 AxonSever으로부터 다음 목록을 수신받아 처리한다고 설명 했습니다. 하지만 때로는 Read Model 구조를 재구성하기 위해서 Event를 재생해야될 수 있습니다. Axon에서는 이를 위해 Replay 기능을 제공하며, 특정 시점부터 혹은 전체의 Event에 대한 Replay를 수행할 수 있습니다.

 

Replay 기능이 동작하면, 내부적으로는 Token 정보를 초기화하여, 특정 시점 혹은 처음부터 발행된 Event를 전달받아 재수행합니다. 

 

이번 시간에는 전체 Event를 재생하는 방법에 대해서 설명하며, 특정 시점부터 이벤트 Replay는 Axon 공식 문서를 참고 바랍니다.

 

 

1. Replay를 수행하기 위해 Projection 클래스를 변경합니다. 먼저 ProcessingGroup을 지정하여 TrackingEventProcessor로 하여금 어떤 Group을 대상으로 Replay를 수행할지 지정합니다.

 

 

HolderAccountProjection.java

@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
(...중략...)
}

2. Replay 수행시 Read Model 초기화 작업이 이루어지지 않으면, 남아있는 데이터에 이벤트가 적용되므로 데이터 정합성이 맞지 않습니다. 따라서 Replay가 수행되기전, 대상 테이블 또한 초기화가 선행 되어야합니다. 이를 위해 @ResetHandler 어노테이션을 추가한 메소드를 정의하여 초기화 작업을 처리합니다.

 

HolderAccountProjection.java

@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
(...중략...)
    @ResetHandler
    private void resetHolderAccountInfo(){
        log.debug("reset triggered");
        repository.deleteAll();
    }
}

3. Replay시 적용 대상 Handler 메소드에 @AllowReplay 어노테이션을 추가합니다.(Optional) 만약 Replay 대상에서 해당 Event는 처리하고 싶지 않을 경우에는 @DisallowReplay를 추가합니다. 예제에서는 전체 Replay 재생을 위해 @AllowReplay 어노테이션만 추가했습니다.

 

1~3번 과정을 모두 적용한 Projection 클래스 코드는 다음과 같습니다.

 

HolderAccountProjection.java

@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
    private final AccountRepository repository;

    @EventHandler
    @AllowReplay
    protected void on(HolderCreationEvent event, @Timestamp Instant instant) {
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary accountSummary = HolderAccountSummary.builder()
                                                                        .holderId(event.getHolderID())
                                                                        .name(event.getHolderName())
                                                                        .address(event.getAddress())
                                                                        .tel(event.getTel())
                                                                        .totalBalance(0L)
                                                                        .accountCnt(0L)
                                                                    .build();
        repository.save(accountSummary);
    }
    @EventHandler
    @AllowReplay
    protected void on(AccountCreationEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setAccountCnt(holderAccount.getAccountCnt()+1);
        repository.save(holderAccount);
    }
    @EventHandler
    @AllowReplay
    protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());
        repository.save(holderAccount);
    }
    @EventHandler
    @AllowReplay
    protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());
        repository.save(holderAccount);
    }

    private HolderAccountSummary getHolderAccountSummary(String holderID) {
        return repository.findByHolderId(holderID)
                .orElseThrow(() -> new NoSuchElementException("소유주가 존재하지 않습니다."));
    }

    @ResetHandler
    private void resetHolderAccountInfo(){
        log.debug("reset triggered");
        repository.deleteAll();
    }
}

4. Reset 수행 EndPoint를 구현하기 위해 controller 패키지 및 클래스를 생성합니다.

 


5. controller 클래스를 구현합니다.

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    private final QueryService queryService;

    @PostMapping("/reset")
    public void reset() {
        queryService.reset();
    }
}

6. Service Package 및 Service 클래스를 생성합니다.

 


7. 서비스 클래스를 구현합니다.

 

QueryService.java

public interface QueryService {
    void reset();
}

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Service
public class QueryServiceImpl implements QueryService{
    private final Configuration configuration;

    @Override
    public void reset() {
        configuration.eventProcessingConfiguration()
                .eventProcessorByProcessingGroup("accounts",
                        TrackingEventProcessor.class)
                .ifPresent(trackingEventProcessor -> {
                    trackingEventProcessor.shutDown();
                    trackingEventProcessor.resetTokens(); // (1)
                    trackingEventProcessor.start();
                });
    }
}    

 

 

실제 Token 초기화는 resetTokens 메소드를 통해서 이루어집니다. 해당 작업을 위해서는 EventProcessor의 재시작이 필요합니다.


8. Query App 재기동 후 API 테스트(POST : http://localhost:9090/reset)를 수행합니다. Application 로그 및 DB를 확인하면 정상적으로 Replay가 이루어졌음을 확인할 수 있습니다.


5. 마치며

 

EventHandler 메소드 적용에 따른 Read Model 구현을 완성했습니다. 다음 포스팅에서는 Replay 성능 개선 방법에 대하여 다루도록 하겠습니다.

+ Recent posts