서론

 

Kafka Broker에 대해서 학습한 이후 아래와 같이 PDF를 정리해서 공유했었다. 이후 Kafka Broker의 네트워크 연결은 어떻게 이루어질까에 대한 생각에 이에 대하여 학습하여 공유하고자 한다. 

 

https://www.scribd.com/document/560749821/Kafka-Broker

 

Kafka Broker | PDF

Kafka Broker 내부 모듈 정리

www.scribd.com

 

 
 

Kafka는 분산 시스템으로써, 수많은 Broker와 Producer, Consumer끼리 통신을 수행하며, Broker 간에도 데이터 복제 및 상태 중재를 위한 네트워크 통신이 사용된다.

 

그렇다면, Kafka에서는 어떠한 방식으로 통신을 수행할까?

 

일반적으로 생각해보면, HTTP Protocol을 사용하는 방법으로 이미 구현된 HTTP 프레임워크를 사용하는 것이 개발 입장에서는 쉬울 것이다. 하지만 Kafka 커미터들은 다음과 같은 이슈로 인하여 자체 통신 체계를 구축했다고 한다.

 

  1. Kafka에서는 빠른 메시지 전달을 위해 Kafka에 최적화된 통신 체계가 필요하다. 또한 커다란 프레임워크 코드 영역에서 Kafka가 필요한 부분은 일부에 불과하다.
  2. 라이브러리 의존성과 버전 관리의 어려움이다.

 

Kafka는 메시지 전달을 위해 빠른 네트워크 성능이 필요하기 때문에, 고성능 통신을 위해 간결하면서도 최적화된 방식이 필요하다. 그리고 라이브러리 및 의존성 문제에서 자유로워야한다. 만약 다른 프레임워크에 의존하게된다면, Broker 및 Client 모두 해당 라이브러리에 대한 강한 의존성이 생긴다. 이는 버전 관리의 어려움이 존재하게되며, Kafka 라이브러리를 포함하는 Client의 파일 크기 또한 커지게 된다. 따라서, 위 두 가지 이슈로 인해 자체 네트워크 모델을 구축하으며, 빠른 메시지 전달과 동시 신뢰성 있는 데이터 전달이 중요하기 때문에 UDP 기반이 아닌 TCP 기반위에서 동작하도록 자체 Protocol을 설계했다.

 

 

Kafka 네트워크 모델 기반에는 Java의 NIO API가 광범위하게 사용된다. 따라서, Kafka 네트워크 모델을 살펴보기 앞서 NIO에서 Network 연관 부분만 살펴보자.

 

 


NIO

 

NIO는 New IO의 약자로써 기존 IO 방식에서 발생하는 Blocking 이슈를 개선하기 위해 Java 1.4 부터 새롭게 도입된 기능(JSR-51)이며, 다음과 같은 특징을 지니고 있다.

 

  1. Non-Blocking
  2. IO Multiplexing

 

위 두 가지 특징을 기반으로 NIO가 기존 IO와 무엇이 다른지에 대해 살펴보자.

 
 

일반적으로 Socket을 활용한 네트워킹 과정을 살펴보면 위와 같다. 여기서 기존 I/O 방식은 Client의 연결을 받아들이는 Accept 부분과 Read, Write 연산 등은 모두 Blocking된다. 이는 즉 Accept의 경우 요청이 들어올 때까지 요청을 반환하지 않음을 의미하며, Read, Write의 작업이 수행되는 동안에도 결과를 리턴 하지 않고 끝날 때까지 대기함을 의미한다.

 

이러한 경우 일반적인 단일 소켓으로는 여러 요청을 효과적으로 처리할 수 없다. 그 이유는 여러 요청을 빠르게 처리하기 위해서는 Blocking되어 Idle한 시간을 효율적으로 분배하여 다른 작업을 처리 해야하는데, 기존 구조로는 이에 대해 효과적으로 대응할 수 없기 때문이다.

 

 
 

따라서 기존에는 동기식 I/O 방식의 문제점을 해결하고자 각 Client의 연결 요청에 대해 이를 처리할 수 있는 Thread를 생성 후 매핑을 통해 동시성을 해결하고자 했다.

 

하지만 위와 같이 Multi Thread 방식의 네트워크 통신 방법에는 한계가 존재한다. 그 이유는 아무리 Thread가 Process보다는 가볍다고 하나 개별 요청 별 Thread를 할당하는 방식은 메모리 사용 및 Context Switching에 따른 Overhead가 크기 때문이다. 가령 Thread별 스택을 1M씩만 할당한다고 가정하더라도 1024개 사용자 요청을 처리하기 위해서 생성되는 스택만 1GB가 사용될 것이다. 따라서, 사용자가 증가할 수록 처리량은 감소하게된다.

 

그렇다면, Non-Blocking으로 처리하면 어떻게 될까?

 
 

