1. 서론

 

 

이번 포스팅부터 Query App 구현을 다루겠습니다. Query App은 Event를 수신받아 Read Model에 반영하는 Projection 작업과 Read Model을 읽는 Query 2가지로 기능이 나뉩니다. 기능별로 실제 구현 코드량은 얼마되지 않지만 알아야 하는 개념이 많으므로 먼저 Event 처리 기능 관련하여 다루고 향후에 Query 기능을 다루겠습니다.

 

이번 내용은 EventHandler 구현 실습전 내부 처리 과정을 살펴보겠습니다.


2. Event 처리 과정

 

Token Store

 

 

이전 포스팅에서 확인 하였듯이 EventStore에는 EventStream의 내용을 순차적으로 적재합니다. 따라서 수신부에서 지금까지 수신된 Event는 어디까지이며, EvenStore에서 어디서부터 Event를 수신 받아야할지에 대한 정보를 가지고 있어야합니다. 해당 정보를 Token이라고 하며, Token은 Query App과 연관된 DB 내부에 저장하여 영구적으로 관리합니다.(Token Store) 

 

 

 

Token 내용

<org.axonframework.eventhandling.GlobalSequenceTrackingToken>
  <globalIndex>13</globalIndex>
</org.axonframework.eventhandling.GlobalSequenceTrackingToken>

 

위 예시는 TokenStore에 저장된 내용입니다.Token 컬럼에는 지금까지 Tracking된 Event의 Global Sequence 값이 들어있으며, 예시를 통해 13번이 마지막으로 수신된 Event임을 알 수 있습니다.

 


Tracking Event Processor

 

 

TokenStore를 통해서 마지막 수신 Event 정보를 알 수 있다면, 해당 정보를 토대로 Event 수신 요청 및 처리를 담당하는 중계 역할이 필요합니다. Axon에서 제공하는 Event 처리기는 Subscribing Event Processor(SEP), Tracking Event Processor(TEP) 2가지가 있습니다. 두 Event 처리기 차이점은 이벤트 발행 쓰레드에서 Event 처리여부입니다. SEP는 Event 발행 쓰레드에서 Event 또한 처리하며, TEP는 별도 쓰레드에서 처리합니다. TEP를 이해하는 것이 중요하므로 이를 중점적으로 다루겠습니다.


3. Tracking Event Processor

 

 

위 그림은 Query App이 기동될 때 EventProcessor 생성 및 처리 과정 흐름을 간략하게 표현했습니다. 

 

  1. Spring DefaultLifecycleProcessor가 Start 명령을 내립니다.
  2. Axon의 기본 설정을 담당하는 DefaultConfigurer 클래스 Start 메소드가 호출됩니다. 이후 등록된 Handler에게 수행 명령을 내립니다. 기본으로 등록된 Handler는 2개입니다.(EventProcessingModule, EventProcessorInfoConfiguration)
  3. EventProcessingInfoConfiguration의 Start 메소드가 호출됩니다.
  4. 내부 로직을 거쳐 ProcessorInfoSource와 EventProcessorControlService를 구동합니다. ProcessorInfoSource는 AxonServer에게 EventProcessor의 현재 상태를 주기적(Default 500ms)으로 보내는 역할을 담당합니다. EventProcessorContolService는 AxonServer에서 요청시 EventProcessor를 제어하는 서비스 역할을 담당합니다. EventPRocessorControlService의 실제 로직 수행은 AxonServerConnectionManager 및 EventProcessorController가 담당합니다.
  5. EventProcessingModule을 구동합니다. 이 과정에서 EventProcessor 생성을 요청합니다.
  6. TrackingEventProcessor를 생성합니다. 사용자가 별도 속성 정의를 하지 않았으면, DefaultEventProcessor가 생성됩니다. (※ Thread 수 : 1개, 배치 사이즈 : 1, 최대 Thread 수 : Segment 개수, tokenClaim 주기 : 5000ms)
  7. 생성된 TrackingEventProcessor에게 구동을 요청합니다. 내부적으로 AxonThreadFactory에게 WorkerLauncher 인스턴스에 대한 Thread 생성을 요청합니다. 
  8. TrackingEventProcessor가 구동중이면 내부로직이 반복 수행될 수도록 무한 루프로 구성되어 있습니다. 만약 EventStore Segment에 변경된 내역을 확인하여 처리해야한다면, AxonThreadFactory에게 TrackingSegmentWorker 인스턴스 생성을 요청합니다.
  9. Segment에 대한 Event 처리(ProcessingLoop)를 요청합니다.

 

9번 Segment Event 처리 과정을 자세히 확인하기 위해 순서도를 그리면 다음과 같습니다.

 

 

  1. EventStream을 오픈합니다.
  2. EventStream에서 최신 Event가 존재하는지 확인합니다. 존재하지 않는다면 Token 값을 갱신하고 작업을 종료합니다.
  3. 최신 Event를 수신받은 후 해당 App에서 처리가 가능한 Event인지 확인합니다. 만약 처리할 수 없다면 BlackList 등록이 가능한지 확인하고 이를 등록합니다. 이후 해당 Event는 MessageMonitor에 보고한 뒤 무시합니다.
  4. 처리가 가능한 Event라면 UnitOfWork를 수행합니다. 해당 과정에서 Token 값은 자동 갱신합니다. 이후 EventHandler를 찾아 메소드를 수행합니다.
  5. Event 처리가 완료되면 작업을 종료합니다.

 

내부구조는 복잡하지만 이를 한줄로 요약하자면 "주기적으로 TrackingEventProcessor에서 처리가 가능한 Event가 존재하는지 확인 및 처리(UnitOfWork)하고 Token 갱신하는 작업"으로 정의할 수 있습니다.


4. Axon Server 라우팅기능(Event)

이번에는 Client가 아닌 AxonServer 입장에서 Event 전달 흐름을 살펴보도록 하겠습니다.

 

 

Query Application을 구동하면 Token 정보를 읽어옵니다. 이후 EventStore에게 자신이 보유한 Token 정보를 알려준 다음 EventStream을 오픈합니다. 위 예시에는 요청당시 EventStore에는 추가로 유입된 Event가 없으므로 Application의 Token값 변경 후 작업을 종료합니다.

 

 

만약 Command App으로부터 Event가 추가된다면, 다음 ProcessingLoop 작업에서 해당 Event를 Query App으로 전달합니다. 수신받은 App에서 해당 Event 처리 가능 여부를 확인하는데, 만약 처리하지 못할 경우는 AxonServer에게 BlackList 추가를 요청합니다. 따라서 이후 신규 적재되는 Event에 대해서는 수신받지 않습니다.

 

 

신규 App이 추가로 기동되어 AxonServer에 등록요청한 상태에서 마지막 수신 Event가 2번이라면, AxonServer에서 3번부터 해당 Event를 App으로 전달합니다.


4. 마치며

 

이번 포스팅에서는 EventHandler 메소드를 수행하기위해서 알아야할 내부 구조를 알아봤습니다. 다음 포스팅에서는 EventHandler 메소드 구현에 대해 다루어보겠습니다.

+ Recent posts