서론

 

지금까지 gRPC에 대한 소개 및 해당 기술이 가진 이점에 대해서 살펴봤습니다. 이번 포스팅에서는 kotlin 환경에서 gRPC 관련 기본 설정 셋업하는 방법에 대해서 다루어보도록 하겠습니다. 프로젝트 설정은 gradle 기반의 기본 Kotlin 빈 프로젝트는 생성되었음을 가정하고 진행하겠습니다.


 

1. gradle 설정 추가

 

 

build.gradle.kts

import com.google.protobuf.gradle.*

plugins {
    ...(중략)...
    id("com.google.protobuf") version "0.8.13"
}

 

가장 먼저 설정할 것은 protobuf 관련 plugin을 설정하는 것입니다. 위 내용을 gradle.kts 파일 plugins 항목에 추가합니다.


build.gradle.kts

...(중략)...

val grpcVersion = "3.19.4"
val grpcKotlinVersion = "1.2.1"
val grpcProtoVersion = "1.44.1"

dependencies{
    implementation("io.grpc:grpc-kotlin-stub:$grpcKotlinVersion")
    implementation("io.grpc:grpc-protobuf:$grpcProtoVersion")
    implementation("com.google.protobuf:protobuf-kotlin:$grpcVersion")
}

 

그 다음에는 protobuf 관리와 stub을 자동으로 생성해주는 라이브러리 의존성을 위와같이 추가합니다. 

 


build.gradle.kts

...(중략)...
sourceSets{
    getByName("main"){
        java {
            srcDirs(
                "build/generated/source/proto/main/java",
                "build/generated/source/proto/main/kotlin"
            )
        }
    }
}

 

위 내용은 build 이후에 Stub 클래스가 생성되는 directory에 대해서 target으로 추가하기 위한 설정입니다. 해당 설정을 통해 소스 내에서 Stub 클래스 참조가 가능합니다.

 


build.gradle.kts

...(중략)...

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:$grpcVersion"
    }
    plugins {
        id("grpc") {
            artifact = "io.grpc:protoc-gen-grpc-java:$grpcProtoVersion"
        }
        id("grpckt") {
            artifact = "io.grpc:protoc-gen-grpc-kotlin:$grpcKotlinVersion:jdk7@jar"
        }
    }
    generateProtoTasks {
        all().forEach {
            it.plugins {
                id("grpc")
                id("grpckt")
            }
            it.builtins {
                id("kotlin")
            }
        }
    }
}

 

마지막으로 설정할 내용은 build 시점에 protobuf를 생성하기 위한 task를 추가하는 작업입니다. 해당 설정을 통해 Java Stub 파일과 Kotlin Stub파일을 생성할 수 있습니다.

 

 

기본 설정이 모두 끝났으면 gradle refresh를 통해서 설정을 마무리합니다.

 


2. 임시 Protobuf 생성 테스트

 

설정이 완료되었으면, Protobuf를 만들어보고 정상적으로 Stub 클래스가 생성되는지 확인해보도록 하겠습니다. 

 

 

 

먼저 main 디렉토리 하위에 proto 디렉토리를 생성합니다.

 

 

생성된 proto 디렉토리 하위에 test.proto를 생성합니다.

 

 

 

 

간단한 테스트를 위해 위 내용을 기입합니다.

 

 

gradle 탭에서 build 버튼을 클릭합니다.

 

 

build가 정상적으로 완료되면, 위 그림과 같이 build 폴더가 생깁니다. 이를 확인해봅시다.

 

 

Stub 클래스가 정상 생성되었는지 확인을 위해 build > generated > source > proto > main 하위에 java와 kotlin 패키지를 열어봅니다. 만약 정상적으로 build가 완료되었으면, 위 그림과 같이 Test Stub 클래스가 생성된 것을 확인할 수 있습니다.

 

 

생성된 Stub 클래스를 프로그램내에서 정상 사용할 수 있는지 여부를 테스트하기 위해 위와 같이 별도 main 함수를 만들어 생성 가능 여부를 확인해봅니다. 

 

만약 클래스 참조가 불가하다면, build.gradle.kts 파일에 sourceSets 내 경로가 일치하는지 확인 후 수정합니다.

 

지금까지 과정이 모두 정상적이라면, proto 파일을 만들고 이를 build하고 Stub 클래스를 생성 후 프로그램 참조하는 모든 과정을 가볍게 훑어볼 수 있었습니다. 테스트를 위해 사용되었던 test.proto 파일은 더 이상 필요하지 않으므로 제거해도 좋습니다.

 


 

3. 마치며

 

이번 포스팅은 Kotlin 기반에서 gRPC 설정 하는 방법에 대해서 알아봤습니다. 다음 포스팅부터는 본격적으로 protobuf 사용법에 대해서 알아보겠습니다.

 

 

'MSA > gRPC' 카테고리의 다른 글

3. gRPC는 왜 빠를까? (통신 방식) - 2  (6) 2022.03.10
2. gRPC는 왜 빠를까? (Payload) - 1  (1) 2022.03.10
1. gRPC 개요  (0) 2022.03.05

서론

 

지난 포스팅에서는 gRPC에서 사용되는 protobuf와 REST 통신에서 사용되는 JSON 크기와 Serialization/Deserialization 관점에서 성능을 비교해봤습니다. 이번에는 gRPC에서 제공하는 통신 방법에 대해서 살펴보고 REST 단건 통신과 비교하여 송/수신 시간을 비교해보겠습니다.

 

 


 

1. gRPC 통신 방법

 

gRPC는 HTTP 2.0을 기반으로 구성되어있기 때문에 Multiplexing으로 연결을 구성할 수 있습니다. 따라서 단일 Connection으로 순서의 상관없이 여러 응답을 전달받을 수 있는 Streaming 처리가 가능합니다. gRPC는 총 4가지의 통신 방법을 지원하며 그 중 3가지 방식은 Streaming 처리 방식입니다. 지금부터 하나씩 살펴보겠습니다.

 

 

 


2. Unary

 

첫 번째 방식은 Unary 통신 방식입니다.

 

 

이는 가장 단순한 서비스 형태로써 클라이언트가 단일 요청 메시지를 보내고 서버는 이에 단일 응답을 내려보내주는 방식입니다. 일반적으로 사용하는 REST API를 통해 주고 받는 Stateless 방식과 동일하다고 볼 수 있으며, 개념적으로 이해하기 쉽습니다.

 

그렇다면 gRPC의 Unary 통신과 REST의 성능을 비교해보면 어떤차이를 보일까요? 테스트 시나리오를 기반으로 두 통신방법을 비교해보록 하겠습니다.

 

 

1. 사용자를 등록하는 서비스가 있다고 가정한다.
2. 10, 100 등 10만까지 10의 거듭 제곱 형태로 delay없이 요청 횟수를 늘리면서 REST와 gRPC의 응답 총 시간을 구한다.
3. 테스트 시작전 warm up을 위해 50회의 요청 수행 후 테스트를 진행한다.

 

 

위 시나리오를 기반으로 Unary 통신을 구현해보겠습니다.

 

 

syntax = "proto3";

import "google/protobuf/empty.proto";

option java_multiple_files = true;
option java_package = "grpc.polar.penguin";

message Address{
    string city = 1;
    string zip_code = 2;
}

message Person{
    string name = 2;
    int32 age = 3;
    repeated string hobbies = 4;
    optional Address address = 5;
}

service PersonService {
    rpc register(Person) returns (google.protobuf.Empty);
}

 

Protobuf는 위와 같이 디자인했습니다. message 포맷은 이전 포스팅에서 설계 내용과 동일합니다. 여기서 새로 추가된 항목은 service 부분입니다. 추가된 내용을 살펴보면 인자로 Person 타입을 입력받고 반환 값은 없으므로 Empty를 지정하였습니다.

 

이번 포스팅 내용은 통신 방법에 대한 설명이므로 syntax 설명은 향후 다른 포스팅 내용으로 다루겠습니다.

 

class PersonGrpcService : PersonServiceGrpcKt.PersonServiceCoroutineImplBase() {
    override suspend fun register(request: Person): Empty {
        //TODO : request 처리
        return Empty.getDefaultInstance()
    }
}

 

Proto 파일 디자인 후 Build하면 Stub 클래스가 자동 생성됩니다. 위 코드는 gRPC 서비스 처리를 구현하기 위해 Stub 클래스인 PersonServiceCoroutineImplBase을 상속받아 구현한 코드입니다. 테스트 시나리오에서는 전달받은 Person 객체를 따로 저장하거나 처리하지 않고 Empty 객체를 반환하도록 구현하였습니다.

 

fun main() {
    val server = ServerBuilder.forPort(6565)
        .addService(PersonGrpcService())
        .build()

    server.start()
    server.awaitTermination()
}

 

Server 기동 시에 Service를 등록 시켜서 Client의 요청이 들어왔을 경우에 해당 Service로 Routing 하도록 설정합니다. 이후 Server를 기동합니다.

 

fun main() {
    val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
        .usePlaintext()
        .build()

    val stub = PersonServiceGrpc.newBlockingStub(channel)

    execute(stub, 50) //warm up phase

    val base = 10.0
    val dec = DecimalFormat("#,###")
    for (exponent in 1..5) {
        val iterCount = base.pow(exponent).toInt()
        val time = measureTimeMillis {
            execute(stub, iterCount)
            println("count : ${dec.format(iterCount)}")
        }
        println("elapsed time $time ms")
        println("------------------------------------")
    }
    
    channel.shutdown()
}

 

Unary 테스트를 위한 client 코드는 위와같습니다. Server를 localhost의 6565 포트에서 기동중이므로 해당 요청에 대한 Channel을 생성합니다.

 

이후 proto 파일 Build 과정에서 생성된 PersonServiceGrpc 내에 있는 BlockingStub 객체를 생성 해서 해당 Channel에 Binding 합니다. Channel에 Binding 한 다음에는 Stub 객체의 메소드를 호출하면 Server와 통신을 수행할 수 있습니다.

 

stub 객체까지 만들고 나면, 10 ~ 10만번까지 10의 거듭제곱 형태로 늘려가면서 gRPC Unary 통신을 수행 후 총 수행 시간을 출력합니다.

 

 

위 코드에서 실질적으로 gRPC를 호출하는 부분은 execute 함수입니다.

 

fun execute(stub: PersonServiceGrpc.PersonServiceBlockingStub, count: Int) {
    repeat(IntRange(1, count).count()) {
        stub.register(
            person {
                name = "kevin"
                age = (1..50).random()
                address = address {
                    city = "seoul"
                    zipCode = "123456"
                }
                hobbies.addAll(listOf("foot ball", "basket ball"))
            }
        )
    }
}

 

execute 함수를 살펴보면 위와 같이 iteration count를 인자로 전달받고 그 횟수만큼 gRPC 요청을 보내는 것을 확인할 수 있습니다.

 

 

 

프로그램을 실행하면 위와 같이 Unary 요청 수행 결과를 확인할 수 있습니다.

 

 

이번에는 REST 통신을 통해 같은 횟수를 반복했을 때 Unary 통신과 비교하여 총 수행시간이 얼만큼의 차이가 있는지를 비교해보도록 하겠습니다. 이때 Unary 테스트 또한 단일 Channel에서 Blocking 방식으로 수행시간을 측정하였으므로 REST 통신 또한 같은 방법으로 테스트를 진행하겠습니다.

 

data class PersonDto(
    val name : String,
    val age : Int,
    val hobbies : List<String>? = null,
    val address : AddressDto? = null
)

data class AddressDto(
    val city : String,
    val zipCode : String
)

 

JSON으로 입력받을 DTO를 위와 같이 디자인합니다.

 

@RestController
class PersonController(private val service: PersonService) {
    @PostMapping("/person")
    suspend fun register(@RequestBody person : PersonDto) {
        //TODO : request 처리
    }
}

 

REST Controller 코드는 위와 같습니다. gRPC 서비스 코드에서도 인자를 전달받아 아무런 처리를 하지 않았기 때문에 마찬가지로 요청만 전달받고 아무 처리를 수행하지 않도록 구성하였습니다.

 

@Component
class RegisterTest : CommandLineRunner {
    override fun run(vararg args: String?) {
        val client = WebClient.builder()
            .build()

        execute(client, 50) // warm up phase

        val base = 10.0
        val dec = DecimalFormat("#,###")
        for (exponent in 1..5) {
            val iterCount = base.pow(exponent).toInt()
            val time = measureTimeMillis {
                execute(client, iterCount)
                println("count : ${dec.format(iterCount)}")
            }
            println("elapsed time $time ms")
            println("------------------------------------")
        }
    }

    private fun execute(client: WebClient, count: Int) {
        repeat(IntRange(1, count).count()) {
            client.post().uri("localhost:8080/person")
                .bodyValue(
                    PersonDto(
                        name = "kevin",
                        age = (1..50).random(),
                        address = AddressDto(city = "seoul", zipCode = "123456"),
                        hobbies = listOf("foot ball", "basket ball")
                    )
                )
                .retrieve()
                .bodyToMono(Void::class.java)
                .block()
        }
    }
}

 

Client 수행 프로그램은 위와 같습니다. gRPC 테스트 코드와 크게 다르지 않으며, 차이점이 있다면 Stub 객체를 사용한 것이 아닌 Webclient를 사용한 부분입니다.

 

 

Client 코드를 수행하면 위와 같은 결과를 얻을 수 있습니다.

 

횟수 REST gRPC(Unary) 성능
10 23 ms 14 ms 1.64배
100 165 ms 101 ms 1.63배
1,000 1,000 ms 694 ms 1.44배
10,000 4,109 ms 2,132 ms 1.92배
100,000 41,491 ms 13,768 ms 3.01배

 

결과를 살펴보면, Iteration 횟수가 증가할 수록 그 차이가 벌어지는 것을 확인할 수 있습니다. 격차가 벌어진 이유는 다양한 이유가 있지만 Protobuf의 Serialization & Deserialization이 가장 큰 영향을 미치지 않았을까 생각합니다.

 

 

이번에는 네트워크 패킷을 통해서 REST와 gRPC의 통신 과정을 비교 해보겠습니다. 비교를 위해서 사용자 등록을 5회만 수행 후 종료한 내용을 확인해보도록 하겠습니다.

 

 

 

