1. 서론

 

이번 포스팅에서는 Scatter-Gather Query를 구현하겠습니다. Scatter-Gather Query는 동일한 Query를 수행하는 Query Handler가 여러 App에 존재할 경우 모든 App에 Query를 요청하여 결과를 취합받아 최초 Query를 요청한 Application에서 결과를 처리합니다.

 

데모 프로젝트에 아래와 같은 요구사항이 추가되었음을 가정하여 Scatter-Gather Query 기능을 구현하도록 하겠습니다.

 

 

 

Axon Server에 Jeju 은행과 Seoul 은행 외부시스템이 연결되어있다고 가정해봅시다. 이때 소유주(HolderID)가 보유한 잔고에 대하여 각 은행에게 대출한도를 Query하면 은행별로 전달받은 답변을 Client 화면에 표시하는 요구사항을 코드를 통해 알아보겠습니다.


2. Jeju 은행 모듈 구현

 

1. Jeju 은행과, Seoul 은행에 Query를 요청하려면, Query 클래스 정보를 공유해야하므로, 공통 모듈(Common)에 Query 클래스를 생성해야합니다. 먼저 Common 모듈에 query > loan 패키지를 생성합니다. 이후 Query 및 결과를 저장할 클래스를 생성합니다.

 


2. 생성한 두 클래스를 구현합니다.

 

LoanLimitQuery.java

@AllArgsConstructor
@ToString
@Getter
public class LoanLimitQuery {
    private String holderID;
    private Long balance;
}

 

LoanLimitResult.java

@AllArgsConstructor
@ToString
@Getter
@Builder
public class LoanLimitResult {
    private String holderID;
    private String bankName;
    private Long balance;
    private Long loanLimit;
}

3. 새로운 은행 모듈 생성을 위하여 프로젝트 root 디렉토리에 위치한 settings.gradle 파일을 연다음 모듈 추가합니다.

 

settings.gradle

rootProject.name = 'demo'
include 'command'
include 'query'
include 'common'
include 'seoulBank'
include 'jejuBank'

4. 추가된 두 묘듈에서 공통 모듈을 사용하기 위해 빌드 설정을 추가해야 합니다. 프로젝트 root 디렉토리에 위치한 build.gradle 파일을 연다음 빌드 스크립트 내용을 추가합니다.

 

build.gradle

(...중략...)
project(':jejuBank') {
    dependencies {
        compile project(':common')
    }
}

project(':seoulBank') {
    dependencies {
        compile project(':common')
    }
}

 

이후 gradle build를 수행하면, jejuBank, seoulBank 모듈이 생성됩니다.


5.  jejuBank 프로젝트 하위에 build.gradle 파일을 생성합니다.

 


6. build.gradle 파일에 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
}

7. resource 패키지 하위에 application.yml 파일을 생성합니다.

 


8. application.yml 파일에 설정 값을 기술합니다.

 

application.yml

server:
  port: 9091

spring:
  application:
    name: eventsourcing-cqrs-jejuBank

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

9. 패키지 구조 설정 한다음 Component 패키지와 Main 클래스를 생성합니다.

 


9. Main 클래스 내용을 구현합니다.

 

jejuBankApp.java

@SpringBootApplication
public class JejuBankApp {
    public static void main(String[] args) {
        SpringApplication.run(JejuBankApp.class, args);
    }
}

10. component 패키지내에 Query를 처리할 Component 클래스를 생성합니다.

 


11. Component 클래스 내용을 구현합니다. 

 

AccountLoanComponent.java

@Component
@Slf4j
public class AccountLoanComponent {

    @QueryHandler
    private LoanLimitResult on(LoanLimitQuery query) {
        log.debug("handling {}",query);
        return LoanLimitResult.builder()
                .holderID(query.getHolderID())
                .balance(query.getBalance())
                .bankName("JejuBank")
                .loanLimit(Double.valueOf(query.getBalance() * 1.2).longValue())
                .build();
    }
}

 

위 코드에서 jeju 은행의 대출한도는 일괄적으로 보유 잔고의 120%만 가능하도록 가정하였습니다.


3. Seoul 은행 모듈 구현

1. seoulBank 프로젝트 하위에 build.gradle 파일을 생성합니다.

 


2. build.gradle 파일에 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
}

3. resource 패키지 하위에 application.yml 파일을 생성합니다.

 


 

4. application.yml 파일에 설정 값을 기술합니다.

 

application.yml

server:
  port: 9092

spring:
  application:
    name: eventsourcing-cqrs-seoulBank

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

5. 패키지 구조 설정 한다음 Component 패키지와 Main 클래스를 생성합니다.

 


6. Main 클래스 내용을 구현합니다.

 

SeoulBankApp.java

@SpringBootApplication
public class SeoulBankApp {
    public static void main(String[] args) {
        SpringApplication.run(SeoulBankApp.class, args);
    }
}

7. component 패키지내에 Query를 처리할 Component 클래스를 생성합니다.

 


8.  Component 클래스 내용을 구현합니다.

 

AccountLoanComponent.java

@Component
@Slf4j
public class AccountLoanComponent {
    @QueryHandler
    private LoanLimitResult on(LoanLimitQuery query) {
        log.debug("handling {}",query);
        return LoanLimitResult.builder()
                .holderID(query.getHolderID())
                .balance(query.getBalance())
                .bankName("SeoulBank")
                .loanLimit(Double.valueOf(query.getBalance() * 1.5).longValue())
                .build();
    }
}

4. Query 인터페이스 구현

 

1. 화면 생성을 위해 Query 모듈 resources > templates 패키지내에 scatter-gather.html 파일을 생성합니다.

 


2. 화면 코드를 구현합니다.

 

scatter-gather.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Scatter-Gather Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (holderId === undefined || holderId === null || holderId ==="") {
                        alert("소유주를 입력하시오.");
                    } else {
                        let xhr = new XMLHttpRequest();
                        xhr.open('GET','http://localhost:9090/account/info/scatter/gather/'+holderId, true);
                        xhr.send();
                        xhr.onload = function(){
                            if(xhr.status === 200){
                                let elem = pElem.cloneNode();
                                elem.innerText = xhr.responseText;
                                appendDiv.appendChild(elem);
                            }
                        }
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

3. Scatter-Query를 요청할 서비스를 구현하기 위하여 먼저 메소드를 정의해야합니다. Query 모듈 service 패키지에 위치한 QueryService 클래스를 엽니다.

 


4. 인터페이스에 추상 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    (...중략...)
    List<LoanLimitResult> getAccountInfoScatterGather(String holderId);
}

5. QueryServiceImpl 클래스를 열어 추가된 추상 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
    (...중략...)
    private final AccountRepository repository;
    (...중략...)
    @Override
    public List<LoanLimitResult> getAccountInfoScatterGather(String holderId) {
        HolderAccountSummary accountSummary = repository.findByHolderId(holderId).orElseThrow();

        return queryGateway.scatterGather(new LoanLimitQuery(accountSummary.getHolderId(), accountSummary.getTotalBalance()),
                ResponseTypes.instanceOf(LoanLimitResult.class),
                30, TimeUnit.SECONDS)
                .collect(Collectors.toList());
    }
}

 

Scatter-Gather 쿼리는 단일 App에 요청하는 것이 아니므로, 만약 Handler 처리 App에 장애가 발생한다면 무한정 대기할 수 있습니다. 따라서 요청시, DeadLine을 정하여 요청시간 만큼만 대기할 수 있도록 지정이 필요합니다.


6. API End Point 지정 및 화면 호출을 위하여 Controller 클래스 수정이 필요합니다. Query 모듈내 Controller 패키지안에 있는 두개의 Controller 클래스에 관련 메소드를 추가합니다.

 

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    (...중략...)

    @GetMapping("account/info/scatter/gather/{id}")
    public ResponseEntity<List<LoanLimitResult>> getAccountInfoScatterGather(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                .body(queryService.getAccountInfoScatterGather(holderId));
    }

}

 

WebController.java

@Controller
public class WebController {
	(...중략...)
    @GetMapping("/scatter-gather")
    public void scatterGatherQueryView(){}
}

5. 테스트

 

1. jeju, seoul 은행 App과 Query App을 기동합니다.

 

2. 웹브라우저(Chrome)에서 http://localhost:9090/scatter-gather URL 입력합니다.

 

