1. 서론

이전 시리즈 내용을 통해서 Envoy에서 가장 중요한 2가지 컴포넌트인 Listener Manager와 Cluster Manager에 대해서 살펴봤습니다.

 

 

 

 

 

다시 한번 전체 과정을 간략하게 표현해보면 다음과 같습니다.사용자의 접속 요청이 전달되면, Listener에서 요청을 전달받고 이에 부합하는 Cluster를 찾아 Downstream과 Upstream을 연결합니다. 이때 만약 사용자 요청이 Http 연결일 경우에는 Listener에서 Cluster Manager로 Cluster를 요청하는 주체가 Listener가 보유하고 있는 Network Filter 중 하나인 HttpConnectionManager입니다. 해당 컴포넌트는 Listener 내부에 존재하는 Filter이지만, 사용자 요청처리를 수행하는데 있어 중요한 역할을 수행하기 때문에. 이번 포스팅에서는 HttpConnectionManager의 기능에 대해서 살펴보겠습니다. 


 

 

2. HttpConnectionManager

 

 

이전 포스팅을 통해 Listener 하위에는 위 그림과 같이 Filter Chains를 관리하는 filter_chain_manager가 존재함을 확인했습니다.

 

출처 :https://www.envoyproxy.io/docs/envoy/latest/intro/life_of_a_request#listener-tcp-accept

 

 

해당 구조 내부에는 위 그림과 같이 Trie 구조가 존재하는데, 이는 사용자가 Listener를 통해서 연결을 원하는 domain을 전달했을 때, 해당 요청에 부합되는 Filter Chain을 찾기 위한 용도로 사용됩니다. 따라서 각각의 Trie 별로 매칭되는 노드에는 Filter들이 Chain 형식으로 매핑되어있습니다.

 

따라서 사용자가 접속을 요청하면, 내부적으로는 해당 Trie를 검색해서 사용자의 요청에 부합되는 항목의 Filter Chain 목록을 반환합니다. 그리고 해당 Chain 목록을 전달받으면, Chain을 탐색하면서 매핑된 Factory Callback을 실행하여 Filter Chain을 구성합니다.

 

Envoy가 제공하는 Network Filter는 굉장히 많은 종류가 있는데요. 그 중 HttpConnectionManager는 Http 요청을 처리하는 Filter로써 이에 대해서 다루어보고자 합니다.

 

 

 

 

HttpConnectionManager는 Network Filter이므로 앞서 설명한 것과 같이 사용자가 명시한 도메인에 해당되는 Network Filter 목록에 Http 처리가 매핑되어있을 경우 Filter Chain에 포함되어있을 것입니다.

 

이때 사용자가 Listener에 접속을 요청할 때마다 위 그림과 같이 내부적으로 Upstream Connection을 만듭니다. 위 그림은 현재 2개의 Connection이 생성되었음을 가정했습니다. 이때 각각의 ServerConnection은 별개의 Network Filter Chain을 가지고 있게되고 만약 2개의 Connection이 모두 Http 처리를 담당해야한다면 위 그림과 같이 2개의 별개 HttpConnectionManager가 생성될 것입니다.

 

그렇다면 HttpConnectionManager는 어떻게 구성되어있을까요?

 

 

 

생성된 HttpConnectionManager는 대략 위와 같은 모습을 구성하고 있습니다. 물론 그림으로 표현한 속성 이외에 다양한 프로퍼티가 존재하지만, 핵심이라고 생각하는 몇 개만 표현했습니다. 그렇다면 각각의 속성은 무엇이며 어떤 과정을 거쳐 생성될까요? 핵심적인 요소에 대해서 하나씩 살펴보겠습니다.

 


3. RDS

 

먼저 살펴볼 것은 RDS입니다. 해당 컴포넌트는 이전에 살펴본 HttpConnectionManager의 속성 중 config_와 관련이 있습니다. 해당 config안에는 HttpConnectionManager를 구성하는데 있어 필요한 속성이 지정되어있는데요. 그중 Route 관련 속성은 route_config_provider입니다.

 

 

 

이전에 살펴본 그림에서 HttpConnectionManager와 SingletonManager가 연관관계를 맺고 있는 것을 확인했는데요. SingletonManager가 가진 속성 중에서 RouteConfigProviderManager가 RDS 처리를 담당하고 있습니다.

 

그렇다면 왜 SingletonManager에 의해서 해당 속성이 관리될까요? 위 그림을 살펴보면 HttpConnectionManager는 사용자의 Connection 별로 여러개가 생성됨을 볼 수 있습니다. 하지만 RDS 처리 또한 각각의 HttpConnectionManager를 통해 관리되어야한다면, RDS 처리를 위한 overhead 또한 증가하게되고 무엇보다 동일한 정보가 중복 관리되기 때문에 관리 용이성 또한 증가합니다. 따라서 전역적으로 하나의 인스턴스만을 생성함으로써 데이터를 한 곳에서 관리하고 모든 HttpConnectionManager가 이를 공유하도록 싱글톤 패턴이 적용되어있습니다.

 

해당 내용을 코드로 살펴보면 다음과 같습니다.

 

config.cc

Utility::Singletons Utility::createSingletons(Server::Configuration::FactoryContext& context) {
  std::shared_ptr<Http::TlsCachingDateProviderImpl> date_provider =
      context.singletonManager().getTyped<Http::TlsCachingDateProviderImpl>(
          SINGLETON_MANAGER_REGISTERED_NAME(date_provider), [&context] {
            return std::make_shared<Http::TlsCachingDateProviderImpl>(
                context.mainThreadDispatcher(), context.threadLocal());
          });

  Router::RouteConfigProviderManagerSharedPtr route_config_provider_manager =
      context.singletonManager().getTyped<Router::RouteConfigProviderManager>(
          SINGLETON_MANAGER_REGISTERED_NAME(route_config_provider_manager), [&context] {
            return std::make_shared<Router::RouteConfigProviderManagerImpl>(context.admin());
          });

  Router::ScopedRoutesConfigProviderManagerSharedPtr scoped_routes_config_provider_manager =
      context.singletonManager().getTyped<Router::ScopedRoutesConfigProviderManager>(
          SINGLETON_MANAGER_REGISTERED_NAME(scoped_routes_config_provider_manager),
          [&context, route_config_provider_manager] {
            return std::make_shared<Router::ScopedRoutesConfigProviderManager>(
                context.admin(), *route_config_provider_manager);
          });

  auto http_tracer_manager = context.singletonManager().getTyped<Tracing::HttpTracerManagerImpl>(
      SINGLETON_MANAGER_REGISTERED_NAME(http_tracer_manager), [&context] {
        return std::make_shared<Tracing::HttpTracerManagerImpl>(
            std::make_unique<Tracing::TracerFactoryContextImpl>(
                context.getServerFactoryContext(), context.messageValidationVisitor()));
      });

  std::shared_ptr<Http::DownstreamFilterConfigProviderManager> filter_config_provider_manager =
      Http::FilterChainUtility::createSingletonDownstreamFilterConfigProviderManager(
          context.getServerFactoryContext());

  return {date_provider, route_config_provider_manager, scoped_routes_config_provider_manager,
          http_tracer_manager, filter_config_provider_manager};
}

 

코드를 살펴보면 위와 같이 가장 먼저 하는 것은 SingletonManager에 등록된 인스턴스 중에서  RouteConfigProviderManager, ScopedRoutesConfigProviderManager 와 더불어 다양한 Manager 등 다양한 Manager를 가져오는 작업을 선행합니다. 

 

이때 중요한 것은 앞서 언급한 2가지 Manager이며, RouteConfigProviderManager는 route_config 정보를 기반으로 RDS 혹은 StaticRouteConfig를 처리하는 인스턴스를 생성하는 역할을 수행합니다. 반면 ScopedRoutesConfigProviderManager는 Listener 설정에 scoped_routes 설정이 존재할 때 해당 scoped_routes를 처리하는 인스턴스를 생성하는 역할을 수행합니다.

 

config.cc

switch (config.route_specifier_case()) {
case envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager::
    RouteSpecifierCase::kRds:
case envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager::
    RouteSpecifierCase::kRouteConfig:
  route_config_provider_ = Router::RouteConfigProviderUtil::create(
      config, context_.getServerFactoryContext(), context_.messageValidationVisitor(),
      context_.initManager(), stats_prefix_, route_config_provider_manager_);
  break;
case envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager::
    RouteSpecifierCase::kScopedRoutes:
  scoped_routes_config_provider_ = Router::ScopedRoutesConfigProviderUtil::create(
      config, context_.getServerFactoryContext(), context_.initManager(), stats_prefix_,
      scoped_routes_config_provider_manager_);
  break;
case envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager::
    RouteSpecifierCase::ROUTE_SPECIFIER_NOT_SET:
  PANIC_DUE_TO_CORRUPT_ENUM;
}

 

HttpConnectionManager를 구성하는 단계에서 전달받은 두 속성은 이후 route 관련 정보를 생성하는데 사용됩니다. 다만 두 속성 모두가 생성되지는 않으며, 기존에 지정된 설정내역을 살펴보고 하나를 생성합니다.

 

즉 위 코드와 같이 config에 지정된 route_specifier에 의해서 결정됩니다. 만약 route_config 설정이 지정되어있다면, 기존에 SingletonManager로부터 부여받은 route_config_provider_manager를 전달하여 route 처리를 수행할 수 있는 route_config_provider를 전달받습니다. 반면 scoped_route라면 scoped_route_config_provider_manager를 전달하여 scoped_route_config_provider를 전달받습니다.

 

본포스팅에서는 route_config만 지정되어있음을 가정하며, 따라서 위 코드에서는 route_config_provider_ 만이 생성되었음을 전제로 진행하겠습니다. 또한 route를 처리하는 방식이 static이 아닌 dynamic xDS를 활용한 동적 변경을 가정하겠습니다.

 

 

route_config_provider가 생성된 이후 RDS를 매핑하는 과정을 살펴보면 위그림과 같습니다.

 

1. HttpConnectionManager에 매핑되어있는 route_config_provider를 기반으로 route config를 분석할 수 있는 인스턴스를 생성합니다. 이후 RouteConfigProviderManager에 존재하는 dynamic_route_config_providers_ 로부터 rds 메시지 값을 해시한 결과를 기반으로 dynamic_route_config_providers_ Map에 존재하는지 살펴보고 만약 존재한다면, 해당 provider를 반환합니다.

 

2. dynamic_route_config_providers_에 존재하지 않는다면, RDS 생성을 위해 Cluster Manager로부터 Subscription factory를 요청합니다.

 

3.  xDS에 전달받기 원하는 타입 및 수신 callback을 Multiplexer에 등록합니다.

 

4. 등록이 완료된 이후 Subscription을 반환합니다.

 

5. RdsRouteConfigProviderImpl 인스턴스를 생성하고 그 안에 subscription을 바인딩합니다. 또한 dynamic_route_config_providers_ Map에 해당 인스턴스를 삽입함으로써 이후 동일한 요청이 전달되면, 새로운 provider를 생성하지 않고 매핑된 값을 반환합니다.

 

route_config_provider_manager_impl.h

auto subscription = std::make_shared<RdsRouteConfigSubscription>(
    std::move(config_update), std::move(resource_decoder), rds.config_source(),
    rds.route_config_name(), manager_identifier, factory_context,
    stat_prefix + absl::AsciiStrToLower(getRdsName()) + ".",
    absl::AsciiStrToUpper(getRdsName()), manager_);
auto provider = std::make_shared<RdsRouteConfigProviderImpl>(std::move(subscription),
                                                             factory_context);

6. 해당 provider와 subscription 정보를 반환합니다.

 

