1. 서론

 

이번 포스팅은 그동안 envoy-internals 시리즈의 이해를 바탕으로 사용자가 Http 전달을 요청했을 때, Envoy 내부구조를 토대로 네트워크 요청이 어떻게 흘러가는지에 대해서 전체적인 흐름을 상세히 조망해보는 시간을 가져보려 합니다. 사실 envoy에 대해서 분석했던 계기 중 하나가 도대체 어떻게 사용자의 요청이 전달되는지에 대한 궁금증에서 출발했기 때문에 이번 포스팅을 위해서 이전 시리즈의 내용이 존재했다고 생각합니다. 따라서 이번 내용은 이전 내용에 대한 이해가 선행되어야하므로 이전 시리즈 내용을 정독하고 보시는 것을 추천드립니다. 

 


2.  Worker 쓰레드 소켓 할당 과정

 

이전 시리즈 내용을 통해 Listener Manager에서 네트워크 요청 처리를 위해서 여러개의 Worker 쓰레드를 생성하는 것을 이해할 수 있었습니다. 이때 Worker 쓰레드 생성 갯수는 envoy 생성 당시 --concurrency 인자에 의해서 결정되는 것 또한 확인했습니다. 이번에는 Envoy 기동 과정 중 Worker 쓰레드 상호작용을 살펴보면서 Worker 쓰레드에서 어떻게 네트워크 요청을 처리하는지 살펴보겠습니다.

 

 

위 그림은 --concurrency 값이 2개이고 static_config, dynamic_config에 의해서 등록된 Listener 개수 또한 2개이면서 모두 TCP임을 가정했습니다. 이때 기동 과정을 살펴보면 다음과 같습니다.

 

1. Envoy 기동 과정에서 --concurrency 값을 살펴보고 Listener Manager에게 Worker 쓰레드 생성을 요청합니다. Listener Manager는 Worker 쓰레드를 요청만큼 생성합니다. 이 과정에서 Worker 쓰레드 내부에 Dispatcher와 Connection Handler가 생성됩니다.

 

2. Worker 쓰레드가 생성되는 과정에서 Envoy의 TLS를 관장하는 메인 쓰레드 InstanceImpl에 쓰레드를 등록합니다. 이 과정에서 InstanceImpl에서 Worker 쓰레드에 위치한 Dispatcher 정보를 registered_threads_ 에 저장할 수 있으며, 향후 Worker 쓰레드의 Dispatcher에 참조가 가능합니다.

 

3. Worker 쓰레드 생성이 마무리되면, Config 파일 파싱 도중 static configuration 정보를 등록하기 위해 Listener Manager에게 Config 정보 등록을 요청합니다. 해당 과정은 향후 LDS에 의해서도 생성될 수 있습니다. 이 과정에서 Listener Component Factory를 통해 --concurrency 만큼 Socket을 생성합니다.

 

4. Envoy의 설정이 모두 완료되면, Listener Manager에게 기동을 요청합니다.

 

5. Listener Manager에서는 기동 과정에서 등록된 Listener Config 정보를 Worker 쓰레드에 모두 Bind하기 위해 개별 Worker 쓰레드에게 Listener 생성을 요청합니다.

 

6. Worker 쓰레드내에 존재하는 Connection Handler에 Listener를 생성하기 위해 먼저 Listener로부터 자신의 Worker 쓰레드 번호에 해당하는 Socket 정보를 얻어옵니다. 그리고 Worker 쓰레드에서 외부 요청을 참조하기 위해 Dispatcher에게 Socket 정보를 전달하면서 Listener 생성을 요청합니다.

 

connection_handler_impl.cc

details->addActiveListener(
    config, address, listener_reject_fraction_, disable_listeners_,
    std::make_unique<ActiveTcpListener>(
        *this, config, runtime,
        socket_factory->getListenSocket(worker_index_.has_value() ? *worker_index_ : 0),
        address, config.connectionBalancer(*address)));

 

active_tcp_listener.cc

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
                                     Network::ListenerConfig& config, Runtime::Loader& runtime,
                                     Network::SocketSharedPtr&& socket,
                                     Network::Address::InstanceConstSharedPtr& listen_address,
                                     Network::ConnectionBalancer& connection_balancer)
    : OwnedActiveStreamListenerBase(
          parent, parent.dispatcher(),
          parent.dispatcher().createListener(std::move(socket), *this, runtime, config.bindToPort(),
                                             config.ignoreGlobalConnLimit()),
          config),
      tcp_conn_handler_(parent), connection_balancer_(connection_balancer),
      listen_address_(listen_address) {
  connection_balancer_.registerHandler(*this);
}

 

 

7. Dispatcher는 전달받은 Socket 정보를 토대로 Listener를 생성하여 반환합니다. 이때 주의해서 살펴볼 것은 사용자가 접속했을 때, 인자로 전달받은 TcpListenerCallbacks에게 Accept 요청을 수행하는데 호출되는 주체가 Connection Handler에서 전달한 ActiveTcpListener라는 것입니다. 따라서 향후 사용자가 접속하게되면, 그에 대한 Accept 처리는 ActiveTcpListener가 담당하게됩니다.

 

dispatcher_impl.cc

Network::ListenerPtr DispatcherImpl::createListener(Network::SocketSharedPtr&& socket,
                                                    Network::TcpListenerCallbacks& cb,
                                                    Runtime::Loader& runtime, bool bind_to_port,
                                                    bool ignore_global_conn_limit) {
  return std::make_unique<Network::TcpListenerImpl>(*this, random_generator_, runtime,
                                                    std::move(socket), cb, bind_to_port,
                                                    ignore_global_conn_limit);
}

 

8. 생성된 Listener에서는 소켓 정보를 libevent에 등록하여 향후 Client가 접속을 요청했을 때 libevent에 의해 요청을 전달받을 수 있도록 등록합니다.

 

tcp_listener_impl.cc

socket_->ioHandle().initializeFileEvent(
    dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); },
    Event::FileTriggerType::Level, Event::FileReadyType::Read);

 

 

위와같이 8단계를 거치게되면, 생성된 모든 Worker 쓰레드에서 Listener 및 소켓을 생성하고 Client 요청을 수신받을 수 있는 상태가 완료됩니다.

 


3. Client Connection 연결 과정

 

지금까지 Envoy를 기동하는 과정에서 Worker 쓰레드가 생성되고, Listener 및 Socket을 생성하는 것을 살펴봤습니다.

 

tcp_listener_impl.cc

