1. 서론

이번 포스팅은 데모 프로젝트 진행에 있어 필수적으로 구현해야하는 코드는 없습니다. 따라서 Skip해도 괜찮습니다.  이번 시간에는 상태를 저장하는 State-Stored Aggregate에 대해서 알아보겠습니다.


2. Aggregate 종류

 

AxonFramework 에서 Aggregate의 종류는 크게 두 가지로 분류할 수 있습니다. 

 

  1. EventSourced Aggregate
  2. State-Stored Aggregate

 

EventSourced Aggregate는 기존 Command 어플리케이션을 제작하는 과정에서 구현한 모델 방식입니다. 즉 EventStore로부터 Event를 재생하면서 모델의 최신상태를 만듭니다.

 

출처 : https://altkomsoftware.pl/en/blog/cqrs-event-sourcing/

 

이와 반대로 State-Stored Aggregate는 위 그림과 같이 EventStore에 Event를 적재와 별개로 모델 자체에 최신 상태를 DB에 저장합니다. 데모 프로젝트 Aggregate 구조 변경을 통해 Command DB에 모델을 생성하는 방법에 대해서 알아봅시다.


3. State-Stored Aggregate

 

데모 프로젝트에는 Holder와 Account Entitiy가 존재합니다. Command 모델에서는 해당 Entitiy 관계를 분리하여 표현할 것이며 모델 구현은 JPA를 사용하겠습니다. 먼저 Command와 Query DB에 적재될 테이블 구조를 살펴보겠습니다.

 

 

ERD

 

Command 모델

 

Query 모델


3. Aggregate 구현

 

State-Stored Aggregate 구현을 위해 기존 코드를 단계적으로 변경하겠습니다.

 

1. HolderAggregate와 AccountAggregate 구조 변경을 통해 상태를 저장할 수 있도록 구현합니다.

 

HolderAggregate.java

@AllArgsConstructor
@NoArgsConstructor
@Aggregate
@Slf4j
@Entity(name = "holder")
@Table(name = "holder")
public class HolderAggregate {
    @AggregateIdentifier
    @Id
    @Column(name = "holder_id")
    @Getter
    private String holderID;
    @Column(name = "holder_name")
    private String holderName;
    private String tel;
    private String address;

    @OneToMany(mappedBy = "holder", orphanRemoval = true)
    private List<AccountAggregate> accounts = new ArrayList<>();

    public void registerAccount(AccountAggregate account){
        if(!this.accounts.contains(account))
            this.accounts.add(account);
    }
    public void unRegisterAccount(AccountAggregate account){
        this.accounts.remove(account);
    }

    @CommandHandler
    public HolderAggregate(HolderCreationCommand command) {
        log.debug("handling {}", command);

        this.holderID = command.getHolderID();
        this.holderName = command.getHolderName();
        this.tel = command.getTel();
        this.address = command.getAddress();

        apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress()));
    }
}

 

@Entity 어노테이션을 통해 대상 Aggregate가 JPA에서 관리되는 Entity임을 명시했습니다. 또한 Aggregate 식별자에 @Id 어노테이션을 추가하여 대상 속성이 PK임을 표시합니다.

 

HolderAggregate는 AccountAggregate와 1:N 관계를 맺고 있으므로 양방향 관계 설정 했으며, HolderAggregate가 삭제될 경우 AccountAggregate도 삭제되도록 orphanRemovel 옵션을 추가했습니다.

 

마지막으로 양방향 관계 설정 시 연관관계 편의 메소드 제공을 위해 registerAccount, unRegisterAccount 메소드를 추가했습니다. 

(참고 : 양방향 연관관계 편의 메소드)

 

혹시 위 Aggregate 코드에서 JPA 코드 추가 외에 혹시 이상한 점을 눈치채셨나요?

 

바로 EventSourcingHandler 메소드가 사라졌습니다. State-Stored Aggregate 모델은 모델 자체가 최신 상태를 유지하고 있으므로 EventStore로부터 Replay를 할 필요가 없습니다. 따라서 CommandHandler 메소드 내에서 Command 상태를 저장하는 로직을 포함시켜야 합니다.

 

이번에는 AccountAggreagte 클래스를 변경하겠습니다.

 

AccountAggregate.java

@NoArgsConstructor
@AllArgsConstructor
@Slf4j
@Aggregate
@EqualsAndHashCode
@Entity(name = "account")
@Table(name = "account")
public class AccountAggregate {
    @AggregateIdentifier
    @Id
    @Column(name = "account_id")
    private String accountID;

    @ManyToOne
    @JoinColumn(name = "holder_id", foreignKey = @ForeignKey(name = "FK_HOLDER"))
    private HolderAggregate holder;
    private Long balance;

    public void registerHolder(HolderAggregate holder){
        if(this.holder != null)
            this.holder.unRegisterAccount(this);
        this.holder = holder;
        this.holder.registerAccount(this);
    }