7. RdsRouteConfigProviderImpl 내부에서는 xDS API가 변경이 생겼을 때, 내부에서 ThreadLocalStorage에 존재하는 ThreadLocalConfig의 내용을 변경함으로써, 쓰레드 전체에 동일한 데이터를 공유할 수 있도록 유지합니다.

 

위와 같은 7가지 단계를 통해서 HttpConnectionManager가 생성될 때 Singleton Manager를 통해 전역적으로 RDS를 관리하는 하나의 provider를 공급받고, ThreadLocalStorage를 활용해서 모든 쓰레드에서 동일한 데이터에 대한 접근이 가능하도록 공유합니다.

 


 

4. Http filter

 

HttpConnectionManager는 Http 처리를 담당합니다. 이때 해당 컴포넌트 내부에는 http 처리를 위한 무수한 filter가 존재합니다. 참고로  envoy 공식 문서를 살펴보면 지정할 수 있는 filter가 여러가지 있음을 확인할 수 있습니다. 그리고 그 중에는 Routing을 담당하는 Router Filter 또한 존재합니다.

 

따라서 사용자의 요청이 전달되면 Network Filter Chain이 수행되면서 HttpConnectionManager가 실행되고 그리고 그 안에서 다시 HttpConnectionManager가 보유한 Http Filter들이 수행되면서 사용자의 요청이 처리됩니다.

 

이를 조금 더 살펴보겠습니다.

 

config.cc

Http::FilterChainHelper<Server::Configuration::FactoryContext,
                        Server::Configuration::NamedHttpFilterConfigFactory>
    helper(filter_config_provider_manager_, context_.getServerFactoryContext(), context_,
           stats_prefix_);
helper.processFilters(config.http_filters(), "http", "http", filter_factories_);

 

HttpConnectionManager를 구성하는 config 속성을 살펴보면, 해당 Config를 생성할 때 http_filters를 위한 factory를 생성하는 것을 볼 수 있습니다. 이때 이미 사용자가 지정하거나 LDS에 의해서 갱신된 Listener의 Config 정보를 살펴보면, 지정된 http filter 목록을 확인할 수 있습니다.

 

 

따라서 FilterChainHelper를 통해서 Config 정보를 토대로 filter_factories라는 Filter를 생성하는 Factory Callback의 리스트를 채우도록 처리를 위임합니다.

 

FilterChainHelper는 해당 요청을 전달받으면, DependencyManager를 통해서 전달된 Filter의 우선순위를 고려하여 정상적으로 입력이 되었는지를 검사합니다. 그리고 Filter간의 Dependency에 문제가 없게 Config가 전달되었다면, 이를 HttpConnectionManager가 보유한 filter_factories에 Filter 생성을 위한 Callback Factory 메소드를 매핑시킵니다.

 

위 과정을 거쳐 생성된 filter_factories_는 향후 사용자가 Http 요청을 전달하기 위해 Stream을 생성할 때 내부에 존재하는 filter_manager_로 해당 filter_factories_를 전달시켜 해당 filter_manager_가 http filter chain을 생성하고 수행할 수 있도록 처리를 위임합니다.

 


5. Codec

 

이번에 살펴볼 것은 codec입니다. HttpConnectionManager에서 codec은 사용자의 요청 정보를 분석하는 역할을 수행합니다. 사용자가 전달한 raw데이터를 지정된 프로토콜 형태로 파싱하고 분석하여 처리하는 과정에서 사용됩니다.

 

 

그림을 살펴보면, codec_은 HttpConnectionManager 내부에 있는 config에 의해서 생성할 수 있습니다. 이때 사용자의 Http 요청은 Http 1.1, Http 2.0 혹은 Http 3.0(Quic) 형태일 수 있습니다. 위 세가지 프로토콜 모두 HttpConnectionManager가 처리하는데요. 각각의 처리방식과 포맷이 다르기 때문에 사용자의 요청이 전달되었을 때, 가정 먼저 수행하는 일은 사용자의 요청이 어떤 프로토콜 형태인지를 파악하는 것입니다.

 

config.cc

Http::ServerConnectionPtr
HttpConnectionManagerConfig::createCodec(Network::Connection& connection,
                                         const Buffer::Instance& data,
                                         Http::ServerConnectionCallbacks& callbacks) {
  switch (codec_type_) {
  case CodecType::HTTP1:
    return std::make_unique<Http::Http1::ServerConnectionImpl>(
        connection, Http::Http1::CodecStats::atomicGet(http1_codec_stats_, context_.scope()),
        callbacks, http1_settings_, maxRequestHeadersKb(), maxRequestHeadersCount(),
        headersWithUnderscoresAction());
  case CodecType::HTTP2:
    return std::make_unique<Http::Http2::ServerConnectionImpl>(
        connection, callbacks,
        Http::Http2::CodecStats::atomicGet(http2_codec_stats_, context_.scope()),
        context_.api().randomGenerator(), http2_options_, maxRequestHeadersKb(),
        maxRequestHeadersCount(), headersWithUnderscoresAction());
  case CodecType::HTTP3:
#ifdef ENVOY_ENABLE_QUIC
    return std::make_unique<Quic::QuicHttpServerConnectionImpl>(
        dynamic_cast<Quic::EnvoyQuicServerSession&>(connection), callbacks,
        Http::Http3::CodecStats::atomicGet(http3_codec_stats_, context_.scope()), http3_options_,
        maxRequestHeadersKb(), maxRequestHeadersCount(), headersWithUnderscoresAction());
#else
    // Should be blocked by configuration checking at an earlier point.
    PANIC("unexpected");
#endif
  case CodecType::AUTO:
    return Http::ConnectionManagerUtility::autoCreateCodec(
        connection, data, callbacks, context_.scope(), context_.api().randomGenerator(),
        http1_codec_stats_, http2_codec_stats_, http1_settings_, http2_options_,
        maxRequestHeadersKb(), maxRequestHeadersCount(), headersWithUnderscoresAction());
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}

 

이를 Config에서 분석한 다음 Http 1.1일 경우에는 Http1::ServerConnectionImpl, Http 2.0일 경우에는 Http2:ServerConnectionImpl Http 3.0일 경우에는 QuicHttpServerConnectionImpl을 반환하여 HttpConnectionManager 내부에 존재하는 codec_에 매핑하는 작업을 수행합니다.

 

codec.h

class Connection {
public:
  virtual ~Connection() = default;

  /**
   * Dispatch incoming connection data.
   * @param data supplies the data to dispatch. The codec will drain as many bytes as it processes.
   * @return Status indicating the status of the codec. Holds any errors encountered while
   * processing the incoming data.
   */
  virtual Status dispatch(Buffer::Instance& data) PURE;

  /**
   * Indicate "go away" to the remote. No new streams can be created beyond this point.
   */
  virtual void goAway() PURE;

  /**
   * @return the protocol backing the connection. This can change if for example an HTTP/1.1
   *         connection gets an HTTP/1.0 request on it.
   */
  virtual Protocol protocol() PURE;

  /**
   * Indicate a "shutdown notice" to the remote. This is a hint that the remote should not send
   * any new streams, but if streams do arrive that will not be reset.
   */
  virtual void shutdownNotice() PURE;

  /**
   * @return bool whether the codec has data that it wants to write but cannot due to protocol
   *              reasons (e.g, needing window updates).
   */
  virtual bool wantsToWrite() PURE;

  /**
   * Called when the underlying Network::Connection goes over its high watermark.
   */
  virtual void onUnderlyingConnectionAboveWriteBufferHighWatermark() PURE;

  /**
   * Called when the underlying Network::Connection goes from over its high watermark to under its
   * low watermark.
   */
  virtual void onUnderlyingConnectionBelowWriteBufferLowWatermark() PURE;
};

 

프로토콜마다 처리 방법이 다양하지만, 모든 ServerConnectionImpl은 위와 같은 Interface 스펙을 준수합니다. 따라서 HttpConnectionManager에서는 위 interface에 정의된 메소드를 호출하여 처리를 위임할 수 있습니다. 다만 본 포스팅에서는 Http 1.1을 기준으로 처리 과정을 분석하기 때문에 Http1::ServerConnectionImpl이 반환되었다고 가정하겠습니다.

 

그렇다면 Http 1.1을 처리하기 위한 ServerConnectionImpl은 어떤 역할을 수행할까요? 해당 구조에 대해서 조금 더 자세히 살펴보겠습니다.

 

 

 

ServerConnectionImpl을 살펴보면, 여러 속성이 있지만 그 중 가장 중요한 속성은 위 2가지 입니다. 먼저 Parser_에 대해서 알아보겠습니다.

 


 

5-1. Parser

 

parser_의 역할은 Client로 부터 전달된 요청 내역을 Http 1.1 스펙에 맞게 분석하여 envoy가 원하는 형태로 데이터를 구성하는 작업을 담당합니다. 이때 envoy 내부에는 해당 내역을 처리하는 Parser가 2개가 존재합니다.

 

codec_impl.cc

ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stats,
                               const Http1Settings& settings, MessageType type,
                               uint32_t max_headers_kb, const uint32_t max_headers_count)
    : connection_(connection), stats_(stats), codec_settings_(settings),
      encode_only_header_key_formatter_(encodeOnlyFormatterFromSettings(settings)),
      processing_trailers_(false), handling_upgrade_(false), reset_stream_called_(false),
      deferred_end_stream_headers_(false), dispatching_(false), max_headers_kb_(max_headers_kb),
      max_headers_count_(max_headers_count) {
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser")) {
    parser_ = std::make_unique<BalsaParser>(type, this, max_headers_kb_ * 1024, enableTrailers());
  } else {
    parser_ = std::make_unique<LegacyHttpParserImpl>(type, this);
  }
}

 

첫 번째는 LegacyHttpParserImpl로써 envoy가 기존부터 제공해온 Parser입니다. 이후 해당 Parser의 성능 개선을 위해 추가로 개발한 것이 BalsaParser입니다. 다만 BalsaParser는 아직 완전하지는 않으며, envoy에서 해당 Parser를 사용하려면

'envoy.reloadable_features.http1_use_balsa_parser' 옵션을 활성화했을 경우 사용할 수 있습니다. 참고로 본 포스팅에서는 기본으로 사용되는 LegacyHttpParserImpl에 대해서 다루어보겠습니다.

 

 

parser_가 생성되면, 추후 parser_에게 사용자 요청을 처리하도록 위임할 것입니다. 이때 Parser가 처리 중간 중간마다 특정 event 즉 header 필드명이 무엇인지 하나씩 파악했거나, header 값을 분석했을 때 이를 요청자인 ServerConnection 에게 알려줘야 해당 데이터들을 전달받아 envoy가 원하는 형태로 데이터를 가공하거나 그 이후 처리해야할 비즈니스 로직을 수행할 수 있을 것입니다.

 

이때 ServerConnectionImpl은 기본적으로 ParserCallbacks Virtual Class를 상속받았으며, 그 안에 정의된 메소드는 해당 메소드가 호출될 때 수행해야할 비즈니스 로직이 구현되어있습니다. 구현해야할 메소드는 위와 같이 총 10개이며, 각각의 의미는 다음과 같습니다.

 

메소드명 의미
onMessageBegin Request/Response가 시작될 때 호출되는 callback으로 반환 값으로 성공/실패 메시지가 반환됨
onUrl URL data를 Parser가 분석했을 때 호출되는 callback으로 반환 값으로 성공/실패 메시지가 반환됨
onStatus Status data를 Parser가 분석헀을 때 호출되는 callback으로 반환 값으로 성공/실패 메시지가 반환됨
onHeaderField header의 field 명을 수신받았을 때 호출되는 callback으로 반환 값으로 성공/실패 메시지가 반환됨
onHeaderValue header의 value 값을 수신받았을 때 호출되는 callback으로 반환 값으로 성공/실패 메시지가 반환됨
onHeaderComplete header 분석이 완료되었을 때 호출되는 callback으로 반환 값으로 성공/실패 메시지가 반환됨
bufferBody body data를 분석했을 때 호출되는 callback
onMessageComplete Parser가 HTTP 데이터를 모두 분석 완료했을 때 호출되는 callback으로 반환 값으로 성공/실패 메시지가 반환됨
onChunkHeader chunk header를 받았을 때 호출되는 callback

 