TcpListenerImpl::TcpListenerImpl(Event::DispatcherImpl& dispatcher, Random::RandomGenerator& random,
                                 Runtime::Loader& runtime, SocketSharedPtr socket,
                                 TcpListenerCallbacks& cb, bool bind_to_port,
                                 bool ignore_global_conn_limit)
    : BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), random_(random), runtime_(runtime),
      bind_to_port_(bind_to_port), reject_fraction_(0.0),
      ignore_global_conn_limit_(ignore_global_conn_limit) {
  if (bind_to_port) {
    socket_->ioHandle().initializeFileEvent(
        dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); },
        Event::FileTriggerType::Level, Event::FileReadyType::Read);
  }
}

 

이때 dispatcher를 통해서 libevent에서 해당 소켓에 이벤트가 감지되면, onSocketEvent 메소드를 호출하도록 위 코드와 같이 등록됩니다. 즉 이 과정을 통해서 각각의 Worker 쓰레드에 존재하는 Listener는 dispatcher에 의해 libevent로 등록되었으므로 사용자가 OS로부터 Socket 생성을 요청했을 때, OS는 등록된 socket 중 하나를 임의로 선정하여 요청을 전달할 수 있게됩니다.

 

 

 

그렇다면 Listener 설정이 모두 완료된 이후 Client로부터 Connection 요청이 들어오면 어떠한 과정을 거치게될까요?

 

 

먼저 Socket Event가 감지되면, 위와 같이 두개의 Worker 쓰레드 중 누가 해당 요청을 처리해야하는지 선택해야합니다. 이때 Worker 쓰레드 선정에 대한 결정은 이전에 설명했듯이 전적으로 OS가 수행합니다. 따라서 가령 위와같이 Worker_0번이 선택되었으면, 해당 쓰레드의 onSocketEvent가 실행될 것입니다.

 

tcp_listener_impl.cc

void TcpListenerImpl::onSocketEvent(short flags) {
  ASSERT(bind_to_port_);
  ASSERT(flags & (Event::FileReadyType::Read));

  while (1) {
    if (!socket_->ioHandle().isOpen()) {
      PANIC(fmt::format("listener accept failure: {}", errorDetails(errno)));
    }

    sockaddr_storage remote_addr;
    socklen_t remote_addr_len = sizeof(remote_addr);

    IoHandlePtr io_handle =
        socket_->ioHandle().accept(reinterpret_cast<sockaddr*>(&remote_addr), &remote_addr_len);
    if (io_handle == nullptr) {
      break;
    }

    if (rejectCxOverGlobalLimit()) {
      io_handle->close();
      cb_.onReject(TcpListenerCallbacks::RejectCause::GlobalCxLimit);
      continue;
    } else if (random_.bernoulli(reject_fraction_)) {
      io_handle->close();
      cb_.onReject(TcpListenerCallbacks::RejectCause::OverloadAction);
      continue;
    }

    const Address::InstanceConstSharedPtr& local_address =
        local_address_ ? local_address_ : io_handle->localAddress();

    const Address::InstanceConstSharedPtr remote_address =
        (remote_addr.ss_family == AF_UNIX)
            ? io_handle->peerAddress()
            : Address::addressFromSockAddrOrThrow(remote_addr, remote_addr_len,
                                                  local_address->ip()->version() ==
                                                      Address::IpVersion::v6);
    cb_.onAccept(
        std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address));
  }
}

 

onSocketEvent 메소드가 호출되면, 가장 먼저 수행하는 것은 연결된 Connection 갯수가 Global 설정을 넘어섰는지 확인합니다. 이 과정에서 Global Limit이 지정되어있고 신규 연결 요청이 Limit을 넘어서게되면, 해당 소켓에 대한 연결은 Close하고 종결처리 합니다.

 

 

이때 Global Limit으로 지정될 수 있는 값은 overload.global_downstrea_max_connections에 의해서 지정될 수 있으며, 해당 값은 OverloadManager에 의해서 관리되는 값입니다. 따라서 현재 Socket에 Accepted된 개수가 해당 값을 넘었을 경우에는 Socket 연결을 해제합니다. 해당 값에 대한 자세한 설명은 envoy 공식문서를 참고 바랍니다.

 

반대로 요청이 Global Limit을 넘지 않았을 경우는 AcceptedSocket을 생성하고 Worker 쓰레드 내 Listener는 Socket 연결에 대한 Accept 처리를 위임합니다.

 

 

위 그림은 AcceptedSocket 클래스 구조를 나타냅니다. 위 내용을 통해서 우리는 AcceptedSocket을 만드는 이유에 대해서 유추해볼 수 있습니다. 코드를 살펴보면, global_accetped_socket_count_ 라는 값이 static으로 지정되어있음을 알 수 있습니다. 그리고 생성자, 소멸자 단계에서 해당 값이 증감하는 것 또한 알 수 있습니다.

 

이를 통해서 확인되는 사실은 Socket이 접속하면 현재 Accepted된 Socket 개수를 파악할 수 있습니다. 또한 새로운 Socket이 연결되었을 때 Global Limit을 넘는지 검증할 수 있는 기준을 제시합니다.

 

AcceptedSocket을 만들고나면 그 다음에는 해당 Socket을 Listener에서 Accept하는 과정이 진행됩니다. 

 

tcp_listener_impl.cc

void TcpListenerImpl::onSocketEvent(short flags) {
  ...(중략)...
  cb_.onAccept(
        std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address));
}

 

이전에 살펴본 onSocketEvent 메소드의 마지막 줄을 살펴보면, 저장된 Callback에서 onAccept를 수행해달라고 요청하는 것을 볼 수 있습니다. 이는 TcpListenerImpl을 생성할 때, 해당 Callback 값이 Worker에 존재하는 ActiveTcpListener 이므로 해당 ActiveTcpListener가 실질적으로 Accept를 수행함을 의미합니다.

 

active_tcp_listener.cc

void ActiveTcpListener::onAccept(Network::ConnectionSocketPtr&& socket) {
  if (listenerConnectionLimitReached()) {
    RELEASE_ASSERT(socket->connectionInfoProvider().remoteAddress() != nullptr, "");
    ENVOY_LOG(trace, "closing connection from {}: listener connection limit reached for {}",
              socket->connectionInfoProvider().remoteAddress()->asString(), config_->name());
    socket->close();
    stats_.downstream_cx_overflow_.inc();
    return;
  }

  onAcceptWorker(std::move(socket), config_->handOffRestoredDestinationConnections(), false);
}

 

ActiveTcpListener에서의 Accept 과정을 살펴보면 위 코드와 같습니다.

 

 

 

 

먼저 listenerConnectionLimitReached 메소드를 수행하면서 이를 위반할 경우 Socket을 Close하는 것을 볼 수 있습니다. 이는 이전의 Connection은 Envoy 전체의 Connection을 살펴본 것이라면, 이번에는 개별 listener 별로 Connection 제한이 있는지 검사하고 만약 지정된 값이 있을 경우 그 값을 넘어서게되면 Accept하지 않습니다. 해당 설정은 Listener에서 수행할 수 있으며 envoy 공식 문서에서 이에 대해서 소개하고 있으니 참고 바랍니다.

 