    @CommandHandler
    public AccountAggregate(AccountCreationCommand command) {
        log.debug("handling {}", command);
        this.accountID = command.getAccountID();
        HolderAggregate holder = command.getHolder();
        registerHolder(holder);
        this.balance = 0L;
        apply(new AccountCreationEvent(holder.getHolderID(),command.getAccountID()));
    }
    @CommandHandler
    protected void depositMoney(DepositMoneyCommand command){
        log.debug("handling {}", command);
        if(command.getAmount() <= 0) throw new IllegalStateException("amount >= 0");
        this.balance += command.getAmount();
        log.debug("balance {}", this.balance);
        apply(new DepositMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @CommandHandler
    protected void withdrawMoney(WithdrawMoneyCommand command){
        log.debug("handling {}", command);
        if(this.balance - command.getAmount() < 0) throw new IllegalStateException("잔고가 부족합니다.");
        else if(command.getAmount() <= 0 ) throw new IllegalStateException("amount >= 0");
        this.balance -= command.getAmount();
        log.debug("balance {}", this.balance);
        apply(new WithdrawMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
}

 

HolderAggregate에서 설명한 부분은 제외하고 추가된 점은 Account 모델이 Holder 모델에 대하여 FK를 지니고 있으므로 관계상에서 FK를 명시하였습니다. 


2. HolderAggregate 클래스 생성을 위한 Repository 패키지 및 클래스를 생성합니다.

 


3. HolderRepository 클래스를 구현합니다.

@Repository
public interface HolderRepository extends JpaRepository<HolderAggregate,String> {
    Optional<HolderAggregate> findHolderAggregateByHolderID(String id);
}

4. 객체 대상으로 연관관계를 변경하였기 때문에 AccountCreateCommand 클래스를 수정합니다.

 


5. Service 클래스를 수정합니다.

 

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;
    private final HolderRepository holders;

...(중략)...

    @Override
    public CompletableFuture<String> createAccount(AccountDTO accountDTO) {
        HolderAggregate holder = holders.findHolderAggregateByHolderID(accountDTO.getHolderID())
                                       .orElseThrow( () -> new IllegalAccessError("계정 ID가 올바르지 않습니다."));
        return commandGateway.send(new AccountCreationCommand(UUID.randomUUID().toString(),holder));
    }

...(중략)...
}

6. Snapshot 설정을 위해 생성한 Configuration 속성을 적용하지 않도록 AxonConfig 클래스 @Configuration 어노테이션을 주석처리 합니다.

 

//@Configuration
//@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
...(중략)...
}

 

이로써 State-Stored Aggregate 구현에 대한 코드 변경은 끝났습니다.


4. 테스트

 

1. Application 실행 후 계정 생성 API 테스트를 진행합니다.

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "kevin",
	"tel" : "02-1234-5678",
	"address" : "OO시 OO구"
}

 

2. DB에서 계정 데이터 생성 여부를 확인합니다.

 


3. 계좌 생성 API를 호출합니다.

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112"
}

 

4. DB에서 계좌 데이터 생성 여부를 확인합니다.

 


5. 입금 API를 호출합니다.

POST http://localhost:8080/deposit
Content-Type: application/json

{
  "accountID" : "9274bb43-ca87-4aa4-b1b4-0363382ad6fb",
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112",
  "amount" : 300
}

 

6. DB에서 해당 계정 잔고를 확인합니다.

 

 

정상적으로 입금된 것을 확인할 수 있습니다.


7. 출금 API를 4번 연속으로 호출합니다.

POST http://localhost:8080/withdrawal
Content-Type: application/json

{
  "accountID" : "9274bb43-ca87-4aa4-b1b4-0363382ad6fb",
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112",
  "amount" : 1
}

8. DB에서 출금 내역을 확인합니다.

 

 

정상적으로 출금된 것을 확인할 수 있습니다. 