이후 Parser에서는 처리 도중 중간 중간에 ParserCallbacks에 정의된 메소드를 호출함으로써, ServerConnectionImpl에게 Parsing 결과를 중간 중간 callback 하도록 구현되었습니다.

 

legacy_parser_impl.cc

Impl(http_parser_type type, void* data) : Impl(type) {
  parser_.data = data;
  settings_ = {
      [](http_parser* parser) -> int {
        auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
        return static_cast<int>(conn_impl->onMessageBegin());
      },
      [](http_parser* parser, const char* at, size_t length) -> int {
        auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
        return static_cast<int>(conn_impl->onUrl(at, length));
      },
      [](http_parser* parser, const char* at, size_t length) -> int {
        auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
        return static_cast<int>(conn_impl->onStatus(at, length));
      },
      [](http_parser* parser, const char* at, size_t length) -> int {
        auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
        return static_cast<int>(conn_impl->onHeaderField(at, length));
      },
      [](http_parser* parser, const char* at, size_t length) -> int {
        auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
        return static_cast<int>(conn_impl->onHeaderValue(at, length));
      },
      [](http_parser* parser) -> int {
        auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
        return static_cast<int>(conn_impl->onHeadersComplete());
      },
      [](http_parser* parser, const char* at, size_t length) -> int {
        static_cast<ParserCallbacks*>(parser->data)->bufferBody(at, length);
        return 0;
      },
      [](http_parser* parser) -> int {
        auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
        return static_cast<int>(conn_impl->onMessageComplete());
      },
      [](http_parser* parser) -> int {
        // A 0-byte chunk header is used to signal the end of the chunked body.
        // When this function is called, http-parser holds the size of the chunk in
        // parser->content_length. See
        // https://github.com/nodejs/http-parser/blob/v2.9.3/http_parser.h#L336
        const bool is_final_chunk = (parser->content_length == 0);
        static_cast<ParserCallbacks*>(parser->data)->onChunkHeader(is_final_chunk);
        return 0;
      },
      nullptr // on_chunk_complete
  };
}

 

즉 위 코드와 같이 LegacyHttpParserImpl 내부에는 Parsing 중간 중간 처리 결과를 반환할 수 있도록, settings_에 함수를 매핑했는데, 이때 ParserCallbacks에 정의된 규약에 따른 메소드를 호출하여 결과를 전달하는 것을 볼 수 있습니다.

 

 


 

5-2. Active Request

 

이번에는 Http1::ServerConnectionImpl이 보유하고 있는 속성 중 두번째인 active_request_ 에 대해서 알아보겠습니다.

 

codec_impl.h

  struct ActiveRequest : public Event::DeferredDeletable {
    ActiveRequest(ServerConnectionImpl& connection, StreamInfo::BytesMeterSharedPtr&& bytes_meter)
        : response_encoder_(connection, std::move(bytes_meter),
                            connection.codec_settings_.stream_error_on_invalid_http_message_) {}
    ~ActiveRequest() override = default;

    void dumpState(std::ostream& os, int indent_level) const;
    HeaderString request_url_;
    RequestDecoder* request_decoder_{};
    ResponseEncoderImpl response_encoder_;
    bool remote_complete_{};
  };

 

ActiveRequest는 위와 같이 Parser에 의해서 데이터를 Parsing 하는 과정에서 RequestDecoder, url, ResponseEncoder 등을 가지고 있는 구조체입니다. 해당 구조체를 통해서 Parsing 단계에서 connection 객체에 대한 작업 요청 및 url, encoder 지정 및 수행등을 담당합니다.

 

해당 자료구조가 사용되는 흐름을 보려면 사용자 요청을 처리하는 전단계를 살펴봐야하는데요. 이번 포스팅은 HttpConnectionManager의 특징에 대해서 살펴보기 때문에 ActiveRequest의 존재에 대해서만 이번 포스팅에서는 언급하고 해당 자료구조의 쓰임새는 다음 포스팅에서 보다 자세히 다루어보겠습니다.

 


6. Stream

 

이번에는 HttpConnectionManager가 관리하는 Stream 목록에 대해서 살펴보겠습니다. Stream은 Http 요청을 전달하는 하나의 흐름으로써, 하나의 Connection을 맺은 상태로 Http 요청을 전달하기 위해 여러 Stream을 생성할 수 있습니다. 따라서 이러한 개별 Stream들의 그룹을 관리하기 위해서 HttpConnectionManager 내부에는 streams_라는 List를 보유하고 있습니다.

 

 

streams_ List는 ActiveStream을 포함하고 있는데, 해당 Stream 내부에는 Http 요청에 필요한 필수적인 항목들이 포함되어있습니다. 그렇다면 ActiveStream은 언제 생성될까요?

 

 

codec_impl.cc

Status ServerConnectionImpl::onMessageBeginBase() {
  if (!resetStreamCalled()) {
    ASSERT(active_request_ == nullptr);
    active_request_ = std::make_unique<ActiveRequest>(*this, std::move(bytes_meter_before_stream_));
    if (resetStreamCalled()) {
      return codecClientError("cannot create new streams after calling reset");
    }
    active_request_->request_decoder_ = &callbacks_.newStream(active_request_->response_encoder_);

    // Check for pipelined request flood as we prepare to accept a new request.
    // Parse errors that happen prior to onMessageBegin result in stream termination, it is not
    // possible to overflow output buffers with early parse errors.
    RETURN_IF_ERROR(doFloodProtectionChecks());
  }
  return okStatus();
}

 

과정을 간략하게 살펴보면, 이전에 Parser에 대해서 살펴봤을 때, Client가 데이터 처리를 요청하면 이를 분석해서 특정 Event마다 통지한다고 설명했습니다. 이때 onMessageBegin 이벤트가 발생하면 ServerConnection 에서는 ActiveRequest를 먼저 생성하고 ActiveRequest 하위에 request_decoder_ 속성에 새로운 Stream을 만듭니다. 이때 만들어지는 Stream이 ActiveStream입니다.

 

이후 생성된 ActiveStream은 위 과정에서 표현되지는 않았지만 Parsing 과정이 진행되면서 지속적으로 참조되고 Parsing이 완료되는 시점에서 Stream 내부에 매핑된 정보에 의하여 Http 전송이 이루어지게됩니다.

 

 

이번에는 ActiveStream의 속성에 대해서 살펴보겠습니다. stream_id는 Stream 마다 생성되는 id로써, ActiveStream이 생성하는 당시에 임의의 값으로 지정됩니다. 

 

filter_manager_ 는 이전에 Http filter에 대해서 잠깐 살펴볼 때 등장한 프로퍼티명으로써, ActiveStream 내에 Http filter 생성 및 처리를 위임하는데 관여하는 프로퍼티입니다. 사용자 요청을 처리하는 과정에서 Http filter 처리가 필요한 순간에 해당 인스턴스가 사용됩니다.

 

그 다음 살펴볼 것은 timer입니다. ActiveStream 내부에는 다양한 Timer가 존재하는데, 해당 timer의 역할은 timer가 지정된 시간내에 요구하는 조건이 충족되지 않으면, 해당 연결을 해제하는 역할을 수행합니다. 이때 각각의 Timer는 Dispatcher 로부터 Timer를 생성받아 지정된 시간내에 조건이 충족되면 Timer를 Reset 하여 다시 지정된 시간만큼을 대기하며, 시간이 초괴되면 연결을 해제합니다. 개별 Timer의 역할을 살펴보면 다음과 같습니다.

 

타이머 기능
stream_idle_timer_ 해당 Stream이 연결되고 어떠한 활동도 일어나지 않았을 때, 지정된 idle 시간을 초과하면 Stream 해제
request_timer_ Stream이 생성되고 난 이후 Request를 시작하고 응답이 올 때까지 대기시간을 의미하며, 지정된 시간 초과하면 Stream 해제
request_header_timer_ Downstream에서 header를 전송하고 나서 이에 대한 응답이 올 때까지의 대기시간을 의미하며, 지정된 시간 초과하면 Stream 해제
max_stream_duration_timer_ Stream이 생성되고 종료될 때까지 지속할 수 있는 시간을 의미하며, 지정된 시간을 초과하면 Stream 해제

 

상기 Timer와 관련해서는 envoy 공식 문서에서 확인할 수 있으며, 적정한 값 설정을 통해서 Stream이 생성되고 무한정 대기하지 않도록 처리할 수 있습니다.

 


7. Timer

 

HttpConnectionManager에서 마지막으로 살펴볼 것은 내부 프로퍼티에 존재하는 Timer입니다. 방금전에 개별 Stream 내부에서 여러 Timer 들을 살펴봤었는데, HttpConnectionManager 또한 Timer를 보유하고 있습니다.

 

 

HttpConnectionManager가 보유한 주요 Timer는 위와같이 2개입니다. 이는 HttpConnectionManager가 생성하는 단계에서 Dispatcher에게 요청하여 Timer를 생성합니다. 

 

connection_idle_timer_의 경우는 HttpConnectionManager가 생성된 이후 어떠한 Stream이 생성되지 않았을 때, idle 시간을 얼마나 부여할지를 측정하는 Timer입니다. 따라서 해당 시간 동안 Stream 연결이 이루어지지 않는다면, 연결을 해제합니다.

 

반면 connection_duration_timer_는 HttpConnectionManager가 생성되고 모든 처리가 완료될 때까지 즉 사용자의 연결 요청을 완수하는데 걸리는 데드라인을 측정하는 Timer입니다. 따라서 해당 시간 동안 모든 요청이 완료되지 않는다면, 연결을 해제합니다.

 

 


8. 마치며

 

이번 포스팅을 통해서 HttpConnectionManager가 보유하고 있는 속성에 대해서 알아봤습니다. 다만 이번 포스팅만으로는 HttpConnectionManager가 보유하고 있는 컴포넌트가 어떻게 상호작용하여 사용자의 요청을 처리하는지 완벽하게 이해하기는 힘듭니다. 이 부분은 이 다음 포스팅에서 진행되는 Envoy 프록시 연결 과정을 관찰하면서 조금 더 자세하게 살펴보겠습니다. 

1. 서론

 

앞선 두 개의 포스팅을 통해 envoy 고수준 아키텍처 구조 및 쓰레딩 모델에 대해서 학습하였습니다. 이번 포스팅 부터는 Envoy의 가장 핵심이되는 주요 컴포넌트에 대해서 각 컴포넌트가 세부적으로 어떻게 구성되어있는지 그리고 컴포넌트간에 상호 작용이 어떻게 이루어지는지에 대해 자세하게 살펴보겠습니다.

 

참고로 이번 포스팅의 내용은 TCP 기반 설정이 구성되었다는 가정하에 작성하였습니다.


 

2. Envoy 구조

 

 

이 시리즈의 첫 번째 포스팅에서 Listener, Endpoint, Cluster 개념에 대해서 간략하게 살펴봤습니다. 그 중 Endpoint의 경우 Cluster와 종속적인 관계를 지니므로 Cluster에서 Endpoint를 관리합니다. 따라서 envoy의 핵심 컴포넌트를 꼽자면 Listener와 Cluster라고 봐도 무방합니다. 그렇다면 Listener와 Cluster는 누가 관리하고 어떻게 생성될까요? 위 그림을 보면 Listener Manager와 Cluster Manager가 해당 컴포넌트를 관리하는 것을 짐작할 수 있습니다.

 

 

 