ActiveTcpListener에서는 Limit 검사만 체크하고 다시 Socket에 대한 Accept는 onAcceptWorker 메소드 호출을 통해 후속 작업을 처리합니다.

 

 

active_tcp_listener.cc

void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
                                       bool hand_off_restored_destination_connections,
                                       bool rebalanced) {
  if (!rebalanced) {
    Network::BalancedConnectionHandler& target_handler =
        connection_balancer_.pickTargetHandler(*this);
    if (&target_handler != this) {
      target_handler.post(std::move(socket));
      return;
    }
  }

  auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
                                                         hand_off_restored_destination_connections);

  onSocketAccepted(std::move(active_socket));
}

 

이번에는 onAcceptWorker 메소드 내용을 살펴보겠습니다. 해당 코드를 살펴보면 connection_balancer에 의해서 TargetHandler를 구하는 것을 볼 수 있습니다.

 

그렇다면 connection_balancer는 무엇일까요?

 

static_resources:
  listeners:
  - connection_balance_config:
      extend_balance:
        name: envoy.network.connection_balance.dlb
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.network.connection_balance.dlb.v3alpha.Dlb

 

envoy에서는 쓰레드간의 효율적인 Socket 분배를 위해서 만약 하드웨어에서 DLB 지원이 된다면, 이를 사용하여 Connection을 안정적으로 분배할 수 있는 기능을 제공합니다. 이때 위와 같이 connection_balance_config를 지정하면, 해당 설정을 토대로 connection_balancer가 Target을 지정합니다.

 

해당 기술은 내부적으로는 Intel DLB hardware를 통해 구현되며, 위와 같이 Config 설정이 지정되어있다면 다른 Target으로 Load를 분산시킬 수 있습니다. 만약 지정되지 않았다면, 현재 Worker 쓰레드에서 Accept 과정이 정상적으로 진행될 것입니다. 이와 관련된 자세한 내용은 envoy 공식문서에 설명되어있으니 참고 바랍니다.

 

참고로 본 포스팅에서는 DLB 설정이 지정되어있지 않았다고 가정하므로 현재 Worker 쓰레드에서 해당 처리를 진행한다고 가정하겠습니다.

 

active_tcp_listener.cc

void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
                                       bool hand_off_restored_destination_connections,
                                       bool rebalanced) {
  ...(중략)...

  auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
                                                         hand_off_restored_destination_connections);

  onSocketAccepted(std::move(active_socket));
}

 

connection_balancer에 의해서 선정된 target이 자기 자신이라면 해당 연결을 허용하기 위해 ActiveTcpSocket을 만드는 것을 볼 수 있습니다. 

 

ActiveTcpSocket까지 생성되면, 향후 Client의 요청은 해당 Socket을 통해서 모두 처리가됩니다. 

 

 

 

즉 이말은 이전에 Envoy를 처음 학습할 때 살펴봤듯이 Client가 Envoy에게 특정 API를 요청하면, 내부적으로 Listener Filters와 Filter Chains를 통과하면서 Upstream 대상을 찾고 전달한다고 했는데, 이 과정을 수행하는 주체가 해당 소켓이 됩니다. 따라서 Active Tcp Socket은 이를 지원하기 위해서 다양한 내부 프로퍼티가 있는데, 그 중 Filter와 관련된 것이 accept_filters 입니다. 해당 속성을 기반으로 향후 Listener Filters를 만들고 이를 수행하고 그 다음에는 Filter Chains를 생성하고 이를 수행하는 작업을 진행합니다. 그리고 해당 작업을 위해서 ActiveTcpSocket을 만든 이후 onSocketAccepted를 호출합니다.

 

active_stream_listener_base.h

void onSocketAccepted(std::unique_ptr<ActiveTcpSocket> active_socket) {
  // Create and run the filters
  if (config_->filterChainFactory().createListenerFilterChain(*active_socket)) {
    active_socket->startFilterChain();
  } else {
    // If create listener filter chain failed, it means the listener is missing
    // config due to the ECDS. Then close the connection directly.
    active_socket->socket().close();
    ASSERT(active_socket->isEndFilterIteration());
  }

  // Move active_socket to the sockets_ list if filter iteration needs to continue later.
  // Otherwise we let active_socket be destructed when it goes out of scope.
  if (!active_socket->isEndFilterIteration()) {
    active_socket->startTimer();
    LinkedList::moveIntoListBack(std::move(active_socket), sockets_);
  } else {
    if (!active_socket->connected()) {
      // If active_socket is about to be destructed, emit logs if a connection is not created.
      if (active_socket->streamInfo() != nullptr) {
        emitLogs(*config_, *active_socket->streamInfo());
      } else {
        // If the active_socket is not connected, this socket is not promoted to active
        // connection. Thus the stream_info_ is owned by this active socket.
        ENVOY_BUG(active_socket->streamInfo() != nullptr,
                  "the unconnected active socket must have stream info.");
      }
    }
  }
}

 

위 코드는 onSocketAccepted 메소드 내용입니다. 코드를 살펴보면, Socket이 Accepted 되면 가장 먼저 해당 소켓에 해당되는 ListenerFilterChain을 생성하고 FilterChain을 수행하는 것을 볼 수 있습니다. 즉 이 과정부터 해당 소켓은 각종 Filter들을 통과하면서 Upstream 연결이 이어지게됩니다.

 

해당 과정을 조금 더 자세히 살펴보겠습니다.

 

 

 

 

이전 내용을 토대로 ActiveTcpListener 에서 ActiveTcpSocket을 만든 것을 확인했습니다. 그리고 createListenerFactory 메소드를 호출하면서 생성한 ActiveTcpSocket을 전달했습니다.

 

그러면 내부적으로는 Listener Config에 이미 저장되어있는 listener_filter_factories 내부에 매핑된 Factory Callback을 하나씩 실행시키면서 ActiveTcpSocket의 accept_filters에 Filter를 하나씩 생성하는 과정을 거칩니다. 해당 과정을 코드로 살펴보면 다음과 같습니다.

 

 

listener_impl.cc

bool ListenerImpl::createListenerFilterChain(Network::ListenerFilterManager& manager) {
  if (Configuration::FilterChainUtility::buildFilterChain(manager, listener_filter_factories_)) {
    return true;
  } else {
    ENVOY_LOG(debug, "New connection accepted while missing configuration. "
                     "Close socket and stop the iteration onAccept.");
    missing_listener_config_stats_.extension_config_missing_.inc();
    return false;
  }
}

 

위 코드는 ActiveTcpListener 에서 ActiveTcpSocket을 생성 후 ListenerFilterChain을 생성하기 위해 createListenerFilterChain 메소드를 호출하였을 때 과정을 나타냅니다. 코드를 살펴보면, buildFilterChain 함수 호출을 통해 자신이 보유하고 있는 listener_filter_factories_ 목록을 전달하는 것을 볼 수 있습니다.

 

