1. 서론

 

이번 시간에는 Query App의 EventHandler 로직을 구현하도록 하겠습니다. Query App의 Read Model은 이전 포스팅에서 도출한 구조를 사용하도록 하겠습니다. 


2. Projection 구현

 

Command에서 발생된 Event를 적용하는 과정을 Projection이라 합니다. 먼저 설계 단계에서 도출한 Entity 구현 이후 Projection을 구현하겠습니다. 

 

 

 

1. Query 모듈내 entity 패키지를 생성후 Entity 클래스를 생성합니다.

 


2. Entity 클래스를 구현합니다.

 

HolderAccountSummary.java

@Entity
@Table(name = "MV_ACCOUNT")
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Getter @Setter
public class HolderAccountSummary {
    @Id
    @Column(name = "holder_id", nullable = false)
    private String holderId;
    @Column(nullable = false)
    private String name;
    @Column(nullable = false)
    private String tel;
    @Column(nullable = false)
    private String address;
    @Column(name = "total_balance", nullable = false)
    private Long totalBalance;
    @Column(name = "account_cnt", nullable = false)
    private Long accountCnt;
}

3. repository 패키지를 생성 후 entitiy repository 인터페이스를 생성합니다.

 


4. repository 인터페이스 메소드를 정의합니다.

 

AccountRepository.java

public interface AccountRepository extends JpaRepository<HolderAccountSummary,String> {
    Optional<HolderAccountSummary> findByHolderId(String holderId);
}

 


5. projection 패키지 생성 후 projection 클래스를 생성합니다.

 


6. Projection 클래스를 구현합니다.

 

HolderAccountProjection.java

@Component
@AllArgsConstructor
@Slf4j
public class HolderAccountProjection {
    private final AccountRepository repository;

    @EventHandler
    protected void on(HolderCreationEvent event, @Timestamp Instant instant) {
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary accountSummary = HolderAccountSummary.builder()
                                                                        .holderId(event.getHolderID())
                                                                        .name(event.getHolderName())
                                                                        .address(event.getAddress())
                                                                        .tel(event.getTel())
                                                                        .totalBalance(0L)
                                                                        .accountCnt(0L)
                                                                    .build();
        repository.save(accountSummary);
    }
    
    @EventHandler
    protected void on(AccountCreationEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setAccountCnt(holderAccount.getAccountCnt()+1);
        repository.save(holderAccount);
    }
    
    @EventHandler
    protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());
        repository.save(holderAccount);
    }
    
    @EventHandler
    protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());
        repository.save(holderAccount);
    }

    private HolderAccountSummary getHolderAccountSummary(String holderID) {
        return repository.findByHolderId(holderID)
                .orElseThrow(() -> new NoSuchElementException("소유주가 존재하지 않습니다."));
    }
}

 

위 코드는 Projection 로직을 담고 있습니다. EventHandler 메소드 파라미터에는 @Timestamp@SequenceNumber, ReplayStatus 등이 추가로 전달될 수 있으며, 자세한 내용은 Axon 공식 문서를 참고 바랍니다.


3. 테스트

 

API 테스트를 통해 발행된 Command가 Read Model에 제대로 반영되는지 테스트 하겠습니다.

 

1. reousrces 하위 application.yml 파일을 오픈 후에 로깅 정보를 입력합니다.

 


2. AxonServer 기동 후에 Query App을 수행합니다. 기동이 완료되면 DB에 테이블이 정상 생성되었는지 확인합니다.

 


3. Command App을 기동합니다. 이후 계정 생성, 계좌 생성, 입금, 출금 Command 명령을 수차례 반복 수행합니다.

 


4. 테스트 이후 Query App Read Model 갱신 여부를 확인합니다. 또한 Application Log를 통해서 Event 정상 처리를 확인합니다.

 

 

c.c.q.p.HolderAccountProjection          : projecting HolderCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, holderName=kevin, tel=02-1234-5678, address=OO시 OO구) , timestamp : 2020-01-07T12:11:21.047Z
c.c.q.p.HolderAccountProjection          : projecting HolderCreationEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, holderName=bruce, tel=02-5291-5678, address=OO시 OO구) , timestamp : 2020-01-07T12:12:14.238Z
c.c.q.p.HolderAccountProjection          : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=0e6d3546-4163-4083-bf2f-50f1289d8c25) , timestamp : 2020-01-07T12:12:27.633Z
c.c.q.p.HolderAccountProjection          : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=042937fb-e658-441d-b23a-fd56be237563) , timestamp : 2020-01-07T12:12:33.961Z
c.c.q.p.HolderAccountProjection          : projecting AccountCreationEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa) , timestamp : 2020-01-07T12:12:45.193Z
c.c.q.p.HolderAccountProjection          : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f) , timestamp : 2020-01-07T12:13:28.542Z
c.c.q.p.HolderAccountProjection          : projecting DepositMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=300) , timestamp : 2020-01-07T12:15:02.059Z
c.c.q.p.HolderAccountProjection          : projecting WithdrawMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=30) , timestamp : 2020-01-07T12:15:08.242Z
c.c.q.p.HolderAccountProjection          : projecting WithdrawMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=20) , timestamp : 2020-01-07T12:15:13.488Z
c.c.q.p.HolderAccountProjection          : projecting DepositMoneyEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa, amount=300) , timestamp : 2020-01-07T12:15:29.451Z
c.c.q.p.HolderAccountProjection          : projecting WithdrawMoneyEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa, amount=150) , timestamp : 2020-01-07T12:15:38.423Z

 

위 내역은 전체 로그중 일부만 발췌한 결과입니다. 확인 결과 Event가 정상적으로 반영된 것을 알 수 있습니다.


4. Replay

 

 

이전 포스팅에서 마지막 수신 Event Token 정보를 토대로 AxonSever으로부터 다음 목록을 수신받아 처리한다고 설명 했습니다. 하지만 때로는 Read Model 구조를 재구성하기 위해서 Event를 재생해야될 수 있습니다. Axon에서는 이를 위해 Replay 기능을 제공하며, 특정 시점부터 혹은 전체의 Event에 대한 Replay를 수행할 수 있습니다.

 

Replay 기능이 동작하면, 내부적으로는 Token 정보를 초기화하여, 특정 시점 혹은 처음부터 발행된 Event를 전달받아 재수행합니다. 

 

이번 시간에는 전체 Event를 재생하는 방법에 대해서 설명하며, 특정 시점부터 이벤트 Replay는 Axon 공식 문서를 참고 바랍니다.

 

 

1. Replay를 수행하기 위해 Projection 클래스를 변경합니다. 먼저 ProcessingGroup을 지정하여 TrackingEventProcessor로 하여금 어떤 Group을 대상으로 Replay를 수행할지 지정합니다.

 

 

HolderAccountProjection.java

@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
(...중략...)
}

2. Replay 수행시 Read Model 초기화 작업이 이루어지지 않으면, 남아있는 데이터에 이벤트가 적용되므로 데이터 정합성이 맞지 않습니다. 따라서 Replay가 수행되기전, 대상 테이블 또한 초기화가 선행 되어야합니다. 이를 위해 @ResetHandler 어노테이션을 추가한 메소드를 정의하여 초기화 작업을 처리합니다.

 

HolderAccountProjection.java

@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
(...중략...)
    @ResetHandler
    private void resetHolderAccountInfo(){
        log.debug("reset triggered");
        repository.deleteAll();
    }
}

3. Replay시 적용 대상 Handler 메소드에 @AllowReplay 어노테이션을 추가합니다.(Optional) 만약 Replay 대상에서 해당 Event는 처리하고 싶지 않을 경우에는 @DisallowReplay를 추가합니다. 예제에서는 전체 Replay 재생을 위해 @AllowReplay 어노테이션만 추가했습니다.

 

1~3번 과정을 모두 적용한 Projection 클래스 코드는 다음과 같습니다.

 

HolderAccountProjection.java

@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
    private final AccountRepository repository;

    @EventHandler
    @AllowReplay
    protected void on(HolderCreationEvent event, @Timestamp Instant instant) {
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary accountSummary = HolderAccountSummary.builder()
                                                                        .holderId(event.getHolderID())
                                                                        .name(event.getHolderName())
                                                                        .address(event.getAddress())
                                                                        .tel(event.getTel())
                                                                        .totalBalance(0L)
                                                                        .accountCnt(0L)
                                                                    .build();
        repository.save(accountSummary);
    }
    @EventHandler
    @AllowReplay
    protected void on(AccountCreationEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setAccountCnt(holderAccount.getAccountCnt()+1);
        repository.save(holderAccount);
    }
    @EventHandler
    @AllowReplay
    protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());
        repository.save(holderAccount);
    }
    @EventHandler
    @AllowReplay
    protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());
        repository.save(holderAccount);
    }

    private HolderAccountSummary getHolderAccountSummary(String holderID) {
        return repository.findByHolderId(holderID)
                .orElseThrow(() -> new NoSuchElementException("소유주가 존재하지 않습니다."));
    }

    @ResetHandler
    private void resetHolderAccountInfo(){
        log.debug("reset triggered");
        repository.deleteAll();
    }
}

4. Reset 수행 EndPoint를 구현하기 위해 controller 패키지 및 클래스를 생성합니다.

 


5. controller 클래스를 구현합니다.

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    private final QueryService queryService;

    @PostMapping("/reset")
    public void reset() {
        queryService.reset();
    }
}

6. Service Package 및 Service 클래스를 생성합니다.

 


7. 서비스 클래스를 구현합니다.

 

QueryService.java

public interface QueryService {
    void reset();
}

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Service
public class QueryServiceImpl implements QueryService{
    private final Configuration configuration;

    @Override
    public void reset() {
        configuration.eventProcessingConfiguration()
                .eventProcessorByProcessingGroup("accounts",
                        TrackingEventProcessor.class)
                .ifPresent(trackingEventProcessor -> {
                    trackingEventProcessor.shutDown();
                    trackingEventProcessor.resetTokens(); // (1)
                    trackingEventProcessor.start();
                });
    }
}    

 

 

실제 Token 초기화는 resetTokens 메소드를 통해서 이루어집니다. 해당 작업을 위해서는 EventProcessor의 재시작이 필요합니다.


