1. 서론

 

이번 포스팅은 그동안 envoy-internals 시리즈의 이해를 바탕으로 사용자가 Http 전달을 요청했을 때, Envoy 내부구조를 토대로 네트워크 요청이 어떻게 흘러가는지에 대해서 전체적인 흐름을 상세히 조망해보는 시간을 가져보려 합니다. 사실 envoy에 대해서 분석했던 계기 중 하나가 도대체 어떻게 사용자의 요청이 전달되는지에 대한 궁금증에서 출발했기 때문에 이번 포스팅을 위해서 이전 시리즈의 내용이 존재했다고 생각합니다. 따라서 이번 내용은 이전 내용에 대한 이해가 선행되어야하므로 이전 시리즈 내용을 정독하고 보시는 것을 추천드립니다. 

 


2.  Worker 쓰레드 소켓 할당 과정

 

이전 시리즈 내용을 통해 Listener Manager에서 네트워크 요청 처리를 위해서 여러개의 Worker 쓰레드를 생성하는 것을 이해할 수 있었습니다. 이때 Worker 쓰레드 생성 갯수는 envoy 생성 당시 --concurrency 인자에 의해서 결정되는 것 또한 확인했습니다. 이번에는 Envoy 기동 과정 중 Worker 쓰레드 상호작용을 살펴보면서 Worker 쓰레드에서 어떻게 네트워크 요청을 처리하는지 살펴보겠습니다.

 

 

위 그림은 --concurrency 값이 2개이고 static_config, dynamic_config에 의해서 등록된 Listener 개수 또한 2개이면서 모두 TCP임을 가정했습니다. 이때 기동 과정을 살펴보면 다음과 같습니다.

 

1. Envoy 기동 과정에서 --concurrency 값을 살펴보고 Listener Manager에게 Worker 쓰레드 생성을 요청합니다. Listener Manager는 Worker 쓰레드를 요청만큼 생성합니다. 이 과정에서 Worker 쓰레드 내부에 Dispatcher와 Connection Handler가 생성됩니다.

 

2. Worker 쓰레드가 생성되는 과정에서 Envoy의 TLS를 관장하는 메인 쓰레드 InstanceImpl에 쓰레드를 등록합니다. 이 과정에서 InstanceImpl에서 Worker 쓰레드에 위치한 Dispatcher 정보를 registered_threads_ 에 저장할 수 있으며, 향후 Worker 쓰레드의 Dispatcher에 참조가 가능합니다.

 

3. Worker 쓰레드 생성이 마무리되면, Config 파일 파싱 도중 static configuration 정보를 등록하기 위해 Listener Manager에게 Config 정보 등록을 요청합니다. 해당 과정은 향후 LDS에 의해서도 생성될 수 있습니다. 이 과정에서 Listener Component Factory를 통해 --concurrency 만큼 Socket을 생성합니다.

 

4. Envoy의 설정이 모두 완료되면, Listener Manager에게 기동을 요청합니다.

 

5. Listener Manager에서는 기동 과정에서 등록된 Listener Config 정보를 Worker 쓰레드에 모두 Bind하기 위해 개별 Worker 쓰레드에게 Listener 생성을 요청합니다.

 

6. Worker 쓰레드내에 존재하는 Connection Handler에 Listener를 생성하기 위해 먼저 Listener로부터 자신의 Worker 쓰레드 번호에 해당하는 Socket 정보를 얻어옵니다. 그리고 Worker 쓰레드에서 외부 요청을 참조하기 위해 Dispatcher에게 Socket 정보를 전달하면서 Listener 생성을 요청합니다.

 

connection_handler_impl.cc

details->addActiveListener(
    config, address, listener_reject_fraction_, disable_listeners_,
    std::make_unique<ActiveTcpListener>(
        *this, config, runtime,
        socket_factory->getListenSocket(worker_index_.has_value() ? *worker_index_ : 0),
        address, config.connectionBalancer(*address)));

 

active_tcp_listener.cc

ActiveTcpListener::ActiveTcpListener(Network::TcpConnectionHandler& parent,
                                     Network::ListenerConfig& config, Runtime::Loader& runtime,
                                     Network::SocketSharedPtr&& socket,
                                     Network::Address::InstanceConstSharedPtr& listen_address,
                                     Network::ConnectionBalancer& connection_balancer)
    : OwnedActiveStreamListenerBase(
          parent, parent.dispatcher(),
          parent.dispatcher().createListener(std::move(socket), *this, runtime, config.bindToPort(),
                                             config.ignoreGlobalConnLimit()),
          config),
      tcp_conn_handler_(parent), connection_balancer_(connection_balancer),
      listen_address_(listen_address) {
  connection_balancer_.registerHandler(*this);
}

 

 

7. Dispatcher는 전달받은 Socket 정보를 토대로 Listener를 생성하여 반환합니다. 이때 주의해서 살펴볼 것은 사용자가 접속했을 때, 인자로 전달받은 TcpListenerCallbacks에게 Accept 요청을 수행하는데 호출되는 주체가 Connection Handler에서 전달한 ActiveTcpListener라는 것입니다. 따라서 향후 사용자가 접속하게되면, 그에 대한 Accept 처리는 ActiveTcpListener가 담당하게됩니다.

 

dispatcher_impl.cc

Network::ListenerPtr DispatcherImpl::createListener(Network::SocketSharedPtr&& socket,
                                                    Network::TcpListenerCallbacks& cb,
                                                    Runtime::Loader& runtime, bool bind_to_port,
                                                    bool ignore_global_conn_limit) {
  return std::make_unique<Network::TcpListenerImpl>(*this, random_generator_, runtime,
                                                    std::move(socket), cb, bind_to_port,
                                                    ignore_global_conn_limit);
}

 

8. 생성된 Listener에서는 소켓 정보를 libevent에 등록하여 향후 Client가 접속을 요청했을 때 libevent에 의해 요청을 전달받을 수 있도록 등록합니다.

 

tcp_listener_impl.cc

socket_->ioHandle().initializeFileEvent(
    dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); },
    Event::FileTriggerType::Level, Event::FileReadyType::Read);

 

 

위와같이 8단계를 거치게되면, 생성된 모든 Worker 쓰레드에서 Listener 및 소켓을 생성하고 Client 요청을 수신받을 수 있는 상태가 완료됩니다.

 


3. Client Connection 연결 과정

 

지금까지 Envoy를 기동하는 과정에서 Worker 쓰레드가 생성되고, Listener 및 Socket을 생성하는 것을 살펴봤습니다.

 

tcp_listener_impl.cc

TcpListenerImpl::TcpListenerImpl(Event::DispatcherImpl& dispatcher, Random::RandomGenerator& random,
                                 Runtime::Loader& runtime, SocketSharedPtr socket,
                                 TcpListenerCallbacks& cb, bool bind_to_port,
                                 bool ignore_global_conn_limit)
    : BaseListenerImpl(dispatcher, std::move(socket)), cb_(cb), random_(random), runtime_(runtime),
      bind_to_port_(bind_to_port), reject_fraction_(0.0),
      ignore_global_conn_limit_(ignore_global_conn_limit) {
  if (bind_to_port) {
    socket_->ioHandle().initializeFileEvent(
        dispatcher, [this](uint32_t events) -> void { onSocketEvent(events); },
        Event::FileTriggerType::Level, Event::FileReadyType::Read);
  }
}

 

이때 dispatcher를 통해서 libevent에서 해당 소켓에 이벤트가 감지되면, onSocketEvent 메소드를 호출하도록 위 코드와 같이 등록됩니다. 즉 이 과정을 통해서 각각의 Worker 쓰레드에 존재하는 Listener는 dispatcher에 의해 libevent로 등록되었으므로 사용자가 OS로부터 Socket 생성을 요청했을 때, OS는 등록된 socket 중 하나를 임의로 선정하여 요청을 전달할 수 있게됩니다.

 

 

 

그렇다면 Listener 설정이 모두 완료된 이후 Client로부터 Connection 요청이 들어오면 어떠한 과정을 거치게될까요?

 

 

먼저 Socket Event가 감지되면, 위와 같이 두개의 Worker 쓰레드 중 누가 해당 요청을 처리해야하는지 선택해야합니다. 이때 Worker 쓰레드 선정에 대한 결정은 이전에 설명했듯이 전적으로 OS가 수행합니다. 따라서 가령 위와같이 Worker_0번이 선택되었으면, 해당 쓰레드의 onSocketEvent가 실행될 것입니다.

 

tcp_listener_impl.cc

void TcpListenerImpl::onSocketEvent(short flags) {
  ASSERT(bind_to_port_);
  ASSERT(flags & (Event::FileReadyType::Read));

  while (1) {
    if (!socket_->ioHandle().isOpen()) {
      PANIC(fmt::format("listener accept failure: {}", errorDetails(errno)));
    }

    sockaddr_storage remote_addr;
    socklen_t remote_addr_len = sizeof(remote_addr);

    IoHandlePtr io_handle =
        socket_->ioHandle().accept(reinterpret_cast<sockaddr*>(&remote_addr), &remote_addr_len);
    if (io_handle == nullptr) {
      break;
    }

    if (rejectCxOverGlobalLimit()) {
      io_handle->close();
      cb_.onReject(TcpListenerCallbacks::RejectCause::GlobalCxLimit);
      continue;
    } else if (random_.bernoulli(reject_fraction_)) {
      io_handle->close();
      cb_.onReject(TcpListenerCallbacks::RejectCause::OverloadAction);
      continue;
    }

    const Address::InstanceConstSharedPtr& local_address =
        local_address_ ? local_address_ : io_handle->localAddress();

    const Address::InstanceConstSharedPtr remote_address =
        (remote_addr.ss_family == AF_UNIX)
            ? io_handle->peerAddress()
            : Address::addressFromSockAddrOrThrow(remote_addr, remote_addr_len,
                                                  local_address->ip()->version() ==
                                                      Address::IpVersion::v6);
    cb_.onAccept(
        std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address));
  }
}

 

onSocketEvent 메소드가 호출되면, 가장 먼저 수행하는 것은 연결된 Connection 갯수가 Global 설정을 넘어섰는지 확인합니다. 이 과정에서 Global Limit이 지정되어있고 신규 연결 요청이 Limit을 넘어서게되면, 해당 소켓에 대한 연결은 Close하고 종결처리 합니다.

 

 