configuration_impl.cc

bool FilterChainUtility::buildFilterChain(Network::ListenerFilterManager& filter_manager,
                                          const Filter::ListenerFilterFactoriesList& factories) {
  for (const auto& filter_config_provider : factories) {
    auto config = filter_config_provider->config();
    if (!config.has_value()) {
      return false;
    }
    auto config_value = config.value();
    config_value(filter_manager);
  }

  return true;
}

 

buildFilterChain 내부를 살펴보면, 전달받은 factories를 순회하면서 callback 함수를 순차적으로 수행시키는 것을 볼 수 있습니다. 그리고 해당 callback을 수행할 때 전달받은 ActiveTcpSocket 정보를 다시 넘기는 것을 확인할 수 있습니다.

 

config.cc

[listener_filter_matcher, config](Network::ListenerFilterManager& filter_manager) -> void {
  filter_manager.addAcceptFilter(listener_filter_matcher, std::make_unique<Filter>(config));
};

 

위 코드는 http instpector Listener Filter의 Factory 코드이며, 개별 Listener Filter Factory의 리턴 값은 위와 같이 FilterManager 즉 ActiveTcpSocket 정보로 받아서 addAcceptFilter 메소드를 호출하고 있는 것을 볼 수 있습니다. 또한 인자를 통해서 Factory에서 보유하고 있는 Filter 정보를 새롭게 생성하여 전달하는 것을 볼 수 있습니다.

 

 

active_tcp_socket.h

void addAcceptFilter(const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher,
                     Network::ListenerFilterPtr&& filter) override {
  accept_filters_.emplace_back(
      std::make_unique<GenericListenerFilter>(listener_filter_matcher, std::move(filter)));
}

 

그리고 ActiveTcpSocket에서는 전달받은 Filter를 accept_filters에 추가함으로써 Listener Filter Chain을 완성합니다.

 

 

active_stream_listener_base.h

void onSocketAccepted(std::unique_ptr<ActiveTcpSocket> active_socket) {  
  ...(중략)...
    active_socket->startFilterChain();
  ...(후략)...
}

 

FilterChain이 완성되면, startFilterChain()을 통해 ListenerFilterChain을 차례로 수행하도록 요청합니다.

 

active_tcp_socket.h

void startFilterChain() { continueFilterChain(true); }

 

startFilterChain()은 다시 내부에 continueFilterChain 메소드에 처리를 위임합니다.

 

 

active_tcp_socket.cc

void ActiveTcpSocket::continueFilterChain(bool success) {
  if (success) {
    bool no_error = true;
    if (iter_ == accept_filters_.end()) {
      iter_ = accept_filters_.begin();
    } else {
      iter_ = std::next(iter_);
    }

    for (; iter_ != accept_filters_.end(); iter_++) {
      Network::FilterStatus status = (*iter_)->onAccept(*this);
      if (status == Network::FilterStatus::StopIteration) {        
        if (!socket().ioHandle().isOpen()) {          
          no_error = false;
          break;
        } else {
          // If the listener maxReadBytes() is 0, then it shouldn't return
          // `FilterStatus::StopIteration` from `onAccept` to wait for more data.
          ASSERT((*iter_)->maxReadBytes() != 0);
          if (listener_filter_buffer_ == nullptr) {
            if ((*iter_)->maxReadBytes() > 0) {
              createListenerFilterBuffer();
            }
          } else {
            // If the current filter expect more data than previous filters, then
            // increase the filter buffer's capacity.
            if (listener_filter_buffer_->capacity() < (*iter_)->maxReadBytes()) {
              listener_filter_buffer_->resetCapacity((*iter_)->maxReadBytes());
            }
          }
          if (listener_filter_buffer_ != nullptr) {
            listener_filter_buffer_->activateFileEvent(Event::FileReadyType::Read);
          }
          // Waiting for more data.
          return;
        }
      }
    }
    // Successfully ran all the accept filters.
    if (no_error) {
      newConnection();
    } else {
      // Signal the caller that no extra filter chain iteration is needed.
      iter_ = accept_filters_.end();
    }
  }

  // Filter execution concluded, unlink and delete this ActiveTcpSocket if it was linked.
  if (inserted()) {
    unlink();
  }
}

 

continueFilterChain 코드 내용을 자세히 살펴보면, 생성된 Listener Filters(accept_filters)를 순회하면서 onAccept를 통해 Filter로직을 수행하는 것을 볼 수 있습니다.

 

 

만약 Filter onAccept 순회 도중에 Stop을 해야될 경우가 존재한다면, 이유를 살펴보고 요청을 중지하던지 아니면 설정 변경 후 재수행을 수행하도록 작성되어있습니다.

 

그리고 모든 Filter 순회가 종료되면, 사용자 요청에 대한 Metadata 및 요청 정보를 모두 분석하였으므로 그 다음에는 newConnection() 메소드를 호출하여 본격적으로 downstream간의 연결을 위한 작업을 진행합니다.

 

active_tcp_socket.cc

void ActiveTcpSocket::newConnection() {
  connected_ = true;

  // Check if the socket may need to be redirected to another listener.
  Network::BalancedConnectionHandlerOptRef new_listener;

  if (hand_off_restored_destination_connections_ &&
      socket_->connectionInfoProvider().localAddressRestored()) {
    // Find a listener associated with the original destination address.
    new_listener =
        listener_.getBalancedHandlerByAddress(*socket_->connectionInfoProvider().localAddress());
  }

  // Reset the file events which are registered by listener filter.
  // reference https://github.com/envoyproxy/envoy/issues/8925.
  if (listener_filter_buffer_ != nullptr) {
    listener_filter_buffer_->reset();
  }

  if (new_listener.has_value()) {
    ...(중략)...
    new_listener.value().get().onAcceptWorker(std::move(socket_), false, false);
  } else {
    // Set default transport protocol if none of the listener filters did it.
    if (socket_->detectedTransportProtocol().empty()) {
      socket_->setDetectedTransportProtocol("raw_buffer");
    }
    accept_filters_.clear();
    // Create a new connection on this listener.
    listener_.newConnection(std::move(socket_), std::move(stream_info_));
  }
}

 

newConnection 메소드를 호출하면, 다른 Listener로 Redirect 해야할 필요가 있는지를 살펴보고 listener에 새로운 Connection 할당을 요청합니다.

 

active_stream_listener_base.cc

void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& socket,
                                             std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
  // Find matching filter chain.
  const auto filter_chain = config_->filterChainManager().findFilterChain(*socket);
  if (filter_chain == nullptr) {
    ...(중략)...
    socket->close();
    return;
  }
 ...(후략)...
}

 

 