REST 통신을 5회 수행하였을 때, 네트워크 흐름을 표시하면 위 그림과 같습니다. 자세히보면 REST 통신은 HTTP 1.1을 사용한 것을 알 수 있고 SYN, ACK와 FIN, ACK가 매 요청마다 보이지 않는 것으로 보아 Connection을 매번 요청하지 않았음을 확인할 수 있습니다.

 

 

이번에는 gRPC Unary 통신 결과입니다. REST에서는 HTTP 1.1 방식이었던 것과 달리 예상대로 HTTP 2.0으로 통신을 수행한 것을 확인할 수 있습니다.

 

gRPC에서 Unary 통신은 HTTP 2.0 Stream으로 데이터를 전송합니다. 따라서 위 패킷 내용을 살펴보면, Stream 통신에 있어서 필요한 데이터 흐름을 파악할 수 있습니다.

 

가령 WINDOW_UPDATE를 통해서 Client가 수신할 수 있는 Byte 수를 Server에 알려줘 해당 정보를 기반으로 Flow control이 가능하도록 사전 설정하는 것을 확인할 수 있습니다. 또한 PING 패킷의 경우는 연결된 Channel 에서 사용중인 Connection liveness를 체크합니다. 만약 PING 단계에서 정상 응답을 수신 받지 못하면, Connection을 끊습니다. 이후 Connection 재생성을 통해 다시 연결할 수 있습니다.

 

이번에는 데이터 패킷을 상세하게 살펴보도록 하겠습니다.

 

 

위 그림은 요청 패킷을 구조화한 모습입니다.

 

Header를 살펴보면, Header의 길이 그리고 Header의 종류 flag가 보입니다. 그리고 Stream ID가 표시된 것을 볼 수 있는데, 이는 HTTP Stream 내에서 사용되는 Stream 메시지 별 Unique ID 입니다. Client에서 보내는 메시지는 Stream ID가 홀수개로 증가합니다.

 

Header에는 그 밖에 요청 Path 정보 및 Schema, Content-type이 표시됩니다. 내부적으로 요청은 POST로 요청되는 것을 확인할 수 있습니다.

 

Data 영역에는 실제 전달되는 데이터와 Flag등을 전달합니다. Unary 통신의 경우 gRPC Stream 요청은 아니므로 Flag에는 End Stream으로 지정된 것을 확인할 수 있습니다.

 

 

응답 패킷은 크게 3가지 부분으로 이루어져있습니다. 첫번째는 요청에 대한 응답헤더이고, 두 번째는 응답에 대한 데이터 마지막으로는 trailer 헤더로 구성되어있습니다.

 

그렇다면 위 5개의 데이터 전송 흐름에서 gRPC 패킷은 어떤 특징을 지니고 있을까요?

 

 

요청 패킷을 살펴보면, Header 길이가 최초 메시지를 보낼 때보다 크기가 줄어든 것을 확인할 수 있습니다. 또한 Stream ID는 홀수 번호로 순차 증가한 것을 확인할 수 있습니다.

 

 

마찬가지로 응답 패킷을 살펴보면, 최초 응답 헤더에 비해 이후 응답 메시지의 Header 크기가 줄어든 것을 확인할 수 있습니다.

 

위와 같이 gRPC는 기반에 HTTP 2.0을 기반으로 하여 메시지 전송간 데이터 Payload가 줄어드는 장점이 존재하기 때문에 이전 REST 방식에 통신에 있어서 조금 더 빠른 결과를 나타낼 수 있습니다.

 


3. Streaming

 

이번에는 Streaming 처리 방법에 대해서 살펴보도록 하겠습니다. Stream은 데이터를 한번만 전송하는 것이 아니라 연속적인 흐름으로 전달하는 것을 의미합니다.

 

gRPC에서는 총 3가지 종류의 Streaming이 존재합니다. 

 

 

1) Client Stream

 

 

Client는 Stream 형태로 전달하고 Client의 요청이 끝나면 Server에서 한번에 응답을 내려주는 경우는 Client Stream이라고 부릅니다.

 

 

2) Server Stream

 

Client의 요청은 한번만 전달하고 Server에서 응답은 여러 번에 걸쳐 전송하는 경우는 Server Stream이라고 부릅니다.

 

 

3) Bidirectional Stream

 

양방향 모두 Stream으로 데이터를 전송하는 경우는 Bidirectional Stream 이라고 부릅니다.

 

Stream 처리 방법은 개념적으로 어렵지 않고 이번 포스팅에서는 사용 방법 보다는 성능 비교가 주 목적이므로 모든 Stream 방식에 대한 구현을 다루지는 않겠습니다.

 

Stream 처리 관련해서 다루어볼 내용은 Client Stream 방식을 활용해서 Unary, REST 방식의 테스트 시나리오를 동일하게 적용하여 어떤 차이점이 있는지를 살펴보도록 하겠습니다.

 

...(중략)...
service PersonService {
    ...(중략)...
    rpc registerBatch(stream Person) returns (google.protobuf.Empty);
}

 

먼저 Stream 처리를 위해 서비스에 RPC를 등록합니다. 이후 Build를 수행합니다.

 

class PersonGrpcService : PersonServiceGrpcKt.PersonServiceCoroutineImplBase() {
    ...(중략)...
    override suspend fun registerBatch(requests: Flow<Person>): Empty {
        val start = System.currentTimeMillis()
        requests
            .catch {
                //TODO : Error 처리
            }
            .onCompletion {
                println("${System.currentTimeMillis() - start} ms elapsed. ")
            }
            .collect {
                //TODO : request 처리
            }
        return Empty.getDefaultInstance()
    }

    
}

 

Build 이후 해당 Stub 메소드 구현을 위해서 PersonServiceCoroutineImplBase Stub 클래스에서 RPC 관련 메소드를 override 합니다. 이때 Stream으로 전달받은 데이터를 기반으로 비즈니스 로직 처리는 수행하지 않기 때문에 collect 부분은 아무런 작업을 수행하지 않도록 구성했습니다.

 

fun main() {
    val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
        .usePlaintext()
        .build()

    val stub = PersonServiceGrpcKt.PersonServiceCoroutineStub(channel)

    runBlocking { execute(stub, 50) } // warm up phase

    val base = 10.0
    val dec = DecimalFormat("#,###")

    runBlocking {
        for (exponent in 1..5) {
            val iterCount = base.pow(exponent).toInt()
            val time = measureTimeMillis {
                execute(stub, iterCount)
                println("count : ${dec.format(iterCount)}")
            }
            println("elapsed time $time ms")
            println("------------------------------------")
        }
    }
}

suspend fun execute(stub: PersonServiceGrpcKt.PersonServiceCoroutineStub, count: Int) {
    try {
        stub.registerBatch(
            IntRange(1, count)
                .map {
                    person {
                        name = "kevin"
                        age = (1..50).random()
                        address = address {
                            city = "seoul"
                            zipCode = "123456"
                        }
                        hobbies.addAll(listOf("foot ball", "basket ball"))
                    }
                }
                .asFlow()
        )
    } catch (e: StatusException) {
        println(e)
    }
}

 

Client 프로그램은 위와 같이 구성했습니다. gRPC의 Stream 처리를 구현하기 위해서 StreamObserver를 활용해서 구현하는 방식과 Kotlin의 Coroutine 방식 두 가지 방식으로 구현 가능한데, 위 코드는 Coroutine 방식으로 구현하였습니다.

 

내용을 살펴보면 이전 Unary 코드와 크게 다르지는 않으며, 데이터 전달시 Flow로 변환하여 전달하는 것을 확인할 수 있습니다.

 

코드 구현이 완료되었으면 실행 후 결과를 비교해보겠습니다.

 

 

실행 결과를 살펴보면, REST와 gRPC(Unary)와 비교했을 때 엄청난 개선이 이루어진 것을 확인할 수 있습니다.

이를 표로 나타내면 다음과 같습니다.

 

횟수 REST gRPC(Unary) gRPC(Client Stream)
10 23 ms 14 ms 9 ms
100 165 ms 101 ms 20 ms
1,000 1,000 ms 694 ms 106 ms
10,000 4,109 ms 2,132 ms 468 ms
100,000 41,491 ms 13,768 ms 2,880 ms

 

요청 횟수가 적을 때보다 횟수가 늘어감에 따라 차이가 더 커지는 것을 확인할 수 있습니다. 가령 10만번 데이터 전송의 경우 REST 방식보다 14.4배 Unary 방식에 비교하면 4.78배 효율이 좋은 것을 확인할 수 있습니다.

 

그렇다면 Stream 처리 방식은 왜 이리 많은 차이를 보이는 것일까요? 이전과 마찬가지로 패킷의 흐름을 살펴보겠습니다.

 

 

위 내용은 Stream 형식으로 Person 데이터를 50회 전송했을 때 네트워크 흐름입니다.

Unary와 REST 방식은 5회만 전송했는데도 많은 Network 요청이 있었던 것과 비교하여 50회 데이터를 전송했는데도 패킷의 횟수가 그리 많지 않습니다.

 

 

 

데이터 전송 부분만 살펴보면, 요청을 전달할 때 Header는 한번만 전송한 것을 확인할 수 있고, 응답 또한 한번만 전달받은 것을 확인할 수 있습니다.

 

 

그리고 데이터는 여러번 전달한 것이 아니라 한 Packet안에 여러개의 요청이 포함되어 전달된 것을 확인할 수 있습니다.

위 패킷 흐름에는 총 2번 전달하는 과정에서 50개의 요청이 담겨있는 것을 확인할 수 있습니다.

 

한번에 동일 요청 다수를 함께 전달할 경우, Stream 방식이 매번 요청을 수행하는 Unary 방법보다 효율적인 데이터 전송이 가능합니다. 따라서 네트워크 전달 과정에서 많은 비용을 감소하여 성능이 더욱 좋다고 볼 수 있습니다.


4. 마치며

 

지난 포스팅과 이번 포스팅을 통해서 gRPC의 성능 이점에 대해서 다양한 각도로 살펴봤습니다. 다음 포스팅부터는 gRPC를 사용하는 방법에 대해서 차차 알아보도록 하겠습니다.

'MSA > gRPC' 카테고리의 다른 글

4. kotlin 환경에서 gRPC 설정하기  (0) 2022.03.10
2. gRPC는 왜 빠를까? (Payload) - 1  (1) 2022.03.10
1. gRPC 개요  (0) 2022.03.05

서론

 

이전 포스팅에서는 gRPC에 대한 기본적인 소개를 다루어 봤습니다. 이번에는 gRPC에서 사용하는 Protocol Buffer(aka  Protobuf)와 보편적으로 사용하는 JSON 메시지 포맷에 대한 비교를 통해 어떤 부분에서 Protobuf가 이점이 있는지를 살펴보겠습니다.

 


 

 

1. JSON, Protobuf 변환 속도 비교

 

이전 포스팅에서 살펴봤듯이 REST 통신에서는 JSON 규격으로 메시지를 주고 받았고 이때 발생하는 Serialization & Deserialization 과정은 비용이 소모되는 작업임을 살펴봤습니다. 반면 gRPC에서는 binary 포맷으로 데이터를 주고받기 때문에 변환 과정에 따른 비용이 JSON에 비해서 적다고 설명했습니다.

 

그렇다면 실제 Protobuf 변환 과정과 JSON 변환 과정을 측정해보면 얼마나 유의미한 결과를 나타낼까요? 테스트를 통해 차이가 얼마나 발생하는지 살펴봅시다.

 

data class PersonDto(
    val name : String,
    val age : Int,
    val hobbies : List<String>? = null,
    val address : AddressDto? = null
)

data class AddressDto(
    val city : String,
    val zipCode : String
)

 

JSON 변환 테스트를 위해 Sample 객체를 위와 같이 디자인합니다. 위 데이터 구조는 Person이라는 객체를 생성함에 있어 이름, 나이, 주소 정보를 입력받으며 취미의 경우 다수가 존재하므로 List로 입력받도록 디자인 했습니다. 

 

syntax = "proto3";

option java_multiple_files = true;
option java_package = "grpc.polar.penguin";

message Address{
    string city = 1;
    string zip_code = 2;
}

message Person{
    string name = 1;
    int32 age = 2;
    repeated string hobbies = 3;
    optional Address address = 4;
}

 

앞서 구현한 data class에 대응되는 Proto 파일은 위와 같이 구현합니다. 아직 Protobuf에 대해서 본격적으로 다루어보지 않은만큼 syntax가 이해되지 않더라도 좋습니다.

 

 

 

기본 Spec을 정의하였으면 이제 변환 과정 테스트 시나리오를 정의해봅시다.

 

1. 10, 100 ... 천만번까지 10의 거듭제곱 횟수만큼 변환 과정을 수행하면서 각 단계에서 걸린 총 시간을 측정한다.

2. 단계별 warm up 과정을 추가하고 해당 단계에서의 결과는 제외한다. 따라서 단계별 50회 변환 과정을 추가한다.

3. JSON, Proto 변환 측정 과정은 다음과 같다.
   - JSON : DTO를 JSON Byte 배열로 변환한 다음 해당 Byte을 다시 DTO로 변환하는데 걸린 시간
   - Proto : Stub을 Byte 배열로 변환한 다음 해당 Byte 배열을 다시 Stub 객체로 변환하는데 걸린 시간

 

 

테스트 시나리오를 위해 작성한 메인 프로그램의 흐름은 위와 같습니다. 10 부터 천만번까지 각각 변환과정을 수행한 결과를 출력하도록 구성했습니다.

 

 

측정 과정은 앞서 시나리오대로 단계별 변환 횟수에 맞추어 변환 작업을 수행하며, 단계별 최초 50회는 warm up 단계로 구성하여 결과에서 제외한 총 수행시간을 반환하도록 작성했습니다.

 

 

Stub 객체를 Byte 배열로 변환하고 이를 다시 Stub 객체로 변환하는 코드는 위와 같습니다. 

 

 

DTO 객체를 JSON Byte 배열로 저장한 다음 이를 다시 Person DTO 객체로 변환하는 코드는 위와 같습니다. 이 과정에서 Parser로는 Jackson을 사용했습니다.

 

코드 작성은 모두 마무리되었습니다. 이제 프로그램을 수행시킨 결과를 확인해봅시다.

 

 

