1. 서론
EventSourcing + CQRS 예제 프로젝트 개요 포스팅을 통해 구현할 프로젝트 소개 및 Command와 Event를 도출했습니다. 이번 시간에는 Command, Event, Aggregate 기본 구조를 구현해보도록 하겠습니다.
2. Event 구현
Event 클래스는 Command와 Query 둘다 사용되므로 공통 모듈에서 구현하겠습니다.
1. Common 모듈에 존재하는 build.gradle 파일을 엽니다. 이후 롬복 사용 및 공통 모듈 컴파일 시 jar파일 생성을 위하여 다음과 같이 작성합니다.
bootJar {
enabled = false
}
jar {
enabled = true
}
dependencies{
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
2. Common 모듈 패키지 생성을 위해 src > main > java 디렉토리 선택 후 [Alt + Insert] 키를 누릅니다. 이후 package 탭을 선택하고 임의의 package 명을 입력한 다음 OK 버튼을 누릅니다.
3. 생성된 패키지 하위에 Event 클래스 4개를 생성합니다.
4. Event 클래스를 구현합니다.
HolderCreationEvent.java
@AllArgsConstructor
@ToString
@Getter
public class HolderCreationEvent {
private String holderID;
private String holderName;
private String tel;
private String address;
}
AccountCreationEvent.java
@AllArgsConstructor
@ToString
@Getter
public class AccountCreationEvent {
private String holderID;
private String accountID;
}
DepositMoneyEvent.java
@AllArgsConstructor
@ToString
@Getter
public class DepositMoneyEvent {
private String holderID;
private String accountID;
private Long amount;
}
WithdrawMoneyEvent.java
@AllArgsConstructor
@ToString
@Getter
public class WithdrawMoneyEvent {
private String holderID;
private String accountID;
private Long amount;
}
3. Command 구현
만약 Command을 요청하는 App이 실제 처리하는 App과 동일하지 않다면, Command 또한 공통 모듈에 작성하는 것이 바람직합니다. 하지만 데모 프로젝트에서는 Command App에서 모두 처리할 것이므로 Command 모듈내 구현하도록 하겠습니다.
1. Command 모듈내에 위치한 패키지 최하위에 commands 패키지를 추가합니다.
2. 생성된 패키지 하위에 Command 클래스 4개를 생성합니다.
3. Command 클래스를 구현합니다.
HolderCreationCommand.java
@AllArgsConstructor
@ToString
@Getter
public class HolderCreationCommand {
@TargetAggregateIdentifier
private String holderID;
private String holderName;
private String tel;
private String address;
}
AccountCreationCommand.java
@AllArgsConstructor
@ToString
@Getter
public class AccountCreationCommand {
@TargetAggregateIdentifier
private String holderID;
private String accountID;
}
DepositMoneyCommand.java
@AllArgsConstructor
@ToString
@Getter
public class DepositMoneyCommand {
@TargetAggregateIdentifier
private String accountID;
private String holderID;
private Long amount;
}
WithdrawMoneyCommand.java
@AllArgsConstructor
@ToString
@Getter
public class WithdrawMoneyCommand {
@TargetAggregateIdentifier
private String accountID;
private String holderID;
private Long amount;
}
Event와 달리 Command 클래스에는 @TargetAggregateIdentifier 어노테이션이 붙었습니다. 이는 AxonFramework 모델링의 단위가 Aggregate이고 각 Aggregate마다 고유한 식별자가 부여되어야 하기 때문입니다. 따라서 Command 클래스를 디자인 할때에도 어떤 Aggregate를 대상으로 명령을 수행할 것인지 알아야 하기 때문에 대상 Aggregate의 식별자 지정이 필요합니다.
4. Aggregate 구현
도메인 주도 개발 방법론(DDD)을 배우면 반드시 등장하는 개념이 Aggregate입니다. AxonFramework 에서도 DDD 기반으로 설계되었기에 Aggregate가 필요합니다. 데모 프로젝트에서는 Holder와 Account 연관된 Aggregate를 구현하도록 하겠습니다.
1. Command 모듈내 aggregate 패키지를 생성후 Aggregate 클래스 2개를 생성합니다.
2. Aggregate 클래스를 구현합니다.
HolderAggregate.java
@RequiredArgsConstructor
@Aggregate
public class HolderAggregate {
@AggregateIdentifier
private String holderID;
private String holderName;
private String tel;
private String address;
}
AccountAggregate.java
@RequiredArgsConstructor
@Aggregate
public class AccountAggregate {
@AggregateIdentifier
private String accountID;
private String holderID;
private Long balance;
}
Aggregate 클래스에는 클래스 위에 Aggregate임을 알려주는 Annotation을 추가합니다. 또한 Aggregate 별로 식별자가 반드시 존재해야되기 때문에 유일성을 갖는 대표키 속성에 @AggregateIdentifier을 추가합니다.
AxonFramework 에서는 모든 명령(Command)과 이벤트(Event)가 Aggregate에서 발생합니다. 따라서 Aggregate에 대한 명령과 이벤트를 처리할 수 있는 Handler 메소드 작성이 필요합니다. 또한 기본적으로 Event Sourcing 패턴을 사용하기 때문에 명령이 발생한 Event를 적용하는 단계가 필요합니다.
Handler는 대개 Aggregation 클래스에서 정의하며, 외부 클래스에서 별도 정의도 가능합니다. 데모에서는 Aggregate에서 정의하는 방법을 사용할 것이며 외부 정의 방식은 Axon 공식 문서를 참조 바랍니다.
AxonFramework를 사용함에 있어 주로 사용하는 Handler Annotation은 다음과 같습니다.
- @CommandHandler : Aggregate에 대한 명령이 발생되었을 때 호출되는 메소드임을 알려주는 마커 역할
- @EventSourcingHandler : CommandHandler에서 발생한 이벤트를 적용하는 메소드임을 알려주는 마커 역할
- @EventHandler : Query 모델 혹은 이벤트 발생시 해당 이벤트를 적용하는 메소드임을 알려주는 마커 역할
HolderAggregation 클래스를 대상으로 계정 생성 명령(HolderCreationCommand)과 이로인해 발생하는 계정 생성 이벤트(HolderCreationEvent) 처리하는 Handler 메소드를 작성하면 다음과 같습니다.
HolderAggregation.java
@RequiredArgsConstructor
@Aggregate
public class HolderAggregate {
@AggregateIdentifier
private String holderID;
private String holderName;
private String tel;
private String address;
@CommandHandler
public HolderAggregate(HolderCreationCommand command) {
apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress()));
}
@EventSourcingHandler
protected void createAccount(HolderCreationEvent event){
this.holderID = event.getHolderID();
this.holderName = event.getHolderName();
this.tel = event.getTel();
this.address = event.getAddress();
}
}
소스코드를 보면 @CommandHandler와 @EventSourcingHandler 어노테이션이 추가된 것을 확인할 수 있습니다.
먼저 CommandHandler 메소드부터 살펴보겠습니다. 위 코드에서 CommandHandler는 생성자에 추가되었습니다. 이는 계정 생성 명령은 곧 HolderAggregate의 생성을 의미하는 것이기 때문입니다. 해당 메소드 안에 apply는 AggregateLifeCycle 클래스의 static 메소드이며, 해당 메소드를 통해서 이벤트를 발행합니다.
EventSourcingHandler 메소드는 CommandHandler에서 기존에 발행된 이벤트 및 현재 발생한 Command 이벤트를 적용하는 역할을 수행합니다.
위 그림은 HolderCreationCommand 명령이 발생되었을 때, 수행되는 내부 흐름을 간략하게 표현했습니다.
- 사용자로부터 Command 명령을 CommandGateway로 전달하면, 메시지 변환 과정(GenericCommandMessage)을 거쳐 CommandBus로 전달합니다.
- CommandBus는 Command 명령을 Axon Server로 전송합니다.
- AxonServer에서 명령을 Command Bus를 통해 해당 Command를 처리할 App에게 전달합니다.
- UnitOfWork(4~7 단계)를 수행합니다. 이 과정에서 Chain으로 연결된 handler 들을 거치면서 대상 Aggregate에 대하여 EventStore로부터 과거 이벤트들을 Loading 하여 최신 상태로 만듭니다. 이후 해당 Command와 연결된 Handler 메소드를 Reflection을 활용하여 호출합니다.
- CommandHandler 메소드를 호출하는 과정에서 apply 메소드 호출을 통해 이벤트(HolderCreationEvent)를 발행합니다.
- 발행된 Event는 내부 로직을 거치면서 Event 처리를 수행할 Handler를 매핑한 후 EventSourcingHandler 메소드를 Reflection을 활용하여 호출합니다.
- EventSourcingHandler 호출이 완료되면, EventBus를 통해 Event 발행을 요청합니다.(publish)
- EventBus는 이벤트를 Axon Server에 전달합니다.
- EventStore인 Axon Server에서는 전달받은 Event를 저장소에 기록합니다.
- 메시지 라우팅 기능을 담당하는 Axon Server는 연결된 App을 대상으로 Event를 전파합니다.
(※ AxonServer와 App간의 연결은 grpc를 사용합니다.)
위와 같이 간단하게 Handler 메소드 두개 작성했을 뿐인데, 내부 로직은 복잡합니다.
이번에는 AccountAggregate 클래스를 구현해보도록 하겠습니다.
AccountAggregate.java
@RequiredArgsConstructor
@Aggregate
public class AccountAggregate {
@AggregateIdentifier
private String accountID;
private String holderID;
private Long balance;
@CommandHandler
public AccountAggregate(AccountCreationCommand command) {
apply(new AccountCreationEvent(command.getHolderID(),command.getAccountID()));
}
@EventSourcingHandler
protected void createAccount(AccountCreationEvent event){
this.accountID = event.getAccountID();
this.holderID = event.getHolderID();
this.balance = 0L;
}
@CommandHandler
protected void depositMoney(DepositMoneyCommand command){
if(command.getAmount() <= 0) throw new IllegalStateException("amount >= 0");
apply(new DepositMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
}
@EventSourcingHandler
protected void depositMoney(DepositMoneyEvent event){
this.balance += event.getAmount();
}
@CommandHandler
protected void withdrawMoney(WithdrawMoneyCommand command){
if(this.balance - command.getAmount() < 0) throw new IllegalStateException("잔고가 부족합니다.");
else if(command.getAmount() <= 0 ) throw new IllegalStateException("amount >= 0");
apply(new WithdrawMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
}
@EventSourcingHandler
protected void withdrawMoney(WithdrawMoneyEvent event){
this.balance -= event.getAmount();
}
}
코드를 보면 모든 예외 처리 및 유효성 검증을 CommandHandler에서 하고 있습니다. 이는 EventStore에 적재된 모든 Event는 재생해야할 대상이기 때문에 EventSourcingHandler에서는 Replay만 수행합니다. 따라서 CommandHandler에서 사전 Exception 처리 및 유효성 검증을 통해서 검증된 Event만을 발행해야합니다.
5. Axon Server 라우팅 기능(Command)
지금까지 AxonFramework를 사용하는 Client 입장에서 동작 원리를 살펴봤습니다. 이번에는 Server 입장에서 메시지 라우팅이 어떻게 이루어지는지 살펴보도록 하겠습니다.
상황 1. Command를 처리하는 Handler가 하나만 Axon Server에 등록된 경우
Application 기동시 AxonServer와 연결을 시도합니다. 연결이 완료되면, 해당 App은 자신이 처리가능한 Command Handler 정보를 Server에 등록합니다.
이때 다른 App에서 Command 명령을 요청하게되면 AxonServer에서는 해당 Command를 수행할 수 있는 App을 알기 때문에 해당 App으로 Command 명령을 전달합니다.
상황2. 동일한 Command를 처리하는 Handler가 복수개 등록된 경우
동일한 Command A를 처리할 수 있는 Handler 메소드를 포함하는 App이 복수개로 등록되었을 경우 내부 흐름은 다음과 같습니다.
Axon Server에서는 Command가 도착할 경우 어떤 App에서 수행해야할지를 결정 해야합니다. 따라서 이를 위해 라우팅 테이블에 두 App의 정보를 등록합니다.
라우팅 테이블에는 어떤 노드들이 Server와 연결되어있고, 해당 노드들이 어떤 Command를 처리할 수 있는지에 대한 정보가 담겨있습니다. 내부 아키텍처는 Consistenet Hashing 기법을 사용하고 있으며, 관련 설명은 charsyam 님 블로그를 참고바랍니다.
이러한 상황에서 새로운 App에서 A Command를 요청했다면, 해당 요청 속에 포함된 라우팅 키를 찾아 라우팅 테이블에서 적합한 App을 선정하여 호출하게 됩니다. 그렇기에 Client Side 모델 데이터가 Sharding 되어있거나 고가용성을 위해 Cluster로 App을 구축했더라도 Command 명령은 정확히 하나의 App Command Handler에만 전달됩니다.
라우팅 키는 @TargetAggregateIdentifier 혹은 @RoutingKey 어노테이션을 Command에 포함시에 자동으로 생성됩니다.
6. 마치며
이번 포스팅에서는 Event, Command, Aggregate 모델을 구현했습니다. 다음 시간에는 API 구현 및 테스트를 통해서 실제 동작하는 과정에대해 알아보도록 하겠습니다.
Tip)
1. AxonServer에 저장된 Event 내역은 DashBoard에서 Search 항목을 누르면 조회할 수 있습니다.
2. Dashboard Commands 탭에서는 등록된 Command Handler 정보 및 현재 수행중인 Command 발생 빈도를 확인할 수 있습니다.
3. Command 명령이 발생하면 내부적으로는 몇차례 Command 메시지 변환 과정을 거쳐 부가적인 정보가 추가됩니다.
1단계
Command :
"CreateHolderCommand(
holderID='1f2cf247-afe7-46d6-bc6d-d588643d6643',
holderName= 'kevin',
tel='1234-5678',
address='서울시')"
callback:
FailureLoggingCallback@12115
2단계 (GenericMessage 변환후 메시지)
commandMessage:
"GenericCommandMessage{
payload={
CreateHolderCommand(
holderID='1f2cf247-afe7-46d6-bc6d-d588643d6643',
holderName= 'kevin',
tel='1234-5678',
address='서울시'),
metadata={},
messageIdentifier = '040ffa0c-5d2d-4588-a04c-8051867d4057',
commandName ='com.cqrs.command.commands.CreateHolderCommand}'"
commandCallback:
FailureLoggingCallback@12115
기본 Command 정보 외 message 식별자 및 Command 패키지 정보 등이 포함되어 있는 것을 확인할 수 있습니다. 참고로 해당 메시지는 CommandGateway의 Send API를 사용했을 때의 메시지 내용입니다.
'MSA > AxonFramework' 카테고리의 다른 글
8. Command 어플리케이션 구현 - 3 (0) | 2019.12.29 |
---|---|
7. Command 어플리케이션 구현 - 2 (2) | 2019.12.28 |
5. Axon Framework 기본 구성 및 Axon Server 연동 (3) | 2019.12.26 |
4. Gradle Multi Project 생성 (0) | 2019.12.25 |
3. EventSourcing & CQRS 예제 프로젝트 개요 (0) | 2019.12.21 |