o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.WithdrawMoneyCommand]
c.c.command.aggregate.AccountAggregate   : handling WithdrawMoneyCommand(accountID=9274bb43-ca87-4aa4-b1b4-0363382ad6fb, holderID=486832c2-b606-470d-949a-9f9d8613b112, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 296
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present

 

4번 연속 출금 후 Application의 로그를 일부 발췌하였습니다. 기존과 다른점은 EventSourced Aggregate의 경우에는 Replay를 위해 EventStore의 I/O 과정이 필요했지만 State-Stored Aggregate는 상태를 보관하므로 상태 복원 과정이 없습니다.


5. 마치며

State-Stored Aggreagte는 Command DB에 최신 상태를 보관합니다. 이로인해 매번 EventStore를 통해서 Replay를 하지 않아도 되는 점은 장점입니다.

 

하지만 만약에 테이블 데이터가 손실이 되어서 복구가 필요한 경우 Replay를 자동으로 수행되지 않으므로 별도로 EventSourcing 하는 작업을 구현해야 할 수 있습니다. 물론 DBMS 자체 복구 기능을 이용할 수도 있습니다. 하지만 Media Recovery가 불가피하다면 DB 서비스 중단이 발생합니다. 따라서 Aggregate별 사용 장단점을 인지한 다음 적절한 사용이 필요합니다.

 

이상으로 Command Application 구현 포스팅 마치도록 하겠습니다. 

1. 서론

지난 포스팅에서 Command Application에 대한 전반적인 구현을 마무리했습니다. 하지만 해당 프로그램은 근본적인 문제점을 안고 있습니다. 이번 포스팅에서는 발생되는 문제점과 이를 해결하기 위한 방법에 대하여 살펴보겠습니다.


2. 문제점 도출

아래 테이블은 계정 생성 Command를 실행했을 때 EventStore에 저장되는 데이터 중 일부를 발췌한 것입니다.

 

글로벌 인덱스 Payload 

Payload 종류

발생 시간 Aggregate 식별자 시퀀스 번호 타입
1   com.cqrs.events.HolderCreationEvent 2019-12-29T05:34:49.2527378Z 70f956e3-069c-4666-b0f4-324dfb0a807e 0 HolderAggregate

 

공간 부족으로 위 데이터에서 Payload 데이터만 따로 뽑아보면 다음과 같습니다. 

 

<com.cqrs.events.HolderCreationEvent>
  <holderID>70f956e3-069c-4666-b0f4-324dfb0a807e</holderID>
  <holderName>kevin</holderName>
  <tel>02-1234-5678</tel>
  <address>OO시 OO구/address>
</com.cqrs.events.HolderCreationEvent>

Payload에는 Event 내용이 담겨있습니다. 따라서 EventSourcing 및 Event Handler에서는 Payload 내용을 기준으로 Event를 처리합니다.

 

계정 생성만 완료된 상황에서 추가로 계좌 생성 > 계좌 입금(300원) > 인출 5회(1원씩)을 진행한 후 EventStore를 살펴보면 다음과 같습니다.

 

글로벌 인덱스

Payload 종류

Aggregate 식별자 시퀀스 번호 타입
1 com.cqrs.events.HolderCreationEvent 70f956e3-069c-4666-b0f4-324dfb0a807e 0 HolderAggregate
2 com.cqrs.events.AccountCreationEvent c65f80c3-9c44-4ca6-a977-72983a675203 0 AccountAggregate
3 com.cqrs.events.DepositMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 1 AccountAggregate
4 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 2 AccountAggregate
5 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 3 AccountAggregate
6 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 4 AccountAggregate
7 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 5 AccountAggregate
8 com.cqrs.events.WithdrawMoneyEvent c65f80c3-9c44-4ca6-a977-72983a675203 6 AccountAggregate

(※ 공간 부족으로 Payload 및 이벤트 발생 시간 등은 제외하였습니다.)

 

만약 이러한 상황에서  c65f80c3-9c44-4ca6-a977-72983a675203 식별자를 지닌 AccountAggregate 에서 1원을 인출하는 명령이 발생된다면 내부적으로는 어떠한 과정을 거칠까요?

 

 

Command 어플리케이션 구현 - 1 포스팅에서 소개한 Command 이벤트 수행 내부 흐름도입니다. 당시 4번 과정에 대해서 다음과 같이 소개했습니다.

 

4. UnitOfWork 수행합니다. 이 과정에서 Chain으로 연결된 handler 들을 거치면서 대상 Aggregate에 대하여 EventStore로부터 과거 이벤트들을 Loading 하여 최신 상태로 만듭니다. 이후 해당 Command와 연결된 Handler 메소드를 Reflection을 활용하여 호출합니다.

 

즉 새로운 명령을 수행하기 위해서는 c65f80c3-9c44-4ca6-a977-72983a675203 식별자를 지닌 Aggregate를 대상으로 기존에 발행된 7개의 이벤트를 EventStore에서 읽어와 Loading하는 작업이 선행됩니다. 그 결과 최신 상태로 Aggregate를 만든 이후에 새 Command 적용 및 Event를 발생시킵니다.

 

o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.WithdrawMoneyCommand]
c.c.command.aggregate.AccountAggregate   : applying AccountCreationEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203)
c.c.command.aggregate.AccountAggregate   : applying DepositMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=300)
c.c.command.aggregate.AccountAggregate   : balance 300
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 299
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 298
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 297
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 296
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 295
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present
c.c.command.aggregate.AccountAggregate   : handling WithdrawMoneyCommand(accountID=c65f80c3-9c44-4ca6-a977-72983a675203, holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, amount=1)
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 294

 

위 코드는 Application에서 수행된 로그 중 일부를 발췌한 내용입니다.