이때 Global Limit으로 지정될 수 있는 값은 overload.global_downstrea_max_connections에 의해서 지정될 수 있으며, 해당 값은 OverloadManager에 의해서 관리되는 값입니다. 따라서 현재 Socket에 Accepted된 개수가 해당 값을 넘었을 경우에는 Socket 연결을 해제합니다. 해당 값에 대한 자세한 설명은 envoy 공식문서를 참고 바랍니다.

 

반대로 요청이 Global Limit을 넘지 않았을 경우는 AcceptedSocket을 생성하고 Worker 쓰레드 내 Listener는 Socket 연결에 대한 Accept 처리를 위임합니다.

 

 

위 그림은 AcceptedSocket 클래스 구조를 나타냅니다. 위 내용을 통해서 우리는 AcceptedSocket을 만드는 이유에 대해서 유추해볼 수 있습니다. 코드를 살펴보면, global_accetped_socket_count_ 라는 값이 static으로 지정되어있음을 알 수 있습니다. 그리고 생성자, 소멸자 단계에서 해당 값이 증감하는 것 또한 알 수 있습니다.

 

이를 통해서 확인되는 사실은 Socket이 접속하면 현재 Accepted된 Socket 개수를 파악할 수 있습니다. 또한 새로운 Socket이 연결되었을 때 Global Limit을 넘는지 검증할 수 있는 기준을 제시합니다.

 

AcceptedSocket을 만들고나면 그 다음에는 해당 Socket을 Listener에서 Accept하는 과정이 진행됩니다. 

 

tcp_listener_impl.cc

void TcpListenerImpl::onSocketEvent(short flags) {
  ...(중략)...
  cb_.onAccept(
        std::make_unique<AcceptedSocketImpl>(std::move(io_handle), local_address, remote_address));
}

 

이전에 살펴본 onSocketEvent 메소드의 마지막 줄을 살펴보면, 저장된 Callback에서 onAccept를 수행해달라고 요청하는 것을 볼 수 있습니다. 이는 TcpListenerImpl을 생성할 때, 해당 Callback 값이 Worker에 존재하는 ActiveTcpListener 이므로 해당 ActiveTcpListener가 실질적으로 Accept를 수행함을 의미합니다.

 

active_tcp_listener.cc

void ActiveTcpListener::onAccept(Network::ConnectionSocketPtr&& socket) {
  if (listenerConnectionLimitReached()) {
    RELEASE_ASSERT(socket->connectionInfoProvider().remoteAddress() != nullptr, "");
    ENVOY_LOG(trace, "closing connection from {}: listener connection limit reached for {}",
              socket->connectionInfoProvider().remoteAddress()->asString(), config_->name());
    socket->close();
    stats_.downstream_cx_overflow_.inc();
    return;
  }

  onAcceptWorker(std::move(socket), config_->handOffRestoredDestinationConnections(), false);
}

 

ActiveTcpListener에서의 Accept 과정을 살펴보면 위 코드와 같습니다.

 

 

 

 

먼저 listenerConnectionLimitReached 메소드를 수행하면서 이를 위반할 경우 Socket을 Close하는 것을 볼 수 있습니다. 이는 이전의 Connection은 Envoy 전체의 Connection을 살펴본 것이라면, 이번에는 개별 listener 별로 Connection 제한이 있는지 검사하고 만약 지정된 값이 있을 경우 그 값을 넘어서게되면 Accept하지 않습니다. 해당 설정은 Listener에서 수행할 수 있으며 envoy 공식 문서에서 이에 대해서 소개하고 있으니 참고 바랍니다.

 

ActiveTcpListener에서는 Limit 검사만 체크하고 다시 Socket에 대한 Accept는 onAcceptWorker 메소드 호출을 통해 후속 작업을 처리합니다.

 

 

active_tcp_listener.cc

void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
                                       bool hand_off_restored_destination_connections,
                                       bool rebalanced) {
  if (!rebalanced) {
    Network::BalancedConnectionHandler& target_handler =
        connection_balancer_.pickTargetHandler(*this);
    if (&target_handler != this) {
      target_handler.post(std::move(socket));
      return;
    }
  }

  auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
                                                         hand_off_restored_destination_connections);

  onSocketAccepted(std::move(active_socket));
}

 

이번에는 onAcceptWorker 메소드 내용을 살펴보겠습니다. 해당 코드를 살펴보면 connection_balancer에 의해서 TargetHandler를 구하는 것을 볼 수 있습니다.

 

그렇다면 connection_balancer는 무엇일까요?

 

static_resources:
  listeners:
  - connection_balance_config:
      extend_balance:
        name: envoy.network.connection_balance.dlb
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.network.connection_balance.dlb.v3alpha.Dlb

 

envoy에서는 쓰레드간의 효율적인 Socket 분배를 위해서 만약 하드웨어에서 DLB 지원이 된다면, 이를 사용하여 Connection을 안정적으로 분배할 수 있는 기능을 제공합니다. 이때 위와 같이 connection_balance_config를 지정하면, 해당 설정을 토대로 connection_balancer가 Target을 지정합니다.

 

해당 기술은 내부적으로는 Intel DLB hardware를 통해 구현되며, 위와 같이 Config 설정이 지정되어있다면 다른 Target으로 Load를 분산시킬 수 있습니다. 만약 지정되지 않았다면, 현재 Worker 쓰레드에서 Accept 과정이 정상적으로 진행될 것입니다. 이와 관련된 자세한 내용은 envoy 공식문서에 설명되어있으니 참고 바랍니다.

 

참고로 본 포스팅에서는 DLB 설정이 지정되어있지 않았다고 가정하므로 현재 Worker 쓰레드에서 해당 처리를 진행한다고 가정하겠습니다.

 

active_tcp_listener.cc

void ActiveTcpListener::onAcceptWorker(Network::ConnectionSocketPtr&& socket,
                                       bool hand_off_restored_destination_connections,
                                       bool rebalanced) {
  ...(중략)...

  auto active_socket = std::make_unique<ActiveTcpSocket>(*this, std::move(socket),
                                                         hand_off_restored_destination_connections);

  onSocketAccepted(std::move(active_socket));
}

 

connection_balancer에 의해서 선정된 target이 자기 자신이라면 해당 연결을 허용하기 위해 ActiveTcpSocket을 만드는 것을 볼 수 있습니다. 

 

ActiveTcpSocket까지 생성되면, 향후 Client의 요청은 해당 Socket을 통해서 모두 처리가됩니다. 

 

 

 

즉 이말은 이전에 Envoy를 처음 학습할 때 살펴봤듯이 Client가 Envoy에게 특정 API를 요청하면, 내부적으로 Listener Filters와 Filter Chains를 통과하면서 Upstream 대상을 찾고 전달한다고 했는데, 이 과정을 수행하는 주체가 해당 소켓이 됩니다. 따라서 Active Tcp Socket은 이를 지원하기 위해서 다양한 내부 프로퍼티가 있는데, 그 중 Filter와 관련된 것이 accept_filters 입니다. 해당 속성을 기반으로 향후 Listener Filters를 만들고 이를 수행하고 그 다음에는 Filter Chains를 생성하고 이를 수행하는 작업을 진행합니다. 그리고 해당 작업을 위해서 ActiveTcpSocket을 만든 이후 onSocketAccepted를 호출합니다.

 

active_stream_listener_base.h

void onSocketAccepted(std::unique_ptr<ActiveTcpSocket> active_socket) {
  // Create and run the filters
  if (config_->filterChainFactory().createListenerFilterChain(*active_socket)) {
    active_socket->startFilterChain();
  } else {
    // If create listener filter chain failed, it means the listener is missing
    // config due to the ECDS. Then close the connection directly.
    active_socket->socket().close();
    ASSERT(active_socket->isEndFilterIteration());
  }

  // Move active_socket to the sockets_ list if filter iteration needs to continue later.
  // Otherwise we let active_socket be destructed when it goes out of scope.
  if (!active_socket->isEndFilterIteration()) {
    active_socket->startTimer();
    LinkedList::moveIntoListBack(std::move(active_socket), sockets_);
  } else {
    if (!active_socket->connected()) {
      // If active_socket is about to be destructed, emit logs if a connection is not created.
      if (active_socket->streamInfo() != nullptr) {
        emitLogs(*config_, *active_socket->streamInfo());
      } else {
        // If the active_socket is not connected, this socket is not promoted to active
        // connection. Thus the stream_info_ is owned by this active socket.
        ENVOY_BUG(active_socket->streamInfo() != nullptr,
                  "the unconnected active socket must have stream info.");
      }
    }
  }
}

 

위 코드는 onSocketAccepted 메소드 내용입니다. 코드를 살펴보면, Socket이 Accepted 되면 가장 먼저 해당 소켓에 해당되는 ListenerFilterChain을 생성하고 FilterChain을 수행하는 것을 볼 수 있습니다. 즉 이 과정부터 해당 소켓은 각종 Filter들을 통과하면서 Upstream 연결이 이어지게됩니다.

 

해당 과정을 조금 더 자세히 살펴보겠습니다.

 

 

 

 

이전 내용을 토대로 ActiveTcpListener 에서 ActiveTcpSocket을 만든 것을 확인했습니다. 그리고 createListenerFactory 메소드를 호출하면서 생성한 ActiveTcpSocket을 전달했습니다.

 

그러면 내부적으로는 Listener Config에 이미 저장되어있는 listener_filter_factories 내부에 매핑된 Factory Callback을 하나씩 실행시키면서 ActiveTcpSocket의 accept_filters에 Filter를 하나씩 생성하는 과정을 거칩니다. 해당 과정을 코드로 살펴보면 다음과 같습니다.

 

 

listener_impl.cc

bool ListenerImpl::createListenerFilterChain(Network::ListenerFilterManager& manager) {
  if (Configuration::FilterChainUtility::buildFilterChain(manager, listener_filter_factories_)) {
    return true;
  } else {
    ENVOY_LOG(debug, "New connection accepted while missing configuration. "
                     "Close socket and stop the iteration onAccept.");
    missing_listener_config_stats_.extension_config_missing_.inc();
    return false;
  }
}

 

위 코드는 ActiveTcpListener 에서 ActiveTcpSocket을 생성 후 ListenerFilterChain을 생성하기 위해 createListenerFilterChain 메소드를 호출하였을 때 과정을 나타냅니다. 코드를 살펴보면, buildFilterChain 함수 호출을 통해 자신이 보유하고 있는 listener_filter_factories_ 목록을 전달하는 것을 볼 수 있습니다.

 