8. Query App 재기동 후 API 테스트(POST : http://localhost:9090/reset)를 수행합니다. Application 로그 및 DB를 확인하면 정상적으로 Replay가 이루어졌음을 확인할 수 있습니다.


5. 마치며

 

EventHandler 메소드 적용에 따른 Read Model 구현을 완성했습니다. 다음 포스팅에서는 Replay 성능 개선 방법에 대하여 다루도록 하겠습니다.

1. 서론

 

 

이번 포스팅부터 Query App 구현을 다루겠습니다. Query App은 Event를 수신받아 Read Model에 반영하는 Projection 작업과 Read Model을 읽는 Query 2가지로 기능이 나뉩니다. 기능별로 실제 구현 코드량은 얼마되지 않지만 알아야 하는 개념이 많으므로 먼저 Event 처리 기능 관련하여 다루고 향후에 Query 기능을 다루겠습니다.

 

이번 내용은 EventHandler 구현 실습전 내부 처리 과정을 살펴보겠습니다.


2. Event 처리 과정

 

Token Store

 

 

이전 포스팅에서 확인 하였듯이 EventStore에는 EventStream의 내용을 순차적으로 적재합니다. 따라서 수신부에서 지금까지 수신된 Event는 어디까지이며, EvenStore에서 어디서부터 Event를 수신 받아야할지에 대한 정보를 가지고 있어야합니다. 해당 정보를 Token이라고 하며, Token은 Query App과 연관된 DB 내부에 저장하여 영구적으로 관리합니다.(Token Store) 

 

 

 

Token 내용

<org.axonframework.eventhandling.GlobalSequenceTrackingToken>
  <globalIndex>13</globalIndex>
</org.axonframework.eventhandling.GlobalSequenceTrackingToken>

 

위 예시는 TokenStore에 저장된 내용입니다.Token 컬럼에는 지금까지 Tracking된 Event의 Global Sequence 값이 들어있으며, 예시를 통해 13번이 마지막으로 수신된 Event임을 알 수 있습니다.

 


Tracking Event Processor

 

 

TokenStore를 통해서 마지막 수신 Event 정보를 알 수 있다면, 해당 정보를 토대로 Event 수신 요청 및 처리를 담당하는 중계 역할이 필요합니다. Axon에서 제공하는 Event 처리기는 Subscribing Event Processor(SEP), Tracking Event Processor(TEP) 2가지가 있습니다. 두 Event 처리기 차이점은 이벤트 발행 쓰레드에서 Event 처리여부입니다. SEP는 Event 발행 쓰레드에서 Event 또한 처리하며, TEP는 별도 쓰레드에서 처리합니다. TEP를 이해하는 것이 중요하므로 이를 중점적으로 다루겠습니다.


3. Tracking Event Processor

 

 

위 그림은 Query App이 기동될 때 EventProcessor 생성 및 처리 과정 흐름을 간략하게 표현했습니다. 

 

  1. Spring DefaultLifecycleProcessor가 Start 명령을 내립니다.
  2. Axon의 기본 설정을 담당하는 DefaultConfigurer 클래스 Start 메소드가 호출됩니다. 이후 등록된 Handler에게 수행 명령을 내립니다. 기본으로 등록된 Handler는 2개입니다.(EventProcessingModule, EventProcessorInfoConfiguration)
  3. EventProcessingInfoConfiguration의 Start 메소드가 호출됩니다.
  4. 내부 로직을 거쳐 ProcessorInfoSource와 EventProcessorControlService를 구동합니다. ProcessorInfoSource는 AxonServer에게 EventProcessor의 현재 상태를 주기적(Default 500ms)으로 보내는 역할을 담당합니다. EventProcessorContolService는 AxonServer에서 요청시 EventProcessor를 제어하는 서비스 역할을 담당합니다. EventPRocessorControlService의 실제 로직 수행은 AxonServerConnectionManager 및 EventProcessorController가 담당합니다.
  5. EventProcessingModule을 구동합니다. 이 과정에서 EventProcessor 생성을 요청합니다.
  6. TrackingEventProcessor를 생성합니다. 사용자가 별도 속성 정의를 하지 않았으면, DefaultEventProcessor가 생성됩니다. (※ Thread 수 : 1개, 배치 사이즈 : 1, 최대 Thread 수 : Segment 개수, tokenClaim 주기 : 5000ms)
  7. 생성된 TrackingEventProcessor에게 구동을 요청합니다. 내부적으로 AxonThreadFactory에게 WorkerLauncher 인스턴스에 대한 Thread 생성을 요청합니다. 
  8. TrackingEventProcessor가 구동중이면 내부로직이 반복 수행될 수도록 무한 루프로 구성되어 있습니다. 만약 EventStore Segment에 변경된 내역을 확인하여 처리해야한다면, AxonThreadFactory에게 TrackingSegmentWorker 인스턴스 생성을 요청합니다.
  9. Segment에 대한 Event 처리(ProcessingLoop)를 요청합니다.

 

9번 Segment Event 처리 과정을 자세히 확인하기 위해 순서도를 그리면 다음과 같습니다.

 

 

  1. EventStream을 오픈합니다.
  2. EventStream에서 최신 Event가 존재하는지 확인합니다. 존재하지 않는다면 Token 값을 갱신하고 작업을 종료합니다.
  3. 최신 Event를 수신받은 후 해당 App에서 처리가 가능한 Event인지 확인합니다. 만약 처리할 수 없다면 BlackList 등록이 가능한지 확인하고 이를 등록합니다. 이후 해당 Event는 MessageMonitor에 보고한 뒤 무시합니다.
  4. 처리가 가능한 Event라면 UnitOfWork를 수행합니다. 해당 과정에서 Token 값은 자동 갱신합니다. 이후 EventHandler를 찾아 메소드를 수행합니다.
  5. Event 처리가 완료되면 작업을 종료합니다.

 

내부구조는 복잡하지만 이를 한줄로 요약하자면 "주기적으로 TrackingEventProcessor에서 처리가 가능한 Event가 존재하는지 확인 및 처리(UnitOfWork)하고 Token 갱신하는 작업"으로 정의할 수 있습니다.


4. Axon Server 라우팅기능(Event)

이번에는 Client가 아닌 AxonServer 입장에서 Event 전달 흐름을 살펴보도록 하겠습니다.

 

 

Query Application을 구동하면 Token 정보를 읽어옵니다. 이후 EventStore에게 자신이 보유한 Token 정보를 알려준 다음 EventStream을 오픈합니다. 위 예시에는 요청당시 EventStore에는 추가로 유입된 Event가 없으므로 Application의 Token값 변경 후 작업을 종료합니다.

 

 

만약 Command App으로부터 Event가 추가된다면, 다음 ProcessingLoop 작업에서 해당 Event를 Query App으로 전달합니다. 수신받은 App에서 해당 Event 처리 가능 여부를 확인하는데, 만약 처리하지 못할 경우는 AxonServer에게 BlackList 추가를 요청합니다. 따라서 이후 신규 적재되는 Event에 대해서는 수신받지 않습니다.

 

 

신규 App이 추가로 기동되어 AxonServer에 등록요청한 상태에서 마지막 수신 Event가 2번이라면, AxonServer에서 3번부터 해당 Event를 App으로 전달합니다.


4. 마치며

 

이번 포스팅에서는 EventHandler 메소드를 수행하기위해서 알아야할 내부 구조를 알아봤습니다. 다음 포스팅에서는 EventHandler 메소드 구현에 대해 다루어보겠습니다.

1. 서론

 

Query App 관련 포스팅을 진행하기 앞서 Event가 저장되는 EventStore에 대하여 알아보고자 합니다. 이번 포스팅에서 다룰 내용은 EventStore를 위한 필요조건, DB 종류에 따른 EventStore 역할 장·단점 그리고 AxonServer EventStore 저장 구조입니다.


2. EventStore 필요 조건

 

Event를 읽고 쓰는데 있어 EventStore가 기본적으로 갖춰야할 조건을 알아보겠습니다.

 

1. 이벤트는 추가만 가능하고 입력,삭제,수정이 불가능하다.
2. 여러 이벤트가 하나의 트랜잭션 처리가 되어야 한다면, 트랜잭션 단위로 Commit 혹은 Rollback 되어야한다.
3. Commit된 이벤트는 유실되어서는 안된다
4. 발행된 모든 Event 중 Aggregate 별로 데이터를 읽을 수 있어야한다.
5. 모든 이벤트는 삽입된 순서대로 읽기가 가능해야한다.

 

위 조건은 대부분의 DBMS라면 충족되는 요건입니다. 그 밖에 EventStore를 구성하는데 있어 요구되는 사항은 무엇이 있을까요?

 

1. SnapShot 저장소 지원

 

 

Command App을 구현하면서 Snapshot 필요성을 인지하였습니다. EventStore에서는 특정 시점에 Aggregate별 Sequence 번호에 해당하는 Snapshot을 별도 공간에 적재하며, Event 로드시에 해당 스냅샷 상태와 스냅샷 이후의 Sequence 번호에 해당되는 Event만을 읽을 수 있도록 지원해야 합니다.

 


2. Event Notification 기능

 

 

EvenetStore가 신규 추가된 Event를 희망하는 App에게 전파하는 역할을 수행하지 못한다면, Subscriber에서 주기적으로 Polling하여 Event 유입이 있는지 확인하는 작업이 필요합니다. 이는 DB 관점에서 I/O 및 Network 트래픽이 증가하는 요인입니다. 일반적인 RDBMS에서는 이벤트 전파기능이 없기 때문에 위 그림과 같이 메시징 처리를 수행해야합니다.

 

 

이러한 문제점을 해결하고자 메시지 큐를 사용해서 EventStore에 적재함과 동시에 큐에도 이벤트를 적재하여 전송하는 방법을 생각할 수 있습니다. 하지만 이는 EventStore에 저장과 큐를 통한 Event 전송 시점에 대한 동기화를 보장할 수 없습니다.

 

또한, 기타 이유로 EventStore에 이벤트 삽입이 실패하는 경우 메시지 큐를 통해 이미 전달된 이벤트와의 일관성이 깨지게 됩니다.

 

따라서 EventStore의 가장 이상적인 구조는 EventStore 자체가 Message Bus 역할을 담당하는 것입니다.


3. EventStore 적합성 비교

 

지금부터 소개드리는 내용은 AxonIQ Webinar를 참고하여 작성하였습니다. 

 

1. RDBMS

 

RDBMS 사용의 장점으로는 Transaction에 대한 지원 및 기술적 성숙도가 높다는 점입니다. 또한 오랜시간동안 사용되었으므로 사용자들에게 친숙하며, 제공되는 Tool이 다양합니다.

 

하지만 가장 큰 문제점은 확장성입니다. 대량의 데이터 처리보다는 데이터 공간 효율화 및 관계를 통한 데이터 정합성 보장 등에 초점이 맞춰져 있습니다. AxonIQ에서 RDBMS를 EventStore로 사용했을 때의 벤치마크 테스트 결과는 다음과 같습니다.

 

출처 : https://youtu.be/zUSWsJteRfw

 

출처 : http://www.dbguide.net/db.db?cmd=view&boardUid=148209&boardConfigUid=9&boardIdx=136&boardStep=1

 

결과를 보면 데이터 양이 증가할 수록 처리량이 떨어지는 것을 확인할 수 있습니다. 다양한 요인이 있을 수 있겠지만, 대표적으로는 대용량 데이터를 기준으로 B-Tree 인덱스를 사용하면, 인덱스 Depth가 깊어지기 때문에 지속 발생하는 인덱스 Split과 더불어 수직적 탐색 비용이 증가합니다. 또한 데이터 특성상 인덱스 우측 Block에 Transaction이 집중적으로 몰리기 때문에 Oracle 기준 핫블록으로 인한 Latch 경합이 발생하여 동시성이 크게 저하될 수 있습니다.(Right Growing Index)

 

위 테스트 결과는 RDBMS 테이블 구조 변경 없이 일반적인 Heap 테이블과 B-Tree 인덱스를 기준으로 진행했습니다. 만약 DBMS가 Oracle이라면 Hash 파티셔닝, Reverse 인덱스, IOT(Index Organized Table) 등을 적절히 사용한다면 개선의 여지는 있습니다.

 

RDBMS는 Event Notification 기능이 없기 때문에 이를 고려해야합니다. 이전에 설명한 Polling 방식을 개선하기 위해서는 테이블 단위 Audit을 고려해볼 수 있습니다. 즉 Audit 결과를 File로 떨어트리고 해당 로그 tail 값을 AxonFramework에서 요구하는 포맷으로 변경한 다음 메시지 큐를 통해 보내는 방법이 있습니다. 혹은 Trigger를 이용하는 방법도 생각해볼 수 있으나 이는 추천하지 않습니다.


2. Mongo DB

 

 

Mongo DB는 대표적인 NoSQL로써 하나의 이벤트는 하나의 Document에 속하며, 대량의 데이터 처리에 적합합니다. 하지만 Transaction 지원 문제점이 있습니다. 최근에 4.2 버전이 Release되어 Multi Document에 대한 Transaction 기능이 추가되었지만, 단일 Node에서는 Transaction이 불가하는 등의 제약사항이 존재합니다.

 

참고자료

 

두 번째 문제점은 MongoDB 3.2 버전부터 Storage Engine으로 Wiredtiger를 기본적으로 사용하고 있습니다. Wiredtiger 저장 방식은 Btree, 컬럼스토어, LSM 방식이 있습니다. 이중 EventStore에 적합한 방식은 Write 작업에 최적화된 LSM 방식이나 MongoDB에는 LSM을 아직 지원하고 있지 않습니다.

 

세 번째 문제점은 RDBMS와 마찬가지로 Event Notification 기능이 없기 때문에, Commit 로그 결과 등을 AxonFramework에서 요구하는 포맷으로 변경한 다음 메시지 큐를 통해 보내는 방식을 고려해야 합니다.

 

마지막으로 모든 Document에 대하여 전역적인 Sequence 기능이 기본적으로 제공되지 않는다는 점입니다. 따라서 이를 해결하기 위해서는 직접 함수를 구현해야 합니다.

 

참고자료


3. Kafka

 

 

카프카는 대용량 환경에서 Message 전달 역할로 좋은선택입니다. 하지만 EventSourcing에 있어서 좋은 도구는 아닙니다. 그 이유는 위 그림과 같이 Event Stream에서 특정 Aggregate를 추출하기 위해서는 해당 Stream 전체를 읽으면서 그 중 내가 원하는 Aggregate만 필터링하는 작업이 수반되어야 하기 때문입니다.

 

 

물론 Aggregate 별로 Topic을 생성하는 방법등도 고려할 수 있으나 이는 Aggregate 별로 디스크에 적재되는 용량과 I/O 밸런스 등을 고려해야하는 등의 관리 포인트가 급격하게 상승합니다.

 

참고자료


4. Axon Server(Event Store)

 

AxonServer 내부에는 Event 저장을위한 별도 DB가 없으며, File을 직접 다룹니다. 외부와의 연결은 Rest API 혹은 gRPC 방법을 통해 가능합니다.

 

EventStore는 오직 데이터 추가만이 가능하도록 설계되었습니다. 따라서 수정, 삭제와 관련된 그 어떠한 API도 제공되지 않습니다.

 

 

 

AxonServer에서는 EventStream을 일정 크기별로 잘라서 Segment로 매핑합니다. 각 Segment는 하나의 파일이며, 내부에는 Event가 연속적으로 할당되어 있습니다. 생성된 파일은 데이터 Corruption 확인을 위해 CRC 체크하여 파일 손상을 확인합니다.

 

만약 Segment에 Event Entry가 가득차게되면, 새로운 파일을 생성하고 이를 가르키도록 Index 정보를 추가합니다.


 

 

EventStore는 Snapshot 저장소를 제공합니다. Snapshot 저장소 또한 Segment 단위로 저장되며, Snapshot Entry는 동일한 Aggregate의 번호가 매핑된 파일을 가르킵니다. 따라서 위 그림에서 A Aggregate를 읽는다고 가정한다면 Snapshot이 가르키는 1번 Segment 이후부터 데이터를 읽기 시작합니다.

 

하지만 이때 2번 Segment에는 A Aggregate Event가 존재하지 않습니다. 따라서 해당 Segment는 읽는 것이 의미가 없습니다. 따라서 스캔 과정에서 2번은 읽지 않고 Skip할 수 있다면 최소한의 I/O로 성능을 높일 수 있을 것입니다. AxonServer에서는 이를 위해 Bloom Filter를 도입하였습니다.


Bloom Filter

 

Bloom Filter는 찾고자 하는 데이터가 해당 집합에 포함되는지를 판단하는 확률적 자료구조입니다. 주로 DBMS에서 많이 사용하며, 디스크에서 찾고자하는 값이 존재할 가능성이 있는 경우에만 블록을 읽기 위해 사용됩니다.

 

예를 들어 설명하겠습니다. Bloom Filter는 N개의 bit 배열에 대해서 찾고자하는 데이터를 대상으로 H개의 해시 함수를 적용한 결과를 1로 표시한다음, 대상 집합에도 동일하게 H개의 해시 함수를 적용해 결과가 동일한지를 판단합니다. 만약 동일하다면 찾고자 하는 존재할 수도 있으므로 해당 집합을 탐색합니다.

 

 

예를들어 1개의 Segment에는 1개의 Event만 존재하고, 4개 bit 배열 및 1개의 해 시함수(mod 10)을 적용한다고 가정하겠습니다. 이때 찾고자하는 Aggregate 식별자는 14라면 위 해시 함수를 적용했을 때 결과는 4가 나옵니다. 또한 저장된 값 또한 14라면 Bloom Filter 및 찾고자 하는 값이 동일하므로 해당 집합에는 원하는 결과가 존재합니다.

 

 

하지만 만약 집합속에 있는 값이 24라면, 해시 함수 결과 똑같이 4라는 결과가 나옵니다. 이때는 값이 다른데도 불구하고, 값이 있을 수도 있다고 판정하여 해당 집합을 읽습니다. 따라서 이러한 경우는 비효율적인 I/O가 존재하며 이를 false positive라고 합니다. 따라서 Bloom Filter에서는 false positive를 줄이기 위해서 bit 배열의 수를 늘리는 것과 해시 함수 개수를 늘려서 동일한 값이 발생하지 않도록해야 불필요한 I/O를 유발하지 않습니다. 

 

참고자료


 

Bloom Filter를 통해서 읽어야할 Segment를 알았다면, 해당 Segment에서 시작점을 찾는 것은 Segment 내부에 저장되어있는 인덱스를 통해서 Aggregate의 Sequence 번호를 찾아갑니다. 결론적으로 Axon Server의 EventStore에서 Aggregate 데이터를 검색할 때, 해당 Aggregate의 식별자와 Sequence 번호를 기준으로 Bloom Filter인덱스를 활용하여 이를 찾습니다.


5. 마치며

 

이번 시간에는 EventStore 역할 비교 및 Axon Server 저장소 구조에 대해서 알아보았습니다. 다음 포스팅부터 Query Application 구현에 대하여 다루도록 하겠습니다.

1. 서론

이번 포스팅은 데모 프로젝트 진행에 있어 필수적으로 구현해야하는 코드는 없습니다. 따라서 Skip해도 괜찮습니다.  이번 시간에는 상태를 저장하는 State-Stored Aggregate에 대해서 알아보겠습니다.


2. Aggregate 종류

 

AxonFramework 에서 Aggregate의 종류는 크게 두 가지로 분류할 수 있습니다. 

 

  1. EventSourced Aggregate
  2. State-Stored Aggregate

 

EventSourced Aggregate는 기존 Command 어플리케이션을 제작하는 과정에서 구현한 모델 방식입니다. 즉 EventStore로부터 Event를 재생하면서 모델의 최신상태를 만듭니다.

 

출처 : https://altkomsoftware.pl/en/blog/cqrs-event-sourcing/

 

이와 반대로 State-Stored Aggregate는 위 그림과 같이 EventStore에 Event를 적재와 별개로 모델 자체에 최신 상태를 DB에 저장합니다. 데모 프로젝트 Aggregate 구조 변경을 통해 Command DB에 모델을 생성하는 방법에 대해서 알아봅시다.


3. State-Stored Aggregate

 

데모 프로젝트에는 Holder와 Account Entitiy가 존재합니다. Command 모델에서는 해당 Entitiy 관계를 분리하여 표현할 것이며 모델 구현은 JPA를 사용하겠습니다. 먼저 Command와 Query DB에 적재될 테이블 구조를 살펴보겠습니다.

 

 

ERD

 

Command 모델

 

Query 모델


3. Aggregate 구현

 

State-Stored Aggregate 구현을 위해 기존 코드를 단계적으로 변경하겠습니다.

 

1. HolderAggregate와 AccountAggregate 구조 변경을 통해 상태를 저장할 수 있도록 구현합니다.

 

HolderAggregate.java

@AllArgsConstructor
@NoArgsConstructor
@Aggregate
@Slf4j
@Entity(name = "holder")
@Table(name = "holder")
public class HolderAggregate {
    @AggregateIdentifier
    @Id
    @Column(name = "holder_id")
    @Getter
    private String holderID;
    @Column(name = "holder_name")
    private String holderName;
    private String tel;
    private String address;

    @OneToMany(mappedBy = "holder", orphanRemoval = true)
    private List<AccountAggregate> accounts = new ArrayList<>();

    public void registerAccount(AccountAggregate account){
        if(!this.accounts.contains(account))
            this.accounts.add(account);
    }
    public void unRegisterAccount(AccountAggregate account){
        this.accounts.remove(account);
    }

    @CommandHandler
    public HolderAggregate(HolderCreationCommand command) {
        log.debug("handling {}", command);

        this.holderID = command.getHolderID();
        this.holderName = command.getHolderName();
        this.tel = command.getTel();
        this.address = command.getAddress();

        apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress()));
    }
}

 