3. 임의의 소유주 ID를 입력후 조회 버튼을 눌러 결과를 확인합니다.

 


6. 마치며

 

이번 포스팅을 끝으로 EventSourcing에 필요한 기본적인 Command, Event 처리 및 Query 요청에 대한 필수 기능 구현을 완료했습니다. 각 기능별로 세부적인 기능은 Axon 공식 홈페이지에서 제공하는 DocumentGoogle Groups를 이용하여 검색하시면 많은 자료를 구할 수 있으니 참고 바랍니다.

1. 서론

 

 

이번 시간에는 Query 기능 중 Point to Point, Subscription 기능을 구현합니다. 또한, Query 결과를 보기 위하여 Client 화면을 간략하게 만들겠습니다.

 

Client 화면은 크게 Point to Point Query와 Subsciprtion Query를 조회하는 화면 2개를 분할하였으며, 화면 호출 URL은 다음과 같습니다.

 

Point to Point : http://localhost:9090/p2p

Subscription : http://localhost:9090/subscription

 

 

 

Subscription 에서는 조회를 누르면 Server와의 Connection이 설정되므로 이를 해제하기 위한 종료 버튼을 추가하였습니다. 조회 버튼을 누르게되면, Server API를 호출합니다. 두 API 주소는 다음과 같습니다.

 

 

Point to Point : http://localhost:9090/account/info/{id}

Subscription : http://localhost:9090/account/info/subcription/{id}

 

 

이제 본격적으로 기능 구현을 진행하겠습니다.


2. Point to Point Query

Query를 처리하는 Handler가 하나만 존재하고, 한번만 질의만 하면되는 상황이라면 Point to Point Query가 적합합니다.

해당 기능 구현을 통해 사용방법을 알아보겠습니다.

 

1. Query 모듈 build.gradle 파일을 엽니다.

 

 


2. 화면 구현을 위하여 thymeleaf 의존성을 추가합니다.

 

build.gradle

dependencies{
    (...중략...)
    implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
}

 


3. 화면 호출을 위하여 Query 모듈 Controller 패키지에 WebController 클래스를 추가합니다.

 

 


4. 클래스 내용을 구현합니다.

 

WebController.java

@Controller
public class WebController {
    @GetMapping("/p2p")
    public void pointToPointQueryView(){}
}

 

Controller에서 http://localhost:9090/p2p URL 호출 시 p2p.html 파일을 전달하도록 지정합니다.


5. 화면 구현을 위해서 Query 모듈 resources 패키지 하위에 templates 패키지 및 p2p.html 파일을 생성합니다.

 


6. html 내용을을 구현합니다.

 

p2p.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>PointToPoint Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (holderId === undefined || holderId === null || holderId ==="") {
                        alert("소유주를 입력하시오.");
                    } else {
                        let xhr = new XMLHttpRequest();
                        xhr.open('GET','http://localhost:9090/account/info/'+holderId, true);
                        xhr.send();
                        xhr.onload = function(){
                            if(xhr.status === 200){
                                let elem = pElem.cloneNode();
                                elem.innerText = xhr.responseText;
                                appendDiv.appendChild(elem);
                            }
                        }
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

 

위 코드 내용 중 가장 핵심이 되는 로직은 callback 객체입니다. 구현 내용은 비동기로 Query를 수행하는 API에 소유주 정보를 인자로 요청하면, 해당 내용을 수신받아 화면에 표시합니다.

 


7. 화면이 정상적으로 출력되는지 확인하기 위하여, Query App을 기동합니다. 이후 웹브라우저(Chrome)을 열고 화면 호출 테스트를 수행합니다.(http://localhost:9090/p2p)

 

 


8. 테스트가 완료되었으면, Query를 수행할 API 내용을 구현하겠습니다. 먼저 Query 모듈 service 패키지내 QueryService 인터페이스를 엽니다.

 


9. Query 수행을 위한 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    void reset();
    HolderAccountSummary getAccountInfo(String holderId);
}

10. Service 구현을 위하여 Query 모듈 service 패키지내 QueryServiceImpl 클래스를 엽니다.

 


11. Interface에 정의된 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
	(...중략...)
    @Override
    public HolderAccountSummary getAccountInfo(String holderId) {
        AccountQuery accountQuery = new AccountQuery(holderId);
        log.debug("handling {}", accountQuery);
        return queryGateway.query(accountQuery, ResponseTypes.instanceOf(HolderAccountSummary.class)).join();
    }

}

12. API End Point 설정을 위해 controller 패키지내 위치한 HolderAccountController 클래스를 엽니다.

 

 


13. API End Point를 추가합니다.

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    private final QueryService queryService;

	(...중략...)
    @GetMapping("/account/info/{id}")
    public ResponseEntity<HolderAccountSummary> getAccountInfo(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                             .body(queryService.getAccountInfo(holderId));
    }

}

 

 


14. QueryGateway로 전달된 Query를 처리하는 Handler 작성을 위해 Query 모듈 projection 패키지 하위 HolderAccountProjection 클래스를 엽니다.

 

 


15. HolderAccountProjection 클래스에서 QueryHandler 메소드를 구현합니다.

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
    (...중략...)

    @QueryHandler
    public HolderAccountSummary on(AccountQuery query){
        log.debug("handling {}", query);
        return repository.findByHolderId(query.getHolderId()).orElse(null);
    }
}

16. Query App을 기동합니다. 이후 EventStore에 저장된 HolderID 중 하나를 선택하여 입력창에 기입합니다. 조회 버튼을 눌러 정상적으로 조회되는지 확인합니다.

 

테스트 결과, Read Model에 저장된 데이터가 정상적으로 출력되는 것을 확인할 수 있습니다.


3. Subscription Query

 

 

Subscription Query는 Client로부터 Connection을 연결하면, 이를 해제하지 않고 유지합니다. Query를 처리하는 Hanlder App에서는 초기 결과를 최초에 반환합니다. 이때 Flux 타입으로 반환하며, QueryUpdateEmitter를 통해서 Read Model의 변경이 있을 때마다 수신 받습니다.

 

데모 프로젝트에서는 SSE(Server Sent Event) 방식으로 구현하기 위해 Client 화면에서는 EventSource 객체를 사용하겠습니다.

 


1. Query 모듈 build.gradle 파일을 엽니다.

 

 


2. Flux 사용을 위하여 reactor-core 의존성을 추가합니다.

 

build.gradle

dependencies{
    (...중략...)
    implementation group: 'io.projectreactor', name: 'reactor-core'
}

3. 화면 호출을 위하여 Query 모듈 Controller 패키지에 WebController 클래스를 추가합니다.

 


4. 클래스 내용을 구현합니다.

 

WebController.java

@Controller
public class WebController {
   (...중략...)
    @GetMapping("/subscription")
    public void subscriptionQueryView(){}
}

 

Controller에서 http://localhost:9090/subscription URL 호출 시 subscription.html 파일을 전달하도록 지정합니다.


5. 화면 구현을 위해서 Query 모듈 resources 패키지 하위에 templates 패키지 및 subscription.html 파일을 생성합니다.

 


6. html 내용을을 구현합니다.

 