configuration_impl.cc

bool FilterChainUtility::buildFilterChain(Network::ListenerFilterManager& filter_manager,
                                          const Filter::ListenerFilterFactoriesList& factories) {
  for (const auto& filter_config_provider : factories) {
    auto config = filter_config_provider->config();
    if (!config.has_value()) {
      return false;
    }
    auto config_value = config.value();
    config_value(filter_manager);
  }

  return true;
}

 

buildFilterChain 내부를 살펴보면, 전달받은 factories를 순회하면서 callback 함수를 순차적으로 수행시키는 것을 볼 수 있습니다. 그리고 해당 callback을 수행할 때 전달받은 ActiveTcpSocket 정보를 다시 넘기는 것을 확인할 수 있습니다.

 

config.cc

[listener_filter_matcher, config](Network::ListenerFilterManager& filter_manager) -> void {
  filter_manager.addAcceptFilter(listener_filter_matcher, std::make_unique<Filter>(config));
};

 

위 코드는 http instpector Listener Filter의 Factory 코드이며, 개별 Listener Filter Factory의 리턴 값은 위와 같이 FilterManager 즉 ActiveTcpSocket 정보로 받아서 addAcceptFilter 메소드를 호출하고 있는 것을 볼 수 있습니다. 또한 인자를 통해서 Factory에서 보유하고 있는 Filter 정보를 새롭게 생성하여 전달하는 것을 볼 수 있습니다.

 

 

active_tcp_socket.h

void addAcceptFilter(const Network::ListenerFilterMatcherSharedPtr& listener_filter_matcher,
                     Network::ListenerFilterPtr&& filter) override {
  accept_filters_.emplace_back(
      std::make_unique<GenericListenerFilter>(listener_filter_matcher, std::move(filter)));
}

 

그리고 ActiveTcpSocket에서는 전달받은 Filter를 accept_filters에 추가함으로써 Listener Filter Chain을 완성합니다.

 

 

active_stream_listener_base.h

void onSocketAccepted(std::unique_ptr<ActiveTcpSocket> active_socket) {  
  ...(중략)...
    active_socket->startFilterChain();
  ...(후략)...
}

 

FilterChain이 완성되면, startFilterChain()을 통해 ListenerFilterChain을 차례로 수행하도록 요청합니다.

 

active_tcp_socket.h

void startFilterChain() { continueFilterChain(true); }

 

startFilterChain()은 다시 내부에 continueFilterChain 메소드에 처리를 위임합니다.

 

 

active_tcp_socket.cc

void ActiveTcpSocket::continueFilterChain(bool success) {
  if (success) {
    bool no_error = true;
    if (iter_ == accept_filters_.end()) {
      iter_ = accept_filters_.begin();
    } else {
      iter_ = std::next(iter_);
    }

    for (; iter_ != accept_filters_.end(); iter_++) {
      Network::FilterStatus status = (*iter_)->onAccept(*this);
      if (status == Network::FilterStatus::StopIteration) {        
        if (!socket().ioHandle().isOpen()) {          
          no_error = false;
          break;
        } else {
          // If the listener maxReadBytes() is 0, then it shouldn't return
          // `FilterStatus::StopIteration` from `onAccept` to wait for more data.
          ASSERT((*iter_)->maxReadBytes() != 0);
          if (listener_filter_buffer_ == nullptr) {
            if ((*iter_)->maxReadBytes() > 0) {
              createListenerFilterBuffer();
            }
          } else {
            // If the current filter expect more data than previous filters, then
            // increase the filter buffer's capacity.
            if (listener_filter_buffer_->capacity() < (*iter_)->maxReadBytes()) {
              listener_filter_buffer_->resetCapacity((*iter_)->maxReadBytes());
            }
          }
          if (listener_filter_buffer_ != nullptr) {
            listener_filter_buffer_->activateFileEvent(Event::FileReadyType::Read);
          }
          // Waiting for more data.
          return;
        }
      }
    }
    // Successfully ran all the accept filters.
    if (no_error) {
      newConnection();
    } else {
      // Signal the caller that no extra filter chain iteration is needed.
      iter_ = accept_filters_.end();
    }
  }

  // Filter execution concluded, unlink and delete this ActiveTcpSocket if it was linked.
  if (inserted()) {
    unlink();
  }
}

 

continueFilterChain 코드 내용을 자세히 살펴보면, 생성된 Listener Filters(accept_filters)를 순회하면서 onAccept를 통해 Filter로직을 수행하는 것을 볼 수 있습니다.

 

 

만약 Filter onAccept 순회 도중에 Stop을 해야될 경우가 존재한다면, 이유를 살펴보고 요청을 중지하던지 아니면 설정 변경 후 재수행을 수행하도록 작성되어있습니다.

 

그리고 모든 Filter 순회가 종료되면, 사용자 요청에 대한 Metadata 및 요청 정보를 모두 분석하였으므로 그 다음에는 newConnection() 메소드를 호출하여 본격적으로 downstream간의 연결을 위한 작업을 진행합니다.

 

active_tcp_socket.cc

void ActiveTcpSocket::newConnection() {
  connected_ = true;

  // Check if the socket may need to be redirected to another listener.
  Network::BalancedConnectionHandlerOptRef new_listener;

  if (hand_off_restored_destination_connections_ &&
      socket_->connectionInfoProvider().localAddressRestored()) {
    // Find a listener associated with the original destination address.
    new_listener =
        listener_.getBalancedHandlerByAddress(*socket_->connectionInfoProvider().localAddress());
  }

  // Reset the file events which are registered by listener filter.
  // reference https://github.com/envoyproxy/envoy/issues/8925.
  if (listener_filter_buffer_ != nullptr) {
    listener_filter_buffer_->reset();
  }

  if (new_listener.has_value()) {
    ...(중략)...
    new_listener.value().get().onAcceptWorker(std::move(socket_), false, false);
  } else {
    // Set default transport protocol if none of the listener filters did it.
    if (socket_->detectedTransportProtocol().empty()) {
      socket_->setDetectedTransportProtocol("raw_buffer");
    }
    accept_filters_.clear();
    // Create a new connection on this listener.
    listener_.newConnection(std::move(socket_), std::move(stream_info_));
  }
}

 

newConnection 메소드를 호출하면, 다른 Listener로 Redirect 해야할 필요가 있는지를 살펴보고 listener에 새로운 Connection 할당을 요청합니다.

 

active_stream_listener_base.cc

void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& socket,
                                             std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
  // Find matching filter chain.
  const auto filter_chain = config_->filterChainManager().findFilterChain(*socket);
  if (filter_chain == nullptr) {
    ...(중략)...
    socket->close();
    return;
  }
 ...(후략)...
}

 

 

newConnection 메소드를 살펴보면, 먼저 Listener의 FilterChainManager로부터 해당 소켓 정보에 해당하는 Filter Chain을 찾는 과정을 수행합니다. 그리고 이 과정에서 매칭되는 Filter Chain을 찾지 못한다면, 연결된 Socket을 종료하고 마칩니다.

 

filter_chain_manager_impl.cc

const Network::FilterChain*
FilterChainManagerImpl::findFilterChain(const Network::ConnectionSocket& socket) const {
  if (matcher_) {
    return findFilterChainUsingMatcher(socket);
  }

  const auto& address = socket.connectionInfoProvider().localAddress();

  const Network::FilterChain* best_match_filter_chain = nullptr;
  // Match on destination port (only for IP addresses).
  if (address->type() == Network::Address::Type::Ip) {
    const auto port_match = destination_ports_map_.find(address->ip()->port());
    if (port_match != destination_ports_map_.end()) {
      best_match_filter_chain = findFilterChainForDestinationIP(*port_match->second.second, socket);
      if (best_match_filter_chain != nullptr) {
        return best_match_filter_chain;
      } else {
        // There is entry for specific port but none of the filter chain matches. Instead of
        // matching catch-all port 0, the fallback filter chain is returned.
        return default_filter_chain_.get();
      }
    }
  }
  // Match on catch-all port 0 if there is no specific port sub tree.
  const auto port_match = destination_ports_map_.find(0);
  if (port_match != destination_ports_map_.end()) {
    best_match_filter_chain = findFilterChainForDestinationIP(*port_match->second.second, socket);
  }
  return best_match_filter_chain != nullptr
             ? best_match_filter_chain
             // Neither exact port nor catch-all port matches. Use fallback filter chain.
             : default_filter_chain_.get();
}

 

위 코드는 Filter Chain Manager로 부터 Filter Chain을 찾는 과정을 보여줍니다.

 

Filter Chain Manager는 Listener 기동 시점에 설정 정보를 파싱하여 Filter Chain 정보를 모두 가지고 있습니다. 따라서 Socket의 Address 정보를 토대로 원하는 Filter Chain을 찾아서 반환해줍니다. 만약에 상응하는 Filter Chain 정보를 찾지 못한 경우에는 Default Filter Chain을 반환합니다.

 

 

active_stream_listener_base.cc

void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& socket,
                                             std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
  ...(중략)...
  stream_info->setFilterChainName(filter_chain->name());
  auto transport_socket = filter_chain->transportSocketFactory().createDownstreamTransportSocket();
  auto server_conn_ptr = dispatcher().createServerConnection(
      std::move(socket), std::move(transport_socket), *stream_info);
  if (const auto timeout = filter_chain->transportSocketConnectTimeout();
      timeout != std::chrono::milliseconds::zero()) {
    server_conn_ptr->setTransportSocketConnectTimeout(
        timeout, stats_.downstream_cx_transport_socket_connect_timeout_);
  }
  server_conn_ptr->setBufferLimits(config_->perConnectionBufferLimitBytes());
  ...(중략)...
  const bool empty_filter_chain = !config_->filterChainFactory().createNetworkFilterChain(
      *server_conn_ptr, filter_chain->networkFilterFactories());
  if (empty_filter_chain) {
    ...(중략),,,
    server_conn_ptr->close(Network::ConnectionCloseType::NoFlush);
  }
  newActiveConnection(*filter_chain, std::move(server_conn_ptr), std::move(stream_info));
}

 

매칭되는 Filter Chain을 찾았으면, 위 코드와 같은 과정을 거칩니다. 주요 내용을 살펴보면 다음과 같습니다.

 

1. Downstream을 위한 Transport Socket을 생성합니다.