Non-Blocking 방식은 기존의 Blocking I/O를 유발하는 메소드에 대해서 추가 설정을 통해 Non-Blocking 형태로 구성할 수 있다. 즉 기존에는 메소드 호출 시 작업이 완료될 때까지 기다렸지만, 설정 이후에는 메소드 결과가 즉시 반환하기 때문에, 하나의 소켓 서버 Thread가 여러개의 I/O를 처리할 수 있게되었다.

 

 
 

Non-Blocking 설정 이후로는 위 그림과 같이 단일 Thread에서 여러 사용자의 Channel과 매핑되어 데이터를 처리할 수 있게 되었다. 위 그림만 보면 Thread 개수를 줄일 수 있으니 성능이 많이 향상될 것으로 보인다. 하지만 다음과 같은 문제가 존재한다.

 
 
 

Blocking I/O 방식으로 처리할 경우에는 순차적으로 처리하므로 Blocking 메소드를 벗어났다는 것은 해당 처리가 완료되었음이 어느정도 보장된다. 하지만 Non-Blocking 방식은 작업 요청과 별개로 바로 리턴이 되기 때문에 실제 요청 여부를 확인하기 위해서는 주기적으로 소켓 정보를 Polling하여 처리 가능 여부를 확인해야한다.

 

List<SocketChannel> channels = new ArrayList<>();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9002));
serverSocket.configureBlocking(false);

while (true) {
  SocketChannel channel = serverSocket.accept();
  if (null != channel) {   
    socketChannel.configureBlocking(false);
    channels.add(channel);
  }

  Iterator<SocketChannel> iterator = channels.iterator();
  while (iterator.hasNext()) {
    SocketChannel channel = iterator.next();
    ..(Read 요청 확인 수행 및 처리)...
  }
}

 

위 코드는 configureBlocking 설정을 통해서 Blocking 방식 API를 Non-Blocking 방식으로 변경한 예제이다. 코드를 살펴보면, accept 혹은 read 수행하면 바로 리턴되므로 요청 확인을 위해서 지속적으로 무한 Loop를 수행하며 확인 과정이 필요하다. 예를 들어 현재 100개의 연결이 이루어져 channels List에 등록되어있다면, 매번 100번의 연결에 대하여 요청 여부를 확인한다.

 

위와 같은 방식의 경우 무한 Loop로 인하여 CPU overhead가 지속 발생하므로 Non-Blocking 기법만 적용해서는 성능 향상의 효과를 크게 얻을 수 없다. 그렇다면 이러한 문제는 어떻게 해결할까? IO Multiplexing에 대해서 알아보자.

 

 


 

IO Multiplexing

 

 

 

IO Multiplexing 방식은 하나의 Channel을 통해 여러 개의 연결을 관리하는 방식으로 해당 방식에서는 소켓 관리를 OS에서 직접 관리한다. 따라서 사용자 코드에서는 OS에 관리 대상 소켓 정보를 등록하는 단계가 필요하다.

(※ 본 포스팅에서 Kafka는 Linux 환경에서 동작함을 가정하므로 Socket은 FileDescriptor로 취급됨을 참고하자.)

 

 

등록 이후에는 OS에서 File descriptor 목록을 가지고 있고, 내부적으로 데이터를 처리해야 될 대상이 발견되면, 해당 정보를 이후 Client에서 요청 시 반환하는 역할을 담당한다. 즉 이전에는 Client에서 직접 처리 요청 대상을 관리했다면, Monitor 역할을 OS가 담당하는 셈이다.

 

위와 같은 방법을 적용하면, 사용자 코드에서 Connection 개수 여부와 관계없이 처리 대상만 OS로부터 전달받으므로 CPU overhead를 줄일 수 있으므로 적은 Thread로 많은 처리 요청을 수행할 수 있다.

 

 

 

 

Java의 NIO는 이를 위해서 Selector를 활용한다. Selector는 OS와 사용자 코드 상의 가교 역할을 수행한다. 따라서 사용자 코드에서 Selector에게 처리 대상 Channel 등록을 요청하면, OS에 해당 정보를 전달한다. 그 이후에는 주기적으로 OS에 목록 전달 요청을 전달하면, OS에서 대상 목록을 전달 받아 후속 작업을 처리한다.

 

(※ JVM 6이상 환경에서 Linux Kernel 2.6 이상을 사용하면, 기본 Selector의 구현체로 Linux의 epoll이 사용된다.)

 

ServerSocketChannel serverSocket  = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9002));
serverSocket.configureBlocking(false);

Selector selector = Selector.open();
SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);

while (true) {

  //select() 메소드는 Blocking 방식으로 동작함.
  selector.select();
  
  Set<SelectionKey> selectionKeys = selector.selectedKeys();
  Iterator<SelectionKey> iterator = selectionKeys.iterator();

  while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    if (key.isAcceptable()) {
        SocketChannel channel = (ServerSocketChannel)key.channel().accept();
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ);
    } else if (key.isReadable()) {
        SocketChannel channel = (SocketChannel)key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        
        ...(Read 처리)...        
    }
}

 