@Entity 어노테이션을 통해 대상 Aggregate가 JPA에서 관리되는 Entity임을 명시했습니다. 또한 Aggregate 식별자에 @Id 어노테이션을 추가하여 대상 속성이 PK임을 표시합니다.

 

HolderAggregate는 AccountAggregate와 1:N 관계를 맺고 있으므로 양방향 관계 설정 했으며, HolderAggregate가 삭제될 경우 AccountAggregate도 삭제되도록 orphanRemovel 옵션을 추가했습니다.

 

마지막으로 양방향 관계 설정 시 연관관계 편의 메소드 제공을 위해 registerAccount, unRegisterAccount 메소드를 추가했습니다. 

(참고 : 양방향 연관관계 편의 메소드)

 

혹시 위 Aggregate 코드에서 JPA 코드 추가 외에 혹시 이상한 점을 눈치채셨나요?

 

바로 EventSourcingHandler 메소드가 사라졌습니다. State-Stored Aggregate 모델은 모델 자체가 최신 상태를 유지하고 있으므로 EventStore로부터 Replay를 할 필요가 없습니다. 따라서 CommandHandler 메소드 내에서 Command 상태를 저장하는 로직을 포함시켜야 합니다.

 

이번에는 AccountAggreagte 클래스를 변경하겠습니다.

 