2. dispatcher에게 ServerConnection을 요청합니다. 

 

 

dispatcher_impl.cc

Network::ServerConnectionPtr
DispatcherImpl::createServerConnection(Network::ConnectionSocketPtr&& socket,
                                       Network::TransportSocketPtr&& transport_socket,
                                       StreamInfo::StreamInfo& stream_info) {
  ASSERT(isThreadSafe());
  return std::make_unique<Network::ServerConnectionImpl>(
      *this, std::move(socket), std::move(transport_socket), stream_info, true);
}

 

이때 내부적으로 dispatcher에서는 ServerConnectionImpl을 생성하는 것을 볼 수 있습니다.

 

connection_impl.cc

ServerConnectionImpl::ServerConnectionImpl(Event::Dispatcher& dispatcher,
                                           ConnectionSocketPtr&& socket,
                                           TransportSocketPtr&& transport_socket,
                                           StreamInfo::StreamInfo& stream_info, bool connected)
    : ConnectionImpl(dispatcher, std::move(socket), std::move(transport_socket), stream_info,
                     connected) {}

 

생성되는 ServerConnectionImpl의 생성자는 위와 같으며, 부호 생성자에게 전달받은 인자들을 넘깁니다.

 

connection_impl.cc

ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
                               TransportSocketPtr&& transport_socket,
                               StreamInfo::StreamInfo& stream_info, bool connected)
    : ConnectionImplBase(dispatcher, next_global_id_++),
      transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
      stream_info_(stream_info), filter_manager_(*this, *socket_),
      write_buffer_(dispatcher.getWatermarkFactory().createBuffer(
          [this]() -> void { this->onWriteBufferLowWatermark(); },
          [this]() -> void { this->onWriteBufferHighWatermark(); },
          []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
      read_buffer_(dispatcher.getWatermarkFactory().createBuffer(
          [this]() -> void { this->onReadBufferLowWatermark(); },
          [this]() -> void { this->onReadBufferHighWatermark(); },
          []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
      write_buffer_above_high_watermark_(false), detect_early_close_(true),
      enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
      write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
      transport_wants_read_(false) {

  if (!connected) {
    connecting_ = true;
  }

  Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;

  // We never ask for both early close and read at the same time. If we are reading, we want to
  // consume all available data.
  socket_->ioHandle().initializeFileEvent(
      dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
      Event::FileReadyType::Read | Event::FileReadyType::Write);

  transport_socket_->setTransportSocketCallbacks(*this);

  // TODO(soulxu): generate the connection id inside the addressProvider directly,
  // then we don't need a setter or any of the optional stuff.
  socket_->connectionInfoProvider().setConnectionID(id());
  socket_->connectionInfoProvider().setSslConnection(transport_socket_->ssl());
}

 

 

ConnectionImpl 생성자 내부를 살펴보면, Socket 내부에 Write/Read 전용 Buffer를 설정하는 것을 볼 수 있습니다. 해당 Buffer는 사용자가 HTTP 요청을 전달했을 때 데이터를 읽는 용도 그리고 upstream으로부터 응답 데이터가 전달되었을 때 기록하는 용도로 사용됩니다.

 

또한 downstream을 위한 transport_socket 생성 및 Filter Chains 관리를 위한 filter_manager를 생성합니다.

 

중요하게 살펴볼 부분은 향후 Socket 내부에 Read/Write 이벤트가 감지되면 이를 통지받고 후속 작업을 처리하기 위해 Dispatcher에게 onFileEvent()를 등록하는 부분입니다. 이는 내부적으로 다시 libevent에게 등록이 되며, 향후 Client가 HTTP 요청을 전달하게 되면, 해당 메소드로 요청 항목이 전달되어 후속 작업을 수행할 수 있습니다.

 

 

3. Filter Chain으로부터 생성해야할 Filter 목록을 확인하여 Filter Chains(Network Filters)를 생성합니다.

 

 

ServerConnection을 생성하고 나면, 그 다음에는 Filter Chains를 생성하는 작업을 수행합니다. 이를 위해 Listener 에게 FilterChain 생성을 요청합니다. 이때 생성되는 Filter Chains는 ServerConnection 내부에 매핑되어야하기 때문에 해당 정보를 Filter Chains와 같이 전달합니다.

 

listener_impl.cc

bool ListenerImpl::createNetworkFilterChain(
    Network::Connection& connection,
    const std::vector<Network::FilterFactoryCb>& filter_factories) {
  return Configuration::FilterChainUtility::buildFilterChain(connection, filter_factories);
}

 

요청을 전달받은 Listener는 Filter Chains 생성 처리를 buildFilterChain util 함수에 위임합니다.

 

configuration_impl.cc

bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager,
                                          const std::vector<Network::FilterFactoryCb>& factories) {
  for (const Network::FilterFactoryCb& factory : factories) {
    factory(filter_manager);
  }

  return filter_manager.initializeReadFilters();
}

 

buildFilterChains는 전달받은 Filter를 순차적으로 순회하면서 ServerConnection 내부에 생성하도록 Callback 팩토리 메소드를 수행합니다.

 

config.cc

return [singletons, filter_config, &context,
        clear_hop_by_hop_headers](Network::FilterManager& filter_manager) -> void {
  auto hcm = std::make_shared<Http::ConnectionManagerImpl>(
      *filter_config, context.drainDecision(), context.api().randomGenerator(),
      context.httpContext(), context.runtime(), context.localInfo(), context.clusterManager(),
      context.overloadManager(), context.mainThreadDispatcher().timeSource());
  if (!clear_hop_by_hop_headers) {
    hcm->setClearHopByHopResponseHeaders(false);
  }
  filter_manager.addReadFilter(std::move(hcm));
};

 

가령 위 코드는 Http Connection Manager Filter에서 반환하는 Callback Factory 메소드가 수행되었다고 가정해봤습니다.

 

 

이때 인자로 ServerConnection을 전달받았으며, 해당 람다내에서는 HttpConnectionManager를 생성한 다음 ServerConnection 내부에 addReadFilter를 호출하여 Filter 정보를 등록하는 것을 볼 수 있습니다.

 

connection_impl.cc

void ConnectionImpl::addReadFilter(ReadFilterSharedPtr filter) {
  filter_manager_.addReadFilter(filter);
}

 

이 경우 호출되는 ServerConnection 내부에서는 filter_manager를 통해 readFilter를 등록하도록 재요청합니다. 

 

filter_manager_impl.cc

void FilterManagerImpl::addReadFilter(ReadFilterSharedPtr filter) {
  ASSERT(connection_.state() == Connection::State::Open);
  ActiveReadFilterPtr new_filter = std::make_unique<ActiveReadFilter>(*this, filter);
  filter->initializeReadFilterCallbacks(*new_filter);
  LinkedList::moveIntoListBack(std::move(new_filter), upstream_filters_);
}

 

filter_manager에서 addReadFilter가 호출되면, 생성된 Filter에 ActiveReadFilter 정보를 전달하여 해당 Filter 내에서 향후 Callback할 수 있도록 initializeReadFilterCallbacks를 등록합니다. 그리고 자신의 upstream_filters_에 Read Filter를 등록하는 것을 볼 수 있습니다.

 

conn_manager_impl.cc

void ConnectionManagerImpl::initializeReadFilterCallbacks(Network::ReadFilterCallbacks& callbacks) {
  read_callbacks_ = &callbacks;
  stats_.named_.downstream_cx_total_.inc();
  stats_.named_.downstream_cx_active_.inc();
  if (read_callbacks_->connection().ssl()) {
    stats_.named_.downstream_cx_ssl_total_.inc();
    stats_.named_.downstream_cx_ssl_active_.inc();
  }

  read_callbacks_->connection().addConnectionCallbacks(*this);

  if (!read_callbacks_->connection()
           .streamInfo()
           .filterState()
           ->hasData<Network::ProxyProtocolFilterState>(Network::ProxyProtocolFilterState::key())) {
    read_callbacks_->connection().streamInfo().filterState()->setData(
        Network::ProxyProtocolFilterState::key(),
        std::make_unique<Network::ProxyProtocolFilterState>(Network::ProxyProtocolData{
            read_callbacks_->connection().connectionInfoProvider().remoteAddress(),
            read_callbacks_->connection().connectionInfoProvider().localAddress()}),
        StreamInfo::FilterState::StateType::ReadOnly,
        StreamInfo::FilterState::LifeSpan::Connection);
  }

  if (config_.idleTimeout()) {
    connection_idle_timer_ = read_callbacks_->connection().dispatcher().createScaledTimer(
        Event::ScaledTimerType::HttpDownstreamIdleConnectionTimeout,
        [this]() -> void { onIdleTimeout(); });
    connection_idle_timer_->enableTimer(config_.idleTimeout().value());
  }

  if (config_.maxConnectionDuration()) {
    connection_duration_timer_ = read_callbacks_->connection().dispatcher().createTimer(
        [this]() -> void { onConnectionDurationTimeout(); });
    connection_duration_timer_->enableTimer(config_.maxConnectionDuration().value());
  }

  read_callbacks_->connection().setDelayedCloseTimeout(config_.delayedCloseTimeout());

  read_callbacks_->connection().setConnectionStats(
      {stats_.named_.downstream_cx_rx_bytes_total_, stats_.named_.downstream_cx_rx_bytes_buffered_,
       stats_.named_.downstream_cx_tx_bytes_total_, stats_.named_.downstream_cx_tx_bytes_buffered_,
       nullptr, &stats_.named_.downstream_cx_delayed_close_timeout_});
}

 

등록된 Filter가 Http Connection Manager Filter임을 가정하였으므로, initializeReadFilter를 호출하면, 위 코드 구문이 호출될 것입니다. 해당 코드를 개략적으로 살펴보면, Callback을 자신의 프로퍼티에 등록하는 작업 외에 metric 정보를 갱신하는 것을 볼 수 있습니다. 또한 Timeout이 발생하면 이를 처리하기 위해 dispatcher에게 다양한 Timer 생성 및 Timeout 발생 시 처리 하도록 등록하는 등 Read Filter 동작과 관련된 기본 초기화 작업을 수행합니다.

 

Http Connection Manager의 경우는 Read Filter의 역할만을 수행하기 때문에 filter_manager 내부에 upstream_filters에만 filter가 추가됩니다. 하지만 FilterChain Factory 내부에는 Read Filter 뿐만 아니라 Writer Filter의 역할을 수행하는 것도 있고 Read/Write를 모두 수행하는 Filter가 존재합니다.

 

filter_manager_impl.cc

void FilterManagerImpl::addWriteFilter(WriteFilterSharedPtr filter) {
  ASSERT(connection_.state() == Connection::State::Open);
  ActiveWriteFilterPtr new_filter = std::make_unique<ActiveWriteFilter>(*this, filter);
  filter->initializeWriteFilterCallbacks(*new_filter);
  LinkedList::moveIntoList(std::move(new_filter), downstream_filters_);
}

void FilterManagerImpl::addFilter(FilterSharedPtr filter) {
  addReadFilter(filter);
  addWriteFilter(filter);
}

 

이 경우에는 람다내에서 ServerConnectionImpl에 Filter 생성을 요청할 때 addWriteFilter 혹은 addFilter 메소드를 호출합니다. 가령 addWriterFilter의 경우는 Writer Filter만의 역할을 수행하기 때문에 궁극적으로는 filter_manager의 downstream_filters_ 에 추가되면서 Read Filter 등록과 마찬가지로 ActiveWriterFilter 인스턴스를 만들어서 향후 Callback할 수 있도록 initializeWriteFilterCallbacks를 등록하는 것을 볼 수 있습니다.

 

반면 addFilter의 경우는 Read/Write를 모두 수행할 때 호출되기 때문에 addReadFilter와 addWriteFilter를 차례대로 호출합니다.

 

configuration_impl.cc

bool FilterChainUtility::buildFilterChain(Network::FilterManager& filter_manager,
                                          const std::vector<Network::FilterFactoryCb>& factories) {
  for (const Network::FilterFactoryCb& factory : factories) {
    factory(filter_manager);
  }

  return filter_manager.initializeReadFilters();
}

 

Filter Factories로 부터 Filter 생성이 모두 완료되면, ServerConnectionImpl 내부 filter_manager에는 upstream 전용 filter와 downstream 전용 filter가 모두 생성되어있습니다. 그 과정이 완료되면 ServerConnectionImpl 내부의 initializeReadFilters()를 호출 합니다.

 

connection_impl.cc

bool ConnectionImpl::initializeReadFilters() { return filter_manager_.initializeReadFilters(); }

 

그리고 해당 요청을 전달받은 ServerConnectionImpl 내부에서는 다시 filter_manager에게 해당 요청 처리를 위임합니다.

 

filter_manager_impl.cc

bool FilterManagerImpl::initializeReadFilters() {
  if (upstream_filters_.empty()) {
    return false;
  }
  onContinueReading(nullptr, connection_);
  return true;
}

void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,
                                          ReadBufferSource& buffer_source) {
  // Filter could return status == FilterStatus::StopIteration immediately, close the connection and
  // use callback to call this function.
  if (connection_.state() != Connection::State::Open) {
    return;
  }

  std::list<ActiveReadFilterPtr>::iterator entry;
  if (!filter) {
    connection_.streamInfo().addBytesReceived(buffer_source.getReadBuffer().buffer.length());
    entry = upstream_filters_.begin();
  } else {
    entry = std::next(filter->entry());
  }

  for (; entry != upstream_filters_.end(); entry++) {
    if (!(*entry)->filter_) {
      continue;
    }
    if (!(*entry)->initialized_) {
      (*entry)->initialized_ = true;
      FilterStatus status = (*entry)->filter_->onNewConnection();
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }

    StreamBuffer read_buffer = buffer_source.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }
  }
}

 

해당 처리 과정을 살펴보면, 초기에 upstream_filters_에 등록할 때 ActiveReadFilter 인스턴스를 새로 생성해서 등록했는데 초기 initialized_ 값은 기본적으로 false 입니다. 따라서 위 코드에서는 등록된 upstream_filters_ 를 순회하면서 가장 먼저 onNewConnection() 작업을 수행할 것입니다.

 

여기에는 등록된 Filter가 Http Connection Manager 하나만 존재한다고 가정하기 때문에, 해당 Filter의 onNewConnection()이 호출됩니다.

 

connection_manager_impl.cc

Network::FilterStatus ConnectionManagerImpl::onNewConnection() {
  if (!read_callbacks_->connection().streamInfo().protocol()) {
    // For Non-QUIC traffic, continue passing data to filters.
    return Network::FilterStatus::Continue;
  }
  // Only QUIC connection's stream_info_ specifies protocol.
  Buffer::OwnedImpl dummy;
  createCodec(dummy);
  ASSERT(codec_->protocol() == Protocol::Http3);
  // Stop iterating through each filters for QUIC. Currently a QUIC connection
  // only supports one filter, HCM, and bypasses the onData() interface. Because
  // QUICHE already handles de-multiplexing.
  return Network::FilterStatus::StopIteration;
}

 

해당 코드를 잠시 살펴보면, QUIC 트래픽인지 확인하고 일반 HTTP 프로토콜이라면 해당 Filter 사용이 가능하도록 구현되어있음을 확인할 수 있습니다.

 

여기까지 수행하면 Filter Chains를 생성하고 Filter의 초기화까지 수행하는 전 과정을 살펴볼 수 있습니다.

 

active_stream_listener_base.cc

void ActiveStreamListenerBase::newConnection(Network::ConnectionSocketPtr&& socket,
                                             std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
  ...(중략)...
  const bool empty_filter_chain = !config_->filterChainFactory().createNetworkFilterChain(
      *server_conn_ptr, filter_chain->networkFilterFactories());
  if (empty_filter_chain) {
    ...(중략),,,
    server_conn_ptr->close(Network::ConnectionCloseType::NoFlush);
  }
  newActiveConnection(*filter_chain, std::move(server_conn_ptr), std::move(stream_info));
}

 

앞서 살펴본 긴 과정의 Filter Chains를 생성하고나면, Filter Chains가 존재하는지 여부를 bool 값으로 반환합니다. 만약에 Filter Chains 생성 과정에서 생성된 FilterChains가 전혀 존재하지 않는다면, Stream을 연결하여 작업을 이어나가는 것이 무의미하기 때문에 해당 Server Connection을 종료합니다. 그렇지 않을 경우 생성된 Filter Chains를 기반으로 ActiveConnection을 생성하는 과정을 진행합니다.

 

active_tcp_listener.cc

void ActiveTcpListener::newActiveConnection(const Network::FilterChain& filter_chain,
                                            Network::ServerConnectionPtr server_conn_ptr,
                                            std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
  auto& active_connections = getOrCreateActiveConnections(filter_chain);
  auto active_connection =
      std::make_unique<ActiveTcpConnection>(active_connections, std::move(server_conn_ptr),
                                            dispatcher().timeSource(), std::move(stream_info));
  // If the connection is already closed, we can just let this connection immediately die.
  if (active_connection->connection_->state() != Network::Connection::State::Closed) {
    ENVOY_CONN_LOG(
        debug, "new connection from {}", *active_connection->connection_,
        active_connection->connection_->connectionInfoProvider().remoteAddress()->asString());
    active_connection->connection_->addConnectionCallbacks(*active_connection);
    LinkedList::moveIntoList(std::move(active_connection), active_connections.connections_);
  }
}

 

여기서 ActiveConnection이란 생성된 Filter Chains를 사용하는 Connection이 얼마나 있는지를 관리하기 위한 자료구조 입니다. 따라서 getOrCreateActiveConnection 메소드를 호출함으로써, 먼저 해당 Filter에 존재하는 ActiveConnection이 있는지를 살펴봅니다. 

 

active_stream_listener_base.cc

ActiveConnections& OwnedActiveStreamListenerBase::getOrCreateActiveConnections(
    const Network::FilterChain& filter_chain) {
  ActiveConnectionCollectionPtr& connections = connections_by_context_[&filter_chain];
  if (connections == nullptr) {
    connections = std::make_unique<ActiveConnections>(*this, filter_chain);
  }
  return *connections;
}

active_stream_listener_base.h

absl::flat_hash_map<const Network::FilterChain*, ActiveConnectionCollectionPtr>
    connections_by_context_;

 

active_stream_listener_base.h

class ActiveConnections : public Event::DeferredDeletable {
public:
  ActiveConnections(OwnedActiveStreamListenerBase& listener,
                    const Network::FilterChain& filter_chain);
  ~ActiveConnections() override;

  // listener filter chain pair is the owner of the connections
  OwnedActiveStreamListenerBase& listener_;
  const Network::FilterChain& filter_chain_;
  // Owned connections.
  std::list<std::unique_ptr<ActiveTcpConnection>> connections_;
};

 

 

이때 ActiveConnections를 가져오는 메소드는 위 코드와 같으며, 내부적으로는 FilterChain 별로 ActiveConnection 목록을 관리하는 hash map을 통해서 참조하는 것을 확인할 수 있습니다.

 

ActiveConnections를 가져오면 신규로 생성하는 ActiveConnection을 만들고 ActiveConnections에 추가하는 것으로 Client의 접속 요청 이후 Connection 할당 과정은 마무리됩니다.

 

 


4. Client Connection 연결 과정 정리

 

이전에 Client Connection 연결 과정을 코드를 통해서 어떻게 구성되는지 집중적으로 살펴봤습니다. 다만 지엽적인 내용까지 살펴보느라 전체적인 흐름을 이해하기 쉽지 않았을 수도 있습니다. 따라서 이번에는 큰 그림에서 컴포넌트간 메시지 교환을 중점으로 Client Connection이 어떻게 연결되는지 개략적으로 다시 살펴보도록 하겠습니다. 

 

 

 

1. Libevent 로 부터 Listener 가 Listen 하고 있는 소켓으로 사용자의 Connection 연결 요청이 접수되었을 때, 내부적으로 존재하는 Worker 쓰레드 중 하나를 선정하여 해당 Listener의 onSocketEvent 메소드를 호출합니다. 참고로 위 예시에서는 Listener 0에 요청이 전달되었을 때 Worker 0번 쓰레드가 담당한다고 가정하여 ActiveTcpListener 0이 수신받았습니다.

 

2. ActiveTcpListener에서는 전체 연결된 Connection 개수가 Global Limit을 넘었는지 확인하고 내부적으로 AcceptedSocket을 거쳐 ActiveTcpSocket을 생성하여 Connection 연결을 Accept합니다.

 

3. 생성된 Socket에서 Listener Filter Chain을 생성하기 위해 Listener Config에게 Listener Filter Chain 정보를 요청합니다.

 

4. Listener Config는 Envoy 기동 과정에서 Parsing되었거나 LDS로 부터 갱신된 Listener Filter Chains 정보를 기반으로 Chains 내부에 존재하는 Factory Callback 메소드를 순회하면서 ActiveTcpSocket 내부에 있는 accept_filters에 Listener Filter를 생성하여 바인딩합니다.

 

5. ActiveTcpSocket은 Listener Filters를 순회하면서 onAccept를 수행합니다. 

 

6. Filter Chains(Network Filters)를 생성하기 위해 ActiveTcpListener는 Listener Config 로부터 보유하고 있는 Filter Chains 정보를 요청합니다.

 

7. Listener Config 내부에 존재하는 filter_chain_manager에는 Trie, HashMap 등을 비롯하여 Filter Chain 정보를 빠르게 찾기 위한 다양한 자료구조가 존재하는데, 이를 활용하여 사용자 요청한 Filter Chain 정보를 제공합니다.

 

8. ActiveTcpListener는 ServerConnetionImpl을 생성하기 위해 Dispatcher에 요청합니다.

 

9. Dispatcher는 ServerConnectionImpl을 생성합니다.

 

10. 생성된 ServerConnectionImpl은 사용자가 접속 연결 이후 실질적으로 HTTP 요청 시, 이벤트를 수신 받아야되기 때문에 Dispatcher에게 Socket 이벤트를 전달하도록 등록 요청합니다.

 

11. Didpatcher는 Libevent에게 ServerConnectionImpl이 요청한 정보를 등록합니다. 이후 해당 Socket에 이벤트가 감지되면 ServerConnectionImpl이 등록한 onFileEvent 메소드를 호출합니다.

 

12. ActiveTcpListener는 Filter Chains를 생성하기 위해 Listner Config에게 처리를 요청합니다.

 

13. Listener Config는 Util 함수를 활용하여 Filter Chains를 생성합니다. 이때 Filter의 특성에 따라 Read Filter인 경우에는 ServerConnectionImpl 내에 있는 filter_manager의 upstream_filters_에 매핑되고 Write Filter라면 downstream_filters_에 매핑됩니다. 만약 Read/Write 둘 다 처리하는 경우에는 두 군데 모두 입력합니다.

 

14. 생성된 Filter Chain에 연결된 Server Connection 정보를 관리하기 위해 ActiveTcpSocket 내부에 있는 connections_by_contexts_에 신규로 생성된 ActiveConnection 정보를 등록합니다.

 


5. 마무리

 

이번 포스팅에서는 사용자가 HTTP 요청을 통해 envoy의 기능을 이용하고자 할 때 내부적으로 Listener 구성이 어떻게 되어있는지 그리고 Client Connection이 어떻게 할당되는지에 대해서 집중적으로 알아봤습니다.

 

envoy를 처음 학습하면, 이러한 과정이 마법처럼 느껴지는데 코드를 통해서 살펴보니 동작 과정에 대해서 어느정도 이해할 수 있었던 것 같습니다. 다음 포스팅에서는 연결이 완료된 이후 실제 HTTP 요청이 전달될 때 처리 과정에 대해서 살펴보겠습니다.

1. 서론

 

istio를 처음 사용하면, 이전과 달리 설치가 및 설정 방법이 편해져서 사용하기 쉽습니다. 그럼에도 불구하고 istio를 운영하는 것은 여전히 어려운 일입니다. 그 이유는 istio에 문제가 생겼을 때 이를 해결하기 위해서는 istio 내부 아키텍처에 대한 이해가 필수적이기 때문입니다.

 

istio는 엄밀히 말해서 envoy proxy를 관리하는 컨트롤러라고 볼 수 있습니다. 따라서 istio 뿐만 아니라 envoy proxy의 내부 구조에 대해서도 잘 알고 있어야합니다. 이번 포스팅부터 진행되는 envoy internals 시리즈는 envoy 내부 구조와 흐름에 대해서 살펴보면서, 트러블 슈팅을 위한 인사이트를 갖는 것을 목적으로 하고 있습니다. 다만 모든 내용을 다루지는 않으며, istio를 이해하는데 있어 필수적인 부분에 대해서만 살펴보겠습니다. 이번 포스팅은 envoy 관련 첫번째 포스팅으로 envoy 구조와 내부 컴포넌트 동작 원리에 대해서 알아보도록 하겠습니다.

 


 

2. Envoy 컴포넌트

 

 

Envoy는 Proxy 프로그램으로 사용자와 Service 중간에서 Proxy 역할을 수행합니다. 이때 Client로부터 전달받는 트래픽은 Downstream, 전달받은 요청을 Service로 전달하는 트래픽을 Upstream이라고 부릅니다. 즉 Envoy를 기준으로 상위 서비스 전달은 Upstream Envoy로 흘러 들어오는 스트림은 Downstream입니다.

 

그렇다면 Downstream을 통해 전달되는 트래픽이 Envoy의 어떤 과정을 거쳐서 Upstream으로 Service에 전달될까요?

먼저 Envoy의 주요 컴포넌트에 대해서 먼저 알아보겠습니다.

 

 

 

Envoy 내부에서 가장 핵심이 되는 컴포넌트는 위 그림과 같습니다. 도식화된 그림을 살펴보면 여러개 Listener 그리고 요청을 전달하기 위한 Route 과정 그리고 해당 Route 과정을 통해서 Cluster에 전달되고 Cluster는 Load Balancing 정책에 따라서 자신이 보유하고 있는 Endpoint 중 하나를 선정합니다. 결과적으로는 해당 Endpoint에 매칭되는 Service에 트래픽이 전달됩니다. 

 

갑자기 여러가지 컴포넌트가 등장했는데요. 지금부터 하나하나씩 살펴보겠습니다.

 


2-1 Endpoint

 

 

먼저 살펴볼 것은 Endpoint입니다. Endpoint는 위 그림과 같이 Proxy를 통해 연결해야하는 최종 목적지 주소와 Port 번호를 의미합니다. 위와 같이 address는 IP일 수도 있고 혹은 도메인 네임일 수도 있습니다. 그 밖에 health check를 위한 설정 및 Load Balancing을 수행할 때 가중치 와 우선순위 등을 설정할 수 있습니다. 자세한 내용은 Envoy 공식 문서를 통해서 확인하시기 바랍니다.

 


 

2-2 Cluster

 

 

Service는 가용성 혹은 성능 향상의 목적으로 여러 서버에 동일한 Service를 배포합니다. 따라서 단일 Endpoint를 통해 여러개 Service를 관리할 수 있는 논리적인 집합 단위가 필요합니다.

 

Envoy Proxy에서는 Cluster 컴포넌트를 통해 Endpoint들을 그룹핑하여 관리할 수 있습니다. 해당 컴포넌트를 통해서 Endpoint 중에서 어디로 트래픽을 보낼지 결정하는 Load Balancing을 결정할 수 있습니다.

 

  clusters:
  - name: some_service
    connect_timeout: 0.25s
    type: STATIC
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: some_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 1234

 

위 설정은 Envoy proxy 공식 문서의 설정 예시입니다. Cluster 하위에 Endpoint 목록을 지정하도록 되어 있는 것을 확인할 수 있습니다. 달리 말하면 Cluster와 Endpoint 간에는 의존성이 존재함을 확인할 수 있습니다. 위 설정에는 Endpoint 지정외에도 다양한 설정을 지정할 수 있습니다. 가령 Circuit Breaker 설정 HTTP Connection과 DNS refresh와 Resolving 정책 등의 부가적인 옵션을 해당 컴포넌트를 통해서 설정할 수 있습니다. 자세한 옵션 설정은 Envoy 공식 문서를 통해서 확인하시기 바랍니다.

 


 

2-3 Listener

 

 

Listener는 Envoy Proxy가 어떤 address의 어떤 port로 접속하는 요청에 대해서 Proxy 처리를 수행할 것인지를 설정합니다. 가령 위와같이 지정했다면, 해당 Envoy Proxy가 위치한 서버로 접속하는 80 Port 요청에 대해서 Envoy Proxy가 트래픽을 전달받고 후속 작업을 처리하게됩니다. 즉 Listener는 Envoy Proxy로 흐름을 전달하는 문지기 역할이라고 볼 수 있습니다.

 

 

Listener를 통해 트래픽이 전달되었다면, 이를 Cluster에 전달하기 위해서는 여러 내부 과정을 거쳐야합니다. Listener에는 Listener Filters와 Filter Chains(Network Filters)를 지니고 있습니다. 따라서 실제 트래픽이 전달되었을 때 Listener 내부에 위치한 Filter들을 통과하면서 사용자 요청을 분석하고 어떤 Cluster로 전달해야할지 등을 결정합니다. 그렇다면 Listener Filter와 Filter Chains는 무엇이 다를까요?

 

Listener Filters는 Connection에 대한 Metadata를 조작하거나 추가하는 데 사용되며, 변경된 정보를 토대로 Filter Chains에 존재하는 무수한 Filter 중에서 해당 요청을 처리하는데 적합한 Filter를 선정하는데 사용됩니다. 즉 실제 사용자 요청을 처리하는 것은 FIlter Chains에 존재하는 Filter이지만 해당 Filter를 사용하기 위해서 사전에 보조적인 작업을 담당하는 것이 Listener FIlter라고 볼 수 있습니다.

 

참고로 Envoy에서 제공하는 Listener Filter 목록은 위와 같으며, 개별 Filter의 역할은 다음과 같습니다.

 

Filter 역할
HTTP Inspector Application에서 전달한 Traffic을 분석하여 해당 네트워크 요청이 HTTP인지 확인합니다. 또한 HTTP 요청이 맞다면, HTTP 1.1 요청인지 혹은 HTTP 2 요청인지를 분석합니다.

이를 토대로 추후 네트워크 요청에 적합한 Filter Chain을  찾는데 사용됩니다.
Original Destination IpTable에 의해서 redirect된 소켓의 원래 목적지 값 주소를 알기 위해 SO_ORIGINAL_DST 값을 읽는 역할을 수행합니다.  해당 값은 Envoy 처리 이후 Connection Local address로 설정하는데 사용됩니다.
Original Source Client가 Envoy의 주소로 Downstream 연결을 시도하면,  Envoy는 Upstream과 통신을 위해서는  Source IP를 Envoy의 주소로 변경이 필요합니다. 따라서 해당 과정을 통해 연결의 목적지 주소를 Source 주소로 복제하는데 사용합니다.
Proxy Protocol 해당 Listener Filter는 HA Proxy Protocol을 지원하기 위한 역할을 수행합니다.
TLS Inspector HTTP Inspector와 유사하게 해당 요청이 TLS 요청인지를 확인합니다. 이를 토대로 추후 네트워크 요청에 적합한 Filter Chain을 찾는데 사용됩니다.

 

Listener Filter를 통해서 Connection 요청 Metadata를 조작하거나 어떤 요청인지를 분석하고난 이후에는 Filter Chains(Network Filters)에 위치한 FIlter들을 통과하면서 사용자의 요청에 적합한 Filter를 찾아서 수행합니다.

 

 

기본적으로 제공하는 FIlter Chains 목록은 위와 같습니다. 해당 항목들은 L3/L4 Filter 기능을 담당하며, 사용자 요청에 적합한 Filter를 찾아서 처리하고 목적지로 전달하는데 사용합니다. 만약 위 Filters 중 사용자 요청을 처리할 수 없는 경우에는 Envoy에서 제공하는 Default Chain이 사용되며, 만약 Default Chain 설정을 하지 않았다면 해당 Connection은 종료됩니다. 참고로 envoy proxy 기동시 위와 같이 모든 Filter가 등록되는 것은 아니며 순서 또한 다를 수 있습니다. 위 Filter 목록 중 필요한 Filter만 선별적으로 등록 가능합니다. 이에 대해서는 추후 살펴보겠습니다.

 

개별 FIlter의 기능 설명은 공식 문서를 참고하기 바라며, 여기서는 HTTP connection manager에 대해 중점으로 다루어 보고자 합니다.

 


2-3-1 HTTP Connection manager

Envoy에서 HTTP를 담당하는 Network Level의 Filter로써 Filter Chain의 가장 마지막에 위치합니다. 해당 Filter의 주요 기능은 raw byte를 HTTP 메시지 convert를 담당하며, 내부적으로 HTTP L7 Filter들이 존재하여 부가적인 작업을 수행합니다. 따라서 HTTP 요청이 전달되었을 때, Listener Filters 이후 Filter Chain을 수행하는 과정에서 HTTP Connection manager가 요청을 처리할 경우 내부적으로 sub filters를 적용하여 부가작업을 수행합니다.

 

HTTP Connection manager가 담당하는 기능에 대해서 몇 가지 살펴보도록 하겠습니다.

 

1) HTTP header 조작

 - 여러가지 Security 이유로 인해 Envoy를 통해서 특정 header를 삭제하거나 값을 변경할 수 있습니다. 가령 use_remote_address 옵션을 true로 변경했을 경우 connection manager는 실제 전달되는 remote address를 x-forwared-for http header에 사용합니다. 그밖에 다양한 header에 대해서 조작이 가능합니다. 

 