실제로 envoy server 기동 코드를 살펴보면 config.yaml 파일을 읽어서 envoy 기동에 필요한 사용자 정의 설정을 파싱하는 작업을 선행합니다. 그 이후 주요 컴포넌트 기동에 필요한 항목등을 생성합니다.

 

이때 위 그림을보면 Listener Component Factory와 Cluster Manager Factory를 생성하는 것을 볼 수 있습니다. 여기서 Factory라는 말에서 의미가 바로 전달되듯이 해당 Factory 들은 Listener Manager와 Cluster Manager를 생성하는데 있어 중요 역할을 수행합니다.

 

지금부터는 Listener Component Factory로부터 생성되는 Listener Manager의 세부 구조와 Cluster Manager Factory로부터 생성되는 Cluster Manager에 대해서 상세하게 알아보고자 합니다. 다만 이번 포스팅에서는 Cluster Manager에 대해서 중점적으로 알아보고 Listener는 다음 포스팅에서 다루겠습니다.

 


 

3. Subscription Factory 관리

 

Cluster Manager는 Cluster Manager Factory로 부터 생성되는 컴포넌트로써, 이름을 통해 유추할 수 있듯이 Cluster를 관리하는 역할을 수행합니다. 따라서 이에 대한 이해를 위해서는 Cluster 관리가 어떻게 이루어지는지 먼저 학습하는 것이 좋습니다. 하지만 그 전에 Cluster Manager의 주요 역할 중 하나인 Subscription Factory 관리와 gRPC Multiplexer에 대해서 먼저 다루어보고자 합니다.

 

이를 먼저 다루는 이유는 Cluster Manager에서 관리하는 Cluster를 등록하는 방식에는 Static 방식과 Dynamic 방식이 존재하는데, Dynamic 방식의 Cluster Update 과정에 대해 이해하려면 Subscription과 gRPC Multiplexer에 대한 이해가 선행되어야하기 때문입니다.

 

방금 전 언급했듯이 Cluster Manager의 역할 중 하나는 Subscription Factory를 관리하는 것이라고 설명했습니다. 그렇다면, Subscription은 무엇이고 Subscription Factory는 어떤 역할을 수행할까요? 이 부분에 대해서 살펴보겠습니다.

 

이전에 Envoy는 xDS API를 통한 Resource(Listener, Endpoint 등)를 관리한다고 언급한바 있습니다.

 

 

 

참고로 xDS API에는 gRPC, Rest API, 파일 동기화 방법이 있지만, 여기서는 gRPC 방식을 사용한다고 가정하겠습니다.

 

이때 gRPC 통신을 위해서 내부적으로 Multiplexer를 사용하는데, 자원 동기화 요청이 Envoy로 전달되면, 이를 수신받아 해당 Resource를 처리하는 모듈(CDS, LDS, EDS 등)에게 전달 해줘야합니다. 그리고 처리하는 모듈 쪽에서는 사전에 Callback을 등록하여 응답이 전달되면 해당 Callback을 실행할 수 있도록 합니다.

 

 

이 과정에서 중간 매개체 역할을 하는 것이 Subscription입니다. 즉 Resource를 처리하는 모듈에서 Config 대상 오브젝트 타입과 Callback을 Subscription 객체로 생성하여 등록합니다. 그리고 해당 Subscription 생성은 Subscription Factory가 수행하고, 추후 gRPC Multiplexer에 등록합니다. 이후 gRPC Multiplexer로 부터 Resource 동기화 요청이 수신되면, 해당 요청에 상응하는 Subscription의 Callback이 호출되어 데이터 동기화 처리를 수행할 수 있게됩니다.

 

예를 통해서 살펴보겠습니다. 가령 Listener Manager에서 LDS를 등록한다고 가정하겠습니다. 이 경우 먼저 Cluster Manager의 Subscription Factory로부터 Subscription 생성을 요청합니다. 이때 Subscription Factory는 내부 gRPC Multiplexer에 해당 정보를 등록합니다. 이후 LDS 요청 타입의 응답이 전달되었을 경우 Callback을 통해 Listener Manager에게 응답을 전달합니다.

 

이때 Cluster Manager는 Subscription Factory를 내부 프로퍼티로 가지고 있고, 외부의 다른 컴포넌트로부터 Subscription 생성, 삭제 등의 처리를 위임받아 처리하는 역할을 수행합니다.

 

 

이를 조금 더 자세히 표현하면, 개별 xDS API들은 SubscriptionBase 클래스를 상속받고 있으며, 해당 클래스에는 각각 Config Update에 대하여 수행해야할 메소드들이 정의되어있습니다. 따라서 각각의 API는 구현이 요구되는 메소드들에 대해서 모듈 특성에 따라 처리방법이 기술되어있습니다. 이후 xDS API를 사용하기 위해서 Cluster Manager에 존재하는 Subscription Factory로부터 Subscription을 요청하는데, 이때 Subscription Factory 내부에서는 Multiplexer에 해당 Callback 정보를 등록하고 생성된 Subscription을 반환합니다.

 


 

4. gRPC Multiplexer

 

지금까지 xDS API 사용을 위해 Subscription Factory로부터 Subscription을 생성받는 과정에 대해서 살펴봤습니다. 이번에는 subscription 생성 이후 상호작용 수행하는 gRPC Multiplexer의 구성요소 및 동작 원리에 대해서 살펴보겠습니다. 

 

 

 

gRPC Multiplexer에는 SotW를 위한 GrpcMuxSotw, Delta XDS를 위한 GrpcMuxDelta 등 여러가지 GrpcMuxImpl을 위한 구현체가 존재합니다. 그리고 해당 구현체는 공통적으로 위 3가지 컴포넌트가 내부에 존재합니다. 해당 컴포넌트가 무엇이고 어떠한 역할을 하는지 살펴보겠습니다.

 

1. watchMap은 외부에서 생성한 Subscription을 기반으로 gRPC 통신 수행결과를 전달받아 Callback 함수를 실행시키기 위한 자료구조로써 Watch 객체를 생성하여 저장합니다. Watch 객체에는 사용자가 지정한 Callback이 매핑되어있습니다.

 

2. gRPC 통신을 위해서는 SubscriptionState가 필요합니다. 해당 객체는 SubscriptionStateFactory로 부터 생성이 가능하며, SubscriptionState에는 WatchMap에 존재하는 Watch 객체를 매핑하여, 향후 gRPC 응답이 반환되었을 때 이를 전달할 수 있도록 기능이 구현되어있습니다.

 

3. grpcStream은 gRPC 통신을 수행하는 주체로써 외부 Management Server와 통신을 담당합니다.

 

gRPC Multiplexer는 여러개가 존재할 수 있는데, 개별 Multiplexer들을 관리하기 위해서 외부에 AllMuxes가 존재합니다. AllMuxes는 생성되는 여러개의 GrpcMuxImpl의 정보를 관리하기 위한 자료구조로써 hash set 형태로 생성되는 모든 GrpcMuxImpl을 관리합니다.

 

지금까지 gRPC Multiplexer와 관련하여 컴포넌트 정보를 확인했습니다. 아직은 gRPC를 통한 동기화 과정을 살펴보지 않았기 때문에 위 컴포넌트가 어떻게 상호작용하며 어떤 역할을 수행하는지 와닿지 않을 수 있습니다. 따라서 실제 사용자가 Subscription을 요청하여 데이터가 처리되는 과정을 살펴보면서 각각의 컴포넌트가 어떻게 연계되는지를 살펴보겠습니다.

 

 

 

 

1. 이전에 설명했듯이 xDS API를 사용하려는 CDS, EDS, LDS 등은 Cluster Manager가 가지고 있는 Subscription Factory로부터 Subscription을 전달받아야합니다. 따라서 먼저 Cluster Manager로부터 Subscription Factory 정보를 얻어옵니다.

 

2. Subscription Factory는 CDS, EDS, LDS 등에 매핑되어있는 api_type(SotW, Delta 등)을 분석하여 적절한 GrpcMuxImpl 구현체를 반환하도록 요청합니다. 

 

3. api_type에 따른 GrpcMuxImpl 구현체를 생성한 이후 Subscription에 해당정보를 매핑하여 반환합니다.

 

4. Subscription을 전달받으면, 이후 통신 수행을 위하 GrpcMuxImpl에게 통신 수행을 요청합니다.

 

5. GrpcMuxImpl은 WatchMap에 Subscription에서 요구하는 type_url 정보에 해당하는 Watch 정보가 존재하는지 확인합니다. 만약 존재하지 않는다면 새로운 Watch를 생성합니다. 이때 Watch에는 Subscription에 존재하는 Callback을 저장합니다.

 

6. WatchMap에 type_url에 해당하는 Watch 정보가 없을 경우 SubscriptionStateFactory로부터 새로운 SubscriptionState를 생성합니다. 이때 생성되는 SubscriptionState에는 5번 단계에서 생성한 Watch 정보를 포함시켜, 향후 gRPC 응답이 왔을 때 해당 Watch에게 응답 수신 행위를 수행할 수 있도록 설정합니다. SubscriptionState 생성이 완료되면, 해당 정보는 GrpcMuxImpl 내부에 존재하는 subscriptions_ 자료구조에 저장됩니다.

 

7. SubscriptionState를 통해 gRPC 통신을 수행을 요청합니다.

 

8. gRPCStream은 xDS의 Management Server(ex: istio)와 연결을 수행하여 DiscoveryRequest를 요청합니다. 이후 Management Server로부터 Discovery Response를 전달하면 이를 수신받습니다.

 

9. gRPCStream은 응답 메시지의 type_url을 기준으로  subscriptions_ 자료구조에서 해당 type_url에 해당하는 Subscription을 찾아 응답 메시지를 전달합니다.

 

10. SubscriptionState는 6번 단계에서 매핑한 Watch를 통해 응답 처리를 위임합니다.

 

11. Watch는 생성당시 저장된 요청자의 Callback을 수행하여 처리를 위임합니다.

 

 

 

지금까지 gRPC Multiplexer에 대해서 살펴봤습니다. 이전 시나리오에서는 1개의 Subscription이 등록되고 gRPC 통신이 수행되는 과정에 대해서 살펴봤습니다. 실제로는 CDS, EDS, LDS 등 여러개의 Subscription이 존재하기 때문에 Subscription Factory에서 생성되는 Subscription은 다수가 될 수 있으며, 그에 따라 gRPC Mux 내부에는 Watch와 SubscriptionState 수가 다수가 될 수 있습니다. 마찬가지로 gRPC를 통해 수행되는 Mux 타입이 다수라면 해당 타입 또한 여러개가 될 수 있습니다.

 


 

5. Cluster 관리

 

지금까지 Cluster Manager의 기능 중 하나인 gRPC Multiplexer와 Subscription 관리에 대해서 살펴봤습니다. 이번에는 Cluster Manager에서 가장 핵심이 되는 Cluster 관리에 대해서 살펴보겠습니다.

 

Cluster Manager는 이름 그대로 Cluster를 관리하는 컴포넌트이기 때문에 가장 중요한 역할은 Cluster의 생성과 삭제 수정등을 수행하는 것 입니다.

 

 

 

Cluster Manager는 기동 당시에 Cluster Manager Factory에 의해서 생성됨을 이전에 설명했습니다. 이때 생성 과정에서 주요하게 살펴볼 부분은 Cluster Manager 생성과 동시에 CDS 설정이 존재한다면, CDS를 생성하고 config.yaml에 존재하는 Static Resource는 Parsing 정보를 토대로 읽어들어 ClusterData를 생성하는 것입니다. 또한 이후 CDS를 통해 ClusterData 정보가 업데이트되면, 해당 정보를 Cluster Manager가 전달받아 업데이트를 수행하는 것 또한 Cluster Manager의 역할입니다.

 

