1. 서론
이번 시간에는 Query App의 EventHandler 로직을 구현하도록 하겠습니다. Query App의 Read Model은 이전 포스팅에서 도출한 구조를 사용하도록 하겠습니다.
2. Projection 구현
Command에서 발생된 Event를 적용하는 과정을 Projection이라 합니다. 먼저 설계 단계에서 도출한 Entity 구현 이후 Projection을 구현하겠습니다.
1. Query 모듈내 entity 패키지를 생성후 Entity 클래스를 생성합니다.
2. Entity 클래스를 구현합니다.
HolderAccountSummary.java
@Entity
@Table(name = "MV_ACCOUNT")
@NoArgsConstructor
@AllArgsConstructor
@Builder
@Getter @Setter
public class HolderAccountSummary {
@Id
@Column(name = "holder_id", nullable = false)
private String holderId;
@Column(nullable = false)
private String name;
@Column(nullable = false)
private String tel;
@Column(nullable = false)
private String address;
@Column(name = "total_balance", nullable = false)
private Long totalBalance;
@Column(name = "account_cnt", nullable = false)
private Long accountCnt;
}
3. repository 패키지를 생성 후 entitiy repository 인터페이스를 생성합니다.
4. repository 인터페이스 메소드를 정의합니다.
AccountRepository.java
public interface AccountRepository extends JpaRepository<HolderAccountSummary,String> {
Optional<HolderAccountSummary> findByHolderId(String holderId);
}
5. projection 패키지 생성 후 projection 클래스를 생성합니다.
6. Projection 클래스를 구현합니다.
HolderAccountProjection.java
@Component
@AllArgsConstructor
@Slf4j
public class HolderAccountProjection {
private final AccountRepository repository;
@EventHandler
protected void on(HolderCreationEvent event, @Timestamp Instant instant) {
log.debug("projecting {} , timestamp : {}", event, instant.toString());
HolderAccountSummary accountSummary = HolderAccountSummary.builder()
.holderId(event.getHolderID())
.name(event.getHolderName())
.address(event.getAddress())
.tel(event.getTel())
.totalBalance(0L)
.accountCnt(0L)
.build();
repository.save(accountSummary);
}
@EventHandler
protected void on(AccountCreationEvent event, @Timestamp Instant instant){
log.debug("projecting {} , timestamp : {}", event, instant.toString());
HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
holderAccount.setAccountCnt(holderAccount.getAccountCnt()+1);
repository.save(holderAccount);
}
@EventHandler
protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
log.debug("projecting {} , timestamp : {}", event, instant.toString());
HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());
repository.save(holderAccount);
}
@EventHandler
protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
log.debug("projecting {} , timestamp : {}", event, instant.toString());
HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());
repository.save(holderAccount);
}
private HolderAccountSummary getHolderAccountSummary(String holderID) {
return repository.findByHolderId(holderID)
.orElseThrow(() -> new NoSuchElementException("소유주가 존재하지 않습니다."));
}
}
위 코드는 Projection 로직을 담고 있습니다. EventHandler 메소드 파라미터에는 @Timestamp 외 @SequenceNumber, ReplayStatus 등이 추가로 전달될 수 있으며, 자세한 내용은 Axon 공식 문서를 참고 바랍니다.
3. 테스트
API 테스트를 통해 발행된 Command가 Read Model에 제대로 반영되는지 테스트 하겠습니다.
1. reousrces 하위 application.yml 파일을 오픈 후에 로깅 정보를 입력합니다.
2. AxonServer 기동 후에 Query App을 수행합니다. 기동이 완료되면 DB에 테이블이 정상 생성되었는지 확인합니다.
3. Command App을 기동합니다. 이후 계정 생성, 계좌 생성, 입금, 출금 Command 명령을 수차례 반복 수행합니다.
4. 테스트 이후 Query App Read Model 갱신 여부를 확인합니다. 또한 Application Log를 통해서 Event 정상 처리를 확인합니다.
c.c.q.p.HolderAccountProjection : projecting HolderCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, holderName=kevin, tel=02-1234-5678, address=OO시 OO구) , timestamp : 2020-01-07T12:11:21.047Z
c.c.q.p.HolderAccountProjection : projecting HolderCreationEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, holderName=bruce, tel=02-5291-5678, address=OO시 OO구) , timestamp : 2020-01-07T12:12:14.238Z
c.c.q.p.HolderAccountProjection : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=0e6d3546-4163-4083-bf2f-50f1289d8c25) , timestamp : 2020-01-07T12:12:27.633Z
c.c.q.p.HolderAccountProjection : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=042937fb-e658-441d-b23a-fd56be237563) , timestamp : 2020-01-07T12:12:33.961Z
c.c.q.p.HolderAccountProjection : projecting AccountCreationEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa) , timestamp : 2020-01-07T12:12:45.193Z
c.c.q.p.HolderAccountProjection : projecting AccountCreationEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f) , timestamp : 2020-01-07T12:13:28.542Z
c.c.q.p.HolderAccountProjection : projecting DepositMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=300) , timestamp : 2020-01-07T12:15:02.059Z
c.c.q.p.HolderAccountProjection : projecting WithdrawMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=30) , timestamp : 2020-01-07T12:15:08.242Z
c.c.q.p.HolderAccountProjection : projecting WithdrawMoneyEvent(holderID=0df539ed-2ee2-41be-af32-0f6724a75da3, accountID=40d52cac-20d0-49c2-b973-049bd585108f, amount=20) , timestamp : 2020-01-07T12:15:13.488Z
c.c.q.p.HolderAccountProjection : projecting DepositMoneyEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa, amount=300) , timestamp : 2020-01-07T12:15:29.451Z
c.c.q.p.HolderAccountProjection : projecting WithdrawMoneyEvent(holderID=a5d267c6-fbd8-4f0d-b93b-03a8dbf09747, accountID=9eb1f188-c38f-401d-b212-3c26ea84acfa, amount=150) , timestamp : 2020-01-07T12:15:38.423Z
위 내역은 전체 로그중 일부만 발췌한 결과입니다. 확인 결과 Event가 정상적으로 반영된 것을 알 수 있습니다.
4. Replay
이전 포스팅에서 마지막 수신 Event Token 정보를 토대로 AxonSever으로부터 다음 목록을 수신받아 처리한다고 설명 했습니다. 하지만 때로는 Read Model 구조를 재구성하기 위해서 Event를 재생해야될 수 있습니다. Axon에서는 이를 위해 Replay 기능을 제공하며, 특정 시점부터 혹은 전체의 Event에 대한 Replay를 수행할 수 있습니다.
Replay 기능이 동작하면, 내부적으로는 Token 정보를 초기화하여, 특정 시점 혹은 처음부터 발행된 Event를 전달받아 재수행합니다.
이번 시간에는 전체 Event를 재생하는 방법에 대해서 설명하며, 특정 시점부터 이벤트 Replay는 Axon 공식 문서를 참고 바랍니다.
1. Replay를 수행하기 위해 Projection 클래스를 변경합니다. 먼저 ProcessingGroup을 지정하여 TrackingEventProcessor로 하여금 어떤 Group을 대상으로 Replay를 수행할지 지정합니다.
HolderAccountProjection.java
@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
(...중략...)
}
2. Replay 수행시 Read Model 초기화 작업이 이루어지지 않으면, 남아있는 데이터에 이벤트가 적용되므로 데이터 정합성이 맞지 않습니다. 따라서 Replay가 수행되기전, 대상 테이블 또한 초기화가 선행 되어야합니다. 이를 위해 @ResetHandler 어노테이션을 추가한 메소드를 정의하여 초기화 작업을 처리합니다.
HolderAccountProjection.java
@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
(...중략...)
@ResetHandler
private void resetHolderAccountInfo(){
log.debug("reset triggered");
repository.deleteAll();
}
}
3. Replay시 적용 대상 Handler 메소드에 @AllowReplay 어노테이션을 추가합니다.(Optional) 만약 Replay 대상에서 해당 Event는 처리하고 싶지 않을 경우에는 @DisallowReplay를 추가합니다. 예제에서는 전체 Replay 재생을 위해 @AllowReplay 어노테이션만 추가했습니다.
1~3번 과정을 모두 적용한 Projection 클래스 코드는 다음과 같습니다.
HolderAccountProjection.java
@Component
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
private final AccountRepository repository;
@EventHandler
@AllowReplay
protected void on(HolderCreationEvent event, @Timestamp Instant instant) {
log.debug("projecting {} , timestamp : {}", event, instant.toString());
HolderAccountSummary accountSummary = HolderAccountSummary.builder()
.holderId(event.getHolderID())
.name(event.getHolderName())
.address(event.getAddress())
.tel(event.getTel())
.totalBalance(0L)
.accountCnt(0L)
.build();
repository.save(accountSummary);
}
@EventHandler
@AllowReplay
protected void on(AccountCreationEvent event, @Timestamp Instant instant){
log.debug("projecting {} , timestamp : {}", event, instant.toString());
HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
holderAccount.setAccountCnt(holderAccount.getAccountCnt()+1);
repository.save(holderAccount);
}
@EventHandler
@AllowReplay
protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
log.debug("projecting {} , timestamp : {}", event, instant.toString());
HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());
repository.save(holderAccount);
}
@EventHandler
@AllowReplay
protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
log.debug("projecting {} , timestamp : {}", event, instant.toString());
HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());
repository.save(holderAccount);
}
private HolderAccountSummary getHolderAccountSummary(String holderID) {
return repository.findByHolderId(holderID)
.orElseThrow(() -> new NoSuchElementException("소유주가 존재하지 않습니다."));
}
@ResetHandler
private void resetHolderAccountInfo(){
log.debug("reset triggered");
repository.deleteAll();
}
}
4. Reset 수행 EndPoint를 구현하기 위해 controller 패키지 및 클래스를 생성합니다.
5. controller 클래스를 구현합니다.
HolderAccountController.java
@RestController
@RequiredArgsConstructor
public class HolderAccountController {
private final QueryService queryService;
@PostMapping("/reset")
public void reset() {
queryService.reset();
}
}
6. Service Package 및 Service 클래스를 생성합니다.
7. 서비스 클래스를 구현합니다.
QueryService.java
public interface QueryService {
void reset();
}
QueryServiceImpl.java
@RequiredArgsConstructor
@Service
public class QueryServiceImpl implements QueryService{
private final Configuration configuration;
@Override
public void reset() {
configuration.eventProcessingConfiguration()
.eventProcessorByProcessingGroup("accounts",
TrackingEventProcessor.class)
.ifPresent(trackingEventProcessor -> {
trackingEventProcessor.shutDown();
trackingEventProcessor.resetTokens(); // (1)
trackingEventProcessor.start();
});
}
}
실제 Token 초기화는 resetTokens 메소드를 통해서 이루어집니다. 해당 작업을 위해서는 EventProcessor의 재시작이 필요합니다.
8. Query App 재기동 후 API 테스트(POST : http://localhost:9090/reset)를 수행합니다. Application 로그 및 DB를 확인하면 정상적으로 Replay가 이루어졌음을 확인할 수 있습니다.
5. 마치며
EventHandler 메소드 적용에 따른 Read Model 구현을 완성했습니다. 다음 포스팅에서는 Replay 성능 개선 방법에 대하여 다루도록 하겠습니다.
'MSA > AxonFramework' 카테고리의 다른 글
14. Query 어플리케이션 구현(Event) - 4 (0) | 2020.01.11 |
---|---|
13. Query 어플리케이션 구현(Event) - 3 (0) | 2020.01.09 |
11. Query 어플리케이션 구현(Event) - 1 (0) | 2020.01.06 |
10. EventStore (0) | 2020.01.02 |
9. Command 어플리케이션 구현 - 4 (0) | 2019.12.31 |