2) Retry 설정

- HTTP 요청에 대해서 내부적으로 연결이 실패했을 때 얼만큼 Retry할 것인지를 설정할 수 있습니다.

 

3) Redirect

- 요청 서비스에서 Redirect 응답이 왔을 때 Proxy 내부에서 해당 3xx 응답을 기반으로 Redirect를 수행할 수 있습니다.

 

4) Timeout

- HTTP 요청에 대해서 응답이 지정 시간동안 없을 경우 요청을 취소할 수 있습니다.

 

위와 같은 기능 외 HTTP Connection manager에는 L7 Filters들이 있어서 해당 Filters를 통해 부가적인 작업을 수행할 수 있다고 말했습니다. 여기에는 다음과 같은 Filter들이 해당됩니다.

 

 

각 Filter에 대한 설명은 Envoy 공식 문서를 참고하시기 바라며, 참고로 위 Filter 중에 Router Filter의 경우는 사용자가 지정한 router 규칙에 일치하는 URL로 접근하였을 경우 지정된 Cluster로 Forwarding을 담당하는 Filter입니다.

 

그렇다면, 지금까지 설명을 바탕으로 HTTP 요청에 대한 Envoy Proxy 내부의 네트워크 흐름을 다시 한번 짚어보겠습니다.

 