코드를 통해 구체적인 수행 과정을 살펴보면 다음과 같다.

 
 
 
  1. Server 소켓을 생성하고 open() 메소드를 통해 Selector를 생성한다. 해당 메소드는 Linux 내부에 epoll Object를 생성한다.
  2. Client가 IP 및 Port 정보를 통해서 접속 요청을 할 것이다. 그러면 OS는 바인딩된 내부 오브젝트에 반영한다.
  3. 사용자 코드에서 select() 메소드를 호출하면, 접속 요청이 존재하므로 해당 정보를 반환한다.
  4. 사용자 코드에서 해당 접속 요청을 받아들인 이후에 Read 요청이 들어오면 이를 감지하기 위해 Kernel에 Read 이벤트에 대한 수신을 받을 수 있도록 요청한다. 이때 호출되는 register()를 통해 내부에 epoll_ctl및 epoll_wait 시스템 콜이 호출된다.
  5. Linux Kernel은 해당 요청을 다룰 수 있는 connection이 존재하는지 확인 후 client와 연결한다.
  6. 연결이 성공적으로 이루어지면, Selector에게 알림을 통지하고 내부적으로 Channel을 생성한다.
  7. 사용자가 데이터 fetch 요청을 전달하면 내부 Buffer에 이를 저장한다. 이때 Buffer의 위치는 direct 방식과 아닐 경우에 따라서 달라질 수 있는데, 이는 나중에 다루도록 한다.
  8. Channel은 연결 역할을 수행할 뿐 데이터 fetch는 Buffer를 통해 이루어진다.
  9. Selector는 지속적으로 poll을 수행하여 Channel에 등록된 Buffer의 내용을 읽어간다.

( ※ 위 그림에서 Channel과 Buffer는 연결된 Client 마다 생성된다.)

 

위 코드 중 가장 중요한 것은 select()이다. 이는 해당 메소드 또한 Blocking 방식이기 때문이다. 따라서 Selector를 활용한 방식은 완벽한 비동기 방식은 아니므로 Synchronous Non-Blocking 방식이라고 볼 수 있다.

 

Java의 NIO에 대해서 정리하자면, 기존 동기 방식의 API로 인한 동시성 저하를 막고자 Non-Blocking API를 제공하며, IO Multiplexing을 통해 처리량을 높일 수 있다. 하지만 완전한 방식의 비동기 방식은 아니다.

 


Kafka Broker Network 구조

 

 

 
 

지금까지 학습한 Java의 NIO를 바탕으로 Kafka Broker내의 Network 통신을 위한 구조를 살펴보자. Broker 구조는 크게 Socket Server, Request Handler Pool, API 세 가지로 이루어져있다. 해당 컴포넌트에 무엇이 있는지 하나씩 살펴보자.

 


Socket Server

 

Socket Server는 사용자 접속 및 요청을 담당하는 역할을 담당하며, Acceptor, Processor, Request Channel로 이루어진 Request-Plane 세트이다.

 

 

 

 

이전 그림에는 1개의 Plane을 묘사했지만, 실제로는 data-plane과 control-plane 총 2개의 plane이 존재한다. 여기서 control-plane은 Broker와 Controller 간의 통신을 위해 연결된 전용 네트워크이며, data-plane은 Broker 끼리 혹은 client의 요청을 처리하기 위한 네트워크이다.

 

그렇다면, Request-Plane 구성 요소인 Acceptor, Processor, Request Channel은 각각 무엇일까?

 

Acceptor는 Client의 접속 요청을 감지하는 문지기의 역할을 수행한다. Acceptor를 통해 연결 요청을 전달받으면, 하위에 존재하는 Processor 중 하나에게 Read/Write 처리를 수행할 수 있도록 연결해준다.

 

Processor는 연결된 Socket에 대하여 Read/Write 요청이 전달되는 것을 감지하고, 이를 Request Channel의 Request Queue에 전달하는 역할과 실제 작업이 완료된 이후 결과를 전달받아 사용자에게 반환하는 것을 담당한다.

 

Request Channel은 모든 Processor, Handler, API가 공유하는 전역 저장소로써, 사용자의 요청이 전달되면 해당 정보를 보관하고 처리가 완료되면 요청한 Processor에게 결과를 반환하는 역할을 수행한다.

 

 

 

Socket Server의 구조를 보면, Acceptor가 여러개의 Processor를 가지고 있고 Processor는 Request Channel과 연관이 있음을 알 수 있다. Socket Server에는 data-plane과 control-plane 두 개가 존재한다고 이전에 설명했는데, data-plane의 경우 Acceptor는 여러개의 Processor를 가질 수 있으며, 해당 설정은 num.network.threads 설정을 통해서 개수를 조절할 수 있다. 반면 control-plane의 경우는 Processor가 1개만 존재한다.

 

 