AccountAggregate.java

@NoArgsConstructor
@AllArgsConstructor
@Slf4j
@Aggregate
@EqualsAndHashCode
@Entity(name = "account")
@Table(name = "account")
public class AccountAggregate {
    @AggregateIdentifier
    @Id
    @Column(name = "account_id")
    private String accountID;

    @ManyToOne
    @JoinColumn(name = "holder_id", foreignKey = @ForeignKey(name = "FK_HOLDER"))
    private HolderAggregate holder;
    private Long balance;

    public void registerHolder(HolderAggregate holder){
        if(this.holder != null)
            this.holder.unRegisterAccount(this);
        this.holder = holder;
        this.holder.registerAccount(this);
    }

    @CommandHandler
    public AccountAggregate(AccountCreationCommand command) {
        log.debug("handling {}", command);
        this.accountID = command.getAccountID();
        HolderAggregate holder = command.getHolder();
        registerHolder(holder);
        this.balance = 0L;
        apply(new AccountCreationEvent(holder.getHolderID(),command.getAccountID()));
    }
    @CommandHandler
    protected void depositMoney(DepositMoneyCommand command){
        log.debug("handling {}", command);
        if(command.getAmount() <= 0) throw new IllegalStateException("amount >= 0");
        this.balance += command.getAmount();
        log.debug("balance {}", this.balance);
        apply(new DepositMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @CommandHandler
    protected void withdrawMoney(WithdrawMoneyCommand command){
        log.debug("handling {}", command);
        if(this.balance - command.getAmount() < 0) throw new IllegalStateException("잔고가 부족합니다.");
        else if(command.getAmount() <= 0 ) throw new IllegalStateException("amount >= 0");
        this.balance -= command.getAmount();
        log.debug("balance {}", this.balance);
        apply(new WithdrawMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
}

 

HolderAggregate에서 설명한 부분은 제외하고 추가된 점은 Account 모델이 Holder 모델에 대하여 FK를 지니고 있으므로 관계상에서 FK를 명시하였습니다. 


2. HolderAggregate 클래스 생성을 위한 Repository 패키지 및 클래스를 생성합니다.

 


3. HolderRepository 클래스를 구현합니다.

@Repository
public interface HolderRepository extends JpaRepository<HolderAggregate,String> {
    Optional<HolderAggregate> findHolderAggregateByHolderID(String id);
}

4. 객체 대상으로 연관관계를 변경하였기 때문에 AccountCreateCommand 클래스를 수정합니다.

 


5. Service 클래스를 수정합니다.

 

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;
    private final HolderRepository holders;

...(중략)...

    @Override
    public CompletableFuture<String> createAccount(AccountDTO accountDTO) {
        HolderAggregate holder = holders.findHolderAggregateByHolderID(accountDTO.getHolderID())
                                       .orElseThrow( () -> new IllegalAccessError("계정 ID가 올바르지 않습니다."));
        return commandGateway.send(new AccountCreationCommand(UUID.randomUUID().toString(),holder));
    }

...(중략)...
}

6. Snapshot 설정을 위해 생성한 Configuration 속성을 적용하지 않도록 AxonConfig 클래스 @Configuration 어노테이션을 주석처리 합니다.

 

//@Configuration
//@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
...(중략)...
}

 

이로써 State-Stored Aggregate 구현에 대한 코드 변경은 끝났습니다.


4. 테스트

 

1. Application 실행 후 계정 생성 API 테스트를 진행합니다.

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "kevin",
	"tel" : "02-1234-5678",
	"address" : "OO시 OO구"
}

 

2. DB에서 계정 데이터 생성 여부를 확인합니다.

 


3. 계좌 생성 API를 호출합니다.

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112"
}

 

4. DB에서 계좌 데이터 생성 여부를 확인합니다.

 


5. 입금 API를 호출합니다.

POST http://localhost:8080/deposit
Content-Type: application/json

{
  "accountID" : "9274bb43-ca87-4aa4-b1b4-0363382ad6fb",
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112",
  "amount" : 300
}

 

6. DB에서 해당 계정 잔고를 확인합니다.

 

 

정상적으로 입금된 것을 확인할 수 있습니다.


7. 출금 API를 4번 연속으로 호출합니다.

POST http://localhost:8080/withdrawal
Content-Type: application/json

{
  "accountID" : "9274bb43-ca87-4aa4-b1b4-0363382ad6fb",
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112",
  "amount" : 1
}

8. DB에서 출금 내역을 확인합니다.

 

 

정상적으로 출금된 것을 확인할 수 있습니다. 