subscription.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Subscription Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let eventSource = undefined;
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            function closeEventSource() {
                eventSource.close();
                eventSource = undefined;
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (eventSource !== undefined) {
                        closeEventSource();
                    }

                    if (holderId === undefined || holderId === null || holderId === "") {
                        alert("소유주를 입력하시오.");
                    } else {
                        eventSource = new EventSource('/account/info/subscription/' + holderId);
                        eventSource.onopen = function () {
                            console.log("connected");
                        };
                        eventSource.onmessage = function (event) {
                            let elem = pElem.cloneNode();
                            elem.innerText = event.data;
                            appendDiv.appendChild(elem);
                        };
                        eventSource.onerror = function () {
                            console.error("Connection error has occurred");
                            closeEventSource();
                        }
                    }
                }),
                "disconnect": (function () {
                    if (eventSource !== undefined) {
                        console.log("disconnected");
                        closeEventSource();
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
    <input type="button" data-cb="disconnect" value="종료"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

 

조회 버튼을 누르면, EventSource 객체를 생성하여 Server Sent Event를 수신받으며, 메시지가 전달되면 수신된 데이터를 화면에 출력하도록 구현하였습니다.


7. 화면이 정상적으로 출력되는지 확인하기 위하여, Query App을 기동합니다. 이후 웹브라우저(Chrome)을 열고 화면 호출 테스트를 수행합니다.(http://localhost:9090/usbscription)

 

 


8. 테스트가 완료되었으면, Query를 수행할 API 내용을 구현하겠습니다. 먼저 Query 모듈 service 패키지내 QueryService 인터페이스를 엽니다.

 


9. Query 수행을 위한 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    (...중략...)
    Flux<HolderAccountSummary> getAccountInfoSubscription(String holderId);
}

10. Service 구현을 위하여 Query 모듈 service 패키지내 QueryServiceImpl 클래스를 엽니다.

 


11. Interface에 정의된 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
    (...중략...)
    @Override
    public Flux<HolderAccountSummary> getAccountInfoSubscription(String holderId) {
        AccountQuery accountQuery = new AccountQuery(holderId);
        log.debug("handling {}", accountQuery);

        SubscriptionQueryResult<HolderAccountSummary, HolderAccountSummary> queryResult = queryGateway.subscriptionQuery(accountQuery,
                ResponseTypes.instanceOf(HolderAccountSummary.class),
                ResponseTypes.instanceOf(HolderAccountSummary.class)
        );

        return Flux.create(emitter -> {
            queryResult.initialResult().subscribe(emitter::next);
            queryResult.updates()
                    .doOnNext(holder -> {
                        log.debug("doOnNext : {}, isCanceled {}", holder, emitter.isCancelled());
                        if (emitter.isCancelled()) {
                            queryResult.close();
                        }
                    })
                    .doOnComplete(emitter::complete)
                    .subscribe(emitter::next);
        });
    }
}

 

위 코드 구현 내용은 최초에 initalResult 생성 후에, 지속적으로 updates 메소드를 통해 Stream 데이터를 전달받아 Client에게 전달합니다. 만약 중간에 Connection이 실패하게되면, 해당 Flux를 종료하도록 구현하였습니다.


12. API End Point 설정을 위해 controller 패키지내 위치한 HolderAccountController 클래스를 엽니다.

 

 


13. EndPoint를 추가합니다.

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    (...중략...)

    @GetMapping("account/info/subscription/{id}")
    public ResponseEntity<Flux<HolderAccountSummary>> getAccountInfoSubscription(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                             .body(queryService.getAccountInfoSubscription(holderId));
    }
}

14. Subscription에서는 Read Model에 변경이 발생되었을 때 이를 전파해야합니다. 따라서 이를 작성하기 위해  Query 모듈 projection 패키지 하위 HolderAccountProjection 클래스를 엽니다.

 

 


15. EventSourcingHandler를 통해 ReadModel의 변화가 발생하였을 때, QueryUpdateEmitter 클래스를 통해 이벤트 변경 내용을 전파하도록 클래스 내용을 수정합니다.

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
    private final AccountRepository repository;
    private final QueryUpdateEmitter queryUpdateEmitter;

    (...중략...)

    @EventHandler
    @AllowReplay
    protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());

        queryUpdateEmitter.emit(AccountQuery.class,
                query -> query.getHolderId().equals(event.getHolderID()),
                holderAccount);

        repository.save(holderAccount);
    }
    @EventHandler
    @AllowReplay
    protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());

        queryUpdateEmitter.emit(AccountQuery.class,
                query -> query.getHolderId().equals(event.getHolderID()),
                holderAccount);

        repository.save(holderAccount);
    }

	(...중략...)
}

 

각 Handler안에 queryUpdateEmitter를 통해 구독중인 Query와 동일한 ID의 Event가 들어오면, Query 결과에 전달되도록 처리하였습니다.


16. Query App을 기동합니다. 이후 EventStore에 저장된 HolderID 중 하나를 선택하여 입력창에 기입합니다. 조회 버튼을 눌러 정상적으로 조회되는지 확인합니다.

 

위 예제에서 holderId가 924eb0ab-c35f-4d3e-b753-a4ce35bd7c27인 계좌 전체의 잔고는 현재 290입니다. Update가 정상 수신되는지 확인하기 위하여, 해당 소유주가 보유한 계좌에서 5원을 인출하는 API를 호출합니다.

 

 


17. 5원 인출 후 Client 화면에서 변경된 데이터가 정상 수신되었는지 확인합니다.

 

확인 결과, 정상적으로 데이터 수신되었음을 확인할 수 있습니다.


18. 종료 버튼을 눌러 구독을 중지합니다. 이후 924eb0ab-c35f-4d3e-b753-a4ce35bd7c27 소유주가 보유한 계좌에서 추가로 5원 인출하였을 때, Query 결과가 화면에 표시되지 않음을 확인합니다. 화면의 변화가 없으면, 정상적으로 Connection이 종료된 것입니다.


4. 마치며

이번 시간에는 Point to Point, Subscription Query에 대해서 살펴보았습니다. Subscription Query는 반환 형태가 Flux 형태다보니 아무래도 Spring MVC에서는 사용하기 힘든 부분이 있을듯 합니다. 또한, 이를 지원하기 위해서는 Client에서도 SSE를 위한 구현이 필요하며, Server에서는 Client와 Connection 유지를 위해 Subscription Query가 증가할 수록 그에 상응하는 Thread 수가 증가합니다. 따라서 비즈니스 요건에 맞게 적절한 사용이 필요하며, Spring Webflux를 사용한다면 도입을 검토해볼 수 있을 것 같습니다.

 

1. 서론

이번 포스팅은 데모 프로젝트 진행에 있어 필수적으로 구현해야하는 코드는 없습니다. 따라서 Skip해도 괜찮습니다.  이번 시간에는 상태를 저장하는 State-Stored Aggregate에 대해서 알아보겠습니다.


2. Aggregate 종류

 

AxonFramework 에서 Aggregate의 종류는 크게 두 가지로 분류할 수 있습니다. 

 

  1. EventSourced Aggregate
  2. State-Stored Aggregate

 

EventSourced Aggregate는 기존 Command 어플리케이션을 제작하는 과정에서 구현한 모델 방식입니다. 즉 EventStore로부터 Event를 재생하면서 모델의 최신상태를 만듭니다.

 

출처 : https://altkomsoftware.pl/en/blog/cqrs-event-sourcing/

 

이와 반대로 State-Stored Aggregate는 위 그림과 같이 EventStore에 Event를 적재와 별개로 모델 자체에 최신 상태를 DB에 저장합니다. 데모 프로젝트 Aggregate 구조 변경을 통해 Command DB에 모델을 생성하는 방법에 대해서 알아봅시다.


3. State-Stored Aggregate

 

데모 프로젝트에는 Holder와 Account Entitiy가 존재합니다. Command 모델에서는 해당 Entitiy 관계를 분리하여 표현할 것이며 모델 구현은 JPA를 사용하겠습니다. 먼저 Command와 Query DB에 적재될 테이블 구조를 살펴보겠습니다.

 

 

ERD

 

Command 모델

 

Query 모델


3. Aggregate 구현

 

State-Stored Aggregate 구현을 위해 기존 코드를 단계적으로 변경하겠습니다.

 

1. HolderAggregate와 AccountAggregate 구조 변경을 통해 상태를 저장할 수 있도록 구현합니다.

 

HolderAggregate.java

@AllArgsConstructor
@NoArgsConstructor
@Aggregate
@Slf4j
@Entity(name = "holder")
@Table(name = "holder")
public class HolderAggregate {
    @AggregateIdentifier
    @Id
    @Column(name = "holder_id")
    @Getter
    private String holderID;
    @Column(name = "holder_name")
    private String holderName;
    private String tel;
    private String address;

    @OneToMany(mappedBy = "holder", orphanRemoval = true)
    private List<AccountAggregate> accounts = new ArrayList<>();

    public void registerAccount(AccountAggregate account){
        if(!this.accounts.contains(account))
            this.accounts.add(account);
    }
    public void unRegisterAccount(AccountAggregate account){
        this.accounts.remove(account);
    }

    @CommandHandler
    public HolderAggregate(HolderCreationCommand command) {
        log.debug("handling {}", command);

        this.holderID = command.getHolderID();
        this.holderName = command.getHolderName();
        this.tel = command.getTel();
        this.address = command.getAddress();

        apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress()));
    }
}

 