측정 결과는 위와 같습니다. 살펴보면 변환 횟수가 증가하면서 두 방식의 변환 시간의 차가 크게 벌어지는 것을 확인할 수 있습니다. 가령 천만번 변환의 경우 7배 빠른 것으로 확인되었습니다.

 

그렇다면 위 측정결과를  gRPC가 REST 방식에 비해 7배 빠르다고 말할 수 있을까요?

 

 

요청에 대해서 응답을 처리하는 전체 flow를 아주 간략하게 표현한다면, 위와 같이 표현할 수 있을 것입니다. 위 과정에서 오래걸리는 영역은 당연히 Business Logic 처리를 위한 수행시간일 것입니다. 따라서 Business Logic 수행 시간이 오래 걸릴 수록 격차는 현격히 줄어들 것입니다.

 

하지만 TPS가 높은 시스템에서는 1ms라도 응답 속도를 줄이는 것이 중요하기 때문에 이런 경우 매우 유의미한 결과라고 볼 수 있습니다.

 


 

2. JSON, Protobuf 크기 비교

 

이번에는 기존에 사용했던 DTO, Stub 인스턴스를 byte 배열로 변환하였을 때 크기에 대해서 비교해보고 차이점을 통해 Protobuf의 특징을 확인해보겠습니다.

 

 

사이즈 크기 비교를 위해 작성한 프로그램은 위와 같습니다. 이전 내용과 같이 PersonDTO와 Stub 객체를 생성 후 둘 다 byte 배열로 변환한 크기를 출력하도록 구성했습니다.

 

 

실행 결과를 보면, 동일한 데이터 입력에 있어 JSON 방식과 Proto 방식간의 결과물 크기가 상당히 차이나는 것을 확인할 수 있습니다.

 

이러한 차이가 발생하는 이유는 Proto 메시지 정의에 따라서 Binary 데이터를 만드는 encoding 과정에서 데이터가 압축되기 때문입니다. 이와 관련하여 자세한 기술적인 내용은 아래 네이버 기술 블로그와 구글 Protocol Encoding 공식문서를 살펴보시면 도움 되실 것 같습니다.

 

 

네이버 기술 블로그 grpc 깊게 파고들기

 

[NBP 기술&경험] 시대의 흐름, gRPC 깊게 파고들기 #2

google에서 개발한 오픈소스 RPC(Remote Procedure Call) 프레임워크, gRPC를 알아봅니다.

medium.com

구글 Protocol Buffer Encoding 공식 문서

 

Encoding  |  Protocol Buffers  |  Google Developers

Encoding This document describes the binary wire format for protocol buffer messages. You don't need to understand this to use protocol buffers in your applications, but it can be very useful to know how different protocol buffer formats affect the size of

developers.google.com

 

 

이번에는 address와 hobbies를 제거한 다음의 수행 결과를 비교해보도록 하겠습니다.

 

 

결과를 측정해보면 값이 모두 들어있을 때보다 일부 필드에 값이 입력되지 않았을 경우 Stub 객체의 Byte 배열 크기와 JSON의 결과값이 더욱 차이가 나며, 이는 전체 값을 입력했을 때 보다 압축률이 더 좋음을 의미합니다.

 

그렇다면 필드에 데이터가 없을 때 어떻게 압축 효율이 더 좋을 수 있을까요? 이에 대해서 한번 살펴봅시다.

 

{"name":"polar penguin","age":20,"hobbies":null,"address":null}

 

위 결과는 DTO를 JSON으로 변환한 결과입니다. 길이를 살펴보면 63바이트인 것을 확인할 수 있습니다.

 

결과를 통해 살펴본 흥미로운 사실은 hobbies와 address는 실질적으로 아무런 값을 입력하지 않았음에도 불구하고 JSON에서는 Key와 value를 포함시킨다는 사실입니다. 이로인해 불필요한 overhead가 추가됩니다.

 

반면 Protobuf의 경우는 무엇이 다를까요?

 

message Person{
    string name = 1;
    int32 age = 2;
    repeated string hobbies = 3;
    optional Address address = 4;
}

 

이전에 살펴본 Person의 proto 정의는 위와 같습니다. 그리고 테스트 프로그램에서 수행한 실제 Stub 객체에는 hobbies와 address가 포함되지 않았음을 확인할 수 있습니다.

 

proto 파일에서 눈여겨 볼 점은 실제 Property 옆에 표시된 field 번호가 존재하는 점입니다. 가령 name에는 1이 age에는 2가 지정되어있습니다.

 

해당 번호는 Protobuf의 필드를 인식하게 만들어주는 Key를 구성하는 요소입니다. 참고로 이전에 첨부한 Naver 기술 블로그Google 공식 문서에서는 해당 Field 번호와 Wiretype가 조합된 Key를 이용하여 Encoding 및 Decoding을 수행하여 필드 값을 Parsing 함을 자세히 확인할 수 있습니다.

 

그렇다면 hobbies와 address가 입력되지 않았을 때 개념적으로 어떤 변화가 발생했을까요? 먼저 개념적으로 이해하기 위해 추상적으로 어떻게 표현되었는지 살펴봅시다.

 

 

protobuf에서는 field 번호가 해당 객체 내에서 필드 값을 식별하는데 있어 주요 역할을 수행합니다. 따라서 protobuf를 설계할 때 field 별로 부여하는 field 번호는 unique 해야합니다.

 

결과물을 살펴보면, JSON 표현 방식에 비해서 2가지 특징을 지닌 것을 확인할 수 있습니다.

 

1. 해당 객체 값에 값이 입력되지 않았을 경우 결과물에 포함시키지 않습니다. 따라서 JSON에 비해서 Byte 배열 크기가 줄어들 수 있습니다.

2. 실제 필드명의 길이가 어떻든 관계없이 field 번호를 기반으로 Binary 데이터가 만들어지기 때문에 payload 크기가 감소됩니다. 이는 field 명이 길어질 수록 payload 크기가 커지는 JSON과 대비하여 공간을 절약할 수 있습니다.

 

 

 

 

이번에는 패킷 수준에서 메시지 내용을 자세하게 살펴보겠습니다. 내용을 보면 방금전 설명했던 설명과 유사함을 확인할 수 있습니다.

 

데이터 구조를 살펴보면 Field Number와 Wire Type을 기반으로 ( (Field Number << 3) | Wire Type ) 형태로 Hex 값으로 구성되어 있습니다. 또한 모든 Field 내용이 저장되어있지 않고 사용자가 기입한 내용만 저장되어있는 것을 확인할 수 있습니다.

 

 

 

더 자세히 확인하기 위해 실제 Stub 객체에서 생성되는 Binary 내용을 해석해보도록 하겠습니다.

 

 

0A : name의 field 번호 1, wire type 2이므로 ( (1 << 3) | 2 ) 수행하면 10입니다. 따라서 이는 Hex 값으로 0A입니다.

0D : value의 길이를 의미합니다. 여기서 name에 저장된 값은 polar penguin 총 13자이므로 이는 Hex 값으로 0D입니다.

70 6F 6C 61 72 20 70 65 6E 67 75 69 6E : "polar penguin" 문자열의 Hex 값입니다. 

10 : age의 field 번호 2, wire type 0이므로 ( (2 << 3) | 0 ) 수행하면 16입니다. 따라서 이는 Hex 값으로 10입니다.

14 : age의 값인 20입니다. 이는 Hex 값으로 14입니다.

 

 

지금까지 Proto에 저장되는 결과를 알아보기 위해 실제 저장된 Binary 구조까지 살펴봤습니다. 모든 기술이 장점이 있으면 단점이 존재하듯이 Protobuf는 결과물이 Binary 포맷이기 때문에 결과 값을 유추하기 쉽지 않은 점은 단점이라고 볼 수 있습니다. 하지만 성능이 더 중요시되는 환경에서는 짧은 Payload는 전송 속도에 있어 강점입니다.

 


마치며

 

이번 포스팅에서는 Protobuf와 JSON을 비교하여 변환 속도와 Payload 크기 차이점을 비교해봤습니다. Protobuf는 gRPC의 핵심 요소로써 gRPC가 가지는 성능 이점의 주요 부분 중 하나라고 생각합니다. 다음 포스팅에서는 HTTP 2.0 기반으로 gRPC의 통신 방법에 대해서 살펴보겠습니다.

'MSA > gRPC' 카테고리의 다른 글

4. kotlin 환경에서 gRPC 설정하기  (0) 2022.03.10
3. gRPC는 왜 빠를까? (통신 방식) - 2  (6) 2022.03.10
1. gRPC 개요  (0) 2022.03.05

1. 서론

 

최근 MSA가 각광받으면서 많은 회사에서 Monolithic 구조를 여러개의 마이크로 서비스로 분리하려고 시도하고 있습니다. 

 

MSA 구성은 다양한 장점을 내포하고 있으나 그만큼 다양한 문제점 또한 상존합니다. 이 글에서는 MSA의 문제점 중 하나인 네트워크 통신 overhead에 초점을 맞추어 gRPC 기술이 어떤 부분을 해소해줄 수 있는지에 대해서 다루어보고 해당 기술은 어떻게 사용할 수 있는지에 대해서 설명해보고자 합니다.

 


 

2. 마이크로 서비스간 통신 이슈

 

 

Monolithic 구조에서는 하나의 프로그램으로 동작하기 때문에 그 안에서 구조적인 2개의 서비스간의 데이터는 공유 메모리를 통해서 주고받을 수 있습니다. 따라서 이 경우 서비스간 메시지 전송 성능은 큰 이슈가 되지 않습니다.

 

 

 

 

반면 MSA에서는 여러 모듈로 분리되어있고 동일 머신에 존재하지 않을 수 있습니다. 따라서 일반적으로는 보편화된 방식인 REST 통신을 통해 메시지를 주고 받습니다.

 

문제는 Frontend 요청에 대한 응답을 만들어내기 위해 여러 마이크로 서비스간의 협력이 필요하다면, 구간별 REST 통신에 따른 비효율로 인해 응답속도가 저하된다는 점입니다. 그렇다면 구체적으로 어떤 요인으로 인해 응답 속도 저하가 발생될까요? 이에 대해서 알아보기 전에 HTTP 1.1의 특징에 대해서 이해하고 HTTP 1.1의 또 다른 이슈를 확인해보도록 하겠습니다.

 


 

3. HTTP 1.1 통신 방법

 

 

 

HTTP는 TCP위에서 동작하므로 데이터 송수신에 앞서서 TCP 연결 시점에 3 way handshake 과정을 거치며, 연결을 종료할 때도 4 way handshake 방식으로 종료하게됩니다.

 

이러한 경우 만약 여러 데이터를 전송 응답을 반복해야하는 상황이라면, 매번 연결을 맺고 종료하는 과정으로 인한 비효율이 발생합니다.

 

 

 

앞서 살펴본 HTTP 1.0은 요청/응답을 하기에 앞서 매번 Connection을 맺고 끊어야했기 때문에 연결 요청/해제 비용이 상당히 높았습니다.

 

따라서 이러한 성능 이슈를 해결하고자 HTTP 1.0 기반의 브라우저와 서버에서는 자체적으로 Keep-alive 기능을 지원하기도 했습니다. 이 경우 Header에 Keep alive 관련 헤더를 포함해서 Connection을 유지하는 경우도 있었습니다. 하지만 해당 기능은 공식 Spec은 아니였습니다.

 

HTTP 1.1에서는 1.0의 문제점을 해결하고자 Persistent Connection과 Pipelining 기법을 제공하였습니다. 해당 기능이 무엇인지 알아봅시다.

 

 

 

Persistent Connection의 경우 Keep Alive와 같이 요청/응답을 위해 매번 Connection을 맺는 것이 아니라 연결을 일정시간 지속하는 것을 의미합니다.

 

 

다만 Persistent Connection만 적용했을 경우 왼쪽 그림과 같이 1개의 요청을 보내고 요청에 대한 응답이 와야 그 다음 요청을 보내기 위해 기다려야 합니다. 따라서 오른쪽과 같이 추가로 Pipelining을 적용하여 각 요청마다 응답을 기다리지 않고, 요청을 하나의 Packet에 담아 지속적으로 요청을 전달할 수 있도록 개선하였습니다.

 

Pipelining을 살펴보면 HTTP 1.0과 비교해서 많은 부분이 개선된 것으로 보입니다. 하지만 Pipelining에서도 성능 이슈는 존재합니다. 과연 무엇일까요?

 


 

4. HTTP 1.1 문제점

 

1. HOLB(Head Of Line Blocking)

 

 

Pipelining에서 요청 자체는 응답 여부와 관계없이 보낼 수 있습니다. 하지만 여전히 순차적으로 응답을 받아야합니다. 따라서 첫 번째 요청에 대한 응답이 오래걸리는 상황이라면, 두 번째 세번 째 요청 응답은 첫번째 요청이 응답처리가 완료되기 전까지 대기해야합니다. 이러한 문제를 Head Of Line Blocking(HOLB)라고 합니다. 

 

만약 위 예시와 같이 B, C, D, E 자원의 경우 크기가 작아 빠르게 처리될 수 있다면, 사용자 응답성이 좋아질 수 있습니다. 하지만 HTTP 1.1의 경우에는 A 자원의 응답처리가 완료되지 않았기 때문에 결과적으로는 전체 응답의 대기가 발생합니다. 이는 곧 사용성이 나빠지는 원인이 됩니다.

 

 

이러한 이슈를 해소하기 위해 대개 브라우저에서는 도메인당 기본 6개(브라우저 별 상이)의 Connection을 맺어놓고 데이터를 병렬적으로 요청 및 응답을 통해서 응답성을 개선하고 있습니다.

 

 

또한 개발자 입장에서는 브라우저 특성을 활용하여 자원 다운로드 속도를 빠르게 하기 위해 여러 기법을 사용합니다. 그 중 대표적인 방법은 여러 도메인으로 데이터를 분산하여 저장하고 도메인마다 병렬적으로 Connection 맺어 빠르게 많은 자원을 다운로드하도록 개선하는 방법입니다. 이러한 기법을 도메인 샤딩(Domain Sharding)이라고 합니다.

 

 

2. Header 문제

 

