1. 서론

 

이전 포스팅에서 Replay에 대해서 학습했습니다. Replay는 신규 Read Model이 추가되거나 기존 모델의 변경이 있을 때, EventStore에서 기존 내역을 전달받아 재수행하는 작업입니다. 따라서 EventSourcing & CQRS 모델을 사용한다면, Replay 성능 고민이 반드시 필요합니다. 이번 포스팅에서는 AxonIQ 블로그를 기반으로 Replay 성능 개선 방법에 대해서 소개하겠습니다.

 

본문 설명에 앞서 AxonIQ 벤치마크 테스트 환경 Spec은 다음과 같습니다.

  • DBMS : Postgres(9.6), MongoDB(3.6)
  • CPU : vCPU 8코어 (GCP)
  • RAM : 30G
  • OS : Ubuntu 18.10
  • DISK : SSD 1T

 


2. Replay 문제점

 

만약 EventStore에 저장된 Event 수가 10억개이고, Read Model에서 Replay 수행 시 초당 1000개의 Event를 재생할 수 있다고 가정한다면, Replay 작업에만 약 11일이 걸립니다. 이는 대부분의 상황에서는 적용하기 힘듭니다. 따라서 Replay 작업  최적화가 반드시 필요합니다. 

 

앞선 포스팅에서 TrackingEventProcessor에 의하여 @EventHandler 메소드가 호출된다고 설명했습니다. 이때 별도 속성을 지정하지 않으면, TrackingEventProcessor는 Default 값으로 설정되며 기본값은 다음과 같습니다.

 

  • Thread 수 : 1개
  • Batch Size : 1
  • 최대 Thread 수 : Segment 개수
  • TokenClaim 주기 : 5000ms

 

이를 통해 알 수 있는 사실은 Axon에서 Event 처리는 Batch 단위로 이루어지는데, 기본 설정 값은 Event 1개씩 단일 Thread로 처리됨을 확인할 수 있습니다.

 

AxonIQ에서 TrackingEventProcessor에 대한 기본값 설정으로 Event 처리에 대한 벤치마크 수행 결과, 초당 260개의 Event를 처리하였습니다. 이는 100만개 Event 기준 10시간의 처리 능력을 보여주어 좋지 않은 처리 능력을 나타냈습니다. 지금부터 Event 처리 능력을 강화하는 방법에 대해서 하나씩 살펴보겠습니다.


3. Replay 개선 전략

 

3-1. Batch Size 조정

 

이전 설명에서 Axon Framework에서 Event를 Batch 단위로 처리한다고 했습니다. Batch 작업 시, 개별적인 Event를 다루는 것 외에 부가적인 기능은 다음과 같습니다.

 

  1. Tracking Token을 갱신하여 최신의 Event Stream 위치를 기억하도록 함.
  2. Transactional Store를 사용했을 때 DB의 Transaction Commit 작업 수행.

 

사용자의 별도 설정이 없다면, 기본 Batch Size는 1입니다. 이는 Replay 작업을 수행하기에는 너무 적은 수치입니다. 따라서 적절한 PoC를 통해 최적의 Size를 맞추어야 합니다. Axon에서는 TrackingEventProcessorConfiguration을 통해서 Size 조정이 가능합니다. 설정 방법은 다음과 같습니다.

 

1. Query 모듈내 config 패키지 생성 후 AxonConfig 파일을 생성합니다.

 


2. AxonConfig 클래스를 구현합니다.

 

AxonConfig.java

@Configuration
public class AxonConfig {
    @Autowired
    public void configure(EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessor(
                "accounts",
                org.axonframework.config.Configuration::eventStore,
                c -> TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
                        .andBatchSize(100)
        );
    }
}

 

코드 구현 내용은 accounts ProcessingGroup을 처리하는 TrackingEventProcessor의 Batch Size를 100으로 설정하였습니다.


출처 : https://axoniq.io/blog-overview/cqrs-replay-performance-tuning

 

위 그림은 Batch Size 조정에 따른 초당 Event 처리량을 그래프로 표시한 결과입니다. Size를 1부터 500까지 늘렸을 때 초당 260개의 이벤트 처리량에서 4000개로 15배의 성능 개선이 나타났습니다. 대략 500개 이상부터는 Size 증가에 따른 개선폭이 크지 않으므로 해당 예제에서는 500개가 적정선입니다. Batch Size 크기에 대한 적절한 가이드는 없으며, 이는 각 Application 환경에 맞게 테스트 이후 찾는 것이 좋습니다.

 

Batch Size를 늘려서 Transaction을 처리할 때 주의점이 있습니다. 이는 Read Model에 사용되는 DB가 Non-Transactional 하다면, 만약 Batch 중간에 실패했을 때 데이터가 자동 Rollback되지 않습니다. 따라서 이후 다시 Replay를 시도하게되면, 이미 처리된 Event가 다시 수행되므로 유의해야합니다.