o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.WithdrawMoneyCommand]
c.c.command.aggregate.AccountAggregate   : handling WithdrawMoneyCommand(accountID=9274bb43-ca87-4aa4-b1b4-0363382ad6fb, holderID=486832c2-b606-470d-949a-9f9d8613b112, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 296
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present

 

4번 연속 출금 후 Application의 로그를 일부 발췌하였습니다. 기존과 다른점은 EventSourced Aggregate의 경우에는 Replay를 위해 EventStore의 I/O 과정이 필요했지만 State-Stored Aggregate는 상태를 보관하므로 상태 복원 과정이 없습니다.


5. 마치며

State-Stored Aggreagte는 Command DB에 최신 상태를 보관합니다. 이로인해 매번 EventStore를 통해서 Replay를 하지 않아도 되는 점은 장점입니다.

 

하지만 만약에 테이블 데이터가 손실이 되어서 복구가 필요한 경우 Replay를 자동으로 수행되지 않으므로 별도로 EventSourcing 하는 작업을 구현해야 할 수 있습니다. 물론 DBMS 자체 복구 기능을 이용할 수도 있습니다. 하지만 Media Recovery가 불가피하다면 DB 서비스 중단이 발생합니다. 따라서 Aggregate별 사용 장단점을 인지한 다음 적절한 사용이 필요합니다.

 

이상으로 Command Application 구현 포스팅 마치도록 하겠습니다. 

1. 서론

지난 포스팅에서 Command Application에 대한 전반적인 구현을 마무리했습니다. 하지만 해당 프로그램은 근본적인 문제점을 안고 있습니다. 이번 포스팅에서는 발생되는 문제점과 이를 해결하기 위한 방법에 대하여 살펴보겠습니다.


2. 문제점 도출

아래 테이블은 계정 생성 Command를 실행했을 때 EventStore에 저장되는 데이터 중 일부를 발췌한 것입니다.

 

글로벌 인덱스 Payload 

Payload 종류

발생 시간 Aggregate 식별자 시퀀스 번호 타입
1   com.cqrs.events.HolderCreationEvent 2019-12-29T05:34:49.2527378Z 70f956e3-069c-4666-b0f4-324dfb0a807e 0 HolderAggregate

 

공간 부족으로 위 데이터에서 Payload 데이터만 따로 뽑아보면 다음과 같습니다. 

 

<com.cqrs.events.HolderCreationEvent>
  <holderID>70f956e3-069c-4666-b0f4-324dfb0a807e</holderID>
  <holderName>kevin</holderName>
  <tel>02-1234-5678</tel>
  <address>OO시 OO구/address>
</com.cqrs.events.HolderCreationEvent>

Payload에는 Event 내용이 담겨있습니다. 따라서 EventSourcing 및 Event Handler에서는 Payload 내용을 기준으로 Event를 처리합니다.

 

계정 생성만 완료된 상황에서 추가로 계좌 생성 > 계좌 입금(300원) > 인출 5회(1원씩)을 진행한 후 EventStore를 살펴보면 다음과 같습니다.

 

글로벌 인덱스

Payload 종류

Aggregate 식별자 시퀀스 번호 타입
1 com.cqrs.events.HolderCreationEvent 70f956e3-069c-4666-b0f4-324dfb0a807e 0 HolderAggregate
2 com.cqrs.events.AccountCreationEvent c65f80c3-9c44-4ca6-a977-72983a675203 0 AccountAggregate
3 com.cqrs.events.DepositMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 1 AccountAggregate
4 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 2 AccountAggregate
5 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 3 AccountAggregate
6 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 4 AccountAggregate
7 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 5 AccountAggregate
8 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 6 AccountAggregate

(※ 공간 부족으로 Payload 및 이벤트 발생 시간 등은 제외하였습니다.)

 

만약 이러한 상황에서  c65f80c3-9c44-4ca6-a977-72983a675203 식별자를 지닌 AccountAggregate 에서 1원을 인출하는 명령이 발생된다면 내부적으로는 어떠한 과정을 거칠까요?

 

 

Command 어플리케이션 구현 - 1 포스팅에서 소개한 Command 이벤트 수행 내부 흐름도입니다. 당시 4번 과정에 대해서 다음과 같이 소개했습니다.

 

4. UnitOfWork 수행합니다. 이 과정에서 Chain으로 연결된 handler 들을 거치면서 대상 Aggregate에 대하여 EventStore로부터 과거 이벤트들을 Loading 하여 최신 상태로 만듭니다. 이후 해당 Command와 연결된 Handler 메소드를 Reflection을 활용하여 호출합니다.

 

즉 새로운 명령을 수행하기 위해서는 c65f80c3-9c44-4ca6-a977-72983a675203 식별자를 지닌 Aggregate를 대상으로 기존에 발행된 7개의 이벤트를 EventStore에서 읽어와 Loading하는 작업이 선행됩니다. 그 결과 최신 상태로 Aggregate를 만든 이후에 새 Command 적용 및 Event를 발생시킵니다.

 

o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.WithdrawMoneyCommand]
c.c.command.aggregate.AccountAggregate   : applying AccountCreationEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203)
c.c.command.aggregate.AccountAggregate   : applying DepositMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=300)
c.c.command.aggregate.AccountAggregate   : balance 300
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 299
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 298
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 297
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 296
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 295
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present
c.c.command.aggregate.AccountAggregate   : handling WithdrawMoneyCommand(accountID=c65f80c3-9c44-4ca6-a977-72983a675203, holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, amount=1)
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 294

 

위 코드는 Application에서 수행된 로그 중 일부를 발췌한 내용입니다.

이를 통해 알 수 있는 사실은 동일 Aggregate에 대해서 Event 갯수가 늘어날 수록 새로운 Command를 적용하는데 오랜 시간이 소요된다는 점입니다.


3. 개선 방안(Snapshot)

EventSourcing 패턴을 적용하는 Application에는 이전 단계에서 확인한 근본적인 문제점을 안고 있습니다. 따라서 이를 완화하기 위해서 일정 주기별로 Aggregate에 대한 Snapshot을 생성해야합니다.

 

 

Snapshot이란 특정 시점의 Aggregate의 상태를 말합니다. 일반적으로 EventStore에는 Aggregate의 상태를 저장하지 않고 이벤트만 저장합니다. 하지만 특정 시점의 Aggregate의 상태를 저장하여 Loading 과정에서 Snapshot 이후 Event만 Replay하여 빠르게 Aggregate Loading이 가능합니다.

 

AxonFramework에서도 Configuration 설정을 통해서 Aggregate 별로 Snapshot 설정이 가능합니다. Snapshot 설정에는 특정 Threshold를 넘어가면 생성되며, Snapshot 적용 예제를 통해 문제점을 완화해보도록 하겠습니다.

 

1. Command 모듈 AxonConfig 파일을 오픈합니다.

 


2. AxonConfig 클래스에 내용을 추가합니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
    @Bean
    SimpleCommandBus commandBus(TransactionManager transactionManager){
        return  SimpleCommandBus.builder().transactionManager(transactionManager).build();
    }
    @Bean
    public AggregateFactory<AccountAggregate> aggregateFactory(){
        return new GenericAggregateFactory<>(AccountAggregate.class);
    }
    @Bean
    public Snapshotter snapshotter(EventStore eventStore, TransactionManager transactionManager){
        return AggregateSnapshotter
                .builder()
                    .eventStore(eventStore)
                    .aggregateFactories(aggregateFactory())
                    .transactionManager(transactionManager)
                .build();
    }
    @Bean
    public SnapshotTriggerDefinition snapshotTriggerDefinition(EventStore eventStore, TransactionManager transactionManager){
        final int SNAPSHOT_TRHRESHOLD = 5;
        return new EventCountSnapshotTriggerDefinition(snapshotter(eventStore,transactionManager),SNAPSHOT_TRHRESHOLD);
    }

    @Bean
    public Repository<AccountAggregate> accountAggregateRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition){
        return EventSourcingRepository
                .builder(AccountAggregate.class)
                    .eventStore(eventStore)
                    .snapshotTriggerDefinition(snapshotTriggerDefinition)
                .build();
    }
}

 

 

위 코드는 AccountAggregate 기준으로 Snapshot을 설정하도록 작성된 코드입니다. SnapshotTriggerDefinition을 통하여 Aggregate의 발행된 Event가 5개 이상일 경우 Snapshot을 생성하도록 지정하였습니다. Threshold 값에는 얼마를 지정 해야한다는 기준은 없으며, 비즈니스 로직에 따라 생성 주기를 조절하면 됩니다.


Snapshot 설정을 완료한 다음 Application을 재시작 한다음 다시 API 테스트를 하면 수행 당시에는 Snapshot이 존재하지 않기 때문에 전체를 Loading 합니다. 이때 Event를 적용하는 과정에서 Threshold 값을 넘었기 때문에 EventStore에 Snapshot을 새롭게 생성합니다. 

 

Aggregate 식별자 시퀀스 번호 

타입

Payload Payload 타입
c65f80c3-9c44-4ca6-a977-72983a675203 8 AccountAggregate   com.cqrs.command.aggregate.AccountAggregate

생성된 Snapshot 데이터 중 일부를 발췌했습니다. 특정 시퀀스 번호에 해당되는 Aggregate에 대한 상태 정보가 기입되었으며, 상태정보는 Payload에 담겨있습니다.

 

Payload 내용

<com.cqrs.command.aggregate.AccountAggregate>
  <accountID>c65f80c3-9c44-4ca6-a977-72983a675203</accountID>
  <holderID>70f956e3-069c-4666-b0f4-324dfb0a807e</holderID>
  <balance>293</balance>
</com.cqrs.command.aggregate.AccountAggregate>

 

Snapshot 생성 이후 다시 1원을 인출하게되면, 이전 시퀀스 번호인 8번 Snaphot이 존재하므로 전체 Event를 읽어오지 않고 Snapshot정보를 읽어온 다음 Command 명령을 수행합니다.

 

o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.WithdrawMoneyCommand]
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present
c.c.command.aggregate.AccountAggregate   : handling WithdrawMoneyCommand(accountID=c65f80c3-9c44-4ca6-a977-72983a675203, holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, amount=1)
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 292
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present

 

Snapshot 생성 이후 5번의 Event 발생까지는 Snapshot 시점 이전부터 생성된 Event가 재생됩니다. 만약 다시 Threshold를 넘어서게 되면 새로운 Snapshot이 생성되고 그 이후부터는 새로운 Snapshot 이후 Event가 재생됩니다.


4. 성능개선

 

Aggregate를 매번 로딩하면 이를 복원하는데 드는 비용이 지속 수반됩니다. 따라서 자주 사용하는 Aggregate는 Cache를 적용하면 Loading 비용이 줄어들 것입니다. Axon에서 이를 위해 기본적으로 WeakReferenceCache를 제공하며 이를 적용한 Configuration은 다음과 같습니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
    @Bean
    public AggregateFactory<AccountAggregate> aggregateFactory(){
        return new GenericAggregateFactory<>(AccountAggregate.class);
    }
    @Bean
    public Snapshotter snapshotter(EventStore eventStore, TransactionManager transactionManager){
        return AggregateSnapshotter
                .builder()
                    .eventStore(eventStore)
                    .aggregateFactories(aggregateFactory())
                    .transactionManager(transactionManager)
                .build();
    }
    @Bean
    public SnapshotTriggerDefinition snapshotTriggerDefinition(EventStore eventStore, TransactionManager transactionManager){
        final int SNAPSHOT_TRHRESHOLD = 5;
        return new EventCountSnapshotTriggerDefinition(snapshotter(eventStore,transactionManager),SNAPSHOT_TRHRESHOLD);
    }

    @Bean
    public Cache cache(){
        return new WeakReferenceCache();
    }

    @Bean
    public Repository<AccountAggregate> accountAggregateRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache){
        return CachingEventSourcingRepository
                .builder(AccountAggregate.class)
                    .eventStore(eventStore)
                    .snapshotTriggerDefinition(snapshotTriggerDefinition)
                    .cache(cache)
                .build();
    }
}

5. 마치며

 

3개의 포스팅을 통해 Command Application을 구현하는 방법에 대해서 살펴보았습니다. 다음 포스팅은 Aggregate 관련 번외편을 진행할 예정입니다. 따라서 데모 프로젝트를 위한 Application 구현은 이번 포스팅이 마지막입니다.

 

1. 서론