1) Listener에 구성된 address와 port에 상응하는 요청이 들어오면, Listener 내부적으로 Listener Filters로 전달합니다.

2) Listener Filters를 순회하면서 Connection Metadata를 조작하고 이후 Network Filters로 전달합니다.

3) Filter Chains(Network Filters)를 순회하면서 사용자 요청을 처리하는데 적합한 Filter를 찾고 해당 Filter가 요청을 처리합니다.

(※ 위 예제에서는 HTTP 요청이 들어왔음을 가정했으므로 HTTP Connection Manager가 이를 담당합니다.)

4) HTTP Connection Manager 내부에 있는 Sub Filters를 순회합니다.

5) L7 필터 마지막에 위치한 Router Filter는 사용자의 Routing Path 요청과 적합한 Cluster를 찾고 해당 Cluster에게 트래픽을 Forwarding 하는 역할을 수행합니다. 

6) Cluster는 내부에 설정된 로드밸런싱 정책등을 고려하여 적합한 Endpoint를 선정하며, 해당 Endpoint에 매칭되는 Service로 트래픽이 전달됩니다.

 


 

2-4 Envoy 컴포넌트 등록 방법

 

 

지금까지 Envoy 내부에 존재하는 컴포넌트를 기반으로 Envoy 네트워크 흐름에 대해서 살펴봤습니다. 그렇다면 Cluster, Listener, Endpoint 등은 어떻게 등록할 수 있을까요?

 