3-2. 병렬 처리

 

출처 : https://axoniq.io/blog-overview/cqrs-replay-performance-tuning

 

TrackingEventProcessor의 Thread 수 기본 값은 1입니다. 즉 하나의 Thread로 모든 작업을 순차처리합니다. 따라서 Event Replay 성능을 높이기 위해서는 병렬도 증가가 필요합니다.

 

위 그림은 Batch Size는 500으로 설정한 상태에서 Thread 개수를 1에서 8개로 점차 늘렸을 때 초당 Event 처리량을 나타낸 것입니다. 병렬도를 8로 지정했을 때 초당 15000개의 Event를 처리할 수 있으므로 단일 Thread 대비 대략 4배정도의 개선이 이루어졌습니다.

 

 

하지만 Thread 개수를 늘릴 때는 고려해야할 사항이 많습니다. 그 이유는 병렬로 처리하게되면 처리되는 Event의 순서가 뒤바뀔 수 있기 때문입니다. 따라서 병렬 처리를 수행시, 순서가 보장될 수 있도록 처리해야합니다. Axon에서는 이러한 문제를 해결 하기 위해 Sequencing 정책을 제공합니다. Sequencing 정책이란 동일 Thread 내에서 Event는 반드시 처리 순서 보장에 대한 결정을 의미합니다.

 

기본적으로 Axon에서는  동일 Aggregate에 속한 Event는 동일 Thread에서 처리될 수 있도록 SequentialPerAggregatePolicy 클래스를 제공합니다. 이를 적용하여 AxonConfig 클래스 수정을 통해 병렬도를 변경하도록 하겠습니다.

 

AxonConfig.java

@Configuration
public class AxonConfig {
    @Autowired
    public void configure(EventProcessingConfigurer configurer) {
        configurer.registerTrackingEventProcessor(
                "accounts",
                org.axonframework.config.Configuration::eventStore,
                c -> TrackingEventProcessorConfiguration.forParallelProcessing(3)
                        .andBatchSize(100)
        );

        configurer.registerSequencingPolicy("accounts",
                configuration -> SequentialPerAggregatePolicy.instance());
    }
}

 

이전 대비 변경된 내용은 TrackingEventProcessor에 대해서 병렬도를 3으로 지정하였습니다. 또한 accounts ProcessingGroup을 대상으로 SequentialPerAggregatePolicy를 적용하여, 단일 Thread 내에서 동일 Aggregate Event가 순서대로 처리되도록 정책 설정하였습니다.


 

코드 변경 후 실제 Application을 구동한다음 Token Store에 TrackingEventProcessor 별로 Token이 생긴 것을 확인할 수 있습니다.


하지만 데모 프로젝트에서 단순히 위와같이 병렬도를 지정하고 Application을 수행하면, 소유주가 존재하지 않습니다 Error가 발생할 수도 있습니다. 이유는 아래와 같습니다.

 

데모 프로젝트 Aggregate 종류

 

 

Command App에서 구현된 Aggregate는 Holder와 Account 두개 입니다. 비즈니스 로직상 Account Aggregate는 반드시 Holder Aggregate가 존재해야지만 생성이 가능하며, Event Stream에도 순차적으로 생성되어 있습니다. 하지만 Read Model을 반영하는 과정에서 Thread를 분리시키면, 근본적으로 두 개의 Aggregate는 다릅니다. 따라서 SequentialPerAggregatePolicy 설정했어도 다른 Thread에 생성되어 처리될 가능성이 높습니다. 

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
	(...중략...)
	private HolderAccountSummary getHolderAccountSummary(String holderID) {
        log.debug("getHolder : {} ",holderID);
        return repository.findByHolderId(holderID)
                .orElseThrow(() -> new NoSuchElementException("소유주가 존재하지 않습니다." + holderID));
    }
}    

 

위 코드는 Account 생성 Event를 처리하기 위해 Repository 에서 Holder를 찾는 로직을 일부 발췌하였습니다. 이때 두 Aggregate가 다르므로 Account  생성 시점 Thread 1번에서 수행중인 Holder Aggregate가 DB에 반영되어있지 않을 수 있습니다. 그 결과 NoSuchElementException이 발생할 수 있습니다.

 

Replay를 수행할 때 의도적으로 @DisallowReplay 어노테이션을 추가하지 않는 이상 EventHandler에서 Event 누락이 발생되어서는 안됩니다. 따라서 문제점 해결을 위해 데모 프로젝트에서는 저장소 검색 과정에서 위와 같은 에러를 만나게 되었을 때 약간의 시차를 두고 다시 시도하게끔 지정하고자 합니다. 이를 위해 spring-retry 기능을 사용하도록 하겠습니다.


1. Query 모듈 build.gradle 파일을 열어 dependencies를 추가합니다.

 

build.gradle

dependencies{
 (...중략...)
 implementation group: 'org.springframework.retry', name: 'spring-retry'
}