이를 통해 알 수 있는 사실은 동일 Aggregate에 대해서 Event 갯수가 늘어날 수록 새로운 Command를 적용하는데 오랜 시간이 소요된다는 점입니다.


3. 개선 방안(Snapshot)

EventSourcing 패턴을 적용하는 Application에는 이전 단계에서 확인한 근본적인 문제점을 안고 있습니다. 따라서 이를 완화하기 위해서 일정 주기별로 Aggregate에 대한 Snapshot을 생성해야합니다.

 

 

Snapshot이란 특정 시점의 Aggregate의 상태를 말합니다. 일반적으로 EventStore에는 Aggregate의 상태를 저장하지 않고 이벤트만 저장합니다. 하지만 특정 시점의 Aggregate의 상태를 저장하여 Loading 과정에서 Snapshot 이후 Event만 Replay하여 빠르게 Aggregate Loading이 가능합니다.

 

AxonFramework에서도 Configuration 설정을 통해서 Aggregate 별로 Snapshot 설정이 가능합니다. Snapshot 설정에는 특정 Threshold를 넘어가면 생성되며, Snapshot 적용 예제를 통해 문제점을 완화해보도록 하겠습니다.

 

1. Command 모듈 AxonConfig 파일을 오픈합니다.

 


2. AxonConfig 클래스에 내용을 추가합니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
    @Bean
    SimpleCommandBus commandBus(TransactionManager transactionManager){
        return  SimpleCommandBus.builder().transactionManager(transactionManager).build();
    }
    @Bean
    public AggregateFactory<AccountAggregate> aggregateFactory(){
        return new GenericAggregateFactory<>(AccountAggregate.class);
    }
    @Bean
    public Snapshotter snapshotter(EventStore eventStore, TransactionManager transactionManager){
        return AggregateSnapshotter
                .builder()
                    .eventStore(eventStore)
                    .aggregateFactories(aggregateFactory())
                    .transactionManager(transactionManager)
                .build();
    }
    @Bean
    public SnapshotTriggerDefinition snapshotTriggerDefinition(EventStore eventStore, TransactionManager transactionManager){
        final int SNAPSHOT_TRHRESHOLD = 5;
        return new EventCountSnapshotTriggerDefinition(snapshotter(eventStore,transactionManager),SNAPSHOT_TRHRESHOLD);
    }

    @Bean
    public Repository<AccountAggregate> accountAggregateRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition){
        return EventSourcingRepository
                .builder(AccountAggregate.class)
                    .eventStore(eventStore)
                    .snapshotTriggerDefinition(snapshotTriggerDefinition)
                .build();
    }
}

 

 

위 코드는 AccountAggregate 기준으로 Snapshot을 설정하도록 작성된 코드입니다. SnapshotTriggerDefinition을 통하여 Aggregate의 발행된 Event가 5개 이상일 경우 Snapshot을 생성하도록 지정하였습니다. Threshold 값에는 얼마를 지정 해야한다는 기준은 없으며, 비즈니스 로직에 따라 생성 주기를 조절하면 됩니다.


Snapshot 설정을 완료한 다음 Application을 재시작 한다음 다시 API 테스트를 하면 수행 당시에는 Snapshot이 존재하지 않기 때문에 전체를 Loading 합니다. 이때 Event를 적용하는 과정에서 Threshold 값을 넘었기 때문에 EventStore에 Snapshot을 새롭게 생성합니다. 

 

Aggregate 식별자 시퀀스 번호 

타입

Payload Payload 타입
c65f80c3-9c44-4ca6-a977-72983a675203 8 AccountAggregate   com.cqrs.command.aggregate.AccountAggregate

생성된 Snapshot 데이터 중 일부를 발췌했습니다. 특정 시퀀스 번호에 해당되는 Aggregate에 대한 상태 정보가 기입되었으며, 상태정보는 Payload에 담겨있습니다.

 

Payload 내용

<com.cqrs.command.aggregate.AccountAggregate>
  <accountID>c65f80c3-9c44-4ca6-a977-72983a675203</accountID>
  <holderID>70f956e3-069c-4666-b0f4-324dfb0a807e</holderID>
  <balance>293</balance>
</com.cqrs.command.aggregate.AccountAggregate>

 

Snapshot 생성 이후 다시 1원을 인출하게되면, 이전 시퀀스 번호인 8번 Snaphot이 존재하므로 전체 Event를 읽어오지 않고 Snapshot정보를 읽어온 다음 Command 명령을 수행합니다.

 