지난시간에 이어 Aggregate에 명령을 요청하는 API 구현 및 테스트를 진행하고자 합니다. 이번 포스팅에서는 API 레벨 테스트를 진행할 것이며, Axon에서 제공하는 테스트 관련 클래스 소개는 차후에 진행하겠습니다.

 


2. DTO 구현

EventSourcing & CQRS 예제 프로젝트 개요에서 API 엔드포인트를 도출했습니다. 이를 바탕으로 API 호출시 매핑되는 DTO 클래스 먼저 구현하겠습니다.

 

 

1. dto 패키지를 만듭니다. 그리고 dto 클래스 5개를 만듭니다.

 


2. dto 클래스를 구현합니다.

 

HolderDTO.java

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class HolderDTO {
    private String holderName;
    private String tel;
    private String address;
}

AccountDTO.java

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class AccountDTO {
    private String holderID;
}

TransactionDTO.java

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class TransactionDTO {
    private String accountID;
    private String holderID;
    private Long amount;
}

DepositDTO.java

public class DepositDTO extends TransactionDTO {}

WithdrawalDTO.java

public class WithdrawalDTO extends TransactionDTO {}

 

입금과 출금형식이 동일하므로 TransactionDTO에 공통 속성 구현한다음 상속하였습니다.


3. Service 구현

CommandGateway와의 연결을 위한 Service 클래스를 구현합니다.

 

1. service 패키지 생성 후 service 인터페이스 및 구현 클래스를 생성합니다.

 


2. 인터페이스 정의 및 클래스를 구현합니다.

 

TransactionService.java

public interface TransactionService {
    CompletableFuture<String> createHolder(HolderDTO holderDTO);
    CompletableFuture<String> createAccount(AccountDTO accountDTO);
    CompletableFuture<String> depositMoney(DepositDTO transactionDTO);
    CompletableFuture<String> withdrawMoney(WithdrawalDTO transactionDTO);
}

TransactionServiceImpl.java

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;

    @Override
    public CompletableFuture<String> createHolder(HolderDTO holderDTO) {
        return commandGateway.send(new HolderCreationCommand(UUID.randomUUID().toString()
                , holderDTO.getHolderName()
                , holderDTO.getTel()
                , holderDTO.getAddress()));
    }

    @Override
    public CompletableFuture<String> createAccount(AccountDTO accountDTO) {
        return commandGateway.send(new AccountCreationCommand(accountDTO.getHolderID(),UUID.randomUUID().toString()));
    }

    @Override
    public CompletableFuture<String> depositMoney(DepositDTO transactionDTO) {
        return commandGateway.send(new DepositMoneyCommand(transactionDTO.getAccountID(), transactionDTO.getHolderID(), transactionDTO.getAmount()));
    }

    @Override
    public CompletableFuture<String> withdrawMoney(WithdrawalDTO transactionDTO) {
        return commandGateway.send(new WithdrawMoneyCommand(transactionDTO.getAccountID(), transactionDTO.getHolderID(), transactionDTO.getAmount()));
    }
}

 

 

Service 구현 코드를 보면 직관적으로 이해할 수 있듯이 CommandGateway를 통해 Command를 생성 합니다. 이는 지난 포스팅에서 다룬 Command 수행 내부 흐름 첫번째 단계에 해당합니다.

 

참고로 CommandGateway에서 제공되는 API는 크게 두 가지로 첫째는 위 소스코드에서 사용한 send 메소드이고 나머지는 sendAndWait 메소드입니다. send 메소드는 비동기 방식이며, sendAndWait은 동기 방식의 메소드입니다. 동기 방식의 메소드는 default가 응답이 올때까지 대기하며 이는 호출 후 hang 상태가 지속되면 스레드 고갈이 일어날 수 있습니다. 따라서 메소드 파라미터에 timeout을 지정하여 실패 처리할 수 있습니다. 자세한 내용은 Axon 공식 문서를 참고 바랍니다.


4. Controller 구현

Controller를 통해 API 매핑작업을 수행합니다.

 

1. controller 패키지 만든 후 controller 클래스를 생성합니다.

 


2. Controller 클래스를 구현합니다.

@RestController
@RequiredArgsConstructor
public class TransactionController {
    private final TransactionService transactionService;

    @PostMapping("/holder")
    public CompletableFuture<String> createHolder(@RequestBody HolderDTO holderDTO){
        return transactionService.createHolder(holderDTO);
    }

    @PostMapping("/account")
    public CompletableFuture<String> createAccount(@RequestBody AccountDTO accountDTO){
        return transactionService.createAccount(accountDTO);
    }

    @PostMapping("/deposit")
    public CompletableFuture<String> deposit(@RequestBody DepositDTO transactionDTO){
        return transactionService.depositMoney(transactionDTO);
    }

    @PostMapping("/withdrawal")
    public CompletableFuture<String> withdraw(@RequestBody WithdrawalDTO transactionDTO){
        return transactionService.withdrawMoney(transactionDTO);
    }
}

 

Controller 클래스는 단순히 전달받은 DTO를 Service에 전달하는 역할만 수행하므로 자세한 설명은 생략합니다. 이로써 Command Application 기본 코드 작성은 끝났습니다.


5. Log 설정

 

Command, EventSourcing Handler 메소드가 수행되는 상황을 분석하기 위해서 Logging 설정을 진행합니다.

 

1. resource 폴더 및 application.yml 파일을 연다음 logging 설정을 추가합니다.

 


2. Aggregate 코드에 @Slf4j 어노테이션 추가 및 log 정보를 기록합니다.

 

HolderAggregate.java

@RequiredArgsConstructor
@Aggregate
@Slf4j
public class HolderAggregate {
    @AggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;

    @CommandHandler
    public HolderAggregate(HolderCreationCommand command) {
        log.debug("handling {}", command);
        apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress()));
    }

    @EventSourcingHandler
    protected void createHolder(HolderCreationEvent event){
        log.debug("applying {}", event);
        this.holderID = event.getHolderID();
        this.holderName = event.getHolderName();
        this.tel = event.getTel();
        this.address = event.getAddress();
    }
}

 

AccountAggregate.java

@RequiredArgsConstructor
@Aggregate
@Slf4j
public class AccountAggregate {
    @AggregateIdentifier
    private String accountID;
    private String holderID;
    private Long balance;

    @CommandHandler
    public AccountAggregate(AccountCreationCommand command) {
        log.debug("handling {}", command);
        apply(new AccountCreationEvent(command.getHolderID(),command.getAccountID()));
    }
    @EventSourcingHandler
    protected void createAccount(AccountCreationEvent event){
        log.debug("applying {}", event);
        this.accountID = event.getAccountID();
        this.holderID = event.getHolderID();
        this.balance = 0L;
    }
    @CommandHandler
    protected void depositMoney(DepositMoneyCommand command){
        log.debug("handling {}", command);
        if(command.getAmount() <= 0) throw new IllegalStateException("amount >= 0");
        apply(new DepositMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @EventSourcingHandler
    protected void depositMoney(DepositMoneyEvent event){
        log.debug("applying {}", event);
        this.balance += event.getAmount();
        log.debug("balance {}", this.balance);
    }
    @CommandHandler
    protected void withdrawMoney(WithdrawMoneyCommand command){
        log.debug("handling {}", command);
        if(this.balance - command.getAmount() < 0) throw new IllegalStateException("잔고가 부족합니다.");
        else if(command.getAmount() <= 0 ) throw new IllegalStateException("amount >= 0");
        apply(new WithdrawMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @EventSourcingHandler
    protected void withdrawMoney(WithdrawMoneyEvent event){
        log.debug("applying {}", event);
        this.balance -= event.getAmount();
        log.debug("balance {}", this.balance);
    }
}

6. API 테스트 코드 작성

Command Application API 테스트를 수행하기 위한 코드를 작성합니다. API 테스트를 위해 Postman을 비롯하여 여러 툴이 있지만, IntelliJhttp 기능을 사용해서 테스트를 진행하도록 하겠습니다.

 

1. Command 모듈 적절한 위치에 http 확장자로 끝나는 파일을 생성합니다.

 


2. http 코드를 작성합니다.

 

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "kevin",
	"tel" : "02-1234-5678",
	"address" : "OO시 OO구"
}

###

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "계정 생성 후 반환되는 UUID"
}

###

POST http://localhost:8080/deposit
Content-Type: application/json

{
  "accountID" : "계좌 생성 후 반환되는 UUID",
  "holderID" : "계정 생성 후 반환되는 UUID",
  "amount" : 300
}

###

POST http://localhost:8080/withdrawal
Content-Type: application/json

{
  "accountID" : "계좌 생성 후 반환되는 UUID",
  "holderID" : "계정 생성 후 반환되는 UUID",
  "amount" : 10
}

###

7. API 테스트 

 

1. AxonServer가 기동된 상태에서 Command App을 수행합니다. 혹시 Axon Server 기동 방법이 궁금하신 분은 AxonServer 설치 및 실행 포스팅을 참고 바랍니다.

 


2. http 파일에서 계정 생성 url에 커서를 위치 시킨다음 [Alt + Enter] 키를 누릅니다.

 


3. Run Localhost:8080 버튼을 눌러 API를 실행합니다.

 

 

4. 정상 실행결과 및 반환된 계정 식별자 값을 확인합니다.

 

또한 위 그림과 같이 Application 로그에도 정상적으로 CommandHanlder 및 EventSourcingHandler 메소드가 수행된 것을 확인할 수 있습니다.


8. 성능 개선

 

데모 프로젝트에서는 Command 명령 생성과 이를 처리하는 Command Handler를 하나의 App에 모두 구현하였음에도 불구하고 위 Application에서는 Command 발행 시 Axon Server와의 통신을 수행합니다. 이는 AxonServer와 연결시 기본적으로 CommandBus로써 AxonServerCommandBus를 사용하기 때문입니다. 

 

이를 개선하기 위해서는 Command 처리시 AxonServer 연결없이 명령을 처리하도록 변경이 필요합니다. AxonFramework에서는 SimpleCommandBus 클래스를 제공하며, 설정을 통해 CommandBus 인터페이스 교체가 가능합니다.

 

설정 변경을 통해 Command Bus를 변경하도록 하겠습니다.

 

1. Command 모듈에 config 패키지 생성 후 AxonConfig 클래스를 생성합니다.

 


2. AxonConfig 클래스를 구현합니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
    @Bean
    SimpleCommandBus commandBus(TransactionManager transactionManager){
        return  SimpleCommandBus.builder().transactionManager(transactionManager).build();
    }
 }

 

AxonFramework는 기본적으로 AxonAutoConfiguration 클래스를 통해 Default 속성을 정의합니다. Custom 속성을 추가하기 위해 Axon 기본 설정이 완료된 후 수행될 수 있도록 @AutoConfigureAfter 어노테이션을 추가했습니다.


3. Application 수행 후 테스트하면 CommandBus가 SimpleCommandBus로 교체된 것을 확인할 수 있습니다.