2. projection 패키지 HolderAccountProjection 클래스 파일을 엽니다.

 


3. HolderAccountProjection 클래스 상단에 @EnableRetry 어노테이션을 추가합니다. 

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
	(...중략...)
}    

4. Account 생성 이벤트를 처리하는 Event Handler 메소드에 @Retryable 어노테이션을 추가합니다. 

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
	(...중략...)
    
    @EventHandler
    @Retryable(value = {NoSuchElementException.class}, maxAttempts = 5, backoff = @Backoff(delay = 1000))
    @AllowReplay
    protected void on(AccountCreationEvent event, @Timestamp Instant instant)  {
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setAccountCnt(holderAccount.getAccountCnt()+1);
        repository.save(holderAccount);
    }
    
    (...중략...)
}    

 

위 코드의 내용은 Repository로부터 NoSuchElementException이 발생하면 1초 대기후에 다시 시도하며, 최대 5번까지 재수행을 시도하도록 설정하였습니다.

 

위 4단계를 거친 다음 Query App을 기동하여 Reset을 수행하면, 발생하였던 문제가 정상적으로 수행됨을 확인할 수 있습니다.


3-3. Batch 최적화

 

Read Model 구현에 있어 JPA를 사용한다면 Batch Update를 수행할 때 자동으로 최적화를 수행합니다. 가령 동일한 Record에 대하여 Update가 연속 두번 발생하면, 실제로는 Entitiy 로딩하기 위한 1번의 DBMS Call과 JPA 내부 Persistence Context에서 2번의 수정 작업을 거쳐 최종 1번의 Update DBMS Call이 발생합니다.

 

또한 다량의 Insert 작업이 중간에 Update 작업 없이 발생한다면, DBMS에 Insert를 반영할 때 SQL 단건씩 DBMS Call을 발생시키는 것이 아니라 Bulk Insert를 수행하여 최적화를 달성합니다.

 

하지만 JPA를 통한 Model 이외에도 적용할 수 있는 최적화 방법은 여러가지가 있습니다. 그 중 2가지 방법을 소개하겠습니다.

 

  1. Entitiy에 대해서 Update를 수행하기 위해서는 기본적으로 데이터를 Persistence Context에 Load 이후에 변경을 실시하는데, Replay 과정에서는 굳이 데이터를 Load할 필요없이 바로 Update 구문을 수행함으로써 DBMS Call을 줄일 수 있습니다. 
  2. 동일 Aggregate에 대해서 발생하는 Insert, Update, delete Event 순서를 결과에 어긋나지 않게 재배치하여 Bulk 작업을 수행한다면, 획기적으로 DBMS Call을 줄일 수 있습니다. 

 

방금 소개시켜드린 2가지 방법은 단순히 Configuration 설정 변경으로는 적용할 수 없습니다. 따라서 이를 해결 하기위해서는 DBMS 최적화에 대한 기술적인 고민과 이를 적용하는 내용을 프로그래밍해야 합니다. Axon 에서는 이를 보조하기 위하여 다양한 API를 제공합니다. 또한 각각의 Batch 마다 하나씩 UnitOfWork가 존재합니다. 따라서 EventHandler 메소드 파라미터에 UnitOfWork 인자를 추가하면 UnitOfWork에서 처리되는 자원에 접근할 수 있습니다. 이를 통해 메시징 처리를 사용자가 효율적으로 Customizing 처리할 수 있도록 도움을 줍니다.

 

위 소개드린 2가지 기법들을 활용하여 Axon 에서 벤치마크 테스트한 결과, 기존 초당 15000개의 처리량에서 30000개로 2배의 성능 향상을 이룰 수 있었습니다. 이는 최초대비 115배의 성능 개선을 달성한 것입니다. 


4. Mongo DB 테스트 결과

 

Read Model을 Mongo DB를 사용햇을 때 이전 Replay 최적화 방식을 도입함에 있어 AxonIQ 벤치마크 테스트 결과는 다음과 같습니다.

 

  1. 최적화 없음 : 초당 12000개
  2. Batch 최적화 : 초당 30000개

 

AxonIQ 벤치마크 결과 Postgresql과 크게 다르지 않은 처리량을 볼 수 있습니다.


5. 마치며

 

이번 시간에는 Replay 성능 개선에 대하여 다루었습니다. 개인적으로 EventSourcing & CQRS를 도입하는데 있어 기술적으로 가장 고민을 많이해야하는 부분이 이번 포스팅 내용인 것 같습니다. Replay 수행에 있어 Application 개선 뿐만 아니라 DBMS 최적화 기법을 같이 고려한다면 더 큰 성능개선이 이루어질 수 있으므로 Read Model DB에 대한 지식은 큰 도움이 될 것입니다. 다음 시간에는 요구사항 변경에 따른 Event 모델 수정 및 Versioning에 대하여 다루겠습니다.

 

 

 

 

+ Recent posts