static_resources:
  clusters:
  - name: local_service
    connect_timeout: 0.25s
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    load_assignment:
      cluster_name: local_service
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address: { address: webservice, port_value: 90 }         
    health_checks:
      timeout: 3s
      interval: 90s
      unhealthy_threshold: 5
      healthy_threshold: 5
      no_traffic_interval: 240s      
      http_health_check:
        path: "/ping"
        event_log_path: /var/log/envoy/health_check.log
        expected_statuses:
          start: 200
          end: 201
    outlier_detection:
      consecutive_5xx: 2
      base_ejection_time: 30s
      max_ejection_percent: 40
      interval: 20s
      success_rate_minimum_hosts: 5
      success_rate_request_volume: 10

 

해당 과정에 대해서 조금 더 알아보기 위해 예를 통해서 살펴보겠습니다. 가령 위와 같이 static cluster 정보가 존재한다고 가정해봅시다. 여기서 Cluster 정보를 살펴보면, Load Balancing, Health Check, Outlier Detection 정보가 포함되어있는 것을 확인할 수 있습니다. 따라서 이를 통해 유추하자면 Envoy 내부에서 Cluster를 관리할 때 그 안에는 Load Balancing, Health Check  및 Outlier Detection을 수행하는 컴포넌트가 존재할 것이라는 것을 알 수 있습니다.

 

이번에는 Cluster Manager에서 Cluster를 만드는 과정에 대해서 살펴보겠습니다.

 

 

 

먼저 살펴볼 것은 static resource에 존재하는 cluster 정보를 Cluster Manager에서 관리하는 방법입니다. 해당 과정은 Envoy 기동 과정에서 수행되는 작업으로 생성 과정을 간략하게 살펴보면 다음과 같습니다. 먼저 envoy 기동시에 config 파일을 파싱합니다. 이후 Cluster Manager의 Cluster 저장을 위해 ClusterData 구조로 만들어서 이를 active_clusters_ 라고 불리는 map에 Cluster 정보를 저장합니다. 그렇다면 active_clusters_는 어떻게 구성되어있을까요?

 

 

해당 자료구조는 위 그림과 같이 Cluster 이름과 Cluster 정보로 이루어진 Map입니다. Cluster가 생성된 이후에 active_clusters_에 ClusterData를 추가하며, Cluster 삭제 이벤트가 발생하면 이를 삭제하는 등 해당 자료구조를 통하여 Cluster Manager가 관리하고 있는 Cluster의 종류와 갯수 현행화를 수행합니다.

 

이때 Map의 Value로 저장되는 것은 ClusterDataPtr으로써 파싱된 결과를 ClusterData 데이터 구조로 만들고 이에 대한 포인터 값을 저장하는데, ClusterData가 보유한 내부 속성 중 중요한 속성 설명 및 생성 과정에 대해 조금 더 자세히 알아보겠습니다.

 

 

 

 

 

먼저 Parsing된 Cluster 정보를 토대로 ClusterData 인스턴스 생성을 위해 ProdClusterManagerFactory에게 처리를 위임합니다. 해당 Factory 내부에서는 다시 ClusterFactoryImplBase에게 Cluster 생성 주체를 위임합니다.

 

 

cluster_factory_impl.cc

std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>
ClusterFactoryImplBase::create(Server::Configuration::ServerFactoryContext& server_context,
                               const envoy::config::cluster::v3::Cluster& cluster,
                               ClusterFactoryContext& context) {
  auto stats_scope = generateStatsScope(cluster, context.stats());
  std::unique_ptr<Server::Configuration::TransportSocketFactoryContextImpl>
      transport_factory_context =
          std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
              server_context, context.sslContextManager(), *stats_scope, context.clusterManager(),
              context.stats(), context.messageValidationVisitor());

  std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr> new_cluster_pair =
      createClusterImpl(server_context, cluster, context, *transport_factory_context,
                        std::move(stats_scope));

  if (!cluster.health_checks().empty()) {
    if (cluster.health_checks().size() != 1) {
      throw EnvoyException("Multiple health checks not supported");
    } else {
      new_cluster_pair.first->setHealthChecker(HealthCheckerFactory::create(
          cluster.health_checks()[0], *new_cluster_pair.first, context.runtime(),
          context.mainThreadDispatcher(), context.logManager(), context.messageValidationVisitor(),
          context.api()));
    }
  }

  new_cluster_pair.first->setOutlierDetector(Outlier::DetectorImplFactory::createForCluster(
      *new_cluster_pair.first, cluster, context.mainThreadDispatcher(), context.runtime(),
      context.outlierEventLogger(), context.api().randomGenerator()));

  new_cluster_pair.first->setTransportFactoryContext(std::move(transport_factory_context));
  return new_cluster_pair;
}

 

ClusterFactoryImplBase의 처리 과정을 살펴보기 위해 코드레벨로 확인해보겠습니다. ProdClusterManagerFactory로 부터 Cluster 생성 처리를 위임받은 ClusterFactoryImpl의 create 메소드내용을 살펴보면, Socket 관리, Cluster, HeathChecker, OutlierDetector를 생성하기 위해 각각의 Factory에게 처리를 위임하고 생성 결과를 전달받아 Cluster에 할당하는 것을 볼 수 있습니다. 참고로 여기서 생성되는 HealthCheker 및 OutlierDetector에 대한 자세한 설명은 조금 뒤에 다시 살펴보겠습니다.

 

위 코드에서 실제 Cluster 생성은 createClusterImpl 메소드내에서 이루어지며, 해당 메소드 내에서는 Proto 파일 정의에 따른 인스턴스 생성과 실제 Parsing 내용을 결합한 Cluster 인스턴스를 전달받습니다.

 

 

 

 

 

Cluster 생성이 완료된 이후에는 Cluster 인스턴스와 내부에 Parsing된 정보를 토대로 생성된 HealthChecker, OutlierDetector, LoadBalancer가 존재할 것입니다. 이후에는 다음과 과정을 추가로 거칩니다.

 

 

1. healthChecker가 등록되어있다면, hostCheckCompleteCallback 함수를 등록시킵니다. 해당 함수는 host가 Health Check가 실패했을 때, 해당 정보를 Cluster Manager에게 전달하고, 결과적으로는 해당 Cluster가 Connection Pool에서 해제하도록 후속 작업을 수행하기 위한 용도로 사용됩니다.

 

2. outlierDector가 등록되어있다면, changeStateCallback 함수를 등록시킵니다. 해당 함수는 만약 host가 Outlier로 판정되고, ejection 상황이라면, 해당 정보를 Cluster Manager에게 전달하고, 결과적으로는 해당 Cluster가 Connection Pool에서 해제하도록 후속 작업을 수행하기 위한 용도로 사용됩니다.

 

3. Callback이 모두 등록이 완료되면, Cluster에 설정된 Load Balancer를 생성하기 위한 메타데이터를 등록합니다. 이때 등록되는 Load Balancer 타입은 lb_policy에 명시된 종류에 따라 다르게 생성되며, Round Robin, Least Request, Hash Ring과 같은 타입들을 지원합니다.

 

4. 생성된 Cluster 정보를 Cluster Manager가 관리하는 active_clusters_ 에 추가합니다.

 

위 네가지 과정을 거치게되면 ClusterData 설정은 마무리됩니다. 지금까지 static resource로 등록된 Cluster 처리 과정에 대해서 살펴봤습니다. 이번에는 CDS를 통해 dynamic으로 등록되는 resource 처리 과정에 대해서 살펴보겠습니다.

 

 

 

 

Dynamic 설정의 경우에는 CDS를 통해 처리됨을 설명했습니다. 따라서 CDS 처리를 위해서는 Cluster Manager에서 관리하는 gRPC Multiplexer(ads_mux)와 Subscription Factory와의 상호작용이 필요합니다. 이를 토대로 CDS를 통한 동기화 과정을 살펴보면 다음과 같습니다.

 

1. CDS 등록을 위해 가장 먼저 Cluster Manager에 존재하는 Subscription Factory로부터 Subscription을 요청합니다.

 

2. Subscription Factory는 Subscription을 반환하기 CDS가 전달받기 희망하는 Resource 타입에 대한 데이터 요청 및 Callback 처리를 위해 Multiplexer에 이를 등록합니다.

 

3. CDS가 요청하는 Resource 타입 등록 및 해당 Subscription을 CDS에 반환합니다.

 

4. 외부에서 Cluster 변화(생성/삭제/수정)가 감지되어 Envoy에 통지되면 해당 내용이 Multiplexer를 지나 Callback을 통해 CDS로 전달될 것입니다. 이때 CDS 내부에서는 이를 처리하기 위해 CdsApiHelper에게 처리를 위임합니다.

 

5. CdsApiHelper에서는 해당 내용을 정제합니다. Cluster Manager에 위치한 active_clusters_에 해당 내용을 반영합니다.

만약 기존 Cluster의 업데이트라면 해당 내용을 수정하고, 신규 추가라면 Cluster 정보를 active_clusters_에 추가할 것입니다.

 

 

 

5-1. Cluster Entry

 

 

지금까지 Static 방식과 Dynamic 방식을 통해서 Cluster Manager가 Cluster를 관리하기 위해 사용되는 active_clusters_에 Cluster 정보를 현행화하는 방식에 대해서 살펴봤습니다. 하지만 여기서 active_clusters_에서 값으로 관리되는 ClusterData는 Cluster의 역할 중 핵심 기능을 포함하고 있지 않습니다. 

 

그렇다면 Cluster가 제공하는 핵심 기능은 무엇일까요?

 

 

Cluster 의 가장 중요한 기능 중 하나는 Client가 Cluster 내에 존재하는 여러 B/E host와의 연결을 시도할 때, 중간에서 Client의 연결과 B/E의 연결을 이어주는 Connection Pool 인터페이스 기능을 제공한다는 점입니다.

 

해당 기능은  active_clusters_가 관리하는 ClusterData에서 이를 제공하지 않습니다. 따라서 Cluster Manager에서는 active_clusters_ 를 Cluster 변경이 생겼을 때(CDS) 관리하는 메타데이터 용도로써 내부적으로 관리하고 외부와의 Cluster 연결 등에는 별도 자료구조(thread_local_clusters_)와 ClusterData를 확장한 ClusterEntry를 통해서 Connection Pool 관리 기능과 Load Balancer 기능을 제공합니다.

 

따라서 이번에는 Static, Dynamic 방식을 통해서 active_clusters_에 등록된 이후 진행되는 후속 과정과 이를 통해서 생성되는 ClusterEntry 및 ClusterEntry를 관리하는 thread_local_clusters_ 에 대해서 살펴보겠습니다.

 

 

 

 

Cluster Manager를 구동하는 과정에서 수행하는 작업 중 하나는 Main 쓰레드에 존재하는 Dispatcher를 통해 TLS를 할당받는 것입니다. 이때 생성하는 Slot은 ThreadLocalClustManagerImpl 로써, 외부에 존재하는 모듈이나 내부에서 Cluster 데이터 동기화를 수행할 때 해당 Slot에 존재하는 인스턴스를 활용합니다.

 

그리고 Cluster Manager 내부에는 thread_local_clusters_라고 불리는 Map이 존재하는데, 해당 Map은 Key로써는 Cluster의 이름을 Value는 ClusterData를 기반으로 만들어진 ClusterEntry를 가지고 있습니다. 즉 이전에 설명했듯이 Connection Pool 기능과 실질적인 LoadBalancer 인스턴스를 지니고 있는 ClusterEntry를 해당 자료구조가 가지고 있으며, 외부 모듈에서는 thread_local_clusters_ 접근을 통해 Cluster Manager가 보유하고 있는 Cluster 정보에 대해 참조가 가능합니다.

 