o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.WithdrawMoneyCommand]
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present
c.c.command.aggregate.AccountAggregate   : handling WithdrawMoneyCommand(accountID=c65f80c3-9c44-4ca6-a977-72983a675203, holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, amount=1)
c.c.command.aggregate.AccountAggregate   : applying WithdrawMoneyEvent(holderID=70f956e3-069c-4666-b0f4-324dfb0a807e, accountID=c65f80c3-9c44-4ca6-a977-72983a675203, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 292
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present

 

Snapshot 생성 이후 5번의 Event 발생까지는 Snapshot 시점 이전부터 생성된 Event가 재생됩니다. 만약 다시 Threshold를 넘어서게 되면 새로운 Snapshot이 생성되고 그 이후부터는 새로운 Snapshot 이후 Event가 재생됩니다.


4. 성능개선

 

Aggregate를 매번 로딩하면 이를 복원하는데 드는 비용이 지속 수반됩니다. 따라서 자주 사용하는 Aggregate는 Cache를 적용하면 Loading 비용이 줄어들 것입니다. Axon에서 이를 위해 기본적으로 WeakReferenceCache를 제공하며 이를 적용한 Configuration은 다음과 같습니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
    @Bean
    public AggregateFactory<AccountAggregate> aggregateFactory(){
        return new GenericAggregateFactory<>(AccountAggregate.class);
    }
    @Bean
    public Snapshotter snapshotter(EventStore eventStore, TransactionManager transactionManager){
        return AggregateSnapshotter
                .builder()
                    .eventStore(eventStore)
                    .aggregateFactories(aggregateFactory())
                    .transactionManager(transactionManager)
                .build();
    }
    @Bean
    public SnapshotTriggerDefinition snapshotTriggerDefinition(EventStore eventStore, TransactionManager transactionManager){
        final int SNAPSHOT_TRHRESHOLD = 5;
        return new EventCountSnapshotTriggerDefinition(snapshotter(eventStore,transactionManager),SNAPSHOT_TRHRESHOLD);
    }

    @Bean
    public Cache cache(){
        return new WeakReferenceCache();
    }

    @Bean
    public Repository<AccountAggregate> accountAggregateRepository(EventStore eventStore, SnapshotTriggerDefinition snapshotTriggerDefinition, Cache cache){
        return CachingEventSourcingRepository
                .builder(AccountAggregate.class)
                    .eventStore(eventStore)
                    .snapshotTriggerDefinition(snapshotTriggerDefinition)
                    .cache(cache)
                .build();
    }
}

5. 마치며

 

3개의 포스팅을 통해 Command Application을 구현하는 방법에 대해서 살펴보았습니다. 다음 포스팅은 Aggregate 관련 번외편을 진행할 예정입니다. 따라서 데모 프로젝트를 위한 Application 구현은 이번 포스팅이 마지막입니다.

 

1. 서론

지난시간에 이어 Aggregate에 명령을 요청하는 API 구현 및 테스트를 진행하고자 합니다. 이번 포스팅에서는 API 레벨 테스트를 진행할 것이며, Axon에서 제공하는 테스트 관련 클래스 소개는 차후에 진행하겠습니다.

 


2. DTO 구현

EventSourcing & CQRS 예제 프로젝트 개요에서 API 엔드포인트를 도출했습니다. 이를 바탕으로 API 호출시 매핑되는 DTO 클래스 먼저 구현하겠습니다.

 

 

1. dto 패키지를 만듭니다. 그리고 dto 클래스 5개를 만듭니다.

 


2. dto 클래스를 구현합니다.

 

HolderDTO.java

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class HolderDTO {
    private String holderName;
    private String tel;
    private String address;
}

AccountDTO.java

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class AccountDTO {
    private String holderID;
}

TransactionDTO.java

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class TransactionDTO {
    private String accountID;
    private String holderID;
    private Long amount;
}

DepositDTO.java

public class DepositDTO extends TransactionDTO {}

WithdrawalDTO.java

public class WithdrawalDTO extends TransactionDTO {}

 

입금과 출금형식이 동일하므로 TransactionDTO에 공통 속성 구현한다음 상속하였습니다.


3. Service 구현

CommandGateway와의 연결을 위한 Service 클래스를 구현합니다.

 

1. service 패키지 생성 후 service 인터페이스 및 구현 클래스를 생성합니다.

 


2. 인터페이스 정의 및 클래스를 구현합니다.

 

TransactionService.java

public interface TransactionService {
    CompletableFuture<String> createHolder(HolderDTO holderDTO);
    CompletableFuture<String> createAccount(AccountDTO accountDTO);
    CompletableFuture<String> depositMoney(DepositDTO transactionDTO);
    CompletableFuture<String> withdrawMoney(WithdrawalDTO transactionDTO);
}