Request Handler

 
 

Request Handler는 ReuqestChannel에서 Request 정보를 가져와 API에게 처리를 요청하고 요청 결과를 다시 RequestChannel에 전달하는 역할을 담당한다. RequestHandler는 1개가 아니라 여러개의 Thread로 구성될 수 있으며, 이는 num.io.threads 속성을 통해 개수를 조정할 수 있다.

 

이전 Kafka 버전(0.7)에서는 Request Handler가 따로 존재하지 않았고 Processor를 통해 직접 처리를 수행하였다. 하지만 Network Read/Write 요청을 감지하는 영역과 I/O를 처리하는 부분이 하나의 Thread안에 있으므로 탄력적으로 Thread 개수를 늘리기 어려운 문제가 있었다.

 

따라서 I/O와 Network 처리를 위한 Thread를 분리함으로써, 현재와 같은 모습을 갖추게 되었다.

 


API

 

request.header.apiKey match {
        case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
        case ApiKeys.FETCH => handleFetchRequest(request)
        ...(중략)...
        case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
      }

 

API는 Client가 요청한 정보를 기반으로 Kafka 내부 모듈에 필요한 메소드를 호출하는 역할을 담당한다. Kafka Protocol에는 위와 같이 어떤 요청인지 header에 포함시키도록 규정되었다. 따라서 Kafka API가 요구하는 Spec에 맞게 작성하면, 이를 Parsing 하여 개별 모듈로 Routing을 시켜준다. 요청 처리가 완료되면, RequestHelper를 통해 RequestChannel로 전달한다.

 


동작 과정

 

지금까지 Kafka Network 구조에 대해서 큰 틀에서 살펴봤다. 이번에는 각 모듈끼리 어떠한 상호작용을 거쳐 동작하는지 살펴보자. 먼저 큰 흐름 속에서 어떻게 동작하는지 보고 이후 코드 레벨에서 보다 자세하게 살펴보도록 하자.

 
 

첫 번째로 살펴볼 것은 Client가 접속 요청 시도시 내부 동작 과정이다.

  1. 사용자가 접속 요청을 한다.
  2. Acceptor가 해당 접속 요청을 수락하고, 자신이 보유한 Processor 중 하나에게 할당한다. Processor는 해당 요청을 자신이 보유한 Kafka Selector에 요청하여 이후 Client로부터 데이터 처리 요청이 왔을 경우 감지할 수 있도록 사전 준비한다.

Client 접속 요청이 완료되면, Processor는 사용자 요청을 처리할 수 있는 단계가 된다. 이후 사용자 요청이 발생했을 때 처리 과정을 살펴보자.

 

 
 
  1. 사용자가 데이터 fetch 요청을 하면, Kernel은 이를 감지한다.
  2. Processor에서 Kafka Selector에게 데이터 fetch 요청 이후 해당 요청을 Request Channel의 Request Queue에 저장한다. 이때 향후 처리 결과를 자신에게 포워딩 하기 위해 Queue 삽입시 자신의 Processor Id를 함께 추가한다.
  3. Request Handler에서 Request Queue에 존재하는 요청을 fetch한다.
  4. 해당 요청을 API에게 전달한다.
  5. API는 요청을 처리한다음 자신이 보유한 Request Helper를 통해 RequestChannel로 전달한다.
  6. Request Channel은 Processor Id를 보고 해당 Processor의 Response Queue에 결과를 삽입한다.
  7. Processor는 Response Queue 내용을 확인하고 Client에게 결과를 전달한다.

 

지금까지 살펴본 내용은 큰 틀에서 컴포넌트간 상호 작용에 대해서 확인했다. 이번에는 코드 레벨에서 자세하게 각 모듈이 어떻게 구동되고 상호작용하는지 알아보자.

 


Socket Server 동작 과정

 

// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)

// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
    new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))

 

가장 먼저 살펴볼 것은 Socket Server에 속한 2개의 plane이 어떻게 구성되어있는지 살펴보자. 위 내용을 살펴보면, 2개의 plane이 서로 다른점이 몇 가지 보인다.

 

  1. data-plane의 경우 Acceptor, Processor가 여러개이지만, controlPlane의 경우 하나만 존재한다.
  2. data-plane의 경우 RequestChannel 내에 존재하는 RequestQueue의 크기를 queued.max.requests 속성 크기만큼 지정 가능한 반면, control-plane의 경우는 20개로 크기가 고정되어있다.

 

def startup(startProcessingRequests: Boolean = true,
            controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
            dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
this.synchronized {
      createControlPlaneAcceptorAndProcessor(controlPlaneListener)
      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)
      ...(중략)...
}

 