그렇다면 Static, Dynamic 등록을 통해서 active_clusters_에 생성된 Cluster 정보를 기반으로 어떤 시점에 thread_local_clusters_를 만들어낼까요? ClusterEntry 생성과 thread_local_clusters_ 삽입 과정을 통해서 이해해보겠습니다. 먼저 Static Cluster 등록부터 살펴보겠습니다.

 

 

Static Resource가 모두 등록되면 해당 ClusterData는 모두 active_clusters_에 저장되어있을 것입니다. 이후 Cluster Manager 설정이 끝나게되면, active_clusters_를 순회하면서 초기화 작업을 진행합니다.

 

이때 수행하는 작업은 크게 4단계로 이루어져있습니다.

 

1. 개별 ClusterData를 순회하면서 Cluster에 멤버가 추가되거나 우선순위가 변경되었을 때 후속 작업을 수행하기 위한 Callback을 등록합니다.

 

2. 등록되어있는 Callback의 역할은 Cluster Manager가 여러개일 경우 개별 Cluster Manager에서 보유중인 local cluster를 업데이트 하는 것입니다. 따라서 Cluster Manager 동기화를 위해 TLS Slot에 저장된 ThreadLocalClusterManagerImpl을 참조하여 후속 작업 처리를 요청합니다.

 

3. Callback 등록이 완료되면, thread_local_clusters_에 Cluster를 생성하기 위해 TLS Slot에 저장된 ThreadLocalClusterManagerImpl를 참조하여 모든 Cluster Manager에게 Cluster 생성에 대한 내용을 통지하고  이를 전달받은 Cluster Manager에서는 ClusterEntry를 생성하여 자신의 thread_local_clusters_에 저장합니다.

 

 

 

ClusterEntry는 ClusterData에 저장된 info 정보를 포함하고 있으며, 내부에는 클러스터 관리를 위한 여러 속성이 존재합니다. 그 중 몇개만 소개하자면, 먼저 Connection Pool에 접근할 수 있는 인터페이스 기능 제공을 포함합니다. 그리고 등록된 B/E의 host 및 우선순위를 관리하기 위한 PrioritySetIml이 있습니다. LoadBalancer는 해당 Cluster에 대해 외부에서 사용자 접속 요청이 들어왔을 때 실질적으로 LoadBalancer를 수행하기 위한 인스턴스가 매핑되며, AsyncClient를 통해서 Upstream과 연결을 수행합니다.

 

4. 만약 Cluster의 변경에 대하여 외부에서 통지를 받기위해 Callback을 등록했었다면, Cluster 변경에 대한 통지를 외부에게 알립니다.

 

위와 같은 과정을 거쳐 Static Resource로 등록한 자원들이 active_clusters_에 최초 저장이되고 이를 토대로 다시 전체 Cluster Manager에서 Cluster Entry를 만들고 이를 자신의 thread_local_clusters_에 저장함으로써 Cluster 관리가 이루어집니다. 또한 이후에는 외부에서 Cluster 접근을 요청할 때 thread_local_clusters_를 통해서 해당 Cluster 정보 참조가 가능해집니다.

 

 

 

 

Dynamic Resource 등록의 경우 이전에 active_clusters_ 에 추가함을 설명했는데, 이 과정에서 Cluster 초기화를 후속 진행합니다. 이때 ClusterEntry가 생성되고 해당 데이터가 thread_local_clusters_에 추가되면서 Cluster Manager에서 Cluster 동기화가 이루어집니다.

 

 

5-2. Connection Pool 관리

 

이번에는 Cluster Entry 내부에 있는 ConnectionPool 인스턴스를 통해서 Cluster Manager가 어떻게 Connection Pool을 관리하는지에 대해서 살펴보고자 합니다. 여기서 Connection Pool 관리는 Cluster Manager가 하며, 외부에서는 Cluster Entry를 통해 Cluster Manager에서 관리하는 Connection Pool 획득 및 해제등의 요청을 수행할 수 있습니다. 따라서 이 부분에 대해서 자세히 알아보도록 하겠습니다.

 

 

Cluster Manager 내부에는 Connection Pool을 관리하기 위한 map이 여러개 존재합니다. 그 중 host_http_conn_pool_map은 http 요청 관련 Connection Pool을 관리하며, 그 밖에 TCP 기반으로 Connection이 이루어지는 요청의 경우는 host_tcp_conn_pool_map을 통해 Connection Pool을 관리합니다. 본 포스팅에서는 Http 연결 요청에 대한 처리만을 다루고 있으므로 Cluster Manager에서 관리하는 host_http_conn_pool_map에 대해서만 설명하겠습니다.

 

host_http_conn_pool_map은 이름을 통해 알 수 있듯이 Map 자료구조입니다. 이는 HostPtr을 Key로 하며, Value는 Connection을 관리하기 위한 Container로 구성되어있습니다. 그리고 해당 Container 안에는 Connection Pool Map이 별도로 존재합니다.

 

따라서, 외부에서 Connection 연결을 시도하기 위해서는 먼저 host를 기반으로 host_http_conn_pool_map으로 부터 Container를 획득해야하며, 그 안에 존재하는 Connection Map을 통해 Connection 할당 및 해제를 수행할 수 있습니다.

 

그렇다면 Connection 할당 및 해제는 언제 어떻게 이루어질까요?

 

 

Envoy 관련 첫 포스팅 때, Envoy의 내부 흐름은 위와 같이 진행된다고 설명한 적이 있습니다. 여기서 Listener를 통해 Cluster에 접근하고 Load Balancing을 수행하는 과정에서 Cluster Manager를 통해 Endpoint 즉 host가 지정됩니다. 그리고 이 과정에서 Connection Pool로부터 Connection을 할당받아 요청을 전달할 수 있습니다. 정리하자면 Connection Pool로부터 Connection을 할당 받는 시점은 Load Balancing이 완료된 이후입니다.

 

위 그림으로 표시된 영역을 조금 더 자세히 확대하여 구체적으로 내부 컴포넌트가 어떻게 연계되는지 살펴보겠습니다.

 

 

1. Listener를 거쳐 Router에 도달하게되면, Client의 요청이 어떤 Cluster에게 전달되어야하는지 이미 알고 있습니다. 따라서 Router에서는 Cluster Manager에게 자신이 접근하고자 하는 Cluster 정보를 요청합니다.

 

2. Cluster Manager는 자신이 보유하고 있는 thread_local_clusters_ 에서 Router가 요청한 Cluster 정보를 찾습니다.

 

cluster_manager_impl.cc

ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view cluster) {
  ThreadLocalClusterManagerImpl& cluster_manager = *tls_;

  auto entry = cluster_manager.thread_local_clusters_.find(cluster);
  if (entry != cluster_manager.thread_local_clusters_.end()) {
    return entry->second.get();
  } else {
    return nullptr;
  }
}

 

 

참고로 이때 thread_local_clusters_에 저장된 값은 ClusterEntry이지만, ClusterEntry는 ThreadLocalCluster를 상속받기 때문에 리턴 타입은 ThreadLocalCluster로 하여 ThreadLocalCluster에서 제공되는 메소드만 호출 가능합니다.

 

3. Cluster Manager로 부터 ThreadLocalCluster를 찾아서 Router로 반환합니다.

 

4. Router에서는 Cluster내에 존재하는 host를 통해 Connection 연결을 수행해야되기 때문에, 전달받은 ThreadLocalCluster를 통해 Connection Pool 할당을 요청합니다.

 

5. 이를 전달받은 ThreadLocalCluster(ClusterEntry)는 Cluster Manager에게 요청하여 host_http_conn_pool_map 에 할당 받은 Connection Pool이 존재하는지를 확인합니다. 만약 존재한다면 해당 Connection Pool Map에 있는 Container에 접근합니다. 반면 존재하지 않는다면 새로운 Container를 생성하여 해당 Map에 추가합니다.

 

7. Container 내부에는 Cluster 별로 Connection을 관리하는 Pool을 가지고 있습니다. 그리고 여기에서 Pool 존재여부를 최종적으로 확인하고, 해당 Pool을 반환합니다. 해당 과정을 코드로 살펴보면 다음과 같습니다.

 

conn_pool_map_impl.cc

template <typename KEY_TYPE, typename POOL_TYPE>
typename ConnPoolMap<KEY_TYPE, POOL_TYPE>::PoolOptRef
ConnPoolMap<KEY_TYPE, POOL_TYPE>::getPool(const KEY_TYPE& key, const PoolFactory& factory) {
  Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);

  auto pool_iter = active_pools_.find(key);
  if (pool_iter != active_pools_.end()) {
    return std::ref(*(pool_iter->second));
  }
  ResourceLimit& connPoolResource = host_->cluster().resourceManager(priority_).connectionPools();
  // We need a new pool. Check if we have room.
  if (!connPoolResource.canCreate()) {
    // We're full. Try to free up a pool. If we can't, bail out.
    if (!freeOnePool()) {
      host_->cluster().stats().upstream_cx_pool_overflow_.inc();
      return absl::nullopt;
    }

    ...(중략)...
  }

  auto new_pool = factory();
  connPoolResource.inc();
  for (const auto& cb : cached_callbacks_) {
    new_pool->addIdleCallback(cb);
  }

  auto inserted = active_pools_.emplace(key, std::move(new_pool));
  return std::ref(*inserted.first->second);
}

 

active_pools_가 Container가 보유하고 있는 Pool을 의미합니다. 해당 과정을 살펴보면, 먼저 Container 내부에서 관리하는 active_pools_에서 해당 pool이 존재하는지를 찾습니다. 참고로 여기서 key 값은 protocol을 uint8_t로 캐스팅한 값입니다.

 

그리고 만약 값이 존재하지 않았을 때에는, 해당 Cluster가 Connection Pool을 생성이 가능한지 Resource Limit 설정 값을 살펴봅니다. 그리고 생성이 불가할 경우에는 위 코드와 같이 nullopt를 반환하는 것을 볼 수 있습니다.

 

8. 만약 Resource Limit 설정 값을 확인했을 경우 신규 Pool 생성이 가능한 경우에는 기존에 입력받은 factory 메소드로부터 신규로 Pool을 할당받고자 호출합니다.

 

cluster_manager_impl.cc