Envoy에서는 2가지 방식으로 컴포넌트 정보를 등록하거나 수정할 수 있습니다. 첫 번째 방식은 Static 방식이고 두 번째 방식은 Dynamic 방식입니다. 

 


2-4-1 Static 등록 방식

 

Static 방식은 말 그대로 Envoy 기동 시점에 사용자가 지정한 Config 파일 정보를 기반으로 내부 컴포넌트를 등록하는 방식입니다. 따라서 Envoy가 이해할 수 있는 형태로 내부 컴포넌트 설정을 기술해야합니다. Envoy에서는 YAML 형태로 이를 지정할 수 있으며, 아래 예제를 통해서 간단히 살펴보겠습니다.

 

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 127.0.0.1, port_value: 10000 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          codec_type: AUTO
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/" }
                route: { cluster: some_service }
          http_filters:
          - name: envoy.filters.http.router
            typed_config:
              "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
  clusters:
  - name: some_service
    connect_timeout: 0.25s
    type: STATIC
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: some_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: 127.0.0.1
                port_value: 1234

 

해당 설정은 Envoy 공식 문서에 존재하는 예제입니다. 지금까지 살펴본 Envoy 컴포넌트에 대한 개념을 잘 숙지했다면, 위 예제의 큰 구조를 쉽게 파악할 수 있습니다. 

 

먼저 Listener를 보면 listener_0이라는 이름으로 1개의 Listener가 등록되어있고, 내부에는 filter_chains 항목을 통해 Network Filter인 HTTP Connection Manager를 등록하는 것을 확인할 수 있습니다. 또한 Routes 설정을 통해 /으로 들어오는 모든 prefix에 대해서 some_service라는 Cluster로 전달하도록 지정했음을 확인할 수 있습니다. 이를 토대로 Listener로 들어오는 모든 HTTP 설정에 대해서 some_service Cluster로 트래픽이 전달되도록 Router Filter가 지정될 것임을 알 수 있습니다.

 

Clusters 항목을 보면, 이전에 Route에서 지정한 some_service 이름으로 등록되어있음을 확인할 수 있고 내부적으로 endpoint를 등록하여 Cluster로 트래픽이 전달되었을 때 내부적으로 어떤 endpoint로 전달할 수 있는지를 알 수 있습니다. 추가적으로 ROUND_ROBIN 정책을 적용하여 트래픽 부하를 고르게 분산하도록 지정되어있음을 확인할 수 있습니다.

 


 

2-4-2 Dynamic 등록 방식

 

Static 등록 방식은 Config 파일에 직접 기술하여 해당 정보를 토대로 envoy proxy를 구성하는 방법입니다. 하지만 Listener, Endpoint와 Cluster 정보가 수시로 바뀌는 상황에서는 Static 등록 방식을 사용하기에 적합하지 않습니다. 그 이유는 변경할 때마다 해당 Config 파일 수정이 필요하며, envoy proxy 또한 reload 해야하기 때문입니다.

 

따라서 envoy proxy에서는 Static 등록 방식 이외에 Dynamic 방식을 제공하여 envoy proxy 기동 중에도 내부 컴포넌트 설정을 변경할 수 있도록 인터페이스를 제공하였습니다. 이를 xDS API라고 부르며, FIle 동기화, REST 혹은 gRPC  방식이 존재합니다. 참고로 아직 살펴보지는 않았지만 istio에서는 gRPC 통신을 이용하여 envoy proxy의 정보를 동적으로 변경합니다.

 

그렇다면 xDS는 어떠한 종류가 있으며, 어떤 컴포넌트와 매칭되어 변경을 수행할 수 있을까요? 이에 대해서 간단히 알아보도록 하겠습니다.

 

 

 

이전에 살펴봤던 주요 컴포넌트 Listener, Cluster, Endpoint를 갱신할 수 있는 Discovery Service가 제공됩니다. 따라서 envoy proxy에서 요구하는 spec에 맞게 gRPC 혹은 REST 호출을 보내면 개별 컴포넌트에 해당하는 Discovery Service를 통해서 내부 컴포넌트 설정을 변경할 수 있습니다. 

 

 

 

참고로 istio에서는 위와 같이 중앙에 envoy를 관리하는 Management Server가 있으며, Envoy의 xDS를 통해서 중앙에서 Configuration을 등록과 수정등을 자유롭게 할 수 있습니다.

 

위와 같은 Discovery Service외에도 VHDS, SRDS, LDS, SDS, RTDS, ECDS등 다양한 Discovery Service가 존재하며, 이는 envoy 공식 문서를 통해서 참고하시기 바랍니다.

 


 

ADS

 

이번에는 xDS 관련하여 또 하나 살펴볼 주요 내용 중 하나인 ADS(Aggregated xDS)에 대해서 살펴보겠습니다. 먼저 ADS는 무엇이고 왜 사용할까요?

 