newConnection 메소드를 살펴보면, 먼저 Listener의 FilterChainManager로부터 해당 소켓 정보에 해당하는 Filter Chain을 찾는 과정을 수행합니다. 그리고 이 과정에서 매칭되는 Filter Chain을 찾지 못한다면, 연결된 Socket을 종료하고 마칩니다.

 

filter_chain_manager_impl.cc

const Network::FilterChain*
FilterChainManagerImpl::findFilterChain(const Network::ConnectionSocket& socket) const {
  if (matcher_) {
    return findFilterChainUsingMatcher(socket);
  }

  const auto& address = socket.connectionInfoProvider().localAddress();

  const Network::FilterChain* best_match_filter_chain = nullptr;
  // Match on destination port (only for IP addresses).
  if (address->type() == Network::Address::Type::Ip) {
    const auto port_match = destination_ports_map_.find(address->ip()->port());
    if (port_match != destination_ports_map_.end()) {
      best_match_filter_chain = findFilterChainForDestinationIP(*port_match->second.second, socket);
      if (best_match_filter_chain != nullptr) {
        return best_match_filter_chain;
      } else {
        // There is entry for specific port but none of the filter chain matches. Instead of
        // matching catch-all port 0, the fallback filter chain is returned.
        return default_filter_chain_.get();
      }
    }
  }
  // Match on catch-all port 0 if there is no specific port sub tree.
  const auto port_match = destination_ports_map_.find(0);
  if (port_match != destination_ports_map_.end()) {
    best_match_filter_chain = findFilterChainForDestinationIP(*port_match->second.second, socket);
  }
  return best_match_filter_chain != nullptr
             ? best_match_filter_chain
             // Neither exact port nor catch-all port matches. Use fallback filter chain.
             : default_filter_chain_.get();
}

 

위 코드는 Filter Chain Manager로 부터 Filter Chain을 찾는 과정을 보여줍니다.

 

Filter Chain Manager는 Listener 기동 시점에 설정 정보를 파싱하여 Filter Chain 정보를 모두 가지고 있습니다. 따라서 Socket의 Address 정보를 토대로 원하는 Filter Chain을 찾아서 반환해줍니다. 만약에 상응하는 Filter Chain 정보를 찾지 못한 경우에는 Default Filter Chain을 반환합니다.

 

 

active_stream_listener_base.cc

void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& socket,
                                             std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
  ...(중략)...
  stream_info->setFilterChainName(filter_chain->name());
  auto transport_socket = filter_chain->transportSocketFactory().createDownstreamTransportSocket();
  auto server_conn_ptr = dispatcher().createServerConnection(
      std::move(socket), std::move(transport_socket), *stream_info);
  if (const auto timeout = filter_chain->transportSocketConnectTimeout();
      timeout != std::chrono::milliseconds::zero()) {
    server_conn_ptr->setTransportSocketConnectTimeout(
        timeout, stats_.downstream_cx_transport_socket_connect_timeout_);
  }
  server_conn_ptr->setBufferLimits(config_->perConnectionBufferLimitBytes());
  ...(중략)...
  const bool empty_filter_chain = !config_->filterChainFactory().createNetworkFilterChain(
      *server_conn_ptr, filter_chain->networkFilterFactories());
  if (empty_filter_chain) {
    ...(중략),,,
    server_conn_ptr->close(Network::ConnectionCloseType::NoFlush);
  }
  newActiveConnection(*filter_chain, std::move(server_conn_ptr), std::move(stream_info));
}

 

매칭되는 Filter Chain을 찾았으면, 위 코드와 같은 과정을 거칩니다. 주요 내용을 살펴보면 다음과 같습니다.

 

1. Downstream을 위한 Transport Socket을 생성합니다.

2. dispatcher에게 ServerConnection을 요청합니다. 

 

 

dispatcher_impl.cc

Network::ServerConnectionPtr
DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
                                       Network::TransportSocketPtr&& transport_socket,
                                       StreamInfo::StreamInfo& stream_info) {
  ASSERT(isThreadSafe());
  return std::make_unique<Network::ServerConnectionImpl>(
      *this, std::move(socket), std::move(transport_socket), stream_info, true);
}

 

이때 내부적으로 dispatcher에서는 ServerConnectionImpl을 생성하는 것을 볼 수 있습니다.

 

connection_impl.cc

ServerConnectionImpl::ServerConnectionImpl(Event::Dispatcher& dispatcher,
                                           ConnectionSocketPtr&& socket,
                                           TransportSocketPtr&& transport_socket,
                                           StreamInfo::StreamInfo& stream_info, bool connected)
    : ConnectionImpl(dispatcher, std::move(socket), std::move(transport_socket), stream_info,
                     connected) {}

 

생성되는 ServerConnectionImpl의 생성자는 위와 같으며, 부호 생성자에게 전달받은 인자들을 넘깁니다.

 