@Entity 어노테이션을 통해 대상 Aggregate가 JPA에서 관리되는 Entity임을 명시했습니다. 또한 Aggregate 식별자에 @Id 어노테이션을 추가하여 대상 속성이 PK임을 표시합니다.

 

HolderAggregate는 AccountAggregate와 1:N 관계를 맺고 있으므로 양방향 관계 설정 했으며, HolderAggregate가 삭제될 경우 AccountAggregate도 삭제되도록 orphanRemovel 옵션을 추가했습니다.

 

마지막으로 양방향 관계 설정 시 연관관계 편의 메소드 제공을 위해 registerAccount, unRegisterAccount 메소드를 추가했습니다. 

(참고 : 양방향 연관관계 편의 메소드)

 

혹시 위 Aggregate 코드에서 JPA 코드 추가 외에 혹시 이상한 점을 눈치채셨나요?

 

바로 EventSourcingHandler 메소드가 사라졌습니다. State-Stored Aggregate 모델은 모델 자체가 최신 상태를 유지하고 있으므로 EventStore로부터 Replay를 할 필요가 없습니다. 따라서 CommandHandler 메소드 내에서 Command 상태를 저장하는 로직을 포함시켜야 합니다.

 

이번에는 AccountAggreagte 클래스를 변경하겠습니다.

 

AccountAggregate.java

@NoArgsConstructor
@AllArgsConstructor
@Slf4j
@Aggregate
@EqualsAndHashCode
@Entity(name = "account")
@Table(name = "account")
public class AccountAggregate {
    @AggregateIdentifier
    @Id
    @Column(name = "account_id")
    private String accountID;

    @ManyToOne
    @JoinColumn(name = "holder_id", foreignKey = @ForeignKey(name = "FK_HOLDER"))
    private HolderAggregate holder;
    private Long balance;

    public void registerHolder(HolderAggregate holder){
        if(this.holder != null)
            this.holder.unRegisterAccount(this);
        this.holder = holder;
        this.holder.registerAccount(this);
    }

    @CommandHandler
    public AccountAggregate(AccountCreationCommand command) {
        log.debug("handling {}", command);
        this.accountID = command.getAccountID();
        HolderAggregate holder = command.getHolder();
        registerHolder(holder);
        this.balance = 0L;
        apply(new AccountCreationEvent(holder.getHolderID(),command.getAccountID()));
    }
    @CommandHandler
    protected void depositMoney(DepositMoneyCommand command){
        log.debug("handling {}", command);
        if(command.getAmount() <= 0) throw new IllegalStateException("amount >= 0");
        this.balance += command.getAmount();
        log.debug("balance {}", this.balance);
        apply(new DepositMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @CommandHandler
    protected void withdrawMoney(WithdrawMoneyCommand command){
        log.debug("handling {}", command);
        if(this.balance - command.getAmount() < 0) throw new IllegalStateException("잔고가 부족합니다.");
        else if(command.getAmount() <= 0 ) throw new IllegalStateException("amount >= 0");
        this.balance -= command.getAmount();
        log.debug("balance {}", this.balance);
        apply(new WithdrawMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
}

 

HolderAggregate에서 설명한 부분은 제외하고 추가된 점은 Account 모델이 Holder 모델에 대하여 FK를 지니고 있으므로 관계상에서 FK를 명시하였습니다. 


2. HolderAggregate 클래스 생성을 위한 Repository 패키지 및 클래스를 생성합니다.

 


3. HolderRepository 클래스를 구현합니다.

@Repository
public interface HolderRepository extends JpaRepository<HolderAggregate,String> {
    Optional<HolderAggregate> findHolderAggregateByHolderID(String id);
}

4. 객체 대상으로 연관관계를 변경하였기 때문에 AccountCreateCommand 클래스를 수정합니다.

 


5. Service 클래스를 수정합니다.

 

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;
    private final HolderRepository holders;

...(중략)...

    @Override
    public CompletableFuture<String> createAccount(AccountDTO accountDTO) {
        HolderAggregate holder = holders.findHolderAggregateByHolderID(accountDTO.getHolderID())
                                       .orElseThrow( () -> new IllegalAccessError("계정 ID가 올바르지 않습니다."));
        return commandGateway.send(new AccountCreationCommand(UUID.randomUUID().toString(),holder));
    }

...(중략)...
}

6. Snapshot 설정을 위해 생성한 Configuration 속성을 적용하지 않도록 AxonConfig 클래스 @Configuration 어노테이션을 주석처리 합니다.

 

//@Configuration
//@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
...(중략)...
}

 

이로써 State-Stored Aggregate 구현에 대한 코드 변경은 끝났습니다.


4. 테스트

 

1. Application 실행 후 계정 생성 API 테스트를 진행합니다.

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "kevin",
	"tel" : "02-1234-5678",
	"address" : "OO시 OO구"
}

 

2. DB에서 계정 데이터 생성 여부를 확인합니다.

 


3. 계좌 생성 API를 호출합니다.

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112"
}

 

4. DB에서 계좌 데이터 생성 여부를 확인합니다.

 


5. 입금 API를 호출합니다.

POST http://localhost:8080/deposit
Content-Type: application/json

{
  "accountID" : "9274bb43-ca87-4aa4-b1b4-0363382ad6fb",
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112",
  "amount" : 300
}

 

6. DB에서 해당 계정 잔고를 확인합니다.

 

 

정상적으로 입금된 것을 확인할 수 있습니다.


7. 출금 API를 4번 연속으로 호출합니다.

POST http://localhost:8080/withdrawal
Content-Type: application/json

{
  "accountID" : "9274bb43-ca87-4aa4-b1b4-0363382ad6fb",
  "holderID" : "486832c2-b606-470d-949a-9f9d8613b112",
  "amount" : 1
}

8. DB에서 출금 내역을 확인합니다.

 

 

정상적으로 출금된 것을 확인할 수 있습니다. 