TransactionServiceImpl.java

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;

    @Override
    public CompletableFuture<String> createHolder(HolderDTO holderDTO) {
        return commandGateway.send(new HolderCreationCommand(UUID.randomUUID().toString()
                , holderDTO.getHolderName()
                , holderDTO.getTel()
                , holderDTO.getAddress()));
    }

    @Override
    public CompletableFuture<String> createAccount(AccountDTO accountDTO) {
        return commandGateway.send(new AccountCreationCommand(accountDTO.getHolderID(),UUID.randomUUID().toString()));
    }

    @Override
    public CompletableFuture<String> depositMoney(DepositDTO transactionDTO) {
        return commandGateway.send(new DepositMoneyCommand(transactionDTO.getAccountID(), transactionDTO.getHolderID(), transactionDTO.getAmount()));
    }

    @Override
    public CompletableFuture<String> withdrawMoney(WithdrawalDTO transactionDTO) {
        return commandGateway.send(new WithdrawMoneyCommand(transactionDTO.getAccountID(), transactionDTO.getHolderID(), transactionDTO.getAmount()));
    }
}

 

 

Service 구현 코드를 보면 직관적으로 이해할 수 있듯이 CommandGateway를 통해 Command를 생성 합니다. 이는 지난 포스팅에서 다룬 Command 수행 내부 흐름 첫번째 단계에 해당합니다.

 

참고로 CommandGateway에서 제공되는 API는 크게 두 가지로 첫째는 위 소스코드에서 사용한 send 메소드이고 나머지는 sendAndWait 메소드입니다. send 메소드는 비동기 방식이며, sendAndWait은 동기 방식의 메소드입니다. 동기 방식의 메소드는 default가 응답이 올때까지 대기하며 이는 호출 후 hang 상태가 지속되면 스레드 고갈이 일어날 수 있습니다. 따라서 메소드 파라미터에 timeout을 지정하여 실패 처리할 수 있습니다. 자세한 내용은 Axon 공식 문서를 참고 바랍니다.


4. Controller 구현

Controller를 통해 API 매핑작업을 수행합니다.

 

1. controller 패키지 만든 후 controller 클래스를 생성합니다.

 


2. Controller 클래스를 구현합니다.

@RestController
@RequiredArgsConstructor
public class TransactionController {
    private final TransactionService transactionService;

    @PostMapping("/holder")
    public CompletableFuture<String> createHolder(@RequestBody HolderDTO holderDTO){
        return transactionService.createHolder(holderDTO);
    }

    @PostMapping("/account")
    public CompletableFuture<String> createAccount(@RequestBody AccountDTO accountDTO){
        return transactionService.createAccount(accountDTO);
    }

    @PostMapping("/deposit")
    public CompletableFuture<String> deposit(@RequestBody DepositDTO transactionDTO){
        return transactionService.depositMoney(transactionDTO);
    }

    @PostMapping("/withdrawal")
    public CompletableFuture<String> withdraw(@RequestBody WithdrawalDTO transactionDTO){
        return transactionService.withdrawMoney(transactionDTO);
    }
}

 

Controller 클래스는 단순히 전달받은 DTO를 Service에 전달하는 역할만 수행하므로 자세한 설명은 생략합니다. 이로써 Command Application 기본 코드 작성은 끝났습니다.


5. Log 설정

 

Command, EventSourcing Handler 메소드가 수행되는 상황을 분석하기 위해서 Logging 설정을 진행합니다.

 

1. resource 폴더 및 application.yml 파일을 연다음 logging 설정을 추가합니다.

 


2. Aggregate 코드에 @Slf4j 어노테이션 추가 및 log 정보를 기록합니다.

 

HolderAggregate.java

@RequiredArgsConstructor
@Aggregate
@Slf4j
public class HolderAggregate {
    @AggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;

    @CommandHandler
    public HolderAggregate(HolderCreationCommand command) {
        log.debug("handling {}", command);
        apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress()));
    }

    @EventSourcingHandler
    protected void createHolder(HolderCreationEvent event){
        log.debug("applying {}", event);
        this.holderID = event.getHolderID();
        this.holderName = event.getHolderName();
        this.tel = event.getTel();
        this.address = event.getAddress();
    }
}

 

AccountAggregate.java

@RequiredArgsConstructor
@Aggregate
@Slf4j
public class AccountAggregate {
    @AggregateIdentifier
    private String accountID;
    private String holderID;
    private Long balance;