HTTP 통신시 헤더에는 많은 메타 정보가 저장되어 있습니다. 이때 사용자가 특정 사이트를 접속하게되면 방문 시점에 다수의 HTTP 요청이 발생하게 될 것입니다. 그리고 매 요청마다 중복된 헤더 값을 전달하며, 쿠키 또한 매 정보 요청마다 포함되어 전송됩니다. 더욱이 Header 정보는 Plain text로 전달되고 이는 Binary에 비해 상대적으로 크기가 크기 때문에 전송시 많은 비효율이 발생한다고 볼 수 있습니다.

 

 


 

5. HTTP 2.0 등장

 

출처 : https://developers.google.com/web/fundamentals/performance/http2/?hl=ko

 

HTTP 2.0은 2014년에 표준안이 제안되고 15년에 공개된 프로토콜입니다. HTTP 1.x 버전의 성능 개선을 위해 Multiplexed Streams 기술을 사용합니다. 해당 기술은 이전에 살펴본 HTTP pepelining의 개선 버전으로 하나의 Connection으로 여러개의 데이터를 주고 받을 수 있도록 Stream 처리가 가능합니다.

 

 

또한 응답에 대해서 우선순위(Priority)가 주어져서 요청 순서와 관계없이 우선순위가 높을 수록 더 빨리 응답을 할 수 있는 것이 특징입니다.

 

출처 : https://developers.google.com/web/fundamentals/performance/http2/?hl=ko

 

세 번째 특징으로는 HTTP 1.1에서는 매 요청마다 동일한 Header 정보를 보내야하는데 반해서 HTTP 2.0 버전에서는 Header 압축을 통해서 지속적인 데이터 요청에 대한 Header 크기를 줄일 수 있습니다.

 

즉 HTTP 2.0을 사용하게되면 더 적은 Connection으로 더 적은 Header 크기를 전송할 수 있으며 Stream 통신으로 인해 여러 데이터를 주고 받을 수 있게 되었습니다.

 

그 밖에 여러 특징이 존재하며, HTTP 2.0에 대해서 더 자세한 내용은 구글 개발자 페이지를 참고하시기 바랍니다.

 


 

6. REST API 이슈

 

gRPC는 HTTP 2.0 기반위에서 동작하기 때문에 지금까지 HTTP 2.0의 특징에 대해서 살펴봤습니다. 짧게 정리하자면, Header 압축, Multiplexed Stream 처리 지원 등으로 인해 네트워크 비용을 많이 감소시켰습니다.

 

그렇다면 HTTP 2.0 특징을 제외한 gRPC만의 특징은 무엇이 있을까요? 먼저 REST API 통신의 문제점에 대해서 먼저 살펴본 다음 gRPC의 특징에 대해서 살펴보도록 하겠습니다.

 

 

1) JSON Payload 비효율

 

 

 

REST 구조에서는 JSON 형태로 데이터를 주고 받습니다. JSON은 데이터 구조를 쉽게 표현할 수 있으며, 사람이 읽기 좋은 표현 방식입니다. 하지만 사람이 읽기 좋은 방식이라는 의미는 머신 입장에서는 자신이 읽을 수 있는 형태로 변환이 필요하다는 것을 의미합니다.

 

 

따라서 Client와 Server간의 데이터 송수신간에 JSON 형태로 Serialization 그리고 Deserialization 과정이 수반되어야합니다. JSON 변환은 컴퓨터 CPU 및 메모리 리소스를 소모하므로 수많은 데이터를 빠르게 처리하는 과정에서는 효율이 떨어질 수 밖에 없습니다.

 

 

2) API Spec 정의 및 문서 표준화 부재

 

 

REST API를 사용할 때 가장 큰 고민은 API 개발자와 API를 사용자 간의 효율적인 커뮤니케이션 방법입니다. 가령 API가 어떻게 디자인 되었는지, 그리고 해당 속성은 어떤 값을 입력해야하는지에 대해 상호간의 이해가 필요합니다. REST를 사용한다면 이를 위해서 자체적인 문서나 Restdocs 혹은 Swagger를 통해서 API 문서를 공유합니다. 하지만 이러한 방식은 REST와 관련된 표준은 아닙니다.

 

 

두 번째 이슈는 JSON 구조는 값은 String으로 표현됩니다. 따라서 사전에 타입 제약 조건에 대한 명확한 합의가 없거나 문서를 보고 개발자가 인지하지 못한다면, Server에 전달전에 이를 검증할 수 없습니다. 가령 위 예시와 같이 Server에서 zipCode는 숫자 타입으로 처리되어야하지만 Client에서는 이에 대한 제약 없이 문자열을 포함시켜 전달할 수 있음을 의미합니다.

 

그렇다면 gRPC 기술은 위 두 가지 이슈를 어떻게 풀어내었을까요?

 


 

7. gRPC Protobuf

 

 

Client에서 Server측의 API를 호출하기 위해서 기존에는 어떤 Endpoint로 호출해야할 지 그리고 전달 Spec에 대해서 API 문서 작성 혹은 Client와 Server 개발자간의 커뮤니케이션을 통해 정의해야했습니다. 그리고 이는 별도의 문서 생성이나 커뮤니케이션 비용이 추가로 발생합니다.

 

이러한 문제를 감소시키기 위해 다양한 방법이 존재합니다. 그 중 한가지는 Server의 기능을 사용할 수 있는 전용 Library를 Client에게 제공하는 것입니다. 그러면 Client는 해당 Library에서 제공하는 Util 메소드를 활용해서 호출하면 내부적으로는 Server와 통신하여 올바른 결과를 제공할 수 있습니다. 또한 해당 방법은 Server에서 요구하는 Spec에 부합되는 데이터만 보낼 수 있게 강제화 할 수 있다는 측면에서 스키마에 대한 제약을 가할 수 있습니다.

 

 

출처 : gRPC 공식 문서(https://grpc.io/docs/what-is-grpc/introduction/)

 

gRPC에서는 위 그림과 같이 이와 유사한 형태인 Stub 클래스를 Client에게 제공하여 Client는 Stub을 통해서만 gRPC 서버와 통신을 수행하도록 강제화 했습니다. 

 

그렇다면 Stub 클래스는 무엇이고 위 그림에서 보이는 Proto는 무엇일까요?

 

message Address{
    string city = 1;
    string zip_code = 2;
}

message Person{
    string name = 1;
    int32 age = 2;
    repeated string hobbies = 3;
    optional Address address = 4;
}

service PersonService {
    rpc register(Person) returns (google.protobuf.Empty);
    rpc registerBatch(stream Person) returns (google.protobuf.Empty);
}

 

Protocol Buffer는 Google이 공개한 데이터 구조로써, 특정 언어 혹은 특정 플랫폼에 종속적이지 않은 데이터 표현 방식입니다. 하지만 Protocol Buffer는 특정 언어에 속하지 않으므로 Java나 Kotlin, Golang 언어에서 직접적으로 사용할 수 없습니다. 

 

 

 

 

따라서 Protocol Buffer를 언어에서 독립적으로 활용하기 위해서는 이를 기반으로 Client 혹은 Server에서 사용할 수 있는 Stub 클래스를 생성해야합니다. 이때 protoc 프로그램을 활용해서 다양한 언어에서 사용할 수 있는 Stub 클래스를 자동 생성할 수 있습니다.

 

 

만약 Server가 Java 혹은 Kotlin 기반으로 구성되어있고 Client도 Java 혹은 Kotlin이라면, 위와 같이 Stub 생성을 자동으로 해주는 Library를 활용할 수 있습니다. 

 

 

 

위 그림은 Library를 활용해서 Build 시점에 Proto 파일을 찾고 컴파일 단계에서 이를 분석해서 Stub 클래스를 자동으로 생성된 모습입니다. 

 

 

Stub 클래스를 생성하면, 해당 클래스 정보를 Server와 Client에 공유한 다음 Stub 클래스를 활용하여 서로 양방향 통신을 수행할 수 있습니다.

 

 

위 코드는 Stub 객체를 활용하여 Client에서 특정 RPC를 호출한 모습입니다. REST 방식을 활용한다면 RestTemplate 혹은 Webclient나 Retrofit2와 같은 도구 활용해서 JSON으로 데이터를 전송해야합니다. 반면 gRPC 방법에서는 위와같이 Stub 객체에 정의된 메소드 호출을 통해서 Client/Server간 데이터 송수신을 수행할 수 있어 편리합니다.

 

지금까지 학습한 Protocol Buffer 내용을 정리하면 다음과 같은 장점을 지닌 것을 확인할 수 있습니다.

 

1. 스키마 타입 제약이 가능하다

2. Protocol buffer가 API 문서를 대체할 수 있다.

 

위 두가지 특징은 이전에 REST에서 다룬 이슈 중 하나인 API Spec 정의 및 문서 표준화 부재의 문제를 어느정도 해소해줄 수 있습니다. 그렇다면 또 하나의 이슈인 JSON Payload 비효율 문제와 대비하여 gRPC는 어떠한 이점을 지니고 있을까요?

 

 

JSON 타입은 위와같이 사람이 읽기는 좋지만 데이터 전송 비용이 높으며, 해당 데이터 구조로 Serialization, Deserialization 하는 비용이 높음을 앞서 지적했습니다.

 

 

 

gRPC의 통신에서는 데이터를 송수신할 때 Binary로 데이터를 encoding 해서 보내고 이를 decoding 해서 매핑합니다. 따라서 JSON에 비해 payload 크기가 상당히 적습니다.

 

또한 JSON에서는 필드에 값을 입력하지 않아도 구조상에 해당 필드가 포함되어야하기 때문에 크기가 커집니다.  반면 gRPC에서는 입력된 값에 대해서만 Binary 데이터에 포함시키기 때문에 압축 효율이 JSON에 비해 상당히 좋습니다.

 

결론적으로 이러한 적은 데이터 크기 및 Serialization, Deserialization 과정의 적은 비용은 대규모 트래픽 환경에서 성능상 유리합니다.

 


8. gRPC 단점

 

지금까지 gRPC에서 사용되는 기반 기술에 대해서 살펴봤습니다. gRPC는 MSA 환경에서 문제점인 네트워크 지연 문제를 어느정도 해결해 줄 수 있는 기술로써 점차 많은 곳에서 도입을 진행하고 있지만 다음과 같은 문제점 또한 존재합니다.

 

1) 브라우저에서 gRPC를 직접 지원 안함

 

현재 gRPC-WEB을 사용해서 직접 브라우저에서 서버로 gRPC 통신을 수행할 수 없습니다. 따라서 Envoy와 같은 Proxy 서버를 통해 요청을 Forwarding 해야합니다.

 

또 다른 방법으로는 gRPC 서버와 브라우저 사이에 Aggregator 서버를 별도로 두어 Aggregator와 브라우저간에는 REST 통신을 수행하고 Aggregator와 gRPC 서버간에 gRPC 통신을 수행하는 방법을 사용해야합니다.

 

 

2) Stub 관리 비용 추가

 

Client와 Server는 Stub 클래스를 통해 서로 통신을 수행합니다. 하지만 요구사항 변경으로인해 Stub 클래스 변경이 필요할 때 Server에서 변경한 내용을 Client에서도 적용을 해야합니다. 이 경우 버전 차이로 인한 하위 호환성 문제가 발생할 수 있기 때문에 서비스간 Stub 관리 방법을 정의해야합니다.

 

가장 많이 사용하는 방법으로는 Proto 파일을 중앙에서 gitops 형식으로 관리하고 변경이 생겼을 때 이를 감지하고 언어별로 컴파일하여 Stub 클래스를 라이브러리 형태로 배포하는 방법을 많이 사용합니다.

 

 


마치며

 

이번 포스팅에서는 gRPC가 MSA 환경에서 왜 대두되었는지 기존의 방식과 어떠한 차이점이 있는지에 대해서 간략하게 알아봤습니다. 다음 포스팅에서는 gRPC와 REST를 다각도로 비교해보면서 gRPC가 어떠한 장점이 있는지를 분석해보겠습니다.

'MSA > gRPC' 카테고리의 다른 글

4. kotlin 환경에서 gRPC 설정하기  (0) 2022.03.10
3. gRPC는 왜 빠를까? (통신 방식) - 2  (6) 2022.03.10
2. gRPC는 왜 빠를까? (Payload) - 1  (1) 2022.03.10

1. 서론

 

이번 포스팅은 AxonFramework 관련 마지막 포스팅입니다. Saga 패턴 보상 트랜잭션 구현을 다루겠습니다.


2. Deadline

 

MSA 환경에서는 App이 여러개로 분산되어있으므로 하나의 App이 느려지거나 장애가 발생하면, 장애가 발생한 App을 호출하는 App에도 장애가 전파됩니다.

 

Axon에서 제공하는 Saga 패턴을 사용하면, 요청마다 Saga 인스턴스가 생성됩니다. 따라서 연관된 App의 장애로 인해 전체 트랜잭션에 Hang이 걸리게되면, 요청한 App 또한 안전하지 못합니다.

 

따라서 이를 완화하기 위해 Axon에서는 Deadline 기능을 제공합니다. Deadline은 App에서 지정한 시간동안 반응이 없으면, 이를 처리할 메소드를 Callback 형식으로 등록할 수 있는 기능입니다. 자세한 내용은 공식 문서를 참고 바라며, 데모 프로젝트에서는 CommandGateway 클래스에서 기본적으로 제공하는 sendAndWait 메소드를 통해서 일정 시간동안 응답이 없으면, 보상 트랜잭션을 발동하도록 구현하겠습니다.


 

1. Command 모듈 Saga 패키지내 있는 Saga 클래스를 엽니다.

 


2. Saga 클래스 코드를 수정합니다.

 

TransferManager.java

@Saga
@Slf4j
public class TransferManager {
    (...중략...)
    
    @StartSaga
    @SagaEventHandler(associationProperty = "transferID")
    protected void on(MoneyTransferEvent event) {
       (...중략...)
        try {
            log.info("계좌 이체 시작 : {} ", event);
            commandGateway.sendAndWait(comamndFactory.getTransferCommand(), 10, TimeUnit.SECONDS);
        } catch (CommandExecutionException e) {
            log.error("Failed transfer process. Start cancel transaction");
            //보상 트랜잭션 구현 로직
        }
    }
    (...중략...)

}

 