 o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.HolderCreationCommand]

9. 마치며

이로써 Command Application의 전반적인 코드 구현을 완성하였습니다. 하지만 위 프로그램은 구조적인 문제점을 갖고 있습니다. 다음 포스팅에서는 해당 프로그램이 가진 문제점과 이를 해결하는 방법에 대해서 다루도록 하겠습니다.

 

 

 

 

1. 서론 

 

EventSourcing + CQRS 예제 프로젝트 개요 포스팅을 통해 구현할 프로젝트 소개 및 Command와 Event를 도출했습니다. 이번 시간에는 Command, Event, Aggregate 기본 구조를 구현해보도록 하겠습니다.

 

3. EventSourcing + CQRS 예제 프로젝트 개요

1. 개요 지금부터 EventSourcing과 CQRS가 적용된 프로젝트를 구현하면서 AxonFramework 사용법을 배워봅니다. 이에 앞서 앞으로 진행할 프로젝트에 대한 설계를 통해 구조를 잡아보겠습니다. 프로그램 요구사항..

cla9.tistory.com


2. Event 구현

 

 

Event 클래스는 Command와 Query 둘다 사용되므로 공통 모듈에서 구현하겠습니다.

 

1. Common 모듈에 존재하는 build.gradle 파일을 엽니다. 이후 롬복 사용 및 공통 모듈 컴파일 시 jar파일 생성을 위하여 다음과 같이 작성합니다.

 

bootJar { 
    enabled = false 
}
jar {
    enabled = true
}
dependencies{
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}

2. Common 모듈 패키지 생성을 위해 src > main > java 디렉토리 선택 후 [Alt + Insert] 키를 누릅니다. 이후 package 탭을 선택하고 임의의 package 명을 입력한 다음 OK 버튼을 누릅니다.

 


3. 생성된 패키지 하위에 Event 클래스 4개를 생성합니다.

 


4. Event 클래스를 구현합니다.

 

HolderCreationEvent.java

@AllArgsConstructor
@ToString
@Getter
public class HolderCreationEvent {
    private String holderID;
    private String holderName;
    private String tel;
    private String address;
}

 

AccountCreationEvent.java

@AllArgsConstructor
@ToString
@Getter
public class AccountCreationEvent {
    private String holderID;
    private String accountID;
}

 

DepositMoneyEvent.java

@AllArgsConstructor
@ToString
@Getter
public class DepositMoneyEvent {
    private String holderID;
    private String accountID;
    private Long amount;
}

 

WithdrawMoneyEvent.java

@AllArgsConstructor
@ToString
@Getter
public class WithdrawMoneyEvent {
    private String holderID;
    private String accountID;
    private Long amount;
}

3. Command 구현

만약 Command을 요청하는 App이 실제 처리하는 App과 동일하지 않다면, Command 또한 공통 모듈에 작성하는 것이 바람직합니다. 하지만 데모 프로젝트에서는 Command App에서 모두 처리할 것이므로 Command 모듈내 구현하도록 하겠습니다.

 

1. Command 모듈내에 위치한 패키지 최하위에 commands 패키지를 추가합니다.

 


2. 생성된 패키지 하위에 Command 클래스 4개를 생성합니다.

 


3. Command 클래스를 구현합니다.

 

HolderCreationCommand.java

@AllArgsConstructor
@ToString
@Getter
public class HolderCreationCommand {
    @TargetAggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;
}

AccountCreationCommand.java

@AllArgsConstructor
@ToString
@Getter
public class AccountCreationCommand {
    @TargetAggregateIdentifier
    private String holderID;
    private String accountID;
}

DepositMoneyCommand.java

@AllArgsConstructor
@ToString
@Getter
public class DepositMoneyCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private String holderID;
    private Long amount;
}

WithdrawMoneyCommand.java

@AllArgsConstructor
@ToString
@Getter
public class WithdrawMoneyCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private String holderID;
    private Long amount;
}

 

Event와 달리 Command 클래스에는 @TargetAggregateIdentifier 어노테이션이 붙었습니다. 이는 AxonFramework 모델링의 단위가 Aggregate이고 각 Aggregate마다 고유한 식별자가 부여되어야 하기 때문입니다. 따라서 Command 클래스를 디자인 할때에도 어떤 Aggregate를 대상으로 명령을 수행할 것인지 알아야 하기 때문에 대상 Aggregate의 식별자 지정이 필요합니다.


4. Aggregate 구현

도메인 주도 개발 방법론(DDD)을 배우면 반드시 등장하는 개념이 Aggregate입니다. AxonFramework 에서도 DDD 기반으로 설계되었기에 Aggregate가 필요합니다. 데모 프로젝트에서는 HolderAccount 연관된 Aggregate를 구현하도록 하겠습니다.

 

1. Command 모듈내 aggregate 패키지를 생성후 Aggregate 클래스 2개를 생성합니다.

 


2. Aggregate 클래스를 구현합니다.

 

HolderAggregate.java

@RequiredArgsConstructor
@Aggregate
public class HolderAggregate {
    @AggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;
}

 

AccountAggregate.java

@RequiredArgsConstructor
@Aggregate
public class AccountAggregate {
    @AggregateIdentifier
    private String accountID;
    private String holderID;
    private Long balance;
}    

 

Aggregate 클래스에는 클래스 위에 Aggregate임을 알려주는 Annotation을 추가합니다. 또한 Aggregate 별로 식별자가 반드시 존재해야되기 때문에 유일성을 갖는 대표키 속성에 @AggregateIdentifier을 추가합니다.

 

AxonFramework 에서는 모든 명령(Command)과 이벤트(Event)가 Aggregate에서 발생합니다. 따라서 Aggregate에 대한 명령과 이벤트를 처리할 수 있는 Handler 메소드 작성이 필요합니다. 또한 기본적으로 Event Sourcing 패턴을 사용하기 때문에 명령이 발생한 Event를 적용하는 단계가 필요합니다.

 

 

Handler는 대개 Aggregation 클래스에서 정의하며, 외부 클래스에서 별도 정의도 가능합니다. 데모에서는 Aggregate에서 정의하는 방법을 사용할 것이며 외부 정의 방식은 Axon 공식 문서를 참조 바랍니다.

 

AxonFramework를 사용함에 있어 주로 사용하는 Handler Annotation은 다음과 같습니다.

 

  • @CommandHandler : Aggregate에 대한 명령이 발생되었을 때 호출되는 메소드임을 알려주는 마커 역할
  • @EventSourcingHandler : CommandHandler에서 발생한 이벤트를 적용하는 메소드임을 알려주는 마커 역할
  • @EventHandler : Query 모델 혹은 이벤트 발생시 해당 이벤트를 적용하는 메소드임을 알려주는 마커 역할

HolderAggregation 클래스를 대상으로 계정 생성 명령(HolderCreationCommand)과 이로인해 발생하는 계정 생성 이벤트(HolderCreationEvent) 처리하는 Handler 메소드를 작성하면 다음과 같습니다.

 

HolderAggregation.java

@RequiredArgsConstructor
@Aggregate
public class HolderAggregate {
    @AggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;

    @CommandHandler
    public HolderAggregate(HolderCreationCommand command) {
        apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress()));
    }

    @EventSourcingHandler
    protected void createAccount(HolderCreationEvent event){
        this.holderID = event.getHolderID();
        this.holderName = event.getHolderName();
        this.tel = event.getTel();
        this.address = event.getAddress();
    }
}

 

소스코드를 보면 @CommandHandler@EventSourcingHandler 어노테이션이 추가된 것을 확인할 수 있습니다.

 

먼저 CommandHandler 메소드부터 살펴보겠습니다. 위 코드에서 CommandHandler는 생성자에 추가되었습니다. 이는 계정 생성 명령은 곧 HolderAggregate의 생성을 의미하는 것이기 때문입니다. 해당 메소드 안에 applyAggregateLifeCycle 클래스의 static 메소드이며, 해당 메소드를 통해서 이벤트를 발행합니다.

 

EventSourcingHandler 메소드는 CommandHandler에서 기존에 발행된 이벤트 및 현재 발생한 Command 이벤트를 적용하는 역할을 수행합니다.

 

 

위 그림은 HolderCreationCommand 명령이 발생되었을 때, 수행되는 내부 흐름을 간략하게 표현했습니다.

 

  1. 사용자로부터 Command 명령을 CommandGateway로 전달하면, 메시지 변환 과정(GenericCommandMessage)을 거쳐 CommandBus로 전달합니다.
  2. CommandBus는 Command 명령을 Axon Server로 전송합니다.
  3. AxonServer에서 명령을 Command Bus를 통해 해당 Command를 처리할 App에게 전달합니다.
  4. UnitOfWork(4~7 단계)를 수행합니다. 이 과정에서 Chain으로 연결된 handler 들을 거치면서 대상 Aggregate에 대하여 EventStore로부터 과거 이벤트들을 Loading 하여 최신 상태로 만듭니다. 이후 해당 Command와 연결된 Handler 메소드를 Reflection을 활용하여 호출합니다.
  5. CommandHandler 메소드를 호출하는 과정에서 apply 메소드 호출을 통해 이벤트(HolderCreationEvent)를 발행합니다.
  6. 발행된 Event는 내부 로직을 거치면서 Event 처리를 수행할 Handler를 매핑한 후 EventSourcingHandler 메소드를 Reflection을 활용하여 호출합니다.
  7. EventSourcingHandler 호출이 완료되면, EventBus를 통해 Event 발행을 요청합니다.(publish)
  8. EventBus는 이벤트를 Axon Server에 전달합니다.
  9. EventStore인 Axon Server에서는 전달받은 Event를 저장소에 기록합니다.
  10. 메시지 라우팅 기능을 담당하는 Axon Server는 연결된 App을 대상으로 Event를 전파합니다.