    @CommandHandler
    public AccountAggregate(AccountCreationCommand command) {
        log.debug("handling {}", command);
        apply(new AccountCreationEvent(command.getHolderID(),command.getAccountID()));
    }
    @EventSourcingHandler
    protected void createAccount(AccountCreationEvent event){
        log.debug("applying {}", event);
        this.accountID = event.getAccountID();
        this.holderID = event.getHolderID();
        this.balance = 0L;
    }
    @CommandHandler
    protected void depositMoney(DepositMoneyCommand command){
        log.debug("handling {}", command);
        if(command.getAmount() <= 0) throw new IllegalStateException("amount >= 0");
        apply(new DepositMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @EventSourcingHandler
    protected void depositMoney(DepositMoneyEvent event){
        log.debug("applying {}", event);
        this.balance += event.getAmount();
        log.debug("balance {}", this.balance);
    }
    @CommandHandler
    protected void withdrawMoney(WithdrawMoneyCommand command){
        log.debug("handling {}", command);
        if(this.balance - command.getAmount() < 0) throw new IllegalStateException("잔고가 부족합니다.");
        else if(command.getAmount() <= 0 ) throw new IllegalStateException("amount >= 0");
        apply(new WithdrawMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @EventSourcingHandler
    protected void withdrawMoney(WithdrawMoneyEvent event){
        log.debug("applying {}", event);
        this.balance -= event.getAmount();
        log.debug("balance {}", this.balance);
    }
}

6. API 테스트 코드 작성

Command Application API 테스트를 수행하기 위한 코드를 작성합니다. API 테스트를 위해 Postman을 비롯하여 여러 툴이 있지만, IntelliJhttp 기능을 사용해서 테스트를 진행하도록 하겠습니다.

 

1. Command 모듈 적절한 위치에 http 확장자로 끝나는 파일을 생성합니다.

 


2. http 코드를 작성합니다.

 

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "kevin",
	"tel" : "02-1234-5678",
	"address" : "OO시 OO구"
}

###

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "계정 생성 후 반환되는 UUID"
}

###

POST http://localhost:8080/deposit
Content-Type: application/json

{
  "accountID" : "계좌 생성 후 반환되는 UUID",
  "holderID" : "계정 생성 후 반환되는 UUID",
  "amount" : 300
}

###

POST http://localhost:8080/withdrawal
Content-Type: application/json

{
  "accountID" : "계좌 생성 후 반환되는 UUID",
  "holderID" : "계정 생성 후 반환되는 UUID",
  "amount" : 10
}

###

7. API 테스트 

 

1. AxonServer가 기동된 상태에서 Command App을 수행합니다. 혹시 Axon Server 기동 방법이 궁금하신 분은 AxonServer 설치 및 실행 포스팅을 참고 바랍니다.

 


2. http 파일에서 계정 생성 url에 커서를 위치 시킨다음 [Alt + Enter] 키를 누릅니다.

 


3. Run Localhost:8080 버튼을 눌러 API를 실행합니다.

 

 

4. 정상 실행결과 및 반환된 계정 식별자 값을 확인합니다.

 

또한 위 그림과 같이 Application 로그에도 정상적으로 CommandHanlder 및 EventSourcingHandler 메소드가 수행된 것을 확인할 수 있습니다.


8. 성능 개선

 

데모 프로젝트에서는 Command 명령 생성과 이를 처리하는 Command Handler를 하나의 App에 모두 구현하였음에도 불구하고 위 Application에서는 Command 발행 시 Axon Server와의 통신을 수행합니다. 이는 AxonServer와 연결시 기본적으로 CommandBus로써 AxonServerCommandBus를 사용하기 때문입니다. 

 

이를 개선하기 위해서는 Command 처리시 AxonServer 연결없이 명령을 처리하도록 변경이 필요합니다. AxonFramework에서는 SimpleCommandBus 클래스를 제공하며, 설정을 통해 CommandBus 인터페이스 교체가 가능합니다.

 

설정 변경을 통해 Command Bus를 변경하도록 하겠습니다.

 

1. Command 모듈에 config 패키지 생성 후 AxonConfig 클래스를 생성합니다.

 


2. AxonConfig 클래스를 구현합니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
    @Bean
    SimpleCommandBus commandBus(TransactionManager transactionManager){
        return  SimpleCommandBus.builder().transactionManager(transactionManager).build();
    }
 }

 

AxonFramework는 기본적으로 AxonAutoConfiguration 클래스를 통해 Default 속성을 정의합니다. Custom 속성을 추가하기 위해 Axon 기본 설정이 완료된 후 수행될 수 있도록 @AutoConfigureAfter 어노테이션을 추가했습니다.


3. Application 수행 후 테스트하면 CommandBus가 SimpleCommandBus로 교체된 것을 확인할 수 있습니다.

 o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.HolderCreationCommand]

9. 마치며

이로써 Command Application의 전반적인 코드 구현을 완성하였습니다. 하지만 위 프로그램은 구조적인 문제점을 갖고 있습니다. 다음 포스팅에서는 해당 프로그램이 가진 문제점과 이를 해결하는 방법에 대해서 다루도록 하겠습니다.

 

 

 

 

1. 개요

 

지금부터 EventSourcing과 CQRS가 적용된 프로젝트를 구현하면서 AxonFramework 사용법을 배워봅니다.

이에 앞서 앞으로 진행할 프로젝트에 대한 설계를 통해 구조를 잡아보겠습니다.

 

 