connection_impl.cc

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                               TransportSocketPtr&& transport_socket,
                               StreamInfo::StreamInfo& stream_info, bool connected)
    : ConnectionImplBase(dispatcher, next_global_id_++),
      transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
      stream_info_(stream_info), filter_manager_(*this, *socket_),
      write_buffer_(dispatcher.getWatermarkFactory().createBuffer(
          [this]() -> void { this->onWriteBufferLowWatermark(); },
          [this]() -> void { this->onWriteBufferHighWatermark(); },
          []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
      read_buffer_(dispatcher.getWatermarkFactory().createBuffer(
          [this]() -> void { this->onReadBufferLowWatermark(); },
          [this]() -> void { this->onReadBufferHighWatermark(); },
          []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
      write_buffer_above_high_watermark_(false), detect_early_close_(true),
      enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
      write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
      transport_wants_read_(false) {

  if (!connected) {
    connecting_ = true;
  }

  Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;

  // We never ask for both early close and read at the same time. If we are reading, we want to
  // consume all available data.
  socket_->ioHandle().initializeFileEvent(
      dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
      Event::FileReadyType::Read | Event::FileReadyType::Write);

  transport_socket_->setTransportSocketCallbacks(*this);

  // TODO(soulxu): generate the connection id inside the addressProvider directly,
  // then we don't need a setter or any of the optional stuff.
  socket_->connectionInfoProvider().setConnectionID(id());
  socket_->connectionInfoProvider().setSslConnection(transport_socket_->ssl());
}

 

 

ConnectionImpl 생성자 내부를 살펴보면, Socket 내부에 Write/Read 전용 Buffer를 설정하는 것을 볼 수 있습니다. 해당 Buffer는 사용자가 HTTP 요청을 전달했을 때 데이터를 읽는 용도 그리고 upstream으로부터 응답 데이터가 전달되었을 때 기록하는 용도로 사용됩니다.

 

또한 downstream을 위한 transport_socket 생성 및 Filter Chains 관리를 위한 filter_manager를 생성합니다.

 

중요하게 살펴볼 부분은 향후 Socket 내부에 Read/Write 이벤트가 감지되면 이를 통지받고 후속 작업을 처리하기 위해 Dispatcher에게 onFileEvent()를 등록하는 부분입니다. 이는 내부적으로 다시 libevent에게 등록이 되며, 향후 Client가 HTTP 요청을 전달하게 되면, 해당 메소드로 요청 항목이 전달되어 후속 작업을 수행할 수 있습니다.

 

 

3. Filter Chain으로부터 생성해야할 Filter 목록을 확인하여 Filter Chains(Network Filters)를 생성합니다.

 

 

ServerConnection을 생성하고 나면, 그 다음에는 Filter Chains를 생성하는 작업을 수행합니다. 이를 위해 Listener 에게 FilterChain 생성을 요청합니다. 이때 생성되는 Filter Chains는 ServerConnection 내부에 매핑되어야하기 때문에 해당 정보를 Filter Chains와 같이 전달합니다.

 

listener_impl.cc

bool ListenerImpl::createNetworkFilterChain(
    Network::Connection& connection,
    const std::vector<Network::FilterFactoryCb>& filter_factories) {
  return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
}

 

요청을 전달받은 Listener는 Filter Chains 생성 처리를 buildFilterChain util 함수에 위임합니다.

 

configuration_impl.cc

bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager,
                                          const std::vector<Network::FilterFactoryCb>& factories) {
  for (const Network::FilterFactoryCb& factory : factories) {
    factory(filter_manager);
  }

  return filter_manager.initializeReadFilters();
}

 

buildFilterChains는 전달받은 Filter를 순차적으로 순회하면서 ServerConnection 내부에 생성하도록 Callback 팩토리 메소드를 수행합니다.

 

config.cc

return [singletons, filter_config, &context,
        clear_hop_by_hop_headers](Network::FilterManager& filter_manager) -> void {
  auto hcm = std::make_shared<Http::ConnectionManagerImpl>(
      *filter_config, context.drainDecision(), context.api().randomGenerator(),
      context.httpContext(), context.runtime(), context.localInfo(), context.clusterManager(),
      context.overloadManager(), context.mainThreadDispatcher().timeSource());
  if (!clear_hop_by_hop_headers) {
    hcm->setClearHopByHopResponseHeaders(false);
  }
  filter_manager.addReadFilter(std::move(hcm));
};

 

가령 위 코드는 Http Connection Manager Filter에서 반환하는 Callback Factory 메소드가 수행되었다고 가정해봤습니다.

 

 

이때 인자로 ServerConnection을 전달받았으며, 해당 람다내에서는 HttpConnectionManager를 생성한 다음 ServerConnection 내부에 addReadFilter를 호출하여 Filter 정보를 등록하는 것을 볼 수 있습니다.

 

connection_impl.cc

void ConnectionImpl::addReadFilter(ReadFilterSharedPtr filter) {
  filter_manager_.addReadFilter(filter);
}

 

이 경우 호출되는 ServerConnection 내부에서는 filter_manager를 통해 readFilter를 등록하도록 재요청합니다. 

 

filter_manager_impl.cc

void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {
  ASSERT(connection_.state() == Connection::State::Open);
  ActiveReadFilterPtr new_filter = std::make_unique<ActiveReadFilter>(*this, filter);
  filter->initializeReadFilterCallbacks(*new_filter);
  LinkedList::moveIntoListBack(std::move(new_filter), upstream_filters_);
}

 

filter_manager에서 addReadFilter가 호출되면, 생성된 Filter에 ActiveReadFilter 정보를 전달하여 해당 Filter 내에서 향후 Callback할 수 있도록 initializeReadFilterCallbacks를 등록합니다. 그리고 자신의 upstream_filters_에 Read Filter를 등록하는 것을 볼 수 있습니다.

 

conn_manager_impl.cc

void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
  read_callbacks_ = &callbacks;
  stats_.named_.downstream_cx_total_.inc();
  stats_.named_.downstream_cx_active_.inc();
  if (read_callbacks_->connection().ssl()) {
    stats_.named_.downstream_cx_ssl_total_.inc();
    stats_.named_.downstream_cx_ssl_active_.inc();
  }

  read_callbacks_->connection().addConnectionCallbacks(*this);

  if (!read_callbacks_->connection()
           .streamInfo()
           .filterState()
           ->hasData<Network::ProxyProtocolFilterState>(Network::ProxyProtocolFilterState::key())) {
    read_callbacks_->connection().streamInfo().filterState()->setData(
        Network::ProxyProtocolFilterState::key(),
        std::make_unique<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
            read_callbacks_->connection().connectionInfoProvider().remoteAddress(),
            read_callbacks_->connection().connectionInfoProvider().localAddress()}),
        StreamInfo::FilterState::StateType::ReadOnly,
        StreamInfo::FilterState::LifeSpan::Connection);
  }

  if (config_.idleTimeout()) {
    connection_idle_timer_ = read_callbacks_->connection().dispatcher().createScaledTimer(
        Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout,
        [this]() -> void { onIdleTimeout(); });
    connection_idle_timer_->enableTimer(config_.idleTimeout().value());
  }

  if (config_.maxConnectionDuration()) {
    connection_duration_timer_ = read_callbacks_->connection().dispatcher().createTimer(
        [this]() -> void { onConnectionDurationTimeout(); });
    connection_duration_timer_->enableTimer(config_.maxConnectionDuration().value());
  }

  read_callbacks_->connection().setDelayedCloseTimeout(config_.delayedCloseTimeout());

  read_callbacks_->connection().setConnectionStats(
      {stats_.named_.downstream_cx_rx_bytes_total_, stats_.named_.downstream_cx_rx_bytes_buffered_,
       stats_.named_.downstream_cx_tx_bytes_total_, stats_.named_.downstream_cx_tx_bytes_buffered_,
       nullptr, &stats_.named_.downstream_cx_delayed_close_timeout_});
}

 

등록된 Filter가 Http Connection Manager Filter임을 가정하였으므로, initializeReadFilter를 호출하면, 위 코드 구문이 호출될 것입니다. 해당 코드를 개략적으로 살펴보면, Callback을 자신의 프로퍼티에 등록하는 작업 외에 metric 정보를 갱신하는 것을 볼 수 있습니다. 또한 Timeout이 발생하면 이를 처리하기 위해 dispatcher에게 다양한 Timer 생성 및 Timeout 발생 시 처리 하도록 등록하는 등 Read Filter 동작과 관련된 기본 초기화 작업을 수행합니다.

 

