1. 서론

 

 

이번 시간에는 Query 기능 중 Point to Point, Subscription 기능을 구현합니다. 또한, Query 결과를 보기 위하여 Client 화면을 간략하게 만들겠습니다.

 

Client 화면은 크게 Point to Point Query와 Subsciprtion Query를 조회하는 화면 2개를 분할하였으며, 화면 호출 URL은 다음과 같습니다.

 

Point to Point : http://localhost:9090/p2p

Subscription : http://localhost:9090/subscription

 

 

 

Subscription 에서는 조회를 누르면 Server와의 Connection이 설정되므로 이를 해제하기 위한 종료 버튼을 추가하였습니다. 조회 버튼을 누르게되면, Server API를 호출합니다. 두 API 주소는 다음과 같습니다.

 

 

Point to Point : http://localhost:9090/account/info/{id}

Subscription : http://localhost:9090/account/info/subcription/{id}

 

 

이제 본격적으로 기능 구현을 진행하겠습니다.


2. Point to Point Query

Query를 처리하는 Handler가 하나만 존재하고, 한번만 질의만 하면되는 상황이라면 Point to Point Query가 적합합니다.

해당 기능 구현을 통해 사용방법을 알아보겠습니다.

 

1. Query 모듈 build.gradle 파일을 엽니다.

 

 


2. 화면 구현을 위하여 thymeleaf 의존성을 추가합니다.

 

build.gradle

dependencies{
    (...중략...)
    implementation 'org.springframework.boot:spring-boot-starter-thymeleaf'
}

 


3. 화면 호출을 위하여 Query 모듈 Controller 패키지에 WebController 클래스를 추가합니다.

 

 


4. 클래스 내용을 구현합니다.

 

WebController.java

@Controller
public class WebController {
    @GetMapping("/p2p")
    public void pointToPointQueryView(){}
}

 

Controller에서 http://localhost:9090/p2p URL 호출 시 p2p.html 파일을 전달하도록 지정합니다.


5. 화면 구현을 위해서 Query 모듈 resources 패키지 하위에 templates 패키지 및 p2p.html 파일을 생성합니다.

 


6. html 내용을을 구현합니다.

 

p2p.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>PointToPoint Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (holderId === undefined || holderId === null || holderId ==="") {
                        alert("소유주를 입력하시오.");
                    } else {
                        let xhr = new XMLHttpRequest();
                        xhr.open('GET','http://localhost:9090/account/info/'+holderId, true);
                        xhr.send();
                        xhr.onload = function(){
                            if(xhr.status === 200){
                                let elem = pElem.cloneNode();
                                elem.innerText = xhr.responseText;
                                appendDiv.appendChild(elem);
                            }
                        }
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

 

위 코드 내용 중 가장 핵심이 되는 로직은 callback 객체입니다. 구현 내용은 비동기로 Query를 수행하는 API에 소유주 정보를 인자로 요청하면, 해당 내용을 수신받아 화면에 표시합니다.

 


7. 화면이 정상적으로 출력되는지 확인하기 위하여, Query App을 기동합니다. 이후 웹브라우저(Chrome)을 열고 화면 호출 테스트를 수행합니다.(http://localhost:9090/p2p)

 

 


8. 테스트가 완료되었으면, Query를 수행할 API 내용을 구현하겠습니다. 먼저 Query 모듈 service 패키지내 QueryService 인터페이스를 엽니다.

 


9. Query 수행을 위한 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    void reset();
    HolderAccountSummary getAccountInfo(String holderId);
}

10. Service 구현을 위하여 Query 모듈 service 패키지내 QueryServiceImpl 클래스를 엽니다.

 


11. Interface에 정의된 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
	(...중략...)
    @Override
    public HolderAccountSummary getAccountInfo(String holderId) {
        AccountQuery accountQuery = new AccountQuery(holderId);
        log.debug("handling {}", accountQuery);
        return queryGateway.query(accountQuery, ResponseTypes.instanceOf(HolderAccountSummary.class)).join();
    }

}

12. API End Point 설정을 위해 controller 패키지내 위치한 HolderAccountController 클래스를 엽니다.

 

 


13. API End Point를 추가합니다.

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    private final QueryService queryService;

	(...중략...)
    @GetMapping("/account/info/{id}")
    public ResponseEntity<HolderAccountSummary> getAccountInfo(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                             .body(queryService.getAccountInfo(holderId));
    }

}

 

 


14. QueryGateway로 전달된 Query를 처리하는 Handler 작성을 위해 Query 모듈 projection 패키지 하위 HolderAccountProjection 클래스를 엽니다.

 

 