프로그램 요구사항 

 

  • 계정 생성시 소유주, 전화번호, 주소를 입력한다.
  • 계정 생성이 완료되면, 계정 ID가 발급된다.
  • 계좌 생성시, 계정 ID를 입력한다.
  • 계좌 생성이 완료되면 고유한 계좌 번호를 반환한다.
  • 계좌번호를 통해서 입금 가능하다.
  • 계좌번호를 통해서 출금 가능하다.
  • 인출과정에서 잔액이 부족한경우 "잔액이 부족합니다." 메시지를 출력한다.
  • 소유자가 보유한 전체 계좌 개수 및 잔액 조회가 가능하다.

2. 프로그램 설계

 

앞으로 구현할 프로그램은 간단한 은행 입출금 관련 데모입니다. 짧은 요구사항이지만 도메인이 복잡해진다면 계좌 관리(Account)거래(Transacation)는 별도 Bounded Context로 분리할 수 있습니다. 하지만 데모 프로젝트에서는 편의상 두 도메인을 하나의 Bounded Context에 구현하겠습니다.

 


CQRS 

 

AxonFramework는 기본적으로 CQRS 아키텍처를 따르므로, 데모도 마찬가지로 해당 구조를 기반으로 설계했습니다. 즉 Command 와 Query App을 분리하고 각 App별로 DB를 사용하도록 구성했습니다.

 

편의상 두 App의 DB는 Postgresql로 통일하였으며, 각기 다른 스키마 사용을 통해 두 DB를 논리적으로 분리했습니다. Command DB는 AxonFramework에서 내부적으로 사용하는 saga, token, association_value 데이터를 관리합니다.

(※ Read 모델은 MongoDB로 구성하는 등의 Polyglot 구조 변경 가능합니다.)

 

또한 이미 앞선 장을 통해서 EventStore 및 메시지 라우팅을 담당하는 AxonServer를 사용하고 있으므로 이는 고려하지 않겠습니다.

 


다이어그램

 

데모에서 사용되는 구조는 다음과 같습니다. Command 모델은 소유주(holder)와 계좌(accont)로 나뉘어 Aggregation을 만들었습니다. 반면 Query 모델은 소유주의 전체 계좌의 총액을 보여주도록 Materialized View 를 ERD로 표현했습니다.

 

 

Command 

 

Aggregate

 

Query

 

Materialized View
summary


Command, Event 도출

 

이벤트 주도 개발 방법에서 가장 중요한 것은 이벤트 설계입니다. 이때 Event Storming 전략을 사용하여 Command, Event 등을 도출합니다.

(※ Event Storming 단계에서 포스트잇을 사용하는데, 비슷한 느낌을 주기 위해서 이미지를 사용했습니다.)

 

 

계정 생성

계좌 생성

입금

출금


서비스 EndPoint 설계

 

다음은 Controller 매핑되는 API EndPoint를 설계하겠습니다.

참고로 Command App은 8080 , Query App은 9090 Port를 사용했습니다.

 

계정 생성

POST : localhost:8080/holder
{  
    "holderName" : 소유주,
    "tel" : 전화번호,
    "address" : 주소
}

계좌 생성

POST : localhost:8080/account
{
    "holderID" : 계정 ID
}

 

입금

POST : localhost:8080/deposit
{
    "accountID" : 계좌번호,
    "holderID" : 계정 ID,
    "amount":입금액
}

 

출금

POST : localhost:8080/withdrawal
{
    "accountID" : 계좌번호,
    "holderID" : 계정 ID,
    "amount":출금액
}

 

계좌 정보 조회

GET : localhost:9090/account/info/{accountID}

3. 테스트 시나리오

 

마지막으로 실제 App 수행시 진행할 테스트 시나리오를 다음과 같이 작성했습니다.

 

  1. 고객 Kevin이 계정을 생성한다. (소유주 : Kevin, 전화번호 : 02-1234-5678, 주소 : 서울시)
  2. Kevin이 계좌를 개설한다.(소유주 ID : 1번 과정을 통해 생성된 UUID)
  3. Kevin계좌에 1000원을 입금한다(계좌 ID : 2번 과정을 통해 생성된 UUID, 입금액 : 1000)
  4. Kevin 계좌에서 100원을 인출한다(계좌 ID : 2번 과정을 통해 생성된 UUID, 출금액 : 100)
  5. Kevin 계좌에서 200원을 인출한다(계좌 ID : 2번 과정을 통해 생성된 UUID, 출금액 : 200)
  6. Kevin 전체 계좌 잔액 내역을 조회한다.
  7. Kevin 계좌에서 800원을 인출한다(계좌 ID : 2번 과정을 통해 생성된 UUID, 출금액 : 800)
  8. "잔액이 부족합니다." 메시지를 확인한다.

4. 마치며

 

구현할 프로젝트에 대한 기본적인 설계를 마칩니다. 다음 포스팅에서는 프로젝트 생성을 위해 Gradle을 이용한 Multi Project 생성 방법에 대해서 다루겠습니다.

+ Recent posts