Http Connection Manager의 경우는 Read Filter의 역할만을 수행하기 때문에 filter_manager 내부에 upstream_filters에만 filter가 추가됩니다. 하지만 FilterChain Factory 내부에는 Read Filter 뿐만 아니라 Writer Filter의 역할을 수행하는 것도 있고 Read/Write를 모두 수행하는 Filter가 존재합니다.

 

filter_manager_impl.cc

void FilterManagerImpl::addWriteFilter(WriteFilterSharedPtr filter) {
  ASSERT(connection_.state() == Connection::State::Open);
  ActiveWriteFilterPtr new_filter = std::make_unique<ActiveWriteFilter>(*this, filter);
  filter->initializeWriteFilterCallbacks(*new_filter);
  LinkedList::moveIntoList(std::move(new_filter), downstream_filters_);
}

void FilterManagerImpl::addFilter(FilterSharedPtr filter) {
  addReadFilter(filter);
  addWriteFilter(filter);
}

 

이 경우에는 람다내에서 ServerConnectionImpl에 Filter 생성을 요청할 때 addWriteFilter 혹은 addFilter 메소드를 호출합니다. 가령 addWriterFilter의 경우는 Writer Filter만의 역할을 수행하기 때문에 궁극적으로는 filter_manager의 downstream_filters_ 에 추가되면서 Read Filter 등록과 마찬가지로 ActiveWriterFilter 인스턴스를 만들어서 향후 Callback할 수 있도록 initializeWriteFilterCallbacks를 등록하는 것을 볼 수 있습니다.

 

반면 addFilter의 경우는 Read/Write를 모두 수행할 때 호출되기 때문에 addReadFilter와 addWriteFilter를 차례대로 호출합니다.

 

configuration_impl.cc

bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager,
                                          const std::vector<Network::FilterFactoryCb>& factories) {
  for (const Network::FilterFactoryCb& factory : factories) {
    factory(filter_manager);
  }

  return filter_manager.initializeReadFilters();
}

 

Filter Factories로 부터 Filter 생성이 모두 완료되면, ServerConnectionImpl 내부 filter_manager에는 upstream 전용 filter와 downstream 전용 filter가 모두 생성되어있습니다. 그 과정이 완료되면 ServerConnectionImpl 내부의 initializeReadFilters()를 호출 합니다.

 

connection_impl.cc

bool ConnectionImpl::initializeReadFilters() { return filter_manager_.initializeReadFilters(); }

 

그리고 해당 요청을 전달받은 ServerConnectionImpl 내부에서는 다시 filter_manager에게 해당 요청 처리를 위임합니다.

 

filter_manager_impl.cc

bool FilterManagerImpl::initializeReadFilters() {
  if (upstream_filters_.empty()) {
    return false;
  }
  onContinueReading(nullptr, connection_);
  return true;
}

void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,
                                          ReadBufferSource& buffer_source) {
  // Filter could return status == FilterStatus::StopIteration immediately, close the connection and
  // use callback to call this function.
  if (connection_.state() != Connection::State::Open) {
    return;
  }

  std::list<ActiveReadFilterPtr>::iterator entry;
  if (!filter) {
    connection_.streamInfo().addBytesReceived(buffer_source.getReadBuffer().buffer.length());
    entry = upstream_filters_.begin();
  } else {
    entry = std::next(filter->entry());
  }

  for (; entry != upstream_filters_.end(); entry++) {
    if (!(*entry)->filter_) {
      continue;
    }
    if (!(*entry)->initialized_) {
      (*entry)->initialized_ = true;
      FilterStatus status = (*entry)->filter_->onNewConnection();
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }

    StreamBuffer read_buffer = buffer_source.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }
  }
}

 

해당 처리 과정을 살펴보면, 초기에 upstream_filters_에 등록할 때 ActiveReadFilter 인스턴스를 새로 생성해서 등록했는데 초기 initialized_ 값은 기본적으로 false 입니다. 따라서 위 코드에서는 등록된 upstream_filters_ 를 순회하면서 가장 먼저 onNewConnection() 작업을 수행할 것입니다.

 

여기에는 등록된 Filter가 Http Connection Manager 하나만 존재한다고 가정하기 때문에, 해당 Filter의 onNewConnection()이 호출됩니다.

 

connection_manager_impl.cc

Network::FilterStatus ConnectionManagerImpl::onNewConnection() {
  if (!read_callbacks_->connection().streamInfo().protocol()) {
    // For Non-QUIC traffic, continue passing data to filters.
    return Network::FilterStatus::Continue;
  }
  // Only QUIC connection's stream_info_ specifies protocol.
  Buffer::OwnedImpl dummy;
  createCodec(dummy);
  ASSERT(codec_->protocol() == Protocol::Http3);
  // Stop iterating through each filters for QUIC. Currently a QUIC connection
  // only supports one filter, HCM, and bypasses the onData() interface. Because
  // QUICHE already handles de-multiplexing.
  return Network::FilterStatus::StopIteration;
}

 

해당 코드를 잠시 살펴보면, QUIC 트래픽인지 확인하고 일반 HTTP 프로토콜이라면 해당 Filter 사용이 가능하도록 구현되어있음을 확인할 수 있습니다.

 

여기까지 수행하면 Filter Chains를 생성하고 Filter의 초기화까지 수행하는 전 과정을 살펴볼 수 있습니다.

 

active_stream_listener_base.cc

void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& socket,
                                             std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
  ...(중략)...
  const bool empty_filter_chain = !config_->filterChainFactory().createNetworkFilterChain(
      *server_conn_ptr, filter_chain->networkFilterFactories());
  if (empty_filter_chain) {
    ...(중략),,,
    server_conn_ptr->close(Network::ConnectionCloseType::NoFlush);
  }
  newActiveConnection(*filter_chain, std::move(server_conn_ptr), std::move(stream_info));
}

 

앞서 살펴본 긴 과정의 Filter Chains를 생성하고나면, Filter Chains가 존재하는지 여부를 bool 값으로 반환합니다. 만약에 Filter Chains 생성 과정에서 생성된 FilterChains가 전혀 존재하지 않는다면, Stream을 연결하여 작업을 이어나가는 것이 무의미하기 때문에 해당 Server Connection을 종료합니다. 그렇지 않을 경우 생성된 Filter Chains를 기반으로 ActiveConnection을 생성하는 과정을 진행합니다.

 

active_tcp_listener.cc

void ActiveTcpListener::newActiveConnection(const Network::FilterChain& filter_chain,
                                            Network::ServerConnectionPtr server_conn_ptr,
                                            std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
  auto& active_connections = getOrCreateActiveConnections(filter_chain);
  auto active_connection =
      std::make_unique<ActiveTcpConnection>(active_connections, std::move(server_conn_ptr),
                                            dispatcher().timeSource(), std::move(stream_info));
  // If the connection is already closed, we can just let this connection immediately die.
  if (active_connection->connection_->state() != Network::Connection::State::Closed) {
    ENVOY_CONN_LOG(
        debug, "new connection from {}", *active_connection->connection_,
        active_connection->connection_->connectionInfoProvider().remoteAddress()->asString());
    active_connection->connection_->addConnectionCallbacks(*active_connection);
    LinkedList::moveIntoList(std::move(active_connection), active_connections.connections_);
  }
}

 