Socket Server를 생성하고 나면, 해당 Server를 기동하는 startup 메소드가 호출된다. 이때 개별 data, control plane 각각에 대하여 Acceptor와 Processor를 생성하는 메소드가 실행된다.

 

private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
                                                    endpoints: Seq[EndPoint]): Unit = {
  endpoints.foreach { endpoint =>
    connectionQuotas.addListener(config, endpoint.listenerName)
    val dataPlaneAcceptor = createAcceptor(endpoint, DataPlaneMetricPrefix)
    addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
    dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)      
  }
}

private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
  ...(중략)...    
  for (_ <- 0 until newProcessorsPerListener) {
    val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool, isPrivilegedListener)
    ...(중략)...
    dataPlaneRequestChannel.addProcessor(processor)
    nextProcessorId += 1
  }
  ...(중략)...
  acceptor.addProcessors(listenerProcessors, DataPlaneThreadPrefix)
}
 

 

 

두 가지 생성 메소드 중 data-plane 생성 코드를 살펴보자. 위와같이 listeners를 통해서 전달받은 endpoint 별로 acceptor가 생성되며, num.network.threads 개수만큼 processor 또한 생성 된다. processor 생성 이후 acceptor와 channel에 해당 processor를 등록한다.

 

해당 과정을 통해 Acceptor와 RequestChannel의 Processor 간의 매핑 관계를 이해할 수 있다.


Acceptor 동작과정

 

def run(): Unit = {
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
...(중략)...
try {
  while (isRunning) {
    try {
      acceptNewConnections()
      ...(중략)...
    }
    catch {
      ...(중략)...
    }
  }
}

 

설정 작업이 마무리되면, listeners에 매핑된 Endpoint 개수 만큼의 Kafka 쓰레드를 생성하여 Acceptor에게 할당한다. 위 코드는 Acceptor에게 쓰레드 할당 후 start() 호출 이후 수행 과정을 나타낸다.

 

NIO Selector를 통해 Accept 이벤트를 통지할 수 있도록 요청하면, 내부적으로 Kernel에 epoll 오브젝트가 생성되고, Accept 요청이 왔을 때 이를 수신받을 수 있음을 이전 NIO 개념을 학습하면서 살펴봤다.

 

소켓 정보 등록 후에는 무한 Loop를 통해 새로운 연결 요청이 있는지를 확인한다.

 

private def acceptNewConnections(): Unit = {
  val ready = nioSelector.select(500)
  if (ready > 0) {
    val keys = nioSelector.selectedKeys()
    val iter = keys.iterator()
    while (iter.hasNext && isRunning) {
      try {
        val key = iter.next
        ...(중략)...
        if (key.isAcceptable) {
          accept(key).foreach { socketChannel =>
            ...(중략)...
            var processor: Processor = null
            do {
                ...(중략)...
                processor = synchronized {
                currentProcessorIndex = currentProcessorIndex % processors.length
                processors(currentProcessorIndex)
              }
              currentProcessorIndex += 1
            } while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
          }
        } else
          throw new IllegalStateException("Unrecognized key state for acceptor thread.")
      } catch {
        ...(중략)...
      }
    }
  }
}

 

이때 select() 메소드를 통해 Accept 요청이 들어왔는지를 OS에게 확인하는데, 해당 메소드는 Blocking 메소드이므로 무한 대기를 막기 위해 500ms 기간의 Timeout을 지정한다. 이 과정에서 Accept 요청이 들어온다면, 자신이 보유하고 있는 Processor 중 하나에게 향후 Read/Write 요청에 대한 처리를 담당하도록 한다. 이때 살펴볼 것은 Processor에게 균등한 분배를 위해서 Round-Robin 방식으로 접속 요청을 분배한다는 점이다.

 

Acceptor의 역할은 여기까지이고, 지금 부터는 위 코드를 통해 새로운 요청이 Processor에게 할당된 이후 처리 과정에 대해서 살펴보자.

 


Processor 동작 과정

 
 
 

이전에 Acceptor에서 Processor에게 요청을 할당한다고 했는데, 해당 과정은 어떻게 이루어질까? 먼저 Processor가 지닌 프로퍼티에 대해 먼저 살펴보자.

 

위 그림을 살펴보면 일반 Selector가 아닌 Kafka Selector를 내부 프로퍼티로 가지고 있는 것을 확인할 수 있다. 여기서 Kafka Selector에는 내부에 NIO의 Selector를 포함하며, 그 외에 Kafka 데이터 송수신에 필요한 프로퍼티 및 내부 메소드를 지닌 클래스이다.

 

 

 

Kafka Selector에는 위 그림외에도 수많은 내부 프로퍼티가 존재하지만, 일부만 간추려서 알아보자. nioSelector는 Java NIO의 selector를 의미한다.

 
 

channel은 Processor를 통해 연결된 Client와의 Channel을 의미하며, Connection Id 와 KafkaChannel로 이루어진 Map이다. 따라서 특정 Client와 연결 시 Connection Id 기준으로 해당 Channel과 연결한다.

 

completedSends, completedReceives 및 disconnected는 데이터 송수신 및 close 처리 시, 해당 요청을 임시 저장하는 용도의 buffer로써 활용된다.

 

 
 

여기까지 Kafka Selector에 대해서 알아보고 이번에는 Processor의 또 다른 주요 프로퍼티 중 하나인 newConnections에 대해서 알아보자. 해당 자료구조는 Queue로써 Acceptor가 새로운 요청을 Processor에게 할당할 때, 해당 Queue에 입력이 된다.

 

이제 Processor의 내부 프로퍼티를 토대로 Processor 동작 과정에 대해 살펴보자.

 

override def run(): Unit = {
  ...(중략)...
  try {
    while (isRunning) {
      try {
        configureNewConnections()
        processNewResponses()
        poll()
        processCompletedReceives()
        processCompletedSends()
        processDisconnected()
        closeExcessConnections()
      } catch {
        ...(중략)...
      }
    }
  }
  ...(중략)...
}

 

Processor 또한 Acceptor와는 별개의 Thread로 수행된다. 위 코드는 Processor 기동 시작 후 수행 과정을 나타내며, 무한 Loop를 통해서 동일한 작업을 지속 반복 수행하는 것을 확인할 수 있다. 위 코드와 같이 7개의 동작을 수행하는데, 전부다 살펴보지는 않고 주요 동작에 대해서만 살펴보자.

 

configureNewConnections()

 

  private def configureNewConnections(): Unit = {
    var connectionsProcessed = 0
    while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
      val channel = newConnections.poll()
      try {
        ...(중략)...
        selector.register(connectionId(channel.socket), channel)
        connectionsProcessed += 1
      } catch {
        ...(중략)...
      }
    }
  }

 