o.a.commandhandling.SimpleCommandBus     : Handling command [com.cqrs.command.commands.WithdrawMoneyCommand]
c.c.command.aggregate.AccountAggregate   : handling WithdrawMoneyCommand(accountID=9274bb43-ca87-4aa4-b1b4-0363382ad6fb, holderID=486832c2-b606-470d-949a-9f9d8613b112, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 296
org.axonframework.messaging.Scope        : Clearing out ThreadLocal current Scope, as no Scopes are present

 

4번 연속 출금 후 Application의 로그를 일부 발췌하였습니다. 기존과 다른점은 EventSourced Aggregate의 경우에는 Replay를 위해 EventStore의 I/O 과정이 필요했지만 State-Stored Aggregate는 상태를 보관하므로 상태 복원 과정이 없습니다.


5. 마치며

State-Stored Aggreagte는 Command DB에 최신 상태를 보관합니다. 이로인해 매번 EventStore를 통해서 Replay를 하지 않아도 되는 점은 장점입니다.

 

하지만 만약에 테이블 데이터가 손실이 되어서 복구가 필요한 경우 Replay를 자동으로 수행되지 않으므로 별도로 EventSourcing 하는 작업을 구현해야 할 수 있습니다. 물론 DBMS 자체 복구 기능을 이용할 수도 있습니다. 하지만 Media Recovery가 불가피하다면 DB 서비스 중단이 발생합니다. 따라서 Aggregate별 사용 장단점을 인지한 다음 적절한 사용이 필요합니다.

 

이상으로 Command Application 구현 포스팅 마치도록 하겠습니다. 

1. 서론 

 

EventSourcing + CQRS 예제 프로젝트 개요 포스팅을 통해 구현할 프로젝트 소개 및 Command와 Event를 도출했습니다. 이번 시간에는 Command, Event, Aggregate 기본 구조를 구현해보도록 하겠습니다.

 

3. EventSourcing + CQRS 예제 프로젝트 개요

1. 개요 지금부터 EventSourcing과 CQRS가 적용된 프로젝트를 구현하면서 AxonFramework 사용법을 배워봅니다. 이에 앞서 앞으로 진행할 프로젝트에 대한 설계를 통해 구조를 잡아보겠습니다. 프로그램 요구사항..

cla9.tistory.com


2. Event 구현

 

 

Event 클래스는 Command와 Query 둘다 사용되므로 공통 모듈에서 구현하겠습니다.

 

1. Common 모듈에 존재하는 build.gradle 파일을 엽니다. 이후 롬복 사용 및 공통 모듈 컴파일 시 jar파일 생성을 위하여 다음과 같이 작성합니다.

 

bootJar { 
    enabled = false 
}
jar {
    enabled = true
}
dependencies{
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
}

2. Common 모듈 패키지 생성을 위해 src > main > java 디렉토리 선택 후 [Alt + Insert] 키를 누릅니다. 이후 package 탭을 선택하고 임의의 package 명을 입력한 다음 OK 버튼을 누릅니다.

 


3. 생성된 패키지 하위에 Event 클래스 4개를 생성합니다.

 


4. Event 클래스를 구현합니다.

 

HolderCreationEvent.java

@AllArgsConstructor
@ToString
@Getter
public class HolderCreationEvent {
    private String holderID;
    private String holderName;
    private String tel;
    private String address;
}

 

AccountCreationEvent.java

@AllArgsConstructor
@ToString
@Getter
public class AccountCreationEvent {
    private String holderID;
    private String accountID;
}

 

DepositMoneyEvent.java

@AllArgsConstructor
@ToString
@Getter
public class DepositMoneyEvent {
    private String holderID;
    private String accountID;
    private Long amount;
}

 

WithdrawMoneyEvent.java

@AllArgsConstructor
@ToString
@Getter
public class WithdrawMoneyEvent {
    private String holderID;
    private String accountID;
    private Long amount;
}

3. Command 구현

만약 Command을 요청하는 App이 실제 처리하는 App과 동일하지 않다면, Command 또한 공통 모듈에 작성하는 것이 바람직합니다. 하지만 데모 프로젝트에서는 Command App에서 모두 처리할 것이므로 Command 모듈내 구현하도록 하겠습니다.

 

1. Command 모듈내에 위치한 패키지 최하위에 commands 패키지를 추가합니다.

 


2. 생성된 패키지 하위에 Command 클래스 4개를 생성합니다.

 


3. Command 클래스를 구현합니다.

 

HolderCreationCommand.java

@AllArgsConstructor
@ToString
@Getter
public class HolderCreationCommand {
    @TargetAggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;
}

AccountCreationCommand.java

@AllArgsConstructor
@ToString
@Getter
public class AccountCreationCommand {
    @TargetAggregateIdentifier
    private String holderID;
    private String accountID;
}

DepositMoneyCommand.java

@AllArgsConstructor
@ToString
@Getter
public class DepositMoneyCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private String holderID;
    private Long amount;
}

WithdrawMoneyCommand.java

@AllArgsConstructor
@ToString
@Getter
public class WithdrawMoneyCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private String holderID;
    private Long amount;
}

 

Event와 달리 Command 클래스에는 @TargetAggregateIdentifier 어노테이션이 붙었습니다. 이는 AxonFramework 모델링의 단위가 Aggregate이고 각 Aggregate마다 고유한 식별자가 부여되어야 하기 때문입니다. 따라서 Command 클래스를 디자인 할때에도 어떤 Aggregate를 대상으로 명령을 수행할 것인지 알아야 하기 때문에 대상 Aggregate의 식별자 지정이 필요합니다.


4. Aggregate 구현

도메인 주도 개발 방법론(DDD)을 배우면 반드시 등장하는 개념이 Aggregate입니다. AxonFramework 에서도 DDD 기반으로 설계되었기에 Aggregate가 필요합니다. 데모 프로젝트에서는 HolderAccount 연관된 Aggregate를 구현하도록 하겠습니다.

 

1. Command 모듈내 aggregate 패키지를 생성후 Aggregate 클래스 2개를 생성합니다.

 


2. Aggregate 클래스를 구현합니다.

 

HolderAggregate.java

@RequiredArgsConstructor
@Aggregate
public class HolderAggregate {
    @AggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;
}

 

AccountAggregate.java

@RequiredArgsConstructor
@Aggregate
public class AccountAggregate {
    @AggregateIdentifier
    private String accountID;
    private String holderID;
    private Long balance;
}    

 

Aggregate 클래스에는 클래스 위에 Aggregate임을 알려주는 Annotation을 추가합니다. 또한 Aggregate 별로 식별자가 반드시 존재해야되기 때문에 유일성을 갖는 대표키 속성에 @AggregateIdentifier을 추가합니다.

 

AxonFramework 에서는 모든 명령(Command)과 이벤트(Event)가 Aggregate에서 발생합니다. 따라서 Aggregate에 대한 명령과 이벤트를 처리할 수 있는 Handler 메소드 작성이 필요합니다. 또한 기본적으로 Event Sourcing 패턴을 사용하기 때문에 명령이 발생한 Event를 적용하는 단계가 필요합니다.

 

 

Handler는 대개 Aggregation 클래스에서 정의하며, 외부 클래스에서 별도 정의도 가능합니다. 데모에서는 Aggregate에서 정의하는 방법을 사용할 것이며 외부 정의 방식은 Axon 공식 문서를 참조 바랍니다.

 

AxonFramework를 사용함에 있어 주로 사용하는 Handler Annotation은 다음과 같습니다.

 

  • @CommandHandler : Aggregate에 대한 명령이 발생되었을 때 호출되는 메소드임을 알려주는 마커 역할
  • @EventSourcingHandler : CommandHandler에서 발생한 이벤트를 적용하는 메소드임을 알려주는 마커 역할
  • @EventHandler : Query 모델 혹은 이벤트 발생시 해당 이벤트를 적용하는 메소드임을 알려주는 마커 역할

HolderAggregation 클래스를 대상으로 계정 생성 명령(HolderCreationCommand)과 이로인해 발생하는 계정 생성 이벤트(HolderCreationEvent) 처리하는 Handler 메소드를 작성하면 다음과 같습니다.

 

HolderAggregation.java

@RequiredArgsConstructor
@Aggregate
public class HolderAggregate {
    @AggregateIdentifier
    private String holderID;
    private String holderName;
    private String tel;
    private String address;

    @CommandHandler
    public HolderAggregate(HolderCreationCommand command) {
        apply(new HolderCreationEvent(command.getHolderID(), command.getHolderName(), command.getTel(), command.getAddress()));
    }

    @EventSourcingHandler
    protected void createAccount(HolderCreationEvent event){
        this.holderID = event.getHolderID();
        this.holderName = event.getHolderName();
        this.tel = event.getTel();
        this.address = event.getAddress();
    }
}

 

소스코드를 보면 @CommandHandler@EventSourcingHandler 어노테이션이 추가된 것을 확인할 수 있습니다.

 

먼저 CommandHandler 메소드부터 살펴보겠습니다. 위 코드에서 CommandHandler는 생성자에 추가되었습니다. 이는 계정 생성 명령은 곧 HolderAggregate의 생성을 의미하는 것이기 때문입니다. 해당 메소드 안에 applyAggregateLifeCycle 클래스의 static 메소드이며, 해당 메소드를 통해서 이벤트를 발행합니다.

 

EventSourcingHandler 메소드는 CommandHandler에서 기존에 발행된 이벤트 및 현재 발생한 Command 이벤트를 적용하는 역할을 수행합니다.

 

 

위 그림은 HolderCreationCommand 명령이 발생되었을 때, 수행되는 내부 흐름을 간략하게 표현했습니다.

 

  1. 사용자로부터 Command 명령을 CommandGateway로 전달하면, 메시지 변환 과정(GenericCommandMessage)을 거쳐 CommandBus로 전달합니다.
  2. CommandBus는 Command 명령을 Axon Server로 전송합니다.
  3. AxonServer에서 명령을 Command Bus를 통해 해당 Command를 처리할 App에게 전달합니다.
  4. UnitOfWork(4~7 단계)를 수행합니다. 이 과정에서 Chain으로 연결된 handler 들을 거치면서 대상 Aggregate에 대하여 EventStore로부터 과거 이벤트들을 Loading 하여 최신 상태로 만듭니다. 이후 해당 Command와 연결된 Handler 메소드를 Reflection을 활용하여 호출합니다.
  5. CommandHandler 메소드를 호출하는 과정에서 apply 메소드 호출을 통해 이벤트(HolderCreationEvent)를 발행합니다.
  6. 발행된 Event는 내부 로직을 거치면서 Event 처리를 수행할 Handler를 매핑한 후 EventSourcingHandler 메소드를 Reflection을 활용하여 호출합니다.
  7. EventSourcingHandler 호출이 완료되면, EventBus를 통해 Event 발행을 요청합니다.(publish)
  8. EventBus는 이벤트를 Axon Server에 전달합니다.
  9. EventStore인 Axon Server에서는 전달받은 Event를 저장소에 기록합니다.
  10. 메시지 라우팅 기능을 담당하는 Axon Server는 연결된 App을 대상으로 Event를 전파합니다.