15. HolderAccountProjection 클래스에서 QueryHandler 메소드를 구현합니다.

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
    (...중략...)

    @QueryHandler
    public HolderAccountSummary on(AccountQuery query){
        log.debug("handling {}", query);
        return repository.findByHolderId(query.getHolderId()).orElse(null);
    }
}

16. Query App을 기동합니다. 이후 EventStore에 저장된 HolderID 중 하나를 선택하여 입력창에 기입합니다. 조회 버튼을 눌러 정상적으로 조회되는지 확인합니다.

 

테스트 결과, Read Model에 저장된 데이터가 정상적으로 출력되는 것을 확인할 수 있습니다.


3. Subscription Query

 

 

Subscription Query는 Client로부터 Connection을 연결하면, 이를 해제하지 않고 유지합니다. Query를 처리하는 Hanlder App에서는 초기 결과를 최초에 반환합니다. 이때 Flux 타입으로 반환하며, QueryUpdateEmitter를 통해서 Read Model의 변경이 있을 때마다 수신 받습니다.

 

데모 프로젝트에서는 SSE(Server Sent Event) 방식으로 구현하기 위해 Client 화면에서는 EventSource 객체를 사용하겠습니다.

 


1. Query 모듈 build.gradle 파일을 엽니다.

 

 


2. Flux 사용을 위하여 reactor-core 의존성을 추가합니다.

 

build.gradle

dependencies{
    (...중략...)
    implementation group: 'io.projectreactor', name: 'reactor-core'
}

3. 화면 호출을 위하여 Query 모듈 Controller 패키지에 WebController 클래스를 추가합니다.

 


4. 클래스 내용을 구현합니다.

 

WebController.java

@Controller
public class WebController {
   (...중략...)
    @GetMapping("/subscription")
    public void subscriptionQueryView(){}
}

 

Controller에서 http://localhost:9090/subscription URL 호출 시 subscription.html 파일을 전달하도록 지정합니다.


5. 화면 구현을 위해서 Query 모듈 resources 패키지 하위에 templates 패키지 및 subscription.html 파일을 생성합니다.

 


6. html 내용을을 구현합니다.

 