configureNewConnections 메소드는 newConnections를 통해 새로운 Channel이 입력되면, Connection Id를 부여한 다음 해당 정보를 Kafka Selector에게 전달하여 궁극 적으로는 OS 내부에 해당 소켓 정보를 등록시킨다. 따라서, Processor가 지닌 Kafka Selector를 통해서 향후 Read/Write 요청이 들어왔을 때 이를 감지해 후속 작업을 처리할 수 있다.

 

processNewResponse()

 

private def processNewResponses(): Unit = {
  var currentResponse: RequestChannel.Response = null
  while ({currentResponse = dequeueResponse(); currentResponse != null}) {
    val channelId = currentResponse.request.context.connectionId
    try {
      currentResponse match {
        case response: NoOpResponse =>
          ...(중략)...
        case response: SendResponse =>
          sendResponse(response, response.responseSend)
        case response: CloseConnectionResponse =>
          ...(중략)...
          close(channelId)
        ...(중략)...  
      }
    } catch {
        ...(중략)...
    }
  }
}

 

processNewResponse() 메소드는 RequestHandler를 통해 API 호출 후 Client에게 결과를 전달하는 과정을 처리한다. API 수행이 모두 완료되면, Channel을 통해 Processor의 개별 ResponseQueue에 결과가 적재된다.

 

그 이후 위 코드가 실행되면, dequeueResponse() 메소드를 통해 결과를 추출 이후 결과 유형에 따라서 처리를 달리 수행한다.

 

protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
  val connectionId = response.request.context.connectionId
  ...(중략)...
  
  if (openOrClosingChannel(connectionId).isDefined) {
    selector.send(new NetworkSend(connectionId, responseSend))
    ...(중략)...
  }
}

 

만약 해당 요청이 SendResponse라면, 위 코드와 같이 Kafka Selector의 저장 Buffer에 임시 보관하도록 요청한다.

 

private def close(connectionId: String): Unit = {
  openOrClosingChannel(connectionId).foreach { channel =>
    ...(중략)...
    selector.close(connectionId)
    ...(중략)...
  }
}

 

만약 처리 유형이 CloseConnectionResponse 형태라면, Selector에게 close 요청을 전달하여 Channel을 정상적으로 종료하도록 한다. 그리고 Kafka Selector는 자신이 지닌 Client 연결 항목에서 해제하고 Channel을 종료시킨다.

 

poll()

 

private def poll(): Unit = {
  val pollTimeout = if (newConnections.isEmpty) 300 else 0
  try selector.poll(pollTimeout)
  catch {
    ...(중략)...
  }
}

 

poll() 메소드는 KafkaSelector를 통해 데이터가 Channel에 존재하면, fetch를 요청하는 작업이다.

 
 

