1. Introduction

 
Picking up from the previous post, this post looks at how traffic reaches the Upstream when a client sends an actual HTTP request after its Connection has been registered.
 
Along the way, the more peripheral details are shown as diagrams, and where it matters we read the envoy code together, with the focus on understanding how things work internally.
 
This post is closely tied to HttpConnectionManager, so I recommend reading the earlier blog post that introduces it before this one.

(I'm publishing this while the post is still unfinished. The parts toward the end that only have code attached will be filled in later.)
 


2. Client HTTP request flow

 
Because the Client Connection has already been set up, when the Client sends an HTTP request libevent already knows where to deliver the event internally. The previous post explained that when ServerConnectionImpl is created, onFileEvent is registered with libevent as the callback method. So this post starts from that point.
 
connection_impl.cc

void ConnectionImpl::onFileEvent(uint32_t events) {
  ScopeTrackerScopeState scope(this, this->dispatcher_);
  ENVOY_CONN_LOG(trace, "socket event: {}", *this, events);

  if (immediate_error_event_ == ConnectionEvent::LocalClose ||
      immediate_error_event_ == ConnectionEvent::RemoteClose) {
    if (bind_error_) {
      ENVOY_CONN_LOG(debug, "raising bind error", *this);
      // Update stats here, rather than on bind failure, to give the caller a chance to
      // setConnectionStats.
      if (connection_stats_ && connection_stats_->bind_errors_) {
        connection_stats_->bind_errors_->inc();
      }
    } else {
      ENVOY_CONN_LOG(debug, "raising immediate error", *this);
    }
    closeSocket(immediate_error_event_);
    return;
  }

  if (events & Event::FileReadyType::Closed) {
    // We never ask for both early close and read at the same time. If we are reading, we want to
    // consume all available data.
    ASSERT(!(events & Event::FileReadyType::Read));
    ENVOY_CONN_LOG(debug, "remote early close", *this);
    closeSocket(ConnectionEvent::RemoteClose);
    return;
  }

  if (events & Event::FileReadyType::Write) {
    onWriteReady();
  }

  // It's possible for a write event callback to close the socket (which will cause fd_ to be -1).
  // In this case ignore read event processing.
  if (ioHandle().isOpen() && (events & Event::FileReadyType::Read)) {
    onReadReady();
  }
}

 
 
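When a Write or Read event occurs on the socket, libevent calls onFileEvent. Looking at the code, if the Socket has been closed it tears the connection down; otherwise it calls a different method depending on whether the Socket event type is Write or Read. Both bits can be set in a single call, in which case onWriteReady() runs before onReadReady().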
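The dispatch order described here can be sketched in isolation. This is a minimal sketch, not Envoy code: `ToyConnection` and the bit values in `FileReadyType` are assumptions made for illustration, and the handlers only record that they ran.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical event bits, modeled loosely on Event::FileReadyType.
namespace FileReadyType {
constexpr uint32_t Read = 0x1;
constexpr uint32_t Write = 0x2;
constexpr uint32_t Closed = 0x4;
} // namespace FileReadyType

// Toy connection that records which handlers ran, in order.
struct ToyConnection {
  bool open = true;
  std::vector<std::string> calls;

  void closeSocket() {
    open = false;
    calls.push_back("closeSocket");
  }
  void onWriteReady() { calls.push_back("onWriteReady"); }
  void onReadReady() { calls.push_back("onReadReady"); }

  // Same ordering as the excerpt: early close first, then write, then read.
  void onFileEvent(uint32_t events) {
    if (events & FileReadyType::Closed) {
      closeSocket();
      return;
    }
    if (events & FileReadyType::Write) {
      onWriteReady();
    }
    // A write handler may have closed the socket, so re-check before reading.
    if (open && (events & FileReadyType::Read)) {
      onReadReady();
    }
  }
};
```

Calling `onFileEvent(FileReadyType::Read | FileReadyType::Write)` on a fresh `ToyConnection` records the write handler first and the read handler second.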
 

 
In our scenario the HTTP request has caused data to be written to the Socket, so we proceed on the assumption that onReadReady() is called to read it.
 
connection_impl.cc

void ConnectionImpl::onReadReady() {
  ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this,
                 static_cast<int>(dispatch_buffered_data_));
  const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
  dispatch_buffered_data_ = false;

  ASSERT(!connecting_);

  // We get here while read disabled in two ways.
  // 1) There was a call to setTransportSocketIsReadable(), for example if a raw buffer socket ceded
  //    due to shouldDrainReadBuffer(). In this case we defer the event until the socket is read
  //    enabled.
  // 2) The consumer of connection data called readDisable(true), and instead of reading from the
  //    socket we simply need to dispatch already read data.
  if (read_disable_count_ != 0) {
    // Do not clear transport_wants_read_ when returning early; the early return skips the transport
    // socket doRead call.
    if (latched_dispatch_buffered_data && filterChainWantsData()) {
      onRead(read_buffer_->length());
    }
    return;
  }

  // Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
  // the transport socket read resumption happens as requested; onReadReady() returns early without
  // reading from the transport if the read buffer is above high watermark at the start of the
  // method.
  transport_wants_read_ = false;
  IoResult result = transport_socket_->doRead(*read_buffer_);
  uint64_t new_buffer_size = read_buffer_->length();
  updateReadBufferStats(result.bytes_processed_, new_buffer_size);

  // If this connection doesn't have half-close semantics, translate end_stream into
  // a connection close.
  if ((!enable_half_close_ && result.end_stream_read_)) {
    result.end_stream_read_ = false;
    result.action_ = PostIoAction::Close;
  }

  read_end_stream_ |= result.end_stream_read_;
  if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
      (latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
    // Skip onRead if no bytes were processed unless we explicitly want to force onRead for
    // buffered data. For instance, skip onRead if the connection was closed without producing
    // more data.
    onRead(new_buffer_size);
  }

  // The read callback may have already closed the connection.
  if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
    ENVOY_CONN_LOG(debug, "remote close", *this);
    closeSocket(ConnectionEvent::RemoteClose);
  }
}

 
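The onReadReady() method is shown above. It reads from the transport socket into read_buffer_ through doRead(), and if any bytes were processed (or the stream ended) it calls onRead with the new buffer length. If reads are currently disabled but buffered data still has to be dispatched, it calls onRead on the data already in the buffer and returns early.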
 
connection_impl.cc

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
  ASSERT(dispatcher_.isThreadSafe());
  if (inDelayedClose() || !filterChainWantsData()) {
    return;
  }
  ASSERT(ioHandle().isOpen());

  if (read_buffer_size == 0 && !read_end_stream_) {
    return;
  }

  if (read_end_stream_) {
    // read() on a raw socket will repeatedly return 0 (EOF) once EOF has
    // occurred, so filter out the repeats so that filters don't have
    // to handle repeats.
    //
    // I don't know of any cases where this actually happens (we should stop
    // reading the socket after EOF), but this check guards against any bugs
    // in ConnectionImpl or strangeness in the OS events (epoll, kqueue, etc)
    // and maintains the guarantee for filters.
    if (read_end_stream_raised_) {
      // No further data can be delivered after end_stream
      ASSERT(read_buffer_size == 0);
      return;
    }
    read_end_stream_raised_ = true;
  }

  filter_manager_.onRead();
}

 

onRead then delegates to filter_manager_, so that the filters registered in its filter chain consume the data.
 
filter_manager_impl.cc

void FilterManagerImpl::onRead() {
  ASSERT(!upstream_filters_.empty());
  onContinueReading(nullptr, connection_);
}

 
filter_manager runs the filter chain by executing the upstream_filters_ it holds, in order. To do this it calls the onContinueReading method.
 
filter_manager_impl.cc

void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,
                                          ReadBufferSource& buffer_source) {
  // Filter could return status == FilterStatus::StopIteration immediately, close the connection and
  // use callback to call this function.
  if (connection_.state() != Connection::State::Open) {
    return;
  }

  std::list<ActiveReadFilterPtr>::iterator entry;
  if (!filter) {
    connection_.streamInfo().addBytesReceived(buffer_source.getReadBuffer().buffer.length());
    entry = upstream_filters_.begin();
  } else {
    entry = std::next(filter->entry());
  }

  for (; entry != upstream_filters_.end(); entry++) {
    if (!(*entry)->filter_) {
      continue;
    }
    if (!(*entry)->initialized_) {
      (*entry)->initialized_ = true;
      FilterStatus status = (*entry)->filter_->onNewConnection();
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }

    StreamBuffer read_buffer = buffer_source.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }
  }
}

 
 
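When it starts from the first filter, onContinueReading first records the number of bytes in the read buffer on the connection's streamInfo. It then iterates over its upstream_filters_ and hands the user's request to each filter in turn, stopping as soon as a filter returns StopIteration or the connection is no longer open.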
 
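For reference, this method was also called in the previous post, while initializeReadFilters() ran after all Read Filters had been registered. Back then onNewConnection() was executed for initialization. Now that initialization is complete, the difference is that the loop calls onData on each filter instead.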
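The two passes over the read filters (initialize once, then deliver data) can be sketched as follows. This is a simplified sketch with hypothetical types (`ReadFilter`, `ActiveReadFilter`, `runReadFilters`, `CountingFilter`); it leaves out the connection-state checks and the resume-from-a-filter case that the real method handles.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

enum class FilterStatus { Continue, StopIteration };

// Hypothetical stand-in for a network read filter.
struct ReadFilter {
  virtual ~ReadFilter() = default;
  virtual FilterStatus onNewConnection() = 0;
  virtual FilterStatus onData(std::string& data, bool end_stream) = 0;
};

struct ActiveReadFilter {
  std::unique_ptr<ReadFilter> filter;
  bool initialized = false;
};

// Walks the chain once. Returns the number of filters whose onData() ran.
inline int runReadFilters(std::vector<ActiveReadFilter>& chain, std::string& data,
                          bool end_stream) {
  int data_calls = 0;
  for (auto& entry : chain) {
    if (!entry.initialized) {
      // First visit: initialize the filter exactly once.
      entry.initialized = true;
      if (entry.filter->onNewConnection() == FilterStatus::StopIteration) {
        return data_calls;
      }
    }
    if (!data.empty() || end_stream) {
      ++data_calls;
      if (entry.filter->onData(data, end_stream) == FilterStatus::StopIteration) {
        return data_calls;
      }
    }
  }
  return data_calls;
}

// Example filter: counts calls and returns a configurable status from onData().
struct CountingFilter : ReadFilter {
  explicit CountingFilter(FilterStatus on_data) : on_data_(on_data) {}
  FilterStatus onNewConnection() override {
    ++new_connection_calls;
    return FilterStatus::Continue;
  }
  FilterStatus onData(std::string&, bool) override {
    ++data_calls;
    return on_data_;
  }
  int new_connection_calls = 0;
  int data_calls = 0;
  FilterStatus on_data_;
};
```

A first call with an empty buffer only triggers onNewConnection() on every filter. A later call with data skips initialization and stops at the first filter that returns StopIteration, which is how a terminal filter such as HttpConnectionManager keeps the data for itself.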
 
 

 
This post assumes an HTTP request, so among the filters registered in the filter chain, the onData of the HttpConnectionManager filter is the one that gets called.
 


 

3. HttpConnectionManager

 
 

 
When HttpConnectionManager's onData method is called, the first thing it does is create the codec. This was also covered in the HttpConnectionManager post. Since this post assumes an HTTP/1.1 request, an Http1::ServerConnectionImpl instance is returned.
 
Walking through this with the code:
 
1. When ServerConnectionImpl tells filter_manager to run its upstream_filters_, the onData of the HttpConnectionManager registered inside is called.
 
conn_manager_impl.cc

Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
  if (!codec_) {
    // Http3 codec should have been instantiated by now.
    createCodec(data);
  }

  ...(rest omitted)...
}

 
2. HttpConnectionManager first checks whether codec_ exists, and creates it if it has never been created.
 

void ConnectionManagerImpl::createCodec(Buffer::Instance& data) {
  ASSERT(!codec_);
  codec_ = config_.createCodec(read_callbacks_->connection(), data, *this, overload_manager_);

  ...(omitted)...
}

 
Internally this calls the createCodec method of the config, which is what actually allocates the codec.
 
config.cc

Http::ServerConnectionPtr HttpConnectionManagerConfig::createCodec(
    Network::Connection& connection, const Buffer::Instance& data,
    Http::ServerConnectionCallbacks& callbacks, Server::OverloadManager& overload_manager) {
  switch (codec_type_) {
  case CodecType::HTTP1:
    return std::make_unique<Http::Http1::ServerConnectionImpl>(
        connection, Http::Http1::CodecStats::atomicGet(http1_codec_stats_, context_.scope()),
        callbacks, http1_settings_, maxRequestHeadersKb(), maxRequestHeadersCount(),
        headersWithUnderscoresAction(), overload_manager);
  case CodecType::HTTP2:
    return std::make_unique<Http::Http2::ServerConnectionImpl>(
        connection, callbacks,
        Http::Http2::CodecStats::atomicGet(http2_codec_stats_, context_.scope()),
        context_.api().randomGenerator(), http2_options_, maxRequestHeadersKb(),
        maxRequestHeadersCount(), headersWithUnderscoresAction(), overload_manager);
  case CodecType::HTTP3:
    return Config::Utility::getAndCheckFactoryByName<QuicHttpServerConnectionFactory>(
               "quic.http_server_connection.default")
        .createQuicHttpServerConnectionImpl(
            connection, callbacks,
            Http::Http3::CodecStats::atomicGet(http3_codec_stats_, context_.scope()),
            http3_options_, maxRequestHeadersKb(), maxRequestHeadersCount(),
            headersWithUnderscoresAction());
  case CodecType::AUTO:
    return Http::ConnectionManagerUtility::autoCreateCodec(
        connection, data, callbacks, context_.scope(), context_.api().randomGenerator(),
        http1_codec_stats_, http2_codec_stats_, http1_settings_, http2_options_,
        maxRequestHeadersKb(), maxRequestHeadersCount(), headersWithUnderscoresAction(),
        overload_manager);
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}

 
Looking at the creation code, the codec is chosen according to the protocol type held in codec_type_. Since this post assumes HTTP/1.1 communication, Http1::ServerConnectionImpl is created.
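The combination of lazy creation on the first onData() call and a switch over the configured type can be sketched as below. This is a reduced illustration: all `Toy*` names and `createToyCodec` are hypothetical, and the HTTP3 and AUTO cases are left out.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>

// Hypothetical, reduced version of the codec choice (no HTTP3 or AUTO here).
enum class CodecType { HTTP1, HTTP2 };

struct ToyServerCodec {
  virtual ~ToyServerCodec() = default;
  virtual std::string protocol() const = 0;
};

struct ToyHttp1Codec : ToyServerCodec {
  std::string protocol() const override { return "HTTP/1.1"; }
};

struct ToyHttp2Codec : ToyServerCodec {
  std::string protocol() const override { return "HTTP/2"; }
};

// Factory: one switch over the configured type.
inline std::unique_ptr<ToyServerCodec> createToyCodec(CodecType type) {
  switch (type) {
  case CodecType::HTTP1:
    return std::make_unique<ToyHttp1Codec>();
  case CodecType::HTTP2:
    return std::make_unique<ToyHttp2Codec>();
  }
  throw std::logic_error("corrupt CodecType");
}

// The manager creates the codec lazily, on the first onData() call only.
struct ToyConnectionManager {
  explicit ToyConnectionManager(CodecType type) : type_(type) {}

  void onData(const std::string& /*data*/) {
    if (!codec_) {
      codec_ = createToyCodec(type_);
      ++codecs_created;
    }
    // A real manager would now hand the data to the codec for dispatch.
  }

  CodecType type_;
  std::unique_ptr<ToyServerCodec> codec_;
  int codecs_created = 0;
};
```

Creating the codec lazily means a connection that never sends a byte never pays for one, and every later onData() call on the same connection reuses the same instance.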
 
codec_impl.cc

ServerConnectionImpl::ServerConnectionImpl(
    Network::Connection& connection, CodecStats& stats, ServerConnectionCallbacks& callbacks,
    const Http1Settings& settings, uint32_t max_request_headers_kb,
    const uint32_t max_request_headers_count,
    envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
        headers_with_underscores_action,
    Server::OverloadManager& overload_manager)
    : ConnectionImpl(connection, stats, settings, MessageType::Request, max_request_headers_kb,
                     max_request_headers_count),
      callbacks_(callbacks),
      response_buffer_releasor_([this](const Buffer::OwnedBufferFragmentImpl* fragment) {
        releaseOutboundResponse(fragment);
      }),
      owned_output_buffer_(connection.dispatcher().getWatermarkFactory().createBuffer(
          [&]() -> void { this->onBelowLowWatermark(); },
          [&]() -> void { this->onAboveHighWatermark(); },
          []() -> void { /* TODO(adisuissa): handle overflow watermark */ })),
      headers_with_underscores_action_(headers_with_underscores_action),
      abort_dispatch_(
          overload_manager.getLoadShedPoint("envoy.load_shed_points.http1_server_abort_dispatch")) {
  owned_output_buffer_->setWatermarks(connection.bufferLimit());
  // Inform parent
  output_buffer_ = owned_output_buffer_.get();
}

 
Looking at how the ServerConnectionImpl instance is constructed, it obtains a watermark Buffer for its output (owned_output_buffer_) and also calls the ConnectionImpl constructor.
 
3. The ConnectionImpl constructor registers the Parser that will analyze the user's request. In the code shown here, LegacyHttpParserImpl is registered as parser_ unless use_balsa_parser_ is set.
 
codec_impl.cc

ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stats,
                               const Http1Settings& settings, MessageType type,
                               uint32_t max_headers_kb, const uint32_t max_headers_count)
    : connection_(connection), stats_(stats), codec_settings_(settings),
      encode_only_header_key_formatter_(encodeOnlyFormatterFromSettings(settings)),
      processing_trailers_(false), handling_upgrade_(false), reset_stream_called_(false),
      deferred_end_stream_headers_(false), dispatching_(false), max_headers_kb_(max_headers_kb),
      max_headers_count_(max_headers_count) {
  if (codec_settings_.use_balsa_parser_) {
    parser_ = std::make_unique<BalsaParser>(type, this, max_headers_kb_ * 1024, enableTrailers(),
                                            codec_settings_.allow_custom_methods_);
  } else {
    parser_ = std::make_unique<LegacyHttpParserImpl>(type, this);
  }
}

 
For reference, if use_balsa_parser is set in the config, BalsaParser is used instead.
 
conn_manager_impl.cc

Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
  if (!codec_) {
    // Http3 codec should have been instantiated by now.
    createCodec(data);
  }

  bool redispatch;
  do {
    redispatch = false;

    const Status status = codec_->dispatch(data);

    if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
      handleCodecError(status.message());
      return Network::FilterStatus::StopIteration;
    } else if (isCodecProtocolError(status)) {
      stats_.named_.downstream_cx_protocol_error_.inc();
      handleCodecError(status.message());
      return Network::FilterStatus::StopIteration;
    }
    ASSERT(status.ok());

    // Processing incoming data may release outbound data so check for closure here as well.
    checkForDeferredClose(false);

    // The HTTP/1 codec will pause dispatch after a single message is complete. We want to
    // either redispatch if there are no streams and we have more data. If we have a single
    // complete non-WebSocket stream but have not responded yet we will pause socket reads
    // to apply back pressure.
    if (codec_->protocol() < Protocol::Http2) {
      if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
          data.length() > 0 && streams_.empty()) {
        redispatch = true;
      }
    }
  } while (redispatch);

  if (!read_callbacks_->connection().streamInfo().protocol()) {
    read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
  }

  return Network::FilterStatus::StopIteration;
}

 
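Over these three steps we have seen Http1::ServerConnectionImpl, the codec that actually processes the request, being created together with its parser_. Once that is done, the next job, as in the code above, is to hand the data the user sent to codec_ through dispatch. Inside the codec, parser_ then analyzes and refines that data.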
 
To understand this in more detail, let's go through it with a diagram and code.
 

 
1. Inside HttpConnectionManager's onData method, dispatch is requested on codec_, which is the Http1::ServerConnectionImpl.
 
conn_manager_impl.cc

Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
  ...(omitted)...

    const Status status = codec_->dispatch(data);
  ...(omitted)...
}

 
2. ServerConnectionImpl asks its parser_ member to analyze the data.
 
codec_impl.cc

Envoy::StatusOr<size_t> ConnectionImpl::dispatchSlice(const char* slice, size_t len) {
  ASSERT(codec_status_.ok() && dispatching_);
  const size_t nread = parser_->execute(slice, len);
  ...(omitted)...

  return nread;
}

 
legacy_parser_impl.cc

  size_t execute(const char* slice, int len) {
    return http_parser_execute(&parser_, &settings_, slice, len);
  }

 
Once execute is called, the parser takes the user data and analyzes it. As explained in the previous post, it reports intermediate results through the predefined ParserCallbacks functions as parsing progresses.
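To make the callback style concrete, here is a toy parser that fires callbacks in the same order (message begin, URL, header field and value, headers complete). It is only a sketch under strong assumptions: `ToyParserCallbacks`, `parseRequestHead`, and `RecordingCodec` are hypothetical names, and the parser expects a complete, well-formed request head in a single call, whereas a real parser is incremental and validates its input.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

// Hypothetical callback interface, in the spirit of ParserCallbacks.
struct ToyParserCallbacks {
  virtual ~ToyParserCallbacks() = default;
  virtual void onMessageBegin() = 0;
  virtual void onUrl(std::string_view url) = 0;
  virtual void onHeaderField(std::string_view name) = 0;
  virtual void onHeaderValue(std::string_view value) = 0;
  virtual void onHeadersComplete() = 0;
};

// Toy parser: splits on CRLF and reports each piece through the callbacks.
inline void parseRequestHead(std::string_view input, ToyParserCallbacks& cb) {
  constexpr std::size_t npos = std::string_view::npos;
  cb.onMessageBegin();
  bool first_line = true;
  while (!input.empty()) {
    const std::size_t eol = input.find("\r\n");
    if (eol == npos) {
      break; // Incomplete line: a real parser would wait for more data.
    }
    std::string_view line = input.substr(0, eol);
    input.remove_prefix(eol + 2);
    if (first_line) {
      // Request line: METHOD SP URL SP VERSION
      const std::size_t sp1 = line.find(' ');
      const std::size_t sp2 = line.rfind(' ');
      if (sp1 != npos && sp2 != npos && sp2 > sp1) {
        cb.onUrl(line.substr(sp1 + 1, sp2 - sp1 - 1));
      }
      first_line = false;
    } else if (line.empty()) {
      cb.onHeadersComplete(); // Blank line ends the header section.
      return;
    } else {
      const std::size_t colon = line.find(':');
      if (colon == npos) {
        continue; // Malformed header line: ignored in this sketch.
      }
      cb.onHeaderField(line.substr(0, colon));
      std::string_view value = line.substr(colon + 1);
      while (!value.empty() && value.front() == ' ') {
        value.remove_prefix(1);
      }
      cb.onHeaderValue(value);
    }
  }
}

// Receiver that stores what the parser reports, like a codec would.
struct RecordingCodec : ToyParserCallbacks {
  void onMessageBegin() override { message_begun = true; }
  void onUrl(std::string_view u) override { url.append(u); }
  void onHeaderField(std::string_view name) override {
    headers.emplace_back(std::string(name), std::string());
  }
  void onHeaderValue(std::string_view value) override {
    if (!headers.empty()) {
      headers.back().second.append(value);
    }
  }
  void onHeadersComplete() override { headers_complete = true; }

  bool message_begun = false;
  bool headers_complete = false;
  std::string url;
  std::vector<std::pair<std::string, std::string>> headers;
};
```

The receiver appends rather than assigns in onUrl and onHeaderValue because a streaming parser may deliver one logical value in several pieces.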
 
3. As the Parser analyzes the user's data, the first callback to run is onMessageBegin.
 
legacy_parser_impl.cc

        [](http_parser* parser) -> int {
          auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
          return static_cast<int>(conn_impl->onMessageBegin());
        },

 
 
4. On receiving that callback, ServerConnectionImpl creates an ActiveRequest instance.
 
codec_impl.cc

Status ServerConnectionImpl::onMessageBeginBase() {
  if (!resetStreamCalled()) {
    ASSERT(active_request_ == nullptr);
    active_request_ = std::make_unique<ActiveRequest>(*this, std::move(bytes_meter_before_stream_));
    ...(omitted)...
  }
  return okStatus();
}

 
 
5. After creating the ActiveRequest, it asks HttpConnectionManager to create a new ActiveStream.
 
codec_impl.cc

Status ServerConnectionImpl::onMessageBeginBase() {
  if (!resetStreamCalled()) {
    ...(omitted)...
    active_request_->request_decoder_ = &callbacks_.newStream(active_request_->response_encoder_);
    ...(omitted)...
  }
  return okStatus();
}

 
 
6. HttpConnectionManager creates a new ActiveStream. The filter_factories held by HttpConnectionManager are referenced as well, in order to set up filter_manager_ inside the ActiveStream instance.
 
conn_manager_impl.cc

RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder,
                                                 bool is_internally_created) {
  TRACE_EVENT("core", "ConnectionManagerImpl::newStream");
  if (connection_idle_timer_) {
    connection_idle_timer_->disableTimer();
  }

  ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());

  Buffer::BufferMemoryAccountSharedPtr downstream_stream_account =
      response_encoder.getStream().account();

  if (downstream_stream_account == nullptr) {
    // Create account, wiring the stream to use it for tracking bytes.
    // If tracking is disabled, the wiring becomes a NOP.
    auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory();
    downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream());
    response_encoder.getStream().setAccount(downstream_stream_account);
  }

  ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(),
                                              std::move(downstream_stream_account)));

  accumulated_requests_++;
  if (config_.maxRequestsPerConnection() > 0 &&
      accumulated_requests_ >= config_.maxRequestsPerConnection()) {
    if (codec_->protocol() < Protocol::Http2) {
      new_stream->state_.saw_connection_close_ = true;
      // Prevent erroneous debug log of closing due to incoming connection close header.
      drain_state_ = DrainState::Closing;
    } else if (drain_state_ == DrainState::NotDraining) {
      startDrainSequence();
    }
    ENVOY_CONN_LOG(debug, "max requests per connection reached", read_callbacks_->connection());
    stats_.named_.downstream_cx_max_requests_reached_.inc();
  }

  ...(omitted)...
  return **streams_.begin();
}

 
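At this point it also checks whether the max_requests_per_connection set in the config has been reached. This limits how many Streams (requests) a single Connection may serve in total. For protocols below HTTP/2 the new stream is flagged so that the connection closes after it; otherwise the drain sequence is started.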
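The limit check can be sketched on its own. This is a simplified model with hypothetical names (`ToyRequestLimiter`, `onNewStream`); in particular, setting `DrainState::Draining` directly stands in for whatever the real drain sequence does.

```cpp
#include <cassert>
#include <cstdint>

enum class Protocol { Http10, Http11, Http2, Http3 };
enum class DrainState { NotDraining, Draining, Closing };

// Hypothetical model of the per-connection request counter.
struct ToyRequestLimiter {
  uint64_t max_requests_per_connection = 0; // 0 means "no limit".
  uint64_t accumulated_requests = 0;
  DrainState drain_state = DrainState::NotDraining;

  // Returns true when the new stream must be the last one on this connection
  // (the HTTP/1 case, where the connection is closed after the response).
  bool onNewStream(Protocol protocol) {
    ++accumulated_requests;
    if (max_requests_per_connection == 0 ||
        accumulated_requests < max_requests_per_connection) {
      return false;
    }
    if (protocol < Protocol::Http2) {
      drain_state = DrainState::Closing;
      return true;
    }
    // Multiplexed protocols: begin draining instead of closing right away.
    if (drain_state == DrainState::NotDraining) {
      drain_state = DrainState::Draining;
    }
    return false;
  }
};
```

The two branches differ because an HTTP/1 connection carries one request at a time and can simply be closed, while a multiplexed connection may still have other streams in flight.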
 
 
7. Once the Stream has been created, it is added to the streams_ List inside HttpConnectionManager and returned.
 
conn_manager_impl.cc

RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder,
                                                 bool is_internally_created) {
  ...(omitted)...
  LinkedList::moveIntoList(std::move(new_stream), streams_);
  return **streams_.begin();
}

 
8. The returned Stream instance is stored in ActiveRequest's request_decoder_.
 
 
After these 8 steps, the initialization needed to handle the user's request is done. Along the way, an ActiveRequest and an ActiveStream for processing the stream have been created and stored internally.
 
Once initialization is complete, parsing of the data begins in earnest. The process is as follows.
 

 
 
9. The parser first works out the requested URL and calls the onUrl callback to propagate that event.
 
legacy_parser_impl.cc

        [](http_parser* parser, const char* at, size_t length) -> int {
          auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
          return static_cast<int>(conn_impl->onUrl(at, length));
        },

 
10. When ServerConnectionImpl receives this callback, it appends the parsed value to request_url_ of its ActiveRequest.
 

codec_impl.cc
CallbackResult ConnectionImpl::onUrl(const char* data, size_t length) {
  return setAndCheckCallbackStatus(onUrlBase(data, length));
}
 
codec_impl.cc
Status ServerConnectionImpl::onUrlBase(const char* data, size_t length) {
  if (active_request_) {
    active_request_->request_url_.append(data, length);

    RETURN_IF_ERROR(checkMaxHeadersSize());
  }

  return okStatus();
}
 
codec_impl.cc
Status ConnectionImpl::checkMaxHeadersSize() {
  const uint32_t total = getHeadersSize();
  if (total > (max_headers_kb_ * 1024)) {
    const absl::string_view header_type =
        processing_trailers_ ? Http1HeaderTypes::get().Trailers : Http1HeaderTypes::get().Headers;
    error_code_ = Http::Code::RequestHeaderFieldsTooLarge;
    RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().HeadersTooLarge));
    return codecProtocolError(
        absl::StrCat("http/1.1 protocol error: ", header_type, " size exceeds limit"));
  }
  return okStatus();
}
 
 
 
During this step it also verifies that the size of the headers parsed so far does not exceed the value set in max_headers_kb (default 60, in KiB).
 
 
 
 
 
 
 
11. Once the Url has been parsed, the parser starts on the request Headers. When it parses a Header name it calls the onHeaderField callback, and when it parses a Header value it calls the onHeaderValue callback.
 
 
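12. Once both the name and the value of a Header have been extracted, they are inserted into a Map called headers_or_trailers_. This structure maps the Header field name (Key) to its value (Value) and is used later whenever HTTP information is requested. As the code below shows, the key is lower-cased and trailing whitespace is trimmed from the value before insertion.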
 
 
codec_impl.cc
Status ConnectionImpl::completeCurrentHeader() {
  ASSERT(dispatching_);
  ENVOY_CONN_LOG(trace, "completed header: key={} value={}", connection_,
                 current_header_field_.getStringView(), current_header_value_.getStringView());
  auto& headers_or_trailers = headersOrTrailers();

  // Account for ":" and "\r\n" bytes between the header key value pair.
  getBytesMeter().addHeaderBytesReceived(CRLF_SIZE + 1);

  // TODO(10646): Switch to use HeaderUtility::checkHeaderNameForUnderscores().
  RETURN_IF_ERROR(checkHeaderNameForUnderscores());
  if (!current_header_field_.empty()) {
    // Strip trailing whitespace of the current header value if any. Leading whitespace was trimmed
    // in ConnectionImpl::onHeaderValue. http_parser does not strip leading or trailing whitespace
    // as the spec requires: https://tools.ietf.org/html/rfc7230#section-3.2.4
    current_header_value_.rtrim();

    // If there is a stateful formatter installed, remember the original header key before
    // converting to lower case.
    auto formatter = headers_or_trailers.formatter();
    if (formatter.has_value()) {
      formatter->processKey(current_header_field_.getStringView());
    }
    current_header_field_.inlineTransform([](char c) { return absl::ascii_tolower(c); });

    headers_or_trailers.addViaMove(std::move(current_header_field_),
                                   std::move(current_header_value_));
  }

  // Check if the number of headers exceeds the limit.
  if (headers_or_trailers.size() > max_headers_count_) {
    error_code_ = Http::Code::RequestHeaderFieldsTooLarge;
    RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().TooManyHeaders));
    const absl::string_view header_type =
        processing_trailers_ ? Http1HeaderTypes::get().Trailers : Http1HeaderTypes::get().Headers;
    return codecProtocolError(
        absl::StrCat("http/1.1 protocol error: ", header_type, " count exceeds limit"));
  }

  header_parsing_state_ = HeaderParsingState::Field;
  ASSERT(current_header_field_.empty());
  ASSERT(current_header_value_.empty());
  return okStatus();
}
 
 
 
 
 
 
 
 
 
 
Next, let's look at the flow once Header parsing is complete. It involves the Cluster Manager's role covered earlier and the Router Filter, one of the Http filters, so familiarity with the earlier posts in this series is required to follow along.
 
 
13. When Header parsing is complete, the Parser calls the onHeadersComplete callback.
 
 
legacy_parser_impl.cc
        [](http_parser* parser) -> int {
          auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
          return static_cast<int>(conn_impl->onHeadersComplete());
        },
 
 
14. That callback calls the onHeadersCompleteImpl method of ConnectionImpl, the base class of ServerConnectionImpl.
 
 
codec_impl.cc
CallbackResult ConnectionImpl::onHeadersComplete() {
  return setAndCheckCallbackStatusOr(onHeadersCompleteImpl());
}
 
 
 
15. The RequestHeaderMap is extracted from headers_or_trailers_, where the headers were stored during parsing. This structure holds the headers and the attributes needed for the HTTP request.
 
 
codec_impl.cc
StatusOr<CallbackResult> ConnectionImpl::onHeadersCompleteImpl() {
  ...(omitted)...
  RequestOrResponseHeaderMap& request_or_response_headers = requestOrResponseHeaders();
  ...(omitted)...
}
 
 
 
codec_impl.h
  RequestOrResponseHeaderMap& requestOrResponseHeaders() override {
    return *absl::get<RequestHeaderMapPtr>(headers_or_trailers_);
  }
 
 
 
16. The request_url_ kept in ActiveRequest is moved into the RequestHeaderMap to set the path used toward the Upstream. The Method is set in this step as well.
 
 

codec_impl.cc

StatusOr<CallbackResult> ConnectionImpl::onHeadersCompleteImpl() {
  ...(omitted)...

  auto statusor = onHeadersCompleteBase();
  ...(omitted)...
}
 
 

codec_impl.cc

Envoy::StatusOr<CallbackResult> ServerConnectionImpl::onHeadersCompleteBase() {
  // Handle the case where response happens prior to request complete. It's up to upper layer code
  // to disconnect the connection but we shouldn't fire any more events since it doesn't make
  // sense.
  if (active_request_) {
    auto& headers = absl::get<RequestHeaderMapPtr>(headers_or_trailers_);
    ...(omitted)...

    // Inform the response encoder about any HEAD method, so it can set content
    // length and transfer encoding headers correctly.
    const Http::HeaderValues& header_values = Http::Headers::get();
    active_request_->response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
                                                                  header_values.MethodValues.Head);
    active_request_->response_encoder_.setIsResponseToConnectRequest(
        parser_->methodName() == header_values.MethodValues.Connect);

    RETURN_IF_ERROR(handlePath(*headers, parser_->methodName()));
    
    ASSERT(active_request_->request_url_.empty());
    headers->setMethod(parser_->methodName());
    ...(omitted)...
  }

  return CallbackResult::Success;
}
 
 
codec_impl.cc
Status ServerConnectionImpl::handlePath(RequestHeaderMap& headers, absl::string_view method) {
  const Http::HeaderValues& header_values = Http::Headers::get();
  HeaderString path(header_values.Path);

  bool is_connect = (method == header_values.MethodValues.Connect);

  // The url is relative or a wildcard when the method is OPTIONS. Nothing to do here.
  if (!is_connect && !active_request_->request_url_.getStringView().empty() &&
      (active_request_->request_url_.getStringView()[0] == '/' ||
       (method == header_values.MethodValues.Options &&
        active_request_->request_url_.getStringView()[0] == '*'))) {
    headers.addViaMove(std::move(path), std::move(active_request_->request_url_));
    return okStatus();
  }

  // If absolute_urls and/or connect are not going be handled, copy the url and return.
  // This forces the behavior to be backwards compatible with the old codec behavior.
  // CONNECT "urls" are actually host:port so look like absolute URLs to the above checks.
  // Absolute URLS in CONNECT requests will be rejected below by the URL class validation.
  if (!codec_settings_.allow_absolute_url_ && !is_connect) {
    headers.addViaMove(std::move(path), std::move(active_request_->request_url_));
    return okStatus();
  }

  Utility::Url absolute_url;
  if (!absolute_url.initialize(active_request_->request_url_.getStringView(), is_connect)) {
    RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().InvalidUrl));
    return codecProtocolError("http/1.1 protocol error: invalid url in request line");
  }
  // RFC7230#5.7
  // When a proxy receives a request with an absolute-form of
  // request-target, the proxy MUST ignore the received Host header field
  // (if any) and instead replace it with the host information of the
  // request-target. A proxy that forwards such a request MUST generate a
  // new Host field-value based on the received request-target rather than
  // forward the received Host field-value.
  headers.setHost(absolute_url.hostAndPort());
  // Add the scheme and validate to ensure no https://
  // requests are accepted over unencrypted connections by front-line Envoys.
  if (!is_connect) {
    headers.setScheme(absolute_url.scheme());
    if (!HeaderUtility::schemeIsValid(absolute_url.scheme())) {
      RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().InvalidScheme));
      return codecProtocolError("http/1.1 protocol error: invalid scheme");
    }
    if (codec_settings_.validate_scheme_ &&
        absolute_url.scheme() == header_values.SchemeValues.Https && !connection().ssl()) {
      error_code_ = Http::Code::Forbidden;
      RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().HttpsInPlaintext));
      return codecProtocolError("http/1.1 protocol error: https in the clear");
    }
  }

  if (!absolute_url.pathAndQueryParams().empty()) {
    headers.setPath(absolute_url.pathAndQueryParams());
  }
  active_request_->request_url_.clear();
  return okStatus();
}
 
 
 
 
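17. Header processing is delegated to ActiveRequest's request_decoder_, that is, the ActiveStream. The RequestHeaderMap is passed as the argument. As the code below shows, decodeHeaders is called right away only when a body is expected (chunked, a positive content length, or an upgrade); otherwise deferred_end_stream_headers_ is set and the call is deferred.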
 
 
codec_impl.cc
Envoy::StatusOr<CallbackResult> ServerConnectionImpl::onHeadersCompleteBase() {
    ...(omitted)...
    if (parser_->isChunked() ||
        (parser_->contentLength().has_value() && parser_->contentLength().value() > 0) ||
        handling_upgrade_) {
      active_request_->request_decoder_->decodeHeaders(std::move(headers), false);

      // If the connection has been closed (or is closing) after decoding headers, pause the parser
      // so we return control to the caller.
      if (connection_.state() != Network::Connection::State::Open) {
        return parser_->pause();
      }
    } else {
      deferred_end_stream_headers_ = true;
    }
  }

  return CallbackResult::Success;
}
 
 
 
18. ActiveStream first maps the Downstream address, and the header information built from the received RequestHeaderMap, into its filter_manager_. Once that mapping is done, filter_manager_ creates its filter chain.
 
 
For reference, filter_manager_ received the filter_factories_ information from HttpConnectionManager when the ActiveStream was created. When it is asked to create the filter chain, it runs the factory callbacks held in filter_factories_ to create the filters internally.
 
 
Once all filters have been created, it walks the filter chain and hands the header information to each filter.
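The factory-callback pattern described in step 18 can be sketched as below. Everything here is hypothetical (`ToyFilterManager`, `ToyFilterFactoryCb`, `ToyTagFilter`, `ToyRouterFilter`, and the `x-toy-*` header names); a plain `std::map` stands in for the header map, and the router only marks the request instead of routing it.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using ToyHeaderMap = std::map<std::string, std::string>;
enum class FilterHeadersStatus { Continue, StopIteration };

struct ToyDecoderFilter {
  virtual ~ToyDecoderFilter() = default;
  virtual FilterHeadersStatus decodeHeaders(ToyHeaderMap& headers, bool end_stream) = 0;
};

struct ToyFilterManager;
// A factory callback installs one filter into the manager it is given.
using ToyFilterFactoryCb = std::function<void(ToyFilterManager&)>;

struct ToyFilterManager {
  std::vector<std::unique_ptr<ToyDecoderFilter>> decoder_filters;

  void addDecoderFilter(std::unique_ptr<ToyDecoderFilter> filter) {
    decoder_filters.push_back(std::move(filter));
  }

  // Run every factory once; each stream gets its own filter instances.
  void createFilterChain(const std::vector<ToyFilterFactoryCb>& factories) {
    for (const auto& factory : factories) {
      factory(*this);
    }
  }

  // Returns how many filters saw the headers before iteration stopped.
  int decodeHeaders(ToyHeaderMap& headers, bool end_stream) {
    int visited = 0;
    for (auto& filter : decoder_filters) {
      ++visited;
      if (filter->decodeHeaders(headers, end_stream) == FilterHeadersStatus::StopIteration) {
        break;
      }
    }
    return visited;
  }
};

struct ToyTagFilter : ToyDecoderFilter {
  FilterHeadersStatus decodeHeaders(ToyHeaderMap& headers, bool) override {
    headers["x-toy-tag"] = "seen";
    return FilterHeadersStatus::Continue;
  }
};

// Terminal filter: in this sketch it only marks the request as routed.
struct ToyRouterFilter : ToyDecoderFilter {
  FilterHeadersStatus decodeHeaders(ToyHeaderMap& headers, bool) override {
    headers["x-toy-routed"] = "true";
    return FilterHeadersStatus::StopIteration;
  }
};

// The connection-level config owns the factories; the router comes last.
inline std::vector<ToyFilterFactoryCb> makeToyFactories() {
  return {
      [](ToyFilterManager& m) { m.addDecoderFilter(std::make_unique<ToyTagFilter>()); },
      [](ToyFilterManager& m) { m.addDecoderFilter(std::make_unique<ToyRouterFilter>()); },
  };
}
```

Storing factories rather than filters at the connection level is what lets every stream start with fresh filter state while sharing one configuration.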
 
 
19. The Router filter, one of the Http filters, is called last in the filter chain.
 
 
20. To obtain the Upstream Host information and a Connection Pool, the Router filter first asks ClusterManager for the ClusterEntry.
 
 
21. Cluster Manager looks up the Cluster requested by the Router in its thread_local_clusters_.
 
 
22. Cluster Manager returns the ThreadLocalCluster it found to the Router.
 
 
23. The Router has to connect through a host in the Cluster, so it requests a Connection Pool through the ThreadLocalCluster it received.
 
 
24. The ThreadLocalCluster (Cluster Entry) asks Cluster Manager whether host_http_conn_pool_map already holds a Connection Pool for the host. If it does, the existing Container in that Connection Pool Map is used. If it does not, a new Container is created and added to the Map.
 
 
25. Inside the Container there is a Pool that manages Connections per Cluster. This is where the existence of the Pool is finally checked.
 

26. On first access no Pool exists yet, so the Cluster's Resource Limit settings are checked first to see whether a Connection Pool may be created.
 
27. If the resource limits allow a new pool, the ClusterManager is asked to allocate one. The ClusterManager checks which protocol the client requires, then creates and returns a connection pool that can handle that request. The created pool is inserted into the Container's active_pools_.
 
28. The created pool is returned to the ThreadLocalCluster (ClusterEntry).
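Steps 24 to 28 amount to a get-or-create lookup guarded by a limit. The following is a minimal sketch of that lookup, assuming a hypothetical `SketchPoolMap` keyed by host address with a simple maximum pool count standing in for the resource limit check; Envoy's real containers and limits are more involved.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical pool record; a real pool would own connections.
struct SketchPool {
  std::string host;
  std::string protocol;
};

// Hypothetical container that maps a host address to its pool
// (compare host_http_conn_pool_map and active_pools_ in the text).
class SketchPoolMap {
public:
  explicit SketchPoolMap(std::size_t max_pools) : max_pools_(max_pools) {}

  // Returns the existing pool for the host, or creates one if the limit
  // allows it. Returns nullptr when the limit has been reached.
  SketchPool* getPool(const std::string& host, const std::string& protocol) {
    auto it = active_pools_.find(host);
    if (it != active_pools_.end()) {
      return it->second.get(); // reuse the pool allocated earlier
    }
    if (active_pools_.size() >= max_pools_) {
      return nullptr; // assumed stand-in for the resource limit check
    }
    auto pool = std::make_unique<SketchPool>(SketchPool{host, protocol});
    SketchPool* raw = pool.get();
    active_pools_.emplace(host, std::move(pool));
    return raw;
  }

  std::size_t size() const { return active_pools_.size(); }

private:
  std::size_t max_pools_;
  std::map<std::string, std::unique_ptr<SketchPool>> active_pools_;
};
```

A second request for the same host gets the same pool back, so the cost of pool creation is paid only on first access.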
 
29. The pool is returned to the Router. To create the upstream connection based on that pool, the Router creates an UpstreamRequest and adds it to its own upstream_requests_ list.
 
UpstreamRequest is the data structure set up to request a connection to the upstream through the connection pool it was given, and it is what allows communication with the upstream host. Let's look at this structure in a little more detail.
 

 
 
The core attributes of the UpstreamRequest structure are shown above. Let's go through them one by one.
 
First, conn_pool_ is a pointer to the connection pool that manages the UpstreamRequest. It points to the connection pool obtained earlier through the ThreadLocalCluster. By mapping the user's request stream onto that pool, it acts as the bridge for sending and receiving data.
 

 
 
Let's look a little deeper into conn_pool_. A connection pool keeps lists of the clients it manages. Because the clients inside a pool can each be in a different state, several data structures exist to track them.
 
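For example, a client created by the connection pool carries its state as an internal attribute, as in the figure above, and whether it can be reused depends on whether that state is Ready, Busy, Draining, and so on. Reusing a client that is currently in use would prevent the service from behaving correctly.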
 
So there are several structures: ready_clients_ holds the clients that can be reused right now, busy_clients_ holds the clients that are processing a stream or draining, and connecting_clients_ holds the clients that are still trying to connect.
 
A client is stored in a different list as its state changes. For example, if a client in ready_clients_ is used and goes into the Connecting state, it is moved to connecting_clients_.
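The idea of moving a client between lists as its state changes can be sketched as follows. `SketchClientLists`, `SketchClient`, and `transition` are hypothetical names for illustration; the sketch assumes, as the text describes, that Busy and Draining clients share one list.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <list>
#include <memory>

enum class ClientState { Connecting, Ready, Busy, Draining };

// Hypothetical client record; a real client would own a connection.
struct SketchClient {
  int id;
  ClientState state;
};

class SketchClientLists {
public:
  using ClientList = std::list<std::unique_ptr<SketchClient>>;

  // A new client starts out in the connecting list.
  SketchClient& addConnecting(int id) {
    connecting_clients_.push_back(
        std::make_unique<SketchClient>(SketchClient{id, ClientState::Connecting}));
    return *connecting_clients_.back();
  }

  // Change the state and move the client to the list that matches it.
  void transition(SketchClient& client, ClientState new_state) {
    ClientList& from = listFor(client.state);
    ClientList& to = listFor(new_state);
    client.state = new_state;
    if (&from == &to) {
      return; // Busy and Draining share a list, so nothing moves
    }
    auto it = std::find_if(from.begin(), from.end(),
                           [&client](const std::unique_ptr<SketchClient>& candidate) {
                             return candidate.get() == &client;
                           });
    assert(it != from.end());
    // splice relinks the node without copying or destroying the client.
    to.splice(to.end(), from, it);
  }

  std::size_t readyCount() const { return ready_clients_.size(); }
  std::size_t busyCount() const { return busy_clients_.size(); }
  std::size_t connectingCount() const { return connecting_clients_.size(); }

private:
  ClientList& listFor(ClientState state) {
    switch (state) {
    case ClientState::Ready:
      return ready_clients_;
    case ClientState::Connecting:
      return connecting_clients_;
    case ClientState::Busy:
    case ClientState::Draining:
      return busy_clients_;
    }
    return busy_clients_; // not reached; keeps compilers quiet
  }

  ClientList ready_clients_;
  ClientList busy_clients_;
  ClientList connecting_clients_;
};
```

Because each client lives in exactly one list, "is there a reusable client?" reduces to checking whether the ready list is empty.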
 
 
Next, let's look at the upstream filter_manager.
 
upstream_request.cc

  // Set up the upstream filter manager.
  filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
  filter_manager_ = std::make_unique<UpstreamFilterManager>(
      *filter_manager_callbacks_, parent_.callbacks()->dispatcher(),
      parent_.callbacks()->connection(), parent_.callbacks()->streamId(),
      parent_.callbacks()->account(), true, parent_.callbacks()->decoderBufferLimit(),
      *parent_.cluster(), *this);
  parent_.cluster()->createFilterChain(*filter_manager_);
  // The cluster will always create a codec filter, which sets the upstream

The filter_manager inside UpstreamRequest creates and manages the filters that process the upstream request.
 
 

 
upstream_codec_filter.h

class UpstreamCodecFilterFactory
    : public Extensions::HttpFilters::Common::CommonFactoryBase<
          envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec>,
      public Server::Configuration::UpstreamHttpFilterConfigFactory {
public:
  UpstreamCodecFilterFactory() : CommonFactoryBase("envoy.filters.http.upstream_codec") {}

  std::string category() const override { return "envoy.filters.http.upstream"; }
  Http::FilterFactoryCb
  createFilterFactoryFromProto(const Protobuf::Message&, const std::string&,
                               Server::Configuration::UpstreamHttpFactoryContext&) override {
    return [](Http::FilterChainFactoryCallbacks& callbacks) -> void {
      callbacks.addStreamDecoderFilter(std::make_shared<UpstreamCodecFilter>());
    };
  }
  bool isTerminalFilterByProtoTyped(
      const envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec&,
      Server::Configuration::ServerFactoryContext&) override {
    return true;
  }
};

 
Internally, as the code above shows, UpstreamCodecFilterFactory registers UpstreamCodecFilter in the UpstreamRequest's filter chain, and that filter later handles the upstream request.
 
 
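The last attributes to look at are stream_info_ and upstream_. stream_info_ manages the metadata of the current upstream. upstream_, which is explained right after this, is the structure that performs the upstream processing once the UpstreamRequest has been created: it holds the header mapping for the user request, the encoder that handles the request, and so on.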
 
So far we have looked at the main attributes of UpstreamRequest. Now let's look at what happens after the UpstreamRequest is created.
 

 
30. The Router passes the header information to the UpstreamRequest that was added to upstream_requests_.
 
31. Through the pool mapped to it, the UpstreamRequest requests that a client in that pool be assigned to the stream.
 
32. The pool first checks ready_clients_. If a usable client exists, it takes that client from the list and attaches the current stream to it.
 
conn_pool_base.cc

  if (!ready_clients_.empty()) {
    ActiveClient& client = *ready_clients_.front();
    ENVOY_CONN_LOG(debug, "using existing fully connected connection", client);
    attachStreamToClient(client, context);
    // Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
    tryCreateNewConnections();
    return nullptr;
  }

 
As the code above shows, the pool first checks whether ready_clients_ holds a reusable client. If it does, the pool reuses that client and attaches the user's context to it.
 
conn_pool_base.cc

  if (!host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
    ENVOY_LOG(debug, "max pending streams overflow");
    onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
                  context);
    host_->cluster().stats().upstream_rq_pending_overflow_.inc();
    return nullptr;
  }

 
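If ready_clients_ has no usable client, the pool checks the limits configured for that host's cluster to decide whether a pending request may be created. If the stream request exceeds the limit, it is rejected and the pool reports an Overflow failure.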
 
If the stream can be created, the request is wrapped in an HttpPendingStream and added to the pool's pending_streams_ list.
 

 
 
The pending_streams_ registered this way are then handled periodically by the baseScheduler in the Dispatcher.
 
conn_pool_base.cc

void ConnPoolImplBase::onUpstreamReady() {
  while (!pending_streams_.empty() && !ready_clients_.empty()) {
    ActiveClientPtr& client = ready_clients_.front();
    ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
    // Pending streams are pushed onto the front, so pull from the back.
    attachStreamToClient(*client, pending_streams_.back()->context());
    state_.decrPendingStreams(1);
    pending_streams_.pop_back();
  }
  if (!pending_streams_.empty()) {
    tryCreateNewConnections();
  }
}

 
The method invoked periodically is onUpstreamReady. It checks the streams in pending_streams_, and if ready_clients_ holds a reusable client at the time of the call, it attaches the pending stream to that client.
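The three outcomes described in step 32 (attach to a ready client, queue as pending, or reject on overflow) and the later drain of the pending queue can be sketched together. `SketchConnPool` is a hypothetical class, and it assumes each client serves exactly one stream at a time, which is a simplification.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

enum class StreamResult { Attached, Pending, Overflow };

// Hypothetical pool; clients are only counted, not modeled.
class SketchConnPool {
public:
  SketchConnPool(std::size_t ready_clients, std::size_t max_pending)
      : ready_clients_(ready_clients), max_pending_(max_pending) {}

  StreamResult newStream(const std::string& context) {
    if (ready_clients_ > 0) {
      // A fully connected client is available, so use it right away.
      --ready_clients_;
      attached_.push_back(context);
      return StreamResult::Attached;
    }
    if (pending_streams_.size() >= max_pending_) {
      return StreamResult::Overflow; // pending limit reached, reject
    }
    // Pending streams are pushed onto the front and pulled from the back.
    pending_streams_.push_front(context);
    return StreamResult::Pending;
  }

  // A client became ready: hand it the oldest pending stream, if any.
  void onClientReady() {
    ++ready_clients_;
    while (!pending_streams_.empty() && ready_clients_ > 0) {
      attached_.push_back(pending_streams_.back());
      pending_streams_.pop_back();
      --ready_clients_;
    }
  }

  std::size_t pendingCount() const { return pending_streams_.size(); }
  const std::vector<std::string>& attached() const { return attached_; }

private:
  std::size_t ready_clients_;
  std::size_t max_pending_;
  std::deque<std::string> pending_streams_;
  std::vector<std::string> attached_;
};
```

Pushing to the front and popping from the back keeps the queue first-in, first-out, so the stream that has waited longest is served first.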
 
33. Once the HttpUpstream is created, it is assigned to the UpstreamRequest, which can then use that stream to communicate with the upstream.
 
So far we have covered what happens after header parsing completes. In summary, the header information is parsed and stored in a separate map, and, in cooperation with the Cluster Manager, a connection pool is allocated and a client in that pool is attached to create the upstream. Next, let's look at how the body of the HTTP request is parsed and interpreted.
 

 
34. After parsing the body, the parser invokes the bufferBody callback.
 
35. The parser-side callback forwards the call to the connection's bufferBody method, which continues the processing.
 
legacy_parser_impl.cc

static_cast<ParserCallbacks*>(parser->data)->bufferBody(at, length);
return 0;

 
 
codec_impl.cc

void ConnectionImpl::bufferBody(const char* data, size_t length) {
  auto slice = current_dispatching_buffer_->frontSlice();
  if (data == slice.mem_ && length == slice.len_) {
    buffered_body_.move(*current_dispatching_buffer_, length);
    dispatching_slice_already_drained_ = true;
  } else {
    buffered_body_.add(data, length);
  }
}

 
Inside bufferBody, ServerConnectionImpl appends the received data to its internal buffered_body_.
 

 
 
36. When body parsing is complete, the parser invokes the onMessageComplete callback.
 
legacy_parser_impl.cc

auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
return static_cast<int>(conn_impl->onMessageComplete());

 
37. 
 
 
 
 
 
 
 
 

        [](http_parser* parser) -> int {
          auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
          return static_cast<int>(conn_impl->onHeadersComplete());
        },
 
 
 

codec_impl.cc

CallbackResult ConnectionImpl::onHeadersComplete() {
  return setAndCheckCallbackStatusOr(onHeadersCompleteImpl());
}
 
 
codec_impl.cc
StatusOr<CallbackResult> ConnectionImpl::onHeadersCompleteImpl() {
  ASSERT(!processing_trailers_);
  ASSERT(dispatching_);
  ENVOY_CONN_LOG(trace, "onHeadersCompleteBase", connection_);
  RETURN_IF_ERROR(completeCurrentHeader());

  if (!parser_->isHttp11()) {
    // This is not necessarily true, but it's good enough since higher layers only care if this is
    // HTTP/1.1 or not.
    protocol_ = Protocol::Http10;
  }
  RequestOrResponseHeaderMap& request_or_response_headers = requestOrResponseHeaders();
  const Http::HeaderValues& header_values = Http::Headers::get();
  if (Utility::isUpgrade(request_or_response_headers) && upgradeAllowed()) {
    // Ignore h2c upgrade requests until we support them.
    // See https://github.com/envoyproxy/envoy/issues/7161 for details.
    if (absl::EqualsIgnoreCase(request_or_response_headers.getUpgradeValue(),
                               header_values.UpgradeValues.H2c)) {
      ENVOY_CONN_LOG(trace, "removing unsupported h2c upgrade headers.", connection_);
      request_or_response_headers.removeUpgrade();
      if (request_or_response_headers.Connection()) {
        const auto& tokens_to_remove = caseUnorderdSetContainingUpgradeAndHttp2Settings();
        std::string new_value = StringUtil::removeTokens(
            request_or_response_headers.getConnectionValue(), ",", tokens_to_remove, ",");
        if (new_value.empty()) {
          request_or_response_headers.removeConnection();
        } else {
          request_or_response_headers.setConnection(new_value);
        }
      }
      request_or_response_headers.remove(header_values.Http2Settings);
    } else {
      ENVOY_CONN_LOG(trace, "codec entering upgrade mode.", connection_);
      handling_upgrade_ = true;
    }
  }
  if (parser_->methodName() == header_values.MethodValues.Connect) {
    if (request_or_response_headers.ContentLength()) {
      if (request_or_response_headers.getContentLengthValue() == "0") {
        request_or_response_headers.removeContentLength();
      } else {
        // Per https://tools.ietf.org/html/rfc7231#section-4.3.6 a payload with a
        // CONNECT request has no defined semantics, and may be rejected.
        error_code_ = Http::Code::BadRequest;
        RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().BodyDisallowed));
        return codecProtocolError("http/1.1 protocol error: unsupported content length");
      }
    }
    ENVOY_CONN_LOG(trace, "codec entering upgrade mode for CONNECT request.", connection_);
    handling_upgrade_ = true;
  }

  // https://tools.ietf.org/html/rfc7230#section-3.3.3
  // If a message is received with both a Transfer-Encoding and a
  // Content-Length header field, the Transfer-Encoding overrides the
  // Content-Length. Such a message might indicate an attempt to
  // perform request smuggling (Section 9.5) or response splitting
  // (Section 9.4) and ought to be handled as an error. A sender MUST
  // remove the received Content-Length field prior to forwarding such
  // a message.

#ifndef ENVOY_ENABLE_UHV
  // This check is moved into default header validator.
  // TODO(yanavlasov): use runtime override here when UHV is moved into the main build

  // Reject message with Http::Code::BadRequest if both Transfer-Encoding and Content-Length
  // headers are present or if allowed by http1 codec settings and 'Transfer-Encoding'
  // is chunked - remove Content-Length and serve request.
  if (parser_->hasTransferEncoding() != 0 && request_or_response_headers.ContentLength()) {
    if (parser_->isChunked() && codec_settings_.allow_chunked_length_) {
      request_or_response_headers.removeContentLength();
    } else {
      error_code_ = Http::Code::BadRequest;
      RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().ChunkedContentLength));
      return codecProtocolError(
          "http/1.1 protocol error: both 'Content-Length' and 'Transfer-Encoding' are set.");
    }
  }
#endif

  // Per https://tools.ietf.org/html/rfc7230#section-3.3.1 Envoy should reject
  // transfer-codings it does not understand.
  // Per https://tools.ietf.org/html/rfc7231#section-4.3.6 a payload with a
  // CONNECT request has no defined semantics, and may be rejected.
  if (request_or_response_headers.TransferEncoding()) {
    const absl::string_view encoding = request_or_response_headers.getTransferEncodingValue();
    if (!absl::EqualsIgnoreCase(encoding, header_values.TransferEncodingValues.Chunked) ||
        parser_->methodName() == header_values.MethodValues.Connect) {
      error_code_ = Http::Code::NotImplemented;
      RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().InvalidTransferEncoding));
      return codecProtocolError("http/1.1 protocol error: unsupported transfer encoding");
    }
  }

  auto statusor = onHeadersCompleteBase();
  if (!statusor.ok()) {
    RETURN_IF_ERROR(statusor.status());
  }

  header_parsing_state_ = HeaderParsingState::Done;

  // Returning CallbackResult::NoBodyData informs http_parser to not expect a body or further data
  // on this connection.
  return handling_upgrade_ ? CallbackResult::NoBodyData : statusor.value();
}
 

codec_impl.cc

Envoy::StatusOr<CallbackResult> ServerConnectionImpl::onHeadersCompleteBase() {
  // Handle the case where response happens prior to request complete. It's up to upper layer code
  // to disconnect the connection but we shouldn't fire any more events since it doesn't make
  // sense.
  if (active_request_) {
    auto& headers = absl::get<RequestHeaderMapPtr>(headers_or_trailers_);
    ENVOY_CONN_LOG(trace, "Server: onHeadersComplete size={}", connection_, headers->size());

    if (!handling_upgrade_ && headers->Connection()) {
      // If we fail to sanitize the request, return a 400 to the client
      if (!Utility::sanitizeConnectionHeader(*headers)) {
        absl::string_view header_value = headers->getConnectionValue();
        ENVOY_CONN_LOG(debug, "Invalid nominated headers in Connection: {}", connection_,
                       header_value);
        error_code_ = Http::Code::BadRequest;
        RETURN_IF_ERROR(
            sendProtocolError(Http1ResponseCodeDetails::get().ConnectionHeaderSanitization));
        return codecProtocolError("Invalid nominated headers in Connection.");
      }
    }

    // Inform the response encoder about any HEAD method, so it can set content
    // length and transfer encoding headers correctly.
    const Http::HeaderValues& header_values = Http::Headers::get();
    active_request_->response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
                                                                  header_values.MethodValues.Head);
    active_request_->response_encoder_.setIsResponseToConnectRequest(
        parser_->methodName() == header_values.MethodValues.Connect);

    RETURN_IF_ERROR(handlePath(*headers, parser_->methodName()));
    ASSERT(active_request_->request_url_.empty());

    headers->setMethod(parser_->methodName());

    // Make sure the host is valid.
    auto details = HeaderUtility::requestHeadersValid(*headers);
    if (details.has_value()) {
      RETURN_IF_ERROR(sendProtocolError(details.value().get()));
      return codecProtocolError(
          "http/1.1 protocol error: request headers failed spec compliance checks");
    }

    // Determine here whether we have a body or not. This uses the new RFC semantics where the
    // presence of content-length or chunked transfer-encoding indicates a body vs. a particular
    // method. If there is no body, we defer raising decodeHeaders() until the parser is flushed
    // with message complete. This allows upper layers to behave like HTTP/2 and prevents a proxy
    // scenario where the higher layers stream through and implicitly switch to chunked transfer
    // encoding because end stream with zero body length has not yet been indicated.
    if (parser_->isChunked() ||
        (parser_->contentLength().has_value() && parser_->contentLength().value() > 0) ||
        handling_upgrade_) {
      active_request_->request_decoder_->decodeHeaders(std::move(headers), false);

      // If the connection has been closed (or is closing) after decoding headers, pause the parser
      // so we return control to the caller.
      if (connection_.state() != Network::Connection::State::Open) {
        return parser_->pause();
      }
    } else {
      deferred_end_stream_headers_ = true;
    }
  }

  return CallbackResult::Success;
}
 
 
 
 
        [](http_parser* parser) -> int {
          auto* conn_impl = static_cast<ParserCallbacks*>(parser->data);
          return static_cast<int>(conn_impl->onMessageComplete());
        },
 
 
codec_impl.cc
 
CallbackResult ConnectionImpl::onMessageComplete() {
  return setAndCheckCallbackStatusOr(onMessageCompleteImpl());
}

 
codec_impl.cc

StatusOr<CallbackResult> ConnectionImpl::onMessageCompleteImpl() {
  ENVOY_CONN_LOG(trace, "message complete", connection_);

  dispatchBufferedBody();

  if (handling_upgrade_) {
    // If this is an upgrade request, swallow the onMessageComplete. The
    // upgrade payload will be treated as stream body.
    ASSERT(!deferred_end_stream_headers_);
    ENVOY_CONN_LOG(trace, "Pausing parser due to upgrade.", connection_);
    return parser_->pause();
  }

  // If true, this indicates we were processing trailers and must
  // move the last header into current_header_map_
  if (header_parsing_state_ == HeaderParsingState::Value) {
    RETURN_IF_ERROR(completeCurrentHeader());
  }

  return onMessageCompleteBase();
}

 
codec_impl.cc

void ConnectionImpl::dispatchBufferedBody() {
  ASSERT(parser_->getStatus() == ParserStatus::Ok || parser_->getStatus() == ParserStatus::Paused);
  ASSERT(codec_status_.ok());
  if (buffered_body_.length() > 0) {
    onBody(buffered_body_);
    buffered_body_.drain(buffered_body_.length());
  }
}

 
If any body data has been buffered, it is handed to onBody and the buffer is then drained.
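The buffer-then-dispatch pattern in bufferBody and dispatchBufferedBody can be sketched as follows. `SketchBodyBuffer` and `collectBodies` are hypothetical names, and a `std::string` stands in for Envoy's buffer type, so the zero-copy move path in the real code is not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical body buffer for illustration only.
class SketchBodyBuffer {
public:
  using BodyCallback = std::function<void(const std::string&)>;

  explicit SketchBodyBuffer(BodyCallback on_body) : on_body_(std::move(on_body)) {}

  // Called for every body fragment the parser reports.
  void bufferBody(const char* data, std::size_t length) {
    buffered_body_.append(data, length);
  }

  // Deliver everything buffered so far in one call, then drain the buffer.
  void dispatchBufferedBody() {
    if (!buffered_body_.empty()) {
      on_body_(buffered_body_);
      buffered_body_.clear();
    }
  }

  std::size_t bufferedLength() const { return buffered_body_.size(); }

private:
  BodyCallback on_body_;
  std::string buffered_body_;
};

// Feeds the chunks through the buffer and returns what the callback saw.
inline std::vector<std::string> collectBodies(const std::vector<std::string>& chunks) {
  std::vector<std::string> seen;
  SketchBodyBuffer buffer([&seen](const std::string& body) { seen.push_back(body); });
  for (const auto& chunk : chunks) {
    buffer.bufferBody(chunk.data(), chunk.size());
  }
  buffer.dispatchBufferedBody();
  buffer.dispatchBufferedBody(); // nothing left, so the callback is not called again
  return seen;
}
```

Batching fragments this way means the upper layers receive one body callback per dispatch rather than one per parser fragment.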
 
codec_impl.cc

CallbackResult ServerConnectionImpl::onMessageCompleteBase() {
  ASSERT(!handling_upgrade_);
  if (active_request_) {

    // The request_decoder should be non-null after we've called the newStream on callbacks.
    ASSERT(active_request_->request_decoder_);
    active_request_->remote_complete_ = true;

    if (deferred_end_stream_headers_) {
      active_request_->request_decoder_->decodeHeaders(
          std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true);
      deferred_end_stream_headers_ = false;
    } else if (processing_trailers_) {
      active_request_->request_decoder_->decodeTrailers(
          std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
    } else {
      Buffer::OwnedImpl buffer;
      active_request_->request_decoder_->decodeData(buffer, true);
    }

    // Reset to ensure no information from one requests persists to the next.
    headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);
  }

  // Always pause the parser so that the calling code can process 1 request at a time and apply
  // back pressure. However this means that the calling code needs to detect if there is more data
  // in the buffer and dispatch it again.
  return parser_->pause();
}

 
conn_manager_impl.cc

// Ordering in this function is complicated, but important.
//
// We want to do minimal work before selecting route and creating a filter
// chain to maximize the number of requests which get custom filter behavior,
// e.g. registering access logging.
//
// This must be balanced by doing sanity checking for invalid requests (one
// can't route select properly without full headers), checking state required to
// serve error responses (connection close, head requests, etc), and
// modifications which may themselves affect route selection.
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
                                                        bool end_stream) {
  ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
                   *headers);
  ScopeTrackerScopeState scope(this,
                               connection_manager_.read_callbacks_->connection().dispatcher());
  request_headers_ = std::move(headers);
  filter_manager_.requestHeadersInitialized();
  if (request_header_timer_ != nullptr) {
    request_header_timer_->disableTimer();
    request_header_timer_.reset();
  }

  // Both saw_connection_close_ and is_head_request_ affect local replies: set
  // them as early as possible.
  const Protocol protocol = connection_manager_.codec_->protocol();
  state_.saw_connection_close_ = HeaderUtility::shouldCloseConnection(protocol, *request_headers_);

  // We end the decode here to mark that the downstream stream is complete.
  maybeEndDecode(end_stream);

  if (!validateHeaders()) {
    ENVOY_STREAM_LOG(debug, "request headers validation failed\n{}", *this, *request_headers_);
    return;
  }

  // We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
  if (connection_manager_.config_.isRoutable()) {
    if (connection_manager_.config_.routeConfigProvider() != nullptr) {
      snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
    } else if (connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
      snapped_scoped_routes_config_ =
          connection_manager_.config_.scopedRouteConfigProvider()->config<Router::ScopedConfig>();
      snapScopedRouteConfig();
    }
  } else {
    snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
  }

  // Drop new requests when overloaded as soon as we have decoded the headers.
  if (connection_manager_.random_generator_.bernoulli(
          connection_manager_.overload_stop_accepting_requests_ref_.value())) {
    // In this one special case, do not create the filter chain. If there is a risk of memory
    // overload it is more important to avoid unnecessary allocation than to create the filters.
    filter_manager_.skipFilterChainCreation();
    connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
    sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, absl::nullopt,
                   StreamInfo::ResponseCodeDetails::get().Overload);
    return;
  }

  if (!connection_manager_.config_.proxy100Continue() && request_headers_->Expect() &&
      // The Expect field-value is case-insensitive.
      // https://tools.ietf.org/html/rfc7231#section-5.1.1
      absl::EqualsIgnoreCase((request_headers_->Expect()->value().getStringView()),
                             Headers::get().ExpectValues._100Continue)) {
    // Note in the case Envoy is handling 100-Continue complexity, it skips the filter chain
    // and sends the 100-Continue directly to the encoder.
    chargeStats(continueHeader());
    response_encoder_->encode1xxHeaders(continueHeader());
    // Remove the Expect header so it won't be handled again upstream.
    request_headers_->removeExpect();
  }

  connection_manager_.user_agent_.initializeFromHeaders(*request_headers_,
                                                        connection_manager_.stats_.prefixStatName(),
                                                        connection_manager_.stats_.scope_);

  // Make sure we are getting a codec version we support.
  if (protocol == Protocol::Http10) {
    // Assume this is HTTP/1.0. This is fine for HTTP/0.9 but this code will also affect any
    // requests with non-standard version numbers (0.9, 1.3), basically anything which is not
    // HTTP/1.1.
    //
    // The protocol may have shifted in the HTTP/1.0 case so reset it.
    filter_manager_.streamInfo().protocol(protocol);
    if (!connection_manager_.config_.http1Settings().accept_http_10_) {
      // Send "Upgrade Required" if HTTP/1.0 support is not explicitly configured on.
      sendLocalReply(Code::UpgradeRequired, "", nullptr, absl::nullopt,
                     StreamInfo::ResponseCodeDetails::get().LowVersion);
      return;
    }
    if (!request_headers_->Host() &&
        !connection_manager_.config_.http1Settings().default_host_for_http_10_.empty()) {
      // Add a default host if configured to do so.
      request_headers_->setHost(
          connection_manager_.config_.http1Settings().default_host_for_http_10_);
    }
  }

  if (!request_headers_->Host()) {
    // Require host header. For HTTP/1.1 Host has already been translated to :authority.
    sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
                   StreamInfo::ResponseCodeDetails::get().MissingHost);
    return;
  }

  // Verify header sanity checks which should have been performed by the codec.
  ASSERT(HeaderUtility::requestHeadersValid(*request_headers_).has_value() == false);

  // Check for the existence of the :path header for non-CONNECT requests, or present-but-empty
  // :path header for CONNECT requests. We expect the codec to have broken the path into pieces if
  // applicable. NOTE: Currently the HTTP/1.1 codec only does this when the allow_absolute_url flag
  // is enabled on the HCM.
  if ((!HeaderUtility::isConnect(*request_headers_) || request_headers_->Path()) &&
      request_headers_->getPathValue().empty()) {
    sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
                   StreamInfo::ResponseCodeDetails::get().MissingPath);
    return;
  }

  // Currently we only support relative paths at the application layer.
  if (!request_headers_->getPathValue().empty() && request_headers_->getPathValue()[0] != '/') {
    connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
    sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
                   StreamInfo::ResponseCodeDetails::get().AbsolutePath);
    return;
  }

  // Path sanitization should happen before any path access other than the above sanity check.
  const auto action =
      ConnectionManagerUtility::maybeNormalizePath(*request_headers_, connection_manager_.config_);
  // gRPC requests are rejected if Envoy is configured to redirect post-normalization. This is
  // because gRPC clients do not support redirect.
  if (action == ConnectionManagerUtility::NormalizePathAction::Reject ||
      (action == ConnectionManagerUtility::NormalizePathAction::Redirect &&
       Grpc::Common::hasGrpcContentType(*request_headers_))) {
    connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
    sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
                   StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
    return;
  } else if (action == ConnectionManagerUtility::NormalizePathAction::Redirect) {
    connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
    sendLocalReply(
        Code::TemporaryRedirect, "",
        [new_path = request_headers_->Path()->value().getStringView()](
            Http::ResponseHeaderMap& response_headers) -> void {
          response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
        },
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
    return;
  }

  ASSERT(action == ConnectionManagerUtility::NormalizePathAction::Continue);
  auto optional_port = ConnectionManagerUtility::maybeNormalizeHost(
      *request_headers_, connection_manager_.config_, localPort());
  if (optional_port.has_value() &&
      requestWasConnect(request_headers_, connection_manager_.codec_->protocol())) {
    filter_manager_.streamInfo().filterState()->setData(
        Router::OriginalConnectPort::key(),
        std::make_unique<Router::OriginalConnectPort>(optional_port.value()),
        StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Request);
  }

  if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
    // Modify the downstream remote address depending on configuration and headers.
    const auto mutate_result = ConnectionManagerUtility::mutateRequestHeaders(
        *request_headers_, connection_manager_.read_callbacks_->connection(),
        connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_);

    // IP detection failed, reject the request.
    if (mutate_result.reject_request.has_value()) {
      const auto& reject_request_params = mutate_result.reject_request.value();
      connection_manager_.stats_.named_.downstream_rq_rejected_via_ip_detection_.inc();
      sendLocalReply(reject_request_params.response_code, reject_request_params.body, nullptr,
                     absl::nullopt,
                     StreamInfo::ResponseCodeDetails::get().OriginalIPDetectionFailed);
      return;
    }

    filter_manager_.setDownstreamRemoteAddress(mutate_result.final_remote_address);
  }
  ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);

  ASSERT(!cached_route_);
  refreshCachedRoute();

  if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
    filter_manager_.streamInfo().setTraceReason(
        ConnectionManagerUtility::mutateTracingRequestHeader(
            *request_headers_, connection_manager_.runtime_, connection_manager_.config_,
            cached_route_.value().get()));
  }

  filter_manager_.streamInfo().setRequestHeaders(*request_headers_);

  const bool upgrade_rejected = filter_manager_.createFilterChain() == false;

  // TODO if there are no filters when starting a filter iteration, the connection manager
  // should return 404. The current returns no response if there is no router filter.
  if (hasCachedRoute()) {
    // Do not allow upgrades if the route does not support it.
    if (upgrade_rejected) {
      // While downstream servers should not send upgrade payload without the upgrade being
      // accepted, err on the side of caution and refuse to process any further requests on this
      // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
      // contains a smuggled HTTP request.
      state_.saw_connection_close_ = true;
      connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
      sendLocalReply(Code::Forbidden, "", nullptr, absl::nullopt,
                     StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
      return;
    }
    // Allow non websocket requests to go through websocket enabled routes.
  }

  // Check if tracing is enabled.
  if (connection_manager_tracing_config_.has_value()) {
    traceRequest();
  }

  filter_manager_.decodeHeaders(*request_headers_, end_stream);

  // Reset it here for both global and overridden cases.
  resetIdleTimer();
}

 
In the process above, the stream now works together with the ConnectionManager to set up the connection.
A closer look at the process follows.
 
header_utility.cc

bool HeaderUtility::shouldCloseConnection(Http::Protocol protocol,
                                          const RequestOrResponseHeaderMap& headers) {
  // HTTP/1.0 defaults to single-use connections. Make sure the connection will be closed unless
  // Keep-Alive is present.
  if (protocol == Protocol::Http10 &&
      (!headers.Connection() ||
       !Envoy::StringUtil::caseFindToken(headers.Connection()->value().getStringView(), ",",
                                         Http::Headers::get().ConnectionValues.KeepAlive))) {
    return true;
  }

  if (protocol == Protocol::Http11 && headers.Connection() &&
      Envoy::StringUtil::caseFindToken(headers.Connection()->value().getStringView(), ",",
                                       Http::Headers::get().ConnectionValues.Close)) {
    return true;
  }

  // Note: Proxy-Connection is not a standard header, but is supported here
  // since it is supported by http-parser the underlying parser for http
  // requests.
  if (protocol < Protocol::Http2 && headers.ProxyConnection() &&
      Envoy::StringUtil::caseFindToken(headers.ProxyConnection()->value().getStringView(), ",",
                                       Http::Headers::get().ConnectionValues.Close)) {
    return true;
  }
  return false;
}

 
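First, shouldCloseConnection inspects the protocol (HTTP/1.0, HTTP/1.1, or anything below HTTP/2) together with the Connection and Proxy-Connection headers to decide whether the connection should be closed.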
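The three rules in shouldCloseConnection can be tried out in isolation with the sketch below. `sketchShouldClose`, `hasToken`, and `lowerTrim` are hypothetical helpers written for this example, and an empty string is assumed to mean "header not present", which differs from how Envoy's header map represents absence.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <cstddef>
#include <sstream>
#include <string>

enum class Protocol { Http10, Http11, Http2, Http3 };

// Trims spaces and tabs and lowercases the result.
inline std::string lowerTrim(const std::string& text) {
  const std::size_t begin = text.find_first_not_of(" \t");
  if (begin == std::string::npos) {
    return "";
  }
  const std::size_t end = text.find_last_not_of(" \t");
  std::string out = text.substr(begin, end - begin + 1);
  std::transform(out.begin(), out.end(), out.begin(),
                 [](unsigned char c) { return static_cast<char>(std::tolower(c)); });
  return out;
}

// Case-insensitive search for a token in a comma-separated header value.
// The token argument is expected in lowercase.
inline bool hasToken(const std::string& header_value, const std::string& token) {
  std::istringstream stream(header_value);
  std::string part;
  while (std::getline(stream, part, ',')) {
    if (lowerTrim(part) == token) {
      return true;
    }
  }
  return false;
}

// Assumption: an empty string means the header is absent.
inline bool sketchShouldClose(Protocol protocol, const std::string& connection,
                              const std::string& proxy_connection) {
  // HTTP/1.0 closes by default unless keep-alive is requested.
  if (protocol == Protocol::Http10 && !hasToken(connection, "keep-alive")) {
    return true;
  }
  // HTTP/1.1 stays open by default unless close is requested.
  if (protocol == Protocol::Http11 && hasToken(connection, "close")) {
    return true;
  }
  // Proxy-Connection: close is honored for anything below HTTP/2.
  if (protocol < Protocol::Http2 && hasToken(proxy_connection, "close")) {
    return true;
  }
  return false;
}
```

Note the opposite defaults: HTTP/1.0 needs an explicit keep-alive to stay open, while HTTP/1.1 needs an explicit close to shut down.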
 
conn_manager_impl.cc

void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) {
  // If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
  if (end_stream && !filter_manager_.remoteDecodeComplete()) {
    filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
        connection_manager_.read_callbacks_->connection().dispatcher().timeSource());
    ENVOY_STREAM_LOG(debug, "request end stream", *this);
  }
}

 
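Next, maybeEndDecode checks whether end_stream is set while the remote decode has not yet been marked complete. If so, it records the time at which the last downstream byte was received.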
 
conn_manager_impl.cc

void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
                                                        bool end_stream) {
  // ... (omitted) ...

  // We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
  if (connection_manager_.config_.isRoutable()) {
    if (connection_manager_.config_.routeConfigProvider() != nullptr) {
      snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
    } else if (connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
      snapped_scoped_routes_config_ =
          connection_manager_.config_.scopedRouteConfigProvider()->config<Router::ScopedConfig>();
      snapScopedRouteConfig();
    }
  } else {
    snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
  }

  // ... (omitted) ...
}

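If the Connection Manager is routable, the route config is snapshotted at this point. When a RouteConfigProvider is set, its config is taken directly. Otherwise, when a ScopedRouteConfigProvider is set, the scoped config is taken and snapScopedRouteConfig() picks the route config that matches the request headers.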
 
conn_manager_impl.cc

void ConnectionManagerImpl::ActiveStream::snapScopedRouteConfig() {
  // NOTE: if a RDS subscription hasn't got a RouteConfiguration back, a Router::NullConfigImpl is
  // returned, in that case we let it pass.
  snapped_route_config_ = snapped_scoped_routes_config_->getRouteConfig(*request_headers_);
  if (snapped_route_config_ == nullptr) {
    ENVOY_STREAM_LOG(trace, "can't find SRDS scope.", *this);
    // TODO(stevenzzzz): Consider to pass an error message to router filter, so that it can
    // send back 404 with some more details.
    snapped_route_config_ = std::make_shared<Router::NullConfigImpl>();
  }
}

 
scoped_config_impl.cc

Router::ConfigConstSharedPtr
ScopedConfigImpl::getRouteConfig(const Http::HeaderMap& headers) const {
  ScopeKeyPtr scope_key = scope_key_builder_.computeScopeKey(headers);
  if (scope_key == nullptr) {
    return nullptr;
  }
  auto iter = scoped_route_info_by_key_.find(scope_key->hash());
  if (iter != scoped_route_info_by_key_.end()) {
    return iter->second->routeConfig();
  }
  return nullptr;
}
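
The scope lookup and its fallback can be sketched as follows. This is a simplified illustration, assuming the scope key is just the value of a single request header; ScopedRoutes, snapScoped and the "null-config" placeholder are hypothetical names, and Envoy's real scope key builder is considerably more flexible.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-ins for illustration only.
struct RouteConfig {
  std::string name;
};
using RouteConfigPtr = std::shared_ptr<const RouteConfig>;
using HeaderMap = std::map<std::string, std::string>;

class ScopedRoutes {
public:
  explicit ScopedRoutes(std::string key_header) : key_header_(std::move(key_header)) {}

  void addScope(const std::string& key, RouteConfigPtr config) {
    by_key_[std::hash<std::string>{}(key)] = std::move(config);
  }

  // Returns nullptr when no scope key can be built or no scope matches it.
  RouteConfigPtr getRouteConfig(const HeaderMap& headers) const {
    const auto header = headers.find(key_header_);
    if (header == headers.end()) {
      return nullptr;
    }
    const auto it = by_key_.find(std::hash<std::string>{}(header->second));
    return it != by_key_.end() ? it->second : nullptr;
  }

private:
  std::string key_header_;
  std::unordered_map<std::size_t, RouteConfigPtr> by_key_;
};

// Mirrors the fallback in snapScopedRouteConfig(): a missing scope becomes an
// empty placeholder config instead of a null pointer.
RouteConfigPtr snapScoped(const ScopedRoutes& scopes, const HeaderMap& headers) {
  RouteConfigPtr config = scopes.getRouteConfig(headers);
  if (config == nullptr) {
    config = std::make_shared<const RouteConfig>(RouteConfig{"null-config"});
  }
  return config;
}
```

Because the fallback always yields a non-null config, later code can dereference the snapshot without checking it, which matches how snapped_route_config_ is used in the listings.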

 
 
Beyond that, some additional preparatory work for routing is performed.
 
conn_manager_impl.cc

void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
                                                        bool end_stream) {
  // ... (omitted) ...

  if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
    // Modify the downstream remote address depending on configuration and headers.
    const auto mutate_result = ConnectionManagerUtility::mutateRequestHeaders(
        *request_headers_, connection_manager_.read_callbacks_->connection(),
        connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_);

    // IP detection failed, reject the request.
    if (mutate_result.reject_request.has_value()) {
      const auto& reject_request_params = mutate_result.reject_request.value();
      connection_manager_.stats_.named_.downstream_rq_rejected_via_ip_detection_.inc();
      sendLocalReply(reject_request_params.response_code, reject_request_params.body, nullptr,
                     absl::nullopt,
                     StreamInfo::ResponseCodeDetails::get().OriginalIPDetectionFailed);
      return;
    }

    filter_manager_.setDownstreamRemoteAddress(mutate_result.final_remote_address);
  }
  // ... (omitted) ...
}

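The next step determines the downstream remote address and stores it. If IP detection fails during mutateRequestHeaders, the request is rejected with a local reply instead.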
 
filter_manager.h

  void setDownstreamRemoteAddress(
      const Network::Address::InstanceConstSharedPtr& downstream_remote_address) {
    stream_info_.setDownstreamRemoteAddress(downstream_remote_address);
  }

 
conn_manager_impl.cc

void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
                                                        bool end_stream) {
  // ... (omitted) ...

  refreshCachedRoute();

  if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
    filter_manager_.streamInfo().setTraceReason(
        ConnectionManagerUtility::mutateTracingRequestHeader(
            *request_headers_, connection_manager_.runtime_, connection_manager_.config_,
            cached_route_.value().get()));
  }

  filter_manager_.streamInfo().setRequestHeaders(*request_headers_);

  const bool upgrade_rejected = filter_manager_.createFilterChain() == false;

  // TODO if there are no filters when starting a filter iteration, the connection manager
  // should return 404. The current returns no response if there is no router filter.
  if (hasCachedRoute()) {
    // Do not allow upgrades if the route does not support it.
    if (upgrade_rejected) {
      // While downstream servers should not send upgrade payload without the upgrade being
      // accepted, err on the side of caution and refuse to process any further requests on this
      // connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
      // contains a smuggled HTTP request.
      state_.saw_connection_close_ = true;
      connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
      sendLocalReply(Code::Forbidden, "", nullptr, absl::nullopt,
                     StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
      return;
    }
    // Allow non websocket requests to go through websocket enabled routes.
  }

  // Check if tracing is enabled.
  if (connection_manager_tracing_config_.has_value()) {
    traceRequest();
  }

  filter_manager_.decodeHeaders(*request_headers_, end_stream);

  // Reset it here for both global and overridden cases.
  resetIdleTimer();
}

 
After that, refreshCachedRoute() is called to determine the route.
 
conn_manager_impl.cc

void ConnectionManagerImpl::ActiveStream::refreshCachedRoute() { refreshCachedRoute(nullptr); }

 
conn_manager_impl.cc

void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
  Router::RouteConstSharedPtr route;
  if (request_headers_ != nullptr) {
    if (connection_manager_.config_.isRoutable() &&
        connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
      // NOTE: re-select scope as well in case the scope key header has been changed by a filter.
      snapScopedRouteConfig();
    }
    if (snapped_route_config_ != nullptr) {
      route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
                                           stream_id_);
    }
  }

  setRoute(route);
}

 
conn_manager_impl.cc

Router::RouteConstSharedPtr
ConnectionManagerImpl::ActiveStream::route(const Router::RouteCallback& cb) {
  if (cached_route_.has_value()) {
    return cached_route_.value();
  }
  refreshCachedRoute(cb);
  return cached_route_.value();
}
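
The caching behavior of route() and refreshCachedRoute() can be sketched like this. StreamRouteCache, the path-prefix matching and the lookups counter are hypothetical and exist only to make the caching observable; the real route selection is done by the snapped route config.

```cpp
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-ins for illustration only.
struct Route {
  std::string cluster;
};
using RouteConstSharedPtr = std::shared_ptr<const Route>;

class StreamRouteCache {
public:
  // Returns the cached route, resolving it only if nothing is cached yet.
  RouteConstSharedPtr route(const std::string& path) {
    if (!cached_route_.has_value()) {
      refreshCachedRoute(path);
    }
    return cached_route_.value();
  }

  // Re-runs route selection and overwrites the cache.
  void refreshCachedRoute(const std::string& path) {
    ++lookups_;
    RouteConstSharedPtr selected;
    if (path.rfind("/api", 0) == 0) { // toy rule: path starts with "/api"
      selected = std::make_shared<const Route>(Route{"api_cluster"});
    }
    // A null route ("no match") is cached too, so it is not recomputed.
    cached_route_ = std::move(selected);
  }

  int lookups() const { return lookups_; }

private:
  // Empty optional: never resolved. Optional holding nullptr: resolved, no match.
  std::optional<RouteConstSharedPtr> cached_route_;
  int lookups_ = 0;
};
```

The optional wrapped around a shared pointer is what lets the code tell "not resolved yet" apart from "resolved, but nothing matched".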

 
conn_manager_impl.cc

/**
 * Sets the cached route to the RouteConstSharedPtr argument passed in. Handles setting the
 * cached_route_/cached_cluster_info_ ActiveStream attributes, the FilterManager streamInfo, tracing
 * tags, and timeouts.
 *
 * Declared as a StreamFilterCallbacks member function for filters to call directly, but also
 * functions as a helper to refreshCachedRoute(const Router::RouteCallback& cb).
 */
void ConnectionManagerImpl::ActiveStream::setRoute(Router::RouteConstSharedPtr route) {
  filter_manager_.streamInfo().route_ = route;
  cached_route_ = std::move(route);
  if (nullptr == filter_manager_.streamInfo().route() ||
      nullptr == filter_manager_.streamInfo().route()->routeEntry()) {
    cached_cluster_info_ = nullptr;
  } else {
    Upstream::ThreadLocalCluster* local_cluster =
        connection_manager_.cluster_manager_.getThreadLocalCluster(
            filter_manager_.streamInfo().route()->routeEntry()->clusterName());
    cached_cluster_info_ = (nullptr == local_cluster) ? nullptr : local_cluster->info();
  }

  filter_manager_.streamInfo().setUpstreamClusterInfo(cached_cluster_info_.value());
  refreshCachedTracingCustomTags();
  refreshDurationTimeout();
  refreshIdleTimeout();
}

 
cluster_manager_impl.cc

ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view cluster) {
  ThreadLocalClusterManagerImpl& cluster_manager = *tls_;

  auto entry = cluster_manager.thread_local_clusters_.find(cluster);
  if (entry != cluster_manager.thread_local_clusters_.end()) {
    return entry->second.get();
  } else {
    return nullptr;
  }
}

 
stream_info_impl.h

  void setUpstreamClusterInfo(
      const Upstream::ClusterInfoConstSharedPtr& upstream_cluster_info) override {
    upstream_cluster_info_ = upstream_cluster_info;
  }

setRoute registers the upstream cluster info, and if custom tracing tags need to be added, it adds them.
It then refreshes the timeout-related settings.
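
The cluster lookup inside setRoute can be sketched as below. ClusterTable and resolveClusterInfo are hypothetical names for this example; in Envoy the table is the thread-local cluster map reached through getThreadLocalCluster.

```cpp
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical stand-ins for illustration only.
struct ClusterInfo {
  std::string name;
};
using ClusterInfoPtr = std::shared_ptr<const ClusterInfo>;

struct RouteEntry {
  std::string cluster_name;
};
struct Route {
  std::shared_ptr<const RouteEntry> entry; // null for e.g. direct responses
};

class ClusterTable {
public:
  void add(const std::string& name) {
    clusters_[name] = std::make_shared<const ClusterInfo>(ClusterInfo{name});
  }
  ClusterInfoPtr find(const std::string& name) const {
    const auto it = clusters_.find(name);
    return it != clusters_.end() ? it->second : nullptr;
  }

private:
  std::unordered_map<std::string, ClusterInfoPtr> clusters_;
};

// No route, no route entry, or an unknown cluster name all yield nullptr.
ClusterInfoPtr resolveClusterInfo(const ClusterTable& table,
                                  const std::shared_ptr<const Route>& route) {
  if (route == nullptr || route->entry == nullptr) {
    return nullptr;
  }
  return table.find(route->entry->cluster_name);
}
```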
 
conn_manager_impl.cc

void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
                                                        bool end_stream) {
  // ... (omitted) ...

  filter_manager_.streamInfo().setRequestHeaders(*request_headers_);

  // ... (omitted) ...

  // Check if tracing is enabled.
  if (connection_manager_tracing_config_.has_value()) {
    traceRequest();
  }

  filter_manager_.decodeHeaders(*request_headers_, end_stream);

  // Reset it here for both global and overridden cases.
  resetIdleTimer();
}

 
Next, the request headers are stored in the stream info through filter_manager_, and filter_manager_.decodeHeaders is called so that routing can proceed.
 
filter_manager.h

  void decodeHeaders(RequestHeaderMap& headers, bool end_stream) {
    state_.remote_decode_complete_ = end_stream;
    decodeHeaders(nullptr, headers, end_stream);
  }

 
filter_manager.cc

void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
                                  bool end_stream) {
  // Headers filter iteration should always start with the next filter if available.
  std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
      commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
  std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end();

  for (; entry != decoder_filters_.end(); entry++) {
    ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
    state_.filter_call_state_ |= FilterCallState::DecodeHeaders;
    (*entry)->end_stream_ = (end_stream && continue_data_entry == decoder_filters_.end());
    FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_);
    if (state_.decoder_filter_chain_aborted_) {
      ENVOY_STREAM_LOG(trace,
                       "decodeHeaders filter iteration aborted due to local reply: filter={}",
                       *this, (*entry)->filter_context_.config_name);
      status = FilterHeadersStatus::StopIteration;
    }

    ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
           "Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
           "decodeHeaders when end_stream is already false");

    state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
    ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this,
                     (*entry)->filter_context_.config_name, static_cast<uint64_t>(status));

    (*entry)->decode_headers_called_ = true;

    const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
    ENVOY_BUG(!continue_iteration || !state_.local_complete_,
              "Filter did not return StopAll or StopIteration after sending a local reply.");

    // If this filter ended the stream, decodeComplete() should be called for it.
    if ((*entry)->end_stream_) {
      (*entry)->handle_->decodeComplete();
    }

    // Skip processing metadata after sending local reply
    if (state_.local_complete_ && std::next(entry) != decoder_filters_.end()) {
      maybeContinueDecoding(continue_data_entry);
      return;
    }

    const bool new_metadata_added = processNewlyAddedMetadata();
    // If end_stream is set in headers, and a filter adds new metadata, we need to delay end_stream
    // in headers by inserting an empty data frame with end_stream set. The empty data frame is sent
    // after the new metadata.
    if ((*entry)->end_stream_ && new_metadata_added && !buffered_request_data_) {
      Buffer::OwnedImpl empty_data("");
      ENVOY_STREAM_LOG(
          trace, "inserting an empty data frame for end_stream due metadata being added.", *this);
      // Metadata frame doesn't carry end of stream bit. We need an empty data frame to end the
      // stream.
      addDecodedData(*((*entry).get()), empty_data, true);
    }

    if (!continue_iteration && std::next(entry) != decoder_filters_.end()) {
      // Stop iteration IFF this is not the last filter. If it is the last filter, continue with
      // processing since we need to handle the case where a terminal filter wants to buffer, but
      // a previous filter has added body.
      maybeContinueDecoding(continue_data_entry);
      return;
    }

    // Here we handle the case where we have a header only request, but a filter adds a body
    // to it. We need to not raise end_stream = true to further filters during inline iteration.
    if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) {
      continue_data_entry = entry;
    }
  }

  maybeContinueDecoding(continue_data_entry);

  if (end_stream) {
    disarmRequestTimeout();
  }
}

 
The HTTP Connection Manager may have filters such as cors, fault and router configured, and each of them is run here in order. Let's assume that the router filter is the one being executed.
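
Stripped of metadata handling, end_stream bookkeeping and local replies, the core of the loop is "call each decoder filter in order until one stops iteration". A minimal sketch under that simplification follows. DecoderFilter and runDecoderChain are hypothetical names, and unlike the real FilterManager this version does not treat the last filter specially.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only.
enum class FilterHeadersStatus { Continue, StopIteration };
using Headers = std::vector<std::pair<std::string, std::string>>;

struct DecoderFilter {
  std::string name;
  std::function<FilterHeadersStatus(Headers&)> decode_headers;
};

// Calls filters in configuration order and returns the names of the filters
// that were actually invoked. Iteration ends at the first StopIteration.
std::vector<std::string> runDecoderChain(const std::vector<DecoderFilter>& filters,
                                         Headers& headers) {
  std::vector<std::string> invoked;
  for (const auto& filter : filters) {
    invoked.push_back(filter.name);
    if (filter.decode_headers(headers) == FilterHeadersStatus::StopIteration) {
      break;
    }
  }
  return invoked;
}
```

In the router listing further down, every path through decodeHeaders returns StopIteration, which fits its role as the terminal filter of the chain.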
 
router/config.cc

Http::FilterFactoryCb RouterFilterConfig::createFilterFactoryFromProtoTyped(
    const envoy::extensions::filters::http::router::v3::Router& proto_config,
    const std::string& stat_prefix, Server::Configuration::FactoryContext& context) {
  Stats::StatNameManagedStorage prefix(stat_prefix, context.scope().symbolTable());
  Router::FilterConfigSharedPtr filter_config(new Router::FilterConfig(
      prefix.statName(), context,
      std::make_unique<Router::ShadowWriterImpl>(context.clusterManager()), proto_config));

  return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
    callbacks.addStreamDecoderFilter(
        std::make_shared<Router::ProdFilter>(*filter_config, filter_config->default_stats_));
  };
}

 
filter_manager.h

  void addStreamDecoderFilter(ActiveStreamDecoderFilterPtr filter) {
    // Note: configured decoder filters are appended to decoder_filters_.
    // This means that if filters are configured in the following order (assume all three filters
    // are both decoder/encoder filters):
    //   http_filters:
    //     - A
    //     - B
    //     - C
    // The decoder filter chain will iterate through filters A, B, C.
    LinkedList::moveIntoListBack(std::move(filter), decoder_filters_);
  }

For reference, when the Router filter is registered, it is added to decoder_filters_ as shown above.
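
The factory-callback pattern used here can be sketched as follows: the config is turned into a callback once, and that callback is invoked for every new stream to append a fresh filter instance. FilterChainCallbacks, makeFactory and createFilterChain are hypothetical simplifications of the corresponding Envoy interfaces.

```cpp
#include <functional>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration only.
struct StreamDecoderFilter {
  std::string name;
};
using StreamDecoderFilterPtr = std::shared_ptr<StreamDecoderFilter>;

class FilterChainCallbacks {
public:
  // Filters are appended, so the chain keeps the configuration order.
  void addStreamDecoderFilter(StreamDecoderFilterPtr filter) {
    decoder_filters_.push_back(std::move(filter));
  }
  const std::list<StreamDecoderFilterPtr>& decoderFilters() const { return decoder_filters_; }

private:
  std::list<StreamDecoderFilterPtr> decoder_filters_;
};

using FilterFactoryCb = std::function<void(FilterChainCallbacks&)>;

// Built once from configuration; creates a new filter instance per call.
FilterFactoryCb makeFactory(std::string name) {
  return [name = std::move(name)](FilterChainCallbacks& callbacks) {
    callbacks.addStreamDecoderFilter(
        std::make_shared<StreamDecoderFilter>(StreamDecoderFilter{name}));
  };
}

// Invoked for each new stream with that stream's callbacks object.
void createFilterChain(const std::vector<FilterFactoryCb>& factories,
                       FilterChainCallbacks& callbacks) {
  for (const auto& factory : factories) {
    factory(callbacks);
  }
}
```

Sharing the config through the captured value while creating the filter object per stream is the same split seen in the listing, where filter_config is captured by the lambda and Router::ProdFilter is constructed on each call.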
 
router.cc

Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
  downstream_headers_ = &headers;

  // Extract debug configuration from filter state. This is used further along to determine whether
  // we should append cluster and host headers to the response, and whether to forward the request
  // upstream.
  const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
  const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key());

  // TODO: Maybe add a filter API for this.
  grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
  exclude_http_code_stats_ = grpc_request_ && config_.suppress_grpc_request_failure_code_stats_;

  // Only increment rq total stat if we actually decode headers here. This does not count requests
  // that get handled by earlier filters.
  stats_.rq_total_.inc();

  // Initialize the `modify_headers` function as a no-op (so we don't have to remember to check it
  // against nullptr before calling it), and feed it behavior later if/when we have cluster info
  // headers to append.
  std::function<void(Http::ResponseHeaderMap&)> modify_headers = [](Http::ResponseHeaderMap&) {};

  // Determine if there is a route entry or a direct response for the request.
  route_ = callbacks_->route();
  if (!route_) {
    stats_.no_route_.inc();
    ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue());

    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
    callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt,
                               StreamInfo::ResponseCodeDetails::get().RouteNotFound);
    return Http::FilterHeadersStatus::StopIteration;
  }

  // Determine if there is a direct response for the request.
  const auto* direct_response = route_->directResponseEntry();
  if (direct_response != nullptr) {
    stats_.rq_direct_response_.inc();
    direct_response->rewritePathHeader(headers, !config_.suppress_envoy_headers_);
    callbacks_->streamInfo().setRouteName(direct_response->routeName());
    callbacks_->sendLocalReply(
        direct_response->responseCode(), direct_response->responseBody(),
        [this, direct_response,
         &request_headers = headers](Http::ResponseHeaderMap& response_headers) -> void {
          std::string new_path;
          if (request_headers.Path()) {
            new_path = direct_response->newPath(request_headers);
          }
          // See https://tools.ietf.org/html/rfc7231#section-7.1.2.
          const auto add_location =
              direct_response->responseCode() == Http::Code::Created ||
              Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode()));
          if (!new_path.empty() && add_location) {
            response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
          }
          direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo());
        },
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse);
    return Http::FilterHeadersStatus::StopIteration;
  }

  // A route entry matches for the request.
  route_entry_ = route_->routeEntry();
  // If there's a route specific limit and it's smaller than general downstream
  // limits, apply the new cap.
  retry_shadow_buffer_limit_ =
      std::min(retry_shadow_buffer_limit_, route_entry_->retryShadowBufferLimit());
  callbacks_->streamInfo().setRouteName(route_entry_->routeName());
  if (debug_config && debug_config->append_cluster_) {
    // The cluster name will be appended to any local or upstream responses from this point.
    modify_headers = [this, debug_config](Http::ResponseHeaderMap& headers) {
      headers.addCopy(debug_config->cluster_header_.value_or(Http::Headers::get().EnvoyCluster),
                      route_entry_->clusterName());
    };
  }
  Upstream::ThreadLocalCluster* cluster =
      config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
  if (!cluster) {
    stats_.no_cluster_.inc();
    ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName());

    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound);
    callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers,
                               absl::nullopt,
                               StreamInfo::ResponseCodeDetails::get().ClusterNotFound);
    return Http::FilterHeadersStatus::StopIteration;
  }
  cluster_ = cluster->info();

  // Set up stat prefixes, etc.
  request_vcluster_ = route_entry_->virtualCluster(headers);
  if (request_vcluster_ != nullptr) {
    callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name());
  }
  route_stats_context_ = route_entry_->routeStatsContext();
  ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_,
                   route_entry_->clusterName(), headers.getPathValue());

  if (config_.strict_check_headers_ != nullptr) {
    for (const auto& header : *config_.strict_check_headers_) {
      const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header);
      if (!res.valid_) {
        callbacks_->streamInfo().setResponseFlag(
            StreamInfo::ResponseFlag::InvalidEnvoyRequestHeaders);
        const std::string body = fmt::format("invalid header '{}' with value '{}'",
                                             std::string(res.entry_->key().getStringView()),
                                             std::string(res.entry_->value().getStringView()));
        const std::string details =
            absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{",
                         StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}");
        callbacks_->sendLocalReply(Http::Code::BadRequest, body, nullptr, absl::nullopt, details);
        return Http::FilterHeadersStatus::StopIteration;
      }
    }
  }

  const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName();
  if (request_alt_name) {
    alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>(
        request_alt_name->value().getStringView(), config_.scope_.symbolTable());
    headers.removeEnvoyUpstreamAltStatName();
  }

  // See if we are supposed to immediately kill some percentage of this cluster's traffic.
  if (cluster_->maintenanceMode()) {
    callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
    chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
    callbacks_->sendLocalReply(
        Http::Code::ServiceUnavailable, "maintenance mode",
        [modify_headers, this](Http::ResponseHeaderMap& headers) {
          if (!config_.suppress_envoy_headers_) {
            headers.addReference(Http::Headers::get().EnvoyOverloaded,
                                 Http::Headers::get().EnvoyOverloadedValues.True);
          }
          // Note: append_cluster_info does not respect suppress_envoy_headers.
          modify_headers(headers);
        },
        absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
    cluster_->stats().upstream_rq_maintenance_mode_.inc();
    return Http::FilterHeadersStatus::StopIteration;
  }

  // Fetch a connection pool for the upstream cluster.
  const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();

  if (upstream_http_protocol_options.has_value() &&
      (upstream_http_protocol_options.value().auto_sni() ||
       upstream_http_protocol_options.value().auto_san_validation())) {
    // Default the header to Host/Authority header.
    absl::string_view header_value = headers.getHostValue();

    // Check whether `override_auto_sni_header` is specified.
    const auto override_auto_sni_header =
        upstream_http_protocol_options.value().override_auto_sni_header();
    if (!override_auto_sni_header.empty()) {
      // Use the header value from `override_auto_sni_header` to set the SNI value.
      const auto overridden_header_value = Http::HeaderUtility::getAllOfHeaderAsString(
          headers, Http::LowerCaseString(override_auto_sni_header));
      if (overridden_header_value.result().has_value() &&
          !overridden_header_value.result().value().empty()) {
        header_value = overridden_header_value.result().value();
      }
    }
    const auto parsed_authority = Http::Utility::parseAuthority(header_value);
    bool should_set_sni = !parsed_authority.is_ip_address_;
    // `host_` returns a string_view so doing this should be safe.
    absl::string_view sni_value = parsed_authority.host_;

    if (should_set_sni && upstream_http_protocol_options.value().auto_sni()) {
      callbacks_->streamInfo().filterState()->setData(
          Network::UpstreamServerName::key(),
          std::make_unique<Network::UpstreamServerName>(sni_value),
          StreamInfo::FilterState::StateType::Mutable);
    }

    if (upstream_http_protocol_options.value().auto_san_validation()) {
      callbacks_->streamInfo().filterState()->setData(
          Network::UpstreamSubjectAltNames::key(),
          std::make_unique<Network::UpstreamSubjectAltNames>(
              std::vector<std::string>{std::string(sni_value)}),
          StreamInfo::FilterState::StateType::Mutable);
    }
  }

  transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
      *callbacks_->streamInfo().filterState());

  if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) {
    if (auto typed_state = downstream_connection->streamInfo()
                               .filterState()
                               .getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
                                   Network::UpstreamSocketOptionsFilterState::key());
        typed_state != nullptr) {
      auto downstream_options = typed_state->value();
      if (!upstream_options_) {
        upstream_options_ = std::make_shared<Network::Socket::Options>();
      }
      Network::Socket::appendOptions(upstream_options_, downstream_options);
    }
  }

  if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) {
    Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
  }

  std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);

  if (!generic_conn_pool) {
    sendNoHealthyUpstreamResponse();
    return Http::FilterHeadersStatus::StopIteration;
  }
  Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();

  if (debug_config && debug_config->append_upstream_host_) {
    // The hostname and address will be appended to any local or upstream responses from this point,
    // possibly in addition to the cluster name.
    modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) {
      modify_headers(headers);
      headers.addCopy(
          debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname),
          host->hostname());
      headers.addCopy(debug_config->host_address_header_.value_or(
                          Http::Headers::get().EnvoyUpstreamHostAddress),
                      host->address()->asString());
    };
  }

  // If we've been instructed not to forward the request upstream, send an empty local response.
  if (debug_config && debug_config->do_not_forward_) {
    modify_headers = [modify_headers, debug_config](Http::ResponseHeaderMap& headers) {
      modify_headers(headers);
      headers.addCopy(
          debug_config->not_forwarded_header_.value_or(Http::Headers::get().EnvoyNotForwarded),
          "true");
    };
    callbacks_->sendLocalReply(Http::Code::NoContent, "", modify_headers, absl::nullopt, "");
    return Http::FilterHeadersStatus::StopIteration;
  }

  hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers);

  timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_,
                                         grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
                                         config_.respect_expected_rq_timeout_);

  const Http::HeaderEntry* header_max_stream_duration_entry =
      headers.EnvoyUpstreamStreamDurationMs();
  if (header_max_stream_duration_entry) {
    dynamic_max_stream_duration_ =
        FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry);
    headers.removeEnvoyUpstreamStreamDurationMs();
  }

  // If this header is set with any value, use an alternate response code on timeout
  if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) {
    timeout_response_code_ = Http::Code::NoContent;
    headers.removeEnvoyUpstreamRequestTimeoutAltResponse();
  }

  include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest();
  if (include_attempt_count_in_request_) {
    headers.setEnvoyAttemptCount(attempt_count_);
  }

  // The router has reached a point where it is going to try to send a request upstream,
  // so now modify_headers should attach x-envoy-attempt-count to the downstream response if the
  // config flag is true.
  if (route_entry_->includeAttemptCountInResponse()) {
    modify_headers = [modify_headers, this](Http::ResponseHeaderMap& headers) {
      modify_headers(headers);

      // This header is added without checking for config_.suppress_envoy_headers_ to mirror what is
      // done for upstream requests.
      headers.setEnvoyAttemptCount(attempt_count_);
    };
  }
  callbacks_->streamInfo().setAttemptCount(attempt_count_);

  route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(),
                                       !config_.suppress_envoy_headers_);
  FilterUtility::setUpstreamScheme(
      headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr);

  // Ensure an http transport scheme is selected before continuing with decoding.
  ASSERT(headers.Scheme());

  retry_state_ =
      createRetryState(route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_,
                       route_stats_context_, config_.runtime_, config_.random_,
                       callbacks_->dispatcher(), config_.timeSource(), route_entry_->priority());

  // Determine which shadow policies to use. It's possible that we don't do any shadowing due to
  // runtime keys. Also the method CONNECT doesn't support shadowing.
  auto method = headers.getMethodValue();
  if (method != Http::Headers::get().MethodValues.Connect) {
    for (const auto& shadow_policy : route_entry_->shadowPolicies()) {
      const auto& policy_ref = *shadow_policy;
      if (FilterUtility::shouldShadow(policy_ref, config_.runtime_, callbacks_->streamId())) {
        active_shadow_policies_.push_back(std::cref(policy_ref));
        shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_);
      }
    }
  }

  ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);

  // Hang onto the modify_headers function for later use in handling upstream responses.
  modify_headers_ = modify_headers;

  conn_pool_new_stream_with_early_data_and_http3_ =
      Runtime::runtimeFeatureEnabled(Runtime::conn_pool_new_stream_with_early_data_and_http3);
  const bool can_send_early_data =
      conn_pool_new_stream_with_early_data_and_http3_ &&
      route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_);
  // Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config.
  // For retries etc, HTTP/3 usability may transition from true to false, but
  // will never transition from false to true.
  bool can_use_http3 =
      !transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value();
  UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
      *this, std::move(generic_conn_pool), can_send_early_data, can_use_http3);
  LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
  upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
  if (end_stream) {
    onRequestComplete();
  }

  return Http::FilterHeadersStatus::StopIteration;
}

 
legacy_parser_impl.cc

CallbackResult LegacyHttpParserImpl::pause() { return impl_->pause(); }

 
legacy_parser_impl.cc

  CallbackResult pause() {
    http_parser_pause(&parser_, 1);
    return CallbackResult::Success;
  }

 
http_parser.cc

void
http_parser_pause(http_parser *parser, int paused) {
  /* Users should only be pausing/unpausing a parser that is not in an error
   * state. In non-debug builds, there's not much that we can do about this
   * other than ignore it.
   */
  if (HTTP_PARSER_ERRNO(parser) == HPE_OK ||
      HTTP_PARSER_ERRNO(parser) == HPE_PAUSED) {
    uint32_t nread = parser->nread; /* used by the SET_ERRNO macro */
    SET_ERRNO((paused) ? HPE_PAUSED : HPE_OK);
  } else {
    assert(0 && "Attempting to pause parser in error state");
  }
}
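
The pause call chain above ends in a single errno write, so it is easy to miss why it stops parsing. Below is a minimal sketch of the idea, not http_parser's real implementation: `ToyParser`, `ToyErrno`, and the use of '\n' as a message terminator are all hypothetical names and simplifications made up for illustration. It shows that once a callback flips the state to paused, the execute loop returns early and reports how many bytes it consumed.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-ins for HPE_OK / HPE_PAUSED / any error code.
enum class ToyErrno { Ok, Paused, Error };

struct ToyParser {
  ToyErrno err = ToyErrno::Ok;

  // Same shape as http_parser_pause(): only legal while the parser is
  // Ok or already Paused.
  void pause(bool paused) {
    assert(err == ToyErrno::Ok || err == ToyErrno::Paused);
    err = paused ? ToyErrno::Paused : ToyErrno::Ok;
  }

  // Consumes bytes until the (assumed) message terminator '\n' is seen.
  // Seeing the terminator stands in for the codec pausing the parser after
  // a complete message, which makes the loop condition fail.
  std::size_t execute(const std::string& data) {
    std::size_t nread = 0;
    while (nread < data.size() && err == ToyErrno::Ok) {
      const char c = data[nread++];
      if (c == '\n') {
        pause(true);
      }
    }
    return nread; // bytes consumed before the pause took effect
  }
};
```

Feeding two back-to-back messages to `execute` consumes only the first one; the second stays untouched until the parser is unpaused.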

 
 
 
Once parsing finishes, execution resumes in dispatchSlice, right after the call to parser_->execute().
 
codec_impl.cc

Envoy::StatusOr<size_t> ConnectionImpl::dispatchSlice(const char* slice, size_t len) {
  ASSERT(codec_status_.ok() && dispatching_);
  const size_t nread = parser_->execute(slice, len);
  if (!codec_status_.ok()) {
    return codec_status_;
  }

  const ParserStatus status = parser_->getStatus();
  if (status != ParserStatus::Ok && status != ParserStatus::Paused) {
    absl::string_view error = Http1ResponseCodeDetails::get().HttpCodecError;
    if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser")) {
      if (parser_->errorMessage() == "headers size exceeds limit" ||
          parser_->errorMessage() == "trailers size exceeds limit") {
        error = Http1ResponseCodeDetails::get().HeadersTooLarge;
      } else if (parser_->errorMessage() == "header value contains invalid chars") {
        error = Http1ResponseCodeDetails::get().InvalidCharacters;
      }
    }
    RETURN_IF_ERROR(sendProtocolError(error));
    // Avoid overwriting the codec_status_ set in the callbacks.
    ASSERT(codec_status_.ok());
    codec_status_ =
        codecProtocolError(absl::StrCat("http/1.1 protocol error: ", parser_->errorMessage()));
    return codec_status_;
  }

  return nread;
}

 
When parsing finishes, the parser is in the Paused state. Paused is not treated as an error, so dispatchSlice returns nread, the number of bytes the parser consumed.
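
The important detail in dispatchSlice is that only statuses other than Ok and Paused take the error path. A minimal sketch of that decision, assuming a hypothetical `SliceResult` struct in place of `Envoy::StatusOr<size_t>` and a made-up `finishSlice` helper:

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical mirror of ParserStatus, reduced to three cases.
enum class ToyStatus { Ok, Paused, Error };

// Hypothetical replacement for Envoy::StatusOr<size_t>.
struct SliceResult {
  bool ok;
  std::size_t nread;
  std::string error;
};

// Ok and Paused both count as success and report the consumed byte count.
// Anything else becomes a protocol error.
SliceResult finishSlice(ToyStatus status, std::size_t nread) {
  if (status != ToyStatus::Ok && status != ToyStatus::Paused) {
    return SliceResult{false, 0, "http/1.1 protocol error"};
  }
  return SliceResult{true, nread, ""};
}
```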
 
codec_impl.cc

Http::Status ConnectionImpl::dispatch(Buffer::Instance& data) {
  ...(omitted)...

  ssize_t total_parsed = 0;
  if (data.length() > 0) {
    current_dispatching_buffer_ = &data;
    while (data.length() > 0) {
      ...(omitted)...
      if (!dispatching_slice_already_drained_) {
        ASSERT(statusor_parsed.value() <= slice.len_);
        data.drain(statusor_parsed.value());
      }

      total_parsed += statusor_parsed.value();
      if (parser_->getStatus() != ParserStatus::Ok) {
        // Parse errors trigger an exception in dispatchSlice so we are guaranteed to be paused at
        // this point.
        ASSERT(parser_->getStatus() == ParserStatus::Paused);
        break;
      }
    }
    current_dispatching_buffer_ = nullptr;
    dispatchBufferedBody();
  } else {
    auto result = dispatchSlice(nullptr, 0);
    if (!result.ok()) {
      return result.status();
    }
  }
  ASSERT(buffered_body_.length() == 0);

  ENVOY_CONN_LOG(trace, "parsed {} bytes", connection_, total_parsed);

  // If an upgrade has been handled and there is body data or early upgrade
  // payload to send on, send it on.
  maybeDirectDispatch(data);
  return Http::okStatus();
}

 
buffer_impl.h

  void drain(uint64_t size) {
    ASSERT(data_ + size <= reservable_);
    data_ += size;
    if (data_ == reservable_) {
      // All the data in the slice has been drained. Reset the offsets so all
      // the data can be reused.
      data_ = 0;
      reservable_ = 0;
    }
  }
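
The slice-level drain above only moves an offset forward. A minimal sketch of that bookkeeping, assuming a hypothetical `ToySlice` that tracks just the two offsets and no real storage:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical, storage-free slice: [data_, reservable_) is the readable range.
struct ToySlice {
  std::uint64_t data_ = 0;
  std::uint64_t reservable_ = 0;

  // Pretend `size` bytes were written to the end of the slice.
  void append(std::uint64_t size) { reservable_ += size; }

  std::uint64_t dataSize() const { return reservable_ - data_; }

  // Advance the read offset. When everything has been read, rewind both
  // offsets so the slice can be refilled from the front.
  void drain(std::uint64_t size) {
    assert(data_ + size <= reservable_);
    data_ += size;
    if (data_ == reservable_) {
      data_ = 0;
      reservable_ = 0;
    }
  }
};
```

A partial drain leaves the remaining bytes in place, and only a drain that empties the slice rewinds the offsets.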

 
codec_impl.cc

Http::Status ConnectionImpl::dispatch(Buffer::Instance& data) {
  ...(omitted)...

  ssize_t total_parsed = 0;
  if (data.length() > 0) {
    current_dispatching_buffer_ = &data;
    while (data.length() > 0) {
      ...(omitted)...
      if (parser_->getStatus() != ParserStatus::Ok) {
        // Parse errors trigger an exception in dispatchSlice so we are guaranteed to be paused at
        // this point.
        ASSERT(parser_->getStatus() == ParserStatus::Paused);
        break;
      }
    }
    current_dispatching_buffer_ = nullptr;
    dispatchBufferedBody();
  } else {
    auto result = dispatchSlice(nullptr, 0);
    if (!result.ok()) {
      return result.status();
    }
  }
  ASSERT(buffered_body_.length() == 0);

  ENVOY_CONN_LOG(trace, "parsed {} bytes", connection_, total_parsed);

  // If an upgrade has been handled and there is body data or early upgrade
  // payload to send on, send it on.
  maybeDirectDispatch(data);
  return Http::okStatus();
}

 
Once the message has been parsed, the parser status is Paused, so the while loop in dispatch exits via break.
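
The drain-then-break pattern in dispatch can be sketched as follows. This is a simplified model under stated assumptions: the whole buffer is treated as one slice, `ToyParser` is a hypothetical parser that pauses after the first '\n', and `std::string::erase` plays the role of `data.drain()`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

enum class ToyStatus { Ok, Paused };

// Hypothetical parser: consumes up to and including the first '\n', then pauses.
struct ToyParser {
  ToyStatus status = ToyStatus::Ok;

  std::size_t execute(const std::string& slice) {
    const std::size_t pos = slice.find('\n');
    if (pos == std::string::npos) {
      return slice.size(); // incomplete message: consume everything, stay Ok
    }
    status = ToyStatus::Paused;
    return pos + 1;
  }
};

// Drains what the parser consumed and stops as soon as the parser pauses.
std::size_t dispatch(ToyParser& parser, std::string& data) {
  std::size_t total_parsed = 0;
  while (!data.empty()) {
    const std::size_t nread = parser.execute(data);
    assert(nread <= data.size());
    data.erase(0, nread); // stands in for data.drain(nread)
    total_parsed += nread;
    if (parser.status != ToyStatus::Ok) {
      break; // Paused: leave the remaining bytes for a later dispatch
    }
  }
  return total_parsed;
}
```

With two pipelined requests in the buffer, one call to `dispatch` consumes only the first request and leaves the second one in `data`.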
 
codec_impl.cc

void ConnectionImpl::dispatchBufferedBody() {
  ASSERT(parser_->getStatus() == ParserStatus::Ok || parser_->getStatus() == ParserStatus::Paused);
  ASSERT(codec_status_.ok());
  if (buffered_body_.length() > 0) {
    onBody(buffered_body_);
    buffered_body_.drain(buffered_body_.length());
  }
}

Once parsing has completed there is no buffered body left (buffered_body_.length() is 0), so dispatchBufferedBody does nothing here.
 
codec_impl.cc

bool ConnectionImpl::maybeDirectDispatch(Buffer::Instance& data) {
  if (!handling_upgrade_) {
    // Only direct dispatch for Upgrade requests.
    return false;
  }

  ENVOY_CONN_LOG(trace, "direct-dispatched {} bytes", connection_, data.length());
  onBody(data);
  data.drain(data.length());
  return true;
}

 
After parsing completes, maybeDirectDispatch returns false because this is not an Upgrade request, so no further data is processed.
 

 
codec_impl.cc

Http::Status ServerConnectionImpl::dispatch(Buffer::Instance& data) {
  ...(omitted)...

  if (active_request_ != nullptr && active_request_->remote_complete_) {
    // Read disable the connection if the downstream is sending additional data while we are working
    // on an existing request. Reading from the connection will be re-enabled after the active
    // request is completed.
    if (data.length() > 0) {
      active_request_->response_encoder_.readDisable(true);
    }
  }
  return status;
}

 
ServerConnectionImpl::dispatch then returns the status to its caller.
 
conn_manager_impl.cc

Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
  ...(omitted)...

  bool redispatch;
  do {
    ...(omitted)...

    if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
      handleCodecError(status.message());
      return Network::FilterStatus::StopIteration;
    } else if (isCodecProtocolError(status)) {
      stats_.named_.downstream_cx_protocol_error_.inc();
      handleCodecError(status.message());
      return Network::FilterStatus::StopIteration;
    }
    ASSERT(status.ok());

    // Processing incoming data may release outbound data so check for closure here as well.
    checkForDeferredClose(false);

    // The HTTP/1 codec will pause dispatch after a single message is complete. We want to
    // either redispatch if there are no streams and we have more data. If we have a single
    // complete non-WebSocket stream but have not responded yet we will pause socket reads
    // to apply back pressure.
    if (codec_->protocol() < Protocol::Http2) {
      if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
          data.length() > 0 && streams_.empty()) {
        redispatch = true;
      }
    }
  } while (redispatch);

  if (!read_callbacks_->connection().streamInfo().protocol()) {
    read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
  }

  return Network::FilterStatus::StopIteration;
}

 
conn_manager_impl.cc

void ConnectionManagerImpl::checkForDeferredClose(bool skip_delay_close) {
  Network::ConnectionCloseType close = Network::ConnectionCloseType::FlushWriteAndDelay;
  if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.skip_delay_close") &&
      skip_delay_close) {
    close = Network::ConnectionCloseType::FlushWrite;
  }
  if (drain_state_ == DrainState::Closing && streams_.empty() && !codec_->wantsToWrite()) {
    doConnectionClose(close, absl::nullopt,
                      StreamInfo::ResponseCodeDetails::get().DownstreamLocalDisconnect);
  }
}

 
After that, onData returns StopIteration.
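
The redispatch rule in onData (run the codec again only when bytes remain and no stream is active) can be sketched like this. `ToyCodec` is a hypothetical codec that consumes one '\n'-terminated message per call, and the `stream_active_after_dispatch` flag is an assumed stand-in for `!streams_.empty()`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical HTTP/1-like codec: each dispatch() consumes exactly one
// '\n'-terminated message, mimicking "pause after a single message".
struct ToyCodec {
  int messages = 0;

  void dispatch(std::string& data) {
    const std::size_t pos = data.find('\n');
    if (pos == std::string::npos) {
      data.clear(); // partial message: consume it and wait for more bytes
      return;
    }
    data.erase(0, pos + 1);
    ++messages;
  }
};

// Returns how many times the codec was invoked for one read event.
int onData(ToyCodec& codec, std::string& data, bool stream_active_after_dispatch) {
  int dispatch_calls = 0;
  bool redispatch;
  do {
    redispatch = false;
    codec.dispatch(data);
    ++dispatch_calls;
    // Redispatch only when bytes remain and no stream is still in flight.
    if (!data.empty() && !stream_active_after_dispatch) {
      redispatch = true;
    }
  } while (redispatch);
  return dispatch_calls;
}
```

In the walkthrough of this post the request created an active stream, which corresponds to the first case in the usage below: the loop runs once and the leftover bytes wait in the buffer.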
 
filter_manager_impl.cc

void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,
                                          ReadBufferSource& buffer_source) {
  ...(omitted)...

  for (; entry != upstream_filters_.end(); entry++) {
    ...(omitted)...

    StreamBuffer read_buffer = buffer_source.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
      if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
        return;
      }
    }
  }
}

 
Because StopIteration was returned, iteration over upstream_filters_ also stops.
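
The effect of StopIteration on the read filter chain can be sketched as follows. This is a simplified model, not Envoy's FilterManagerImpl: `ReadFilter` and `runReadFilters` are hypothetical names, and each filter is reduced to a callable that returns a status.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

enum class FilterStatus { Continue, StopIteration };

// Hypothetical filter type: a callable that inspects the read buffer.
using ReadFilter = std::function<FilterStatus(std::string&)>;

// Runs the filters in order and returns how many were invoked. The first
// filter that answers StopIteration ends the walk; later filters never run.
std::size_t runReadFilters(const std::vector<ReadFilter>& filters, std::string& buffer) {
  std::size_t invoked = 0;
  for (const auto& filter : filters) {
    ++invoked;
    if (filter(buffer) == FilterStatus::StopIteration) {
      return invoked;
    }
  }
  return invoked;
}
```

In the flow described in this post, the filter that returns StopIteration is the HTTP connection manager, so anything placed after it in the chain would not see the data.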
 
filter_manager_impl.cc

void FilterManagerImpl::onRead() {
  ASSERT(!upstream_filters_.empty());
  onContinueReading(nullptr, connection_);
}

 
onContinueReading returns.
 
connection_impl.cc

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
  ...(omitted)...

  filter_manager_.onRead();
}

 
onRead returns.
 
connection_impl.cc

void ConnectionImpl::onReadReady() {
  ...(omitted)...

  read_end_stream_ |= result.end_stream_read_;
  if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
      (latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
    // Skip onRead if no bytes were processed unless we explicitly want to force onRead for
    // buffered data. For instance, skip onRead if the connection was closed without producing
    // more data.
    onRead(new_buffer_size);
  }

  // The read callback may have already closed the connection.
  if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
    ENVOY_CONN_LOG(debug, "remote close", *this);
    closeSocket(ConnectionEvent::RemoteClose);
  }
}

 
After onRead finishes, the socket is closed if the post-I/O action is Close. This is a healthy connection, so that branch is not taken here.
 
connection_impl.cc

void ConnectionImpl::onFileEvent(uint32_t events) {
  ...(omitted)...

  // It's possible for a write event callback to close the socket (which will cause fd_ to be -1).
  // In this case ignore read event processing.
  if (ioHandle().isOpen() && (events & Event::FileReadyType::Read)) {
    onReadReady();
  }
}

 
Finally, the onFileEvent call that started the whole flow returns.
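
To close the loop, here is a minimal sketch of the event-bitmask handling in onFileEvent: writes are handled first, and the read is skipped if the write path closed the connection. `ToyConnection`, the `ToyReady` bit values, and the `close_on_write` switch are hypothetical and exist only for this illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical event bits, analogous in spirit to Event::FileReadyType.
namespace ToyReady {
constexpr std::uint32_t Read = 0x1;
constexpr std::uint32_t Write = 0x2;
} // namespace ToyReady

struct ToyConnection {
  bool open = true;
  bool close_on_write = false; // test switch: simulate a close during write
  std::string trace;           // records which handlers ran, in order

  void onWriteReady() {
    trace += "W";
    if (close_on_write) {
      open = false;
    }
  }

  void onReadReady() { trace += "R"; }

  // Write is processed before read. If the write handler closed the
  // connection, the pending read event is ignored.
  void onFileEvent(std::uint32_t events) {
    if (events & ToyReady::Write) {
      onWriteReady();
    }
    if (open && (events & ToyReady::Read)) {
      onReadReady();
    }
  }
};
```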