여기서 ActiveConnection이란 생성된 Filter Chains를 사용하는 Connection이 얼마나 있는지를 관리하기 위한 자료구조 입니다. 따라서 getOrCreateActiveConnection 메소드를 호출함으로써, 먼저 해당 Filter에 존재하는 ActiveConnection이 있는지를 살펴봅니다. 

 

active_stream_listener_base.cc

ActiveConnections& OwnedActiveStreamListenerBase::getOrCreateActiveConnections(
    const Network::FilterChain& filter_chain) {
  ActiveConnectionCollectionPtr& connections = connections_by_context_[&filter_chain];
  if (connections == nullptr) {
    connections = std::make_unique<ActiveConnections>(*this, filter_chain);
  }
  return *connections;
}

active_stream_listener_base.h

absl::flat_hash_map<const Network::FilterChain*, ActiveConnectionCollectionPtr>
    connections_by_context_;

 

active_stream_listener_base.h

class ActiveConnections : public Event::DeferredDeletable {
public:
  ActiveConnections(OwnedActiveStreamListenerBase& listener,
                    const Network::FilterChain& filter_chain);
  ~ActiveConnections() override;

  // listener filter chain pair is the owner of the connections
  OwnedActiveStreamListenerBase& listener_;
  const Network::FilterChain& filter_chain_;
  // Owned connections.
  std::list<std::unique_ptr<ActiveTcpConnection>> connections_;
};

 

 

이때 ActiveConnections를 가져오는 메소드는 위 코드와 같으며, 내부적으로는 FilterChain 별로 ActiveConnection 목록을 관리하는 hash map을 통해서 참조하는 것을 확인할 수 있습니다.

 

ActiveConnections를 가져오면 신규로 생성하는 ActiveConnection을 만들고 ActiveConnections에 추가하는 것으로 Client의 접속 요청 이후 Connection 할당 과정은 마무리됩니다.

 

 


4. Client Connection 연결 과정 정리

 

이전에 Client Connection 연결 과정을 코드를 통해서 어떻게 구성되는지 집중적으로 살펴봤습니다. 다만 지엽적인 내용까지 살펴보느라 전체적인 흐름을 이해하기 쉽지 않았을 수도 있습니다. 따라서 이번에는 큰 그림에서 컴포넌트간 메시지 교환을 중점으로 Client Connection이 어떻게 연결되는지 개략적으로 다시 살펴보도록 하겠습니다. 

 

 

 

1. Libevent 로 부터 Listener 가 Listen 하고 있는 소켓으로 사용자의 Connection 연결 요청이 접수되었을 때, 내부적으로 존재하는 Worker 쓰레드 중 하나를 선정하여 해당 Listener의 onSocketEvent 메소드를 호출합니다. 참고로 위 예시에서는 Listener 0에 요청이 전달되었을 때 Worker 0번 쓰레드가 담당한다고 가정하여 ActiveTcpListener 0이 수신받았습니다.

 

2. ActiveTcpListener에서는 전체 연결된 Connection 개수가 Global Limit을 넘었는지 확인하고 내부적으로 AcceptedSocket을 거쳐 ActiveTcpSocket을 생성하여 Connection 연결을 Accept합니다.

 

3. 생성된 Socket에서 Listener Filter Chain을 생성하기 위해 Listener Config에게 Listener Filter Chain 정보를 요청합니다.

 

4. Listener Config는 Envoy 기동 과정에서 Parsing되었거나 LDS로 부터 갱신된 Listener Filter Chains 정보를 기반으로 Chains 내부에 존재하는 Factory Callback 메소드를 순회하면서 ActiveTcpSocket 내부에 있는 accept_filters에 Listener Filter를 생성하여 바인딩합니다.

 

5. ActiveTcpSocket은 Listener Filters를 순회하면서 onAccept를 수행합니다. 

 

6. Filter Chains(Network Filters)를 생성하기 위해 ActiveTcpListener는 Listener Config 로부터 보유하고 있는 Filter Chains 정보를 요청합니다.

 

7. Listener Config 내부에 존재하는 filter_chain_manager에는 Trie, HashMap 등을 비롯하여 Filter Chain 정보를 빠르게 찾기 위한 다양한 자료구조가 존재하는데, 이를 활용하여 사용자 요청한 Filter Chain 정보를 제공합니다.

 

8. ActiveTcpListener는 ServerConnetionImpl을 생성하기 위해 Dispatcher에 요청합니다.

 

9. Dispatcher는 ServerConnectionImpl을 생성합니다.

 

10. 생성된 ServerConnectionImpl은 사용자가 접속 연결 이후 실질적으로 HTTP 요청 시, 이벤트를 수신 받아야되기 때문에 Dispatcher에게 Socket 이벤트를 전달하도록 등록 요청합니다.

 

11. Didpatcher는 Libevent에게 ServerConnectionImpl이 요청한 정보를 등록합니다. 이후 해당 Socket에 이벤트가 감지되면 ServerConnectionImpl이 등록한 onFileEvent 메소드를 호출합니다.

 

12. ActiveTcpListener는 Filter Chains를 생성하기 위해 Listner Config에게 처리를 요청합니다.

 

13. Listener Config는 Util 함수를 활용하여 Filter Chains를 생성합니다. 이때 Filter의 특성에 따라 Read Filter인 경우에는 ServerConnectionImpl 내에 있는 filter_manager의 upstream_filters_에 매핑되고 Write Filter라면 downstream_filters_에 매핑됩니다. 만약 Read/Write 둘 다 처리하는 경우에는 두 군데 모두 입력합니다.

 

14. 생성된 Filter Chain에 연결된 Server Connection 정보를 관리하기 위해 ActiveTcpSocket 내부에 있는 connections_by_contexts_에 신규로 생성된 ActiveConnection 정보를 등록합니다.

 


5. 마무리

 

이번 포스팅에서는 사용자가 HTTP 요청을 통해 envoy의 기능을 이용하고자 할 때 내부적으로 Listener 구성이 어떻게 되어있는지 그리고 Client Connection이 어떻게 할당되는지에 대해서 집중적으로 알아봤습니다.

 

envoy를 처음 학습하면, 이러한 과정이 마법처럼 느껴지는데 코드를 통해서 살펴보니 동작 과정에 대해서 어느정도 이해할 수 있었던 것 같습니다. 다음 포스팅에서는 연결이 완료된 이후 실제 HTTP 요청이 전달될 때 처리 과정에 대해서 살펴보겠습니다.

+ Recent posts