이때 Kafka Selector의 내부 동작 방식은 복잡하지만, 핵심 부분만 도식화해보면 위 흐름과 같다.

 

  1. Processor가 poll()을 통해 변경 대상 Channel 확인 및 데이터 fetch를 요청한다.
  2. Kafka Selector 내부 nioSelector를 통해서 Kernel에 변경 대상 Channel이 존재하는지를 요청한다.
  3. Kernel 내부에서 변화가 감지된 Channel 정보를 전달한다.
  4. 해당 Channel이 데이터 수신이 가능한 상태라면 데이터를 추출하여 completedReceives에 저장한다. 만약 processNewResponse() 메소드 수행 결과 전달할 데이터가 존재한다면, Selector 쓰기 임시 버퍼에 저장되어있을 것이다. 해당 내용을 completedSends에 저장한다.

 

※ completedSends, completedReceives에 저장된 데이터는 다음에 확인할 processCompletedReceives()와 processCompletedSends() 과정을 통해서 처리된다.

 

processCompletedReceives()

 

private def processCompletedReceives(): Unit = {
  selector.completedReceives.forEach { receive =>
    try {
      openOrClosingChannel(receive.source) match {
        case Some(channel) =>           
            ...(중략)...
          val connectionId = receive.source
          val context = new RequestContext(header, connectionId, channel.socketAddress,channel.principal, listenerName, securityProtocol,
                  channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)

          val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)

          ...(중략)...
          requestChannel.sendRequest(req)
          ...(중략)...                        
        case None => ...(중략)...
      }
    } catch {
      ...(중략)...
    }
  }
  selector.clearCompletedReceives()
}

 

이전에 poll() 과정이 끝나고나면, 데이터 수신이 완료된 내용은 completedReceives에 저장됨을 확인했다. processCompletedReceives()는 해당 내용을 가져와서 처리를 수행하기 위해 Context를 만들고 이를 RequestChannel에 추가한다. 이때 데이터 처리 이후 자신의 Processor가 후속 작업을 처리하기 위해서 요청시 자신의 Processor Id를 파라미터로 넘기는 것을 참고하자.

 

Channel에 Request 요청을 넣은 이후에는 completedReceives 내용을 초기화하여 이후 중복 처리 되지 않도록한다.

 

processCompletedSends()

 

private def processCompletedSends(): Unit = {
  selector.completedSends.forEach { send =>
    try {
      ...(중략)...

      // Invoke send completion callback
      response.onComplete.foreach(onComplete => onComplete(send))

      ...(중략)...
    } catch {
      ...(중략)...
    }
  }
  selector.clearCompletedSends()
}

 

발송할 데이터는 poll() 메소드 수행 과정을 통해 모두 completedSends에 저장되어있다. 이후 해당 메소드에서 실제 후속 작업 처리를 진행함으로써, Client에게 결과를 반환하고, completedSends를 모두 초기화 한다.

 

지금까지, Processor 동작 과정에 대해서 살펴봤다. 해당 내용을 정리하자면 다음과 같다.

 

  1. Acceptor로 부터 Client를 할당 받는다.
  2. API로 부터 처리 결과를 전달받으면, Kafka Selector에 존재하는 임시 버퍼(각 Channel마다 존재)에 저장한다.
  3. Kafka Selector로부터 Client의 요청이 있는지 확인하며, 이 과정에서 사용자의 요청이 전달된다면, completedReceives에 저장하고, API 처리 결과를 completedSends에 한데 모은다.
  4. completedReceives은 사용자의 요청이므로 Request Channel에 전달하여 데이터 처리 요청하고, completedSends 내용은 결과를 Client에 반환하는 후속 작업을 처리한다.

 


Request Handler 동작 과정

 

class KafkaRequestHandlerPool(val brokerId: Int,
                              val requestChannel: RequestChannel,
                              val apis: ApiRequestHandler,
                              time: Time,
                              numThreads: Int,
                              requestHandlerAvgIdleMetricName: String,
                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
  ...(중략)...

  val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
  }
  
  ...(중략)...
}

 

Request Handler는 KafkaRequestHandlerPool 내부에 존재한다. 따라서 먼저 KafkaRequestHandlerPool가 생성된다. 생성 당시 SocketServer와 연결될 Channel과 API로 전달할 APIRequestHandler가 인자로 같이 전달되는 것을 참고하자. 또한 RequestHandler는 단일 쓰레드로 동작하는 것이 아니라 num.io.threads 인자에 따라 개수 조절이 가능하므로 생성 당시 해당 값 또한 전달된다.

 

위 과정을 통해 KafkaHandler는 데몬쓰레드로 동작한다.

 

def run(): Unit = {
  while (!stopped) {
    ...(중략)...
    val req = requestChannel.receiveRequest(300)
    req match {
      case RequestChannel.ShutdownRequest =>
        ...(중략)...
        completeShutdown()
        return

      case request: RequestChannel.Request =>
        try {
          ...(중략)...
          apis.handle(request, requestLocal)
        } catch {
          ...(중략)...
        }
        ...(중략)...
    }
  }
  completeShutdown()
}

 