Http::ConnectionPool::InstancePtr ProdClusterManagerFactory::allocateConnPool(
    Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
    std::vector<Http::Protocol>& protocols,
    const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>&
        alternate_protocol_options,
    const Network::ConnectionSocket::OptionsSharedPtr& options,
    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
    TimeSource& source, ClusterConnectivityState& state, Http::PersistentQuicInfoPtr& quic_info) {

  Http::HttpServerPropertiesCacheSharedPtr alternate_protocols_cache;
  if (alternate_protocol_options.has_value()) {
    // If there is configuration for an alternate protocols cache, always create one.
    alternate_protocols_cache = alternate_protocols_cache_manager_->getCache(
        alternate_protocol_options.value(), dispatcher);
  } else if (!alternate_protocol_options.has_value() &&
             (protocols.size() == 2 ||
              (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2)) &&
             Runtime::runtimeFeatureEnabled(
                 "envoy.reloadable_features.allow_concurrency_for_alpn_pool")) {
    // If there is no configuration for an alternate protocols cache, still
    // create one if there's an HTTP/2 upstream (either explicitly, or for mixed
    // HTTP/1.1 and HTTP/2 pools) to track the max concurrent streams across
    // connections.
    envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
    default_options.set_name(host->cluster().name());
    alternate_protocols_cache =
        alternate_protocols_cache_manager_->getCache(default_options, dispatcher);
  }

  absl::optional<Http::HttpServerPropertiesCache::Origin> origin =
      getOrigin(transport_socket_options, host);
  if (protocols.size() == 3 &&
      context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
    ASSERT(contains(protocols,
                    {Http::Protocol::Http11, Http::Protocol::Http2, Http::Protocol::Http3}));
    ASSERT(alternate_protocol_options.has_value());
    ASSERT(alternate_protocols_cache);
#ifdef ENVOY_ENABLE_QUIC
    Envoy::Http::ConnectivityGrid::ConnectivityOptions coptions{protocols};
    if (quic_info == nullptr) {
      quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster());
    }
    return std::make_unique<Http::ConnectivityGrid>(
        dispatcher, context_.api().randomGenerator(), host, priority, options,
        transport_socket_options, state, source, alternate_protocols_cache, coptions,
        quic_stat_names_, stats_, *quic_info);
#else
    (void)quic_info;
    // Should be blocked by configuration checking at an earlier point.
    PANIC("unexpected");
#endif
  }
  if (protocols.size() >= 2) {
    if (Runtime::runtimeFeatureEnabled(
            "envoy.reloadable_features.allow_concurrency_for_alpn_pool") &&
        origin.has_value()) {
      ENVOY_BUG(origin.has_value(), "Unable to determine origin for host ");
      envoy::config::core::v3::AlternateProtocolsCacheOptions default_options;
      default_options.set_name(host->cluster().name());
      alternate_protocols_cache =
          alternate_protocols_cache_manager_->getCache(default_options, dispatcher);
    }

    ASSERT(contains(protocols, {Http::Protocol::Http11, Http::Protocol::Http2}));
    return std::make_unique<Http::HttpConnPoolImplMixed>(
        dispatcher, context_.api().randomGenerator(), host, priority, options,
        transport_socket_options, state, origin, alternate_protocols_cache);
  }
  if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http2 &&
      context_.runtime().snapshot().featureEnabled("upstream.use_http2", 100)) {
    return Http::Http2::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
                                         priority, options, transport_socket_options, state, origin,
                                         alternate_protocols_cache);
  }
  if (protocols.size() == 1 && protocols[0] == Http::Protocol::Http3 &&
      context_.runtime().snapshot().featureEnabled("upstream.use_http3", 100)) {
#ifdef ENVOY_ENABLE_QUIC
    if (quic_info == nullptr) {
      quic_info = Quic::createPersistentQuicInfoForCluster(dispatcher, host->cluster());
    }
    return Http::Http3::allocateConnPool(dispatcher, context_.api().randomGenerator(), host,
                                         priority, options, transport_socket_options, state,
                                         quic_stat_names_, {}, stats_, {}, *quic_info);
#else
    UNREFERENCED_PARAMETER(source);
    // Should be blocked by configuration checking at an earlier point.
    PANIC("unexpected");
#endif
  }
  ASSERT(protocols.size() == 1 && protocols[0] == Http::Protocol::Http11);
  return Http::Http1::allocateConnPool(dispatcher, context_.api().randomGenerator(), host, priority,
                                       options, transport_socket_options, state);
}

 

여기서 할당된 factory 메소드는 ClusterManager에서 관리하는 allocateCoonPool이 최종적으로는 호출되며, Cluster Manager에서는 client가 요구하는 프로토콜이 무엇인지 확인한 다음에 해당 요청을 처리할 수 있는 Connection을 생성하여 반환합니다. 그리고 생성된 Pool을 Container가 보유하고 있는 active_pools_에 삽입합니다.

 

9. 생성된 Pool을 ThreadLocalCluster에 반환합니다.

 

10. Pool을 Router에게 반환합니다. 이후 해당 Router에서는 전달받은 Pool을 통해 Downstream과 Upstream과의 연결 작업을 요청할 수 있게됩니다.

 

위와 같이 10단계를 거치게되면, Router는 Connection Pool을 할당받아 요청을 처리할 수 있게되었음을 확인할 수 있습니다. 반면 Connection 해제는 Router로 전달되었던 Connection Pool을 통해 해제를 요청할 수 있습니다.

 


 

6. ADS 관리

 

dynamic_resources:
  lds_config:
    resource_api_version: V3
    ads: {}
  cds_config:
    resource_api_version: V3
    ads: {}
  ads_config:
    transport_api_version: V3
    api_type: GRPC
    grpc_services:
      envoy_grpc:
        cluster_name: envoy-control-xds

 

이전 포스팅에서 xDS API와 함께 ADS (Aggregate Discovery Service)에 대해서 설명했습니다. Cluster Manager의 또 다른 ADS를 생성하고 관리하는 것입니다. 즉 Cluster Manager에서는 xDS API를 위한 Subscription, gRPC Multiplxer 및 ADS 관리를 총체적으로 담당합니다.


 

7. Health Check 동작 원리

 

지금까지 Static Resource 그리고 CDS를 통한 Cluster 추가에 대해서 살펴봤습니다. 이번에는 ClusterData를 생성하는 과정에서 Health Check, Outlier Detector가 어떻게 생성되고 동작하는지 살펴보겠습니다. 먼저 살펴볼 것은 Health Check 입니다.

 

 

이전에 살펴봤듯이 ClusterData를 생성하는 과정에서 Outlier Detector, Health Chekcer, Load Balancer 등이 추가됨을 확인할 수 있습니다. 이때 Health Checker를 생성하는 역할은 HealthCheckerFactory가 담당하며, 이후 수행과정을 살펴보면 다음과 같습니다.

 

 

 

Config를 통해 전달받은 Cluster 내부에는 Http 방식 뿐만 아니라 Tcp, gRPC 혹은 Custom 방식의 Health Check를 지정할 수 있습니다. 따라서 HealthCheckerFactory에서는 사용자가 입력한 Config의 Heath Checker 방식을 확인하고 그에 걸맞는 Health Checker를 생성하도록 지정합니다. 본 포스팅에서는 Http 기반으로 생성했음을 가정하였음으로 ProdHttpHealthCheckerImpl을 생성할 것입니다.

 

 

생성된 Health Checker 인스턴스는 ClusterData의 HealthChecker에 바인딩되어 동작합니다.

 

 

cluster_manager_impl.cc

  if (new_cluster->healthChecker() != nullptr) {
    new_cluster->healthChecker()->addHostCheckCompleteCb(
        [this](HostSharedPtr host, HealthTransition changed_state) {
          if (changed_state == HealthTransition::Changed &&
              host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)) {
            postThreadLocalHealthFailure(host);
          }
        });
  }

 

Health Checker가 매핑되고 나면 Cluster Manager에서는 Health Checker가 정상적으로 매핑되어있는지를 확인합니다. 그리고 인스턴스가 존재할 경우에는 Health Checker에서 비정상 Health 대상으로 판단한 host에 대해서 ejection 수행을 위한 Callback을 등록합니다.

 

이때 postThreadLocalHealthFailure는 TLS를 통해서 Health가 실패했음을 전달하며, 이를 수신받은 Cluster Manager에서는 해당 host에 연결된 Connection Pool을 해제하는 작업을 수행합니다.

 

 

수행 과정을 살펴보면 위 그림과 같습니다.

 

1. Health Checker에서 주기적으로 Health Check를 수행하다가 이상이 발생하면, 내부 종료 작업을 먼저 거친 뒤에 등록된 Callback을 통해 외부로 전파합니다.

 

2. Cluster Manager에서는 기존에 등록한 Callback 함수를 수행합니다. 이때 내부적으로 ThreadLocalClusterManagerImpl을 참조하여 해당 Host 이상 여부를 전체로 전파합니다.

 

3. Dispatcher를 통해 해당 내역을 전달받은 Cluster Manager에서는 Connection을 해제하기 위하여 Connection Pool에서 해당 host 내부에 존재하는 Connection들을 순차적으로 종료하고 해당 Pool 또한 Close 시킵니다.

 

해당 과정이 정상적으로 완료되면, Health Check에 따른 연결된 Client 및 host 또한 정상적으로 종료되는 것을 이해할 수 있습니다. 그렇다면 Health Check는 어떻게 이루어질까요? Health Check Factory로부터 만들어진 ProdHttpHealthCheckerImpl 인스턴스의 구조와 수행 과정을 살펴보면서 조금 더 자세하게 알아보겠습니다.

 

 

 

1. ProdHttpHealthCheckerImpl 생성자 내부에서 Cluster  데이터변경이 발생했을 때 처리를 위한 Callback을 등록합니다.

 

2. 해당 Callback은 호출될 때 추가되는 host 정보와 삭제되는 host 정보를 같이 전달받는데, 이를 토대로 추가되는 host 정보를 active_sessions_ 에 Session을 만들어 추가합니다. 반대로 삭제되는 host 정보 또한 active_sessions_ 에서 삭제합니다.

 

3. 이때 추가되는 Session에는 Health Check를 정기적으로 수행하기 위해서 Dispatcher에 존재하는 libevent로 부터 Timer를 할당받습니다. 이때 생성되는 timer는 총 2개로 Health Check를 주기적으로 수행하기 위한 interval_timer_와 timeout을 판별하기 위한 timeout_timer_ 를 생성합니다.

 

여기서 Health Check는 Session 내에 생성되는 Timer에 의해 이루어지므로 Session 생성 과정을 보다 자세하게 살펴보겠습니다.

 

 

 

 

onInterval은 HealthCheck를 위해 주기적으로 호출되는 메소드로써, 아래와 같은 4가지 단계를 거칩니다.

 

1. client_가 존재하는지를 살펴보고 client_가 존재하지 않는다면 최초 실행임을 의미하므로 Connection을 생성합니다. 이때 생성된 Connection은 CodecClient를 통해 Wrapping 되는데, CodecClient는 HTTP Codec 타입에 따라서 생성되는 ClientConnectionImpl 인스턴스 타입입니다.

 

2. Stream을 생성합니다.

 

3. Health Check 관련 Header 설정을 진행합니다. 이때 method 및 path 등은 Health Check 설정으로 입력된 값을 따릅니다.

 

4. Upstream 설정 및 Health Check 처리를 위한 encoding 처리를 담당합니다.

 

위 과정이 마무리되면, 주기에 맞추어 Health Check 요청을 HTTP로 전달할 것입니다. 그렇다면 HTTP 요청에 대한 응답은 어떻게 처리될까요?

 

 

HttpActiveHealthCheckSession의 구조를 보면, 위와 같이 여러 Class가 상속되어있음을 확인할 수 있습니다. 그 중 요청 응답 처리는 ResponseDecoder 내부 정의된 I/F에 의해서 호출됩니다. 참고로 해당 과정에서 호출되는 메소드는 decodeHeaders와 decodeData 입니다. 따라서 HttpActiveHealthCheckSession 내부에 정의된 하기 2개의 메소드가 호출됩니다.

 

health_checker_impl.cc

void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::decodeHeaders(
    Http::ResponseHeaderMapPtr&& headers, bool end_stream) {
  ASSERT(!response_headers_);
  response_headers_ = std::move(headers);
  if (end_stream) {
    onResponseComplete();
  }
}

void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::decodeData(Buffer::Instance& data,
                                                                     bool end_stream) {
  if (parent_.response_buffer_size_ != 0) {
    if (!parent_.receive_bytes_.empty() &&
        response_body_->length() < parent_.response_buffer_size_) {
      response_body_->move(data, parent_.response_buffer_size_ - response_body_->length());
    }
  } else {
    if (!parent_.receive_bytes_.empty()) {
      response_body_->move(data, data.length());
    }
  }

  if (end_stream) {
    onResponseComplete();
  }
}

 