Envoy 내부 동작 구조에 대해서 자세히 살펴보지 않았지만, 우선 가볍게 짚고 넘어가자면 Envoy 내부의 쓰레딩 모델은 기본적으로 Lock 없이 데이터를 주고 받으며, 데이터 동기화는 Eventually Consistency를 전제로 설계되어있습니다. 즉 이말은 데이터를 전달한다고 해서 바로 반영하는 것은 아니고 완벽한 동기화가 일시에 이루어지지 않음을 의미합니다.

 

그리고 이러한 구조는 다음과 같은 상황을 맞이할 수 있습니다.

 

 

 

이전에 살펴봤듯이 Endpoint와 Cluster는 의존 관계를 맺고 있음을 확인했습니다. 즉 Cluster는 Endpoint의 논리적 집합이었음을 이전 내용을 통해 확인했습니다.

 

그리고 이러한 상황에서 만약 위와 같이 Endpoint와 Cluster가 추가되어서 이를 갱신하는 상황이 발생한다고 가정했을 때, EDS를 통한 갱신이 CDS보다 먼저 이루어진다면 어떻게 될까요?

 

Envoy 입장에서는 EDS를 통해서 전달된 Endpoint의 대상이 Cluster-A라고 전달받았지만, 아직 CDS를 통해 Cluster 정보를 전달받지 못한 상황이므로 일정 기간 동안에는 Cluster 정보에 대한 동기화가 진행되지 않는 이상현상이 발생됩니다. 

 

따라서 이러한 이슈를 해결하고자 등장한 것이 Aggregated Discovery Service입니다.

 

 

ADS는 Single gRPC 스트림으로 구성된 서비스로써 envoy에 전달되는 resource의 순서를 적용하려는 사용자를 위해 집계된 xDS를 단일 gRPC 스트림으로 전달할 수 있습니다. 따라서 위의 경우 ADS와 CDS의 의존 관계에 있을 때에도 이를 집계한 결과를 단일 스트림 형태로 envoy 전달하기 때문에 일관성을 달성할 수 있는 특징을 지니고 있습니다.

 

  "dynamic_resources": {
    "lds_config": {
      "ads": {},
      "initial_fetch_timeout": "0s",
      "resource_api_version": "V3"
    },
    "cds_config": {
      "ads": {},
      "initial_fetch_timeout": "0s",
      "resource_api_version": "V3"
    },
    "ads_config": {
      "api_type": "GRPC",
      "set_node_on_first_message_only": true,
      "transport_api_version": "V3",
      "grpc_services": [
        {
          "envoy_grpc": {
            "cluster_name": "xds-grpc"
          }
        }
      ]
    }
  },

 

dynamic_resources는 위와 같이 기존 Config yaml에 위와 같이 설정할 수 있습니다. 예를들어 위와 같이 설정을 지정할 수 있습니다. 내용을 살펴보면 lds와 cds는 ads를 통해서 해당 내용을 전달받을 수 있으며, ads는 xds-grpc 클러스터를 통해서 해당 정보를 가져오도록 지정되어 있습니다.

 

참고로 위 설정은 istio를 통해 배포된 Pod에 속한 envoy proxy의 설정 파일 중 일부를 발췌한 내용이며, 해당 설정에 대한 자세한 내역은 차후 포스팅을 통해 다루어보겠습니다.

 


3. xDS API 

 

xDS API는 istio와 연계하여 Service Discovery를 수행하는데 있어 주요하게 사용됩니다. 따라서 xDS API의 종류와 동작 방법에 대해서 보다 자세히 살펴보고자 합니다.

 

먼저 xDS 지원 방식 부터 살펴보겠습니다. xDS 지원 방식은 총 3가지(File 동기화, HTTP, gRPC)입니다. 

 

 

File 동기화 방식은 위와 같이 envoy에서 File의 상태를 관찰하고 설정이 적용된 File에서 변경이 일어났을 경우 envoy에서 이를 인지하여 내부 컴포넌트의 설정을 동기화하는 방식입니다. 

 

 

반면 gRPC와 HTTP 방식은 config 정보를 전달하는 Management Server가 중앙에 존재합니다. 따라서 envoy에서는 Management Server에 요청하여 config 정보를 전달받고 전달받은 정보를 토대로 자신의 내부 컴포넌트 설정을 동기화합니다.

 

HTTP 방식은 주기적인 polling을 통해서 Management Server로부터 변경된 데이터 항목을 전달받아 갱신합니다. 반면 gRPC는 bidirectional streaming 통신을 통해 데이터를 주고 받는 차이점이 존재합니다. 해당 통신 방법에 대해서 궁금하신 분은 제 블로그 아래 내용을 참고 부탁드립니다.

 

https://cla9.tistory.com/177?category=993774 

 

3. gRPC는 왜 빠를까? (통신 방식) - 2

서론 지난 포스팅에서는 gRPC에서 사용되는 protobuf와 REST 통신에서 사용되는 JSON 크기와 Serialization/Deserialization 관점에서 성능을 비교해봤습니다. 이번에는 gRPC에서 제공하는 통신 방법에 대해서

cla9.tistory.com

 

envoy에서 널리 사용하는 방식은 gRPC 방식이며, istio 또한 gRPC 방식을 통해 xDS 정보를 전달받습니다. 따라서 본 포스팅에서는 gRPC 방식에 보다 초점을 맞추어 살펴보겠습니다.

 


 

3-1 xDS gRPC 

 

 

 

앞서 envoy에서 gRPC를 활용한 xDS 방식은 envoy와 config를 전달하는 Management Server 사이에 bidirectional streaming 통신을 사용한다고 설명했습니다. 따라서 해당 방식은 Connection이 끊기지 않고 지속 연결된 상태라고 봐도 무방합니다.

 

이러한 상황에서 처음 Management Server에 envoy가 연결되면, 위와 같이 Discovery Request를 Management Server에 전달합니다. 그러면 Management Server에서는 envoy가 요청하는 Config에 대해서 전체 목록을 전달하게되고, envoy는 해당 설정을 전달받아 Config 업데이트를 수행합니다.

 

 

 

이후 Config 업데이트가 완료되면, envoy는 이전에 Management Server로부터 전달받은 Config 항목에 대한 응답을 전달합니다. 만약 Config 내용을 정상적으로 업데이트를 수행했을 경우에는 ACK를 응답하고 그렇지 않을 경우에는 NACK를 응답합니다. (※  ACK와 NACK의 구조와 동작방식에 대해서는 envoy 공식 문서에서 자세히 다루고 있으니 참고바랍니다.)

 

이때 응답 메시지는 별개의 포맷을 활용하지 않고 다음 Discovery Request를 전달할 때, 응답을 포함하여 전달됩니다. 해당 Discovery Request 메시지는 Config 업데이트 완료 이후 Management Server에 Config가 변경되었을 때, 동기화된 Config 내역을 다시 전달받기 위해 요청하는 메시지입니다. 따라서 Management Server에 Config가 변경되거나 새로운 Resource가 추가되었을 경우 envoy에게 Discovery Response를 전달함으로써 동기화 작업이 이루어집니다.

 


 

3-2 SotW(State of the world), Delta xDS

 

 

지금까지 envoy gRPC 동작 과정에 대해서 가볍게 살펴봤는데, 이번에는 Management Server에서 Discovery Response를 응답 메시지를 전달하는데 있어서 선택할 수 있는 2가지 방법에 대해서 살펴보겠습니다.

 

첫 번째 방법은 SotW(State of the world) 방식입니다.

 

 

가령 envoy가 Cluster 정보를 동기화 하기 위해 xDS로 연결되었으며 이미 한차례 Cluster 동기화되었다고 가정해봅시다. 이때 SotW 방식은 동기화된 Cluster 정보 중 하나가 변경되었을 경우에 전체 Cluster 정보를 전달하는 방식을 의미합니다. 해당 방식은 구현이 간단한 반면에 전체 데이터 중 일부만 변경되었음에도 불구하고 전체 정보를 전달하기 때문에 많은 Network overhead가 발생할 수 있습니다.

 

 

참고로 이때 DiscoveryResponse 응답 포맷은 위와같으며, 여기서 resources를 통해서 전체 resource 정보가 전달됩니다.

 

 

두 번째 방법은 Delta 방식입니다.

 

 

해당 방식은 변경된 Config 정보만을 선별하여 전체를 전달하는 것이 아니라 변경분 (Delta 혹은 incremental)만 전달하는 방식을 의미합니다. 

 

 

이때 Delta Discovery Response 응답 포맷은 위와 같으며, resources 항목은 변경된 항목만 추가됩니다. 또한 기존에 존재하는 Resource가 삭제되었을 경우에는 removed_resources를 통해서 이를 envoy에게 전달합니다.

 

지금까지 SotW, Delta 두 가지 방식에 대해서 살펴봤습니다. 그렇다면 두 방식은 어떻게 지정할 수 있을까요?

 

 

위 설정과 같이 envoy configuration에서 api_type을 지정할 때, GRPC로 지정 혹은 DELTA_GRPC로 지정하면 입력 값에 따라서 동작 방식이 결정됩니다.

 

 

 

지금까지 gRPC 방식에서 Config 정보를 전달하는 두 가지 방법에 대해서 살펴봤습니다. 그렇다면 istio에서는 어떤 방식을 기본적으로 사용할까요? istio에서는 기본적으로는 SotW 방식을 사용하고 있으며, 사이드카 컨테이너를 주입할 때 사용자가 지정한 ISTIO_DELTA_XDS 값을 통해서 Delta 방식으로 변경할 수 있습니다. 하지만 현재는 값을 변경한다고 할지라도 실제 데이터를 전달할 때 Delta 값만을 전달하지 않습니다. 

 


 

4. 마무리

 

지금까지 Envoy 내부 주요 컴포넌트 및 설정 방식에 대해 살펴봤습니다. istio는 envoy를 얼만큼 잘 이해하고 있느냐에 따라서 istio에 대한 전문성을 확보한다고 생각합니다. 또한 istio가 문제가 생겼을 때 트러블 슈팅할 때는 envoy 설정에 대한 이해를 기반으로 진행되어야합니다. 따라서 이번 포스팅의 내용은 간략하지만 중요한 부분을 다루고 있습니다.

 

다음 포스팅에서는 Envoy 내부 구조에 대해서 조금 더 자세히 알아보도록 하겠습니다.

+ Recent posts