RequestHandler의 역할은 단순하다. 만약 연결되어있는 Channel이 종료된다면, Handler의 역할이 더이상 필요 없으므로 종료한다. 반면 RequestChannel에서 Request가 존재한다면, API에게 처리를 위임한다.

 


API 동작 과정

 

class ControllerApis(val requestChannel: RequestChannel,
                     val authorizer: Option[Authorizer],
                     val quotas: QuotaManagers,
                     val time: Time,
                     val supportedFeatures: Map[String, VersionRange],
                     val controller: Controller,
                     val raftManager: RaftManager[ApiMessageAndVersion],
                     val config: KafkaConfig,
                     val metaProperties: MetaProperties,
                     val controllerNodes: Seq[Node],
                     val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {

  val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
  
  override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
    try {
      request.header.apiKey match {
        case ApiKeys.FETCH => handleFetch(request)
        case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
        ...(중략)...
        case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
      }
    } catch {
      case e: FatalExitError => throw e
      case e: ExecutionException => requestHelper.handleError(request, e.getCause)
      case e: Throwable => requestHelper.handleError(request, e)
    }
  }
}

 

API는 사용자 요청을 라우터의 역할로써, Header에 명시된 API Key를 보고 요청을 전달한다.

 

def handleFetch(request: RequestChannel.Request): Unit = {
  authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
  handleRaftRequest(request, response => new FetchResponse(response.asInstanceOf[FetchResponseData]))
}

 

요청이 전달되면, 요청을 처리하고 결과를 반환하기 위해서 위와 같이 반환 메소드를 호출한다.

 

private def handleRaftRequest(request: RequestChannel.Request,
                              buildResponse: ApiMessage => AbstractResponse): Unit = {
  val requestBody = request.body[AbstractRequest]
  ...(중략)...

  future.whenComplete { (responseData, exception) =>
    val response = if (exception != null) {
      requestBody.getErrorResponse(exception)
    } else {
      buildResponse(responseData)
    }
    requestHelper.sendResponseExemptThrottle(request, response)
  }
}

 

반환 메소드 안에서는 ResponseBody를 만든 이후에 requestHelper를 통하여 반환을 위임한다.

 

def sendResponseExemptThrottle(request: RequestChannel.Request,
                               response: AbstractResponse,
                               onComplete: Option[Send => Unit] = None): Unit = {
  ...(중략)...
  requestChannel.sendResponse(request, response, onComplete)
}

 

requestHelper는 해당 결과를 requestChannel에 전달함으로써 API 역할은 마무리된다.


 

Request Channel 동작 과정

 

Request Channel은 Processor와 Handler 그리고 API가 상호 작용에 필수적인 컴포넌트로써 요청을 전달하고 결과를 수신받는 중간 버퍼의 역할을 담당한다.

 

class RequestChannel(val queueSize: Int,
                     val metricNamePrefix: String,
                     time: Time,
                     val metrics: RequestChannel.Metrics) extends KafkaMetricsGroup {
  import RequestChannel._
  private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
  private val processors = new ConcurrentHashMap[Int, Processor]()
  ...(중략)...

  def addProcessor(processor: Processor): Unit = {
    if (processors.putIfAbsent(processor.id, processor) != null)
      warn(s"Unexpected processor with processorId ${processor.id}")

    ...(중략)...
  }
  
  ...(중략)...
}

RequestChannel의 핵심 프로퍼티는 requestQueue와 processors이다. 여기서 requestQueue는 사용자 요청을 저장하는 임시 버퍼의 역할을 수행하며, queueSize를 통해 전달된다. 해당 값은 이전에 Socket Server를 살펴볼 때 확인했듯이 data-plane과 control-plane에 따라서 서로 다른 값을 지니고 있다.

 

processors는 API로부터 결과를 반환할 때 Processor Id를 기반으로 빠르게 Processor 객체를 찾기 위한 자료구조로 사용되며, Socket Server의 구동 당시 Acceptor와 Processor가 만들어지고 나면, addProcessor 메소드를 통해서 Processor Id와 참조 객체를 전달받아 processors 자료구조에 삽입하게 된다.

 

private[network] def sendResponse(response: RequestChannel.Response): Unit = {
  ...(중략)...
  
  val processor = processors.get(response.processor)

  if (processor != null) {
    processor.enqueueResponse(response)
  }
}

 

이후 API로부터 결과를 전달받게되면, response에 저장된 processor Id를 기반으로 processors에서 참조 객체를 찾아 Processor에 위치한 Response Queue에 결과를 삽입한다.

 


마무리

 

지금까지 Kafka Broker 입장에서 네트워크 모델이 어떻게 구성되어있고 요청/응답이 어떤 식으로 이루어지는지 살펴봤다. 내부 구조를 살펴보면서, Kafka에 대한 이해가 조금 더 올라간 것 같다.

 

혹시 틀린 부분이 있으면 언제든 피드백 부탁드립니다.

+ Recent posts