sendAndWait 두 번째, 세 번째 인자를 통해 TimeOut을 지정하며, 해당 기간동안 응답이 없을 경우 Exception을 통해 보상 트랜잭션을 발동할 수 있습니다.


3. 트랜잭션 프로세스 설계

 

일반적인 상황

 

 

일반적으로 계좌 이체 요청을하면, 해당 은행에서 보유한 잔고보다 요청액수가 클 경우에는 이체 거절을하며, 반대의 경우에는 이체 승인을 합니다. 따라서 요청자인 Command 모듈에서는 Jeju 은행의 승인 혹은 거절 이벤트 발생 여부에 따라서 결과를 처리하면 됩니다.


 

보상 트랜잭션 발동 상황

 

 

 

보상 트랜잭션은 연결된 App 사이에 트랜잭션 문제가 발생하였을 때, 이미 처리된 데이터를 원상복구를 위하여 추가적인 트랜잭션을 발동하는 것입니다. 

 

예제에서는 Timeout이 발생하게 되면, Jeju 은행 App의 응답과 관계없이 트랜잭션 Rollback을 위하여 보상 트랜잭션을 요청하고, Command 모듈에서는 다음 로직을 수행합니다.

 

 

만약 위 그림과 같이 만약 Jeju 은행에 발생한 장애로 인하여 계좌 요청을 진행하였지만 응답이 오지 않는 상황이라고 가정해봅시다. Command 모듈에서는 장애 방지를 위해 Timeout을 설정했기 때문에 일정 시간이 지나면 보상 트랜잭션을 발동할 것입니다.

 

 

이후 Jeju 은행 App이 정상화된다면 이전 요청을 수행할 것입니다. 그 결과 요청이 적절하지 않으면, 이체 거절 이벤트를 발송합니다. 만약 이체 거절 상황에서 요청받은 보상 트랜잭션을 처리한다면, 잔고는 그대로인 상황에서 보상 트랜잭션에 의해 잔고가 늘어나는 기현상이 발생합니다.

 

이를 해결하기 위해 다양한 방법이 있겠지만, 예제 프로젝트에서는 Timeout이 발생된 상황에서 이체 거절 메시지를 받게되면, 보상 트랜잭션 취소 요청하여 정상 처리하겠습니다.

 

 

위 그림은 Command 모듈에서 트랜잭션 요청 이후 처리해야할 과정을 개략적으로 순서도로 나타냈습니다.


4. Common 모듈 구현

 

1. Common 모듈 command 패키지내 command 클래스를 추가합니다.

 


2. command 클래스를 구현합니다.

 

AbstractCancelTransferCommand.java

@ToString
@NoArgsConstructor
@AllArgsConstructor
@Getter
public abstract class AbstractCancelTransferCommand {
    @TargetAggregateIdentifier
    protected String srcAccountID;
    protected String dstAccountID;
    protected Long amount;
    protected String transferID;

    public AbortTransferCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
        this.srcAccountID = srcAccountID;
        this.dstAccountID = dstAccountID;
        this.transferID = srcAccountID;
        this.amount = amount;
        return this;
    }
}

 

JejuBankCancelTransferCommand.java

public class JejuBankCancelTransferCommand extends AbstractCancelTransferCommand {
    @Override
    public String toString() {
        return "JejuBankCancelTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

SeoulBankCancelTransferCommand.java

public class SeoulBankCancelTransferCommand extends AbstractCancelTransferCommand {
    @Override
    public String toString() {
        return "SeoulBankCancelTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

AbstractCompensationCancelCommand.java

@ToString
@NoArgsConstructor
@AllArgsConstructor
@Getter
public abstract class AbstractCompensationCancelCommand {
    @TargetAggregateIdentifier
    protected String srcAccountID;
    protected String dstAccountID;
    protected Long amount;
    protected String transferID;
    
    public AbstractCompensationCancelCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
        this.srcAccountID = srcAccountID;
        this.dstAccountID = dstAccountID;
        this.transferID = srcAccountID;
        this.amount = amount;
        return this;
    }
}

 

JejuBankCompensationCancelCommand.java

public class JejuBankCompensationCancelCommand extends AbstractCompensationCancelCommand {
    @Override
    public String toString() {
        return "JejuBankCompensationCancelCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

SeoulBankCompensationCancelCommand.java

public class SeoulBankCompensationCancelCommand extends AbstractCompensationCancelCommand {
    @Override
    public String toString() {
        return "SeoulBankCompensationCancelCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

TransferComamndFactory.java

@RequiredArgsConstructor
public class TransferComamndFactory {
    private final AbstractTransferCommand transferCommand;
    private final AbstractCancelTransferCommand abortTransferCommand;
    private final AbstractCompensationCancelCommand compensationAbortCommand;

    public void create(String srcAccountID, String dstAccountID, Long amount, String transferID){
        transferCommand.create(srcAccountID, dstAccountID, amount, transferID);
        abortTransferCommand.create(srcAccountID, dstAccountID, amount, transferID);
        compensationAbortCommand.create(srcAccountID, dstAccountID, amount, transferID);
    }

    public AbstractTransferCommand getTransferCommand(){
        return this.transferCommand;
    }
    public AbstractCancelTransferCommand getAbortTransferCommand(){
        return this.abortTransferCommand;
    }
    public AbstractCompensationCancelCommand getCompensationAbortCommand(){
        return this.compensationAbortCommand;
    }
}

3. Common 모듈 event 패키지내 event 클래스를 추가합니다.

 


4. event 클래스 내용을 구현합니다.

 

CompletedCancelTransferEvent.java

@Builder
@ToString
@Getter
public class CompletedCancelTransferEvent {
    private String srcAccountID;
    private String dstAccountID;
    private Long amount;
    private String transferID;
}

 

CompletedCompensationCancelEvent.java

@Builder
@ToString
@Getter
public class CompletedCompensationCancelEvent {
    private String srcAccountID;
    private String dstAccountID;
    private Long amount;
    private String transferID;
}

5. Jeju 모듈 수정

 

 

보상 트랜잭션 처리를 위한 Handler 메소드를 추가하기 위해 Aggregate 클래스를 수정합니다.

 

Account.java

@Entity
@Aggregate
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Account {
    @AggregateIdentifier
    @Id
    private String accountID;
    private Long balance;
    private final transient Random random = new Random();

    @CommandHandler
    public Account(AccountCreationCommand command) throws IllegalAccessException {
        log.debug("handling {}", command);
        if (command.getBalance() <= 0)
            throw new IllegalAccessException("유효하지 않은 입력입니다.");
        apply(new AccountCreationEvent(command.getAccountID(), command.getBalance()));
    }

    @EventSourcingHandler
    protected void on(AccountCreationEvent event) {
        log.debug("event {}", event);
        this.accountID = event.getAccountID();
        this.balance = event.getBalance();
    }

    @CommandHandler
    protected void on(JejuBankTransferCommand command) throws InterruptedException {
        if (random.nextBoolean())
            TimeUnit.SECONDS.sleep(15);

        log.debug("handling {}", command);
        if (this.balance < command.getAmount()) {
            apply(TransferDeniedEvent.builder()
                                        .srcAccountID(command.getSrcAccountID())
                                        .dstAccountID(command.getDstAccountID())
                                        .amount(command.getAmount())
                                        .description("잔고가 부족합니다.")
                                        .transferID(command.getTransferID())
                                     .build());
        } else {
            apply(TransferApprovedEvent.builder()
                                            .srcAccountID(command.getSrcAccountID())
                                            .dstAccountID(command.getDstAccountID())
                                            .transferID(command.getTransferID())
                                            .amount(command.getAmount())
                                        .build());
        }
    }

    @EventSourcingHandler
    protected void on(TransferApprovedEvent event) {
        log.debug("event {}", event);
        this.balance -= event.getAmount();
    }

    @CommandHandler
    protected void on(JejuBankCancelTransferCommand command) {
        log.debug("handling {}", command);
        apply(CompletedCancelTransferEvent.builder()
                                            .srcAccountID(command.getSrcAccountID())
                                            .dstAccountID(command.getDstAccountID())
                                            .transferID(command.getTransferID())
                                            .amount(command.getAmount())
                                          .build());
    }

    @EventSourcingHandler
    protected void on(CompletedCancelTransferEvent event) {
        log.debug("event {}", event);
        this.balance += event.getAmount();
    }

    @CommandHandler
    protected void on(JejuBankCompensationCancelCommand command) {
        log.debug("handling {}", command);
        apply(CompletedCompensationCancelEvent.builder()
                                                .srcAccountID(command.getSrcAccountID())
                                                .dstAccountID(command.getDstAccountID())
                                                .transferID(command.getTransferID())
                                                .amount(command.getAmount())
                                              .build());
    }

    @EventSourcingHandler
    protected void on(CompletedCompensationCancelEvent event) {
        log.debug("event {}", event);
        this.balance -= event.getAmount();
    }
}

 

위 클래스 구현 내용 중에서 Timeout 테스트를 위해 50% 확률로 Sleep 하도록 임의로 추가하였습니다.

 

if (random.nextBoolean())
	TimeUnit.SECONDS.sleep(15);

6. Command 모듈 수정

 

1. Command 클래스 수정을 위해 command 패키지 하위 MoneyTransferCommand 클래스 파일을 엽니다.

 


2. 클래스 파일을 수정합니다.

 

MoneyTransferCommand.java

@Builder
@ToString
@Getter
public class MoneyTransferCommand {
    private String srcAccountID;
    @TargetAggregateIdentifier
    private String dstAccountID;
    private Long amount;
    private String transferID;
    private BankType bankType;

    public enum BankType{
        JEJU(command -> new TransferComamndFactory(new JejuBankTransferCommand(),new JejuBankCancelTransferCommand(), new JejuBankCompensationCancelCommand())),
        SEOUL(command -> new TransferComamndFactory(new SeoulBankTransferCommand(), new SeoulBankCancelTransferCommand(), new SeoulBankCompensationCancelCommand()));

        private Function<MoneyTransferCommand, TransferComamndFactory> expression;
        BankType(Function<MoneyTransferCommand, TransferComamndFactory> expression){ this.expression = expression;}
        public TransferComamndFactory getCommandFactory(MoneyTransferCommand command){
            TransferComamndFactory factory = this.expression.apply(command);
            factory.create(command.getSrcAccountID(), command.getDstAccountID(), command.amount, command.getTransferID());
            return factory;
        }

    }

    public static MoneyTransferCommand of(TransferDTO dto){
        return MoneyTransferCommand.builder()
                .srcAccountID(dto.getSrcAccountID())
                .dstAccountID(dto.getDstAccountID())
                .amount(dto.getAmount())
                .bankType(dto.getBankType())
                .transferID(UUID.randomUUID().toString())
                .build();
    }
}

3. 보상 트랜잭션 구현을 위해 Saga 클래스를 엽니다.

 


4. Saga 클래스를 수정합니다.

 

TransferManager.java

@Saga
@Slf4j
public class TransferManager {
    @Autowired
    private transient CommandGateway commandGateway;
    private boolean isExecutingCompensation = false;
    private boolean isAbortingCompensation = false;
    private TransferComamndFactory comamndFactory;

    @StartSaga
    @SagaEventHandler(associationProperty = "transferID")
    protected void on(MoneyTransferEvent event) {

        log.debug("Created saga instance");
        log.debug("event : {}", event);
        comamndFactory = event.getComamndFactory();
        SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID());

        try {
            log.info("계좌 이체 시작 : {} ", event);
            commandGateway.sendAndWait(comamndFactory.getTransferCommand(), 10, TimeUnit.SECONDS);
        } catch (CommandExecutionException e) {
            log.error("Failed transfer process. Start cancel transaction");
            cancelTransfer();
        }
    }

    private void cancelTransfer() {
        isExecutingCompensation = true;
        log.info("보상 트랜잭션 요청");
        commandGateway.send(comamndFactory.getAbortTransferCommand());
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(CompletedCancelTransferEvent event) {
        isExecutingCompensation = false;
        if (!isAbortingCompensation) {
            log.info("계좌 이체 취소 완료 : {} ", event);
            SagaLifecycle.end();
        }
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferDeniedEvent event) {
        log.info("계좌 이체 실패 : {}", event);
        log.info("실패 사유 : {}", event.getDescription());
        if(isExecutingCompensation){
            isAbortingCompensation = true;
            log.info("보상 트랜잭션 취소 요청 : {}", event);
            commandGateway.send(comamndFactory.getCompensationAbortCommand());
        }
        else {
            SagaLifecycle.end();
        }
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    @EndSaga
    protected void on(CompletedCompensationCancelEvent event){
        isAbortingCompensation = false;
        log.info("보상 트랜잭션 취소 완료 : {}",event);
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferApprovedEvent event) {
        if (!isExecutingCompensation && !isAbortingCompensation) {
            log.info("이체 금액 {} 계좌 반영 요청 : {}",event.getAmount(), event);
            SagaLifecycle.associateWith("accountID", event.getDstAccountID());
            commandGateway.send(TransferApprovedCommand.builder()
                                                            .accountID(event.getDstAccountID())
                                                            .amount(event.getAmount())
                                                            .transferID(event.getTransferID())
                                                       .build());
        }
    }

    @SagaEventHandler(associationProperty = "accountID")
    @EndSaga
    protected void on(DepositCompletedEvent event){
        log.info("계좌 이체 성공 : {}", event);
    }
}

5. 테스트 결과

정상

com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@d713db5)
com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@d713db5) 
com.cqrs.command.saga.TransferManager    : 이체 금액 1 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090, amount=1)
c.c.command.aggregate.AccountAggregate   : handling TransferApprovedCommand(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090)
c.c.command.aggregate.AccountAggregate   : applying DepositMoneyEvent(holderID=b01fae84-e8a5-427d-a5f4-baa7376b7163, accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1)
c.c.command.aggregate.AccountAggregate   : balance 31
com.cqrs.command.saga.TransferManager    : 계좌 이체 성공 : DepositCompletedEvent(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=30dbfa4a-ea98-4343-bcbd-00e906870090)

 

Timeout

com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3226f9ff)
com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3226f9ff) 
com.cqrs.command.saga.TransferManager    : Failed transfer process. Start cancel transaction
com.cqrs.command.saga.TransferManager    : 보상 트랜잭션 요청
com.cqrs.command.saga.TransferManager    : 계좌 이체 취소 완료 : CompletedAbortTransferEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=1, transferID=7a01bd7a-6ce0-43bd-93e9-ee0224dfd791) 

 

잔고 부족

c.c.command.aggregate.AccountAggregate   : handling MoneyTransferCommand(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, bankType=JEJU)
com.cqrs.command.saga.TransferManager    : Created saga instance
com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@66a12a11)
com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@66a12a11) 
com.cqrs.command.saga.TransferManager    : 계좌 이체 실패 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=f73d08ba-004c-4e44-9929-2866f6ea02da, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager    : 실패 사유 : 잔고가 부족합니다.

 