위 메소드에서 살펴볼 것은 data가 전달이 완료되지 않았을 경우에는 response_body 데이터를 지속 수신받는 것을 알 수 있고 stream이 종료되었을 경우에는 onResponseComplete() 메소드가 호출됨을 알 수 있습니다. 즉 Health Check를 결정하는 요인은 onResponseComplete() 메소드 내부에서 이루어짐을 짐작할 수 있습니다.

 

 

health_checker_impl.cc

void HttpHealthCheckerImpl::HttpActiveHealthCheckSession::onResponseComplete() {
  request_in_flight_ = false;

  switch (healthCheckResult()) {
  case HealthCheckResult::Succeeded:
    handleSuccess(false);
    break;
  case HealthCheckResult::Degraded:
    handleSuccess(true);
    break;
  case HealthCheckResult::Failed:
    handleFailure(envoy::data::core::v3::ACTIVE, /*retriable=*/false);
    break;
  case HealthCheckResult::Retriable:
    handleFailure(envoy::data::core::v3::ACTIVE, /*retriable=*/true);
    break;
  }

  if (shouldClose()) {
    client_->close();
  }

  response_headers_.reset();
  response_body_->drain(response_body_->length());
}

 

onResponseComplete() 메소드 내부를 살펴보면, healthCheck에 대한 결과를 기점으로 Switch로 분기하여 각기 다른 결과를 호출하는 것을 볼 수 있습니다. 만약 Failure가 발생한다면, 해당 timer를 disabled 시키고 종료처리할 것이며, shouldClose() 호출로 인해 client 연결 또한 종료됩니다.

 

만약 정상적으로 처리되었다면, 지속적으로 Health Check를 주기적으로 반복하여 정상여부 확인을 반복합니다.

 


 

8. Outlier Detection 동작 원리

 

 

 

지금까지 Health Checker의 생성 및 동작과정에 대해서 살펴봤다면, 이번에는 Outlier Detector가 생성되는 과정과 동작 방법에 대해 살펴보겠습니다.

 

 

Outlier Detector는 ClusterData 생성 당시에 DetectorImplFactory에 의해서 생성됩니다. 이때 생성되는 Outlier Detector의 인스턴스는 DetectorImpl이며 해당 인스턴스가 ClusterData의 OutlierDetector에 바인딩되어 동작합니다. 

 

해당 내용을 코드를 통해서 살펴보겠습니다.

 

outlier_detection_impl.cc

DetectorSharedPtr DetectorImplFactory::createForCluster(
    Cluster& cluster, const envoy::config::cluster::v3::Cluster& cluster_config,
    Event::Dispatcher& dispatcher, Runtime::Loader& runtime, EventLoggerSharedPtr event_logger,
    Random::RandomGenerator& random) {
  if (cluster_config.has_outlier_detection()) {

    return DetectorImpl::create(cluster, cluster_config.outlier_detection(), dispatcher, runtime,
                                dispatcher.timeSource(), std::move(event_logger), random);
  } else {
    return nullptr;
  }
}

 

다만 위 코드와 같이 모든 ClusterData에서 OutlierDetector를 생성하는 것은 아니며, Static 혹은 Dynamic Resource 내에 outlier_detection 설정이 활성화되어있는 경우에만 생성됩니다. 만약 해당 설정이 비활성화 되어있다면, OutlierDetector는 nullptr이 매핑됩니다.

 

 

cluster_manager_impl.cc

  if (new_cluster->outlierDetector() != nullptr) {
    new_cluster->outlierDetector()->addChangedStateCb([this](HostSharedPtr host) {
      if (host->healthFlagGet(Host::HealthFlag::FAILED_OUTLIER_CHECK)) {
        ENVOY_LOG_EVENT(debug, "outlier_detection_ejection",
                        "host {} in cluster {} was ejected by the outlier detector",
                        host->address()->asStringView(), host->cluster().name());
        postThreadLocalHealthFailure(host);
      }
    });
  }

 

OutlierDetector가 매핑되고 나면 Cluster Manager에서는 Outlier Detector가 정상적으로 매핑되어있는지를 확인합니다. 그리고 인스턴스가 존재할 경우에는 Outlier Detector에서 Ejection 대상으로 판단한 host에 대해서 ejection 수행을 위한 Callback을 등록합니다.

 

이때 postThreadLocalHealthFailure는 TLS를 통해서 Health가 실패했음을 전달하며, 이를 수신받은 Cluster Manager에서는 해당 host에 연결된 Connection Pool을 해제하는 작업을 수행합니다. 해당 작업은 Health Check를 수행하면서 실패했을 때 Connection 해제하는 과정과 완벽하게 동일합니다.

 

지금까지 Outlier Detector 설정 관련해서 살펴봤습니다. 이번에는 실제 Outlier Detector가 수행하는 기능에 대해서 조금 더 자세하게 살펴보기 위해 Outlier로 등록되는 DetectorImpl 구조 및 동작 원리에 대해서 살펴보겠습니다.

 

 

 

이를 위해 DetectorImpl을 생성 과정에 대해서 상세하게 짚어보겠습니다.

 

1. DetectorImpl 내부에서도 정기적으로 Cluster 내부에 존재하는 host들의 상태를 살펴보고 Outlier Detection을 지정한 임계치를 넘어가는 host를 점검하기 위해 Dispatcher로 부터 Timer를 할당받아 interval_timer_에 지정하는 작업을 합니다.

 

2. 그 다음은 외부에서 Cluster 내부 host가 추가되거나 삭제되는 등의 작업이 발생했을 때 Outlier Detector에서도 이를 감지하고 설정을 변경해야되기 때문에 ClusterEntry 내부에 존재하는 PriortySet에 Callback을 추가합니다. 해당 Callback 등록을 통해 실질적으로 Cluster 내부 host 변경이 발생했을 경우 이를 Detector가 통지받아 후속 작업을 수행할 수 있습니다. Callback을 전달받으면, 내부에서는 hosts를 관리하는 host_monitors_ Map 자료구조에 해당 hosts의 변경사항을 기록합니다. 이때 host 별로 DetectorHostMonitorImpl 인스턴스를 생성하여 Outlier Detection을 위한 기본 속성을 저장하는데, 이는 잠시 후 다시 살펴보겠습니다.

 

3. DetectorImpl 생성이 완료되면, ClusterData OutlierDetector에 매핑합니다.

 

4. ClusterData를 active_clusters_에 추가합니다.

 

5. ClusterManager에서 해당 OutlierDetector에게 Callback을 등록합니다. 이는 Outlier Detection 판정이 된 host에 대해서 Client 연결 종료 등의 후속작업을 처리하기 위함입니다.

 

위와 같은 과정을 거치고나면, Outlier Detector는 본격적으로 본인의 역할을 수행할 수 있습니다. 이번에는 초기화가 완료된 이후 후속 작업 진행 절차에 대해서 살펴보겠습니다.

 

outlier_detection_impl.cc

void DetectorImpl::armIntervalTimer() {
  interval_timer_->enableTimer(std::chrono::milliseconds(
      runtime_.snapshot().getInteger(IntervalMsRuntime, config_.intervalMs())));
}

 

먼저 초기화 작업이 끝나고나서 가장 먼저 수행하는 작업은 timer를 활성화 시키는 것입니다. 이는 위 메소드를 호출하여 실행되며, interval_timer_의 수행 주기는 위와 같이 config에 지정된 값을 활용하는 것을 볼 수 있습니다. 그렇다면, interval_timer_를 활성화 시켰을 때 어떤 작업을 수행할까요?

 

outlier_detection_impl.cc

DetectorImpl::DetectorImpl(const Cluster& cluster,
                           const envoy::config::cluster::v3::OutlierDetection& config,
                           Event::Dispatcher& dispatcher, Runtime::Loader& runtime,
                           TimeSource& time_source, EventLoggerSharedPtr event_logger,
                           Random::RandomGenerator& random)
    : config_(config), dispatcher_(dispatcher), runtime_(runtime), time_source_(time_source),
      stats_(generateStats(cluster.info()->statsScope())),
      interval_timer_(dispatcher.createTimer([this]() -> void { onIntervalTimer(); })),
      event_logger_(event_logger), random_generator_(random) {
  // Insert success rate initial numbers for each type of SR detector
  external_origin_sr_num_ = {-1, -1};
  local_origin_sr_num_ = {-1, -1};
}

 

이는 위와 같이 해당 DetectorImpl의 생성자 코드를 살펴보면 힌트를 얻을 수 있습니다. 내용을 살펴보면 dispatcher로부터 timer를 생성받고나서 해당 timer가 호출해야되는 callback이 지정된 것을 확인할 수 있습니다. 이는  위와 같이 onIntervalTimer() 메소드가 지정되어있습니다. 따라서 주기적으로 onIntervalTimer() 메소드가 호출됨으로써 Outlier Detection 작업을 수행함을 알 수 있습니다.

 

outlier_detection_impl.cc

void DetectorImpl::onIntervalTimer() {
  MonotonicTime now = time_source_.monotonicTime();

  for (auto host : host_monitors_) {
    checkHostForUneject(host.first, host.second, now);

    // Need to update the writer bucket to keep the data valid.
    host.second->updateCurrentSuccessRateBucket();
    // Refresh host success rate stat for the /clusters endpoint. If there is a new valid value, it
    // will get updated in processSuccessRateEjections().
    host.second->successRate(DetectorHostMonitor::SuccessRateMonitorType::LocalOrigin, -1);
    host.second->successRate(DetectorHostMonitor::SuccessRateMonitorType::ExternalOrigin, -1);
  }

  processSuccessRateEjections(DetectorHostMonitor::SuccessRateMonitorType::ExternalOrigin);
  processSuccessRateEjections(DetectorHostMonitor::SuccessRateMonitorType::LocalOrigin);

  armIntervalTimer();
}

 

그렇다면, 해당 메소드는 어떻게 구현되어있을까요?

위 코드와 같이 host_monitors_에 매핑된 전체 host들에 대해서 순회하면서 ejection 여부를 판단합니다. 만약 ejection이 결정되었다면, 해당 host_monitors_에 저장된 Monitor 정보를 초기화하고 ejection을 수행합니다. 또한 외부로부터 등록된 callback을 호출시켜, 후속 작업을 진행할 수 있도록 합니다.

 

 

 

참고로 host_monitors_에 매핑된 인스턴스는 DetectorHostMonitorImpl 인스턴스이며, 해당 인스턴스 내부에는 Outlier Detection을 판별하기 위한 여러가지 내부 속성 프로퍼티 값들이 존재합니다. 해당 프로퍼티 값은 Client가 Cluster Manager로부터 Connection Pool을 받아 Upstream host에 Http 요청을 완료한 이후에 전달받은 응답값을 토대로 Router가 갱신을 수행합니다.

 

따라서 Outlier Detector 내부에서 onIntervalTimer()가 수행될 때 해당 Monitor 인스턴스 내부 속성 값을 기준으로 ejection 여부를 판별할 수 있습니다.

 

 

outlier_detection_impl.cc

void DetectorImpl::armIntervalTimer() {
  interval_timer_->enableTimer(std::chrono::milliseconds(
      runtime_.snapshot().getInteger(IntervalMsRuntime, config_.intervalMs())));
}

모든 host_monitors_ 순회가 모두 끝나고 나면 다시 timer를 활성화 시켜 다음 outlier detection 처리를 반복 수행함으로써 지속적인 Outlier Detection 확인이 가능합니다.

 


 

9. 마치며

 

이번 포스팅에서는 Cluster Manager의 기능에 대해 일부 살펴봤습니다. Cluster Manager의 가장 중요한 것은 Cluster 관리입니다. 다만 그 외에도 ads 관리 및 subscription factory 등을 담당하는 것을 살펴봤습니다.

 

다음 포스팅에서는 Cluster에 지정되는 Load Balancer 종류에 대해 개념적으로 살펴보도록 하겠습니다. 

+ Recent posts