이번 포스팅에서는 이전 포스팅 내용에 이어서 Sidecar Injector에 의해 주입된 사이드카 컨테이너에 대해서 살펴보겠습니다.
2. 사이드카 컨테이너
이전 포스팅에서 Application Pod에 주입된 init-container에 대해서 살펴봤습니다. init-container는 말 그대로 원래 기동하고자하는 컨테이너 기동 전에 사전 작업 설정을 위해서 잠시 실행되는 임시 컨테이너입니다. 따라서 iptables 등을 조작해서 Application 컨테이너가 기동할 수 있는 환경을 만든 이후 해당 컨테이너는 종료됩니다.
이번에는 본격적으로 주입된 사이드카 컨테이너의 Yaml 내용을 살펴보면서, 어떤 설정이 추가되었는지 살펴보겠습니다.
init-container에는 iptables를 조작해야했기 때문에 root와 NET_ADMIN, NET_RAW와 같은 capability가 필요했는데, 사이드카 컨테이너에서는 이미 iptables 변경이 완료되었기 때문에 더 이상 root로 실행시키지 않으며, capabilities도 모두 삭제하도록 되어있음을 확인할 수 있습니다.
세 번째로 살펴볼 것은 argument입니다. 인자를 통해서 해당 프로그램이 proxy로 기동됨을 전달합니다. 여기서 살펴볼 점은 argument를 통해서 sidecar 인자를 넘겨줌에 따라 해당 Container가 sidecar proxy로 기동될 것임을 전달합니다.
두번째 중요한 정보는 --concurrency입니다. 이전에 envoy 아키텍처 포스팅에서 확인하였듯이. conccurency는 Worker 쓰레드의 개수를 의미합니다. 즉 해당 envoy proxy는 Master와 2개의 Worker 쓰레드가 구성되어 동작함을 유추할 수 있습니다.
3. Pilot-Agent 기동 과정
지금까지 YAML 내용을 살펴보면서 sidecar injector에 의해 주입된 사이드카 컨테이너의 주요 설정에 대해서 알아봤습니다. 이번에는 주입된 컨테이너 내부를 살펴보면서 어떻게 동작하는지를 살펴보고자 합니다.
사이드카 컨테이너 내부에는 pilot-agent 프로그램이 존재합니다. 따라서 해당 프로그램이 먼저 기동됩니다. 이때 pilot-agent 내부에는 여러가지 컴포넌트가 존재하는데, 그 중 가장 핵심은 XDS Proxy 입니다.
XDS Proxy는 Envoy Proxy와 istiod의 pilot-discovery의 통신을 중개하는 역할을 담당하는 컴포넌트로써 Envoy에서의 요청사항을 pilot-discovery에 전달하고 pilot-discovery로부터 전달되는 응답값을 Envoy에게 다시 전달하는 역할을 수행합니다. 이때 내부 Envoy와의 통신은 Unix Domain Socket으로 수행하며 pilot-discovery와는 gRPC를 통해 요청을 주고 받는 것이 특징입니다.
Local DNS Server는 istio의 ServiceEntry에서 DNS에 등록되지 않은 host명으로 입력한 경우 올바르게 Resolution되지 않습니다. 이때 내부 Local DNS Server를 둠으로써 Kubernetes 외부에 위치한 Service에 대해서도 DNS 결과를 제공하기 위해 제공되는 서버입니다. 해당 서버는 ISTIO_META_DNS_CAPTURE 환경 변수 여부에 따라 기동 여부가 결정되며, 기본적으로는 false이므로 기동되지 않습니다. 이와 관련한 자세한 내용은 아래 공식문서를 참고하시기 바랍니다.
SDS Server는 istio에서 제공하는 mTLS 기능 구현을 위해 CA에게 인증서를 요청하고 제공된 인증서를 관리하기 위한 서버입니다. 해당 컴포넌트에 대해서는 추후 다른 포스팅을 통해 다루어보겠습니다.
Pilot-Agent 내부에 속한 주요 컴포넌트에 대해 간략하게 살펴봤습니다. 지금부터는 envoy와 xds proxy를 중점으로 구동 과정을 살펴보겠습니다.
3-1 Proxy 초기화
pilot-agent가 기동될 때 가장 먼저 수행하는 것은 Proxy 생성을 위한 초기화 작업입니다. 사이드카 컨테이너의 YAML을 살펴보면 위 그림과 같이 Pod의 IP와 이름 그리고 namespace가 환경변수로 주입된 것을 확인할 수 있는데, 해당 값등을 조합하여 Proxy 설정을 진행합니다.
Proxy 초기화 과정에서는 입력된 IP와 namespace를 지정하는 것 외에도 ID와 DNS Domain을 수행하는데, ID는 입력된 POD_NAME + "." + POD_NAMESPACE로 지정됩니다. 반면 DNS Domain의 경우 위와 같이 Argument에 --domain 인자로 값이 입력되었으면, 해당 값이 사용되고 입력되지 않았을 경우에는 POD_NAMESPACE + ".svc.cluster.loocal" 이 사용됩니다.
3-2 Proxy Config 설정
Proxy 초기화 이후 수행 작업은 Proxy Config를 설정하는 것입니다. 이때 Proxy Config를 구성하기 위해 세 군데에서 입력되는 정보들을 조합하기 위해 값을 읽어들입니다.
Pod에 입력한 annotation 정보들은 kubernetes의 downwardAPI에 의하여 Pod 내부 /etc/istio/pod/annotations 경로에 Mount 됩니다. 해당 정보는 pilot-agent가 기동하면서 참조할 수 있으며, 이를 통해 annotation 정보를 추출할 수 있습니다. 여기서 annotation 중 proxy.istio.io/config 값은 Pod의 Proxy Config를 설정할 수 있습니다. 따라서 Proxy Config를 구성하는 과정에서 해당 정보를 참고합니다.
두 번째는 Pod 내부에 /etc/istio/config/mesh에 Proxy Config 정보를 위한 파일을 mount 했다면, 해당 정보를 읽어들여서 Proxy Config를 구성합니다.
세 번째는 환경 변수로 입력한 PROXY_CONFIG 정보를 참조하는 것입니다. 해당 정보는 sidecar injector에 의해서 주입된 정보이며, Global Proxy Config 설정을 했다면, sidecar injector가 Pod의 YAML을 조작하는 과정에서 해당 정보가 주입됩니다.
위 세 정보를 추출한 이후에는 Proxy Config를 구성합니다. 이때 pilot-agent 내부에 별도 Default Config가 존재하며, Default Config 외에 입력된 세 가지 Config 정보를 차례 차례로 merge 하여 구성합니다. 이때 위 그림과 같은 순서대로 merge 작업을 수행하며, 같은 값이 존재하는 경우에는 덮어씁니다. 따라서 우선순위 기준으로 보면 Pod에 부착된 어노테이션이 가장 높습니다.
이전 포스팅에서는 istiod의 수동 방식과 pilot-discovery Sidecar Injector에 의해 자동 주입된 envoy proxy 과정에 대해서 살펴봤습니다.
Sidecar Injector에 의해 주입된 변경된 Pod를 살펴보면 istio-init이라는 init-container와 istio-proxy라는 사이드카 컨테이너 총 2개가 주입되는 것을 확인할 수 있습니다. 이번 포스팅에서는 그 중 init-container 부분에 대해서 다루어 보고자합니다.
우선 본론에 들어가기 앞서 간단히 해당 컨테이너의 역할에 대해서 살펴보면, envoy가 application과 트래픽을 주고 받고 istio control plane과 통신을 원활하게 수행하기 위한 iptables를 조작합니다.
지금부터 본격적으로 iptables 조작 과정이 어떻게 이루어지는지에 대해서 알아보겠습니다. 다만 필자가 네트워크나 서버 전문가가 아니기 때문에 틀린 부분이 있을 수 있습니다. 틀린 부분이 있다면, 꼭 알려주시기 바랍니다.
2. iptables
Kubernetes에서 네트워크 트래픽을 다루는데 있어 핵심은 iptables와 Netfilter 입니다. 이를 통해서 외부로 트래픽을 전달하기도 하고 내부 컨테이터로 트래픽을 전달합니다. istio 또한 inbound와 outbound 트래픽을 envoy로 전달하고 외부로 보내야하기 때문에 Netfilter에서 제공하는 hook을 사용하여 패킷의 흐름을 변경하며 이를 위해서 사이드카 컨테이너를 주입할 때 iptables 체인을 만들고 라우팅 규칙을 설정하는 작업을 선행합니다.
iptables와 Netfilter에 대해서는 내용이 방대하고 깊으므로 별도 학습을 권장드리며, 본 포스팅에서는 envoy의 설정과 네트워크 조작에 대해 알아보기 이전에 가볍게 iptables에서 사용되는 네트워크 체인에 대해서만 가볍게 다루어보고 넘어가겠습니다.
iptables에는 기본적으로 위와같이 5개의 체인이 등록되어있습니다. 각각의 체인은 역할이 존재하는데, 체인들의 관계와 흐름을 이해하는 것이 중요합니다. 따라서 위 그림을 기반으로 각 체인의 특징에 대해 살펴보겠습니다.
1. PREROUTING
PREROUTING은 Packet이 처음 도달하는 체인으로 패킷 내용을 조사하여 목적지가 로컬 주소인지 아닌지를 판단합니다. 이때 만약 로컬이라면, INPUT 체인으로 전달하고 다른 host로 전달되어야한다면 FORWARDING으로 전달합니다. 그리고 해당 체인을 통해서 목적지의 주소 변경(DNAT)이 가능한 것이 주요 특징 중 하나입니다.
그렇다면 DNAT는 왜 할까요?
가령 Docker를 예로 들자면, 생성한 도커 컨테이너를 외부에서 접속 가능하게 하기 위해 흔히 포트 포워딩을 수행합니다. 이때 위 그림과 같이 특정 서버 포트(8080)에 대해서 컨테이너 내부 포트와 연결시키면, 위와 같이 PREROUTING을 통해 들어온 트래픽은 DOCKER 체인으로 전달되고 그 과정에서 8080 포트로 들어온 연결에 대해서 목적지 주소를 변경하는 작업을 수행해서 이후 컨테이너에 트래픽이 전달됩니다.
2. INPUT
INPUT은 PREROUTING을 통해 전달되는 Packet이 로컬 주소의 목적지를 향할 경우에 해당 체인으로 라우팅됩니다. 만약 해당 체인을 통해 최종 목적지를 지정하면 그쪽으로 트래픽을 전달할 수 있습니다.
3. OUTPUT
OUTPUT은 외부에서 들어오는 패킷이 아니라 서버에서 생성되어 나가는 패킷이 발생될 때 트리거링되는 체인입니다.
4. FORWARDING
FORWARDING은 해당 서버로 보낸 패킷은 아니지만 외부 패킷을 외부로 전달하는 경우 설정 여부에 따라 패킷을 전달할지 여부를 결정합니다. 사례를 통해 조금 더 자세히 살펴보겠습니다.
위와 같이 2개의 서버를 연결하는 라우터로써 사용될 때 중간에 있는 서버는 외부에서 들어온 패킷을 외부로 전달하는 역할을 수행합니다. 이때 패킷을 다른 서버에 전달하기 위해서는 sysctl.conf 파일에서 net.ipv4.ip_forward 값을 1로 설정이 필요합니다. 설정이 완료되면, 해당 체인을 통해서 지정된 Gateway를 통해 목적지로 전달될 것입니다.
두 번째 사례는 위 그림과 같이 단일 서버내라도 Bridge interface로 구성되어있고 하위에 별도 Network namespace와 서로 다른 네트워크 주소로 구성된 환경에서 namespace간 통신을 수행할 때 해당 체인 설정이 필요할 수 있습니다. 가령 FORWARD 체인에 대해서 Policy가 DROP으로 되어있을 경우 출발지 IP에 대해 ACCEPT 하도록 정책을 등록할 수 있습니다.
위 상황에서 같은 서버내 통신인데도 FORWARDING 체인으로 전달되는 이유는 서로 다른 네트워크 주소를 가지고 있기 때문에 host 입장에서는 출발지와 목적지 모두가 외부 패킷이기 때문입니다.
5. POSTROUTING
POSTROUTING은 OUTPUT에서 전달된 패킷이나 FORWARDING을 통해서 전달된 패킷을 통해 네트워크 인터페이스를 통해 나갈 패킷에 대한 처리를 수행할 수 있습니다. 이때 전달된 패킷에 대하여 출발지 주소를 변경(SNAT)할 수 있습니다.
SNAT의 경우는 공인 IP와 사설 IP로 구성되었을 경우 사설 IP 대역에서 외부 인터넷 접속 시 돌아올 목적지를 공인 IP로 지정해야되기 때문에 출발지 주소를 서버의 IP로 변경해서 나갑니다. 따라서 이 경우 해당 체인을 통해 출발지 주소를 변경할 수 있습니다.
3. init conatiner
지금까지 iptables에서 사용되는 체인에 대해서 살펴봤습니다. 지금부터는 이전 포스팅에서 살펴본 istio에 의해서 주입된 사이드카 컨테이너 설정을 차근 차근 살펴보면서 어떤 것이 적용되었는지를 분석해보겠습니다.
먼저 iptables를 조작하기 위해서 securityContext에서 NET_ADMIN과 NET_RAW에 대한 capabilites를 추가하고 그 외 나머지는 drop 하도록 하였습니다. 그리고 해당 container를 root 유저로 기동하는 것을 확인할 수 있습니다.
세번째로 살펴볼 것은 argument 입니다. argument로 istio-iptables 옵션을 전달하면 내부에서 istio-iptables.sh을 수행해서 iptables를 조작하는 작업을 수행합니다. 위 코드에서 입력된 옵션에 대해서 살펴보면 다음과 같습니다.
옵션
설명
-p
Traffic을 redirect 받을 envoy의 포트
-z
Inbound TCP traffic에 대해서 redirect 포트
-u
proxy container의 UID
-m
envoy로 redirect되는 Inbound 연결에 사용되는 모드로써 REDIRECT 혹은 TPROXY 중 선택 가능
-i
envoy로 redirect 시킬 CIDR IP Range로 복수개 입력 시 "," 로 구분하여 list 형태로 입력한다. 해당 값이 *일 경우 모두 outbound로 redirect 하며, 값이 없을 경우에는 모든 outbound 트래픽을 disable 한다.
-x
envoy로 redirect 제외 시킬 CIDR IP Range로 복수개 입력 시 "," 로 구분하여 list 형태로 입력함
-b
envoy로 redirect 시킬 Inbound Port를 명시하는 것으로써 복수개 입력 시 "," 로 구분하여 list 형태로 입력함
-d
envoy로 redirect 제외시킬 Inbound Port를 명시하는 것으로써 복수개 입력 시 "," 로 구분하여 list 형태로 입력함
위 예제에서 작성한 옵션을 기반으로 설명하면, 위 args는 다음과 같은 명령을 요구하였음을 알 수 있습니다.
1) Inboud의 경우 15006 그 외는 15001 포트로 전달한다. 2) 해당 proxy를 사용하는 컨테이너의 UID는 1337이다 3) 모든 IP로부터 전달되는 트래픽은 모두 envoy로 redirect한다. 4) envoy redirect 하는데 있어 제외대상 IP는 없다. 5) 15090, 15021, 15020 포트를 제외한 나머지 포트를 통해 들어오는 트래픽은 envoy로 redirect한다.
위 내용에서 15090, 15021, 15020 포트 등은 envoy의 헬스체크와 prometheus 연결 등에 사용하기 위한 포트로 사용되기 때문에 제외하였습니다. 그 밖에도 istio에서 사용하는 여러 포트가 있는데, 해당 내용은 공식문서를 참고하시기 바랍니다.
참고로 위 istio-iptables의 실제 동작 과정이나 위에 설명한 옵션 외에 다른 옵션이 무엇이 있는지 궁금하신 분은 istio github을 참고하시기 바랍니다.
4. iptables 룰 변경 내역 확인
이전 내용들을 토대로 init container 안에서 iptables를 조작함을 확인했습니다. 이번에는 init container를 통해서 변경된 내용을 확인하기 위해 istio-init container의 로그를 확인해보겠습니다.
[cla9@DESKTOP-FBK64T0]$ kubectl logs -c istio-init nginx
2022-08-26T00:34:38.813305Z info Istio iptables environment:
ENVOY_PORT=
INBOUND_CAPTURE_PORT=
ISTIO_INBOUND_INTERCEPTION_MODE=
ISTIO_INBOUND_TPROXY_ROUTE_TABLE=
ISTIO_INBOUND_PORTS=
ISTIO_OUTBOUND_PORTS=
ISTIO_LOCAL_EXCLUDE_PORTS=
ISTIO_EXCLUDE_INTERFACES=
ISTIO_SERVICE_CIDR=
ISTIO_SERVICE_EXCLUDE_CIDR=
ISTIO_META_DNS_CAPTURE=
INVALID_DROP=
2022-08-26T00:34:38.813346Z info Istio iptables variables:
PROXY_PORT=15001
PROXY_INBOUND_CAPTURE_PORT=15006
PROXY_TUNNEL_PORT=15008
PROXY_UID=1337
PROXY_GID=1337
INBOUND_INTERCEPTION_MODE=REDIRECT
INBOUND_TPROXY_MARK=1337
INBOUND_TPROXY_ROUTE_TABLE=133
INBOUND_PORTS_INCLUDE=*
INBOUND_PORTS_EXCLUDE=15090,15021,15020
OUTBOUND_OWNER_GROUPS_INCLUDE=*
OUTBOUND_OWNER_GROUPS_EXCLUDE=
OUTBOUND_IP_RANGES_INCLUDE=*
OUTBOUND_IP_RANGES_EXCLUDE=
OUTBOUND_PORTS_INCLUDE=
OUTBOUND_PORTS_EXCLUDE=
KUBE_VIRT_INTERFACES=
ENABLE_INBOUND_IPV6=false
DNS_CAPTURE=false
DROP_INVALID=false
CAPTURE_ALL_DNS=false
DNS_SERVERS=[],[]
OUTPUT_PATH=
NETWORK_NAMESPACE=
CNI_MODE=false
EXCLUDE_INTERFACES=
2022-08-26T00:34:38.813822Z info Writing following contents to rules file: /tmp/iptables-rules-1661474078813392800.txt3716182416
* nat
-N ISTIO_INBOUND
-N ISTIO_REDIRECT
-N ISTIO_IN_REDIRECT
-N ISTIO_OUTPUT
-A ISTIO_INBOUND -p tcp --dport 15008 -j RETURN
-A ISTIO_REDIRECT -p tcp -j REDIRECT --to-ports 15001
-A ISTIO_IN_REDIRECT -p tcp -j REDIRECT --to-ports 15006
-A PREROUTING -p tcp -j ISTIO_INBOUND
-A ISTIO_INBOUND -p tcp --dport 15090 -j RETURN
-A ISTIO_INBOUND -p tcp --dport 15021 -j RETURN
-A ISTIO_INBOUND -p tcp --dport 15020 -j RETURN
-A ISTIO_INBOUND -p tcp -j ISTIO_IN_REDIRECT
-A OUTPUT -p tcp -j ISTIO_OUTPUT
-A ISTIO_OUTPUT -o lo -s 127.0.0.6/32 -j RETURN
-A ISTIO_OUTPUT -o lo ! -d 127.0.0.1/32 -m owner --uid-owner 1337 -j ISTIO_IN_REDIRECT
-A ISTIO_OUTPUT -o lo -m owner ! --uid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -m owner --uid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -o lo ! -d 127.0.0.1/32 -m owner --gid-owner 1337 -j ISTIO_IN_REDIRECT
-A ISTIO_OUTPUT -o lo -m owner ! --gid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -m owner --gid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -d 127.0.0.1/32 -j RETURN
-A ISTIO_OUTPUT -j ISTIO_REDIRECT
COMMIT
위 코드는 istio-init container의 로그 일부를 발췌하였습니다. 많은 내용이 있지만 이 중에서 유의미한 내용만 추려서 살펴보겠습니다.
위 내용들을 살펴보면, 이전에 arg 인자로 전달했던 옵션들이 정상적으로 매칭되었음을 확인할 수 있습니다. 먼저 inbound 트래픽은 15006 포트로 전달되도록 되어있고 outbound의 경우는 15001로 지정되었음을 확인할 수 있습니다.
PROXY의 UID와 GID는 1337로 이 또한 인자로 지정한 값입니다. PORT 전달 대상은 15090, 15021, 15020을 제외한 나머지 PORT를 모두 INBOUND로 전달하도록 되어있습니다.
* nat
-N ISTIO_INBOUND
-N ISTIO_REDIRECT
-N ISTIO_IN_REDIRECT
-N ISTIO_OUTPUT
-A ISTIO_INBOUND -p tcp --dport 15008 -j RETURN
-A ISTIO_REDIRECT -p tcp -j REDIRECT --to-ports 15001
-A ISTIO_IN_REDIRECT -p tcp -j REDIRECT --to-ports 15006
-A PREROUTING -p tcp -j ISTIO_INBOUND
-A ISTIO_INBOUND -p tcp --dport 15090 -j RETURN
-A ISTIO_INBOUND -p tcp --dport 15021 -j RETURN
-A ISTIO_INBOUND -p tcp --dport 15020 -j RETURN
-A ISTIO_INBOUND -p tcp -j ISTIO_IN_REDIRECT
-A OUTPUT -p tcp -j ISTIO_OUTPUT
-A ISTIO_OUTPUT -o lo -s 127.0.0.6/32 -j RETURN
-A ISTIO_OUTPUT -o lo ! -d 127.0.0.1/32 -m owner --uid-owner 1337 -j ISTIO_IN_REDIRECT
-A ISTIO_OUTPUT -o lo -m owner ! --uid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -m owner --uid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -o lo ! -d 127.0.0.1/32 -m owner --gid-owner 1337 -j ISTIO_IN_REDIRECT
-A ISTIO_OUTPUT -o lo -m owner ! --gid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -m owner --gid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -d 127.0.0.1/32 -j RETURN
-A ISTIO_OUTPUT -j ISTIO_REDIRECT
Argument로 전달된 값을 기반으로 생성된 NAT 체인 룰을 보면 위와 같이 지정된 것을 로그를 통해 확인할 수 있습니다. 그렇다면 구체적으로 어떤 값이 어떻게 변경되었을까요? 이에 대해서 조금 자세히 살펴보겠습니다.
로그 내용을 살펴보면 먼저 4개의 체인을 생성한 것을 확인할 수 있습니다. 여기서 각각의 체인은 다음과 같은 역할을 수행합니다.
ISTIO_INBOUND : PREROUTING으로 전달된 트래픽 중 tcp 트래픽 전달받음 ISTIO_REDIRECT : Outbound Traffic을 envoy의 Outbound Handler인 15001 포트로 전달함 ISTIO_IN_REDIRECT : Inbound Traffic을 envoy의 Inbound Handler인 15006 포트로 전달함 ISTIO_OUTPUT : envoy traffic을 결정하는 가장 핵심적인 체인으로 traffic 전달과 관련한 규칙이 정의되어있음.
체인을 생성하고 난 이후에는 체인 호출 및 트래픽 라우팅과 관련된 규칙을 설정하고 있습니다. 위 설정 중에서 가장 중요한 것은 ISTIO_OUTPUT이며, 해당 체인에 적용된 규칙에 의하여 그 다음에 전달할 체인이 결정됩니다.
-A ISTIO_OUTPUT -o lo -s 127.0.0.6/32 -j RETURN
-A ISTIO_OUTPUT -o lo ! -d 127.0.0.1/32 -m owner --uid-owner 1337 -j ISTIO_IN_REDIRECT
-A ISTIO_OUTPUT -o lo -m owner ! --uid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -m owner --uid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -o lo ! -d 127.0.0.1/32 -m owner --gid-owner 1337 -j ISTIO_IN_REDIRECT
-A ISTIO_OUTPUT -o lo -m owner ! --gid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -m owner --gid-owner 1337 -j RETURN
-A ISTIO_OUTPUT -d 127.0.0.1/32 -j RETURN
-A ISTIO_OUTPUT -j ISTIO_REDIRECT
참고로 ISTIO_OUTPUT에 적용된 규칙은 위와 같으며 해당 규칙을 조금 더 풀어서 설명하면 다음과 같습니다.
처리 결과
Out Interface
조건
RETURN
loopback 인터페이스
출발지가 127.0.0.6 일 경우
ISTIO_IN_REDIRECT
loopback 인터페이스
목적지가 localhost(127.0.0.1)가 아니면서 UID가 1337일 경우
RETURN
loopback 인터페이스
UID가 1337가 아닐 경우
RETURN
*
UID가 1337일 경우
ISTIO_IN_REDIRECT
loopback 인터페이스
목적지가 localhost(127.0.0.1)가 아니면서 GID가 1337일 경우
RETURN
loopback 인터페이스
GID가 1337가 아닐 경우
RETURN
*
GID가 1337일 경우
RETURN
*
출발지가 localhost(127.0.0.1)일 경우
ISTIO_REDIRECT
*
*
위 규칙에 따라 외부에서 접속하는 요청을 envoy에 전달하고 다시 envoy inbound handler에서 나온 트래픽을 application에 전달합니다. 그리고 반대로 application에서 나오는 트래픽을 외부로 전달하는 역할을 수행합니다.
체인의 내용만 봐서는 트래픽이 어떤 흐름으로 어떻게 도달되는지 감이 잡히지는 않습니다. 다만 이번 포스팅은 설정과 관련된 내용만을 다루기 때문에 트래픽 전달에 대해서는 추후에 다루어보도록 하고 여기서는 위 설정 관련해서 알아야할 주요 포인트 3가지 (loopback 인터페이스, 127.0.0.6 IP 존재 이유, 1337 UID/GID)에 대해서 알아보겠습니다.
4-1 loopback 인터페이스
일반적으로 쿠버네티스를 통해 1개의 컨테이너가 포함된 POD를 배포하면 왼쪽과 같이 배포된다고 생각하지만, 실제로는 오른쪽과 같이 사용자에게는 보이지 않는 Pause 컨테이너가 포함된 형태로 배포됩니다.
여기서 Pause 컨테이너는 네트워크, IPC namespace를 생성하고 다른 컨테이너와 공유하는 역할을 수행하며, init process의 역할을 수행해서 좀비 프로세스를 방지하는 역할을 수행합니다.
그 결과 POD 내부에 여러개의 컨테이너가 생성되었을 때 내부적으로는 loopback 인터페이스를 통해 컨테이너간에 localhost로 통신이 가능합니다. 따라서 위 ISTIO_OUTPUT 규칙에서 Out Interface가 loopback인 것은 envoy proxy와 Application 통신을 위해 localhost를 사용함을 알 수 있습니다.
4-2 127.0.0.6
해당 IP의 의미에 대해 알기 전에 envoy의 cluster 타입 중 하나인 ORIGINAL_DST와 passthroughcluster에 대한 내용 이해가 필요합니다. 따라서 해당 개념 사전학습 이후에 살펴보겠습니다.
4-2-1 ORIGINAL_DST
ORIGINAL_DST는 envoy의 cluster 타입 중 하나로써, 트래픽이 들어왔을 때 목적지 주소가 envoy에 등록된 Cluster와 Endpoint 중 일치하는 것이 없을 경우에 해당 목적지 주소를 다시 매핑해서 Forwarding할 수 있도록 지원하는 기능입니다.
이를 위해서 envoy 공식 문서에 따르면 2가지 컴포넌트가 상호 작용하는 것을 알 수 있습니다.
1. Original Destination Listener Filter 2. Original Destination Cluster
Original Destination Listener Filter는 envoy 아키텍처 포스팅에서 다룬 내용으로 Listener Filters에 해당합니다. 해당 Filter는 SO_ORIGINAL_DST 소켓 정보를 읽어서 iptables에 의해 목적지 주소가 바뀌기 이전 사용자의 원래 목적지 정보를 읽는 역할을 수행합니다.
iptables를 지나게되면 envoy proxy로 트래픽이 전달되어야 되기 때문에 목적지 주소가 바뀌게 되는데, Original Destination Filter를 통과하면서 redirect된 소켓의 주소 정보를 해당 Filter에서 읽습니다.
이후 해당 정보는 Listener를 지나 Original Destination Cluster에 전해지는데, 이때 해당 트래픽의 목적지 주소를 iptables를 지나기 이전 목적지 주소로 재설정하는데 사용됩니다. 이를 통해 upstream address에 대해서 동적으로 런타임에 주소 변경이 가능한 것이 특징입니다. 그외 Original Destination Cluster에 대한 부가적인 설명은 envoy 공식 문서를 통해 참고 바랍니다.
4-2-2 PassthroughCluster
istio의 Passthrough는 서비스 메시 바깥으로 나가는 egress 트래픽에 대해서 ALLOW_ANY(default)로 지정된 경우 Service Entry로 등록하지 않은 트래픽을 envoy proxy를 통해 외부로 전달하기 위해 사용됩니다. 이를 위해서 PassthroughCluster라는 Virtual Cluster를 envoy 내부에 생성하고 envoy가 보유한 cluster 혹은 endpoint와 매치되지 않는 목적지 트래픽에 대해 해당 Cluster로 트래픽을 전달하고 후속 작업을 처리합니다. 이때 해당 PassthroughCluster의 타입은 이전에 설명한 ORIGINAL_DST로 지정되어 있어 목적지의 IP를 유지한채로 외부에 전달할 수 있습니다.
지금까지 ORIGINAL_DST와 PassthroughCluster에 대해서 설명했습니다. 그렇다면, 이제 127.0.0.6 IP가 왜 등장했는지에 대해서 살펴보겠습니다.
14443 github 이슈를 살펴보면, 이전에는 Service로 등록하지 않은 Pod의 port를 외부에서 호출했을 경우 iptables과 envoy간에 무한 루프로 인하여 트래픽이 정상적으로 전달되지 못하고 계속 반복되는 현상이 있었습니다.
두 번째 문제로는 Pod 내부 호출을 위해서 Wildcard bind 혹은 localhost를 입력하거나 Kubernetes의 downward API를 사용하여 Pod IP만을 지정하게되면, Port가 명시되어있지 않기 때문에 cluster에서 매칭되는 결과를 찾을 수가 없습니다. 따라서 이 경우에는 InboundPassthroughCluster의 경로를 따르게 되는데, 이러한 이유로 Istio가 적용된 클러스터와 그렇지 않은 클러스터에서 Pod간 통신 할 때 제약사항이 존재하게 되었습니다. 여기서 제약사항에 대한 자세한 내용은 Inbound Forwarding 문서를 참고하시기 바랍니다.
따라서, 위 두 가지 문제를 해결하기 위해서 다음과 같은 결정을 하게 됩니다.
출처 : Inbound Forwarding - Google Docs
내용을 살펴보면 Inbound Cluster 타입은 ORIGINAL_DST로 변경했으며, UpstreamBindConfig를 통해 upstream 값을 127.0.0.6으로 지정하여, localhost에 위치한 애플리케이션으로 트래픽을 전달할 수 있도록 하였습니다. 이를 통해 iptables과 envoy간의 무한 루프를 탈피하고자 했습니다.
[cla9@DESKTOP-FBK64T0]$ ip route show table local | grep 127.0.0.0/8
local 127.0.0.0/8 dev lo proto kernel scope host src 127.0.0.1
여기서 127.0.0.6은 127.0.0.0/8 즉 로컬호스트 루프백 주소에 사용하기 위한 수많은 주소 중 하나이며, 컨트리뷰터가 밝힌 수 많은 주소 중 127.0.0.6이 채택된 이유는 envoy의 Inbound 트래픽을 전달받는 포트가 15006 이기 때문입니다.
정리하자면 127.0.0.6는 envoy proxy에 트래픽이 진입한 이후에 바인딩되는 주소로써, envoy 밖으로 나간 이후 Outbound Handler를 우회하여 Pod의 Application으로 전달될 수 있도록 하는 역할을 수행합니다. 이러한 과정이 없다면, envoy에서 upstream으로 전달을 해야하는데, iptables를 지날 때 다시 envoy로 전달될 수 있기 때문에 해당 magic 주소를 임의로 추가하여 iptables에서 envoy가 아닌 application으로 트래픽을 전달하는데 목적이 있습니다. 해당 주소는 istio 코드에 IboundPassthroughClusterIpv4로 하드코딩되어있는 값이며, 자세한 내용은 github 이슈 내용을 참고 바랍니다.
4-3 1337 UID, GID
iptables에서 UID 혹은 GID가 1337로 구분되어 있는 이유는 envoy proxy와 application 트래픽을 구분하기 위한 용도입니다.
5. 마무리
이번 포스팅에서는 사이드카 컨테이너 주입 설정에서 init-container에 주입된 설정에 대해서 살펴봤습니다. init-container에서는 inbound 트래픽을 envoy proxy에게 전달하고 이를 다시 application에게 전달하기 위한 설정과 outbound 트래픽 흐름을 조작하기 위한 iptables 설정에 대해서 살펴봤습니다.
실제 트래픽이 들어왔을 때 어떤 체인을 통해 어떻게 전달되는지에 대해서는 살펴보지 않았기 때문에 해당 iptables 설정이 어떻게 쓰이는지에 대해서는 아직 잘 모를 수 있습니다. 이는 차후에 몇 가지 사례를 통해서 어떤 체인을 통해 전달되는지 살펴보겠습니다.
다음 포스팅에서는 사이드카 컨테이너인 envoy proxy 설정과 내부 구조에 대해서 살펴보겠습니다.
지금까지 pilot-discovery에 대해서 학습하면서, k8s의 리소스 변화를 감지하고 이를 pilot-agent에게 전달하는 과정에 대해서 살펴봤습니다. 이번에는 애플리케이션 서비스에 pilot-agent를 배포하는 과정을 살펴보며, 이때 pilot-discovery는 어떤 역할을 수행하는지 살펴보겠습니다.
2. pilot-agent 배포
istio를 사용하지 않는 상황에서 단일 서비스에 envoy proxy를 사용한다면, envoy proxy와 service 그리고 proxy와 service 연결을 위한 설정 파일을 한데 묶어 배포하면 되었습니다. 하지만 이러한 방식은 사용자 입장에서 CI/CD 단계에서 service 관리 뿐만 아니라 envoy proxy 설정을 관리해야하는 불편함이 존재했습니다.
더욱이 istio와 같은 control plane이 추가되면, 이제는 control plane과 연결을 위한 속성까지 추가해야하므로 service 관리자의 불편은 더욱 가중됩니다. 만약 배포되는 istio의 설정 정보가 바뀌기라도 한다면, service 관리자는 전체 service에 속한 config 정보를 바꿔야할 지도 모릅니다.
istio에서는 이러한 문제를 해결하고자 사용자는 CI/CD를 위해서 개별 Service 설정에만 집중하도록 하고 pilot-agent 추가와 설정 정보 변경은 istio가 대신 해주기 위한 2가지 방법을 제공합니다.
첫 번째 방법은 수동 방식으로 사용자가 작성한 yaml 파일을 읽어들여 istio 설정이 적용된 형태로 yaml 형태를 변경해주는 스크립트를 제공해주는 방식입니다. 두 번째 방식은 자동으로 Pod 생성 시점에 istio 설정이 적용된 형태로 배포하는 것입니다. 지금부터 이 두 가지 방법에 대해서 살펴보겠습니다.
2-1 수동 배포(istioctl CLI)
첫번째 방법은 istioctl을 통해서 기존에 생성된 pod.yaml을 조작하는 방법입니다.
위 출력 결과물을 보면, 이전에 작성한 Pod.yaml 과 비교해서 꽤나 복잡해진 것을 확인할 수 있습니다. 이는 istiod와 통신을 위한 iptables 변경과 pilot-agent 주입을 위한 기본 설정이 포함되었기 때문입니다. 만약 사용자가 직접 이를 작성해야한다면 꽤나 불편했을텐데, istioctl을 통해서 위와같이 변경된 yaml 파일을 얻을 수 있습니다.
변경된 yaml 파일을 토대로 kubernetes에 배포하면 istio control plane과 통신할 수 있는 envoy proxy가 내장된 Pod를 사용할 수 있습니다.
2-2 자동 배포
이번에는 자동 배포에 대해서 알아보겠습니다. 자동 배포에는 두 가지 방법이 있습니다. 첫번째 방법은 Pod.yaml에 istio에서 요구하는 label을 추가하는 것입니다.
이전에 살펴본 nginx pod yaml 파일에 sidecar.istio.io/inject: "true" label을 추가하면, Pod 생성 시점에 envoy proxy가 내장된 형태로 Pod yaml이 변경되고 배포가 이루어집니다. 참고로 해당 annotation의 값은 true 이외에도 y, yes, on 중 하나가 입력되면 사이드카 배포가 이루어집니다.
두번째 방법은 배포하려는 namespace에 label을 추가하는 것입니다.
[cla9@DESKTOP-FBK64T0]$ kubectl label ns default istio-injection=enabled
[cla9@DESKTOP-FBK64T0]$ kubectl get ns default -L istio-injection
NAME STATUS AGE ISTIO-INJECTION
default Active 334d enabled
위와 같이 Pod를 배포하려는 namespace에 istio-injection=enabled label을 추가하면, 이후 해당 namespace에 배포되는 Pod에는 모두 envoy proxy가 내장된 형태로 배포가 이루어집니다.
지금까지 envoy proxy를 배포하기 위한 수동 배포와 자동 배포에 대해서 살펴봤습니다. 여기서 자동 배포의 경우 Pod 혹은 namespace에 label을 추가하는 것만으로 Pod 생성시에 자동으로 pilot-agent가 배포되는 것을 확인했는데요. 해당 작업은 어떻게 이루어지는 것일까요?
3. Mutation Webhook
사이드카 컨테이너의 자동 주입과정에 대해서 이해하려면, 먼저 kubernetes의 API Server를 활용한 배포 과정에서 어떠한 일이 일어나는지에 대해서 알아야합니다. 따라서 istio의 자동 배포 프로세스 과정을 살펴보기 전에 kubernetes의 Resource 생성 과정에 대해서 살펴보겠습니다.
사용자가 Kubectl을 통해서 Pod를 생성하였을 때, 우리는 일반적으로 위와같이 Kubectl 명령어를 통해서 Kube ApiServer에 전달하면, Pod가 생성된다고 생각합니다.
이때 내부 과정을 조금 더 자세히 살펴보면, 실제로는 위 그림과 같은 단계를 거쳐 Pod가 생성됩니다.
1. kubectl로 명령어를 Kube ApiServer로 전달할 때 명령어를 실행하는 컴퓨터에 존재하는 config파일 정보를 같이 전달합니다. 해당 config 파일에는 인증서 정보와 사용자 정보가 같이 포함되어있습니다.
2. Kube ApiServer는 제일 첫번째로 인증(Authentication) 과정을 수행합니다. 해당 과정은 사용자가 전달한 인증서 정보를 토대로 해당 서버에 접속 가능한 요청인지를 분석하고 승인하는 과정입니다. 마치 ID/Password를 입력했을 때, 유효한 계정인지를 검증하는 것과 같습니다.
3. 두번째로는 인가(Authorization) 과정을 수행합니다. 해당 과정은 접속이 완료된 이후에 해당 사용자가 요청한 작업에 대해서 수행 권한이 있는지를 분석하고 승인하는 과정입니다. 만약 Pod 생성을 요청했는데, 해당 권한이 존재하지 않는다면 이 단계에서 실패합니다.
4. 마지막 단계에서는 Admission Controllers 체인을 거치면서 사용자가 전달한 yaml 파일내에서 validation을 진행하거나 부가적인 내용을 추가하거나 변경하는 등의 기능을 제공합니다.
5. 모든 단계가 완료되면 Pod가 생성됩니다.
여기서 주목할 부분은 Admission Controllers입니다. Admission Controller는 이전에 설명했듯이 인증과 인가외에 검증(Validation)이나 내용을 변경(Mutation)하기 위해 사용됩니다. 또한 Admission Controller는 하나만 존재하는 것이 아니라 여러 Admission Controller가 Chain 형태로 엮여있고 해당 Chain을 순회하면서 validation 혹은 mutation을 진행합니다.
이쯤되면 istio가 어떻게 자동으로 Pod에 사이드카 컨테이너를 주입시킬 수 있는지 눈치를 챌 수 있는데요. Admission controller 중 하나인 MutatingAdmission Webhook을 활용하여 사용자의 요청을 변경시킵니다.
[cla9@DESKTOP-FBK64T0]$ kubectl get mutatingwebhookconfigurations
NAME WEBHOOKS AGE
istio-revision-tag-default 4 2d
istio-sidecar-injector 4 26d
이를 위해서 kubernetes 상에 위와 같이 찾아보면, istio에서 사이드카를 주입하기 위한 MutatingWebhookConfiguration이 존재하는 것을 확인할 수 있습니다.
해당 설정을 조금 더 자세히 살펴보겠습니다. 위 코드는 istio-sidecar-injector ConfigMap 일부를 발췌한 부분으로 먼저 살펴볼 지점은 rules 부분에 Pod를 생성할 때 해당 Webhook이 동작하는 것을 확인할 수 있습니다.
또한 namespaceSelector와 objectSelector를 통해서 특정 label의 값이 존재할 경우에만 동작하도록 되어있는 것을 확인할 수 있습니다.
마지막으로 살펴볼 지점은 조건들이 부합되었을 때, istiod의 /inject URL로 요청을 전달하여 후속 작업 처리를 요청한다는 점입니다.
이를 토대로 살펴보면, Kubernetes는 사용자가 요청한 내용을 분석하여 istio를 통한 Mutation 변경이 필요하다고 판단될 때 istiod에게 요청하여 mutation 작업을 수행하도록 요청합니다. 그 이후에는 istiod 내부에서 istio 설정에 맞추어 yaml을 변경한 다음 이를 다시 Kubernetes로 전달하여 후속 작업 진행 후 배포하는 과정을 거쳐 Pod가 생성됩니다.
4. Sidecar Injector
이전 내용을 통해 Pod가 생성되는 시점에 istio pilot-discovery에 요청하여 istio 연결에 필요한 설정이 적용되는 것을 확인할 수 있었습니다. 그렇다면 istio 내부에 어떤 컴포넌트가 어떤 과정을 거쳐 이러한 과정이 수행될까요? pilot-discovery에 위치한 Sidecar Injector 컴포넌트를 살펴보면서, 사이드카 컨테이너를 어떻게 주입하는지 과정을 살펴보겠습니다. 먼저 살펴볼 것은 Sidecar Injector의 기동 과정입니다.
처음에 pilot-discovery가 기동될 당시에 INJECT_ENABLED 환경 변수 값을 확인합니다. 해당 값이 true이면, Sidecar Injector의 초기화 과정을 수행합니다. 참고로 해당 환경 변수의 기본 값은 true입니다.
초기화 과정을 살펴보면 다음과 같습니다.
먼저 pilot-discovery가 실행되는 Pod 내의 /var/lib/istio/inject 하위 디렉토리에 config 파일이 존재하는지를 살펴봅니다. 만약 해당 디렉토리에 config 파일이 위치한다면, config와 values를 FileWatcher에 매핑시킵니다.
만약 해당 디렉토리에 파일이 위치하지 않으면, 그 다음으로 확인하는 것은 k8s의 Configmap을 확인하고 관련 ConfigMapWatcher 컴포넌트를 생성하여 바인딩하는 역할을 수행합니다.
먼저 살펴볼 것은 ConfigMap Watcher 내부입니다. 내부에는 Controller가 존재하고 ConfigMap 이벤트를 전달받기 위해서 informer를 등록하고 변경 사항을 전달 받습니다. 이때 ConfigMap Watcher가 감시하는 대상은 istio-system 네임스페이스에 존재하는 istio-sidecar-injector입니다. 참고로 해당 ConfigMap은 istio 설치 시에 자동 등록되는 리소스로 내부에는 config와 values 두 Key에 해당하는 데이터가 존재합니다.
informer에 등록된 이후 해당 Configmap에 변경이 발생하면, 다음과 같은 과정을 거칩니다.
1. informer에서 queue에 이벤트 정보를 전달합니다. 2. queue에는 등록된 callback을 호출합니다. 3. Controller에 등록된 callback의 역할은 Configmap 정보를 읽고 거기에서 등록된 config 데이터와 values 두 개의 데이터를 읽어들이는 작업을 수행하고, ConfigMap Watcher에 등록된 handler에 전달하여 후속 작업을 위임합니다. 4. handler의 역할은 Webhook 구조체에 위치한 updateConfig 메소드를 호출하는 것입니다. 5. updateConfig 메소드의 역할은 ConfigMap을 읽어들여 Webhook 구조체 멤버에 저장합니다. 이때 config의 경우는 configMap의 config key의 값을 그대로 저장하지만, values의 경우는 해당 데이터를 map으로 변환 후 ValuesConfig 구조체로 감싸서 Webhook의 valuesconfig에 저장하는 차이점이 있습니다.
위 내용을 보면, istio-sidecar-injector ConfigMap의 values 데이터를 Map 형태로 Parsing하여 저장하는 것을 확인할 수 있습니다.
5번 단계까지 진행되면, pilot-discovery 내부에 사이드카 주입을 위한 템플릿 정보를 취득할 수 있으며, ConfigMap 변경에 따라서 이를 감지하고 반영할 수 있게 되었습니다. 이후 진행되는 6번 단계에서는 외부에서 /inject 혹은 /inject/URI를 통해서 접근할 경우 요청을 처리하기 위한 handler 메소드를 등록하는데, 이때 담당을 webhook에 위치한 serverInject 메소드가 담당합니다. 즉 사이드카 컨테이너 주입 역할은 serverInject 메소드에 있습니다. 지금부터 해당 메소드에서 수행되는 주요 과정에 대해서 살펴보겠습니다.
4-1 사이드카 주입 가능 여부 확인
serveInject 메소드내에서 가장 먼저 수행하는 작업은 위 그림과 같습니다. 먼저 Client의 요청 타입은 JSON으로 전달되기 때문에 해당 타입이 JSON인지 확인하고 Body에서 데이터를 추출합니다. 그 후 수행하는 작업은 해당 Pod의 사이드카 주입 요청이 적절한지를 확인하는 것입니다. 해당 과정은 위 그림과 같은 과정을 거칩니다.
1. Pod의 spec.hostNetwork 값이 true인가? 2. 사이드카 생성 요청 대상 Namespace가 Ignored Namespace에 해당하는가? 3. sidecar.istio.io/inject 어노테이션이 존재하거나 대상 Namespace에 자동 주입 활성화 Label이 존재하는가?
먼저 첫번째 조건에 대해서 살펴보면, hostNetwork 값이 true라면, 이는 Pod의 네트워크 설정이 해당 Pod가 동작하는 host 노드의 네트워크 설정을 따라가는 것을 의미합니다. istio에서는 Pod 컨테이너의 iptables를 조작하여 네트워크 트래픽을 변경하는데, 해당 설정이 true일 경우 노드 전체에 장애가 발생할 여지가 있습니다. 따라서 Pod의 hostNetwork가 true인 경우에는 사이드카 주입을 수행하지 않습니다.
두 번째 조건은 사이드카 요청 대상 Namespace가 시스템에서 내부적으로 사용하는 Namespace인 경우에는 요청을 거절합니다. 대상 Namespace는 위 그림과 같이 총 4개입니다.
세 번째 조건은 Pod 요청에 사이드카 주입 어노테이션이 존재하는지 확인하는 것입니다. 해당 어노테이션이 존재하면서 값 주입을 요청하거나 생성 대상 Namespace에 사이드카 자동 주입 Label이 활성화되어있는 경우에는 사이드카 주입을 허가하지만 그렇지 않을 경우에는 주입을 수행하지 않습니다.
4-2 Pod Annotation 검증
이번에는 Pod 내에 존재하는 Annotation 중 istio 동작에 관여하는 Annotation이 지정되어있을 경우 해당 Annotation이 istio에서 요구하는 형태로 작성되어있는지 검증하는 단계입니다. 이전 단계에서는 특정 조건에 부합되지 않은 경우에는 사이드카 주입은 수행하지 않더라도 Pod 생성은 진행되었지만, 해당 단계에서는 Validation을 통과하지 않은 경우에 Pod 생성이 되지 않는 것이 차이점입니다.
위와같이 pilot-discovery 내에는 사용자가 Pod에 입력한 Annotation의 이름이 위 Map에서 제공하는 Annotation과 일치할 경우 입력값이 올바른지 검증하는 함수가 매핑되어있는 것을 확인할 수 있습니다.
따라서 Pod 내부 Annotation을 순회하면서 위 조건에 부합하는 Key Annotation이 존재할 경우 해당 값에 매핑된 검증 함수를 수행하여 올바른 값이 입력되지 않았을 경우 Pod 생성 요청을 거절합니다.
4-3 Concurrency 계산
envoy 구조 포스팅에서 다루어봤듯이 envoy 내부에는 Master 쓰레드와 Worker 쓰레드가 분리되어 있고 해당 쓰레드 개수의 설정은 --concurency 인자에 의해 결정되는 것을 확인했습니다.
istio에서 사이드카를 주입할 때 내부적으로 envoy proxy를 기동시켜야되기 때문에 해당 과정에서는 몇 개의 Worker 쓰레드를 기동시키는 지 계산하여 사이드카 컨테이너 템플릿을 생성할 때 이 과정에서 계산된 값이 주입됩니다.
이때 Concurrency가 계산되는 과정을 살펴보면, 위 그림과 같이 Proxy Config 설정 여부에 따라 달라집니다. Proxy Config는 istio 전체에 전역적으로 설정하거나 특정 Workload에만 적용하거나 아니면 특정 Pod에 대해서 Annotation에서 설정 변경이 가능합니다. 이때 우선순위는 Pod > Workload > Global 순이기 때문에 우선순위에 따라 값이 Override 됩니다.
만약 Concurrency 값이 양수 값이 입력되어있다면, 계산 과정에서는 해당 값이 적용됩니다. 하지만 Concurrency 값이 0일 경우에는 계산 과정이 달라집니다.
가장 먼저 확인하는 것은 sidecar.istio.io/proxyCPULimit Annotation 값이 존재하는지 확인하는 것입니다. 해당 Annotation은 Envoy를 위해 사용되는 CPU의 Limit을 지정하는 값으로 해당 Annotation이 존재한다면, 그 값을 기준으로 Concurrency 값을 계산합니다.
만약 해당 Annotation이 존재하지 않는다면, sidecar.istio.io/proxyCPU 값이 존재하는지 확인합니다. 해당 Annotation은 Envoy를 위해 사용되는 CPU를 의미합니다.
만약 위 두 Annotation이 존재하지 않는다면, 해당 Pod에 지정되어있는 CPU Resource의 Request와 Limit을 확인합니다. 만약 지정된 값이 있으면, 그 값을 기준으로 Concurrency를 계산합니다.
마지막으로 어떠한 조건에도 부합하지 않는다면, 기본 값인 2를 지정합니다.
참고로 Concurrency 계산에 필요한 값이 존재할 경우에는 입력된 값을 1000으로 나눈 값을 올림하여 요구 Concurrency를 계산합니다. 가령 500m이 입력되었다면, CEIL(500/1000) = 1이므로 1개가 지정됩니다.
4-4 Template Yaml 생성
istio-system namespace에 존재하는 istio-sidecar-injector Configmap을 살펴보면, 사이드카 주입을 위한 Template이 Yaml 형식으로 정의된 것을 확인할 수 있습니다.
위 예시는 Template 내부에 있는 Yaml 중 init-container의 Argument를 설정하는 부분을 발췌했습니다. 자세히 살펴보면 Template에는 사용자가 지정한 값을 기반으로 사이드카 주입을 위한 Yaml의 최종 내용이 결정되도록 디자인 되었음을 확인할 수 있습니다.
해당 Template은 이미 서버 기동시점에 주입이 되었으므로 사용자 요청을 분석하여 Template을 만드는 작업을 수행합니다.
4-5 Template 적용
사이드카 컨테이너 주입을 위한 Template을 만들었으면, 이제 사용자가 요청한 Pod에 Template을 병합하는 과정을 수행합니다.
이를 위해서 첫번째로 하는 작업은 병합을 원활하게 수행하기 위해 Original Pod의 요청 Spec과 Template Yaml을 Map으로 변환합니다.
위 그림은 Pod의 요청을 Map으로 변환한 originalMap과 Template Yaml을 Map으로 변환한 patchMap의 결과입니다. 내용을 살펴보면, patchMap에는 init-container 한개와 Container한개가 존재하는 것을 확인할 수 있습니다. 위 두 컨테이너의 이미지는 pilot-agent이며, 사이드카 컨테이너 주입을 통해 기본적으로 1개의 init-container와 Container가 추가로 삽입되는 것을 확인할 수 있습니다.
개별 Container가 어떤 역할을 수행하는지는 다음 포스팅에서 살펴보도록하며, 지금은 위 그림을 통해 서로 다른 요청 Spec을 병합하기 위해 Map으로 만들었음을 이해하면 좋을 것 같습니다.
두 Map을 만들고나면, 그 다음은 두 Map의 Key, Value를 비교하면서 병합하는 작업을 거칩니다.
위와같이 병합을 완료하고나면 최종적으로 하나의 Map이 완성됩니다. 내용을 살펴보면 Template에 존재하던 init-container 추가와 Template Container 더해져서 컨테이너 수가 2가 되었음을 확인할 수 있습니다.
병합이 완료된 이후에는 부가적인 작업을 수행하기 위해 postProcess 과정을 거칩니다. 이때 만약 Prometheus 설정이 존재한다면, 병합된 결과에 추가적으로 Prometheus 통합을 위한 어노테이션 등이 추가되는 작업을 거칩니다.
그리고 k8s에 반영을 요청하기 위해 JSON으로 데이터를 다시 변환 후 AdmissionResponse를 통해 응답을 반환하여 사이드카가 주입된 Pod 생성을 요청하는 것으로 마무리됩니다.
5. 마치며
이번 포스팅까지 해서 pilot-discovery의 가장 핵심인 Service Discovery와 사이드카 컨테이너 주입에 대해서 살펴봤습니다. Pilot-discovery는 위 두가지 기능 외에도 Multi Cluster 관리, 인증서 관리 등 중요한 기능과 핵심 컴포넌트가 여럿 존재합니다.
하지만 istio-internals 시리즈의 목표는 pilot-discovery와 pilot-agent 그리고 envoy가 어떻게 상호 작용하는지를 살펴보는 것이기 때문에 흐름을 유지하기 위해 우선 pilot-discovery에 대한 탐구는 여기서 마치고 나머지는 추후 포스팅을 통해 다루어 보겠습니다.
다음 포스팅에서는 Sidecar Injector에 의해서 주입된 사이드카 컨테이너의 init-container에 대해서 살펴보겠습니다.
이전 포스팅을 통해 envoy를 관리하기 위해서 등장한 istio의 pilot 아키텍처와 Service Discovery 전파를 위한 informer 구조에 대해서 살펴봤습니다. 이번 포스팅에서는 pilot 아키텍처 중 pilot-discovery의 내부 컴포넌트 중 XDS Server에 대해서 살펴보면서 외부로 부터 전달된 Service 및 설정 변경이 어떻게 내부 과정을 거쳐 개별 pilot-agent의 envoy에 전파되는지 알아보겠습니다.
2. XDS Server 개요
이전 포스팅에서 pilot-discovery는 Informer로부터 Kubernetes의 Resource 정보를 전달받으면, Controller의 Handler를 통해서 관련 내용이 XDS Server에게 전달된다고 설명했습니다. 그리고 이를 전달받은 XDS Server의 역할은 개별 Service 들에게 xDS API 형태로 제공하여 Service 정보를 갱신하는 역할을 수행합니다. 이를 위해 XDS Server 내부에는 Service들의 Connection Pool을 관리하고 envoy가 이해할 수 있도록 xDS API로 변환하여 전달하는 컴포넌트 등이 존재합니다.
그렇다면 XDS Server는 어떻게 구성되어있을까요?
핵심 컴포넌트에 대해서만 살펴보자면, 위와 같은 속성을 지니고 있습니다. 그중 특히 informer와 연계되어 Controller의 역할을 처리하는 것은 Enviroment에 속한 ServiceDiscovery입니다. 해당 컴포넌트에 대해서 살펴보겠습니다.
3. ServiceDiscovery
ServiceDiscovery의 주요 역할은 istio의 주요 관심 대상 Resource에 대해서 변경이 감지되었을 때, 이를 수신 받아 XDS Server로 전달하는 Controller 역할을 수행합니다. 위 그림을 살펴보면 총 21개가 관심 대상이며, 파란색으로 표시한 Resource는 istio에서 제공하는 CRD가 아니라 Kubernetes Gateway API에 해당하는 항목입니다.
참고로 Kubernetes Gateway API는 Kubernetes의 Network 서비스에 대한 표준 스펙을 정의하기 위한 Spec으로 자세한 내용은 아래 공식 문서를 참고하시기 바랍니다.
istio에서는 Kubernetes Gateway API Resource가 생성/수정/삭제되면, 해당 내용을 분석하여 xDS API 변환하여 전달하도록 구현되어있습니다. 다만 Kubernetes Gateway API는 Kubernetes 설치 시 기본적으로 등록된 API가 아니기 때문에 별도로 설치해야합니다.
istio에서는 Kubernetes Gateway API 지원을 위해서 위 그림과 같이 환경 변수를 통해 pilot-discovery 프로그램을 수행합니다. 이때 PILOT_ENABLE_GATEWAY_API환경 변수가 true로 지정되어있는지 여부를 확인합니다. 그 결과 값이 true로 지정된 경우에는 위와 같이 istio에서 제공하는 CRD와 더불어 Kuberntes Gateway API의 Resource 정보를 관심 Collection에 추가합니다. 반대로 해당 값이 false인 경우에는 istio에서 제공하는 CRD만 관심 Collection에 추가하도록 구성되어있습니다.
위와 같은 과정을 거치게되면, istio에서 주요 관심 대상에 대한 Resource 추출은 끝나게됩니다. 그 다음 수행해야하는 일은 Controller를 구성하고 Informer에 등록하는 작업입니다.
우선 Controller를 구성했다고 가정하고, 이전 과정에서 추출한 관심 대상을 기반으로 Informer와 연결하는 부분부터 살펴보겠습니다. 이전에 추출한 관심 대상 Resource를 등록하는 작업을 수행합니다. 해당 과정은 총 4가지 절차를 통해 수행됩니다.
1. 먼저 kubernetes에 등록된 CRD 정보를 조회합니다.
2. 조회한 CRD 정보 중에서 istio의 관심대상 Resource에 부합되는 정보만 Filtering 합니다. 이때 Kubernetes Gateway API는 Kubernetes를 설치했다고 해서 기본적으로 등록되는 Resource 정보가 아닙니다. 따라서 별도로 Gateway API CRD 설치(https://istio.io/latest/docs/tasks/traffic-management/ingress/gateway-api/#setup)를 수행하지 않았을 경우에는 등록된 CRD 정보가 없기 때문에 Filtering 됩니다. 반면 해당 CRD가 설치되어있으면 Filtering 결과에 포함될 것입니다. 참고로 위 예시는 설치가 안되어있다는 가정하에 작성하였습니다.
3. 관심 대상 Resource 간에 서로 Group이 다르기 때문에 Group 별로 매칭되는 Informer를 찾습니다.
4. Resource Group에 해당하는 Informer에 Resource 생성/수정/삭제가 발생했을 경우 Callback을 위한 Handler를 등록합니다.
위 코드내용을 살펴보면, 개별 Callback 메소드에서 수행하는 작업은 Controller Queue에 Event 타입과 전달받은 Object 내용을 전달하도록 구성되어있음을 확인할 수 있습니다.
이번에는 ServiceDiscovery 즉 Controller를 구성하고 있는 주요 컴포넌트에 대한 소개와 더불어 해당 컴포넌트와 이전에 설명한 informer 연계 과정이 어떻게 연결되는지를 살펴보겠습니다.
ServiceDiscovery에는 3가지 주요 속성이 존재합니다. 하나씩 살펴보면 다음과 같습니다. 먼저 queue는 informer로부터 Event가 발생하였을 때 이를 전달하는 중간 버퍼의 역할을 수행합니다.
두번째로 살펴볼 것은 handlers입니다. istio의 관심 대상 Resource Group이 여러개이다 보니 GVK(Group Version Kind)별로 대상 informer가 다르고 이를 처리해야하는 Handler 로직 또한 달리 작성해야할 수 있습니다. 따라서 Service Discovery에서는 GVK 별로 Handler를 매핑하여 Map으로 관리하고 있습니다.
이때 GVK 별로 생성되는 Handler의 모습은 위와 같으며, 가장 중요한 부분은 informer로 부터 전달받은 설정을 기반으로 PushRequest 요청을 만들고 이를 XDSServer.ConfigUpdate 메소드를 호출함으로써, XDSServer와 연결 되어있는 모든 Client에게 설정 정보를 동기화하도록 요청하는 것입니다.
세번째로 살펴볼 것은 kinds로 GVK 별로 매핑되는 informer 정보가 다르기 때문에, 해당 정보를 관리하기 위한 용도로 사용됩니다. handlers와 마찬가지로 Map으로 구성되어있으며 GVK 별로 어떤 Resource가 있으며, 어떤 informer와 매칭되어있는지 정보를 구조화하여 담고 있습니다.
지금까지 설명한 내용을 바탕으로 Service Discovery와 istio Kube Client의 연계 과정을 살펴보겠습니다.
1. 관심 대상 Resource를 순회하면서, GVK 별로 대상 informer를 찾습니다. 2. informer를 찾았으면 event 발생할 경우 정보를 전달받기 위한 Handler를 등록합니다. 이때 해당 Handler는 GVK를 Key로 가지고 Callback Function을 Value로 가지는 handlers Map에서 GVK에 매칭되는 Function을 등록합니다. 이때 등록되는 Function에는 XDSServer로 Update하는 코드가 포함되어있습니다. 3. queue에서는 지속적으로 하나씩 processing 하면서 informer로 부터 전달받은 오브젝트 정보를 기반으로 매칭되는 Handler 실행을 통해 XDS Server로 Update를 수행합니다.
여기까지가 XDS Server 내에 위치한 ServiceDiscovery의 역할입니다.
4. Event 전파
이번에는 이전 내용에서 XDSServer.ConfigUpdate 메소드를 호출했을 때, XDS Server 내부에서 어떻게 처리하는지를 살펴보겠습니다.
ConfigUpdate 메소드가 호출되면, 가장 먼저 하는 일은 XDSServer에 있는 PushChannel에 요청을 전달합니다. 위 그림에서 확인할 수 있듯이 해당 속성은 PushRequest를 전달받아 다른 루틴으로 전달하는 Channel임을 알 수 있습니다.
이후 Channel로 전달된 데이터는 내부적으로는 debounce 메소드가 별도의 go routine으로 동작하여 해당 데이터를 처리합니다. 그렇다면 debounce 작업은 왜 하는 것일까요?
4-1 debounce
debounce는 이벤트가 연이어 발생했을 때, 이를 개별적으로 하나씩 처리하지 않고 그룹핑하여 전달하는 것에 목적이 있습니다. 이에 대한 이해를 돕기위해 최대 5명 운송 가능한 엘리베이터를 이용하는 상황을 가정해봅시다.
만약 엘리베이터에 대기하는 사람이 6명이 존재하는 상황에서 최대 5명까지 이용 가능하지만 이용객 한명씩 순차적으로 엘리베이터를 사용할 수 있다면 어떻게 될까요? 당연히 운송 시간은 증가할 것이며, 엘리베이터 가동 비용 측면에서 봤을 때에도 굉장히 비효율입니다. 이때 가장 효율적인 방법은 엘리베이터가 허용 가능한 인원만큼 탑승해서 한번에 운반하는 것입니다. 따라서 위 경우에서는 처음 5명을 태운 승강기를 한번 운행하고 그 다음 한번 더 운행 즉 총 2번의 운행하는 것이 가장 좋습니다.
이번에는 엘리베이터에 한명이 현재 탑승한 상황이라고 가정해봅시다. 이때 시설 관리자 입장에서는 5명까지 이용한 엘리베이터에서 한명만 운행하는 것은 금전적으로 비효율입니다. 따라서 시설 관리자 입장에서만 생각해본다면, 다섯명이 탑승할 때까지 엘리베이터를 기다리게 하는 것이 가장 좋은 선택입니다. 하지만 이 경우에는 다섯명이 탑승할 때까지 엘리베이터가 작동하지 않으므로 탑승객 입장에서는 엄청난 불편함을 초래할 것입니다.
따라서 두 이해 당사자간의 입장을 고려하여 설계된 일반적인 엘리베이터의 모습은 엘리베이터에 탑승객이 탑승하고나서 일정시간이 지날 때까지 탑승하는 인원이 없으면 문이 닫히고 동작하도록 되어있습니다. 또한 중간에 엘리베이터에 탑승하려는 승객이 추가로 발생할 경우에는 센서가 감지하여 엘리베이터 문이 닫히지 않도록 하며, 그때부터 다시 일정 시간 동안 대기하도록 되어있습니다.
위와 같은 엘리베이터는 탑승객 입장에서는 정원이 다 차지 않아도 일정 시간동안 승객의 유입이 없으면 동작하기 때문에 다소 불편함을 감소할 수 있습니다. 또한 시설 관리자 입장에서도 엘리베이터가 아직 닫히지 않는 이상 추가 이용 승객이 발생하면 탑승 가능하기 때문에 합리적인 방법입니다.
이번에는 100명이 이용할 수 있는 엘리베이터가 있고 최초 승객이 탑승 이후 약 10초 이후에 엘리베이터가 자동으로 닫힌다고 가정해봅시다. 이러한 상황에서 최초 승객이 탑승한 이후에 10초마다 새로운 탑승객이 발생한다면 어떻게 될까요? 처음 탑승한 승객은 이전과 같이 엘리베이터가 수용 가능한 모든 인원이 탑승할때까지 대기하게되는 문제가 발생합니다. 따라서 이러한 경우에 승객의 불편함을 최소화할 수 있는 방법은 처음 엘리베이터에 탑승객이 탑승한 이후로부터 최대 대기시간을 설정하여 최대 대기시간을 넘기게되면 추가 승객이 탑승하더라도 타이머가 더 이상 동작하지 않고 닫히도록 구현하는 것입니다. 이렇게 되면 처음 탑승한 승객 입장에서는 최대 대기시간 만큼만 대기하면 되므로 불편함은 최소화 될 수 있을겁니다.
지금까지 엘리베이터 시나리오를 설명했는데, istio에서는 위 시나리오 내용과 같은 이유로 동일하게 debounce를 적용했습니다. istio pilot-discovery에는 모든 pilot-agent와 Connection을 맺고 있는데, 이벤트 하나하나씩 발생할 때마다 broadcast하는 것은 굉장히 통신 비용을 증가하는 요인입니다.
따라서 이때 debounce를 지정하면, 메시지를 효과적으로 처리할 수 있습니다. 가령 최초 PushReqest 요청을 전달받았을 때로 부터 일정 시간 내에 메시지가 유입되는 경우 해당 메시지와 이전 메시지를 병합합니다. 참고로 이때 병합되는 메시지는 PushRequest내에 있는 Reason Slice에 기존 메시지가 추가됩니다.
이후 또 다시 일정 시간 이내에 메시지가 추가 유입되면 병합하고 만약 그 시간동안 메시지가 유입되지 않는다면 지금까지 병합한 메시지를 한번에 전송합니다. 다만 이러한 경우 메시지가 계속 추가로 유입되는 경우 한없이 기다릴 수 있기 때문에 최대 대기 시간을 설정하여 최대 대기 시간이 지나면 메시지 처리를 수행하도록 수행할 수 있습니다.
istio에서는 PILOT_ENABLE_EDS_DEBOUNCE환경 변수가 true로 되어있을 때 debounce가 작동하도록 설계되었습니다. 해당 값은 기본적으로 true이기 때문에 별도의 설정이 없다면 항상 debounce는 작동하고 있습니다.
그리고 PILOT_DEBOUNCE_AFTER와 PILOT_DEBOUNCE_MAX값은 이전에 설명한 일정 대기 시간과 최대 대기 시간을 의미합니다. 기본적으로 일정 대기 시간은 100ms 로써 100ms 동안 Event가 발생하면 해당 Event는 하나의 메시지로 병합됩니다. 또한 최대 대기 시간은 10초로 설정되어있으며, 메시지가 아무리 지속 유입이 된다고 할지라도 10초를 넘어가면 메시지 병합을 멈추고 메시지를 발행하도록 되어있습니다.
따라서 만약 istio를 운영하다가 너무 잦은 sync로 인한 overhead가 발생한다면 해당 두 값을 적절하게 튜닝하여 메시지 동기화 속도를 조절할 수 있습니다.
debouce 작업이 끝나면, 하나의 메시지로 통합되며, 이를 통해 하나의 요청만 전달할 수 있습니다. 그 다음 작업은 해당 메시지를 개별 client로 전달하는 것입니다. 이때 전체 Client의 정보는 AdsClients에 저장되어 있습니다. 따라서 Client 별로 해당 메시지를 매핑하여 Queue에 입력합니다. 여기까지가 debounce 작업과 후속 메시지 처리 작업입니다.
4-2 PushQueue
이전 내용에서 debounce 이후 PushQueue에 Client 정보와 요청 정보 메시지를 삽입하는 것을 확인했습니다. 이때 Enqueue 메소드 안에서는 여러가지 작업이 이루어지는데, 이와 관련하여 PushQueue의 구조와 수행 동작에 대해서 살펴보겠습니다.
Push Queue의 주요 속성은 위 3가지입니다. queue에는 Client의 연결을 slice 형태로 저장하고 있고 Client 별로 pending과 processing의 Map이 존재하는 것을 확인할 수 있습니다. 위 세가지 자료 구조는 Push Queue가 동작하는데 있어 주요하게 사용됩니다.
먼저 Queue에 삽입할 때 과정을 살펴보겠습니다.
debounce를 통해 입력되는 쓰레드와 Push Queue 처리 쓰레드는 별개의 go routine으로 동작합니다. 따라서 Push Queue에 빠르게 적재되었을 지라도 Push Queue의 처리 능력에 따라서 아직 처리되지 않고 Queue에 남아있을 수 있습니다.
따라서 가장 먼저 수행하는 일은 queue 속성에 Client를 저장하는 작업 이외에 Client를 Key, 요청 Requset를 Value로 사용하는 pending Map에 메시지를 집어 넣습니다. 또한 Client는 queue Slice에 추가합니다.
이때 모습은 위 그림과 같은 형태일 것입니다. 만약 이러한 상황에서 debounce 작업이 한차례 더 진행되고 이때 Service A에 대해서 새로운 PushRequest가 발생했는데, 아직 Push Queue에서 기존 데이터가 처리되지 않은 상황이라면 어떻게될까요?
아직 메시지가 전송된 상태가 아니기 때문에 debounce에서 메시지들을 Merge 했던것 처럼 해당 Client에 전송되는 Message에 대해서 Merge 작업을 수행하도록 진행합니다. 따라서 Pending의 목적은 처리되지 않은 메시지를 저장함과 동시에 추가로 발행되는 메시지 중 Connection 정보가 일치하는 요청에 대해서는 Merge 작업을 수행하여 메시지 처리량을 높이는 효과가 있습니다.
Push Queue에서 주기적으로 Queue에 데이터가 존재하는지 탐색하고 처리하는 작업은 doSendPushes 메소드에서 수행하는데, 해당 메소드는 별도의 go Routine으로 동작합니다. doSendPushes에서 처리하는 과정은 총 3단계로 이루어져있으며, 이는 다음과 같습니다.
1. Dequeue 명령을 통해 Push Queue에 존재하는 queue에서 Client 정보를 추출하고 pending Map에 존재하는 Client 정보를 삭제합니다. 추가로 작업 진행을 관리하기 위해 processing Map에 Client 정보를 추가합니다. 이때 중요한 점은 Map에 데이터를 넣을 때 Key 값 즉 Client 정보만 Map에 입력하고 Value는 nil로 입력한다는 것입니다. 그 이유에 대해서는 추후 살펴보겠습니다.
2. Client 구조체에 위치한 PushChannel에 데이터를 삽입합니다. 이후 처리 과정은 Client와 연계되어있는 ADS에서 처리합니다. 이에 대해서는 추후 살펴보겠습니다.
3. Client에 데이터 전송이 완료되면, 완료 응답을 수신받습니다. 응답 수신이 완료되면, processing Map에서 해당 정보를 삭제합니다.
위와 같은 세 가지 단계를 거치면 데이터가 정상적으로 처리되는 것을 확인할 수 있습니다.
그렇다면 현재 데이터를 Service A에 전송하고 있는 과정에서 동일한 Client에게 Push Request가 요청되면 어떻게 될까요? 이때는 processing Map을 통해 메시지를 처리합니다.
메시지를 Client에게 전송하면 processing Map에 Connection 정보를 입력하는 것을 이전 내용을 통해 확인했습니다. 따라서 debounce 이후 Queue에 입력하는 과정에서 해당 Map을 확인하면 현재 메시지 전송 상태를 확인할 수 있습니다.
이 경우에는 processing Map에 존재하는 Client 데이터에 Push Request를 추가합니다. 만약 그 이후에도 Client의 전송 완료 메시지를 받기 이전에 Push Request를 전달받으면 지속적으로 Push Request를 Merge 하여 하나의 메시지로 만듭니다.
이후 Client가 작업을 종료하고 전송 완료 응답을 전달하였을 때, processing Map을 살펴봅니다. 이때 Client Key에 해당하는 Value가 nil일 경우에는 메시지 전송 이후로 해당 Client에 요청되는 추가적인 메시지가 없으므로 작업을 종료합니다. 하지만 Value가 nil이 아닐 경우에는 중간에 입력된 메시지가 존재함을 의미합니다. 따라서 그 다음 doSendPushes 싸이클에서 메시지가 다시 처리될 수 있도록 pending 메시지에 해당 Client 정보와 Push Request를 다시 입력하고 queue에도 Client 정보를 삽입합니다.
지금까지 Push Queue에 대해 살펴봤습니다. 이를 통해 메시지를 전달하는 과정에서도 Network overhead를 줄이기 위해 여러가지 최적화 장치가 마련된 모습을 볼 수 있습니다.
5. Connection 관리
지금까지 Event가 발생했을 때, debounce 과정과 이후 Push Queue에서 데이터를 Client에게 전달하고 후속 작업을 처리하는 과정에 대해서 살펴봤습니다. 이번에는 XDS Server Client Connection 관리 주체와 어떻게 통신을 수행하는지 살펴보겠습니다.
envoy에서는 위와 같이 ADS Server에 대한 gRPC interface를 제공합니다. 따라서 istio에서는 해당 ADS Spec에 부합하도록 gRPC 코드가 구현되어있습니다.
istio에서는 사용자 요청을 처리하기 위해서 내부적으로 gRPC Server가 있습니다. pilot-agent는 gRPC에 정의된 interface 호출을 통해 pilot-discovery에 접속을 접속과 더불어 필요한 Resource 정보를 요청합니다.
사용자의 요청을 전달받으면 해당 Server는 별개의 go routine을 통해 receive와 processRequest 두 개의 작업을 수행합니다. 여기서 주목할 점은 receive와 processRequest는 별개로 동작하지만 두 routine간의 Request Channel로 연결되어있다는 점입니다. 지금부터는 두 과정에 대해서 살펴보면서 동작 원리를 관찰해보겠습니다.
먼저 살펴볼 것은 receive입니다.
receive의 역할은 연결된 Connection에서 데이터 처리가 필요할 때 해당 Request를 Requet Channel로 연결하기 위함입니다. 위 코드 내용을 통해 이를 살펴보겠습니다.
최초에 Client가 gRPC 서버에 접속을 요청하면, firstRequest는 true일 것입니다. 이후 for-loop 구문을 수행하는데, 위 코드를 살펴보면 for-loop 자체에는 별도의 조건이 없기 때문에 계속 반복 수행되는 것을 확인할 수 있습니다.
이후 for-loop 구문으로 진입했을 때 눈여겨볼 점은 최초 접속시 initConnection 메소드를 호출한다는 점입니다. 해당 메소드가 수행하는 기능은 사용자의 요청이 적합한지 권한 검사 이후에 XDS Server에 위치하는 AdsClients Slice에 해당 Connection 정보를 추가하는 작업을 진행합니다. 마찬가지로 defer 메소드로 Client가 접속을 종료할 때는 closeConnection 메소드를 호출하여 AdsClients로부터 해당 정보를 삭제하는 과정 또한 진행합니다.
따라서 이전에 Service Discovery 로직에서 Event 정보를 모든 Clients에게 전파할 때 해당 자료 구조를 참조했음을 확인할 수 있는데 receive 로직에서 AdsClients를 관리함을 알 수 있습니다.
사용자 최초 접속 하여 Connection 할당 정보가 완료된 이후, 수행하는 일은 사용자로부터 전달 받은 요청이 있으면 이를 Request Channel로 전달하는 중간 매개체 역할을 수행합니다.
정리하자면 별도의 go routine으로 동작하는 receive 메소드의 역할은 Connection 관리와 더불어 Connection에서 발생한 Request 정보를 Request Channel에 전파하는 것입니다.
이번에는 사용자의 요청을 처리하거나 Server의 전달 요청을 처리하는 로직 부분을 살펴보겠습니다.
해당 로직 또한 마찬가지로 for-loop을 지속적으로 수행하면서 요청 구분에 따라 처리 방법을 달리 수행합니다. 그 중 위 코드가 핵심 로직을 설명합니다.
먼저 살펴볼 것은 사용자의 요청에 대한 처리입니다. Receive go routine에서 사용자 요청을 최초 접수받고 이를 Request Channel에 전파한다고 이전에 설명했습니다. 이때 Channel로 전파된 데이터를 수신받고 processRequset 메소드에 전달하는 역할을 수행합니다.
이때 processRequest 로직에서는 내부적으로 위와 같은 작업이 수행됩니다.
1. Client가 요청한 DiscoveryRequest는 ADS interface에 의해 정의된 Envoy 구조체로써 Discovery를 희망하는 요청 타입(ex Cluster)을 알 수 있습니다. 따라서 먼저 해당 정보를 파싱합니다. 그리고 요청하는 Resource를 지속적으로 추적하기 위해서 Connection 내부에 위치한 WatchedResource map에 해당 Type을 저장합니다.
2. istio 내부에서는 Envoy Spec과 달리 Abstract한 Model 구조를 사용합니다. 따라서 istio에서 통용되는 데이터 구조인 PushRequest로변환합니다.
3. istio는 내부에서 관리하는 데이터 타입 구조를 envoy가 이해할 수 있는 xDS로 변환하기 위해 내부적으로 Generator를 내장하고 있습니다. 위 그림과 같이 Generator는 Map 형태로 되어있으며, 각기 다른 Generator 중 Watched Resource Slice에 저장된 사용자가 원하는 타입에 부합하는 Generator를 찾아 xDS API 스펙에 맞게 변환하는 작업을 수행합니다.
4. 변환 작업이 완료되면, 다시 ADS interface에 의해서 정의된 DiscoveryResponse로 다시 변환합니다. 그리고 해당 정보를 Client에게 반환합니다.
정리하자면, 사용자가 원하는 요청 정보를 분석하여 istio가 관리하고 있는 서비스 정보를 Generator를 통해 envoy가 이해할 수 있는 형태로 변환한 다음 ADS interface가 요구하는 형태로 Wrapping 하여 전달하는 것이 사용자 요청 처리의 핵심 흐름이라고 볼 수 있습니다.
이번에는 Service 생성/수정/삭제로 인해 Server에서 Client에게 데이터를 전달해야되는 상황에 대해서 살펴보겠습니다.
이전 Push Queue 설명을 통해서 최종 통보는 Client Connection에 위치한 Push Channel로 데이터가 전달된다고 설명했습니다. 따라서 위 코드를 살펴보면 PushChannel에 데이터가 들어왔을 때는 빨간색 음영 부분이 감지가 될 것이며, 이를 토대로 pushConnection 메소드를 호출하여 후속 작업을 처리합니다.
이때 pushConnection 내부 처리 과정은 다음과 같습니다.
PushRequest는 이미 PushChannel로 부터 받았으므로 envoy의 요청과는 다르게 메시지를 변환할 필요가 없습니다. 이후 Connection에 등록된 Watched Resource 정보를 모두 가져와서 Generator에 매칭되는 정보로 변환하고 ADS interface에 의해서 정의된 DiscoveryResponse로 다시 변환합니다. 그리고 해당 정보를 Client에게 반환합니다.
해당 과정을 통해 Server의 변경 내용을 Client에게 전파할 수 있습니다.
5. 정리
지금까지 XDS Server에 존재하는 주요 컴포넌트에 대해서 살펴봤습니다. 이번에는 지금까지 배운 내용을 요약해서 전체적인 관점에서 흐름을 간략하게 살펴보겠습니다.
1. 사용자로부터 접속 요청을 전달받으면 Connection을 생성하고 XDS Server 내에 위치한 AdsClients에 저장하여 사용자를 관리합니다.
2. informer에 등록한 Handler를 통해서 관심 대상 Resource를 등록하며, Callback으로 ServiceDiscovery내에 있는 queue로 전달됩니다. 해당 queue에서는 kinds에 등록된 GVK(Group Version Kind)에 매칭된 handler를 실행합니다. 대부분의 handler는 XDSServer.ConfigUpdate 메소드를 수행하고 해당 정보는 PushChannel로 전달됩니다.
3. 내부에는 독립적으로 수행하는 debounce go routine이 존재하여 일정 시간동안 유입되는 메시지를 병합하는 작업을 수행하고 이후 메시지 전달을 위해 Push Queue에 데이터를 저장합니다.
4. Push Queue를 처리하는 로직은 또 다른 별도 go routine을 동작하는 doSubPushes에서 수행되며, 여기서는 Client Connection 정보를 참조하여 해당 Connection의 Push Channel로 데이터를 다시 전달합니다.
5. Push Channel로 전달된 데이터는 내부 Generator를 통해 envoy가 이해할 수 있는 형태로 데이터를 가공한다음 Client로 전달합니다.
6. 마치며
이번 포스팅에서는 istio pilot-discovery에서 가장 중요한 XDS Server에 대해서 살펴봤습니다. 다음 포스팅에서는 인증서 관리와 SDS Server 구조에 대해서 살펴보겠습니다.
envoy-internals 시리즈를 통해 envoy 아키텍처에 대해서 자세하게 살펴봤습니다. 이번부터 진행되는 시리즈는 envoy 위에서 동작하는 istio가 어떻게 상호작용하는지 그리고 이를 위해 어떤 내부 구조를 지니고 동작 원리가 어떻게 되는지에 대해서 자세하게 살펴보고자 합니다.
이번 포스팅에서는 envoy proxy를 활용하기 위한 istio의 역할과 아키텍처에 구조에 대해서 일부 다루어보겠습니다.
2. envoy proxy와 istio
이전 envoy 시리즈를 통해 envoy proxy는 service 앞단에 위치하고 요청에 대한 라우팅 정책, Circuit Breaker, mTLS 통신과 같은 인프라단에서 제공해주는 네트워크 기능을 대신 처리함을 확인했습니다. 그에 따라 Client가 Service를 호출할 때 다이렉트로 Service를 호출하는 것이 아니라 envoy proxy를 경유해서 upstream으로 트래픽이 전달됩니다.
위와 같은 구조를 가졌을 때, 애플리케이션 배포할 때는 대개 Service만 배포하지 않고 envoy proxy와 Service간 연결 설정 정보가 포함된 하나의 형태로 패키징하여 배포합니다. 이러한 배포 형태를 사이드카 패턴이라고 부르며, Kubernetes를 통해 배포할 경우 1개의 Pod내에 Service와 Proxy Container를 엮어서 같이 배포하는 것이 일반적입니다.
이때 Service가 단일로 구성되어있다면, envoy 설정 정보가 포함된 1개의 Pod를 잘 만들어서 배포할 경우 서비스 제공에 문제가 없을 것입니다.
그렇다면 MSA 환경에서는 어떨까요?
MSA 환경에서는 위와 같이 시스템을 구축하기 위해 협력해야하는 Service가 다수가 될 것입니다. 그리고 이때 문제가 되는 것은 협력 해야할 Service가 지속적으로 추가되거나 변경되는 일이 자주 이루어진다는 것입니다. 즉 envoy proxy간 통신을 위해서는 서로간의 정보를 추적하고 이를 반영해야 서비스를 안전하게 제공할 수 있음을 의미합니다.
envoy 구조 포스팅에서 살펴봤듯이 envoy 내부 컴포넌트 변경을 위해서는 static과 dynamic 방법이 있음을 설명했습니다. 만약 Service가 지속적으로 추가되거나 변경될 때 static 방식으로 처리해야한다면, 위 예시와 같이 Service A 입장에서 Service B, Service C가 추가될 때마다 static resources 파일을 변경해서 재기동을 수행해야하는 문제점을 야기합니다. 따라서 이 경우에는 dynamic 방식을 활용하여 xDS를 통해 config 파일 변경 없이 기동 시점에 동적으로 Cluster 정보, Endpoint 정보 등을 변경하는 것이 바람직합니다.
그렇다면 envoy proxy에 xDS를 통해 올바르게 정보를 제공하기 위해서는 누군가는 Service의 등록/수정/삭제 등을 지속적으로 추적하고 관리하는 Control plane이 필요할 것입니다. 아마 눈치채셨겠지만, istio가 바로 해당 Control Plane의 역할을 수행합니다.
그렇다면 istio는 어떠한 원리로 Service의 등록/수정/삭제를 추적하고 관리하고, envoy proxy들과 Connection을 맺고 있으며 어떻게 데이터를 전달할까요?
이러한 목표를 달성하기 위해서 istio에서는 pilot 아키텍처를 제공하였습니다.
Pilot은 말 그대로 조종사를 의미합니다. 마치 istio를 통해서 서비스와 endpoint 정보를 제공하여 올바르게 애플리케이션이 동작할 수 있도록 조종하는 역할을 수행합니다. pilot 아키텍처에는 이를 구현하기 위해 두 개의 컨테이너 이미지가 제공됩니다. 하나는 pilot-discovery 나머지는 pilot-agent입니다. 각각 이미지가 무슨 역할을 수행하는지 알아보겠습니다.
pilot-agent는 envoy proxy를 wrapping하고 있는 프로그램으로써 pilot-discovery로부터 Service 변경 사항을 통지받으면, 내부의 envoy proxy에게 이를 전파하여 envoy proxy 내부 설정을 변경하는 중계자 역할을 수행합니다.
반면 pilot-discovery는 Control Plane의 역할인 등록/수정/삭제되는 Service를 감지하고 이를 연결된 모든 pilot-agent에게 전달하는 역할을 수행합니다. 다만 여기서 알아야 할 중요 포인트는 pilot-discovery 자체가 Service 라이프 사이클을 직접 추적하지 않는다는 점입니다. 즉 외부에 존재하는 Service Registry로부터 관련 이벤트를 전달받아 이를 전파하는 중계의 역할만을 담당합니다. 따라서 istio에서는 3가지 방법을 통해 이벤트를 전달받을 수 있는 창구를 제공합니다.
2-1 MCP
첫번째 방식은 MCP(Mesh Configuration Protocol)를 활용하는 방법입니다. Service Discovery 기능을 제공하는 Platform은 많습니다. 가령 위와같이 Consul, Eureka 등이 예입니다. 이때 개별 Platform 마다 Service Discovery 제공 format은 상이할 것이므로 만약 istio에서 각각의 Platform format을 지원해야한다면, 시스템 의존성이 강해질 것입니다. 이는 유지보수 측면에서 굉장히 어려움을 겪을 수 있음을 의미합니다.
따라서 외부 의존성을 줄이기 위해서 istio 내부에서 사용할 수 있는 MCP를 만들고 Service Discovery에서는 MCP 포맷으로 데이터를 전달받아 처리하도록 설계되었습니다. 또한 개별 Service Registry에서 바로 MCP 포맷으로 보내줄 수 없는 경우에는 위와 같이 중간에 Adapter 역할을 수행하는 MCP Server를 통해 pilot-discovery에 전달하도록 구성할 수 있습니다.
2-2 File
두번째 방식은 File 방식입니다. 이는 local에 저장된 Configuration file을 주기적으로 읽어들여서 메모리에 캐싱하고 해당 정보를 토대로 Configuration을 수행하는 방식입니다. 해당 방식은 주로 istio를 테스트할 때 사용합니다.
2-3 Kubernetes
세번째 방식은 Kubernetes 기반의 Service Discovery 방식입니다. Kubernetes 플랫폼을 사용한다면, 자체적으로 Service의 등록/삭제/수정과 관련된 이벤트를 제공받을 수 있습니다. 저를 포함하여 istio를 사용하는 대부분의 사용자는 kubernetes위에서 istio를 사용하는 경우가 많을 것이기 때문에 대개 Service Discovery Provider로써 Kubernetes를 사용할 것입니다.
본 포스팅 또한 위 세가지 방법 중 Kubernetes를 통한 Service 전달 방식에 대해서 자세하게 알아보겠습니다.
3. Informer
이전 내용에서 확인했듯이 pilot-discovery는 외부에 존재하는 Service Registry로부터 Service의 변경 사항을 전달받아 이를 pilot-agent에게 전달하는 역할을 수행합니다. 그리고 이를 위한 인터페이스 방법으로 3가지가 있음을 확인했습니다. 그 중 kubernetes가 가장 보편적으로 사용되는데, istio는 어떻게 kubernetes로부터 이벤트를 전달받을 수 있을까요?
kuberetes 내부에는 여러가지 Controller 즉 Scheduler, Service controller 등이 있습니다. 그리고 해당 컴포넌트의 역할은 kube-apiserver로부터 관심 대상 Resource에 대해서 변경사항이 발생했을 때, Watch 메커니즘을 통해 Resource를 전달받고 후속 작업을 처리합니다. istio 또한 kubernetes로부터 Resource 변경에 대해 통지를 받아 후속처리하는 역할로써 kubernetes의 Custom Controller라고 볼 수 있습니다.
kubernetes의 Controller는 내부적으로 client-go sdk를 통해서 Kube API Server와 통신하고 Watch 메커니즘을 제공합니다. 그리고 이를 위해서 내부에는 informer구조를 사용하고 있습니다.
그렇다면 informer는 왜 사용할까요? 만약 Controller 내부에 있는 컴포넌트들이 Resource 정보를 얻기 위해서 개별적으로 kube-apiserver와 통신을 수행한다면, kube-apiserver 입장에서는 부하가 걸릴 수 있습니다.
따라서 kubernetes 개발자들은 이러한 kube-apiserver의 API 호출 부담을 줄여주고자 Controller 내부에 캐싱 기능과 kube-apiserver와의 효율적인 통신을 위한 informer 메커니즘을 설계하였습니다.
해당 구조를 개략적으로 살펴보면 위와 같습니다. 가령 하나의 Program안에 여러개의 Controller가 존재하고 그 안에서 Resource 변화를 감지를 통지받아야한다면, informer를 통해 통지를 전달받고 싶은 대상을 등록합니다. 그리고 informer가 kube-apiserver에 요청을 전달하여 list 목록을 얻어와서 자신의 Local Thread Safe Cache에 저장합니다. (※ 참고로 위 그림의 informer는 SharedInformer 구조를 의미하며, istio에서는 이를 위해 SharedInformerFactory를 사용합니다. 자세한 내용은 https://rudecamel.tistory.com/35 내용을 참고하시기 바랍니다.)
이후 Watch 메커니즘을 통해서 kube-apiserver로 부터 이벤트를 전달받으면, 해당 이벤트 내역을 개별 Controller들에게 Callback 하는 방식으로 이벤트를 처리합니다.
조금 더 자세히 살펴보면, Controller Application을 살펴보면 위와 같이 Client-go에서 제공하는 informer 영역과 Controller의 비즈니스 로직 두 부분으로 나눌 수 있습니다.
이때 Controller가 변경 통지를 희망하는 GroupVersionResource를 informer에게 등록하고 향후 해당 이벤트가 전달되었을 때 처리를 희망하는 내부의 여러 Process들이 Controller에게 Add/Update/Delete Callback Function을 등록합니다.
이후 informer가 기동되면, 다음과 같은 과정을 거칩니다.
1. Reflector가 kube-apiserver에게 llist API를 통해서 가장 최신의 resourceVersion이 포함된 Resource 정보를 가져옵니다.
2. 가져온 Resource 정보는 DeltaFIFO에 적재합니다. DeltaFIFO는 Queue 형태로 구성되어있으며, add, update, delete, list, pop 과 같은 연산을 제공합니다.
3. Informer는 DeltaFIFO에 적재된 데이터를 가져옵니다.
4. Informer는 Indexer에게 전달된 데이터 내용을 저장하도록 명령합니다.
5. Informer에 등록된 Controller의 ResourceEventHandler Callback function을 호출합니다.
6. ResourceEventHandler Callback function에서 filtering 룰을 등록한게 있으면, 해당 이벤트는 제외하고 난 다음 관심 대상의 Resource 정보를 WorkQueue에 삽입합니다.
7. Controller는 WorkQueue로 부터 Resource 정보를 획득하여 자신이 보유한 Processor들에게 전파합니다.
8. Processor들은 전달받은 정보를 토대로 비즈니스 로직을 수행합니다. 만약 이 과정에서 Resource에 대한 정보가 필요하다면, Lister interface를 통해서 Indexer에게 Resource 정보를 요청할 수 있습니다. 요청받은 Indexer는 Local Cache로 부터 정보를 제공하여 Processor에서 불필요하게 kube-apiserver에 질의하는 것을 차단할 수 있습니다.
9. 초기화 과정이 끝나면 Reflector는 Watch API를 요청하여 kube-apiserver로부터 Resource 정보의 변경이 있을 때마다 내용을 전달받습니다. 그리고 이전의 흐름 그대로 호출이 이어지면서, 후속 작업을 수행합니다. 이때 Watch API는 HTTP의 Chunked Transfer-Encoding 기법을 사용하여 Connection을 유지하도록 합니다. 이에 대한 설명은 아래 블로그를 참고하시기 바랍니다.
지금까지 client-go에서 제공하는 informer 로직에 대해서 살펴봤습니다. istio를 학습하는데 있어 굳이 kubernetes의 watch 메커니즘을 알아야하나 싶을 수도 있습니다. 하지만 istio의 Service Discovery 동작에 있어서 client-go에서 제공해주는 컴포넌트와 강하게 결합되어 있습니다. 따라서 해당 구조를 이해하면 istio 컴포넌트의 동작 흐름을 쉽게 이해할 수 있습니다.
4. Istio Service Discovery 과정
지금까지 kubernetes가 제공하는 client-go의 informer 구조에 대해서 살펴봤습니다. 지금부터는 istio가 어떻게 informer와 연결되어 Service Discovery를 수행하는지에 대해서 살펴보겠습니다.
informer 구조에서 살펴본 것과 같이 informer에는 여러개의 Controller가 존재하고 해당 Controller가 요구하는 Resource 정보를 가지고 있다가 변화가 발생하면, Controller에게 전달함을 확인했습니다. 그에따라 istio에서는 위와 같이 여러개의 Controller를 정의하고 해당 event 정보를 수신하여 비즈니스 로직을 처리하도록 구성되어있습니다.
Control Plane에서는 pilot-agent와 통신을 위한 gRPC Server가 존재합니다. 해당 Server의 역할은 pilot-agent의 Connection을 저장하고 데이터 통신 시 해당 Server를 통해 pilot-agent에게 정보를 전달하는 역할을 수행합니다. 그리고 해당 gRPC Server를 담당하는 것은 pilot-discovery에 속한 XDS Server입니다.
따라서 Controller의 Event가 감지하면 1차적으로 XDS Server에게 해당 내용이 전달되고 XDS Server에서는 전달받은 Resource를 envoy가 이해할 수 있는 XDS 형태로 변환하여 이를 ADS에 전달하여 연결된 모든 pilot-agent에게 정보를 전달하는 역할을 수행합니다.
5. 마치며
이번 포스팅에서는 envoy proxy에게 xDS 정보 제공을 위한 istio control-plane의 기능과 Service Discovery 과정에 대해서 개략적으로 살펴봤습니다. pilot-discovery의 주요 컴포넌트에 대해서는 추후 다루어보도록 하고 이번 포스팅에서는 kubernetes의 informer 구조에 대한 이해와 XDS Server를 통해 envoy proxy들에게 Service Discovery 기능을 제공한다는 점을 기억하면 좋을 것 같습니다.
이번 포스팅은 지난 포스팅에 이어서 사용자가 Connection 연결을 등록한 이후에 실제 HTTP 요청을 전달했을 때 Upstream까지 어떻게 트래픽이 전달되는지에 대해서 살펴보고자 합니다.
그 과정에서 너무 지엽적인 부분은 다이어그램으로 표현하고 필요한 부분은 envoy 코드를 같이 보면서, 내부 동작 과정에 대한 이해를 집중적으로 해보겠습니다.
이번 포스팅은 특히 HttpConnectionManager와 깊은 연관이 있으므로 먼저 해당 내용을 소개한 이전 블로그 내용을 학습 후 보시는 것을 추천드립니다.
(아직 포스팅 미완성상태인데 공개합니다ㅜ 하단에 코드만 첨부된 내용은 추후 보강하겠습니다)
2. Client HTTP 요청 과정
Client Connection 연결은 설정되었기 때문에 libevent로 Client가 HTTP 접속을 요청하면, 내부적으로 어디로 이벤트를 전달해야하는지 이미 알 수 있습니다. 이전 포스팅 내용에서 ServerConnectionImpl이 생성되면 libevent에 onFileEvent를 Callback 메소드로 등록한다고 설명했습니다. 따라서 이번 포스팅에서는 해당 지점부터 살펴보겠습니다.
connection_impl.cc
void ConnectionImpl::onFileEvent(uint32_t events) {
ScopeTrackerScopeState scope(this, this->dispatcher_);
ENVOY_CONN_LOG(trace, "socket event: {}", *this, events);
if (immediate_error_event_ == ConnectionEvent::LocalClose ||
immediate_error_event_ == ConnectionEvent::RemoteClose) {
if (bind_error_) {
ENVOY_CONN_LOG(debug, "raising bind error", *this);
// Update stats here, rather than on bind failure, to give the caller a chance to
// setConnectionStats.
if (connection_stats_ && connection_stats_->bind_errors_) {
connection_stats_->bind_errors_->inc();
}
} else {
ENVOY_CONN_LOG(debug, "raising immediate error", *this);
}
closeSocket(immediate_error_event_);
return;
}
if (events & Event::FileReadyType::Closed) {
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
ASSERT(!(events & Event::FileReadyType::Read));
ENVOY_CONN_LOG(debug, "remote early close", *this);
closeSocket(ConnectionEvent::RemoteClose);
return;
}
if (events & Event::FileReadyType::Write) {
onWriteReady();
}
// It's possible for a write event callback to close the socket (which will cause fd_ to be -1).
// In this case ignore read event processing.
if (ioHandle().isOpen() && (events & Event::FileReadyType::Read)) {
onReadReady();
}
}
소켓에 Write/Read 이벤트가 발생하면, libevent에 의해서 onFileEvent를 호출됩니다. 이때 코드 내용을 살펴보면, Socket이 종료되었을 경우에는 연결을 해제하도록 작업을 수행하며, 그렇지 않은 경우에는 Socket 이벤트 타입이 Write인지 혹은 Read인지에 따라서 각기 다른 메소드를 호출하는 것을 확인할 수 있습니다.
위 경우에는 HTTP 요청으로 인해 Socket에 데이터가 작성된 것이기 때문에 데이터를 읽기 위해서 onReadReady() 메소드가 호출됨을 가정하여 진행하겠습니다.
connection_impl.cc
void ConnectionImpl::onReadReady() {
ENVOY_CONN_LOG(trace, "read ready. dispatch_buffered_data={}", *this,
static_cast<int>(dispatch_buffered_data_));
const bool latched_dispatch_buffered_data = dispatch_buffered_data_;
dispatch_buffered_data_ = false;
ASSERT(!connecting_);
// We get here while read disabled in two ways.
// 1) There was a call to setTransportSocketIsReadable(), for example if a raw buffer socket ceded
// due to shouldDrainReadBuffer(). In this case we defer the event until the socket is read
// enabled.
// 2) The consumer of connection data called readDisable(true), and instead of reading from the
// socket we simply need to dispatch already read data.
if (read_disable_count_ != 0) {
// Do not clear transport_wants_read_ when returning early; the early return skips the transport
// socket doRead call.
if (latched_dispatch_buffered_data && filterChainWantsData()) {
onRead(read_buffer_->length());
}
return;
}
// Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
// the transport socket read resumption happens as requested; onReadReady() returns early without
// reading from the transport if the read buffer is above high watermark at the start of the
// method.
transport_wants_read_ = false;
IoResult result = transport_socket_->doRead(*read_buffer_);
uint64_t new_buffer_size = read_buffer_->length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
// If this connection doesn't have half-close semantics, translate end_stream into
// a connection close.
if ((!enable_half_close_ && result.end_stream_read_)) {
result.end_stream_read_ = false;
result.action_ = PostIoAction::Close;
}
read_end_stream_ |= result.end_stream_read_;
if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
(latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
// Skip onRead if no bytes were processed unless we explicitly want to force onRead for
// buffered data. For instance, skip onRead if the connection was closed without producing
// more data.
onRead(new_buffer_size);
}
// The read callback may have already closed the connection.
if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
ENVOY_CONN_LOG(debug, "remote close", *this);
closeSocket(ConnectionEvent::RemoteClose);
}
}
onReadReady() 메소드 내용은 위와 같습니다. 내용을 살펴보면, buffer에 읽을 데이터가 존재하면 해당 길이만큼 read_buffer에서 읽도록 onRead 메소드를 호출하는 것을 볼 수 있습니다.
connection_impl.cc
void ConnectionImpl::onRead(uint64_t read_buffer_size) {
ASSERT(dispatcher_.isThreadSafe());
if (inDelayedClose() || !filterChainWantsData()) {
return;
}
ASSERT(ioHandle().isOpen());
if (read_buffer_size == 0 && !read_end_stream_) {
return;
}
if (read_end_stream_) {
// read() on a raw socket will repeatedly return 0 (EOF) once EOF has
// occurred, so filter out the repeats so that filters don't have
// to handle repeats.
//
// I don't know of any cases where this actually happens (we should stop
// reading the socket after EOF), but this check guards against any bugs
// in ConnectionImpl or strangeness in the OS events (epoll, kqueue, etc)
// and maintains the guarantee for filters.
if (read_end_stream_raised_) {
// No further data can be delivered after end_stream
ASSERT(read_buffer_size == 0);
return;
}
read_end_stream_raised_ = true;
}
filter_manager_.onRead();
}
그리고 onRead에서는 filter_manager에 등록된 filterChains에서 데이터를 읽도록 위임합니다.
filter_manager에서는 내부에 존재하는 upstream_filters_를 순차적으로 실행 시켜서 Filter Chains를 수행시킵니다. 이를 위해 onContinueReading 메소드를 호출합니다.
filter_manager_impl.cc
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,
ReadBufferSource& buffer_source) {
// Filter could return status == FilterStatus::StopIteration immediately, close the connection and
// use callback to call this function.
if (connection_.state() != Connection::State::Open) {
return;
}
std::list<ActiveReadFilterPtr>::iterator entry;
if (!filter) {
connection_.streamInfo().addBytesReceived(buffer_source.getReadBuffer().buffer.length());
entry = upstream_filters_.begin();
} else {
entry = std::next(filter->entry());
}
for (; entry != upstream_filters_.end(); entry++) {
if (!(*entry)->filter_) {
continue;
}
if (!(*entry)->initialized_) {
(*entry)->initialized_ = true;
FilterStatus status = (*entry)->filter_->onNewConnection();
if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
return;
}
}
StreamBuffer read_buffer = buffer_source.getReadBuffer();
if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
return;
}
}
}
}
onContinueReading 메소드는 먼저 read buffer에 입력된 값 만큼을 fetch하는 과정을 수행합니다. 이후 자신이 보유하고 있는 upstream_filters를 순회하면서 사용자 요청 처리를 각각의 필터에 위임합니다.
참고로 해당 메소드는 이전 포스팅에서 Read Filter 등록이 모두 완료된 이후 initializeReadFilters()실행 과정에서도 호출된 적이 있습니다. 그 당시에는 초기화를 목적으로 onNewConnection()이 수행되었는데, 지금은 초기화가 모두완료된 이후이기 때문에 Filter를 순회하면서 onData를 호출하는 것이 이전과는 다른 차이점입니다.
본 포스팅에서는 HTTP 요청과 관련하여 처리됨을 가정하였으므로, filterChains에 등록된 filter 중HttpConnectionManager 필터의 onData가 호출될 것입니다.
3. HttpConnectionManager
HttpConnectionManager의 onData 메소드가 호출되면, 가장 먼저 수행하는 일은 codec을 생성하는 작업입니다. 해당 내용은 HttpConnectionManager 관련 포스팅에서도 다루었던 내용이며, 본 포스팅에서는 Http 1.1 요청임을 가정하므로 Http1::ServerConnectionImpl 인스턴스가 반환될 것입니다.
해당 과정을 코드와 함께 살펴보면 다음과 같습니다.
1. ServerConnectionImpl에서 filter_manager에게 upstream_filters를 호출하라고 지정하면, 내부에 등록된 HttpConnectionManager의 onData가 호출됩니다.
connection_manager_impl.cc
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
if (!codec_) {
// Http3 codec should have been instantiated by now.
createCodec(data);
}
...(후략)...
}
2. HttpConnectionManager에서는 먼저 codec_ 생성여부를 판별한 다음 codec_이 만들어진 적이 없으면 이를 생성합니다.
참고로 config 설정에 use_balsa_parser가 지정되어있다면, BalsaParser가 지정됩니다.
connection_manager_impl.cc
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
if (!codec_) {
// Http3 codec should have been instantiated by now.
createCodec(data);
}
bool redispatch;
do {
redispatch = false;
const Status status = codec_->dispatch(data);
if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
handleCodecError(status.message());
return Network::FilterStatus::StopIteration;
} else if (isCodecProtocolError(status)) {
stats_.named_.downstream_cx_protocol_error_.inc();
handleCodecError(status.message());
return Network::FilterStatus::StopIteration;
}
ASSERT(status.ok());
// Processing incoming data may release outbound data so check for closure here as well.
checkForDeferredClose(false);
// The HTTP/1 codec will pause dispatch after a single message is complete. We want to
// either redispatch if there are no streams and we have more data. If we have a single
// complete non-WebSocket stream but have not responded yet we will pause socket reads
// to apply back pressure.
if (codec_->protocol() < Protocol::Http2) {
if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
data.length() > 0 && streams_.empty()) {
redispatch = true;
}
}
} while (redispatch);
if (!read_callbacks_->connection().streamInfo().protocol()) {
read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
}
return Network::FilterStatus::StopIteration;
}
지금까지 3가지 단계에 걸쳐서 ServerConnectionImpl 생성과 더불어 실제 요청을 처리하는 codec 또한 생성됨을 확인했습니다. 여기까지 완료되면, 그 다음 작업은 위 코드와 같이 codec_ 에게 사용자가 요청한 데이터를 넘겨줌으로써 dispatch 하도록 처리를 위임하고 Codec 내부에서는 다시 parser_를 통해 사용자의 데이터를 분석하고 정제하는 과정을 거칩니다.
이 과정에 대해서 보다 자세하게 이해하기 위해 다이어그램과 코드를 통해 살펴보겠습니다.
1. HttpConnectionManager의 onData 메소드 내에서 codec_인 Http1::ServerConnectionImpl에게 dispatch를 요청합니다.
connection_manager_impl.cc
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
...(중략)...
const Status status = codec_->dispatch(data);
...(중략)...
}
2. ServerConnectionImpl은 내부에 존재하는 parser_ 프로퍼티를 통해서 데이터 분석을 요청합니다.
parser에게 execute 요청을 보내고나면, parser는 내부적으로 사용자 데이터를 전달받아 이를 분석하는 과정을 거칠 것입니다. 그리고 이전 포스팅을 통해 설명했듯이 사전에 정의된 ParserCallback 함수를 통해서 분석 중간중간의 결과물을 Callback을 통해 전달합니다.
3. Parser가 사용자의 데이터를 분석하면서 가장 먼저 onMessageBegin 콜백 함수가 실행됩니다.
4. 해당 콜백 함수를 전달받은 ServerConnectionImpl은 ActiveRequest 인스턴스를 생성합니다.
codec_impl.cc
Status ServerConnectionImpl::onMessageBeginBase() {
if (!resetStreamCalled()) {
ASSERT(active_request_ == nullptr);
active_request_ = std::make_unique<ActiveRequest>(*this, std::move(bytes_meter_before_stream_));
...(중략)...
}
return okStatus();
}
5. ActiveRequest를 만든 이후에 ActiveStream을 새로 생성하기 위해 HttpConnectionManager에게 생성을 요청합니다.
codec_impl.cc
Status ServerConnectionImpl::onMessageBeginBase() {
if (!resetStreamCalled()) {
...(중략)...
active_request_->request_decoder_ = &callbacks_.newStream(active_request_->response_encoder_);
...(중략)...
}
return okStatus();
}
6. HttpConnectionManager에서는 새로운 ActiveStream을 생성합니다. 이때 HttpConnectionManager가 보유한 filter_factories 정보 또한 같이 참조하여 ActiveStream 인스턴스 내에 filter_manager_를 구성합니다.
conn_manager_impl.cc
RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encoder,
bool is_internally_created) {
TRACE_EVENT("core", "ConnectionManagerImpl::newStream");
if (connection_idle_timer_) {
connection_idle_timer_->disableTimer();
}
ENVOY_CONN_LOG(debug, "new stream", read_callbacks_->connection());
Buffer::BufferMemoryAccountSharedPtr downstream_stream_account =
response_encoder.getStream().account();
if (downstream_stream_account == nullptr) {
// Create account, wiring the stream to use it for tracking bytes.
// If tracking is disabled, the wiring becomes a NOP.
auto& buffer_factory = read_callbacks_->connection().dispatcher().getWatermarkFactory();
downstream_stream_account = buffer_factory.createAccount(response_encoder.getStream());
response_encoder.getStream().setAccount(downstream_stream_account);
}
ActiveStreamPtr new_stream(new ActiveStream(*this, response_encoder.getStream().bufferLimit(),
std::move(downstream_stream_account)));
accumulated_requests_++;
if (config_.maxRequestsPerConnection() > 0 &&
accumulated_requests_ >= config_.maxRequestsPerConnection()) {
if (codec_->protocol() < Protocol::Http2) {
new_stream->state_.saw_connection_close_ = true;
// Prevent erroneous debug log of closing due to incoming connection close header.
drain_state_ = DrainState::Closing;
} else if (drain_state_ == DrainState::NotDraining) {
startDrainSequence();
}
ENVOY_CONN_LOG(debug, "max requests per connection reached", read_callbacks_->connection());
stats_.named_.downstream_cx_max_requests_reached_.inc();
}
...(중략)...
return **streams_.begin();
}
이때 먼저, config에 지정된 max_requests_per_connection을 넘겼는지 확인합니다. 해당 작업은 하나의 Connection 내부에서 여러 Stream이 생성될 때 이를 제한하기 위한 용도로 사용됩니다.
7. Stream 생성이 완료되면, HttpConnectionManager 내부에 존재하는 streams_ List에 추가하고 이를 반환합니다.
또한 이 과정에서 현재까지 분석된 Header의 길이가 max_headers_kb에 지정된 값(기본 60)을 넘는지를 검증하는 작업 또한 진행합니다.
11. Url 분석 작업이 완료된 이후에는 본격적으로 입력 값 Header를 분석하여 채우기 시작합니다. 이때 Header의 명을 Parsing 했을 경우에는 onHeaderField 콜백을 호출하고, Header의 값을 분석했을 때는 onHeaderValue 콜백을 호출합니다.
12. Header 명과 값을 모두 추출한 경우에는 해당 값을 headers_or_trailers_ 라는 Map에 입력합니다. 해당 자료구조는 Header의 필드명을 Key로 값을 Value로 하는 Map 구조로써 향후 Http 정보를 요청할 때 활용됩니다.
codec_impl.cc
Status ConnectionImpl::completeCurrentHeader() {
ASSERT(dispatching_);
ENVOY_CONN_LOG(trace, "completed header: key={} value={}", connection_,
current_header_field_.getStringView(), current_header_value_.getStringView());
auto& headers_or_trailers = headersOrTrailers();
// Account for ":" and "\r\n" bytes between the header key value pair.
getBytesMeter().addHeaderBytesReceived(CRLF_SIZE + 1);
// TODO(10646): Switch to use HeaderUtility::checkHeaderNameForUnderscores().
RETURN_IF_ERROR(checkHeaderNameForUnderscores());
if (!current_header_field_.empty()) {
// Strip trailing whitespace of the current header value if any. Leading whitespace was trimmed
// in ConnectionImpl::onHeaderValue. http_parser does not strip leading or trailing whitespace
// as the spec requires: https://tools.ietf.org/html/rfc7230#section-3.2.4
current_header_value_.rtrim();
// If there is a stateful formatter installed, remember the original header key before
// converting to lower case.
auto formatter = headers_or_trailers.formatter();
if (formatter.has_value()) {
formatter->processKey(current_header_field_.getStringView());
}
current_header_field_.inlineTransform([](char c) { return absl::ascii_tolower(c); });
headers_or_trailers.addViaMove(std::move(current_header_field_),
std::move(current_header_value_));
}
// Check if the number of headers exceeds the limit.
if (headers_or_trailers.size() > max_headers_count_) {
error_code_ = Http::Code::RequestHeaderFieldsTooLarge;
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().TooManyHeaders));
const absl::string_view header_type =
processing_trailers_ ? Http1HeaderTypes::get().Trailers : Http1HeaderTypes::get().Headers;
return codecProtocolError(
absl::StrCat("http/1.1 protocol error: ", header_type, " count exceeds limit"));
}
header_parsing_state_ = HeaderParsingState::Field;
ASSERT(current_header_field_.empty());
ASSERT(current_header_value_.empty());
return okStatus();
}
이번에는 Header 분석이 모두 완료되었을 때 흐름에 대해서 살펴보겠습니다. 해당 과정은 이전에 살펴봤던 Cluster Manager에서의 역할과 Http filter 중 하나인 Router Filter 등이 사용되므로 해당 과정을 이해하기 위해서는 이전 시리즈의 내용에 대한 개념 숙지가 반드시 필요합니다.
13. Parser로 부터 Header 분석이 완료되면, onHeaderComplete 콜백 함수를 호출합니다.
16. RequestMap에 기존 ActiveRequest에서 보관중이던 request_url을 추출하여 Upstream 대상 Url을 설정합니다. 이 과정에서 추가로 Method 타입 또한 지정합니다.
codec_impl.cc
StatusOr<CallbackResult> ConnectionImpl::onHeadersCompleteImpl() {
...(중략)...
auto statusor = onHeadersCompleteBase();
...(중략)...
}
codec_impl.cc
Envoy::StatusOr<CallbackResult> ServerConnectionImpl::onHeadersCompleteBase() {
// Handle the case where response happens prior to request complete. It's up to upper layer code
// to disconnect the connection but we shouldn't fire any more events since it doesn't make
// sense.
if (active_request_) {
auto& headers = absl::get<RequestHeaderMapPtr>(headers_or_trailers_);
...(중략)...
// Inform the response encoder about any HEAD method, so it can set content
// length and transfer encoding headers correctly.
const Http::HeaderValues& header_values = Http::Headers::get();
active_request_->response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
header_values.MethodValues.Head);
active_request_->response_encoder_.setIsResponseToConnectRequest(
parser_->methodName() == header_values.MethodValues.Connect);
RETURN_IF_ERROR(handlePath(*headers, parser_->methodName()));
ASSERT(active_request_->request_url_.empty());
headers->setMethod(parser_->methodName());
...(중략)...
}
return CallbackResult::Success;
}
codec_impl.cc
Status ServerConnectionImpl::handlePath(RequestHeaderMap& headers, absl::string_view method) {
const Http::HeaderValues& header_values = Http::Headers::get();
HeaderString path(header_values.Path);
bool is_connect = (method == header_values.MethodValues.Connect);
// The url is relative or a wildcard when the method is OPTIONS. Nothing to do here.
if (!is_connect && !active_request_->request_url_.getStringView().empty() &&
(active_request_->request_url_.getStringView()[0] == '/' ||
(method == header_values.MethodValues.Options &&
active_request_->request_url_.getStringView()[0] == '*'))) {
headers.addViaMove(std::move(path), std::move(active_request_->request_url_));
return okStatus();
}
// If absolute_urls and/or connect are not going be handled, copy the url and return.
// This forces the behavior to be backwards compatible with the old codec behavior.
// CONNECT "urls" are actually host:port so look like absolute URLs to the above checks.
// Absolute URLS in CONNECT requests will be rejected below by the URL class validation.
if (!codec_settings_.allow_absolute_url_ && !is_connect) {
headers.addViaMove(std::move(path), std::move(active_request_->request_url_));
return okStatus();
}
Utility::Url absolute_url;
if (!absolute_url.initialize(active_request_->request_url_.getStringView(), is_connect)) {
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().InvalidUrl));
return codecProtocolError("http/1.1 protocol error: invalid url in request line");
}
// RFC7230#5.7
// When a proxy receives a request with an absolute-form of
// request-target, the proxy MUST ignore the received Host header field
// (if any) and instead replace it with the host information of the
// request-target. A proxy that forwards such a request MUST generate a
// new Host field-value based on the received request-target rather than
// forward the received Host field-value.
headers.setHost(absolute_url.hostAndPort());
// Add the scheme and validate to ensure no https://
// requests are accepted over unencrypted connections by front-line Envoys.
if (!is_connect) {
headers.setScheme(absolute_url.scheme());
if (!HeaderUtility::schemeIsValid(absolute_url.scheme())) {
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().InvalidScheme));
return codecProtocolError("http/1.1 protocol error: invalid scheme");
}
if (codec_settings_.validate_scheme_ &&
absolute_url.scheme() == header_values.SchemeValues.Https && !connection().ssl()) {
error_code_ = Http::Code::Forbidden;
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().HttpsInPlaintext));
return codecProtocolError("http/1.1 protocol error: https in the clear");
}
}
if (!absolute_url.pathAndQueryParams().empty()) {
headers.setPath(absolute_url.pathAndQueryParams());
}
active_request_->request_url_.clear();
return okStatus();
}
17. ActiveRequest에 지정된 request_decoder_ 즉 ActiveStream에게 Header 처리를 위임합니다. 이 과정에서 RequestMap을 인자로 전달합니다.
codec_impl.cc
Envoy::StatusOr<CallbackResult> ServerConnectionImpl::onHeadersCompleteBase() {
...(중략)...
if (parser_->isChunked() ||
(parser_->contentLength().has_value() && parser_->contentLength().value() > 0) ||
handling_upgrade_) {
active_request_->request_decoder_->decodeHeaders(std::move(headers), false);
// If the connection has been closed (or is closing) after decoding headers, pause the parser
// so we return control to the caller.
if (connection_.state() != Network::Connection::State::Open) {
return parser_->pause();
}
} else {
deferred_end_stream_headers_ = true;
}
}
return CallbackResult::Success;
}
18. ActiveStream에서는 내부에 존재하는 filter_manager_ 에게 먼저 Downstream의 주소와 전달받은 RequestMap을 토대로 만든 header 정보를 매핑합니다. 매핑이 완료되면, 그 이후에는 filter_manager_가 보유한 filter chain을 생성합니다.
참고로 filter_manager_는 ActiveStream이 생성되는 당시에 HttpConnectionManager로 부터 filter_factories_ 의 정보를 전달받았으며, filter chain 생성을 요청받으면, filter_factories_가 보유한 factory 콜백 메소드를 실행시켜 filter를 내부에 생성하는 작업을 수행합니다.
filter 생성이 모두 완료되면, 해당 filter chain을 순회하면서 header 정보 처리를 위임합니다.
19. Http filter 종류 중 하나인 Router filter가 filter chain 가장 마지막에 호출됩니다.
20. Router filter에서는 Upstream 대상 Host 정보와 Connection Pool을 획득하기 위해 우선 ClusterEntry 정보를 ClusterManager에게 요청합니다.
21. Cluster Manager는 자신이 보유하고 있는 thread_local_clusters_ 에서 Router가 요청한 Cluster 정보를 찾습니다.
22. Cluster Manager로 부터 ThreadLocalCluster를 찾아서 Router로 반환합니다.
23. Router에서는 Cluster내 존재하는 host를 통해 Connection 연결을 수행해야합니다. 따라서 전달받은 ThreadLocalCluster를 통해서 Connection Pool 할당을 요청합니다.
24. 이를 전달받은 ThreadLocalCluster(Cluster Entry)는 Cluster Manager에게 요청하여 host_http_conn_pool_map에 할당 받은 Connection Pool이 존재하는지를 확인합니다. 만약 존재한다면, 해당 Connection Pool Map에 있는 Container에 접근합니다. 반면, 존재하지 않는다면 새로운 Container를 생성하여 해당 Map에 추가합니다.
25. Container 내부에는 Cluster 별로 Connection을 관리하는 Pool이 존재합니다. 여기에서 Pool 존재여부를 최종적으로 확인합니다.
26. 최초 접근시에는 Pool이 생성되지 않았을 것이기 때문에 먼저 Cluster가 Connection Pool을 생성이 가능한지 설정된 Resource Limit 설정 값을 살펴봅니다.
27. Resource Limit 설정 값에 이상이 없어 신규 Pool 생성이 가능하면, ClusterManager에게 Pool 할당을 요청합니다. 이때 ClusterManager는 Client가 요구하는 프로토콜이 무엇인지 확인한 다음 해당 요청을 처리할 수 있는 Connection을 생성하여 반환합니다. 그리고 생성된 Pool을 Container가 보유하고 있는 active_pools_에 삽입합니다.
28. 생성된 Pool을 ThreadLocalCluster(Cluster Entry)에 반환합니다.
29. Pool을 Router에게 반환합니다. Router는 전달받은 Pool 정보를 기반으로 Upstream Connection을 생성하기 위해 UpstreamRequest를 생성합니다. 그리고 생성된 UpstreamRequest를 자신이 보유하고 있는 upstream_requests_ 리스트에 추가합니다.
여기서 UpstreamRequest는 전달받은 Connection Pool을 기반으로 Upstream에 연결을 요청하기 위해 설정된 자료구조로써 해당 자료구조를 통해 상위 Host와 통신을 수행할 수 있습니다. 해당 자료구조에 대해서 조금 더 살펴보면 다음과 같습니다.
UpstreamRequest 구조에서 핵심 속성은 위와 같습니다. 위 속성에 대해서 하나씩 살펴보겠습니다.
먼저 conn_pool_은 UpstreamRequest를 관리하는 Connection Pool을 가르키는 포인터입니다. 해당 속성은 이전 ThreadLocalCluster를 통해서 전달받았던 Connection Pool을 가르키고 있습니다. 따라서 해당 Pool을 통해 사용자 요청 Stream을 매핑시킴으로써 데이터 송/수신 역할 처리를 담당할 수 있도록 가교 역할을 수행합니다.
conn_pool_ 내부를 조금 더 살펴보겠습니다. Connection Pool 내부에는 Client를 관리하는 list가 존재합니다. 다만 Connection Pool 내부에 존재하는 Client가 상태가 모두 다를 수 있기 때문에 이를 관리하기 위한 여러 자료구조가 존재합니다.
가령 Connection Pool에서 생성된 Client는 위 그림과 같이 상태를 내부 속성으로 지니고 있는데, 상태가 Ready, Busy, Draining 등에 따라 재사용 가능 여부가 결정됩니다. 그 이유는 현재 Client 사용중인데, 해당 Client를 재사용하게된다면 이는 올바른 서비스를 제공할 수 없기 때문입니다.
따라서 현재 재사용이 가능한 Client 목록을 지니고 있는 ready_clients_, 현재 Stream 처리가 진행중이거나 Drain을 수행중인 Client 목록을 보유하는 busy_clients 그리고 연결을 시도중인 Client 목록을 보유하고 있는 connecting_clients_ 등 여러 자료구조가 존재합니다.
해당 자료구조는 Client의 상태 변이에 따라 저장되는 리스트가 다르며, 만약 재사용이 가능한 ready_clients_ 에서 Client를 사용하여 해당 Client가 Connecting을 수행한다면 connectiong_clients_로 Client 목록이 이동하게 됩니다.
이번에는 Upstream의 filter_manager에 대해서 살펴보겠습니다.
upstream_request.cc
// Set up the upstream filter manager.
filter_manager_callbacks_ = std::make_unique<UpstreamRequestFilterManagerCallbacks>(*this);
filter_manager_ = std::make_unique<UpstreamFilterManager>(
*filter_manager_callbacks_, parent_.callbacks()->dispatcher(),
parent_.callbacks()->connection(), parent_.callbacks()->streamId(),
parent_.callbacks()->account(), true, parent_.callbacks()->decoderBufferLimit(),
*parent_.cluster(), *this);
parent_.cluster()->createFilterChain(*filter_manager_);
// The cluster will always create a codec filter, which sets the upstream
UpstreamRequest 내부 filter_manager는 Upstream 요청을 처리하기 위한 filter를 생성하고 관리하는 역할을 수행합니다.
이때 내부적으로는 위 코드와 같이 UpstreamCodecFilterFactory에 의해서 UpstreamCodecFilter가 UpstreamRequest의 filter chain으로 등록되며, 향후 Upstream 요청을처리하는 역할을 수행합니다.
마지막으로 살펴볼 것은 stream_info_와 upstream_ 속성입니다. stream_info의 경우 현재 Upstream의 메타정보를 관리하는 속성이며, upstream_의 경우 이 다음에 바로 설명하겠지만, UpstreamRequest가 생성된 이후 사용자 요청을 처리하기 위한 헤더 정보 매핑, 해당 요청을 처리해야할 encoder 등의 속성을 지니고 있는 자료구조로써 Upstream 처리를 수행합니다.
지금까지 UpstreamRequest 내부에 포함된 주요 속성에 대해서 살펴봤는데요. 이번에는 UpstreamRequest를 생성한 다음 후속 과정에 대해서 살펴보겠습니다.
30. Router 에서는 upstream_requests_ 에 추가된 UpstreamRequest에게 Header 정보를 전달합니다.
31. UpstreamRequest 내부에서는 내부에 매핑된 pool을 통해 Client를 해당 Pool에 매핑하도록 요청합니다.
32. Pool 내부에는 ready_clients_ 를 먼저 살펴보고 연결 가능한 목록이 존재하면 해당 목록에 존재하는 Client 목록을 얻어와서 현재 Stream을 연결하도록 합니다.
conn_pool_base.cc
if (!ready_clients_.empty()) {
ActiveClient& client = *ready_clients_.front();
ENVOY_CONN_LOG(debug, "using existing fully connected connection", client);
attachStreamToClient(client, context);
// Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
tryCreateNewConnections();
return nullptr;
}
이때 위 코드와 같이 먼저 ready_clients_ 로부터 재사용이 가능한 Client 목록이 존재하는지 살펴보고 만약 존재한다면 해당 Client를 재사용하여 사용자의 Context를 해당 Client에게 연결시킵니다.
하지만 ready_clients_ 목록에 연결 가능한 Client가 존재하지 않는다면, 해당 Host에 연결된 설정을 토대로 pending request 생성 여부를 판별한 다음 해당 설정을 넘어선 Stream 요청이 들어왔을 경우는 연결을 해제하도록 합니다.
반면 Stream 생성이 가능하다면, 해당 요청은 HttpPendingStream으로 생성하여 Pool에 보유한 pending_streams_ 리스트에 이를 추가합니다.
그리고 이렇게 등록된 pending_streams_는 주기적으로 Dispatcher에 존재하는 baseScheduler에 의해서 관리됩니다.
conn_pool_base.cc
void ConnPoolImplBase::onUpstreamReady() {
while (!pending_streams_.empty() && !ready_clients_.empty()) {
ActiveClientPtr& client = ready_clients_.front();
ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
// Pending streams are pushed onto the front, so pull from the back.
attachStreamToClient(*client, pending_streams_.back()->context());
state_.decrPendingStreams(1);
pending_streams_.pop_back();
}
if (!pending_streams_.empty()) {
tryCreateNewConnections();
}
}
이때 주기적으로 호출되는 메소드는 onUpstreamReady이며, 해당 메소드의 역할을 살펴보면, pending_streams에 존재하는 stream을 살펴보고 호출 당시 ready_clients_에 재사용 가능한 client가 존재한다면, 해당 client에 pending_streams에 존재하는 stream을 연결시켜주는 역할을 수행하는 것을 볼 수 있습니다.
33. HttpUpstream이 생성되면 UpstreamRequest에 할당하여 이후 해당 Stream을 통해 Upstream과 연결을 수행할 수 있습니다.
지금까지 살펴볼 것은 Header가 분석이 완료된 이후 후속 작업에 대해서 살펴봤습니다. 해당 과정을 요약하자면, Header 정보를 파싱하여 이를 별도 Map에 저장하는 것은 물론 Cluster Manager와 협업을 통해서 Connection Pool을 할당받고 해당 Pool에 존재하는 Client에 연결을 수행하여 Upstream을 만드는 작업을 진행합니다. 이번에는 Http Request의 본문을 파싱하고 이를 해석하는 과정에 대해서 살펴보겠습니다.
34. Parser에서 본문을 분석한 다음 bufferBody callback 함수를 호출합니다.
35. bufferBody를 호출받으면 이를 다시 bufferBody 함수를 호추하여 후속 작업을 이어갑니다.
StatusOr<CallbackResult> ConnectionImpl::onHeadersCompleteImpl() {
ASSERT(!processing_trailers_);
ASSERT(dispatching_);
ENVOY_CONN_LOG(trace, "onHeadersCompleteBase", connection_);
RETURN_IF_ERROR(completeCurrentHeader());
if (!parser_->isHttp11()) {
// This is not necessarily true, but it's good enough since higher layers only care if this is
// HTTP/1.1 or not.
protocol_ = Protocol::Http10;
}
RequestOrResponseHeaderMap& request_or_response_headers = requestOrResponseHeaders();
const Http::HeaderValues& header_values = Http::Headers::get();
if (Utility::isUpgrade(request_or_response_headers) && upgradeAllowed()) {
// Ignore h2c upgrade requests until we support them.
// See https://github.com/envoyproxy/envoy/issues/7161 for details.
if (absl::EqualsIgnoreCase(request_or_response_headers.getUpgradeValue(),
header_values.UpgradeValues.H2c)) {
ENVOY_CONN_LOG(trace, "removing unsupported h2c upgrade headers.", connection_);
request_or_response_headers.removeUpgrade();
if (request_or_response_headers.Connection()) {
const auto& tokens_to_remove = caseUnorderdSetContainingUpgradeAndHttp2Settings();
std::string new_value = StringUtil::removeTokens(
request_or_response_headers.getConnectionValue(), ",", tokens_to_remove, ",");
if (new_value.empty()) {
request_or_response_headers.removeConnection();
} else {
request_or_response_headers.setConnection(new_value);
}
}
request_or_response_headers.remove(header_values.Http2Settings);
} else {
ENVOY_CONN_LOG(trace, "codec entering upgrade mode.", connection_);
handling_upgrade_ = true;
}
}
if (parser_->methodName() == header_values.MethodValues.Connect) {
if (request_or_response_headers.ContentLength()) {
if (request_or_response_headers.getContentLengthValue() == "0") {
request_or_response_headers.removeContentLength();
} else {
// Per https://tools.ietf.org/html/rfc7231#section-4.3.6 a payload with a
// CONNECT request has no defined semantics, and may be rejected.
error_code_ = Http::Code::BadRequest;
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().BodyDisallowed));
return codecProtocolError("http/1.1 protocol error: unsupported content length");
}
}
ENVOY_CONN_LOG(trace, "codec entering upgrade mode for CONNECT request.", connection_);
handling_upgrade_ = true;
}
// https://tools.ietf.org/html/rfc7230#section-3.3.3
// If a message is received with both a Transfer-Encoding and a
// Content-Length header field, the Transfer-Encoding overrides the
// Content-Length. Such a message might indicate an attempt to
// perform request smuggling (Section 9.5) or response splitting
// (Section 9.4) and ought to be handled as an error. A sender MUST
// remove the received Content-Length field prior to forwarding such
// a message.
#ifndef ENVOY_ENABLE_UHV
// This check is moved into default header validator.
// TODO(yanavlasov): use runtime override here when UHV is moved into the main build
// Reject message with Http::Code::BadRequest if both Transfer-Encoding and Content-Length
// headers are present or if allowed by http1 codec settings and 'Transfer-Encoding'
// is chunked - remove Content-Length and serve request.
if (parser_->hasTransferEncoding() != 0 && request_or_response_headers.ContentLength()) {
if (parser_->isChunked() && codec_settings_.allow_chunked_length_) {
request_or_response_headers.removeContentLength();
} else {
error_code_ = Http::Code::BadRequest;
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().ChunkedContentLength));
return codecProtocolError(
"http/1.1 protocol error: both 'Content-Length' and 'Transfer-Encoding' are set.");
}
}
#endif
// Per https://tools.ietf.org/html/rfc7230#section-3.3.1 Envoy should reject
// transfer-codings it does not understand.
// Per https://tools.ietf.org/html/rfc7231#section-4.3.6 a payload with a
// CONNECT request has no defined semantics, and may be rejected.
if (request_or_response_headers.TransferEncoding()) {
const absl::string_view encoding = request_or_response_headers.getTransferEncodingValue();
if (!absl::EqualsIgnoreCase(encoding, header_values.TransferEncodingValues.Chunked) ||
parser_->methodName() == header_values.MethodValues.Connect) {
error_code_ = Http::Code::NotImplemented;
RETURN_IF_ERROR(sendProtocolError(Http1ResponseCodeDetails::get().InvalidTransferEncoding));
return codecProtocolError("http/1.1 protocol error: unsupported transfer encoding");
}
}
auto statusor = onHeadersCompleteBase();
if (!statusor.ok()) {
RETURN_IF_ERROR(statusor.status());
}
header_parsing_state_ = HeaderParsingState::Done;
// Returning CallbackResult::NoBodyData informs http_parser to not expect a body or further data
// on this connection.
return handling_upgrade_ ? CallbackResult::NoBodyData : statusor.value();
}
codec_impl.cc
Envoy::StatusOr<CallbackResult> ServerConnectionImpl::onHeadersCompleteBase() {
// Handle the case where response happens prior to request complete. It's up to upper layer code
// to disconnect the connection but we shouldn't fire any more events since it doesn't make
// sense.
if (active_request_) {
auto& headers = absl::get<RequestHeaderMapPtr>(headers_or_trailers_);
ENVOY_CONN_LOG(trace, "Server: onHeadersComplete size={}", connection_, headers->size());
if (!handling_upgrade_ && headers->Connection()) {
// If we fail to sanitize the request, return a 400 to the client
if (!Utility::sanitizeConnectionHeader(*headers)) {
absl::string_view header_value = headers->getConnectionValue();
ENVOY_CONN_LOG(debug, "Invalid nominated headers in Connection: {}", connection_,
header_value);
error_code_ = Http::Code::BadRequest;
RETURN_IF_ERROR(
sendProtocolError(Http1ResponseCodeDetails::get().ConnectionHeaderSanitization));
return codecProtocolError("Invalid nominated headers in Connection.");
}
}
// Inform the response encoder about any HEAD method, so it can set content
// length and transfer encoding headers correctly.
const Http::HeaderValues& header_values = Http::Headers::get();
active_request_->response_encoder_.setIsResponseToHeadRequest(parser_->methodName() ==
header_values.MethodValues.Head);
active_request_->response_encoder_.setIsResponseToConnectRequest(
parser_->methodName() == header_values.MethodValues.Connect);
RETURN_IF_ERROR(handlePath(*headers, parser_->methodName()));
ASSERT(active_request_->request_url_.empty());
headers->setMethod(parser_->methodName());
// Make sure the host is valid.
auto details = HeaderUtility::requestHeadersValid(*headers);
if (details.has_value()) {
RETURN_IF_ERROR(sendProtocolError(details.value().get()));
return codecProtocolError(
"http/1.1 protocol error: request headers failed spec compliance checks");
}
// Determine here whether we have a body or not. This uses the new RFC semantics where the
// presence of content-length or chunked transfer-encoding indicates a body vs. a particular
// method. If there is no body, we defer raising decodeHeaders() until the parser is flushed
// with message complete. This allows upper layers to behave like HTTP/2 and prevents a proxy
// scenario where the higher layers stream through and implicitly switch to chunked transfer
// encoding because end stream with zero body length has not yet been indicated.
if (parser_->isChunked() ||
(parser_->contentLength().has_value() && parser_->contentLength().value() > 0) ||
handling_upgrade_) {
active_request_->request_decoder_->decodeHeaders(std::move(headers), false);
// If the connection has been closed (or is closing) after decoding headers, pause the parser
// so we return control to the caller.
if (connection_.state() != Network::Connection::State::Open) {
return parser_->pause();
}
} else {
deferred_end_stream_headers_ = true;
}
}
return CallbackResult::Success;
}
StatusOr<CallbackResult> ConnectionImpl::onMessageCompleteImpl() {
ENVOY_CONN_LOG(trace, "message complete", connection_);
dispatchBufferedBody();
if (handling_upgrade_) {
// If this is an upgrade request, swallow the onMessageComplete. The
// upgrade payload will be treated as stream body.
ASSERT(!deferred_end_stream_headers_);
ENVOY_CONN_LOG(trace, "Pausing parser due to upgrade.", connection_);
return parser_->pause();
}
// If true, this indicates we were processing trailers and must
// move the last header into current_header_map_
if (header_parsing_state_ == HeaderParsingState::Value) {
RETURN_IF_ERROR(completeCurrentHeader());
}
return onMessageCompleteBase();
}
CallbackResult ServerConnectionImpl::onMessageCompleteBase() {
ASSERT(!handling_upgrade_);
if (active_request_) {
// The request_decoder should be non-null after we've called the newStream on callbacks.
ASSERT(active_request_->request_decoder_);
active_request_->remote_complete_ = true;
if (deferred_end_stream_headers_) {
active_request_->request_decoder_->decodeHeaders(
std::move(absl::get<RequestHeaderMapPtr>(headers_or_trailers_)), true);
deferred_end_stream_headers_ = false;
} else if (processing_trailers_) {
active_request_->request_decoder_->decodeTrailers(
std::move(absl::get<RequestTrailerMapPtr>(headers_or_trailers_)));
} else {
Buffer::OwnedImpl buffer;
active_request_->request_decoder_->decodeData(buffer, true);
}
// Reset to ensure no information from one requests persists to the next.
headers_or_trailers_.emplace<RequestHeaderMapPtr>(nullptr);
}
// Always pause the parser so that the calling code can process 1 request at a time and apply
// back pressure. However this means that the calling code needs to detect if there is more data
// in the buffer and dispatch it again.
return parser_->pause();
}
conn_manager_impl.cc
// Ordering in this function is complicated, but important.
//
// We want to do minimal work before selecting route and creating a filter
// chain to maximize the number of requests which get custom filter behavior,
// e.g. registering access logging.
//
// This must be balanced by doing sanity checking for invalid requests (one
// can't route select properly without full headers), checking state required to
// serve error responses (connection close, head requests, etc), and
// modifications which may themselves affect route selection.
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
bool end_stream) {
ENVOY_STREAM_LOG(debug, "request headers complete (end_stream={}):\n{}", *this, end_stream,
*headers);
ScopeTrackerScopeState scope(this,
connection_manager_.read_callbacks_->connection().dispatcher());
request_headers_ = std::move(headers);
filter_manager_.requestHeadersInitialized();
if (request_header_timer_ != nullptr) {
request_header_timer_->disableTimer();
request_header_timer_.reset();
}
// Both saw_connection_close_ and is_head_request_ affect local replies: set
// them as early as possible.
const Protocol protocol = connection_manager_.codec_->protocol();
state_.saw_connection_close_ = HeaderUtility::shouldCloseConnection(protocol, *request_headers_);
// We end the decode here to mark that the downstream stream is complete.
maybeEndDecode(end_stream);
if (!validateHeaders()) {
ENVOY_STREAM_LOG(debug, "request headers validation failed\n{}", *this, *request_headers_);
return;
}
// We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
if (connection_manager_.config_.isRoutable()) {
if (connection_manager_.config_.routeConfigProvider() != nullptr) {
snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
} else if (connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
snapped_scoped_routes_config_ =
connection_manager_.config_.scopedRouteConfigProvider()->config<Router::ScopedConfig>();
snapScopedRouteConfig();
}
} else {
snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
}
// Drop new requests when overloaded as soon as we have decoded the headers.
if (connection_manager_.random_generator_.bernoulli(
connection_manager_.overload_stop_accepting_requests_ref_.value())) {
// In this one special case, do not create the filter chain. If there is a risk of memory
// overload it is more important to avoid unnecessary allocation than to create the filters.
filter_manager_.skipFilterChainCreation();
connection_manager_.stats_.named_.downstream_rq_overload_close_.inc();
sendLocalReply(Http::Code::ServiceUnavailable, "envoy overloaded", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().Overload);
return;
}
if (!connection_manager_.config_.proxy100Continue() && request_headers_->Expect() &&
// The Expect field-value is case-insensitive.
// https://tools.ietf.org/html/rfc7231#section-5.1.1
absl::EqualsIgnoreCase((request_headers_->Expect()->value().getStringView()),
Headers::get().ExpectValues._100Continue)) {
// Note in the case Envoy is handling 100-Continue complexity, it skips the filter chain
// and sends the 100-Continue directly to the encoder.
chargeStats(continueHeader());
response_encoder_->encode1xxHeaders(continueHeader());
// Remove the Expect header so it won't be handled again upstream.
request_headers_->removeExpect();
}
connection_manager_.user_agent_.initializeFromHeaders(*request_headers_,
connection_manager_.stats_.prefixStatName(),
connection_manager_.stats_.scope_);
// Make sure we are getting a codec version we support.
if (protocol == Protocol::Http10) {
// Assume this is HTTP/1.0. This is fine for HTTP/0.9 but this code will also affect any
// requests with non-standard version numbers (0.9, 1.3), basically anything which is not
// HTTP/1.1.
//
// The protocol may have shifted in the HTTP/1.0 case so reset it.
filter_manager_.streamInfo().protocol(protocol);
if (!connection_manager_.config_.http1Settings().accept_http_10_) {
// Send "Upgrade Required" if HTTP/1.0 support is not explicitly configured on.
sendLocalReply(Code::UpgradeRequired, "", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().LowVersion);
return;
}
if (!request_headers_->Host() &&
!connection_manager_.config_.http1Settings().default_host_for_http_10_.empty()) {
// Add a default host if configured to do so.
request_headers_->setHost(
connection_manager_.config_.http1Settings().default_host_for_http_10_);
}
}
if (!request_headers_->Host()) {
// Require host header. For HTTP/1.1 Host has already been translated to :authority.
sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().MissingHost);
return;
}
// Verify header sanity checks which should have been performed by the codec.
ASSERT(HeaderUtility::requestHeadersValid(*request_headers_).has_value() == false);
// Check for the existence of the :path header for non-CONNECT requests, or present-but-empty
// :path header for CONNECT requests. We expect the codec to have broken the path into pieces if
// applicable. NOTE: Currently the HTTP/1.1 codec only does this when the allow_absolute_url flag
// is enabled on the HCM.
if ((!HeaderUtility::isConnect(*request_headers_) || request_headers_->Path()) &&
request_headers_->getPathValue().empty()) {
sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().MissingPath);
return;
}
// Currently we only support relative paths at the application layer.
if (!request_headers_->getPathValue().empty() && request_headers_->getPathValue()[0] != '/') {
connection_manager_.stats_.named_.downstream_rq_non_relative_path_.inc();
sendLocalReply(Code::NotFound, "", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().AbsolutePath);
return;
}
// Path sanitization should happen before any path access other than the above sanity check.
const auto action =
ConnectionManagerUtility::maybeNormalizePath(*request_headers_, connection_manager_.config_);
// gRPC requests are rejected if Envoy is configured to redirect post-normalization. This is
// because gRPC clients do not support redirect.
if (action == ConnectionManagerUtility::NormalizePathAction::Reject ||
(action == ConnectionManagerUtility::NormalizePathAction::Redirect &&
Grpc::Common::hasGrpcContentType(*request_headers_))) {
connection_manager_.stats_.named_.downstream_rq_failed_path_normalization_.inc();
sendLocalReply(Code::BadRequest, "", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
return;
} else if (action == ConnectionManagerUtility::NormalizePathAction::Redirect) {
connection_manager_.stats_.named_.downstream_rq_redirected_with_normalized_path_.inc();
sendLocalReply(
Code::TemporaryRedirect, "",
[new_path = request_headers_->Path()->value().getStringView()](
Http::ResponseHeaderMap& response_headers) -> void {
response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
},
absl::nullopt, StreamInfo::ResponseCodeDetails::get().PathNormalizationFailed);
return;
}
ASSERT(action == ConnectionManagerUtility::NormalizePathAction::Continue);
auto optional_port = ConnectionManagerUtility::maybeNormalizeHost(
*request_headers_, connection_manager_.config_, localPort());
if (optional_port.has_value() &&
requestWasConnect(request_headers_, connection_manager_.codec_->protocol())) {
filter_manager_.streamInfo().filterState()->setData(
Router::OriginalConnectPort::key(),
std::make_unique<Router::OriginalConnectPort>(optional_port.value()),
StreamInfo::FilterState::StateType::ReadOnly, StreamInfo::FilterState::LifeSpan::Request);
}
if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
// Modify the downstream remote address depending on configuration and headers.
const auto mutate_result = ConnectionManagerUtility::mutateRequestHeaders(
*request_headers_, connection_manager_.read_callbacks_->connection(),
connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_);
// IP detection failed, reject the request.
if (mutate_result.reject_request.has_value()) {
const auto& reject_request_params = mutate_result.reject_request.value();
connection_manager_.stats_.named_.downstream_rq_rejected_via_ip_detection_.inc();
sendLocalReply(reject_request_params.response_code, reject_request_params.body, nullptr,
absl::nullopt,
StreamInfo::ResponseCodeDetails::get().OriginalIPDetectionFailed);
return;
}
filter_manager_.setDownstreamRemoteAddress(mutate_result.final_remote_address);
}
ASSERT(filter_manager_.streamInfo().downstreamAddressProvider().remoteAddress() != nullptr);
ASSERT(!cached_route_);
refreshCachedRoute();
if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
filter_manager_.streamInfo().setTraceReason(
ConnectionManagerUtility::mutateTracingRequestHeader(
*request_headers_, connection_manager_.runtime_, connection_manager_.config_,
cached_route_.value().get()));
}
filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
// TODO if there are no filters when starting a filter iteration, the connection manager
// should return 404. The current returns no response if there is no router filter.
if (hasCachedRoute()) {
// Do not allow upgrades if the route does not support it.
if (upgrade_rejected) {
// While downstream servers should not send upgrade payload without the upgrade being
// accepted, err on the side of caution and refuse to process any further requests on this
// connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
// contains a smuggled HTTP request.
state_.saw_connection_close_ = true;
connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
sendLocalReply(Code::Forbidden, "", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
return;
}
// Allow non websocket requests to go through websocket enabled routes.
}
// Check if tracing is enabled.
if (connection_manager_tracing_config_.has_value()) {
traceRequest();
}
filter_manager_.decodeHeaders(*request_headers_, end_stream);
// Reset it here for both global and overridden cases.
resetIdleTimer();
}
위 과정에서 이제 connectionManager와 결합하여 Connection을 만든다. 과정을 조금 자세히 살펴보면 다음과 같다.
header_utility.cc
bool HeaderUtility::shouldCloseConnection(Http::Protocol protocol,
const RequestOrResponseHeaderMap& headers) {
// HTTP/1.0 defaults to single-use connections. Make sure the connection will be closed unless
// Keep-Alive is present.
if (protocol == Protocol::Http10 &&
(!headers.Connection() ||
!Envoy::StringUtil::caseFindToken(headers.Connection()->value().getStringView(), ",",
Http::Headers::get().ConnectionValues.KeepAlive))) {
return true;
}
if (protocol == Protocol::Http11 && headers.Connection() &&
Envoy::StringUtil::caseFindToken(headers.Connection()->value().getStringView(), ",",
Http::Headers::get().ConnectionValues.Close)) {
return true;
}
// Note: Proxy-Connection is not a standard header, but is supported here
// since it is supported by http-parser the underlying parser for http
// requests.
if (protocol < Protocol::Http2 && headers.ProxyConnection() &&
Envoy::StringUtil::caseFindToken(headers.ProxyConnection()->value().getStringView(), ",",
Http::Headers::get().ConnectionValues.Close)) {
return true;
}
return false;
}
먼저 shouldCloseCOnnection을 통해서 해당 프로토콜이 http2이하인지를 살펴보고 종료 여부를 결정한다.
conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::maybeEndDecode(bool end_stream) {
// If recreateStream is called, the HCM rewinds state and may send more encodeData calls.
if (end_stream && !filter_manager_.remoteDecodeComplete()) {
filter_manager_.streamInfo().downstreamTiming().onLastDownstreamRxByteReceived(
connection_manager_.read_callbacks_->connection().dispatcher().timeSource());
ENVOY_STREAM_LOG(debug, "request end stream", *this);
}
}
그 다음에 end_stream이면서 decode 완료 여부를 확인한다.
conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
bool end_stream) {
....(중략)...
// We need to snap snapped_route_config_ here as it's used in mutateRequestHeaders later.
if (connection_manager_.config_.isRoutable()) {
if (connection_manager_.config_.routeConfigProvider() != nullptr) {
snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
} else if (connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
snapped_scoped_routes_config_ =
connection_manager_.config_.scopedRouteConfigProvider()->config<Router::ScopedConfig>();
snapScopedRouteConfig();
}
} else {
snapped_route_config_ = connection_manager_.config_.routeConfigProvider()->configCast();
}
...(중략)...
}
Connection Manager로부터 Route Config가 존재하는지를 확인하고 존재하면 해당 Config 정보를 가져온다. 만약 ScopedRouteConfig가 설정되어있으면, 해당 정보를 가져오고 그렇지 않을 경우에는 Route 정보를 가져올 것이다.
conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::snapScopedRouteConfig() {
// NOTE: if a RDS subscription hasn't got a RouteConfiguration back, a Router::NullConfigImpl is
// returned, in that case we let it pass.
snapped_route_config_ = snapped_scoped_routes_config_->getRouteConfig(*request_headers_);
if (snapped_route_config_ == nullptr) {
ENVOY_STREAM_LOG(trace, "can't find SRDS scope.", *this);
// TODO(stevenzzzz): Consider to pass an error message to router filter, so that it can
// send back 404 with some more details.
snapped_route_config_ = std::make_shared<Router::NullConfigImpl>();
}
}
scoped_config_impl.cc
Router::ConfigConstSharedPtr
ScopedConfigImpl::getRouteConfig(const Http::HeaderMap& headers) const {
ScopeKeyPtr scope_key = scope_key_builder_.computeScopeKey(headers);
if (scope_key == nullptr) {
return nullptr;
}
auto iter = scoped_route_info_by_key_.find(scope_key->hash());
if (iter != scoped_route_info_by_key_.end()) {
return iter->second->routeConfig();
}
return nullptr;
}
그외 route를 위한 사전 작업을 수행한다.
conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
bool end_stream) {
...(중략)...
if (!state_.is_internally_created_) { // Only sanitize headers on first pass.
// Modify the downstream remote address depending on configuration and headers.
const auto mutate_result = ConnectionManagerUtility::mutateRequestHeaders(
*request_headers_, connection_manager_.read_callbacks_->connection(),
connection_manager_.config_, *snapped_route_config_, connection_manager_.local_info_);
// IP detection failed, reject the request.
if (mutate_result.reject_request.has_value()) {
const auto& reject_request_params = mutate_result.reject_request.value();
connection_manager_.stats_.named_.downstream_rq_rejected_via_ip_detection_.inc();
sendLocalReply(reject_request_params.response_code, reject_request_params.body, nullptr,
absl::nullopt,
StreamInfo::ResponseCodeDetails::get().OriginalIPDetectionFailed);
return;
}
filter_manager_.setDownstreamRemoteAddress(mutate_result.final_remote_address);
}
...(중략)...
}
그 다음에 하는 일은 DownStreamRemoteAddress를 확인해서 설정하는 작업을 수행한다.
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
bool end_stream) {
...(중략)...
refreshCachedRoute();
if (!state_.is_internally_created_) { // Only mutate tracing headers on first pass.
filter_manager_.streamInfo().setTraceReason(
ConnectionManagerUtility::mutateTracingRequestHeader(
*request_headers_, connection_manager_.runtime_, connection_manager_.config_,
cached_route_.value().get()));
}
filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
const bool upgrade_rejected = filter_manager_.createFilterChain() == false;
// TODO if there are no filters when starting a filter iteration, the connection manager
// should return 404. The current returns no response if there is no router filter.
if (hasCachedRoute()) {
// Do not allow upgrades if the route does not support it.
if (upgrade_rejected) {
// While downstream servers should not send upgrade payload without the upgrade being
// accepted, err on the side of caution and refuse to process any further requests on this
// connection, to avoid a class of HTTP/1.1 smuggling bugs where Upgrade or CONNECT payload
// contains a smuggled HTTP request.
state_.saw_connection_close_ = true;
connection_manager_.stats_.named_.downstream_rq_ws_on_non_ws_route_.inc();
sendLocalReply(Code::Forbidden, "", nullptr, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().UpgradeFailed);
return;
}
// Allow non websocket requests to go through websocket enabled routes.
}
// Check if tracing is enabled.
if (connection_manager_tracing_config_.has_value()) {
traceRequest();
}
filter_manager_.decodeHeaders(*request_headers_, end_stream);
// Reset it here for both global and overridden cases.
resetIdleTimer();
}
void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::RouteCallback& cb) {
Router::RouteConstSharedPtr route;
if (request_headers_ != nullptr) {
if (connection_manager_.config_.isRoutable() &&
connection_manager_.config_.scopedRouteConfigProvider() != nullptr) {
// NOTE: re-select scope as well in case the scope key header has been changed by a filter.
snapScopedRouteConfig();
}
if (snapped_route_config_ != nullptr) {
route = snapped_route_config_->route(cb, *request_headers_, filter_manager_.streamInfo(),
stream_id_);
}
}
setRoute(route);
}
conn_manager_impl.cc
void ConnectionManagerImpl::ActiveStream::snapScopedRouteConfig() {
// NOTE: if a RDS subscription hasn't got a RouteConfiguration back, a Router::NullConfigImpl is
// returned, in that case we let it pass.
snapped_route_config_ = snapped_scoped_routes_config_->getRouteConfig(*request_headers_);
if (snapped_route_config_ == nullptr) {
ENVOY_STREAM_LOG(trace, "can't find SRDS scope.", *this);
// TODO(stevenzzzz): Consider to pass an error message to router filter, so that it can
// send back 404 with some more details.
snapped_route_config_ = std::make_shared<Router::NullConfigImpl>();
}
}
void ConnectionManagerImpl::ActiveStream::decodeHeaders(RequestHeaderMapPtr&& headers,
bool end_stream) {
...(중략)...
filter_manager_.streamInfo().setRequestHeaders(*request_headers_);
...(중략)...
// Check if tracing is enabled.
if (connection_manager_tracing_config_.has_value()) {
traceRequest();
}
filter_manager_.decodeHeaders(*request_headers_, end_stream);
// Reset it here for both global and overridden cases.
resetIdleTimer();
}
그 다음에는 filter_manager에게 request header 정보를 저장하고 decodeheaders를 통해서 Route 과정을 수행할 수 있또록 진행한다.
void FilterManager::decodeHeaders(ActiveStreamDecoderFilter* filter, RequestHeaderMap& headers,
bool end_stream) {
// Headers filter iteration should always start with the next filter if available.
std::list<ActiveStreamDecoderFilterPtr>::iterator entry =
commonDecodePrefix(filter, FilterIterationStartState::AlwaysStartFromNext);
std::list<ActiveStreamDecoderFilterPtr>::iterator continue_data_entry = decoder_filters_.end();
for (; entry != decoder_filters_.end(); entry++) {
ASSERT(!(state_.filter_call_state_ & FilterCallState::DecodeHeaders));
state_.filter_call_state_ |= FilterCallState::DecodeHeaders;
(*entry)->end_stream_ = (end_stream && continue_data_entry == decoder_filters_.end());
FilterHeadersStatus status = (*entry)->decodeHeaders(headers, (*entry)->end_stream_);
if (state_.decoder_filter_chain_aborted_) {
ENVOY_STREAM_LOG(trace,
"decodeHeaders filter iteration aborted due to local reply: filter={}",
*this, (*entry)->filter_context_.config_name);
status = FilterHeadersStatus::StopIteration;
}
ASSERT(!(status == FilterHeadersStatus::ContinueAndDontEndStream && !(*entry)->end_stream_),
"Filters should not return FilterHeadersStatus::ContinueAndDontEndStream from "
"decodeHeaders when end_stream is already false");
state_.filter_call_state_ &= ~FilterCallState::DecodeHeaders;
ENVOY_STREAM_LOG(trace, "decode headers called: filter={} status={}", *this,
(*entry)->filter_context_.config_name, static_cast<uint64_t>(status));
(*entry)->decode_headers_called_ = true;
const auto continue_iteration = (*entry)->commonHandleAfterHeadersCallback(status, end_stream);
ENVOY_BUG(!continue_iteration || !state_.local_complete_,
"Filter did not return StopAll or StopIteration after sending a local reply.");
// If this filter ended the stream, decodeComplete() should be called for it.
if ((*entry)->end_stream_) {
(*entry)->handle_->decodeComplete();
}
// Skip processing metadata after sending local reply
if (state_.local_complete_ && std::next(entry) != decoder_filters_.end()) {
maybeContinueDecoding(continue_data_entry);
return;
}
const bool new_metadata_added = processNewlyAddedMetadata();
// If end_stream is set in headers, and a filter adds new metadata, we need to delay end_stream
// in headers by inserting an empty data frame with end_stream set. The empty data frame is sent
// after the new metadata.
if ((*entry)->end_stream_ && new_metadata_added && !buffered_request_data_) {
Buffer::OwnedImpl empty_data("");
ENVOY_STREAM_LOG(
trace, "inserting an empty data frame for end_stream due metadata being added.", *this);
// Metadata frame doesn't carry end of stream bit. We need an empty data frame to end the
// stream.
addDecodedData(*((*entry).get()), empty_data, true);
}
if (!continue_iteration && std::next(entry) != decoder_filters_.end()) {
// Stop iteration IFF this is not the last filter. If it is the last filter, continue with
// processing since we need to handle the case where a terminal filter wants to buffer, but
// a previous filter has added body.
maybeContinueDecoding(continue_data_entry);
return;
}
// Here we handle the case where we have a header only request, but a filter adds a body
// to it. We need to not raise end_stream = true to further filters during inline iteration.
if (end_stream && buffered_request_data_ && continue_data_entry == decoder_filters_.end()) {
continue_data_entry = entry;
}
}
maybeContinueDecoding(continue_data_entry);
if (end_stream) {
disarmRequestTimeout();
}
}
그러면 HTTP Connection Manager에는 cors, fault, router 등과 같은 필터가 있을 수 있는데, 해당 필터를 수행한다. 여기서는 router가 수행됨을 가정해보자.
void addStreamDecoderFilter(ActiveStreamDecoderFilterPtr filter) {
// Note: configured decoder filters are appended to decoder_filters_.
// This means that if filters are configured in the following order (assume all three filters
// are both decoder/encoder filters):
// http_filters:
// - A
// - B
// - C
// The decoder filter chain will iterate through filters A, B, C.
LinkedList::moveIntoListBack(std::move(filter), decoder_filters_);
}
참고로 Router 필터 등록 시에는 위와 같은 필터가 decoder_filters로 등록되어있다.
router.cc
Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers, bool end_stream) {
downstream_headers_ = &headers;
// Extract debug configuration from filter state. This is used further along to determine whether
// we should append cluster and host headers to the response, and whether to forward the request
// upstream.
const StreamInfo::FilterStateSharedPtr& filter_state = callbacks_->streamInfo().filterState();
const DebugConfig* debug_config = filter_state->getDataReadOnly<DebugConfig>(DebugConfig::key());
// TODO: Maybe add a filter API for this.
grpc_request_ = Grpc::Common::isGrpcRequestHeaders(headers);
exclude_http_code_stats_ = grpc_request_ && config_.suppress_grpc_request_failure_code_stats_;
// Only increment rq total stat if we actually decode headers here. This does not count requests
// that get handled by earlier filters.
stats_.rq_total_.inc();
// Initialize the `modify_headers` function as a no-op (so we don't have to remember to check it
// against nullptr before calling it), and feed it behavior later if/when we have cluster info
// headers to append.
std::function<void(Http::ResponseHeaderMap&)> modify_headers = [](Http::ResponseHeaderMap&) {};
// Determine if there is a route entry or a direct response for the request.
route_ = callbacks_->route();
if (!route_) {
stats_.no_route_.inc();
ENVOY_STREAM_LOG(debug, "no route match for URL '{}'", *callbacks_, headers.getPathValue());
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
callbacks_->sendLocalReply(Http::Code::NotFound, "", modify_headers, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().RouteNotFound);
return Http::FilterHeadersStatus::StopIteration;
}
// Determine if there is a direct response for the request.
const auto* direct_response = route_->directResponseEntry();
if (direct_response != nullptr) {
stats_.rq_direct_response_.inc();
direct_response->rewritePathHeader(headers, !config_.suppress_envoy_headers_);
callbacks_->streamInfo().setRouteName(direct_response->routeName());
callbacks_->sendLocalReply(
direct_response->responseCode(), direct_response->responseBody(),
[this, direct_response,
&request_headers = headers](Http::ResponseHeaderMap& response_headers) -> void {
std::string new_path;
if (request_headers.Path()) {
new_path = direct_response->newPath(request_headers);
}
// See https://tools.ietf.org/html/rfc7231#section-7.1.2.
const auto add_location =
direct_response->responseCode() == Http::Code::Created ||
Http::CodeUtility::is3xx(enumToInt(direct_response->responseCode()));
if (!new_path.empty() && add_location) {
response_headers.addReferenceKey(Http::Headers::get().Location, new_path);
}
direct_response->finalizeResponseHeaders(response_headers, callbacks_->streamInfo());
},
absl::nullopt, StreamInfo::ResponseCodeDetails::get().DirectResponse);
return Http::FilterHeadersStatus::StopIteration;
}
// A route entry matches for the request.
route_entry_ = route_->routeEntry();
// If there's a route specific limit and it's smaller than general downstream
// limits, apply the new cap.
retry_shadow_buffer_limit_ =
std::min(retry_shadow_buffer_limit_, route_entry_->retryShadowBufferLimit());
callbacks_->streamInfo().setRouteName(route_entry_->routeName());
if (debug_config && debug_config->append_cluster_) {
// The cluster name will be appended to any local or upstream responses from this point.
modify_headers = [this, debug_config](Http::ResponseHeaderMap& headers) {
headers.addCopy(debug_config->cluster_header_.value_or(Http::Headers::get().EnvoyCluster),
route_entry_->clusterName());
};
}
Upstream::ThreadLocalCluster* cluster =
config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
if (!cluster) {
stats_.no_cluster_.inc();
ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName());
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoClusterFound);
callbacks_->sendLocalReply(route_entry_->clusterNotFoundResponseCode(), "", modify_headers,
absl::nullopt,
StreamInfo::ResponseCodeDetails::get().ClusterNotFound);
return Http::FilterHeadersStatus::StopIteration;
}
cluster_ = cluster->info();
// Set up stat prefixes, etc.
request_vcluster_ = route_entry_->virtualCluster(headers);
if (request_vcluster_ != nullptr) {
callbacks_->streamInfo().setVirtualClusterName(request_vcluster_->name());
}
route_stats_context_ = route_entry_->routeStatsContext();
ENVOY_STREAM_LOG(debug, "cluster '{}' match for URL '{}'", *callbacks_,
route_entry_->clusterName(), headers.getPathValue());
if (config_.strict_check_headers_ != nullptr) {
for (const auto& header : *config_.strict_check_headers_) {
const auto res = FilterUtility::StrictHeaderChecker::checkHeader(headers, header);
if (!res.valid_) {
callbacks_->streamInfo().setResponseFlag(
StreamInfo::ResponseFlag::InvalidEnvoyRequestHeaders);
const std::string body = fmt::format("invalid header '{}' with value '{}'",
std::string(res.entry_->key().getStringView()),
std::string(res.entry_->value().getStringView()));
const std::string details =
absl::StrCat(StreamInfo::ResponseCodeDetails::get().InvalidEnvoyRequestHeaders, "{",
StringUtil::replaceAllEmptySpace(res.entry_->key().getStringView()), "}");
callbacks_->sendLocalReply(Http::Code::BadRequest, body, nullptr, absl::nullopt, details);
return Http::FilterHeadersStatus::StopIteration;
}
}
}
const Http::HeaderEntry* request_alt_name = headers.EnvoyUpstreamAltStatName();
if (request_alt_name) {
alt_stat_prefix_ = std::make_unique<Stats::StatNameDynamicStorage>(
request_alt_name->value().getStringView(), config_.scope_.symbolTable());
headers.removeEnvoyUpstreamAltStatName();
}
// See if we are supposed to immediately kill some percentage of this cluster's traffic.
if (cluster_->maintenanceMode()) {
callbacks_->streamInfo().setResponseFlag(StreamInfo::ResponseFlag::UpstreamOverflow);
chargeUpstreamCode(Http::Code::ServiceUnavailable, nullptr, true);
callbacks_->sendLocalReply(
Http::Code::ServiceUnavailable, "maintenance mode",
[modify_headers, this](Http::ResponseHeaderMap& headers) {
if (!config_.suppress_envoy_headers_) {
headers.addReference(Http::Headers::get().EnvoyOverloaded,
Http::Headers::get().EnvoyOverloadedValues.True);
}
// Note: append_cluster_info does not respect suppress_envoy_headers.
modify_headers(headers);
},
absl::nullopt, StreamInfo::ResponseCodeDetails::get().MaintenanceMode);
cluster_->stats().upstream_rq_maintenance_mode_.inc();
return Http::FilterHeadersStatus::StopIteration;
}
// Fetch a connection pool for the upstream cluster.
const auto& upstream_http_protocol_options = cluster_->upstreamHttpProtocolOptions();
if (upstream_http_protocol_options.has_value() &&
(upstream_http_protocol_options.value().auto_sni() ||
upstream_http_protocol_options.value().auto_san_validation())) {
// Default the header to Host/Authority header.
absl::string_view header_value = headers.getHostValue();
// Check whether `override_auto_sni_header` is specified.
const auto override_auto_sni_header =
upstream_http_protocol_options.value().override_auto_sni_header();
if (!override_auto_sni_header.empty()) {
// Use the header value from `override_auto_sni_header` to set the SNI value.
const auto overridden_header_value = Http::HeaderUtility::getAllOfHeaderAsString(
headers, Http::LowerCaseString(override_auto_sni_header));
if (overridden_header_value.result().has_value() &&
!overridden_header_value.result().value().empty()) {
header_value = overridden_header_value.result().value();
}
}
const auto parsed_authority = Http::Utility::parseAuthority(header_value);
bool should_set_sni = !parsed_authority.is_ip_address_;
// `host_` returns a string_view so doing this should be safe.
absl::string_view sni_value = parsed_authority.host_;
if (should_set_sni && upstream_http_protocol_options.value().auto_sni()) {
callbacks_->streamInfo().filterState()->setData(
Network::UpstreamServerName::key(),
std::make_unique<Network::UpstreamServerName>(sni_value),
StreamInfo::FilterState::StateType::Mutable);
}
if (upstream_http_protocol_options.value().auto_san_validation()) {
callbacks_->streamInfo().filterState()->setData(
Network::UpstreamSubjectAltNames::key(),
std::make_unique<Network::UpstreamSubjectAltNames>(
std::vector<std::string>{std::string(sni_value)}),
StreamInfo::FilterState::StateType::Mutable);
}
}
transport_socket_options_ = Network::TransportSocketOptionsUtility::fromFilterState(
*callbacks_->streamInfo().filterState());
if (auto downstream_connection = downstreamConnection(); downstream_connection != nullptr) {
if (auto typed_state = downstream_connection->streamInfo()
.filterState()
.getDataReadOnly<Network::UpstreamSocketOptionsFilterState>(
Network::UpstreamSocketOptionsFilterState::key());
typed_state != nullptr) {
auto downstream_options = typed_state->value();
if (!upstream_options_) {
upstream_options_ = std::make_shared<Network::Socket::Options>();
}
Network::Socket::appendOptions(upstream_options_, downstream_options);
}
}
if (upstream_options_ && callbacks_->getUpstreamSocketOptions()) {
Network::Socket::appendOptions(upstream_options_, callbacks_->getUpstreamSocketOptions());
}
std::unique_ptr<GenericConnPool> generic_conn_pool = createConnPool(*cluster);
if (!generic_conn_pool) {
sendNoHealthyUpstreamResponse();
return Http::FilterHeadersStatus::StopIteration;
}
Upstream::HostDescriptionConstSharedPtr host = generic_conn_pool->host();
if (debug_config && debug_config->append_upstream_host_) {
// The hostname and address will be appended to any local or upstream responses from this point,
// possibly in addition to the cluster name.
modify_headers = [modify_headers, debug_config, host](Http::ResponseHeaderMap& headers) {
modify_headers(headers);
headers.addCopy(
debug_config->hostname_header_.value_or(Http::Headers::get().EnvoyUpstreamHostname),
host->hostname());
headers.addCopy(debug_config->host_address_header_.value_or(
Http::Headers::get().EnvoyUpstreamHostAddress),
host->address()->asString());
};
}
// If we've been instructed not to forward the request upstream, send an empty local response.
if (debug_config && debug_config->do_not_forward_) {
modify_headers = [modify_headers, debug_config](Http::ResponseHeaderMap& headers) {
modify_headers(headers);
headers.addCopy(
debug_config->not_forwarded_header_.value_or(Http::Headers::get().EnvoyNotForwarded),
"true");
};
callbacks_->sendLocalReply(Http::Code::NoContent, "", modify_headers, absl::nullopt, "");
return Http::FilterHeadersStatus::StopIteration;
}
hedging_params_ = FilterUtility::finalHedgingParams(*route_entry_, headers);
timeout_ = FilterUtility::finalTimeout(*route_entry_, headers, !config_.suppress_envoy_headers_,
grpc_request_, hedging_params_.hedge_on_per_try_timeout_,
config_.respect_expected_rq_timeout_);
const Http::HeaderEntry* header_max_stream_duration_entry =
headers.EnvoyUpstreamStreamDurationMs();
if (header_max_stream_duration_entry) {
dynamic_max_stream_duration_ =
FilterUtility::tryParseHeaderTimeout(*header_max_stream_duration_entry);
headers.removeEnvoyUpstreamStreamDurationMs();
}
// If this header is set with any value, use an alternate response code on timeout
if (headers.EnvoyUpstreamRequestTimeoutAltResponse()) {
timeout_response_code_ = Http::Code::NoContent;
headers.removeEnvoyUpstreamRequestTimeoutAltResponse();
}
include_attempt_count_in_request_ = route_entry_->includeAttemptCountInRequest();
if (include_attempt_count_in_request_) {
headers.setEnvoyAttemptCount(attempt_count_);
}
// The router has reached a point where it is going to try to send a request upstream,
// so now modify_headers should attach x-envoy-attempt-count to the downstream response if the
// config flag is true.
if (route_entry_->includeAttemptCountInResponse()) {
modify_headers = [modify_headers, this](Http::ResponseHeaderMap& headers) {
modify_headers(headers);
// This header is added without checking for config_.suppress_envoy_headers_ to mirror what is
// done for upstream requests.
headers.setEnvoyAttemptCount(attempt_count_);
};
}
callbacks_->streamInfo().setAttemptCount(attempt_count_);
route_entry_->finalizeRequestHeaders(headers, callbacks_->streamInfo(),
!config_.suppress_envoy_headers_);
FilterUtility::setUpstreamScheme(
headers, callbacks_->streamInfo().downstreamAddressProvider().sslConnection() != nullptr);
// Ensure an http transport scheme is selected before continuing with decoding.
ASSERT(headers.Scheme());
retry_state_ =
createRetryState(route_entry_->retryPolicy(), headers, *cluster_, request_vcluster_,
route_stats_context_, config_.runtime_, config_.random_,
callbacks_->dispatcher(), config_.timeSource(), route_entry_->priority());
// Determine which shadow policies to use. It's possible that we don't do any shadowing due to
// runtime keys. Also the method CONNECT doesn't support shadowing.
auto method = headers.getMethodValue();
if (method != Http::Headers::get().MethodValues.Connect) {
for (const auto& shadow_policy : route_entry_->shadowPolicies()) {
const auto& policy_ref = *shadow_policy;
if (FilterUtility::shouldShadow(policy_ref, config_.runtime_, callbacks_->streamId())) {
active_shadow_policies_.push_back(std::cref(policy_ref));
shadow_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>(*downstream_headers_);
}
}
}
ENVOY_STREAM_LOG(debug, "router decoding headers:\n{}", *callbacks_, headers);
// Hang onto the modify_headers function for later use in handling upstream responses.
modify_headers_ = modify_headers;
conn_pool_new_stream_with_early_data_and_http3_ =
Runtime::runtimeFeatureEnabled(Runtime::conn_pool_new_stream_with_early_data_and_http3);
const bool can_send_early_data =
conn_pool_new_stream_with_early_data_and_http3_ &&
route_entry_->earlyDataPolicy().allowsEarlyDataForRequest(*downstream_headers_);
// Set initial HTTP/3 use based on the presence of HTTP/1.1 proxy config.
// For retries etc, HTTP/3 usability may transition from true to false, but
// will never transition from false to true.
bool can_use_http3 =
!transport_socket_options_ || !transport_socket_options_->http11ProxyInfo().has_value();
UpstreamRequestPtr upstream_request = std::make_unique<UpstreamRequest>(
*this, std::move(generic_conn_pool), can_send_early_data, can_use_http3);
LinkedList::moveIntoList(std::move(upstream_request), upstream_requests_);
upstream_requests_.front()->acceptHeadersFromRouter(end_stream);
if (end_stream) {
onRequestComplete();
}
return Http::FilterHeadersStatus::StopIteration;
}
void
http_parser_pause(http_parser *parser, int paused) {
/* Users should only be pausing/unpausing a parser that is not in an error
* state. In non-debug builds, there's not much that we can do about this
* other than ignore it.
*/
if (HTTP_PARSER_ERRNO(parser) == HPE_OK ||
HTTP_PARSER_ERRNO(parser) == HPE_PAUSED) {
uint32_t nread = parser->nread; /* used by the SET_ERRNO macro */
SET_ERRNO((paused) ? HPE_PAUSED : HPE_OK);
} else {
assert(0 && "Attempting to pause parser in error state");
}
}
Parsing 과정이 끝나면 다시 dispatchSlice에서부터 시작함
codec_impl.cc
Envoy::StatusOr<size_t> ConnectionImpl::dispatchSlice(const char* slice, size_t len) {
ASSERT(codec_status_.ok() && dispatching_);
const size_t nread = parser_->execute(slice, len);
if (!codec_status_.ok()) {
return codec_status_;
}
const ParserStatus status = parser_->getStatus();
if (status != ParserStatus::Ok && status != ParserStatus::Paused) {
absl::string_view error = Http1ResponseCodeDetails::get().HttpCodecError;
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser")) {
if (parser_->errorMessage() == "headers size exceeds limit" ||
parser_->errorMessage() == "trailers size exceeds limit") {
error = Http1ResponseCodeDetails::get().HeadersTooLarge;
} else if (parser_->errorMessage() == "header value contains invalid chars") {
error = Http1ResponseCodeDetails::get().InvalidCharacters;
}
}
RETURN_IF_ERROR(sendProtocolError(error));
// Avoid overwriting the codec_status_ set in the callbacks.
ASSERT(codec_status_.ok());
codec_status_ =
codecProtocolError(absl::StrCat("http/1.1 protocol error: ", parser_->errorMessage()));
return codec_status_;
}
return nread;
}
Parsing 과정이 끝나면 Parser는 Paused 상태가 되니 Parser를 통해 읽은 size 개수를 반환한다.
codec_impl.cc
Http::Status ConnectionImpl::dispatch(Buffer::Instance& data) {
...(중략),,,
ssize_t total_parsed = 0;
if (data.length() > 0) {
current_dispatching_buffer_ = &data;
while (data.length() > 0) {
...(중략)...
if (!dispatching_slice_already_drained_) {
ASSERT(statusor_parsed.value() <= slice.len_);
data.drain(statusor_parsed.value());
}
total_parsed += statusor_parsed.value();
if (parser_->getStatus() != ParserStatus::Ok) {
// Parse errors trigger an exception in dispatchSlice so we are guaranteed to be paused at
// this point.
ASSERT(parser_->getStatus() == ParserStatus::Paused);
break;
}
}
current_dispatching_buffer_ = nullptr;
dispatchBufferedBody();
} else {
auto result = dispatchSlice(nullptr, 0);
if (!result.ok()) {
return result.status();
}
}
ASSERT(buffered_body_.length() == 0);
ENVOY_CONN_LOG(trace, "parsed {} bytes", connection_, total_parsed);
// If an upgrade has been handled and there is body data or early upgrade
// payload to send on, send it on.
maybeDirectDispatch(data);
return Http::okStatus();
}
buffer_impl.h
void drain(uint64_t size) {
ASSERT(data_ + size <= reservable_);
data_ += size;
if (data_ == reservable_) {
// All the data in the slice has been drained. Reset the offsets so all
// the data can be reused.
data_ = 0;
reservable_ = 0;
}
}
codec_impl.cc
Http::Status ConnectionImpl::dispatch(Buffer::Instance& data) {
...(중략),,,
ssize_t total_parsed = 0;
if (data.length() > 0) {
current_dispatching_buffer_ = &data;
while (data.length() > 0) {
...(중략)...
if (parser_->getStatus() != ParserStatus::Ok) {
// Parse errors trigger an exception in dispatchSlice so we are guaranteed to be paused at
// this point.
ASSERT(parser_->getStatus() == ParserStatus::Paused);
break;
}
}
current_dispatching_buffer_ = nullptr;
dispatchBufferedBody();
} else {
auto result = dispatchSlice(nullptr, 0);
if (!result.ok()) {
return result.status();
}
}
ASSERT(buffered_body_.length() == 0);
ENVOY_CONN_LOG(trace, "parsed {} bytes", connection_, total_parsed);
// If an upgrade has been handled and there is body data or early upgrade
// payload to send on, send it on.
maybeDirectDispatch(data);
return Http::okStatus();
}
bool ConnectionImpl::maybeDirectDispatch(Buffer::Instance& data) {
if (!handling_upgrade_) {
// Only direct dispatch for Upgrade requests.
return false;
}
ENVOY_CONN_LOG(trace, "direct-dispatched {} bytes", connection_, data.length());
onBody(data);
data.drain(data.length());
return true;
}
parser가 완료되면 return fals를 통해 더 이상 데이터 fetch를 수행하지 않는다.
codec_impl.cc
Http::Status ServerConnectionImpl::dispatch(Buffer::Instance& data) {
...(중략)...
if (active_request_ != nullptr && active_request_->remote_complete_) {
// Read disable the connection if the downstream is sending additional data while we are working
// on an existing request. Reading from the connection will be re-enabled after the active
// request is completed.
if (data.length() > 0) {
active_request_->response_encoder_.readDisable(true);
}
}
return status;
}
상태를 반환한다.
conn_manager_impl.cc
Network::FilterStatus ConnectionManagerImpl::onData(Buffer::Instance& data, bool) {
...(중략)...
bool redispatch;
do {
...(중략)...
if (isBufferFloodError(status) || isInboundFramesWithEmptyPayloadError(status)) {
handleCodecError(status.message());
return Network::FilterStatus::StopIteration;
} else if (isCodecProtocolError(status)) {
stats_.named_.downstream_cx_protocol_error_.inc();
handleCodecError(status.message());
return Network::FilterStatus::StopIteration;
}
ASSERT(status.ok());
// Processing incoming data may release outbound data so check for closure here as well.
checkForDeferredClose(false);
// The HTTP/1 codec will pause dispatch after a single message is complete. We want to
// either redispatch if there are no streams and we have more data. If we have a single
// complete non-WebSocket stream but have not responded yet we will pause socket reads
// to apply back pressure.
if (codec_->protocol() < Protocol::Http2) {
if (read_callbacks_->connection().state() == Network::Connection::State::Open &&
data.length() > 0 && streams_.empty()) {
redispatch = true;
}
}
} while (redispatch);
if (!read_callbacks_->connection().streamInfo().protocol()) {
read_callbacks_->connection().streamInfo().protocol(codec_->protocol());
}
return Network::FilterStatus::StopIteration;
}
conn_manager_impl.cc
void ConnectionManagerImpl::checkForDeferredClose(bool skip_delay_close) {
Network::ConnectionCloseType close = Network::ConnectionCloseType::FlushWriteAndDelay;
if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.skip_delay_close") &&
skip_delay_close) {
close = Network::ConnectionCloseType::FlushWrite;
}
if (drain_state_ == DrainState::Closing && streams_.empty() && !codec_->wantsToWrite()) {
doConnectionClose(close, absl::nullopt,
StreamInfo::ResponseCodeDetails::get().DownstreamLocalDisconnect);
}
}
void ConnectionImpl::onReadReady() {
...(중략)...
read_end_stream_ |= result.end_stream_read_;
if (result.bytes_processed_ != 0 || result.end_stream_read_ ||
(latched_dispatch_buffered_data && read_buffer_->length() > 0)) {
// Skip onRead if no bytes were processed unless we explicitly want to force onRead for
// buffered data. For instance, skip onRead if the connection was closed without producing
// more data.
onRead(new_buffer_size);
}
// The read callback may have already closed the connection.
if (result.action_ == PostIoAction::Close || bothSidesHalfClosed()) {
ENVOY_CONN_LOG(debug, "remote close", *this);
closeSocket(ConnectionEvent::RemoteClose);
}
}
onRead가 끝난 이후 IoAction이 Close된 경우 소켓을 종료한다. 여기서는 정상적인 연결이기 때문에 해당 로직을 수행하지 않는다.
connection_impl.cc
void ConnectionImpl::onFileEvent(uint32_t events) {
...(중략)...
// It's possible for a write event callback to close the socket (which will cause fd_ to be -1).
// In this case ignore read event processing.
if (ioHandle().isOpen() && (events & Event::FileReadyType::Read)) {
onReadReady();
}
}
Kafka는 분산 시스템으로써, 수많은 Broker와 Producer, Consumer끼리 통신을 수행하며, Broker 간에도 데이터 복제 및 상태 중재를 위한 네트워크 통신이 사용된다.
그렇다면, Kafka에서는 어떠한 방식으로 통신을 수행할까?
일반적으로 생각해보면, HTTP Protocol을 사용하는 방법으로 이미 구현된 HTTP 프레임워크를 사용하는 것이 개발 입장에서는 쉬울 것이다. 하지만 Kafka 커미터들은 다음과 같은 이슈로 인하여 자체 통신 체계를 구축했다고 한다.
Kafka에서는 빠른 메시지 전달을 위해 Kafka에 최적화된 통신 체계가 필요하다. 또한 커다란 프레임워크 코드 영역에서 Kafka가 필요한 부분은 일부에 불과하다.
라이브러리 의존성과 버전 관리의 어려움이다.
Kafka는 메시지 전달을 위해 빠른 네트워크 성능이 필요하기 때문에, 고성능 통신을 위해 간결하면서도 최적화된 방식이 필요하다. 그리고 라이브러리 및 의존성 문제에서 자유로워야한다. 만약 다른 프레임워크에 의존하게된다면, Broker 및 Client 모두 해당 라이브러리에 대한 강한 의존성이 생긴다. 이는 버전 관리의 어려움이 존재하게되며, Kafka 라이브러리를 포함하는 Client의 파일 크기 또한 커지게 된다. 따라서, 위 두 가지 이슈로 인해 자체 네트워크 모델을 구축하으며, 빠른 메시지 전달과 동시 신뢰성 있는 데이터 전달이 중요하기 때문에 UDP 기반이 아닌 TCP 기반위에서 동작하도록 자체 Protocol을 설계했다.
Kafka 네트워크 모델 기반에는 Java의 NIO API가 광범위하게 사용된다. 따라서, Kafka 네트워크 모델을 살펴보기 앞서 NIO에서 Network 연관 부분만 살펴보자.
NIO
NIO는 New IO의 약자로써 기존 IO 방식에서 발생하는 Blocking 이슈를 개선하기 위해 Java 1.4 부터 새롭게 도입된 기능(JSR-51)이며, 다음과 같은 특징을 지니고 있다.
Non-Blocking
IO Multiplexing
위 두 가지 특징을 기반으로 NIO가 기존 IO와 무엇이 다른지에 대해 살펴보자.
일반적으로 Socket을 활용한 네트워킹 과정을 살펴보면 위와 같다. 여기서 기존 I/O 방식은 Client의 연결을 받아들이는 Accept 부분과 Read, Write 연산 등은 모두 Blocking된다. 이는 즉 Accept의 경우 요청이 들어올 때까지 요청을 반환하지 않음을 의미하며, Read, Write의 작업이 수행되는 동안에도 결과를 리턴 하지 않고 끝날 때까지 대기함을 의미한다.
이러한 경우 일반적인 단일 소켓으로는 여러 요청을 효과적으로 처리할 수 없다. 그 이유는 여러 요청을 빠르게 처리하기 위해서는 Blocking되어 Idle한 시간을 효율적으로 분배하여 다른 작업을 처리 해야하는데, 기존 구조로는 이에 대해 효과적으로 대응할 수 없기 때문이다.
따라서 기존에는 동기식 I/O 방식의 문제점을 해결하고자 각 Client의 연결 요청에 대해 이를 처리할 수 있는 Thread를 생성 후 매핑을 통해 동시성을 해결하고자 했다.
하지만 위와 같이 Multi Thread 방식의 네트워크 통신 방법에는 한계가 존재한다. 그 이유는 아무리 Thread가 Process보다는 가볍다고 하나 개별 요청 별 Thread를 할당하는 방식은 메모리 사용 및 Context Switching에 따른 Overhead가 크기 때문이다. 가령 Thread별 스택을 1M씩만 할당한다고 가정하더라도 1024개 사용자 요청을 처리하기 위해서 생성되는 스택만 1GB가 사용될 것이다. 따라서, 사용자가 증가할 수록 처리량은 감소하게된다.
그렇다면, Non-Blocking으로 처리하면 어떻게 될까?
Non-Blocking 방식은 기존의 Blocking I/O를 유발하는 메소드에 대해서 추가 설정을 통해 Non-Blocking 형태로 구성할 수 있다. 즉 기존에는 메소드 호출 시 작업이 완료될 때까지 기다렸지만, 설정 이후에는 메소드 결과가 즉시 반환하기 때문에, 하나의 소켓 서버 Thread가 여러개의 I/O를 처리할 수 있게되었다.
Non-Blocking 설정 이후로는 위 그림과 같이 단일 Thread에서 여러 사용자의 Channel과 매핑되어 데이터를 처리할 수 있게 되었다. 위 그림만 보면 Thread 개수를 줄일 수 있으니 성능이 많이 향상될 것으로 보인다. 하지만 다음과 같은 문제가 존재한다.
Blocking I/O 방식으로 처리할 경우에는 순차적으로 처리하므로 Blocking 메소드를 벗어났다는 것은 해당 처리가 완료되었음이 어느정도 보장된다. 하지만 Non-Blocking 방식은 작업 요청과 별개로 바로 리턴이 되기 때문에 실제 요청 여부를 확인하기 위해서는 주기적으로 소켓 정보를 Polling하여 처리 가능 여부를 확인해야한다.
List<SocketChannel> channels = new ArrayList<>();
ServerSocketChannel serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(9002));
serverSocket.configureBlocking(false);
while (true) {
SocketChannel channel = serverSocket.accept();
if (null != channel) {
socketChannel.configureBlocking(false);
channels.add(channel);
}
Iterator<SocketChannel> iterator = channels.iterator();
while (iterator.hasNext()) {
SocketChannel channel = iterator.next();
..(Read 요청 확인 수행 및 처리)...
}
}
위 코드는 configureBlocking 설정을 통해서 Blocking 방식 API를 Non-Blocking 방식으로 변경한 예제이다. 코드를 살펴보면, accept 혹은 read 수행하면 바로 리턴되므로 요청 확인을 위해서 지속적으로 무한 Loop를 수행하며 확인 과정이 필요하다. 예를 들어 현재 100개의 연결이 이루어져 channels List에 등록되어있다면, 매번 100번의 연결에 대하여 요청 여부를 확인한다.
위와 같은 방식의 경우 무한 Loop로 인하여 CPU overhead가 지속 발생하므로 Non-Blocking 기법만 적용해서는 성능 향상의 효과를 크게 얻을 수 없다. 그렇다면 이러한 문제는 어떻게 해결할까? IO Multiplexing에 대해서 알아보자.
IO Multiplexing
IO Multiplexing 방식은 하나의 Channel을 통해 여러 개의 연결을 관리하는 방식으로 해당 방식에서는 소켓 관리를 OS에서 직접 관리한다. 따라서 사용자 코드에서는 OS에 관리 대상 소켓 정보를 등록하는 단계가 필요하다.
(※ 본 포스팅에서 Kafka는 Linux 환경에서 동작함을 가정하므로 Socket은 FileDescriptor로 취급됨을 참고하자.)
등록 이후에는 OS에서 File descriptor 목록을 가지고 있고, 내부적으로 데이터를 처리해야 될 대상이 발견되면, 해당 정보를 이후 Client에서 요청 시 반환하는 역할을 담당한다. 즉 이전에는 Client에서 직접 처리 요청 대상을 관리했다면, Monitor 역할을 OS가 담당하는 셈이다.
위와 같은 방법을 적용하면, 사용자 코드에서 Connection 개수 여부와 관계없이 처리 대상만 OS로부터 전달받으므로 CPU overhead를 줄일 수 있으므로 적은 Thread로 많은 처리 요청을 수행할 수 있다.
Java의 NIO는 이를 위해서 Selector를 활용한다. Selector는 OS와 사용자 코드 상의 가교 역할을 수행한다. 따라서 사용자 코드에서 Selector에게 처리 대상 Channel 등록을 요청하면, OS에 해당 정보를 전달한다. 그 이후에는 주기적으로 OS에 목록 전달 요청을 전달하면, OS에서 대상 목록을 전달 받아 후속 작업을 처리한다.
(※ JVM 6이상 환경에서 Linux Kernel 2.6 이상을 사용하면, 기본 Selector의 구현체로 Linux의 epoll이 사용된다.)
Server 소켓을 생성하고 open() 메소드를 통해 Selector를 생성한다. 해당 메소드는 Linux 내부에 epoll Object를 생성한다.
Client가 IP 및 Port 정보를 통해서 접속 요청을 할 것이다. 그러면 OS는 바인딩된 내부 오브젝트에 반영한다.
사용자 코드에서 select() 메소드를 호출하면, 접속 요청이 존재하므로 해당 정보를 반환한다.
사용자 코드에서 해당 접속 요청을 받아들인 이후에 Read 요청이 들어오면 이를 감지하기 위해 Kernel에 Read 이벤트에 대한 수신을 받을 수 있도록 요청한다. 이때 호출되는 register()를 통해 내부에 epoll_ctl및 epoll_wait 시스템 콜이 호출된다.
Linux Kernel은 해당 요청을 다룰 수 있는 connection이 존재하는지 확인 후 client와 연결한다.
연결이 성공적으로 이루어지면, Selector에게 알림을 통지하고 내부적으로 Channel을 생성한다.
사용자가 데이터 fetch 요청을 전달하면 내부 Buffer에 이를 저장한다. 이때 Buffer의 위치는 direct 방식과 아닐 경우에 따라서 달라질 수 있는데, 이는 나중에 다루도록 한다.
Channel은 연결 역할을 수행할 뿐 데이터 fetch는 Buffer를 통해 이루어진다.
Selector는 지속적으로 poll을 수행하여 Channel에 등록된 Buffer의 내용을 읽어간다.
( ※ 위 그림에서 Channel과 Buffer는 연결된 Client 마다 생성된다.)
위 코드 중 가장 중요한 것은 select()이다. 이는 해당 메소드 또한 Blocking 방식이기 때문이다. 따라서 Selector를 활용한 방식은 완벽한 비동기 방식은 아니므로 Synchronous Non-Blocking 방식이라고 볼 수 있다.
Java의 NIO에 대해서 정리하자면, 기존 동기 방식의 API로 인한 동시성 저하를 막고자 Non-Blocking API를 제공하며, IO Multiplexing을 통해 처리량을 높일 수 있다. 하지만 완전한 방식의 비동기 방식은 아니다.
Kafka Broker Network 구조
지금까지 학습한 Java의 NIO를 바탕으로 Kafka Broker내의 Network 통신을 위한 구조를 살펴보자. Broker 구조는 크게 Socket Server, Request Handler Pool, API 세 가지로 이루어져있다. 해당 컴포넌트에 무엇이 있는지 하나씩 살펴보자.
Socket Server
Socket Server는 사용자 접속 및 요청을 담당하는 역할을 담당하며, Acceptor, Processor, Request Channel로 이루어진 Request-Plane 세트이다.
이전 그림에는 1개의 Plane을 묘사했지만, 실제로는 data-plane과 control-plane 총 2개의 plane이 존재한다. 여기서 control-plane은 Broker와 Controller 간의 통신을 위해 연결된 전용 네트워크이며, data-plane은 Broker 끼리 혹은 client의 요청을 처리하기 위한 네트워크이다.
그렇다면, Request-Plane 구성 요소인 Acceptor, Processor, Request Channel은 각각 무엇일까?
Acceptor는 Client의 접속 요청을 감지하는 문지기의 역할을 수행한다. Acceptor를 통해 연결 요청을 전달받으면, 하위에 존재하는 Processor 중 하나에게 Read/Write 처리를 수행할 수 있도록 연결해준다.
Processor는 연결된 Socket에 대하여 Read/Write 요청이 전달되는 것을 감지하고, 이를 Request Channel의 Request Queue에 전달하는 역할과 실제 작업이 완료된 이후 결과를 전달받아 사용자에게 반환하는 것을 담당한다.
Request Channel은 모든 Processor, Handler, API가 공유하는 전역 저장소로써, 사용자의 요청이 전달되면 해당 정보를 보관하고 처리가 완료되면 요청한 Processor에게 결과를 반환하는 역할을 수행한다.
Socket Server의 구조를 보면, Acceptor가 여러개의 Processor를 가지고 있고 Processor는 Request Channel과 연관이 있음을 알 수 있다. Socket Server에는 data-plane과 control-plane 두 개가 존재한다고 이전에 설명했는데, data-plane의 경우 Acceptor는 여러개의 Processor를 가질 수 있으며, 해당 설정은 num.network.threads 설정을 통해서 개수를 조절할 수 있다. 반면 control-plane의 경우는 Processor가 1개만 존재한다.
Request Handler
Request Handler는 ReuqestChannel에서 Request 정보를 가져와 API에게 처리를 요청하고 요청 결과를 다시 RequestChannel에 전달하는 역할을 담당한다. RequestHandler는 1개가 아니라 여러개의 Thread로 구성될 수 있으며, 이는 num.io.threads 속성을 통해 개수를 조정할 수 있다.
이전 Kafka 버전(0.7)에서는 Request Handler가 따로 존재하지 않았고 Processor를 통해 직접 처리를 수행하였다. 하지만 Network Read/Write 요청을 감지하는 영역과 I/O를 처리하는 부분이 하나의 Thread안에 있으므로 탄력적으로 Thread 개수를 늘리기 어려운 문제가 있었다.
따라서 I/O와 Network 처리를 위한 Thread를 분리함으로써, 현재와 같은 모습을 갖추게 되었다.
API
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
case ApiKeys.FETCH => handleFetchRequest(request)
...(중략)...
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
API는 Client가 요청한 정보를 기반으로 Kafka 내부 모듈에 필요한 메소드를 호출하는 역할을 담당한다. Kafka Protocol에는 위와 같이 어떤 요청인지 header에 포함시키도록 규정되었다. 따라서 Kafka API가 요구하는 Spec에 맞게 작성하면, 이를 Parsing 하여 개별 모듈로 Routing을 시켜준다. 요청 처리가 완료되면, RequestHelper를 통해 RequestChannel로 전달한다.
동작 과정
지금까지 Kafka Network 구조에 대해서 큰 틀에서 살펴봤다. 이번에는 각 모듈끼리 어떠한 상호작용을 거쳐 동작하는지 살펴보자. 먼저 큰 흐름 속에서 어떻게 동작하는지 보고 이후 코드 레벨에서 보다 자세하게 살펴보도록 하자.
첫 번째로 살펴볼 것은 Client가 접속 요청 시도시 내부 동작 과정이다.
사용자가 접속 요청을 한다.
Acceptor가 해당 접속 요청을 수락하고, 자신이 보유한 Processor 중 하나에게 할당한다. Processor는 해당 요청을 자신이 보유한 Kafka Selector에 요청하여 이후 Client로부터 데이터 처리 요청이 왔을 경우 감지할 수 있도록 사전 준비한다.
Client 접속 요청이 완료되면, Processor는 사용자 요청을 처리할 수 있는 단계가 된다. 이후 사용자 요청이 발생했을 때 처리 과정을 살펴보자.
사용자가 데이터 fetch 요청을 하면, Kernel은 이를 감지한다.
Processor에서 Kafka Selector에게 데이터 fetch 요청 이후 해당 요청을 Request Channel의 Request Queue에 저장한다. 이때 향후 처리 결과를 자신에게 포워딩 하기 위해 Queue 삽입시 자신의 Processor Id를 함께 추가한다.
Request Handler에서 Request Queue에 존재하는 요청을 fetch한다.
해당 요청을 API에게 전달한다.
API는 요청을 처리한다음 자신이 보유한 Request Helper를 통해 RequestChannel로 전달한다.
Request Channel은 Processor Id를 보고 해당 Processor의 Response Queue에 결과를 삽입한다.
Processor는 Response Queue 내용을 확인하고 Client에게 결과를 전달한다.
지금까지 살펴본 내용은 큰 틀에서 컴포넌트간 상호 작용에 대해서 확인했다. 이번에는 코드 레벨에서 자세하게 각 모듈이 어떻게 구동되고 상호작용하는지 알아보자.
Socket Server 동작 과정
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics)
// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time, apiVersionManager.newRequestMetrics))
가장 먼저 살펴볼 것은 Socket Server에 속한 2개의 plane이 어떻게 구성되어있는지 살펴보자. 위 내용을 살펴보면, 2개의 plane이 서로 다른점이 몇 가지 보인다.
data-plane의 경우 Acceptor, Processor가 여러개이지만, controlPlane의 경우 하나만 존재한다.
data-plane의 경우 RequestChannel 내에 존재하는 RequestQueue의 크기를 queued.max.requests 속성 크기만큼 지정 가능한 반면, control-plane의 경우는 20개로 크기가 고정되어있다.
두 가지 생성 메소드 중 data-plane 생성 코드를 살펴보자. 위와같이 listeners를 통해서 전달받은 endpoint 별로 acceptor가 생성되며, num.network.threads 개수만큼 processor 또한 생성 된다. processor 생성 이후 acceptor와 channel에 해당 processor를 등록한다.
해당 과정을 통해 Acceptor와 RequestChannel의 Processor 간의 매핑 관계를 이해할 수 있다.
설정 작업이 마무리되면, listeners에 매핑된 Endpoint 개수 만큼의 Kafka 쓰레드를 생성하여 Acceptor에게 할당한다. 위 코드는 Acceptor에게 쓰레드 할당 후 start() 호출 이후 수행 과정을 나타낸다.
NIO Selector를 통해 Accept 이벤트를 통지할 수 있도록 요청하면, 내부적으로 Kernel에 epoll 오브젝트가 생성되고, Accept 요청이 왔을 때 이를 수신받을 수 있음을 이전 NIO 개념을 학습하면서 살펴봤다.
소켓 정보 등록 후에는 무한 Loop를 통해 새로운 연결 요청이 있는지를 확인한다.
private def acceptNewConnections(): Unit = {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
...(중략)...
if (key.isAcceptable) {
accept(key).foreach { socketChannel =>
...(중략)...
var processor: Processor = null
do {
...(중략)...
processor = synchronized {
currentProcessorIndex = currentProcessorIndex % processors.length
processors(currentProcessorIndex)
}
currentProcessorIndex += 1
} while (!assignNewConnection(socketChannel, processor, retriesLeft == 0))
}
} else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
} catch {
...(중략)...
}
}
}
}
이때 select() 메소드를 통해 Accept 요청이 들어왔는지를 OS에게 확인하는데, 해당 메소드는 Blocking 메소드이므로 무한 대기를 막기 위해 500ms 기간의 Timeout을 지정한다. 이 과정에서 Accept 요청이 들어온다면, 자신이 보유하고 있는 Processor 중 하나에게 향후 Read/Write 요청에 대한 처리를 담당하도록 한다. 이때 살펴볼 것은 Processor에게 균등한 분배를 위해서 Round-Robin 방식으로 접속 요청을 분배한다는 점이다.
Acceptor의 역할은 여기까지이고, 지금 부터는 위 코드를 통해 새로운 요청이 Processor에게 할당된 이후 처리 과정에 대해서 살펴보자.
Processor 동작 과정
이전에 Acceptor에서 Processor에게 요청을 할당한다고 했는데, 해당 과정은 어떻게 이루어질까? 먼저 Processor가 지닌 프로퍼티에 대해 먼저 살펴보자.
위 그림을 살펴보면 일반 Selector가 아닌 Kafka Selector를 내부 프로퍼티로 가지고 있는 것을 확인할 수 있다. 여기서 Kafka Selector에는 내부에 NIO의 Selector를 포함하며, 그 외에 Kafka 데이터 송수신에 필요한 프로퍼티 및 내부 메소드를 지닌 클래스이다.
Kafka Selector에는 위 그림외에도 수많은 내부 프로퍼티가 존재하지만, 일부만 간추려서 알아보자. nioSelector는 Java NIO의 selector를 의미한다.
channel은 Processor를 통해 연결된 Client와의 Channel을 의미하며, Connection Id 와 KafkaChannel로 이루어진 Map이다. 따라서 특정 Client와 연결 시 Connection Id 기준으로 해당 Channel과 연결한다.
completedSends, completedReceives 및 disconnected는 데이터 송수신 및 close 처리 시, 해당 요청을 임시 저장하는 용도의 buffer로써 활용된다.
여기까지 Kafka Selector에 대해서 알아보고 이번에는 Processor의 또 다른 주요 프로퍼티 중 하나인 newConnections에 대해서 알아보자. 해당 자료구조는 Queue로써 Acceptor가 새로운 요청을 Processor에게 할당할 때, 해당 Queue에 입력이 된다.
이제 Processor의 내부 프로퍼티를 토대로 Processor 동작 과정에 대해 살펴보자.
Processor 또한 Acceptor와는 별개의 Thread로 수행된다. 위 코드는 Processor 기동 시작 후 수행 과정을 나타내며, 무한 Loop를 통해서 동일한 작업을 지속 반복 수행하는 것을 확인할 수 있다. 위 코드와 같이 7개의 동작을 수행하는데, 전부다 살펴보지는 않고 주요 동작에 대해서만 살펴보자.
configureNewConnections()
private def configureNewConnections(): Unit = {
var connectionsProcessed = 0
while (connectionsProcessed < connectionQueueSize && !newConnections.isEmpty) {
val channel = newConnections.poll()
try {
...(중략)...
selector.register(connectionId(channel.socket), channel)
connectionsProcessed += 1
} catch {
...(중략)...
}
}
}
configureNewConnections 메소드는 newConnections를 통해 새로운 Channel이 입력되면, Connection Id를 부여한 다음 해당 정보를 Kafka Selector에게 전달하여 궁극 적으로는 OS 내부에 해당 소켓 정보를 등록시킨다. 따라서, Processor가 지닌 Kafka Selector를 통해서 향후 Read/Write 요청이 들어왔을 때 이를 감지해 후속 작업을 처리할 수 있다.
processNewResponse()
private def processNewResponses(): Unit = {
var currentResponse: RequestChannel.Response = null
while ({currentResponse = dequeueResponse(); currentResponse != null}) {
val channelId = currentResponse.request.context.connectionId
try {
currentResponse match {
case response: NoOpResponse =>
...(중략)...
case response: SendResponse =>
sendResponse(response, response.responseSend)
case response: CloseConnectionResponse =>
...(중략)...
close(channelId)
...(중략)...
}
} catch {
...(중략)...
}
}
}
processNewResponse() 메소드는 RequestHandler를 통해 API 호출 후 Client에게 결과를 전달하는 과정을 처리한다. API 수행이 모두 완료되면, Channel을 통해 Processor의 개별 ResponseQueue에 결과가 적재된다.
그 이후 위 코드가 실행되면, dequeueResponse() 메소드를 통해 결과를 추출 이후 결과 유형에 따라서 처리를 달리 수행한다.
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send): Unit = {
val connectionId = response.request.context.connectionId
...(중략)...
if (openOrClosingChannel(connectionId).isDefined) {
selector.send(new NetworkSend(connectionId, responseSend))
...(중략)...
}
}
만약 해당 요청이 SendResponse라면, 위 코드와 같이 Kafka Selector의 저장 Buffer에 임시 보관하도록 요청한다.
만약 처리 유형이 CloseConnectionResponse 형태라면, Selector에게 close 요청을 전달하여 Channel을 정상적으로 종료하도록 한다. 그리고 Kafka Selector는 자신이 지닌 Client 연결 항목에서 해제하고 Channel을 종료시킨다.
poll()
private def poll(): Unit = {
val pollTimeout = if (newConnections.isEmpty) 300 else 0
try selector.poll(pollTimeout)
catch {
...(중략)...
}
}
poll() 메소드는 KafkaSelector를 통해 데이터가 Channel에 존재하면, fetch를 요청하는 작업이다.
이때 Kafka Selector의 내부 동작 방식은 복잡하지만, 핵심 부분만 도식화해보면 위 흐름과 같다.
Processor가 poll()을 통해 변경 대상 Channel 확인 및 데이터 fetch를 요청한다.
Kafka Selector 내부 nioSelector를 통해서 Kernel에 변경 대상 Channel이 존재하는지를 요청한다.
Kernel 내부에서 변화가 감지된 Channel 정보를 전달한다.
해당 Channel이 데이터 수신이 가능한 상태라면 데이터를 추출하여 completedReceives에 저장한다. 만약 processNewResponse() 메소드 수행 결과 전달할 데이터가 존재한다면, Selector 쓰기 임시 버퍼에 저장되어있을 것이다. 해당 내용을 completedSends에 저장한다.
※ completedSends, completedReceives에 저장된 데이터는 다음에 확인할 processCompletedReceives()와 processCompletedSends() 과정을 통해서 처리된다.
processCompletedReceives()
private def processCompletedReceives(): Unit = {
selector.completedReceives.forEach { receive =>
try {
openOrClosingChannel(receive.source) match {
case Some(channel) =>
...(중략)...
val connectionId = receive.source
val context = new RequestContext(header, connectionId, channel.socketAddress,channel.principal, listenerName, securityProtocol,
channel.channelMetadataRegistry.clientInformation, isPrivilegedListener, channel.principalSerde)
val req = new RequestChannel.Request(processor = id, context = context, startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
...(중략)...
requestChannel.sendRequest(req)
...(중략)...
case None => ...(중략)...
}
} catch {
...(중략)...
}
}
selector.clearCompletedReceives()
}
이전에 poll() 과정이 끝나고나면, 데이터 수신이 완료된 내용은 completedReceives에 저장됨을 확인했다. processCompletedReceives()는 해당 내용을 가져와서 처리를 수행하기 위해 Context를 만들고 이를 RequestChannel에 추가한다. 이때 데이터 처리 이후 자신의 Processor가 후속 작업을 처리하기 위해서 요청시 자신의 Processor Id를 파라미터로 넘기는 것을 참고하자.
Channel에 Request 요청을 넣은 이후에는 completedReceives 내용을 초기화하여 이후 중복 처리 되지 않도록한다.
발송할 데이터는 poll() 메소드 수행 과정을 통해 모두 completedSends에 저장되어있다. 이후 해당 메소드에서 실제 후속 작업 처리를 진행함으로써, Client에게 결과를 반환하고, completedSends를 모두 초기화 한다.
지금까지, Processor 동작 과정에 대해서 살펴봤다. 해당 내용을 정리하자면 다음과 같다.
Acceptor로 부터 Client를 할당 받는다.
API로 부터 처리 결과를 전달받으면, Kafka Selector에 존재하는 임시 버퍼(각 Channel마다 존재)에 저장한다.
Kafka Selector로부터 Client의 요청이 있는지 확인하며, 이 과정에서 사용자의 요청이 전달된다면, completedReceives에 저장하고, API 처리 결과를 completedSends에 한데 모은다.
completedReceives은 사용자의 요청이므로 Request Channel에 전달하여 데이터 처리 요청하고, completedSends 내용은 결과를 Client에 반환하는 후속 작업을 처리한다.
Request Handler 동작 과정
class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: ApiRequestHandler,
time: Time,
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
...(중략)...
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}
def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}
...(중략)...
}
Request Handler는 KafkaRequestHandlerPool 내부에 존재한다. 따라서 먼저 KafkaRequestHandlerPool가 생성된다. 생성 당시 SocketServer와 연결될 Channel과 API로 전달할 APIRequestHandler가 인자로 같이 전달되는 것을 참고하자. 또한 RequestHandler는 단일 쓰레드로 동작하는 것이 아니라 num.io.threads 인자에 따라 개수 조절이 가능하므로 생성 당시 해당 값 또한 전달된다.
위 과정을 통해 KafkaHandler는 데몬쓰레드로 동작한다.
def run(): Unit = {
while (!stopped) {
...(중략)...
val req = requestChannel.receiveRequest(300)
req match {
case RequestChannel.ShutdownRequest =>
...(중략)...
completeShutdown()
return
case request: RequestChannel.Request =>
try {
...(중략)...
apis.handle(request, requestLocal)
} catch {
...(중략)...
}
...(중략)...
}
}
completeShutdown()
}
RequestHandler의 역할은 단순하다. 만약 연결되어있는 Channel이 종료된다면, Handler의 역할이 더이상 필요 없으므로 종료한다. 반면 RequestChannel에서 Request가 존재한다면, API에게 처리를 위임한다.
API 동작 과정
class ControllerApis(val requestChannel: RequestChannel,
val authorizer: Option[Authorizer],
val quotas: QuotaManagers,
val time: Time,
val supportedFeatures: Map[String, VersionRange],
val controller: Controller,
val raftManager: RaftManager[ApiMessageAndVersion],
val config: KafkaConfig,
val metaProperties: MetaProperties,
val controllerNodes: Seq[Node],
val apiVersionManager: ApiVersionManager) extends ApiRequestHandler with Logging {
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
...(중략)...
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
} catch {
case e: FatalExitError => throw e
case e: ExecutionException => requestHelper.handleError(request, e.getCause)
case e: Throwable => requestHelper.handleError(request, e)
}
}
}
API는 사용자 요청을 라우터의 역할로써, Header에 명시된 API Key를 보고 요청을 전달한다.
def handleFetch(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new FetchResponse(response.asInstanceOf[FetchResponseData]))
}
요청이 전달되면, 요청을 처리하고 결과를 반환하기 위해서 위와 같이 반환 메소드를 호출한다.
private def handleRaftRequest(request: RequestChannel.Request,
buildResponse: ApiMessage => AbstractResponse): Unit = {
val requestBody = request.body[AbstractRequest]
...(중략)...
future.whenComplete { (responseData, exception) =>
val response = if (exception != null) {
requestBody.getErrorResponse(exception)
} else {
buildResponse(responseData)
}
requestHelper.sendResponseExemptThrottle(request, response)
}
}
반환 메소드 안에서는 ResponseBody를 만든 이후에 requestHelper를 통하여 반환을 위임한다.
requestHelper는 해당 결과를 requestChannel에 전달함으로써 API 역할은 마무리된다.
Request Channel 동작 과정
Request Channel은 Processor와 Handler 그리고 API가 상호 작용에 필수적인 컴포넌트로써 요청을 전달하고 결과를 수신받는 중간 버퍼의 역할을 담당한다.
class RequestChannel(val queueSize: Int,
val metricNamePrefix: String,
time: Time,
val metrics: RequestChannel.Metrics) extends KafkaMetricsGroup {
import RequestChannel._
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
...(중략)...
def addProcessor(processor: Processor): Unit = {
if (processors.putIfAbsent(processor.id, processor) != null)
warn(s"Unexpected processor with processorId ${processor.id}")
...(중략)...
}
...(중략)...
}
RequestChannel의 핵심 프로퍼티는 requestQueue와 processors이다. 여기서 requestQueue는 사용자 요청을 저장하는 임시 버퍼의 역할을 수행하며, queueSize를 통해 전달된다. 해당 값은 이전에 Socket Server를 살펴볼 때 확인했듯이 data-plane과 control-plane에 따라서 서로 다른 값을 지니고 있다.
processors는 API로부터 결과를 반환할 때 Processor Id를 기반으로 빠르게 Processor 객체를 찾기 위한 자료구조로 사용되며, Socket Server의 구동 당시 Acceptor와 Processor가 만들어지고 나면, addProcessor 메소드를 통해서 Processor Id와 참조 객체를 전달받아 processors 자료구조에 삽입하게 된다.
private[network] def sendResponse(response: RequestChannel.Response): Unit = {
...(중략)...
val processor = processors.get(response.processor)
if (processor != null) {
processor.enqueueResponse(response)
}
}
이후 API로부터 결과를 전달받게되면, response에 저장된 processor Id를 기반으로 processors에서 참조 객체를 찾아 Processor에 위치한 Response Queue에 결과를 삽입한다.
마무리
지금까지 Kafka Broker 입장에서 네트워크 모델이 어떻게 구성되어있고 요청/응답이 어떤 식으로 이루어지는지 살펴봤다. 내부 구조를 살펴보면서, Kafka에 대한 이해가 조금 더 올라간 것 같다.
이번 포스팅은 그동안 envoy-internals 시리즈의 이해를 바탕으로 사용자가 Http 전달을 요청했을 때, Envoy 내부구조를 토대로 네트워크 요청이 어떻게 흘러가는지에 대해서 전체적인 흐름을 상세히 조망해보는 시간을 가져보려 합니다. 사실 envoy에 대해서 분석했던 계기 중 하나가 도대체 어떻게 사용자의 요청이 전달되는지에 대한 궁금증에서 출발했기 때문에 이번 포스팅을 위해서 이전 시리즈의 내용이 존재했다고 생각합니다. 따라서 이번 내용은 이전 내용에 대한 이해가 선행되어야하므로 이전 시리즈 내용을 정독하고 보시는 것을 추천드립니다.
2. Worker 쓰레드 소켓 할당 과정
이전 시리즈 내용을 통해 Listener Manager에서 네트워크 요청 처리를 위해서 여러개의 Worker 쓰레드를 생성하는 것을 이해할 수 있었습니다. 이때 Worker 쓰레드 생성 갯수는 envoy 생성 당시 --concurrency 인자에 의해서 결정되는 것 또한 확인했습니다. 이번에는 Envoy 기동 과정 중 Worker 쓰레드 상호작용을 살펴보면서 Worker 쓰레드에서 어떻게 네트워크 요청을 처리하는지 살펴보겠습니다.
위 그림은 --concurrency 값이 2개이고 static_config, dynamic_config에 의해서 등록된 Listener 개수 또한 2개이면서 모두 TCP임을 가정했습니다. 이때 기동 과정을 살펴보면 다음과 같습니다.
1. Envoy 기동 과정에서 --concurrency 값을 살펴보고 Listener Manager에게 Worker 쓰레드 생성을 요청합니다. Listener Manager는 Worker 쓰레드를 요청만큼 생성합니다. 이 과정에서 Worker 쓰레드 내부에 Dispatcher와 Connection Handler가 생성됩니다.
2. Worker 쓰레드가 생성되는 과정에서 Envoy의 TLS를 관장하는 메인 쓰레드 InstanceImpl에 쓰레드를 등록합니다. 이 과정에서 InstanceImpl에서 Worker 쓰레드에 위치한 Dispatcher 정보를 registered_threads_ 에 저장할 수 있으며, 향후 Worker 쓰레드의 Dispatcher에 참조가 가능합니다.
3. Worker 쓰레드 생성이 마무리되면, Config 파일 파싱 도중 static configuration 정보를 등록하기 위해 Listener Manager에게 Config 정보 등록을 요청합니다. 해당 과정은 향후 LDS에 의해서도 생성될 수 있습니다. 이 과정에서 Listener Component Factory를 통해 --concurrency 만큼 Socket을 생성합니다.
4. Envoy의 설정이 모두 완료되면, Listener Manager에게 기동을 요청합니다.
5. Listener Manager에서는 기동 과정에서 등록된 Listener Config 정보를 Worker 쓰레드에 모두 Bind하기 위해 개별 Worker 쓰레드에게 Listener 생성을 요청합니다.
6. Worker 쓰레드내에 존재하는 Connection Handler에 Listener를 생성하기 위해 먼저 Listener로부터 자신의 Worker 쓰레드 번호에 해당하는 Socket 정보를 얻어옵니다. 그리고 Worker 쓰레드에서 외부 요청을 참조하기 위해 Dispatcher에게 Socket 정보를 전달하면서 Listener 생성을 요청합니다.
7. Dispatcher는 전달받은 Socket 정보를 토대로 Listener를 생성하여 반환합니다. 이때 주의해서 살펴볼 것은 사용자가 접속했을 때, 인자로 전달받은 TcpListenerCallbacks에게 Accept 요청을 수행하는데 호출되는 주체가 Connection Handler에서 전달한 ActiveTcpListener라는 것입니다. 따라서 향후 사용자가 접속하게되면, 그에 대한 Accept 처리는 ActiveTcpListener가 담당하게됩니다.
이때 dispatcher를 통해서 libevent에서 해당 소켓에 이벤트가 감지되면, onSocketEvent 메소드를 호출하도록 위 코드와 같이 등록됩니다. 즉 이 과정을 통해서 각각의 Worker 쓰레드에 존재하는 Listener는 dispatcher에 의해 libevent로 등록되었으므로 사용자가 OS로부터 Socket 생성을 요청했을 때, OS는 등록된 socket 중 하나를 임의로 선정하여 요청을 전달할 수 있게됩니다.
그렇다면 Listener 설정이 모두 완료된 이후 Client로부터 Connection 요청이 들어오면 어떠한 과정을 거치게될까요?
먼저 Socket Event가 감지되면, 위와 같이 두개의 Worker 쓰레드 중 누가 해당 요청을 처리해야하는지 선택해야합니다. 이때 Worker 쓰레드 선정에 대한 결정은 이전에 설명했듯이 전적으로 OS가 수행합니다. 따라서 가령 위와같이 Worker_0번이 선택되었으면, 해당 쓰레드의 onSocketEvent가 실행될 것입니다.
onSocketEvent 메소드가 호출되면, 가장 먼저 수행하는 것은 연결된 Connection 갯수가 Global 설정을 넘어섰는지 확인합니다. 이 과정에서 Global Limit이 지정되어있고 신규 연결 요청이 Limit을 넘어서게되면, 해당 소켓에 대한 연결은 Close하고 종결처리 합니다.
이때 Global Limit으로 지정될 수 있는 값은 overload.global_downstrea_max_connections에 의해서 지정될 수 있으며, 해당 값은 OverloadManager에 의해서 관리되는 값입니다. 따라서 현재 Socket에 Accepted된 개수가 해당 값을 넘었을 경우에는 Socket 연결을 해제합니다. 해당 값에 대한 자세한 설명은 envoy 공식문서를 참고 바랍니다.
반대로 요청이 Global Limit을 넘지 않았을 경우는 AcceptedSocket을 생성하고 Worker 쓰레드 내 Listener는 Socket 연결에 대한 Accept 처리를 위임합니다.
위 그림은 AcceptedSocket 클래스 구조를 나타냅니다. 위 내용을 통해서 우리는 AcceptedSocket을 만드는 이유에 대해서 유추해볼 수 있습니다. 코드를 살펴보면, global_accetped_socket_count_ 라는 값이 static으로 지정되어있음을 알 수 있습니다. 그리고 생성자, 소멸자 단계에서 해당 값이 증감하는 것 또한 알 수 있습니다.
이를 통해서 확인되는 사실은 Socket이 접속하면 현재 Accepted된 Socket 개수를 파악할 수 있습니다. 또한 새로운 Socket이 연결되었을 때 Global Limit을 넘는지 검증할 수 있는 기준을 제시합니다.
AcceptedSocket을 만들고나면 그 다음에는 해당 Socket을 Listener에서 Accept하는 과정이 진행됩니다.
이전에 살펴본 onSocketEvent 메소드의 마지막 줄을 살펴보면, 저장된 Callback에서 onAccept를 수행해달라고 요청하는 것을 볼 수 있습니다. 이는 TcpListenerImpl을 생성할 때, 해당 Callback 값이 Worker에 존재하는 ActiveTcpListener 이므로 해당 ActiveTcpListener가 실질적으로 Accept를 수행함을 의미합니다.
먼저 listenerConnectionLimitReached메소드를 수행하면서 이를 위반할 경우 Socket을 Close하는 것을 볼 수 있습니다. 이는 이전의 Connection은 Envoy 전체의 Connection을 살펴본 것이라면, 이번에는 개별 listener 별로 Connection 제한이 있는지 검사하고 만약 지정된 값이 있을 경우 그 값을 넘어서게되면 Accept하지 않습니다. 해당 설정은 Listener에서 수행할 수 있으며 envoy 공식 문서에서 이에 대해서 소개하고 있으니 참고 바랍니다.
ActiveTcpListener에서는 Limit 검사만 체크하고 다시 Socket에 대한 Accept는 onAcceptWorker 메소드 호출을 통해 후속 작업을 처리합니다.
envoy에서는 쓰레드간의 효율적인 Socket 분배를 위해서 만약 하드웨어에서 DLB 지원이 된다면, 이를 사용하여 Connection을 안정적으로 분배할 수 있는 기능을 제공합니다. 이때 위와 같이 connection_balance_config를 지정하면, 해당 설정을 토대로 connection_balancer가 Target을 지정합니다.
해당 기술은 내부적으로는 Intel DLB hardware를 통해 구현되며, 위와 같이 Config 설정이 지정되어있다면 다른 Target으로 Load를 분산시킬 수 있습니다. 만약 지정되지 않았다면, 현재 Worker 쓰레드에서 Accept 과정이 정상적으로 진행될 것입니다. 이와 관련된 자세한 내용은 envoy 공식문서에 설명되어있으니 참고 바랍니다.
참고로 본 포스팅에서는 DLB 설정이 지정되어있지 않았다고 가정하므로 현재 Worker 쓰레드에서 해당 처리를 진행한다고 가정하겠습니다.
connection_balancer에 의해서 선정된 target이 자기 자신이라면 해당 연결을 허용하기 위해 ActiveTcpSocket을 만드는 것을 볼 수 있습니다.
ActiveTcpSocket까지 생성되면, 향후 Client의 요청은 해당 Socket을 통해서 모두 처리가됩니다.
즉 이말은 이전에 Envoy를 처음 학습할 때 살펴봤듯이 Client가 Envoy에게 특정 API를 요청하면, 내부적으로 Listener Filters와 Filter Chains를 통과하면서 Upstream 대상을 찾고 전달한다고 했는데, 이 과정을 수행하는 주체가 해당 소켓이 됩니다. 따라서 Active Tcp Socket은 이를 지원하기 위해서 다양한 내부 프로퍼티가 있는데, 그 중 Filter와 관련된 것이 accept_filters 입니다. 해당 속성을 기반으로 향후 Listener Filters를 만들고 이를 수행하고 그 다음에는 Filter Chains를 생성하고 이를 수행하는 작업을 진행합니다. 그리고 해당 작업을 위해서 ActiveTcpSocket을 만든 이후 onSocketAccepted를 호출합니다.
active_stream_listener_base.h
void onSocketAccepted(std::unique_ptr<ActiveTcpSocket> active_socket) {
// Create and run the filters
if (config_->filterChainFactory().createListenerFilterChain(*active_socket)) {
active_socket->startFilterChain();
} else {
// If create listener filter chain failed, it means the listener is missing
// config due to the ECDS. Then close the connection directly.
active_socket->socket().close();
ASSERT(active_socket->isEndFilterIteration());
}
// Move active_socket to the sockets_ list if filter iteration needs to continue later.
// Otherwise we let active_socket be destructed when it goes out of scope.
if (!active_socket->isEndFilterIteration()) {
active_socket->startTimer();
LinkedList::moveIntoListBack(std::move(active_socket), sockets_);
} else {
if (!active_socket->connected()) {
// If active_socket is about to be destructed, emit logs if a connection is not created.
if (active_socket->streamInfo() != nullptr) {
emitLogs(*config_, *active_socket->streamInfo());
} else {
// If the active_socket is not connected, this socket is not promoted to active
// connection. Thus the stream_info_ is owned by this active socket.
ENVOY_BUG(active_socket->streamInfo() != nullptr,
"the unconnected active socket must have stream info.");
}
}
}
}
위 코드는 onSocketAccepted 메소드 내용입니다. 코드를 살펴보면, Socket이 Accepted 되면 가장 먼저 해당 소켓에 해당되는 ListenerFilterChain을 생성하고 FilterChain을 수행하는 것을 볼 수 있습니다. 즉 이 과정부터 해당 소켓은 각종 Filter들을 통과하면서 Upstream 연결이 이어지게됩니다.
해당 과정을 조금 더 자세히 살펴보겠습니다.
이전 내용을 토대로 ActiveTcpListener 에서 ActiveTcpSocket을 만든 것을 확인했습니다. 그리고 createListenerFactory 메소드를 호출하면서 생성한 ActiveTcpSocket을 전달했습니다.
그러면 내부적으로는 Listener Config에 이미 저장되어있는 listener_filter_factories 내부에 매핑된 Factory Callback을 하나씩 실행시키면서 ActiveTcpSocket의 accept_filters에 Filter를 하나씩 생성하는 과정을 거칩니다. 해당 과정을 코드로 살펴보면 다음과 같습니다.
listener_impl.cc
bool ListenerImpl::createListenerFilterChain(Network::ListenerFilterManager& manager) {
if (Configuration::FilterChainUtility::buildFilterChain(manager, listener_filter_factories_)) {
return true;
} else {
ENVOY_LOG(debug, "New connection accepted while missing configuration. "
"Close socket and stop the iteration onAccept.");
missing_listener_config_stats_.extension_config_missing_.inc();
return false;
}
}
위 코드는 ActiveTcpListener 에서 ActiveTcpSocket을 생성 후 ListenerFilterChain을 생성하기 위해 createListenerFilterChain 메소드를 호출하였을 때 과정을 나타냅니다. 코드를 살펴보면, buildFilterChain 함수 호출을 통해 자신이 보유하고 있는 listener_filter_factories_ 목록을 전달하는 것을 볼 수 있습니다.
configuration_impl.cc
bool FilterChainUtility::buildFilterChain(Network::ListenerFilterManager& filter_manager,
const Filter::ListenerFilterFactoriesList& factories) {
for (const auto& filter_config_provider : factories) {
auto config = filter_config_provider->config();
if (!config.has_value()) {
return false;
}
auto config_value = config.value();
config_value(filter_manager);
}
return true;
}
buildFilterChain 내부를 살펴보면, 전달받은 factories를 순회하면서 callback 함수를 순차적으로 수행시키는 것을 볼 수 있습니다. 그리고 해당 callback을 수행할 때 전달받은 ActiveTcpSocket 정보를 다시 넘기는 것을 확인할 수 있습니다.
위 코드는 http instpector Listener Filter의 Factory 코드이며, 개별 Listener Filter Factory의 리턴 값은 위와 같이 FilterManager 즉 ActiveTcpSocket 정보로 받아서 addAcceptFilter 메소드를 호출하고 있는 것을 볼 수 있습니다. 또한 인자를 통해서 Factory에서 보유하고 있는 Filter 정보를 새롭게 생성하여 전달하는 것을 볼 수 있습니다.
startFilterChain()은 다시 내부에 continueFilterChain 메소드에 처리를 위임합니다.
active_tcp_socket.cc
void ActiveTcpSocket::continueFilterChain(bool success) {
if (success) {
bool no_error = true;
if (iter_ == accept_filters_.end()) {
iter_ = accept_filters_.begin();
} else {
iter_ = std::next(iter_);
}
for (; iter_ != accept_filters_.end(); iter_++) {
Network::FilterStatus status = (*iter_)->onAccept(*this);
if (status == Network::FilterStatus::StopIteration) {
if (!socket().ioHandle().isOpen()) {
no_error = false;
break;
} else {
// If the listener maxReadBytes() is 0, then it shouldn't return
// `FilterStatus::StopIteration` from `onAccept` to wait for more data.
ASSERT((*iter_)->maxReadBytes() != 0);
if (listener_filter_buffer_ == nullptr) {
if ((*iter_)->maxReadBytes() > 0) {
createListenerFilterBuffer();
}
} else {
// If the current filter expect more data than previous filters, then
// increase the filter buffer's capacity.
if (listener_filter_buffer_->capacity() < (*iter_)->maxReadBytes()) {
listener_filter_buffer_->resetCapacity((*iter_)->maxReadBytes());
}
}
if (listener_filter_buffer_ != nullptr) {
listener_filter_buffer_->activateFileEvent(Event::FileReadyType::Read);
}
// Waiting for more data.
return;
}
}
}
// Successfully ran all the accept filters.
if (no_error) {
newConnection();
} else {
// Signal the caller that no extra filter chain iteration is needed.
iter_ = accept_filters_.end();
}
}
// Filter execution concluded, unlink and delete this ActiveTcpSocket if it was linked.
if (inserted()) {
unlink();
}
}
continueFilterChain코드 내용을 자세히 살펴보면, 생성된 Listener Filters(accept_filters)를 순회하면서 onAccept를 통해 Filter로직을 수행하는 것을 볼 수 있습니다.
만약 Filter onAccept 순회 도중에 Stop을 해야될 경우가 존재한다면, 이유를 살펴보고 요청을 중지하던지 아니면 설정 변경 후 재수행을 수행하도록 작성되어있습니다.
그리고 모든 Filter 순회가 종료되면, 사용자 요청에 대한 Metadata 및 요청 정보를 모두 분석하였으므로 그 다음에는 newConnection() 메소드를 호출하여 본격적으로 downstream간의 연결을 위한 작업을 진행합니다.
active_tcp_socket.cc
void ActiveTcpSocket::newConnection() {
connected_ = true;
// Check if the socket may need to be redirected to another listener.
Network::BalancedConnectionHandlerOptRef new_listener;
if (hand_off_restored_destination_connections_ &&
socket_->connectionInfoProvider().localAddressRestored()) {
// Find a listener associated with the original destination address.
new_listener =
listener_.getBalancedHandlerByAddress(*socket_->connectionInfoProvider().localAddress());
}
// Reset the file events which are registered by listener filter.
// reference https://github.com/envoyproxy/envoy/issues/8925.
if (listener_filter_buffer_ != nullptr) {
listener_filter_buffer_->reset();
}
if (new_listener.has_value()) {
...(중략)...
new_listener.value().get().onAcceptWorker(std::move(socket_), false, false);
} else {
// Set default transport protocol if none of the listener filters did it.
if (socket_->detectedTransportProtocol().empty()) {
socket_->setDetectedTransportProtocol("raw_buffer");
}
accept_filters_.clear();
// Create a new connection on this listener.
listener_.newConnection(std::move(socket_), std::move(stream_info_));
}
}
newConnection 메소드를 호출하면, 다른 Listener로 Redirect 해야할 필요가 있는지를 살펴보고 listener에 새로운 Connection 할당을 요청합니다.
newConnection 메소드를 살펴보면, 먼저 Listener의 FilterChainManager로부터 해당 소켓 정보에 해당하는 Filter Chain을 찾는 과정을 수행합니다. 그리고 이 과정에서 매칭되는 Filter Chain을 찾지 못한다면, 연결된 Socket을 종료하고 마칩니다.
filter_chain_manager_impl.cc
const Network::FilterChain*
FilterChainManagerImpl::findFilterChain(const Network::ConnectionSocket& socket) const {
if (matcher_) {
return findFilterChainUsingMatcher(socket);
}
const auto& address = socket.connectionInfoProvider().localAddress();
const Network::FilterChain* best_match_filter_chain = nullptr;
// Match on destination port (only for IP addresses).
if (address->type() == Network::Address::Type::Ip) {
const auto port_match = destination_ports_map_.find(address->ip()->port());
if (port_match != destination_ports_map_.end()) {
best_match_filter_chain = findFilterChainForDestinationIP(*port_match->second.second, socket);
if (best_match_filter_chain != nullptr) {
return best_match_filter_chain;
} else {
// There is entry for specific port but none of the filter chain matches. Instead of
// matching catch-all port 0, the fallback filter chain is returned.
return default_filter_chain_.get();
}
}
}
// Match on catch-all port 0 if there is no specific port sub tree.
const auto port_match = destination_ports_map_.find(0);
if (port_match != destination_ports_map_.end()) {
best_match_filter_chain = findFilterChainForDestinationIP(*port_match->second.second, socket);
}
return best_match_filter_chain != nullptr
? best_match_filter_chain
// Neither exact port nor catch-all port matches. Use fallback filter chain.
: default_filter_chain_.get();
}
위 코드는 Filter Chain Manager로 부터 Filter Chain을 찾는 과정을 보여줍니다.
Filter Chain Manager는 Listener 기동 시점에 설정 정보를 파싱하여 Filter Chain 정보를 모두 가지고 있습니다. 따라서 Socket의 Address 정보를 토대로 원하는 Filter Chain을 찾아서 반환해줍니다. 만약에 상응하는 Filter Chain 정보를 찾지 못한 경우에는 Default Filter Chain을 반환합니다.
생성되는 ServerConnectionImpl의 생성자는 위와 같으며, 부호 생성자에게 전달받은 인자들을 넘깁니다.
connection_impl.cc
ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
TransportSocketPtr&& transport_socket,
StreamInfo::StreamInfo& stream_info, bool connected)
: ConnectionImplBase(dispatcher, next_global_id_++),
transport_socket_(std::move(transport_socket)), socket_(std::move(socket)),
stream_info_(stream_info), filter_manager_(*this, *socket_),
write_buffer_(dispatcher.getWatermarkFactory().createBuffer(
[this]() -> void { this->onWriteBufferLowWatermark(); },
[this]() -> void { this->onWriteBufferHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
read_buffer_(dispatcher.getWatermarkFactory().createBuffer(
[this]() -> void { this->onReadBufferLowWatermark(); },
[this]() -> void { this->onReadBufferHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
write_buffer_above_high_watermark_(false), detect_early_close_(true),
enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
transport_wants_read_(false) {
if (!connected) {
connecting_ = true;
}
Event::FileTriggerType trigger = Event::PlatformDefaultTriggerType;
// We never ask for both early close and read at the same time. If we are reading, we want to
// consume all available data.
socket_->ioHandle().initializeFileEvent(
dispatcher_, [this](uint32_t events) -> void { onFileEvent(events); }, trigger,
Event::FileReadyType::Read | Event::FileReadyType::Write);
transport_socket_->setTransportSocketCallbacks(*this);
// TODO(soulxu): generate the connection id inside the addressProvider directly,
// then we don't need a setter or any of the optional stuff.
socket_->connectionInfoProvider().setConnectionID(id());
socket_->connectionInfoProvider().setSslConnection(transport_socket_->ssl());
}
ConnectionImpl 생성자 내부를 살펴보면, Socket 내부에 Write/Read 전용 Buffer를 설정하는 것을 볼 수 있습니다. 해당 Buffer는 사용자가 HTTP 요청을 전달했을 때 데이터를 읽는 용도 그리고 upstream으로부터 응답 데이터가 전달되었을 때 기록하는 용도로 사용됩니다.
또한 downstream을 위한 transport_socket 생성 및 Filter Chains 관리를 위한 filter_manager를 생성합니다.
중요하게 살펴볼 부분은 향후 Socket 내부에 Read/Write 이벤트가 감지되면 이를 통지받고 후속 작업을 처리하기 위해 Dispatcher에게 onFileEvent()를 등록하는 부분입니다. 이는 내부적으로 다시 libevent에게 등록이 되며, 향후 Client가 HTTP 요청을 전달하게 되면, 해당 메소드로 요청 항목이 전달되어 후속 작업을 수행할 수 있습니다.
ServerConnection을 생성하고 나면, 그 다음에는 Filter Chains를 생성하는 작업을 수행합니다. 이를 위해 Listener 에게 FilterChain 생성을 요청합니다. 이때 생성되는 Filter Chains는 ServerConnection 내부에 매핑되어야하기 때문에 해당 정보를 Filter Chains와 같이 전달합니다.
filter_manager에서 addReadFilter가 호출되면, 생성된 Filter에 ActiveReadFilter 정보를 전달하여 해당 Filter 내에서 향후 Callback할 수 있도록 initializeReadFilterCallbacks를 등록합니다. 그리고 자신의 upstream_filters_에 Read Filter를 등록하는 것을 볼 수 있습니다.
등록된 Filter가 Http Connection Manager Filter임을 가정하였으므로, initializeReadFilter를 호출하면, 위 코드 구문이 호출될 것입니다. 해당 코드를 개략적으로 살펴보면, Callback을 자신의 프로퍼티에 등록하는 작업 외에 metric 정보를 갱신하는 것을 볼 수 있습니다. 또한 Timeout이 발생하면 이를 처리하기 위해 dispatcher에게 다양한 Timer 생성 및 Timeout 발생 시 처리 하도록 등록하는 등 Read Filter 동작과 관련된 기본 초기화 작업을 수행합니다.
Http Connection Manager의 경우는 Read Filter의 역할만을 수행하기 때문에 filter_manager 내부에 upstream_filters에만 filter가 추가됩니다. 하지만 FilterChain Factory 내부에는 Read Filter 뿐만 아니라 Writer Filter의 역할을 수행하는 것도 있고 Read/Write를 모두 수행하는 Filter가 존재합니다.
이 경우에는 람다내에서 ServerConnectionImpl에 Filter 생성을 요청할 때 addWriteFilter 혹은 addFilter 메소드를 호출합니다. 가령 addWriterFilter의 경우는 Writer Filter만의 역할을 수행하기 때문에 궁극적으로는 filter_manager의 downstream_filters_ 에 추가되면서 Read Filter 등록과 마찬가지로 ActiveWriterFilter 인스턴스를 만들어서 향후 Callback할 수 있도록 initializeWriteFilterCallbacks를 등록하는 것을 볼 수 있습니다.
반면 addFilter의 경우는 Read/Write를 모두 수행할 때 호출되기 때문에 addReadFilter와 addWriteFilter를 차례대로 호출합니다.
Filter Factories로 부터 Filter 생성이 모두 완료되면, ServerConnectionImpl 내부 filter_manager에는 upstream 전용 filter와 downstream 전용 filter가 모두 생성되어있습니다. 그 과정이 완료되면 ServerConnectionImpl 내부의 initializeReadFilters()를 호출 합니다.
그리고 해당 요청을 전달받은 ServerConnectionImpl 내부에서는 다시 filter_manager에게 해당 요청 처리를 위임합니다.
filter_manager_impl.cc
bool FilterManagerImpl::initializeReadFilters() {
if (upstream_filters_.empty()) {
return false;
}
onContinueReading(nullptr, connection_);
return true;
}
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter,
ReadBufferSource& buffer_source) {
// Filter could return status == FilterStatus::StopIteration immediately, close the connection and
// use callback to call this function.
if (connection_.state() != Connection::State::Open) {
return;
}
std::list<ActiveReadFilterPtr>::iterator entry;
if (!filter) {
connection_.streamInfo().addBytesReceived(buffer_source.getReadBuffer().buffer.length());
entry = upstream_filters_.begin();
} else {
entry = std::next(filter->entry());
}
for (; entry != upstream_filters_.end(); entry++) {
if (!(*entry)->filter_) {
continue;
}
if (!(*entry)->initialized_) {
(*entry)->initialized_ = true;
FilterStatus status = (*entry)->filter_->onNewConnection();
if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
return;
}
}
StreamBuffer read_buffer = buffer_source.getReadBuffer();
if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
if (status == FilterStatus::StopIteration || connection_.state() != Connection::State::Open) {
return;
}
}
}
}
해당 처리 과정을 살펴보면, 초기에 upstream_filters_에 등록할 때 ActiveReadFilter 인스턴스를 새로 생성해서 등록했는데 초기 initialized_ 값은 기본적으로 false 입니다. 따라서 위 코드에서는 등록된 upstream_filters_ 를 순회하면서 가장 먼저 onNewConnection() 작업을 수행할 것입니다.
여기에는 등록된 Filter가 Http Connection Manager 하나만 존재한다고 가정하기 때문에, 해당 Filter의 onNewConnection()이 호출됩니다.
connection_manager_impl.cc
Network::FilterStatus ConnectionManagerImpl::onNewConnection() {
if (!read_callbacks_->connection().streamInfo().protocol()) {
// For Non-QUIC traffic, continue passing data to filters.
return Network::FilterStatus::Continue;
}
// Only QUIC connection's stream_info_ specifies protocol.
Buffer::OwnedImpl dummy;
createCodec(dummy);
ASSERT(codec_->protocol() == Protocol::Http3);
// Stop iterating through each filters for QUIC. Currently a QUIC connection
// only supports one filter, HCM, and bypasses the onData() interface. Because
// QUICHE already handles de-multiplexing.
return Network::FilterStatus::StopIteration;
}
해당 코드를 잠시 살펴보면, QUIC 트래픽인지 확인하고 일반 HTTP 프로토콜이라면 해당 Filter 사용이 가능하도록 구현되어있음을 확인할 수 있습니다.
여기까지 수행하면 Filter Chains를 생성하고 Filter의 초기화까지 수행하는 전 과정을 살펴볼 수 있습니다.
앞서 살펴본 긴 과정의 Filter Chains를 생성하고나면, Filter Chains가 존재하는지 여부를 bool 값으로 반환합니다. 만약에 Filter Chains 생성 과정에서 생성된 FilterChains가 전혀 존재하지 않는다면, Stream을 연결하여 작업을 이어나가는 것이 무의미하기 때문에 해당 Server Connection을 종료합니다. 그렇지 않을 경우 생성된 Filter Chains를 기반으로 ActiveConnection을 생성하는 과정을 진행합니다.
active_tcp_listener.cc
void ActiveTcpListener::newActiveConnection(const Network::FilterChain& filter_chain,
Network::ServerConnectionPtr server_conn_ptr,
std::unique_ptr<StreamInfo::StreamInfo> stream_info) {
auto& active_connections = getOrCreateActiveConnections(filter_chain);
auto active_connection =
std::make_unique<ActiveTcpConnection>(active_connections, std::move(server_conn_ptr),
dispatcher().timeSource(), std::move(stream_info));
// If the connection is already closed, we can just let this connection immediately die.
if (active_connection->connection_->state() != Network::Connection::State::Closed) {
ENVOY_CONN_LOG(
debug, "new connection from {}", *active_connection->connection_,
active_connection->connection_->connectionInfoProvider().remoteAddress()->asString());
active_connection->connection_->addConnectionCallbacks(*active_connection);
LinkedList::moveIntoList(std::move(active_connection), active_connections.connections_);
}
}
여기서 ActiveConnection이란 생성된 Filter Chains를 사용하는 Connection이 얼마나 있는지를 관리하기 위한 자료구조 입니다. 따라서 getOrCreateActiveConnection 메소드를 호출함으로써, 먼저 해당 Filter에 존재하는 ActiveConnection이 있는지를 살펴봅니다.
class ActiveConnections : public Event::DeferredDeletable {
public:
ActiveConnections(OwnedActiveStreamListenerBase& listener,
const Network::FilterChain& filter_chain);
~ActiveConnections() override;
// listener filter chain pair is the owner of the connections
OwnedActiveStreamListenerBase& listener_;
const Network::FilterChain& filter_chain_;
// Owned connections.
std::list<std::unique_ptr<ActiveTcpConnection>> connections_;
};
이때 ActiveConnections를 가져오는 메소드는 위 코드와 같으며, 내부적으로는 FilterChain 별로 ActiveConnection 목록을 관리하는 hash map을 통해서 참조하는 것을 확인할 수 있습니다.
ActiveConnections를 가져오면 신규로 생성하는 ActiveConnection을 만들고 ActiveConnections에 추가하는 것으로 Client의 접속 요청 이후 Connection 할당 과정은 마무리됩니다.
4. Client Connection 연결 과정 정리
이전에 Client Connection 연결 과정을 코드를 통해서 어떻게 구성되는지 집중적으로 살펴봤습니다. 다만 지엽적인 내용까지 살펴보느라 전체적인 흐름을 이해하기 쉽지 않았을 수도 있습니다. 따라서 이번에는 큰 그림에서 컴포넌트간 메시지 교환을 중점으로 Client Connection이 어떻게 연결되는지 개략적으로 다시 살펴보도록 하겠습니다.
1. Libevent 로 부터 Listener 가 Listen 하고 있는 소켓으로 사용자의 Connection 연결 요청이 접수되었을 때, 내부적으로 존재하는 Worker 쓰레드 중 하나를 선정하여 해당 Listener의 onSocketEvent메소드를 호출합니다. 참고로 위 예시에서는 Listener 0에 요청이 전달되었을 때 Worker 0번 쓰레드가 담당한다고 가정하여 ActiveTcpListener 0이 수신받았습니다.
2. ActiveTcpListener에서는 전체 연결된 Connection 개수가 Global Limit을 넘었는지 확인하고 내부적으로 AcceptedSocket을 거쳐 ActiveTcpSocket을 생성하여 Connection 연결을 Accept합니다.
3. 생성된 Socket에서 Listener Filter Chain을 생성하기 위해 Listener Config에게 Listener Filter Chain 정보를 요청합니다.
4. Listener Config는 Envoy 기동 과정에서 Parsing되었거나 LDS로 부터 갱신된 Listener Filter Chains 정보를 기반으로 Chains 내부에 존재하는 Factory Callback 메소드를 순회하면서 ActiveTcpSocket 내부에 있는 accept_filters에 ListenerFilter를 생성하여 바인딩합니다.
6. Filter Chains(Network Filters)를 생성하기 위해 ActiveTcpListener는 Listener Config 로부터 보유하고 있는 Filter Chains 정보를 요청합니다.
7. Listener Config 내부에 존재하는 filter_chain_manager에는 Trie, HashMap 등을 비롯하여 Filter Chain 정보를 빠르게 찾기 위한 다양한 자료구조가 존재하는데, 이를 활용하여 사용자 요청한 Filter Chain 정보를 제공합니다.
8. ActiveTcpListener는 ServerConnetionImpl을 생성하기 위해 Dispatcher에 요청합니다.
9. Dispatcher는 ServerConnectionImpl을 생성합니다.
10. 생성된 ServerConnectionImpl은 사용자가 접속 연결 이후 실질적으로 HTTP 요청 시, 이벤트를 수신 받아야되기 때문에 Dispatcher에게 Socket 이벤트를 전달하도록 등록 요청합니다.
11. Didpatcher는 Libevent에게 ServerConnectionImpl이 요청한 정보를 등록합니다. 이후 해당 Socket에 이벤트가 감지되면 ServerConnectionImpl이 등록한 onFileEvent 메소드를 호출합니다.
12. ActiveTcpListener는 Filter Chains를 생성하기 위해 Listner Config에게 처리를 요청합니다.
13. Listener Config는 Util 함수를 활용하여 Filter Chains를 생성합니다. 이때 Filter의 특성에 따라 Read Filter인 경우에는 ServerConnectionImpl 내에 있는 filter_manager의 upstream_filters_에 매핑되고 Write Filter라면 downstream_filters_에 매핑됩니다. 만약 Read/Write 둘 다 처리하는 경우에는 두 군데 모두 입력합니다.
14. 생성된 Filter Chain에 연결된 Server Connection 정보를 관리하기 위해 ActiveTcpSocket 내부에 있는 connections_by_contexts_에 신규로 생성된 ActiveConnection 정보를 등록합니다.
5. 마무리
이번 포스팅에서는 사용자가 HTTP 요청을 통해 envoy의 기능을 이용하고자 할 때 내부적으로 Listener 구성이 어떻게 되어있는지 그리고 Client Connection이 어떻게 할당되는지에 대해서 집중적으로 알아봤습니다.
envoy를 처음 학습하면, 이러한 과정이 마법처럼 느껴지는데 코드를 통해서 살펴보니 동작 과정에 대해서 어느정도 이해할 수 있었던 것 같습니다. 다음 포스팅에서는 연결이 완료된 이후 실제 HTTP 요청이 전달될 때 처리 과정에 대해서 살펴보겠습니다.