잔고 부족 + Timeout

com.cqrs.command.saga.TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3d4d8677)
com.cqrs.command.saga.TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@3d4d8677) 
com.cqrs.command.saga.TransferManager    : Failed transfer process. Start cancel transaction
com.cqrs.command.saga.TransferManager    : 보상 트랜잭션 요청
com.cqrs.command.saga.TransferManager    : 계좌 이체 실패 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager    : 실패 사유 : 잔고가 부족합니다.
com.cqrs.command.saga.TransferManager    : 보상 트랜잭션 취소 요청 : TransferDeniedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=b132f585-3be8-49b0-ab67-0293a05684ff, amount=300, description=잔고가 부족합니다.)
com.cqrs.command.saga.TransferManager    : 보상 트랜잭션 취소 완료 : CompletedCompensationAbortEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=300, transferID=b132f585-3be8-49b0-ab67-0293a05684ff)

 

테스트 결과 각기 다른 상황에서 정상적으로 수행되는 것을 확인할 수 있습니다.


6. 마치며

 

이상으로 AxonFramework에 대한 포스팅을 마치겠습니다. 테스팅에 대해서는 다루지 않았는데, 분산 App 환경에서 테스트 코드 작성은 반드시 필요하다고 생각합니다. 따라서 공식문서를 참고하여 테스트 코드 작성 방법을 익힌다면, 보다 안전한 프로그램이 될 것입니다.

 

그 외에 쿠버네티스 지원, Tracing에 대해서도 공식문서에 소개되어 있으니 참고바랍니다. 또한, 지금까지 구현한 프로젝트 내용은 깃헙에 업로드 했습니다.

 

포스팅 내용 중 개선사항에 대해서는 댓글로 남겨주시면, 확인 후 내용 반영하도록 하겠습니다.

1. 서론

 

 

이번 포스팅은 분산 트랜잭션 제어를 위한 Saga 패턴을 구현하겠습니다. 보상 트랜잭션까지 같이 구현하면 내용이 복잡하므로 보상 트랜잭션 및 Deadline 기능은 다음 포스팅에서 다루겠습니다.

 

Saga 패턴 사용을 위한 가상의 시나리오는 다음과 같습니다.

 

테스트 시나리오

 

1. Jeju 모듈에 계좌를 개설한다.(계좌명 : test, 잔고 : 100)

2. Command 모듈에 계정 & 계좌를 개설한다.

3. Jeju 모듈의 계좌에서 30원을 Command 모듈 계좌로 이체한다.

4. Jeju 모델 DB의 계좌와 Query 모델 DB를 통해서 이체금액을 확인한다.

 

이전 포스팅에서 구현한 Jeju 모듈을 활용하여 코드 구현을 진행하겠습니다. Saga 요청을 위한 API 및 Saga 인스턴스는 Command 모듈에 생성하도록 하겠습니다.


2. Common 모듈 구현

 

1. 공통으로 사용할 Command 및 Event 추가를 위해 Common 모듈 Command 패키지 및 Command 클래스를 생성합니다.

 


 

2. 생성한 Command 클래스를 구현합니다.

 

TransferComamndFactory.java

@RequiredArgsConstructor
public class TransferComamndFactory {
    private final AbstractTransferCommand transferCommand;

    public void create(String srcAccountID, String dstAccountID, Long amount, String transferID){
        transferCommand.create(srcAccountID, dstAccountID, amount, transferID);
    }

    public AbstractTransferCommand getTransferCommand(){
        return this.transferCommand;
    }
}

 

위 클래스는 계좌 이체와 연관된 Command를 생성하는 Factory 클래스입니다. 나중에 보상 트랜잭션을 위한 Command를 같이 관리하기 위하여 생성하였습니다.

 

AbstractTransferCommand.java

@ToString
@Getter
public abstract class AbstractTransferCommand {
    @TargetAggregateIdentifier
    protected String srcAccountID;
    protected String dstAccountID;
    protected Long amount;
    protected String transferID;

    public AbstractTransferCommand create(String srcAccountID, String dstAccountID, Long amount, String transferID) {
        this.srcAccountID = srcAccountID;
        this.dstAccountID = dstAccountID;
        this.transferID = transferID;
        this.amount = amount;
        return this;
    }
}

 

계좌 이체 요청을 위한 클래스입니다. srcAccountID는 인출할 계좌 ID이며, dstAccountID는 송금 대상 계좌 ID입니다.

 

JejuBankTransferCommand.java

public class JejuBankTransferCommand extends AbstractTransferCommand {
    @Override
    public String toString() {
        return "JejuBankTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

SeoulBankTransferCommand.java

public class SeoulBankTransferCommand extends AbstractTransferCommand {
    @Override
    public String toString() {
        return "SeoulBankTransferCommand{" +
                "srcAccountID='" + srcAccountID + '\'' +
                ", dstAccountID='" + dstAccountID + '\'' +
                ", amount=" + amount +
                ", transferID='" + transferID + '\'' +
                '}';
    }
}

 

 


3. Common 모듈 event 패키지 하위에 Event 클래스를 생성합니다.

 

 


4. Event 클래스를 구현합니다.

 

MoneyTransferEvent.java

@Builder
@ToString
@Getter
public class MoneyTransferEvent {
    private String dstAccountID;
    private String srcAccountID;
    private Long amount;
    private String transferID;
    private TransferComamndFactory comamndFactory;
}

 

위 클래스는 Command 모듈로 계좌이체 요청이 들어오면, 인출할 대상 계좌번호(srcAccountID) 및 입금 계좌번호(dstAccountID), 그리고 이체 금액(amount)와 같은 기본정보와 트랜잭션간 고유 키(transferID) 및 요청 Command 구분 정보를 담고있습니다.

 

TransferApprovedEvent.java

@Builder
@ToString
@Getter
public class TransferApprovedEvent {
    private String srcAccountID;
    private String dstAccountID;
    private String transferID;
    private Long amount;
}

 

위 클래스는 계좌이체가 성공되었을 때, 금액을 반영하기 위한 Event입니다.

 

TransferDeniedEvent.java

@Getter
@Builder
@ToString
public class TransferDeniedEvent {
    private String srcAccountID;
    private String dstAccountID;
    private String transferID;
    private Long amount;
    private String description;
}

 

위 클래스는 계좌이체가 거절될 때, 요청 App에 거절 내역을 전달하기 위한 Event입니다.


3. Jeju 모듈 구현

 

1. Jeju 은행 모듈 build.gradle 파일을 엽니다.

 


2. Jeju 모듈에 State-Stored-Aggregate 구현을 위해 JPA 및 DB 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
    implementation group: 'org.postgresql', name: 'postgresql', version: '42.2.6'
}

3. Jeju 모듈 resources 패키지 하위 application.yml 파일을 엽니다.

 


4. datasource 속성을 추가합니다.

 

application.yml

server:
  port: 9091

spring:
  application:
    name: eventsourcing-cqrs-jejuBank
  datasource:
    platform: postgres
    url: jdbc:postgresql://localhost:5432/jeju
    username: jeju
    password: jeju
  jpa:
    hibernate:
      ddl-auto: update

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

logging.level.com.cqrs.jeju : debug

 

DB는 Command와 Query와 동일하게 Postgre를 사용하였으며, jeju 계정을 별도로 생성하였습니다.


5. 계좌 생성을 위해 Jeju 모듈 하위에 다음과 같은 패키지들을 생성합니다.

 


6. aggregate 패키지 내 aggregate 클래스부터 구현하겠습니다.

 

Account.java


@Entity
@Aggregate
@NoArgsConstructor
@AllArgsConstructor
@Slf4j
public class Account {
    @AggregateIdentifier
    @Id
    private String accountID;
    private Long balance;
    
    @CommandHandler
    public Account(AccountCreationCommand command) throws IllegalAccessException {
        log.debug("handling {}", command);
        if (command.getBalance() <= 0)
            throw new IllegalAccessException("유효하지 않은 입력입니다.");
        apply(new AccountCreationEvent(command.getAccountID(), command.getBalance()));
    }

    @EventSourcingHandler
    protected void on(AccountCreationEvent event) {
        log.debug("event {}", event);
        this.accountID = event.getAccountID();
        this.balance = event.getBalance();
    }

    @CommandHandler
    protected void on(JejuBankTransferCommand command) throws InterruptedException {

        log.debug("handling {}", command);
        if (this.balance < command.getAmount()) {
            apply(TransferDeniedEvent.builder()
                                        .srcAccountID(command.getSrcAccountID())
                                        .dstAccountID(command.getDstAccountID())
                                        .amount(command.getAmount())
                                        .description("잔고가 부족합니다.")
                                        .transferID(command.getTransferID())
                                     .build());
        } else {
            apply(TransferApprovedEvent.builder()
                                            .srcAccountID(command.getSrcAccountID())
                                            .dstAccountID(command.getDstAccountID())
                                            .transferID(command.getTransferID())
                                            .amount(command.getAmount())
                                        .build());
        }
    }

    @EventSourcingHandler
    protected void on(TransferApprovedEvent event) {
        log.debug("event {}", event);
        this.balance -= event.getAmount();
    }
}

 

트랜잭션 테스트 도중 Account 정보를 DB에서 바로 확인하기 위해 State-Stored Aggregate 형태로 구현하였습니다. 위 코드 내용은 Command 어플리케이션 구현파트에서 다룬 내용이 주이기에 별도 설명은 생략하겠습니다.


7. command 패키지내 Command 클래스 생성 및 이를 구현합니다.

 

AccountCreationCommand.java

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Getter
public class AccountCreationCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private Long balance;
}

8. dto 패키지내 dto 클래스 생성 및 이를 구현합니다.

 

AccountDTO.java

@NoArgsConstructor
@AllArgsConstructor
@ToString
@Getter
public class AccountDTO {
    private String accountID;
    private Long balance;
}

9. event 패키지내 event 클래스를 생성 및 이를 구현합니다.

 

AccountCreationEvent.java

@ToString
@RequiredArgsConstructor
@Getter
public class AccountCreationEvent {
    private final String accountID;
    private final Long balance;
}

해당 이벤트는 Jeju 모듈내에서만 유효하기 때문에 공통 모듈에 생성하지 않았습니다.


10. service 패키지내 service 클래스 생성 및 이를 구현합니다.

 

AccountService.java

public interface AccountService {
    String createAccount(AccountDTO accountDTO);
}

 

AccountServiceImpl.java

@Service
@RequiredArgsConstructor
public class AccountServiceImpl implements AccountService {
    private final CommandGateway commandGateway;

    @Override
    public String createAccount(AccountDTO accountDTO) {
        return commandGateway.sendAndWait(new AccountCreationCommand(accountDTO.getAccountID(), accountDTO.getBalance()));
    }
}

11. controller 패키지내 controller 클래스 생성 및 이를 구현합니다.

 

AccountController.java

@RestController
@RequiredArgsConstructor
public class AccountController {
    private final AccountService accountService;

    @PostMapping("/account")
    public ResponseEntity<String> createAccount(@RequestBody AccountDTO accountDTO){
        return ResponseEntity.ok().body(accountService.createAccount(accountDTO));
    }
}

4. Command 모듈 구현

 

1. Command 구현 포스팅 8번 항목에서 AxonServerCommandBus를 SimpleCommandBus로 대체했습니다. Saga 트랜잭션에서는 다른 모듈에 대하여 Command 요청이 필요하므로 기존에 SImpleCommandBus 설정을 해제해야합니다. 이를 위해 config 패키지내에 있는 AxonConfig 클래스를 엽니다.

 


2. AxonConfig 파일에서 commandBus Bean 로직을 주석처리하거나 삭제합니다.

 

AxonConfig.java

@Configuration
@AutoConfigureAfter(AxonAutoConfiguration.class)
public class AxonConfig {
	(...중략...)
//    @Bean
//    SimpleCommandBus commandBus(TransactionManager transactionManager){
//        return  SimpleCommandBus.builder().transactionManager(transactionManager).build();
//    }
	(...중략...)
}

3. 계좌이체 요청 및 반영을 위한 Command 요청을 위해 command 패키지 내 command 클래스를 생성합니다.

 


4. Command 클래스 내용을 구현합니다.

 

TransferApprovedCommand.java

@ToString
@Getter
@Builder
public class TransferApprovedCommand {
    @TargetAggregateIdentifier
    private String accountID;
    private Long amount;
    private String transferID;
}

위 클래스는 계좌이체가 정상적으로 수행되었을 때, 요청을 위한 Command 클래스입니다.

 

MoneyTransferCommand.java

@Builder
@ToString
@Getter
public class MoneyTransferCommand {
    private String srcAccountID;
    @TargetAggregateIdentifier
    private String dstAccountID;
    private Long amount;
    private String transferID;
    private BankType bankType;