(※ AxonServer와 App간의 연결은 grpc를 사용합니다.)

위와 같이 간단하게 Handler 메소드 두개 작성했을 뿐인데, 내부 로직은 복잡합니다.

 

이번에는 AccountAggregate 클래스를 구현해보도록 하겠습니다.

 

AccountAggregate.java

@RequiredArgsConstructor
@Aggregate
public class AccountAggregate {
    @AggregateIdentifier
    private String accountID;
    private String holderID;
    private Long balance;

    @CommandHandler
    public AccountAggregate(AccountCreationCommand command) {
        apply(new AccountCreationEvent(command.getHolderID(),command.getAccountID()));
    }
    @EventSourcingHandler
    protected void createAccount(AccountCreationEvent event){
        this.accountID = event.getAccountID();
        this.holderID = event.getHolderID();
        this.balance = 0L;
    }
    @CommandHandler
    protected void depositMoney(DepositMoneyCommand command){
        if(command.getAmount() <= 0) throw new IllegalStateException("amount >= 0");
        apply(new DepositMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @EventSourcingHandler
    protected void depositMoney(DepositMoneyEvent event){
        this.balance += event.getAmount();
    }
    @CommandHandler
    protected void withdrawMoney(WithdrawMoneyCommand command){
        if(this.balance - command.getAmount() < 0) throw new IllegalStateException("잔고가 부족합니다.");
        else if(command.getAmount() <= 0 ) throw new IllegalStateException("amount >= 0");
        apply(new WithdrawMoneyEvent(command.getHolderID(), command.getAccountID(), command.getAmount()));
    }
    @EventSourcingHandler
    protected void withdrawMoney(WithdrawMoneyEvent event){
        this.balance -= event.getAmount();
    }
}

 

코드를 보면 모든 예외 처리 및 유효성 검증CommandHandler에서 하고 있습니다. 이는 EventStore에 적재된 모든 Event는 재생해야할 대상이기 때문에 EventSourcingHandler에서는 Replay만 수행합니다. 따라서 CommandHandler에서 사전 Exception 처리 및 유효성 검증을 통해서 검증된 Event만을 발행해야합니다.


5. Axon Server 라우팅 기능(Command)

지금까지 AxonFramework를 사용하는 Client 입장에서 동작 원리를 살펴봤습니다. 이번에는 Server 입장에서 메시지 라우팅이 어떻게 이루어지는지 살펴보도록 하겠습니다.

 

 

상황 1. Command를 처리하는 Handler가 하나만 Axon Server에 등록된 경우

 

 

Application 기동시 AxonServer와 연결을 시도합니다. 연결이 완료되면, 해당 App은 자신이 처리가능한 Command Handler 정보를 Server에 등록합니다. 

 

이때 다른 App에서 Command 명령을 요청하게되면 AxonServer에서는 해당 Command를 수행할 수 있는 App을 알기 때문에 해당 App으로 Command 명령을 전달합니다.


상황2. 동일한 Command를 처리하는 Handler가 복수개 등록된 경우

 

 

동일한 Command A를 처리할 수 있는 Handler 메소드를 포함하는 App이 복수개로 등록되었을 경우 내부 흐름은 다음과 같습니다.

 

Axon Server에서는 Command가 도착할 경우 어떤 App에서 수행해야할지를 결정 해야합니다. 따라서 이를 위해 라우팅 테이블에 두 App의 정보를 등록합니다.

 

라우팅 테이블에는 어떤 노드들이 Server와 연결되어있고, 해당 노드들이 어떤 Command를 처리할 수 있는지에 대한 정보가 담겨있습니다. 내부 아키텍처는 Consistenet Hashing 기법을 사용하고 있으며, 관련 설명은 charsyam 님 블로그를 참고바랍니다.

 

 

이러한 상황에서 새로운 App에서 A Command를 요청했다면, 해당 요청 속에 포함된 라우팅 키를 찾아 라우팅 테이블에서 적합한 App을 선정하여 호출하게 됩니다. 그렇기에 Client Side 모델 데이터가 Sharding 되어있거나 고가용성을 위해 Cluster로 App을 구축했더라도 Command 명령은 정확히 하나의 App Command Handler에만 전달됩니다.

 

라우팅 키는 @TargetAggregateIdentifier 혹은 @RoutingKey 어노테이션을 Command에 포함시에 자동으로 생성됩니다.


6. 마치며

이번 포스팅에서는 Event, Command, Aggregate 모델을 구현했습니다. 다음 시간에는 API 구현 및 테스트를 통해서 실제 동작하는 과정에대해 알아보도록 하겠습니다.


Tip)

 

1. AxonServer에 저장된 Event 내역은 DashBoard에서 Search 항목을 누르면 조회할 수 있습니다. 

 


2. Dashboard Commands 탭에서는 등록된 Command Handler 정보 및 현재 수행중인 Command 발생 빈도를 확인할 수 있습니다.

 


3. Command 명령이 발생하면 내부적으로는 몇차례 Command 메시지 변환 과정을 거쳐 부가적인 정보가 추가됩니다. 

 

1단계

Command :
"CreateHolderCommand(
  holderID='1f2cf247-afe7-46d6-bc6d-d588643d6643', 
  holderName= 'kevin', 
  tel='1234-5678', 
  address='서울시')"
callback: 
FailureLoggingCallback@12115

2단계 (GenericMessage 변환후 메시지)