(※ AxonServer와 App간의 연결은 grpc를 사용합니다.)

위와 같이 간단하게 Handler 메소드 두개 작성했을 뿐인데, 내부 로직은 복잡합니다.

 

이번에는 AccountAggregate 클래스를 구현해보도록 하겠습니다.

 

AccountAggregate.java

@RequiredArgsConstructor
@Aggregate
public class AccountAggregate {
    @AggregateIdentifier
    private String accountID;
    private String holderID;
    private Long balance;

    @CommandHandler
    public AccountAggregate(AccountCreationCommand command) {
        apply(new AccountCreationEvent(command.getHolderID(),command.getAccountID()));
    }
    @EventSourcingHandler
    protected void createAccount(AccountCreationEvent event){
        this.accountID = event.getAccountID();
        this.holderID = event.getHolderID();
        this.balance = 0L;
    }
    @CommandHandler
    protected void depositMoney(DepositMoneyCommand command){
        if(command.getAmount() <= 0) throw new IllegalStateException("amount >= 0");
        apply(new DepositMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @EventSourcingHandler
    protected void depositMoney(DepositMoneyEvent event){
        this.balance += event.getAmount();
    }
    @CommandHandler
    protected void withdrawMoney(WithdrawMoneyCommand command){
        if(this.balance - command.getAmount() < 0) throw new IllegalStateException("잔고가 부족합니다.");
        else if(command.getAmount() <= 0 ) throw new IllegalStateException("amount >= 0");
        apply(new WithdrawMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @EventSourcingHandler
    protected void withdrawMoney(WithdrawMoneyEvent event){
        this.balance -= event.getAmount();
    }
}

 

코드를 보면 모든 예외 처리 및 유효성 검증CommandHandler에서 하고 있습니다. 이는 EventStore에 적재된 모든 Event는 재생해야할 대상이기 때문에 EventSourcingHandler에서는 Replay만 수행합니다. 따라서 CommandHandler에서 사전 Exception 처리 및 유효성 검증을 통해서 검증된 Event만을 발행해야합니다.


5. Axon Server 라우팅 기능(Command)

지금까지 AxonFramework를 사용하는 Client 입장에서 동작 원리를 살펴봤습니다. 이번에는 Server 입장에서 메시지 라우팅이 어떻게 이루어지는지 살펴보도록 하겠습니다.

 

 

상황 1. Command를 처리하는 Handler가 하나만 Axon Server에 등록된 경우

 

 

Application 기동시 AxonServer와 연결을 시도합니다. 연결이 완료되면, 해당 App은 자신이 처리가능한 Command Handler 정보를 Server에 등록합니다. 

 

이때 다른 App에서 Command 명령을 요청하게되면 AxonServer에서는 해당 Command를 수행할 수 있는 App을 알기 때문에 해당 App으로 Command 명령을 전달합니다.


상황2. 동일한 Command를 처리하는 Handler가 복수개 등록된 경우

 

 

동일한 Command A를 처리할 수 있는 Handler 메소드를 포함하는 App이 복수개로 등록되었을 경우 내부 흐름은 다음과 같습니다.

 

Axon Server에서는 Command가 도착할 경우 어떤 App에서 수행해야할지를 결정 해야합니다. 따라서 이를 위해 라우팅 테이블에 두 App의 정보를 등록합니다.

 

라우팅 테이블에는 어떤 노드들이 Server와 연결되어있고, 해당 노드들이 어떤 Command를 처리할 수 있는지에 대한 정보가 담겨있습니다. 내부 아키텍처는 Consistenet Hashing 기법을 사용하고 있으며, 관련 설명은 charsyam 님 블로그를 참고바랍니다.

 

 

이러한 상황에서 새로운 App에서 A Command를 요청했다면, 해당 요청 속에 포함된 라우팅 키를 찾아 라우팅 테이블에서 적합한 App을 선정하여 호출하게 됩니다. 그렇기에 Client Side 모델 데이터가 Sharding 되어있거나 고가용성을 위해 Cluster로 App을 구축했더라도 Command 명령은 정확히 하나의 App Command Handler에만 전달됩니다.

 

라우팅 키는 @TargetAggregateIdentifier 혹은 @RoutingKey 어노테이션을 Command에 포함시에 자동으로 생성됩니다.


6. 마치며

이번 포스팅에서는 Event, Command, Aggregate 모델을 구현했습니다. 다음 시간에는 API 구현 및 테스트를 통해서 실제 동작하는 과정에대해 알아보도록 하겠습니다.


Tip)

 

1. AxonServer에 저장된 Event 내역은 DashBoard에서 Search 항목을 누르면 조회할 수 있습니다. 

 


2. Dashboard Commands 탭에서는 등록된 Command Handler 정보 및 현재 수행중인 Command 발생 빈도를 확인할 수 있습니다.

 


3. Command 명령이 발생하면 내부적으로는 몇차례 Command 메시지 변환 과정을 거쳐 부가적인 정보가 추가됩니다. 

 

1단계

Command :
"CreateHolderCommand(
  holderID='1f2cf247-afe7-46d6-bc6d-d588643d6643', 
  holderName= 'kevin', 
  tel='1234-5678', 
  address='서울시')"
callback: 
FailureLoggingCallback@12115

2단계 (GenericMessage 변환후 메시지)

commandMessage:
"GenericCommandMessage{
	payload={
	CreateHolderCommand(
  		holderID='1f2cf247-afe7-46d6-bc6d-d588643d6643', 
  		holderName= 'kevin', 
		tel='1234-5678', 
		address='서울시'),
	metadata={},
	messageIdentifier = '040ffa0c-5d2d-4588-a04c-8051867d4057',
	commandName ='com.cqrs.command.commands.CreateHolderCommand}'"
commandCallback: 
FailureLoggingCallback@12115

 

기본 Command 정보 외 message 식별자 및 Command 패키지 정보 등이 포함되어 있는 것을 확인할 수 있습니다. 참고로 해당 메시지는 CommandGateway의 Send API를 사용했을 때의 메시지 내용입니다.

1. 서론

이번 시간에는 지난 포스팅에 이어서 AxonFramework 구성AxonServer 연동에 대해 다루도록 하겠습니다. 진행하기 앞서 몇가지 사전 작업이 필요합니다.

 


2. AxonFramework 설정

 

1. Command 모듈 build.gradle 파일을 엽니다. 이후 아래와 같이 의존성을 추가합니다.

 

 

추가된 의존성 중 AxonFramework와 직접 연관된 항목은 다음과 같습니다.

ext{
    axonVersion = "4.2.1"
}

dependencies{
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
    implementation group: 'org.axonframework', name: 'axon-configuration', version: "$axonVersion"
}

2. lombok 사용을 위해 Annotation Processing을 활성화 합니다.

(File > Settings > Build, Execution, Deployment > Compiler > Annotation Processors)

 


3. 환경설정을 위해 resources 폴더 밑에 application.yml 파일을 생성합니다. 이후 다음과 같이 작성합니다.

 

 

server:
  port: 8080

spring:
  application:
    name: eventsourcing-cqrs-command
  datasource:
    platform: postgres
    url: jdbc:postgresql://localhost:5432/command
    username: command
    password: command
    driverClassName: org.postgresql.Driver
  jpa:
    hibernate:
      ddl-auto: update

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124


4. Axon Server 연동 테스트를 위해 CommandApplication 클래스에 CommandHandler 메소드를 추가합니다.

(※ 연동 테스트 이후 해당 메소드는 삭제합니다.)

 


5. Command 모듈을 실행시킵니다. App이 정상적으로 실행되고, AxonServerCommandBus가 할당된 것을 확인합니다.

 


6. Axon Server 대시보드(http://localhost:8024) Overview 화면에서 Command App과 연결된 토폴로지를 확인합니다.

 

 


7. Query 모듈 설정을 위해 build.gradle 파일을 엽니다. 이후 Command 모듈 build.gradle과 동일하게 의존성을 추가합니다.

(※ 만약 Read Model을 MongoDB로 사용할 경우 Mongo DB 관련 의존성을 추가합니다.)

 


8. Query 모듈 resources 폴더 밑에 application.yml 파일을 생성합니다. 이후 다음과 같이 작성합니다.

(※ Command 모듈과 조금씩 다른 부분에 주의합니다.)

 

 

server:
  port: 9090

spring:
  application:
    name: eventsourcing-cqrs-query
  datasource:
    platform: postgres
    url: jdbc:postgresql://localhost:5432/query
    username: query
    password: query
    driverClassName: org.postgresql.Driver
  jpa:
    hibernate:
      ddl-auto: update

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

9. Axon Server 연동 테스트를 위해 QueryApplication 클래스에 EventHandler 메소드를 추가합니다.

(※ Command와 마찬가지로 연동 테스트 이후 해당 메소드는 삭제합니다.)

 


10. Query 모듈을 실행합니다. App이 정상적으로 실행되고, AxonServer 연결과 이벤트 추적을 위한 TrackingEventProcessor가 할당된 것을 확인합니다.

 


11. Axon Server 대시보드(http://localhost:8024) Overview 화면에서 기존에 연결된 Command App과 새로 연결된 Query App 토폴로지를 확인합니다.

 

 


12. 테스트를 위해 Command, Query 모듈에 작성한 핸들러 메소드를 삭제합니다.


3. 마치며

이번 포스팅을 통해서 AxonFramework의 기본 설정 및 Axon Server와의 연동을 살펴보았습니다. 하지만 중간에 등장한 CommandHandler, EventHandler, CommandBus, TrackingEventProcessor 등의 용어는 아직 생소합니다. 다음 포스팅부터는 본격적으로 Command, Query 프로그램 작성하면서 위 개념등을 살펴보겠습니다.

+ Recent posts