subscription.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Subscription Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let eventSource = undefined;
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            function closeEventSource() {
                eventSource.close();
                eventSource = undefined;
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (eventSource !== undefined) {
                        closeEventSource();
                    }

                    if (holderId === undefined || holderId === null || holderId === "") {
                        alert("소유주를 입력하시오.");
                    } else {
                        eventSource = new EventSource('/account/info/subscription/' + holderId);
                        eventSource.onopen = function () {
                            console.log("connected");
                        };
                        eventSource.onmessage = function (event) {
                            let elem = pElem.cloneNode();
                            elem.innerText = event.data;
                            appendDiv.appendChild(elem);
                        };
                        eventSource.onerror = function () {
                            console.error("Connection error has occurred");
                            closeEventSource();
                        }
                    }
                }),
                "disconnect": (function () {
                    if (eventSource !== undefined) {
                        console.log("disconnected");
                        closeEventSource();
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
    <input type="button" data-cb="disconnect" value="종료"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

 

조회 버튼을 누르면, EventSource 객체를 생성하여 Server Sent Event를 수신받으며, 메시지가 전달되면 수신된 데이터를 화면에 출력하도록 구현하였습니다.


7. 화면이 정상적으로 출력되는지 확인하기 위하여, Query App을 기동합니다. 이후 웹브라우저(Chrome)을 열고 화면 호출 테스트를 수행합니다.(http://localhost:9090/usbscription)

 

 


8. 테스트가 완료되었으면, Query를 수행할 API 내용을 구현하겠습니다. 먼저 Query 모듈 service 패키지내 QueryService 인터페이스를 엽니다.

 


9. Query 수행을 위한 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    (...중략...)
    Flux<HolderAccountSummary> getAccountInfoSubscription(String holderId);
}

10. Service 구현을 위하여 Query 모듈 service 패키지내 QueryServiceImpl 클래스를 엽니다.

 


11. Interface에 정의된 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
    (...중략...)
    @Override
    public Flux<HolderAccountSummary> getAccountInfoSubscription(String holderId) {
        AccountQuery accountQuery = new AccountQuery(holderId);
        log.debug("handling {}", accountQuery);

        SubscriptionQueryResult<HolderAccountSummary, HolderAccountSummary> queryResult = queryGateway.subscriptionQuery(accountQuery,
                ResponseTypes.instanceOf(HolderAccountSummary.class),
                ResponseTypes.instanceOf(HolderAccountSummary.class)
        );

        return Flux.create(emitter -> {
            queryResult.initialResult().subscribe(emitter::next);
            queryResult.updates()
                    .doOnNext(holder -> {
                        log.debug("doOnNext : {}, isCanceled {}", holder, emitter.isCancelled());
                        if (emitter.isCancelled()) {
                            queryResult.close();
                        }
                    })
                    .doOnComplete(emitter::complete)
                    .subscribe(emitter::next);
        });
    }
}

 

위 코드 구현 내용은 최초에 initalResult 생성 후에, 지속적으로 updates 메소드를 통해 Stream 데이터를 전달받아 Client에게 전달합니다. 만약 중간에 Connection이 실패하게되면, 해당 Flux를 종료하도록 구현하였습니다.


12. API End Point 설정을 위해 controller 패키지내 위치한 HolderAccountController 클래스를 엽니다.

 

 


13. EndPoint를 추가합니다.

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    (...중략...)

    @GetMapping("account/info/subscription/{id}")
    public ResponseEntity<Flux<HolderAccountSummary>> getAccountInfoSubscription(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                             .body(queryService.getAccountInfoSubscription(holderId));
    }
}

14. Subscription에서는 Read Model에 변경이 발생되었을 때 이를 전파해야합니다. 따라서 이를 작성하기 위해  Query 모듈 projection 패키지 하위 HolderAccountProjection 클래스를 엽니다.

 

 


15. EventSourcingHandler를 통해 ReadModel의 변화가 발생하였을 때, QueryUpdateEmitter 클래스를 통해 이벤트 변경 내용을 전파하도록 클래스 내용을 수정합니다.

 

HolderAccountProjection.java

@Component
@EnableRetry
@AllArgsConstructor
@Slf4j
@ProcessingGroup("accounts")
public class HolderAccountProjection {
    private final AccountRepository repository;
    private final QueryUpdateEmitter queryUpdateEmitter;

    (...중략...)

    @EventHandler
    @AllowReplay
    protected void on(DepositMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() + event.getAmount());

        queryUpdateEmitter.emit(AccountQuery.class,
                query -> query.getHolderId().equals(event.getHolderID()),
                holderAccount);

        repository.save(holderAccount);
    }
    @EventHandler
    @AllowReplay
    protected void on(WithdrawMoneyEvent event, @Timestamp Instant instant){
        log.debug("projecting {} , timestamp : {}", event, instant.toString());
        HolderAccountSummary holderAccount = getHolderAccountSummary(event.getHolderID());
        holderAccount.setTotalBalance(holderAccount.getTotalBalance() - event.getAmount());

        queryUpdateEmitter.emit(AccountQuery.class,
                query -> query.getHolderId().equals(event.getHolderID()),
                holderAccount);

        repository.save(holderAccount);
    }

	(...중략...)
}

 

각 Handler안에 queryUpdateEmitter를 통해 구독중인 Query와 동일한 ID의 Event가 들어오면, Query 결과에 전달되도록 처리하였습니다.


16. Query App을 기동합니다. 이후 EventStore에 저장된 HolderID 중 하나를 선택하여 입력창에 기입합니다. 조회 버튼을 눌러 정상적으로 조회되는지 확인합니다.

 

위 예제에서 holderId가 924eb0ab-c35f-4d3e-b753-a4ce35bd7c27인 계좌 전체의 잔고는 현재 290입니다. Update가 정상 수신되는지 확인하기 위하여, 해당 소유주가 보유한 계좌에서 5원을 인출하는 API를 호출합니다.

 

 


17. 5원 인출 후 Client 화면에서 변경된 데이터가 정상 수신되었는지 확인합니다.

 

확인 결과, 정상적으로 데이터 수신되었음을 확인할 수 있습니다.


18. 종료 버튼을 눌러 구독을 중지합니다. 이후 924eb0ab-c35f-4d3e-b753-a4ce35bd7c27 소유주가 보유한 계좌에서 추가로 5원 인출하였을 때, Query 결과가 화면에 표시되지 않음을 확인합니다. 화면의 변화가 없으면, 정상적으로 Connection이 종료된 것입니다.


4. 마치며

이번 시간에는 Point to Point, Subscription Query에 대해서 살펴보았습니다. Subscription Query는 반환 형태가 Flux 형태다보니 아무래도 Spring MVC에서는 사용하기 힘든 부분이 있을듯 합니다. 또한, 이를 지원하기 위해서는 Client에서도 SSE를 위한 구현이 필요하며, Server에서는 Client와 Connection 유지를 위해 Subscription Query가 증가할 수록 그에 상응하는 Thread 수가 증가합니다. 따라서 비즈니스 요건에 맞게 적절한 사용이 필요하며, Spring Webflux를 사용한다면 도입을 검토해볼 수 있을 것 같습니다.

 

+ Recent posts