    public enum BankType{
        JEJU(command -> new TransferComamndFactory(new JejuBankTransferCommand()),
        SEOUL(command -> new TransferComamndFactory(new SeoulBankTransferCommand());

        private Function<MoneyTransferCommand, TransferComamndFactory> expression;
        BankType(Function<MoneyTransferCommand, TransferComamndFactory> expression){ this.expression = expression;}
        public TransferComamndFactory getCommandFactory(MoneyTransferCommand command){
            TransferComamndFactory factory = this.expression.apply(command);
            factory.create(command.getSrcAccountID(), command.getDstAccountID(), command.amount, command.getTransferID());
            return factory;
        }

    }

    public static MoneyTransferCommand of(TransferDTO dto){
        return MoneyTransferCommand.builder()
                .srcAccountID(dto.getSrcAccountID())
                .dstAccountID(dto.getDstAccountID())
                .amount(dto.getAmount())
                .bankType(dto.getBankType())
                .transferID(UUID.randomUUID().toString())
                .build();
    }
}

 

계좌 이체 요청시, 은행구분에 따른 Command 생성을 달리 하기 위하여 enum을 사용했습니다. 또한, DTO 클래스를 Command 클래스로 변환하기 위한 Factory 메소드를 추가하였습니다.


5. DTO 추가를 위해 dto 패키지 내 dto 클래스를 생성합니다.

 

 

TransferDTO.java

@Getter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TransferDTO {
    private String srcAccountID;
    private String dstAccountID;
    private Long amount;
    private MoneyTransferCommand.BankType bankType;
}

6. Service 메소드 추가를 위해서 service 패키지 하위 service 클래스를 수정합니다.

 

 

TransactionService.java

public interface TransactionService {
    (...중략...)
    String transferMoney(TransferDTO transferDTO);
}

 

TransactionServiceImpl.java

@Service
@RequiredArgsConstructor
public class TransactionServiceImpl implements TransactionService {
    private final CommandGateway commandGateway;
	(...중략...)

    @Override
    public String transferMoney(TransferDTO transferDTO) {
        return commandGateway.sendAndWait(MoneyTransferCommand.of(transferDTO));
    }
}

7. API 추가를 위해서 controller 패키지내 Controller에 메소드를 추가합니다.

 

 

TransactionController.java

@RestController
@RequiredArgsConstructor
public class TransactionController {
    private final TransactionService transactionService;

    (...중략...)

    @PostMapping("/transfer")
    public ResponseEntity<String> transfer(@RequestBody TransferDTO transferDTO){
        return ResponseEntity.ok().body(transactionService.transferMoney(transferDTO));
    }
}

8. Aggregate 수정을 위해 aggregate 패키지내 AccountAggregate 클래스를 엽니다.

 


9. Aggregate에 Command 및 EventSourcing 핸들러 로직을 추가합니다.

 

AccountAggregate.java

@NoArgsConstructor
@AllArgsConstructor
@Slf4j
@Aggregate
@EqualsAndHashCode
public class AccountAggregate {
    (...중략...)

    @CommandHandler
    protected void transferMoney(MoneyTransferCommand command) {
        log.debug("handling {}", command);
        apply(MoneyTransferEvent.builder()
                .srcAccountID(command.getSrcAccountID())
                .dstAccountID(command.getDstAccountID())
                .amount(command.getAmount())
                .comamndFactory(command.getBankType().getCommandFactory(command))
                .transferID(command.getTransferID())
                .build());
    }

    @CommandHandler
    protected void transferMoney(TransferApprovedCommand command) {
        log.debug("handling {}", command);
        apply(new DepositMoneyEvent(this.holderID, command.getAccountID(), command.getAmount()));
        apply(new DepositCompletedEvent(command.getAccountID(), command.getTransferID()));
    }
}

10. Saga 패키지 및 Saga 클래스를 생성합니다.

 


11. Saga 클래스를 구현합니다.

 

TransferManager.java

@Saga
@Slf4j
public class TransferManager {
    @Autowired
    private transient CommandGateway commandGateway;
    private TransferComamndFactory comamndFactory;

    @StartSaga
    @SagaEventHandler(associationProperty = "transferID")
    protected void on(MoneyTransferEvent event) {
        log.debug("Created saga instance");
        log.debug("event : {}", event);
        comamndFactory = event.getComamndFactory();
        SagaLifecycle.associateWith("srcAccountID", event.getSrcAccountID());

		log.info("계좌 이체 시작 : {} ", event);
		commandGateway.send(comamndFactory.getTransferCommand());
    }

    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferApprovedEvent event) {
		log.info("이체 금액 {} 계좌 반영 요청 : {}", event.getAmount(), event);
        SagaLifecycle.associateWith("accountID", event.getDstAccountID());
        commandGateway.send(TransferApprovedCommand.builder()
                .accountID(event.getDstAccountID())
                .amount(event.getAmount())
                .transferID(event.getTransferID())
                .build());
    }
    
    @SagaEventHandler(associationProperty = "srcAccountID")
    protected void on(TransferDeniedEvent event) {
        log.info("계좌 이체 실패 : {}", event);
        log.info("실패 사유 : {}", event.getDescription());
		SagaLifecycle.end();
    }

    @SagaEventHandler(associationProperty = "accountID")
    @EndSaga
    protected void on(DepositCompletedEvent event){
        log.info("계좌 이체 성공 : {}", event);
    }
}

 

구현 내용을 간략하게 소개하면 다음과 같습니다. 먼저 @Saga 어노테이션을 통해 해당 클래스가 Saga의 대상임을 지정합니다. 해당 클래스는 NoArgsConstructor로 생성이 되어야하므로 이를 주의합니다.

 

@StartSaga는 Saga 인스턴스의 시작점입니다. 여기에서 associationProperty는 해당 인스턴스를 유일하게 구별할 수 있는 속성이 무엇인지 지정합니다. 자세한 내용은 Axon 공식문서를 참고바랍니다.

 

생성된 Saga는 트랜잭션이 끝나면 종료되어야합니다. 위 예제에서는 DepositCompletedEvent 메시지를 수신받으면, 전체 트랜잭션이 끝난것으로 판단하여 Saga 인스턴스를 종료하도록 되어있습니다. Saga 인스턴스 종료방법은 크게 두가지입니다. 먼저 위 에제와 같이 명시적으로 end 메소드를 기입하거나 @EndSaga 어노테이션 지정할 수 있습니다.


5. 테스트

 

1. Command, Query, Jeju 모듈 App을 기동한 다음, Jeju 은행 계좌를 개설합니다.

POST localhost:9091/account
Content-Type: application/json

{
	"accountID" : "test",
	"balance" : 100
}

2. 생성된 계좌 내역을 DB에서 확인합니다.

 


3. Command 모듈에 계정을 생성합니다.

 

POST http://localhost:8080/holder
Content-Type: application/json

{
	"holderName" : "Kevin",
	"tel" : "02-2645-5678",
	"address" : "OO시 OO구",
    "company" : "Korea"
}

4. Command 모듈에 계좌를 개설합니다.

 

POST http://localhost:8080/account
Content-Type: application/json

{
  "holderID" : "b01fae84-e8a5-427d-a5f4-baa7376b7163"
}

5. Query 모듈 DB에서 계정 및 계좌 생성 내역을 확인합니다.

 


6. Jeju 은행 계좌에서 Command 모듈에 개설된 계좌로 30원 이체합니다.

 

POST http://localhost:8080/transfer
Content-Type: application/json
{
	"srcAccountID" : "test",
	"dstAccountID" : "a31faade-5b57-4435-85da-1de0ed1c55c4",
	"amount" : 30,
	"bankType" : "JEJU"
}

 

AccountAggregate   : handling MoneyTransferCommand(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, bankType=JEJU)
TransferManager    : Created saga instance
TransferManager    : event : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@5a65bed)
TransferManager    : 계좌 이체 시작 : MoneyTransferEvent(dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, srcAccountID=test, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, comamndFactory=com.cqrs.command.transfer.factory.TransferComamndFactory@5a65bed) 
TransferManager    : 이체 금액 30 계좌 반영 요청 : TransferApprovedEvent(srcAccountID=test, dstAccountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=3a6583e3-0288-4e07-87f0-c818644e009b, amount=30)
AccountAggregate   : handling TransferApprovedCommand(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30, transferID=3a6583e3-0288-4e07-87f0-c818644e009b)
AccountAggregate   : applying DepositMoneyEvent(holderID=b01fae84-e8a5-427d-a5f4-baa7376b7163, accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, amount=30)
AccountAggregate   : balance 30
TransferManager    : 계좌 이체 성공 : DepositCompletedEvent(accountID=a31faade-5b57-4435-85da-1de0ed1c55c4, transferID=3a6583e3-0288-4e07-87f0-c818644e009b)

성공적으로 계좌이체가 완료되었으면 위와 비슷한 로그가 출력될 것입니다.


7. DB에서 계좌 상태를 확인합니다.

 


6. 마치며

 

 

serialized_sga 내용

<com.cqrs.command.saga.TransferManager>
    <comamndFactory>
        <transferCommand class="com.cqrs.command.transfer.JejuBankTransferCommand">
        <srcAccountID>test</srcAccountID>
        <dstAccountID>a31faade-5b57-4435-85da-1de0ed1c55c4</dstAccountID>
        <amount>30</amount>
        <transferID>test</transferID>
        </transferCommand>
        </compensationAbortCommand>
    </comamndFactory>
</com.cqrs.command.saga.TransferManager>

 

이번 포스팅에서는 Jeju, Command 모듈간에 일관성을 위하여 Saga 패턴을 사용했습니다. Command 모듈에서 MoneyTransferEvent Event가 발행되면, Saga 인스턴스가 생성됩니다. 이후 Application의 비즈니스 로직에 따라 모든 트랜잭션이 종료될 때까지 유지됩니다. Saga 인스턴스 정보는 동시에 SAGA_ENTRY 테이블에 적재되며, Saga 인스턴스가 종료되면 해당 데이터 또한 사라집니다.

 

다음 포스팅에서는 Deadline 기능, 보상 트랜잭션 로직이 적용된 Saga 패턴을 구현하겠습니다.

1. 서론

 

MSA 아키텍처를 구성하기 어려운 이유중 가장 큰 문제는 트랜잭션(Transaction)입니다. 기존 모놀로틱(Monolithic) 환경에서 DBMS가 기본적으로 제공해주는 트랜잭션 기능을 통해 데이터 Commit 혹은 Rollback을 통해 데이터를 일관성 있게 관리했습니다. 하지만 Application 및 DB가 분산되면서, 트랜잭션 처리를 단일 DBMS에서 제공하는 기능을 통해서 달성할 수 없습니다.

 

이번 포스팅에서는 분산 트랜잭션의 종류인 Two-Phase Commit, Saga 패턴 및 Axon에서 제공하는 Saga 기능에 대하여 소개하겠습니다.


2. Two Phase Commit

 

 

분산 DB 환경에서 쓰는 방법으로 주요 RDBMS에서 기능을 제공합니다. Two-Phase Commit은 말 그대로 2단계에 거쳐서 데이터를 영속화 하는 작업입니다. 위 그림과 같이 여러 DB가 분산 되었을 때, 트랜잭션을 조율하는 조정자(Coordinator)가 존재합니다. 조정자의 역할은 트랜잭션 요청이 들어왔을 때 두 단계를 거쳐 트랜잭션을 진행을 담당합니다. 이때 첫 번째 단계는 Prepare이며, 이는 쉽게말해 연관된 DB에게 데이터를 저장할 수 있는 상태인지 묻는 과정에 해당합니다.

 

 

메시지를 받은 DB에서는 Commit 작업을 위한 준비를 진행합니다. 이후 데이터를 영속할 수 있는 준비가 완료되면 조정자에게 준비가 완료되었음을 알리고, 반대로 Commit할 수 없다면 불가하다는 메시지를 전달합니다.

 

조정자는 첫 번째 단계에서 전달한 메시지에 대한 응답을 기다립니다. 모든 메시지 수신이 완료되면 두 번째 단계인 Commit을 진행합니다. Commit 단계에서는 조정자가 연관된 DB에게 데이터를 저장하라는 메시지를 송신하며, 수신받은 DB에서는 각자 DB에 데이터를 영속화 합니다.

 

 

모든 DB에서 트랜잭션 처리가 완료되면 전체 트랜잭션을 종료합니다.만약 두 단계를 거치는 과정에서 연관된 DB 중 하나의 DB라도 Commit을 할 수 없는 상황이라면, 모든 DB에게 Rollback을 요구합니다. 트랜잭션을 종료하는 동시에 모든 DB 데이터가 영속화됩니다. 따라서 트랜잭션의 범위는 데이터를 처리하는 DB 전체입니다.


MSA 환경에서 Two-Phase Commit 문제점

 

Two-Phase Commit은 DBMS 간 분산 트랜잭션을 지원해야 적용가능합니다. 하지만 NoSQL 제품군에는 이를 지원하지 않고, 함께 사용되는 DBMS가 동일 제품군(Oracle, MySQL, Postgres)이여야합니다. 따라서 DBMS polyglot 구성은 어렵습니다.

 

또한 Two-Phase Commit은 보통 하나의 API 엔드포인트를 통해 서비스 요청이 들어오고 내부적으로 DB가 분산되어있을 때 사용됩니다. 하지만 MSA 환경에서는 각기 다른 App에서 API간으로 통신을 통해 서비스 요청이 이루어지기 때문에 구현이 쉽지 않습니다.


3. Saga 패턴

 

 

Saga 패턴은 트랜잭션의 관리주체가 DBMS가 아닌 Application에 있습니다. App이 분산되어있을 때, 각 App 하위에 존재하는 DB는 Local 트랜잭션 처리만 담당합니다. 

 

 

따라서 각각의 App에 대한 연속적인 트랜잭션 요청 및 실패할 경우에 Rollback 처리(보상 트랜잭션)를  Application에서 구현해야합니다. 

 

Saga 패턴은 위 그림과 같이 연속적인 업데이트 연산으로 이루어져있으며, 전체가 동시에 데이터가 영속화되는 것이아니라 순차적인 단계로 트랜잭션이 이루어집니다. 따라서 Application 비즈니스 로직에서 요구되는 마지막 트랜잭션이 끝났을 때, 데이터가 완전히 영속되었음을 인지하고 이를 종료합니다.

 

Two-Phase Commit과 다르게 Saga를 활용한 트랜잭션은 데이터 격리성(Isolation)을 보장해주지 않습니다. 하지만 Application의 트랜잭션 관리를 통해 최종 일관성(Eventually Consistency)을 달성할 수 있기 때문에 분산되어있는 DB간에 정합성을 맞출 수 있습니다. 또한 트랜잭션 관리를 Application에서 하기 때문에 DBMS를 다른 제품군으로 구성할 수 있는 장점이 있습니다.

 

하지만 이러한 일관성을 달성하기 위해서는 프로세스 수행 과정상 누락되는 작업이 없는지 면밀히 살펴야하며, 실패할경우 에러 복구를 위한 보상 트랜잭션 처리 누락이 없도록 설계해야합니다.