commandMessage:
"GenericCommandMessage{
	payload={
	CreateHolderCommand(
  		holderID='1f2cf247-afe7-46d6-bc6d-d588643d6643', 
  		holderName= 'kevin', 
		tel='1234-5678', 
		address='서울시'),
	metadata={},
	messageIdentifier = '040ffa0c-5d2d-4588-a04c-8051867d4057',
	commandName ='com.cqrs.command.commands.CreateHolderCommand}'"
commandCallback: 
FailureLoggingCallback@12115

 

기본 Command 정보 외 message 식별자 및 Command 패키지 정보 등이 포함되어 있는 것을 확인할 수 있습니다. 참고로 해당 메시지는 CommandGateway의 Send API를 사용했을 때의 메시지 내용입니다.

1. 서론

이번 시간에는 지난 포스팅에 이어서 AxonFramework 구성AxonServer 연동에 대해 다루도록 하겠습니다. 진행하기 앞서 몇가지 사전 작업이 필요합니다.

 


2. AxonFramework 설정

 

1. Command 모듈 build.gradle 파일을 엽니다. 이후 아래와 같이 의존성을 추가합니다.

 

 

추가된 의존성 중 AxonFramework와 직접 연관된 항목은 다음과 같습니다.

ext{
    axonVersion = "4.2.1"
}

dependencies{
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
    implementation group: 'org.axonframework', name: 'axon-configuration', version: "$axonVersion"
}

2. lombok 사용을 위해 Annotation Processing을 활성화 합니다.

(File > Settings > Build, Execution, Deployment > Compiler > Annotation Processors)

 


3. 환경설정을 위해 resources 폴더 밑에 application.yml 파일을 생성합니다. 이후 다음과 같이 작성합니다.

 

 

server:
  port: 8080

spring:
  application:
    name: eventsourcing-cqrs-command
  datasource:
    platform: postgres
    url: jdbc:postgresql://localhost:5432/command
    username: command
    password: command
    driverClassName: org.postgresql.Driver
  jpa:
    hibernate:
      ddl-auto: update

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124


4. Axon Server 연동 테스트를 위해 CommandApplication 클래스에 CommandHandler 메소드를 추가합니다.

(※ 연동 테스트 이후 해당 메소드는 삭제합니다.)

 


5. Command 모듈을 실행시킵니다. App이 정상적으로 실행되고, AxonServerCommandBus가 할당된 것을 확인합니다.

 


6. Axon Server 대시보드(http://localhost:8024) Overview 화면에서 Command App과 연결된 토폴로지를 확인합니다.

 

 


7. Query 모듈 설정을 위해 build.gradle 파일을 엽니다. 이후 Command 모듈 build.gradle과 동일하게 의존성을 추가합니다.

(※ 만약 Read Model을 MongoDB로 사용할 경우 Mongo DB 관련 의존성을 추가합니다.)

 


8. Query 모듈 resources 폴더 밑에 application.yml 파일을 생성합니다. 이후 다음과 같이 작성합니다.

(※ Command 모듈과 조금씩 다른 부분에 주의합니다.)

 

 

server:
  port: 9090

spring:
  application:
    name: eventsourcing-cqrs-query
  datasource:
    platform: postgres
    url: jdbc:postgresql://localhost:5432/query
    username: query
    password: query
    driverClassName: org.postgresql.Driver
  jpa:
    hibernate:
      ddl-auto: update

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

9. Axon Server 연동 테스트를 위해 QueryApplication 클래스에 EventHandler 메소드를 추가합니다.

(※ Command와 마찬가지로 연동 테스트 이후 해당 메소드는 삭제합니다.)

 


10. Query 모듈을 실행합니다. App이 정상적으로 실행되고, AxonServer 연결과 이벤트 추적을 위한 TrackingEventProcessor가 할당된 것을 확인합니다.

 


11. Axon Server 대시보드(http://localhost:8024) Overview 화면에서 기존에 연결된 Command App과 새로 연결된 Query App 토폴로지를 확인합니다.

 

 


12. 테스트를 위해 Command, Query 모듈에 작성한 핸들러 메소드를 삭제합니다.


3. 마치며

이번 포스팅을 통해서 AxonFramework의 기본 설정 및 Axon Server와의 연동을 살펴보았습니다. 하지만 중간에 등장한 CommandHandler, EventHandler, CommandBus, TrackingEventProcessor 등의 용어는 아직 생소합니다. 다음 포스팅부터는 본격적으로 Command, Query 프로그램 작성하면서 위 개념등을 살펴보겠습니다.

1. 서론

 

CQRS 구성을 위해 일반적으로 Command, Query 두가지 Application을 구성합니다. 이때 Application 사이 매개체 역할을 Event가 담당합니다. 따라서 Event 정보를 알기 위해서는 Event 정보가 두 App에 모두 포함되어야 합니다.

 

 

일반적인 프로젝트 구조라면 동일한 소스 코드가 두 App에 모두 존재해야되므로 Event의 변경사항이 있을 때 양쪽 Application의 구조를 바꿔야합니다. 더군다나 Axon에서는 Event 클래스의 패키지 구조가 동일해야되는 제약사항이 존재합니다. 따라서 이러한 문제를 해결하기 위한 다양한 방법중 Gradle을 활용해서 MultiProject 구성을 하고자 합니다. 

 

 

즉 Event 클래스만 모은 공통 모듈을 작성하고, Command, Query에서 이를 참조하도록 구성합니다. 이렇게 별도 모듈로 분리함으로서 변경 사항 발생시 공통 모듈에만 변경을 가하면 양쪽 App에 적용되므로 소스 관리가 용이해집니다.

 

자세한 내용은 Gradle 공식 문서를 참조하시기 바라며, 이번 포스팅에서는 기본적인 Gradle Multi Project 구성 방법에 대해서 알아보겠습니다.

 

참고 블로그

와이케이 마구잡이님 블로그

jojoldu님 블로그


2. Gradle 프로젝트 생성

 

1. IntellJ에서 Create New Project 를 클릭합니다.

 


2. Spring 프로젝트를 만들기 위해서 Spring Initializr를 선택합니다. 이후 Java SDK 버전 선택한 다음 Next 버튼을 눌러 다음 단계로 진행합니다.

 


3. Gradle 프로젝트 생성을 위해 Type을 Gradle Project로 설정합니다. 이후 Group과 Artifact를 본인 프로젝트 구성에 맞게 기입하니다. 마지막으로 Java version을 PC에 설치된 Java 버전과 동일하게 설정 후 Next 버튼을 선택합니다.

 


4. 의존성은 나중에 별도 추가할 예정이므로 Next 버튼을 눌러 다음 단계로 이동합니다.

 


5. Project 이름 설정 후, Finish 버튼을 선택합니다.

 


6. Gradle 설정 화면에서 특별하게 변경해야할 사항이 없다면 기본 설정 상태에서 OK 버튼을 선택합니다.

 


7. 의존성이 정상적으로 추가되면 아래 이미지 하단과같이 sync가 정상적으로 이루어짐을 확인할 수 있습니다. 지금 생성한 프로젝트는 root 프로젝트이므로 src 폴더 전체를 선택 후 삭제합니다.

 

 


3. Multi Module 구성하기

 

1. 프로젝트내 모듈은 3가지(Command, Query, Common)입니다. 따라서 이를 구성하기 위해서 root 프로젝트 내 settings.gradle 파일을 연 후에 아래 이미지와 같이 sub module명을 기입합니다.

 


2. 이제부터 프로젝트 구성을 위해서 구조 변경이 필요합니다. 먼저 root 프로젝트에 있는 build.gradle 파일을 엽니다. 이후 AS-IS로 되어있는 구조를 TO-BE 형태로 변경합니다.

 

AS-IS

plugins {
    id 'org.springframework.boot' version '2.2.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
}

group = 'com.cqrs'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

repositories {
    mavenCentral()
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

test {
    useJUnitPlatform()
}

 

TO-BE

buildscript {
    ext {
        springBootVersion = '2.2.2.RELEASE'
    }
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
    }
}

allprojects {
    group = 'com.cqrs'
    version = '0.0.1-SNAPSHOT'
}

subprojects {
    apply plugin: 'org.springframework.boot'
    apply plugin: 'io.spring.dependency-management'
    apply plugin: 'java'

    sourceCompatibility = '11'

    repositories {
        mavenCentral()
    }

    dependencies {
        testImplementation('org.springframework.boot:spring-boot-starter-test') {
            exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
        }
    }

    task initSourceFolders {
        sourceSets*.java.srcDirs*.each {
            if (!it.exists()) {
                it.mkdirs()
            }
        }
        sourceSets*.resources.srcDirs*.each {
            if (!it.exists()) {
                it.mkdirs()
            }
        }
    }
}

project(':command') {
    dependencies {
        compile project(':common')
    }
}

project(':query') {
    dependencies {
        compile project(':common')
    }
}

 

변경한 build.script 내용을 설명하면 다음과 같습니다.

 

buildscript 블록 : 나머지 스크립트를 빌드하는 과정에서 필요한 외부 라이브러리를 classpath에 추가하는 기능을 담당합니다. subprojects 내에서 플러그인 적용(apply plugin)이 가능한 이유 또한 buildscript를 통해 라이브러리를 classpath에 추가시켰기 때문입니다.

 

allprojects 블록 :  root 프로젝트(demo)와 하위 프로젝트(command, query, common)에 모두 적용되는 빌드 스크립트 기준을 작성합니다.

 

subprojects 블록 : 하위 프로젝트(command, query, common)에만 적용되는 빌드 스크립트 기준을 작성합니다.

  • sourceCompatibility : java 버전을 명시합니다.
  • repositories : 저장소 설정을 담당합니다. 
  • initSourceFolders task : sub module별로 기초 디렉터리가 존재하지 않으면, 자동 생성해주도록 설정합니다.

 

projects 블록 : Command, Query App은 빌드시에 공통 모듈(Common)이 포함되어야 함으로 빌드시에 추가하도록 설정합니다.

(※ : 가 들어간 이유는 Root 프로젝트 기준으로 각 모듈은 한단계 아래 계층에 존재하기 때문에 이를 구별하기 위함입니다.)


3. intellij 우측 gradle 탭을 엽니다. 이후 root 프로젝트 > Tasks > build > build를 더블클릭하여 build를 시도합니다. build 수행하면 root 프로젝트에 src 폴더를 지웠기 때문에 build 실패가 발생하지만, 좌측탭에 command, common, query 폴더가 생긴 것을 확인할 수 있습니다.

 


4. sub module 폴더 내에 build.gradle 파일을 생성합니다.


5. Command, Query App에서는 Spring Web MVC를 사용하기 때문에 build.gradle에 의존성을 추가합니다.

 


6. Command 모듈에서 패키지 생성을 위해 src > main > java 디렉토리 선택 후 [Alt + Insert] 키를 누릅니다. 이후 package 탭을 선택합니다.

 


7. 임의의 package 명을 입력한 후 OK 버튼을 누릅니다.

 


8. 생성된 package 하위에 App 실행을 위한 main 클래스를 작성합니다.

 


9. App을 구동하여 정상 작동하는지 확인합니다.

 


10. 5~9번 작업을 query 모듈에도 반복합니다.

 

 

위와 같이 정상적으로 수행된다면 Multi Project 기초 구성은 끝났습니다.


4. 마치며

이번 포스팅에서는 각 모듈별 기초적인 의존성 추가 및 모듈 구성을 했습니다. 다음 포스팅에서는 데모 프로젝트 진행을 위해서 각 모듈별 필요한 의존성 추가 및 코드 구현을 본격적으로 하겠습니다.

 

1. 서론

 

Axon Server는 이벤트 저장소인 EventStore, 어플리케이션 간의 Message 전달하는 역할을 수행합니다.

하지만 AxonFramework를 도입하는데 있어 필수 사항은 아닙니다. AxonIQ에서는 EventStore와 Message Broker를 다른 제품군으로 대체할 수 있도록 지원합니다.

 

따라서 비즈니스 환경에 맞게 취사선택이 가능합니다.

 

AxonIQ에서 제공하는 외부 모듈은 다음과 같으며, 예제 혹은 소스 파일은 깃헙에서 확인하실 수 있습니다.

 

  • Kafka
  • JGroups
  • Spring Cloud
  • Kotlin
  • Mongo
  • AMQP
  • Tracing

저는 AxonFramework + AxonServer를 사용하여 포스팅을 진행하겠습니다.


2. AxonServer 설치

 

1. AxonIQ 홈페이지에 접속후에 Download 버튼을 클릭합니다.

 

 

2. 메일 주소 입력 후에 Download 버튼을 클릭합니다.

 

 

 

 

3. AxonQuickStart.zip 파일을 원하는 위치로 다운로드 후 압축을 풀어줍니다.

 

 

 

4. 압축푼 경로 기준으로 axonquickstart-4.2.2\AxonServer 위치로 이동합니다.

 

 

 

5. 아래 표시된 파일이 우리가 구동해야할 AxonServer 입니다. 생각보다 너무 간단하죠?

 

 

 

6. jar 파일 실행을 위해 도스창을 열도록 하겠습니다. [WINDOW + R] 키를 동시에 누른 후 cmd를 입력합니다. 그리고 확인 버튼을 눌러줍니다.

 

 

 

7. AxonServer 파일 위치로 이동하기 위해서 탐색기 상단의 주소를 복사합니다.

 

 

 

8. 도스창 cd 명령어를 이용하여 Axonserver 위치로 이동합니다.

 

 

9. jar 명령어를 사용하여 Axonserver를 구동합니다.

 

 

 

10. 아래와 같은 화면이 나온다면 정상적으로 실행된 것입니다.

 

참고사항(기본 설정 시, Default 포트 매핑)

 - 메시지 라우팅 : 8124

 - Dashboard : 8024

 

 

11. 정상 수행 확인을 위해 브라우저를 열고, 대시보드 페이지로 접속합니다.

 


3. 마치며

 

AxonFramework를 사용하기 위한 기초 단계 작업을 마쳤습니다.

다음 포스팅부터는 에제 프로젝트 실습을 통해 하나하나씩 개념을 익혀보도록 하겠습니다.

 


Tip)

AxonServer 위치에 axonserver.properties 파일 생성하게 되면 default로 제공되는 속성을 변경할 수 있습니다.

변경 가능한 속성은 Axon 공식 문서를 참고하시기 바랍니다.

 

 

참고로 저는 Event 테스트 후 데이터 삭제를 위해 axoniq.axonserver.devmode.enabled=true 설정하여 사용하고 있습니다.

 

 

개발 모드 적용 후 AxonServer를 기동하게되면, 위 화면과 같이 Development Mode가 활성화되며 Reset Event Store 버튼이 생긴 것을 확인할 수 있습니다.

1. 개요

 

앞으로 진행될 포스팅은 MSA에 관심 많은 분을 대상으로 DDD, CQRS 및 Event Sourcing 내용을 알고 있다고 가정하고, Spring 환경에서 AxonFramework를 활용해 개념 구현하는 방법에 대해 소개하고자 합니다. 

 

만약 CQRS와 Event Sourcing에 대해서 궁금하다면 아래 블로그 및 Spring Camp 2017 영상을 참고하시면 좋을것 같습니다.

 

CQRS 소개

CQRS 

 

이벤트 소싱 소개

- 이벤트 소싱(이론부) 영상

- 이벤트 소싱(구현부) 영상

- 이벤트 소싱 소개 블로그

 

 

EventSourcing, CQRS 관련하여 Java 진영에서 사용되는 프레임워크를 검색한 결과, 크게 AxonFramework랑 Eventuate 두 가지를 주로 사용되고 있었습니다.

 

저는 그 중 대중적이고 Spring Boot에 친화적인(?) AxonFramework를 선정하여 공부한 흔적을 남겨보고자 합니다. 포스팅 중간 AxonIQ 기술 블로그 및 웨비나 자료 첨부는 원저작자에게 사용허가를 받았음을 알립니다.


2. Axon Framework 소개

 

출처 : https://youtu.be/GEN0jRkyEtU

 

Axon Framework는 2010년 최초 프로젝트가 시작되었으며, 2015년 이전까지는 관심도가 미비하였지만 이후 MSA가 열풍을 불게되면서 다운로드 수가 폭발적으로 증가하였습니다. 지금은 대략 월간 10만건 정도의 다운로드 수를 기록하고 있으며, 앞으로 점점 더 증가할 것으로 기대하고 있습니다.

 

해당 제품은 오픈소스로써 네덜란드에서 설립된 AxonIQ 회사에서 개발을 주도하고 있습니다. 주력 제품으로는 DDD, EventSourcing, CQRS를 구현할 수 있는 AxonFramework와 EventStore, 마이크로 서비스간 메시지 Routing을 담당하는 Axon Server 입니다.

(※ Axon Server는 Enterprise 버전이 별도로 있으며, 구매시 Cluster 구성 및 Security 설정 등이 가능합니다.)

 

출처 : https://axoniq.io/

 

프레임워크의 아키텍처는 대략 아래와 같습니다.

 

출처 : https://docs.axoniq.io/reference-guide/architecture-overview

 

 

기본적으로 CQRS 구조를 따르고 있습니다. 즉 외부로부터 명령(Command)이 들어오면, 해당 명령에 대한 이력을 EventStore에 저장하고 동시에 Event를 발생시킵니다.

 

발생된 이벤트는 EventHandler를 통하여 Read Model에 반영되고, 사용자 입장에서는 Read Model에 대한 Query를 통하여 데이터를 읽는 구조입니다.

(※ 사용자 편의에 따라 Command와 Query Model은 동일 DBMS에 설정할 수도 분리할 수도 있으며, 종류 또한 다르게 구성할 수 있습니다.)

 

 

 

앞으로 IntelliJ IDE를 사용해서 Spring Boot 기반 AxonFramework을 활용하여 EventSourcing, CQRS, Saga 등을 구현하는 방법에 대해서 포스팅을 진행하려고 합니다.

 

AxonFramework가 DDD(Domain Driven Development), EDD(Event Driven Development)에 기반하고 있으나 샘플코드를 간략하게 작성하기 위해서 DDD스러움은 배제하고(개발 실력, 도메인 지식 부족으로 인한 getter 남용 등....) Axon 제공 기능에 집중하여 소개하도록 하겠습니다.


향후 다룰 주제

  • AxonServer 기본 사용법
  • Gradle을 활용한 Multi Project 설정
  • AxonFramework 프로젝트 기본 설정
  • AxonFramework 데모 프로젝트 구현(계좌 입출금)
  • 기본 아키텍처 소개(Command, Query, Event)
  • Replay
  • Event Tracking Architecture
  • EventStore
  • Version
  • Saga

+ Recent posts