4. Saga 패턴 종류


1. Choreography-Based Saga

 

 

Choreography-Based Saga는 자신이 보유한 서비스내 Local 트랜잭션을 관리하며, 트랜잭션이 종료되면 완료 Event를 발행합니다. 만약 그 다음에 수행되어야할 트랜잭션이 있다면, 해당 트랜잭션을 수행해야하는 App에서 완료 Event를 수신받고 다음 작업을 처리합니다. 이때 Event는 Kafka와 같은 메시지 큐를 이용해서 비동기 방식으로 전달할 수 있습니다.

 

 

Choreography-Based Saga 방식에서는 각 App별로 트랜잭션을 관리하는 로직이 있습니다. 따라서 중간에 트랜잭션이 실패하면, 해당 트랜잭션 취소처리를 실패한 App에서 보상 Event를 발행하여 Rollback 처리를 시도합니다.

 

위와 같은 구성은 구축하기 쉬운 장점이 있습니다. 하지만 운영자 입장에서 트랜잭션의 현재 상태를 알기 쉽지 않습니다. 


2. Orchestration-Based Saga

 

 

Orchestration-Based Saga는 트랜잭션 처리를 위한 Saga 인스턴스(Manager)가 별도로 존재합니다. 트랜잭션에 관여하는 모든 App은 Manager에 의하여 점진적으로 트랜잭션을 수행하며 결과를 Manager에게 전달합니다. 비즈니스 로직상 마지막 트랜잭션이 끝나면 Manager를 종료하여 전체 트랜잭션 처리를 종료합니다. 만약 중간에 실패하게 되면 Manager에서 보상 트랜잭션을 발동하여 일관성을 유지하도록 합니다.

 

 

모든 관리를 Manager가 호출하기 때문에 분산트랜잭션의 중앙 집중화가 이루어집니다. 따라서 서비스간의 복잡성이 줄어들고 구현 및 테스트가 상대적으로 쉽습니다. 또한 트랜잭션의 현재 상태를 Manager가 알고 있기 때문에 롤백을 쉽게할 수 있는 것 또한 장점입니다. 하지만 이를 관리하기 위한 Orchestrator 서비스가 추가되어야 하기 때문에 인프라 구현의 복잡성이 증가되는 단점이 존재합니다.


5. Axon Saga 기능 소개

 

AxonFramework 에서는 Orchestration 방식의 Saga 패턴을 지원합니다. 즉 트랜잭션을 시작하는 시점에 Saga 인스턴스를 생성하며. Saga 인스턴스에서 트랜잭션을 관리합니다.

 

 

Saga 인스턴스는 Event를 처리하는 UnitOfWork 단계에서 생성되며, 전체 트랜잭션 처리가 완료되면 Saga 인스턴스를 종료합니다. Axon에서는 Annotation 기반으로 Saga 인스턴스를 간편하게 설정할 수 있습니다. 또한 DB에 Saga 정보를 저장하고 있어 복구가 가능합니다. 

 

추가로 생성된 Saga에서 트랜잭션 요청시, Deadline 지정이 가능합니다. 이로인해 트랜잭션 수행 App으로부터 응답이 없을 경우 보상 트랜잭션을 수행할 수 있습니다.


6. 마치며

 

MSA를 구성하는 환경에서 Saga를 도입하기전에 비즈니스 로직상 트랜잭션처리가 반드시 필요한지에 대한 충분한 고려가 필요합니다. 이곳 저곳에 적용했다가는 트랜잭션 처리 지옥을 경험할 수 있기 때문입니다. 반드시 필요한 부분에만 일부 도입하는 것이 좋으며, 가장 좋은 상황은 MSA 환경에서 트랜잭션 처리를 하지 않도록 비즈니스 로직을 설계하는 것입니다.

 

다음 시간에는 예제 구현을 통해 Axon에서 제공하는 Saga 기능을 익혀보겠습니다.

1. 서론

 

이번 포스팅에서는 Scatter-Gather Query를 구현하겠습니다. Scatter-Gather Query는 동일한 Query를 수행하는 Query Handler가 여러 App에 존재할 경우 모든 App에 Query를 요청하여 결과를 취합받아 최초 Query를 요청한 Application에서 결과를 처리합니다.

 

데모 프로젝트에 아래와 같은 요구사항이 추가되었음을 가정하여 Scatter-Gather Query 기능을 구현하도록 하겠습니다.

 

 

 

Axon Server에 Jeju 은행과 Seoul 은행 외부시스템이 연결되어있다고 가정해봅시다. 이때 소유주(HolderID)가 보유한 잔고에 대하여 각 은행에게 대출한도를 Query하면 은행별로 전달받은 답변을 Client 화면에 표시하는 요구사항을 코드를 통해 알아보겠습니다.


2. Jeju 은행 모듈 구현

 

1. Jeju 은행과, Seoul 은행에 Query를 요청하려면, Query 클래스 정보를 공유해야하므로, 공통 모듈(Common)에 Query 클래스를 생성해야합니다. 먼저 Common 모듈에 query > loan 패키지를 생성합니다. 이후 Query 및 결과를 저장할 클래스를 생성합니다.

 


2. 생성한 두 클래스를 구현합니다.

 

LoanLimitQuery.java

@AllArgsConstructor
@ToString
@Getter
public class LoanLimitQuery {
    private String holderID;
    private Long balance;
}

 

LoanLimitResult.java

@AllArgsConstructor
@ToString
@Getter
@Builder
public class LoanLimitResult {
    private String holderID;
    private String bankName;
    private Long balance;
    private Long loanLimit;
}

3. 새로운 은행 모듈 생성을 위하여 프로젝트 root 디렉토리에 위치한 settings.gradle 파일을 연다음 모듈 추가합니다.

 

settings.gradle

rootProject.name = 'demo'
include 'command'
include 'query'
include 'common'
include 'seoulBank'
include 'jejuBank'

4. 추가된 두 묘듈에서 공통 모듈을 사용하기 위해 빌드 설정을 추가해야 합니다. 프로젝트 root 디렉토리에 위치한 build.gradle 파일을 연다음 빌드 스크립트 내용을 추가합니다.

 

build.gradle

(...중략...)
project(':jejuBank') {
    dependencies {
        compile project(':common')
    }
}

project(':seoulBank') {
    dependencies {
        compile project(':common')
    }
}

 

이후 gradle build를 수행하면, jejuBank, seoulBank 모듈이 생성됩니다.


5.  jejuBank 프로젝트 하위에 build.gradle 파일을 생성합니다.

 


6. build.gradle 파일에 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
}

7. resource 패키지 하위에 application.yml 파일을 생성합니다.

 


8. application.yml 파일에 설정 값을 기술합니다.

 

application.yml

server:
  port: 9091

spring:
  application:
    name: eventsourcing-cqrs-jejuBank

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

9. 패키지 구조 설정 한다음 Component 패키지와 Main 클래스를 생성합니다.

 


9. Main 클래스 내용을 구현합니다.

 

jejuBankApp.java

@SpringBootApplication
public class JejuBankApp {
    public static void main(String[] args) {
        SpringApplication.run(JejuBankApp.class, args);
    }
}

10. component 패키지내에 Query를 처리할 Component 클래스를 생성합니다.

 


11. Component 클래스 내용을 구현합니다. 

 

AccountLoanComponent.java

@Component
@Slf4j
public class AccountLoanComponent {

    @QueryHandler
    private LoanLimitResult on(LoanLimitQuery query) {
        log.debug("handling {}",query);
        return LoanLimitResult.builder()
                .holderID(query.getHolderID())
                .balance(query.getBalance())
                .bankName("JejuBank")
                .loanLimit(Double.valueOf(query.getBalance() * 1.2).longValue())
                .build();
    }
}

 

위 코드에서 jeju 은행의 대출한도는 일괄적으로 보유 잔고의 120%만 가능하도록 가정하였습니다.


3. Seoul 은행 모듈 구현

1. seoulBank 프로젝트 하위에 build.gradle 파일을 생성합니다.

 


2. build.gradle 파일에 의존성을 추가합니다.

 

build.gradle

ext{
    axonVersion = "4.2.1"
}
dependencies{
    implementation 'org.springframework.boot:spring-boot-starter-web'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    implementation group: 'org.axonframework', name: 'axon-spring-boot-starter', version: "$axonVersion"
}

3. resource 패키지 하위에 application.yml 파일을 생성합니다.

 


 

4. application.yml 파일에 설정 값을 기술합니다.

 

application.yml

server:
  port: 9092

spring:
  application:
    name: eventsourcing-cqrs-seoulBank

axon:
  serializer:
    general: xstream
  axonserver:
    servers: localhost:8124

5. 패키지 구조 설정 한다음 Component 패키지와 Main 클래스를 생성합니다.

 


6. Main 클래스 내용을 구현합니다.

 

SeoulBankApp.java

@SpringBootApplication
public class SeoulBankApp {
    public static void main(String[] args) {
        SpringApplication.run(SeoulBankApp.class, args);
    }
}

7. component 패키지내에 Query를 처리할 Component 클래스를 생성합니다.

 


8.  Component 클래스 내용을 구현합니다.

 

AccountLoanComponent.java

@Component
@Slf4j
public class AccountLoanComponent {
    @QueryHandler
    private LoanLimitResult on(LoanLimitQuery query) {
        log.debug("handling {}",query);
        return LoanLimitResult.builder()
                .holderID(query.getHolderID())
                .balance(query.getBalance())
                .bankName("SeoulBank")
                .loanLimit(Double.valueOf(query.getBalance() * 1.5).longValue())
                .build();
    }
}

4. Query 인터페이스 구현

 

1. 화면 생성을 위해 Query 모듈 resources > templates 패키지내에 scatter-gather.html 파일을 생성합니다.

 


2. 화면 코드를 구현합니다.

 

scatter-gather.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Scatter-Gather Query Example</title>
</head>
<script>
    window.addEventListener("DOMContentLoaded", function (){
        (function () {
            let appendDiv = document.getElementById("layout");
            let text = document.getElementById("holderInput");
            let pElem = document.createElement("p");
            document.getElementById("wrapper").addEventListener("click", append);

            function append(e) {
                let target = e.target;
                let callbackFunction = callback[target.getAttribute("data-cb")];
                callbackFunction();
            }

            let callback = {
                "search": (function () {
                    let holderId = text.value;
                    if (holderId === undefined || holderId === null || holderId ==="") {
                        alert("소유주를 입력하시오.");
                    } else {
                        let xhr = new XMLHttpRequest();
                        xhr.open('GET','http://localhost:9090/account/info/scatter/gather/'+holderId, true);
                        xhr.send();
                        xhr.onload = function(){
                            if(xhr.status === 200){
                                let elem = pElem.cloneNode();
                                elem.innerText = xhr.responseText;
                                appendDiv.appendChild(elem);
                            }
                        }
                    }
                })
            }
        }());
    });
</script>
<body>
<div id="wrapper">
    <input type="button" data-cb="search" value="조회"/>
</div>
<input type="text" id="holderInput" placeholder="소유주 ID를 입력하시오.">
<div id="layout"/>
</body>
</html>

3. Scatter-Query를 요청할 서비스를 구현하기 위하여 먼저 메소드를 정의해야합니다. Query 모듈 service 패키지에 위치한 QueryService 클래스를 엽니다.

 


4. 인터페이스에 추상 메소드를 정의합니다.

 

QueryService.java

public interface QueryService {
    (...중략...)
    List<LoanLimitResult> getAccountInfoScatterGather(String holderId);
}

5. QueryServiceImpl 클래스를 열어 추가된 추상 메소드를 구현합니다.

 

QueryServiceImpl.java

@RequiredArgsConstructor
@Slf4j
@Service
public class QueryServiceImpl implements QueryService {
    (...중략...)
    private final AccountRepository repository;
    (...중략...)
    @Override
    public List<LoanLimitResult> getAccountInfoScatterGather(String holderId) {
        HolderAccountSummary accountSummary = repository.findByHolderId(holderId).orElseThrow();

        return queryGateway.scatterGather(new LoanLimitQuery(accountSummary.getHolderId(), accountSummary.getTotalBalance()),
                ResponseTypes.instanceOf(LoanLimitResult.class),
                30, TimeUnit.SECONDS)
                .collect(Collectors.toList());
    }
}

 

Scatter-Gather 쿼리는 단일 App에 요청하는 것이 아니므로, 만약 Handler 처리 App에 장애가 발생한다면 무한정 대기할 수 있습니다. 따라서 요청시, DeadLine을 정하여 요청시간 만큼만 대기할 수 있도록 지정이 필요합니다.


6. API End Point 지정 및 화면 호출을 위하여 Controller 클래스 수정이 필요합니다. Query 모듈내 Controller 패키지안에 있는 두개의 Controller 클래스에 관련 메소드를 추가합니다.

 

 

HolderAccountController.java

@RestController
@RequiredArgsConstructor
public class HolderAccountController {
    (...중략...)

    @GetMapping("account/info/scatter/gather/{id}")
    public ResponseEntity<List<LoanLimitResult>> getAccountInfoScatterGather(@PathVariable(value = "id") @NonNull @NotBlank String holderId){
        return ResponseEntity.ok()
                .body(queryService.getAccountInfoScatterGather(holderId));
    }

}

 

WebController.java

@Controller
public class WebController {
	(...중략...)
    @GetMapping("/scatter-gather")
    public void scatterGatherQueryView(){}
}

5. 테스트

 

1. jeju, seoul 은행 App과 Query App을 기동합니다.

 

2. 웹브라우저(Chrome)에서 http://localhost:9090/scatter-gather URL 입력합니다.

 

3. 임의의 소유주 ID를 입력후 조회 버튼을 눌러 결과를 확인합니다.

 


6. 마치며

 

이번 포스팅을 끝으로 EventSourcing에 필요한 기본적인 Command, Event 처리 및 Query 요청에 대한 필수 기능 구현을 완료했습니다. 각 기능별로 세부적인 기능은 Axon 공식 홈페이지에서 제공하는 DocumentGoogle Groups를 이용하여 검색하시면 많은 자료를 구할 수 있으니 참고 바랍니다.

+ Recent posts