Overall Architecture
The architecture of Envoy is shown in the following figure:
An Envoy instance may have multiple Listeners, and each Listener may contain multiple filters that form a chain.
When Envoy receives a request, it first traverses the FilterChain, where various L3/L4/L7 filters process it; the request is then routed to the target cluster, a destination address is picked by load balancing, and finally the request is forwarded upstream.
Envoy has four core kinds of resources: Listener, Router, Cluster, and Filter.
Listener: the foundation of Envoy
Put simply, a Listener is a listening port opened by Envoy to accept connections from downstream (clients). Envoy supports multiple Listeners, and almost all configuration is isolated between them. The core of a Listener configuration is the listening address and the filter chains, as sketched below.
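A minimal Listener sketch; the name listener_0, the port, and the echo filter are illustrative choices, not taken from the original text:

static_resources:
  listeners:
  - name: listener_0                            # illustrative name
    address:
      socket_address: { address: 0.0.0.0, port_value: 10000 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.echo        # trivial built-in network filter, for illustration only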
Filter: power through extensibility
A Filter is, loosely speaking, a plugin. The filter mechanism is what gives Envoy its extremely strong extensibility.
With filters, Envoy can in principle support any protocol and translate between protocols, and it can modify and customize request traffic in almost any way. The filter mechanism brings not only extensibility but also maintainability: users can extend Envoy in many directions without patching the upstream source code.
- Network filters: work at L3/L4 and are the core of Envoy's connection handling. They operate on raw bytes and come in three flavors: Read, Write, and Read/Write.
- HTTP filters: work at L7 and are managed by a special network filter, the HTTP connection manager. They handle HTTP/1, HTTP/2, and gRPC requests; the connection manager converts raw bytes into HTTP, which allows precise control over the HTTP protocol.
Cluster: an abstraction of upstream services
In Envoy, every upstream service is abstracted as a Cluster. A Cluster holds the service's connection pool, timeouts, endpoint addresses and ports, type (the type determines how Envoy discovers the endpoints that can be reached for this cluster), and so on.
The configuration/resource discovery service for Clusters is called CDS. Typically, a CDS server pushes the full set of reachable clusters to Envoy. Closely related to CDS is EDS: CDS delivers Cluster resources, and when a Cluster's type is EDS, its endpoints are delivered by the xDS server instead of being resolved via DNS. The service that delivers endpoints is called EDS.
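A sketch of what an EDS-type cluster might look like, with endpoints fetched over the ADS stream; the cluster name backend_service and the timeout are illustrative:

clusters:
- name: backend_service            # illustrative name
  type: EDS                        # endpoints come from the EDS server, not from DNS
  connect_timeout: 1s
  eds_cluster_config:
    eds_config:
      resource_api_version: V3
      ads: {}                      # ClusterLoadAssignment is delivered over the ADS stream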
The configuration/resource discovery service for Listeners is called LDS. LDS is the basis on which Envoy works: without LDS (and without static Listeners in the bootstrap configuration), Envoy cannot listen on any port, and all other xDS services lose their purpose.
Router: the bridge between downstream and upstream
A Listener accepts connections from downstream, and a Cluster sends traffic to a concrete upstream service; the Router decides which Cluster should handle the connection and data a Listener receives. It defines the rules for dispatching data. Although "Router" can usually be read as HTTP routing, Envoy supports many protocols (Dubbo, Redis, and so on), so Router here refers broadly to all rules and resources that bridge Listeners and backend services, not only HTTP.
The configuration/resource discovery service for Routes is called RDS. The core of a Router configuration is the match rules and the target Cluster; it may also carry retries, traffic splitting, rate limiting, and so on, as sketched below.
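A sketch of a route configuration of the kind RDS delivers; the names local_route and backend_service and the retry settings are illustrative:

route_config:
  name: local_route
  virtual_hosts:
  - name: local_service
    domains: ["*"]
    routes:
    - match: { prefix: "/api" }        # match rule
      route:
        cluster: backend_service       # target cluster
        retry_policy:                  # optional extras such as retries
          retry_on: 5xx
          num_retries: 2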
Service Management
Static configuration
The following configuration proxies all traffic to baidu.com. Once it is in place, a request to Envoy's endpoint should return Baidu's home page directly, without changing the URL.
static_resources:
  # 1. Listeners
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 10000 }
    # 2. Filters
    filter_chains:
    - filters:
      - name: envoy.http_connection_manager
        config:
          stat_prefix: ingress_http
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/" }
                route: { host_rewrite: www.baidu.com, cluster: service_baidu }
          http_filters:
          - name: envoy.router
  # 3. Clusters
  clusters:
  - name: service_baidu
    connect_timeout: 0.25s
    type: LOGICAL_DNS
    dns_lookup_family: V4_ONLY
    lb_policy: ROUND_ROBIN
    hosts: [{ socket_address: { address: www.baidu.com, port_value: 443 }}]
    tls_context: { sni: baidu.com }

# 4. Admin
admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }
Dynamic configuration: the xDS protocol (dynamic service discovery)
Envoy discovers resources dynamically by querying files or a management server. These discovery services and their corresponding APIs are collectively called xDS.
Envoy obtains resources through subscriptions: watching files at a specified path, opening a gRPC stream, or polling a REST-JSON URL. The latter two send DiscoveryRequest messages, and the discovered resources are returned in DiscoveryResponse messages. The sketch below shows how these subscription styles appear in configuration.
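A sketch of how the subscription styles map onto config_source settings; the file path and the cluster name xds_cluster are illustrative, and the REST variant is only outlined in a comment:

dynamic_resources:
  # 1. File subscription: Envoy watches a file on disk for changes
  lds_config:
    path_config_source: { path: /etc/envoy/lds.yaml }

  # 2. gRPC streaming subscription to a management server
  cds_config:
    resource_api_version: V3
    api_config_source:
      api_type: GRPC
      transport_api_version: V3
      grpc_services:
      - envoy_grpc: { cluster_name: xds_cluster }   # must reference a configured cluster

  # 3. REST-JSON polling would use api_type: REST plus a refresh_delay,
  #    exchanging DiscoveryRequest/DiscoveryResponse over HTTP.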
xDS is short for "X Discovery Service", where the "X" does not stand for one specific protocol; it is an umbrella term for a family of discovery services backed by different data sources:
- CDS: Cluster Discovery Service
- EDS: Endpoint Discovery Service
- SDS: Secret Discovery Service
- RDS: Route Discovery Service
- LDS: Listener Discovery Service
The xDS protocol was proposed by Envoy and has since become one of the protocol standards for service meshes.
Envoy is the default sidecar proxy in Istio, but in theory any proxy that implements xDS can serve as the Istio sidecar, for example MOSN, open-sourced by Ant Group.
For example, every Envoy stream begins with a DiscoveryRequest that carries the list of resources to subscribe to, the type URL of those resources, the node identifier, and an empty version_info. An EDS request looks like this:
version_info:
node: { id: envoy }
resource_names:
- foo
- bar
type_url: type.googleapis.com/envoy.api.v2.ClusterLoadAssignment
response_nonce:
The management server may respond with a DiscoveryResponse immediately, or wait until the resources are ready, for example:
version_info: X
resources:
- foo ClusterLoadAssignment proto encoding
- bar ClusterLoadAssignment proto encoding
type_url: type.googleapis.com/envoy.api.v2.ClusterLoadAssignment
nonce: A
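To ACK this response, Envoy sends another DiscoveryRequest that echoes the accepted version and nonce; the sketch below follows the xDS spec (on rejection, version_info would stay at the previous value and an error_detail would be attached):

version_info: X
node: { id: envoy }
resource_names:
- foo
- bar
type_url: type.googleapis.com/envoy.api.v2.ClusterLoadAssignment
response_nonce: A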
Source Code Analysis
Source version
release/v1.31
Startup
The Bootstrap file in detail
Envoy's internal request handling matches the flow sketched above: the overall pipeline is fixed, and the variable part, the fine-grained handling of request data, is entirely abstracted into Filters. Reads and writes on a connection are handled by ReadFilter and WriteFilter; encoding and decoding of HTTP request data by StreamEncoderFilter and StreamDecoderFilter; TCP proxying by TcpProxyFilter, which inherits from ReadFilter; HTTP handling by ConnectionManager, which also inherits from ReadFilter; and so on. The filters are assembled into a FilterChain: when a request arrives it first traverses the FilterChain, is then routed to a cluster, a target address is chosen by load balancing, and the request is forwarded.
Bootstrap startup sequence
- Load the Bootstrap YAML file.
- Set the header prefix.
- Initialize stats: set the TagProducer, StatsMatcher, HistogramSettings, and so on.
- Create the server stats.
- Register assert actions and bug actions.
- Set the health-check flag to false.
- The cluster manager goes through multi-phase initialization. The first phase initializes static/DNS clusters, then the pre-defined static EDS clusters. If CDS is configured, it waits for a CDS response (or a failure), then initializes CDS, and finally initializes the clusters that CDS provides.
- If a cluster has health checking configured, Envoy also runs an initial round of health checks.
- Once the cluster manager has finished initializing, RDS and LDS initialize and wait for a response or a failure; until then Envoy does not accept connections to process traffic.
- If an LDS response embeds an RDS configuration, Envoy also waits for the corresponding RDS response; this step is called listener warming.
- After all of the above completes, the listeners start accepting traffic. A bootstrap skeleton matching this sequence is sketched below.
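A bootstrap skeleton matching this sequence, assuming a single static cluster that carries the xDS connection while everything else is delivered dynamically; the node identity, address, port, and cluster name are illustrative:

node: { id: envoy-1, cluster: demo }
static_resources:
  clusters:
  - name: xds_cluster                        # static cluster, initialized first; carries the xDS connection
    type: STRICT_DNS
    connect_timeout: 1s
    load_assignment:
      cluster_name: xds_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address: { address: xds.example.local, port_value: 18000 }
    # (a gRPC xDS cluster would also need HTTP/2 enabled; omitted here for brevity)
dynamic_resources:
  ads_config:
    api_type: GRPC
    transport_api_version: V3
    grpc_services:
    - envoy_grpc: { cluster_name: xds_cluster }
  cds_config: { ads: {} }                    # clusters arrive via CDS after the static phase
  lds_config: { ads: {} }                    # listeners arrive last; traffic is accepted after warming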
Data Flow
At a high level, filter processing in Envoy is: a request arrives at a Listener -> it passes through a chain of filters -> it is forwarded to the upstream service -> the upstream response is received -> the response passes through the same filter chain -> the response is sent back to the client.
Forwarding the request upstream is the job of the proxy module, which manages communication with upstream services: establishing connections, sending requests, and receiving responses. Once a request has passed through all filters, the proxy module forwards it to the appropriate upstream service and relays the upstream response back to the client.
Listener Analysis
Listener flow
- Listener configuration: the Envoy configuration file defines one or more listeners, each specifying the address and port Envoy should listen on and the filter chain applied to that listener.
- Accepting connections: when Envoy starts, every listener begins listening on its configured address and port; when a new connection is established, the listener accepts it.
- Filter-chain processing: once a listener accepts a connection, the connection passes through the filters in the order configured on that listener. Each filter can act on the connection, for example decryption, authentication, or traffic control.
To capture the traffic of interest on the listener port, redirection is commonly configured with iptables or similar mechanisms, so that the relevant traffic is forwarded to the corresponding listener for unified handling.
Filter Analysis
Listener filters operate on the listening socket. When a connection arrives, libevent raises a readable event; accept() on the listening socket returns the connection socket, which is wrapped into a ConnectionSocket and finally passed to ActiveListener::onAccept as its argument.
- Create the listener filter chain.
- continueFilterChain runs the filter chain.
- If a filter returns StopIteration, a timer is started; if the chain is not continued within that window, the socket is closed.
- After returning StopIteration, a filter resumes the remaining filters by calling continueFilterChain.
For example, the proxy_protocol listener filter registers a read event when a connection is accepted and returns StopIteration while it reads the PROXY protocol header from the socket; only once data is readable and the header has been parsed does it call continueFilterChain to run the remaining filters.
void ConnectionHandlerImpl::ActiveListener::onAccept(
    Network::ConnectionSocketPtr&& socket, bool hand_off_restored_destination_connections) {
  auto active_socket = std::make_unique<ActiveSocket>(*this, std::move(socket),
                                                      hand_off_restored_destination_connections);

  // Create and run the filters
  config_.filterChainFactory().createListenerFilterChain(*active_socket);
  active_socket->continueFilterChain(true);

  // Move active_socket to the sockets_ list if filter iteration needs to continue later.
  // Otherwise we let active_socket be destructed when it goes out of scope.
  if (active_socket->iter_ != active_socket->accept_filters_.end()) {
    // Start a timer so a filter cannot stall the chain indefinitely.
    active_socket->startTimer();
    active_socket->moveIntoListBack(std::move(active_socket), sockets_);
  }
}

// On timeout, remove the current socket from the socket list.
void ConnectionHandlerImpl::ActiveSocket::onTimeout() {
  listener_.stats_.downstream_pre_cx_timeout_.inc();
  ASSERT(inserted());
  unlink();
}

void ConnectionHandlerImpl::ActiveSocket::startTimer() {
  if (listener_.listener_filters_timeout_.count() > 0) {
    timer_ = listener_.parent_.dispatcher_.createTimer([this]() -> void { onTimeout(); });
    timer_->enableTimer(listener_.listener_filters_timeout_);
  }
}
L4 filter execution chain
- After the downstream connection is established, the filters are created and initialized.
- onNewConnection is called on each filter.
- onData is called on each filter.
bool FilterManagerImpl::initializeReadFilters() {
  if (upstream_filters_.empty()) {
    return false;
  }
  // Once initialization is done, run the filters starting from the beginning.
  onContinueReading(nullptr);
  return true;
}

// When nullptr is passed in, the filters run from the beginning.
// The initialized_ flag is set to true.
// onNewConnection is called; if it returns StopIteration, execution stops here
// and waits for the filter to resume via ReadFilterCallbacks::onContinueReading.
void FilterManagerImpl::onContinueReading(ActiveReadFilter* filter) {
  std::list<ActiveReadFilterPtr>::iterator entry;
  if (!filter) {
    entry = upstream_filters_.begin();
  } else {
    entry = std::next(filter->entry());
  }

  for (; entry != upstream_filters_.end(); entry++) {
    if (!(*entry)->initialized_) {
      (*entry)->initialized_ = true;
      FilterStatus status = (*entry)->filter_->onNewConnection();
      if (status == FilterStatus::StopIteration) {
        return;
      }
    }

    BufferSource::StreamBuffer read_buffer = buffer_source_.getReadBuffer();
    if (read_buffer.buffer.length() > 0 || read_buffer.end_stream) {
      FilterStatus status = (*entry)->filter_->onData(read_buffer.buffer, read_buffer.end_stream);
      if (status == FilterStatus::StopIteration) {
        return;
      }
    }
  }
}
Example: with three filters, if any single callback of any filter returns StopIteration, the whole chain stops and waits for onContinueReading before the next callback can run.
FilterA::onNewConnection FilterA::onData
FilterB::onNewConnection FilterB::onData
FilterC::onNewConnection FilterC::onData
The execution order is: FilterA::onNewConnection -> FilterA::onData -> FilterB::onNewConnection -> FilterB::onData -> FilterC::onNewConnection -> FilterC::onData. If any callback returns StopIteration, the chain stops until the corresponding filter calls onContinueReading. This creates a problem: while the chain is stopped, the network layer keeps receiving data into an internal buffer, so memory grows. For that reason the TCP proxy disables reads in onNewConnection, establishes the upstream connection first, and re-enables reads only after that connection is up, preventing the buffer from exhausting memory.
The listener filters currently implemented include original_dst, original_src, proxy_protocol, and tls_inspector; user-defined filters are also supported.
original_dst filter
This filter is typically used when traffic is redirected to Envoy via iptables or TPROXY, which makes the original destination address invisible to the application; the filter recovers that information from the socket so Envoy can forward the connection accordingly. A configuration sketch follows the code below.
- Read the original destination address from the socket (getsockopt(fd, SOL_IP, SO_ORIGINAL_DST, &orig_addr, &addr_len)).
- Restore the socket's local address (restore_local_address) to that original destination.
Network::FilterStatus OriginalDstFilter::onAccept(Network::ListenerFilterCallbacks& cb) {
  ENVOY_LOG(debug, "original_dst: New connection accepted");
  Network::ConnectionSocket& socket = cb.socket();
  const Network::Address::Instance& local_address = *socket.localAddress();

  if (local_address.type() == Network::Address::Type::Ip) {
    Network::Address::InstanceConstSharedPtr original_local_address =
        getOriginalDst(socket.ioHandle().fd());

    // A listener that has the use_original_dst flag set to true can still receive
    // connections that are NOT redirected using iptables. If a connection was not redirected,
    // the address returned by getOriginalDst() matches the local address of the new socket.
    // In this case the listener handles the connection directly and does not hand it off.
    if (original_local_address) {
      // Restore the local address to the original one.
      socket.restoreLocalAddress(original_local_address);
    }
  }

  return Network::FilterStatus::Continue;
}
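A sketch of a listener that relies on this filter; the listener name and port are illustrative. use_original_dst pairs with the original_dst listener filter so the connection can be handed off to the listener or filter chain matching the restored destination:

listeners:
- name: virtual_inbound                               # illustrative name
  address:
    socket_address: { address: 0.0.0.0, port_value: 15001 }
  use_original_dst: true                              # hand connections off based on the restored destination
  listener_filters:
  - name: envoy.filters.listener.original_dst         # recovers the pre-redirect destination via SO_ORIGINAL_DST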
original_src filter
L3/L4 transparency means the source IP (L3) and source port (L4) stay visible. The purpose of this filter is to pass the original source address through to the upstream, so the upstream sees the real client IP and port.
proxy_protocol filter
With the PROXY protocol, the peer sends a short preamble right after the connection is established that carries the original source address and port. A configuration sketch follows the code below.
// Once the connection is accepted, register a read event and read the incoming data.
Network::FilterStatus Filter::onAccept(Network::ListenerFilterCallbacks& cb) {
  ENVOY_LOG(debug, "proxy_protocol: New connection accepted");
  Network::ConnectionSocket& socket = cb.socket();
  ASSERT(file_event_.get() == nullptr);
  file_event_ = cb.dispatcher().createFileEvent(
      socket.ioHandle().fd(),
      [this](uint32_t events) {
        ASSERT(events == Event::FileReadyType::Read);
        onRead();
      },
      Event::FileTriggerType::Edge, Event::FileReadyType::Read);
  cb_ = &cb;
  return Network::FilterStatus::StopIteration;
}

void Filter::onRead() {
  try {
    onReadWorker();
  } catch (const EnvoyException& ee) {
    config_->stats_.downstream_cx_proxy_proto_error_.inc();
    cb_->continueFilterChain(false);
  }
}

// Read the PROXY header. The available byte count is obtained with
// ioctl(fd, FIONREAD, &bytes_avail), and the data is inspected with MSG_PEEK rather than a
// plain read(): if the bytes turn out not to be a PROXY protocol header, a real read would
// have consumed data that belongs to the application protocol.
void Filter::onReadWorker() {
  Network::ConnectionSocket& socket = cb_->socket();

  if ((!proxy_protocol_header_.has_value() && !readProxyHeader(socket.ioHandle().fd())) ||
      (proxy_protocol_header_.has_value() && !parseExtensions(socket.ioHandle().fd()))) {
    // We return if a) we do not yet have the header, or b) we have the header but not yet all
    // the extension data. In both cases we'll be called again when the socket is ready to read
    // and pick up where we left off.
    return;
  }
  ....
  // Once the header is fully read, apply the recovered source address information.
  // Only set the local address if it really changed, and mark it as address being restored.
  if (*proxy_protocol_header_.value().local_address_ != *socket.localAddress()) {
    socket.restoreLocalAddress(proxy_protocol_header_.value().local_address_);
  }
  socket.setRemoteAddress(proxy_protocol_header_.value().remote_address_);
  ....
}
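A sketch of enabling this filter on a listener; the listener name and port are illustrative:

listeners:
- name: listener_behind_lb                            # illustrative name
  address:
    socket_address: { address: 0.0.0.0, port_value: 8080 }
  listener_filters:
  - name: envoy.filters.listener.proxy_protocol       # parses the PROXY header before the network filters run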
TLS Inspector filter
TLS Inspector listener filter allows detecting whether the transport appears to be TLS or plaintext, and if it is TLS, it detects the Server Name Indication and/or Application-Layer Protocol Negotiation from the client. This can be used to select a FilterChain via the server_names and/or application_protocols of a FilterChainMatch.
- Register for read events and wait for data to arrive.
- Parse the ClientHello message.
- Extract the TLS information (SNI/ALPN) and set the transport socket to TLS. A filter-chain matching sketch follows this list.
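A sketch of using the recovered SNI to select a filter chain; the listener name, domain, and cluster are illustrative:

listeners:
- name: https_listener                                    # illustrative name
  address:
    socket_address: { address: 0.0.0.0, port_value: 443 }
  listener_filters:
  - name: envoy.filters.listener.tls_inspector            # detects TLS and extracts SNI / ALPN
  filter_chains:
  - filter_chain_match:
      server_names: ["example.com"]                       # matched against the SNI from the ClientHello
    filters:
    - name: envoy.filters.network.tcp_proxy
      typed_config:
        "@type": type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy
        stat_prefix: tls_passthrough
        cluster: example_cluster                          # illustrative cluster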
Cluster Analysis
Cluster definition
In Envoy, a Cluster represents a group of upstream hosts. A cluster carries two kinds of information: its configuration, such as the cluster name, the connection timeout for its hosts, the load-balancing policy, and the protocol used to connect; and the list of hosts that belong to it.
In Istio, a service corresponds to an Envoy cluster, and the upstream pods selected by the service's label selector correspond to that cluster's host list.
For example:
clusters:
- name: statsd
  type: STATIC
  connect_timeout: 0.25s
  lb_policy: ROUND_ROBIN
  load_assignment:
    cluster_name: statsd
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address:
              address: 127.0.0.1
              port_value: 8125
              protocol: TCP
This YAML defines a cluster named statsd with load-balancing policy ROUND_ROBIN, a connection timeout of 0.25s, a single host, and type STATIC. From it, Envoy creates a Cluster object. The Cluster object is not one generic type: based on the type field in the YAML, Envoy looks up the corresponding cluster factory and constructs the matching Cluster subtype.
A STRICT_DNS cluster obtains its host list by resolving the configured domain name through DNS, while an EDS cluster obtains it by sending an EDS request to the control plane. However the hosts are obtained, they end up stored in an envoy::config::endpoint::v3::ClusterLoadAssignment protobuf message.
message ClusterLoadAssignment {
  // Load balancing policy settings.
  // [#next-free-field: 6]
  message Policy {
    // [#not-implemented-hide:]
    message DropOverload {
      // Identifier for the policy specifying the drop.
      string category = 1 [(validate.rules).string = {min_bytes: 1}];

      // Percentage of traffic that should be dropped for the category.
      type.FractionalPercent drop_percentage = 2;
    }

    reserved 1;

    // Action to trim the overall incoming traffic to protect the upstream
    // hosts. This action allows protection in case the hosts are unable to
    // recover from an outage, or unable to autoscale or unable to handle
    // incoming traffic volume for any reason.
    //
    // At the client each category is applied one after the other to generate
    // the 'actual' drop percentage on all outgoing traffic. For example:
    //
    // .. code-block:: json
    //
    //  { "drop_overloads": [
    //      { "category": "throttle", "drop_percentage": 60 }
    //      { "category": "lb", "drop_percentage": 50 }
    //  ]}
    //
    // The actual drop percentages applied to the traffic at the clients will be
    //    "throttle"_drop = 60%
    //    "lb"_drop = 20%  // 50% of the remaining 'actual' load, which is 40%.
    //    actual_outgoing_load = 20% // remaining after applying all categories.
    // [#not-implemented-hide:]
    repeated DropOverload drop_overloads = 2;

    // Priority levels and localities are considered overprovisioned with this
    // factor (in percentage). This means that we don't consider a priority
    // level or locality unhealthy until the percentage of healthy hosts
    // multiplied by the overprovisioning factor drops below 100.
    // With the default value 140(1.4), Envoy doesn't consider a priority level
    // or a locality unhealthy until their percentage of healthy hosts drops
    // below 72%. For example:
    //
    // .. code-block:: json
    //
    //  { "overprovisioning_factor": 100 }
    //
    // Read more at :ref:`priority levels <arch_overview_load_balancing_priority_levels>` and
    // :ref:`localities <arch_overview_load_balancing_locality_weighted_lb>`.
    google.protobuf.UInt32Value overprovisioning_factor = 3 [(validate.rules).uint32 = {gt: 0}];

    // The max time until which the endpoints from this assignment can be used.
    // If no new assignments are received before this time expires the endpoints
    // are considered stale and should be marked unhealthy.
    // Defaults to 0 which means endpoints never go stale.
    google.protobuf.Duration endpoint_stale_after = 4 [(validate.rules).duration = {gt {}}];

    // The flag to disable overprovisioning. If it is set to true,
    // :ref:`overprovisioning factor
    // <arch_overview_load_balancing_overprovisioning_factor>` will be ignored
    // and Envoy will not perform graceful failover between priority levels or
    // localities as endpoints become unhealthy. Otherwise Envoy will perform
    // graceful failover as :ref:`overprovisioning factor
    // <arch_overview_load_balancing_overprovisioning_factor>` suggests.
    // [#not-implemented-hide:]
    bool disable_overprovisioning = 5 [deprecated = true];
  }

  // Name of the cluster. This will be the :ref:`service_name
  // <envoy_api_field_Cluster.EdsClusterConfig.service_name>` value if specified
  // in the cluster :ref:`EdsClusterConfig
  // <envoy_api_msg_Cluster.EdsClusterConfig>`.
  string cluster_name = 1 [(validate.rules).string = {min_bytes: 1}];

  // List of endpoints to load balance to.
  repeated endpoint.LocalityLbEndpoints endpoints = 2;

  // Map of named endpoints that can be referenced in LocalityLbEndpoints.
  // [#not-implemented-hide:]
  map<string, endpoint.Endpoint> named_endpoints = 5;

  // Load balancing policy settings.
  Policy policy = 4;
}
A cluster contains multiple LocalityLbEndpoints. Each LocalityLbEndpoints carries a Locality, a priority, a locality weight, and a list of LbEndpoint entries; each LbEndpoint is one host together with its metadata and weight.
message LocalityLbEndpoints {
  // Identifies location of where the upstream hosts run.
  core.Locality locality = 1;

  // The group of endpoints belonging to the locality specified.
  repeated LbEndpoint lb_endpoints = 2;

  // Optional: Per priority/region/zone/sub_zone weight; at least 1. The load
  // balancing weight for a locality is divided by the sum of the weights of all
  // localities at the same priority level to produce the effective percentage
  // of traffic for the locality. The sum of the weights of all localities at
  // the same priority level must not exceed uint32_t maximal value (4294967295).
  //
  // Locality weights are only considered when :ref:`locality weighted load
  // balancing <arch_overview_load_balancing_locality_weighted_lb>` is
  // configured. These weights are ignored otherwise. If no weights are
  // specified when locality weighted load balancing is enabled, the locality is
  // assigned no load.
  google.protobuf.UInt32Value load_balancing_weight = 3 [(validate.rules).uint32 = {gte: 1}];

  // Optional: the priority for this LocalityLbEndpoints. If unspecified this will
  // default to the highest priority (0).
  //
  // Under usual circumstances, Envoy will only select endpoints for the highest
  // priority (0). In the event all endpoints for a particular priority are
  // unavailable/unhealthy, Envoy will fail over to selecting endpoints for the
  // next highest priority group.
  //
  // Priorities should range from 0 (highest) to N (lowest) without skipping.
  uint32 priority = 5 [(validate.rules).uint32 = {lte: 128}];

  // Optional: Per locality proximity value which indicates how close this
  // locality is from the source locality. This value only provides ordering
  // information (lower the value, closer it is to the source locality).
  // This will be consumed by load balancing schemes that need proximity order
  // to determine where to route the requests.
  // [#not-implemented-hide:]
  google.protobuf.UInt32Value proximity = 6;
}
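A sketch of a load_assignment that exercises localities, weights, and priorities; the region and zone names, weights, and addresses are illustrative (locality weights only take effect when locality-weighted load balancing is enabled):

load_assignment:
  cluster_name: backend_service
  endpoints:
  - locality: { region: us-east-1, zone: us-east-1a }
    load_balancing_weight: 80          # share of traffic relative to other localities at the same priority
    priority: 0                        # highest priority, used while healthy
    lb_endpoints:
    - endpoint:
        address:
          socket_address: { address: 10.0.0.1, port_value: 8080 }
  - locality: { region: us-east-1, zone: us-east-1b }
    load_balancing_weight: 20
    priority: 1                        # failover group, used when priority 0 becomes unhealthy
    lb_endpoints:
    - endpoint:
        address:
          socket_address: { address: 10.0.1.1, port_value: 8080 }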
Primary Clusters and Secondary Clusters
Envoy builds two kinds of clusters, primary clusters and secondary clusters. The distinction matters mainly for dynamic load balancing and service discovery, and the two differ in how they are managed and how dynamic their membership is.
Primary clusters
Concept:
- Primary clusters are parsed and loaded when Envoy starts; they are usually statically defined (but can also be dynamically discovered).
- They generally do not update on their own unless Envoy reloads its configuration.
Characteristics:
- Can be static (STATIC) or dynamic (EDS, DNS).
- Higher priority: they are the first clusters Envoy parses and initializes.
- Changes require a configuration reload, or rely on dynamic service discovery (EDS) for updates.
- Suited to long-lived, stable backends such as databases or monitoring systems.
Example:
clusters:
- name: primary_cluster
  type: STATIC
  load_assignment:
    cluster_name: primary_cluster
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address:
              address: 10.0.0.1
              port_value: 80
Secondary clusters
Concept:
- Secondary clusters are added and managed at runtime through xDS (such as CDS and EDS).
- They can be added, removed, or modified at runtime without reloading Envoy.
Characteristics:
- Fully dependent on xDS discovery (CDS, EDS); not loaded statically at startup.
- More dynamic, suited to services that change frequently (such as microservices).
- Suited to large-scale, volatile service discovery, such as Pod churn in Kubernetes.
Example (dynamic discovery via CDS):
dynamic_resources:
  cds_config:
    ads: {}
Cluster construction
Whether primary or secondary, every cluster ultimately goes through loadCluster, which turns the Cluster protobuf into a Cluster object. The difference between the two is the added_via_api flag: false for primary clusters, true for secondary ones. The flag records whether the cluster was obtained through the API, and primary clusters clearly are not.
absl::StatusOr<ClusterManagerImpl::ClusterDataPtr>
ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
                                const uint64_t cluster_hash, const std::string& version_info,
                                bool added_via_api, const bool required_for_ads,
                                ClusterMap& cluster_map)
This method does the following:
1. Create the Cluster and the ThreadAwareLoadBalancer from the ClusterManagerFactory and the Cluster protobuf
source/common/upstream/cluster_manager_impl.cc
absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>> new_cluster_pair_or_error =
    factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
Cluster is the abstraction of a cluster, and ThreadAwareLoadBalancer is the abstraction of that cluster's load balancer; this load balancer is thread-aware. Custom cluster implementations have to provide their own. Currently only three cluster types in Envoy implement the ThreadAwareLoadBalancer interface, Dynamic forward proxy, Aggregate, and Redis, each with its own dedicated load balancer; all other clusters use Envoy's built-in standard load balancer implementations. For example, the Aggregate cluster's factory below creates an AggregateThreadAwareLoadBalancer, the load balancer specific to that cluster type.
source/extensions/clusters/aggregate/cluster.cc
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
ClusterFactory::createClusterWithConfig(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
    Upstream::ClusterFactoryContext& context) {
  absl::Status creation_status = absl::OkStatus();
  auto new_cluster =
      std::shared_ptr<Cluster>(new Cluster(cluster, proto_config, context, creation_status));
  RETURN_IF_NOT_OK(creation_status);
  auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster);
  return std::make_pair(new_cluster, std::move(lb));
}
2. Set up callbacks such as the healthChecker and the outlierDetector
  ASSERT(state_ == State::WaitingToStartSecondaryInitialization ||
         state_ == State::CdsInitialized ||
         state_ == State::WaitingForPrimaryInitializationToComplete);
  ENVOY_LOG(debug, "maybe finish initialize primary init clusters empty: {}",
            primary_init_clusters_.empty());

  // If we are still waiting for primary clusters to initialize, do nothing.
  if (!primary_init_clusters_.empty()) {
    return;
  } else if (state_ == State::WaitingForPrimaryInitializationToComplete) {
    state_ = State::WaitingToStartSecondaryInitialization;
    if (primary_clusters_initialized_callback_) {
      primary_clusters_initialized_callback_();
    }
    return;
  }
First it checks whether primary cluster initialization has completed. The marker for completion is an empty primary_init_clusters_: all primary clusters are inserted into this list when they are loaded, the list is walked to initialize each one, and every cluster is removed from the list once its initialization finishes, so an empty list means initialization is done.
void ClusterManagerInitHelper::addCluster(ClusterManagerCluster& cm_cluster) {
  // See comments in ClusterManagerImpl::addOrUpdateCluster() for why this is only called during
  // server initialization.
  ASSERT(state_ != State::AllClustersInitialized);

  const auto initialize_cb = [&cm_cluster, this] {
    onClusterInit(cm_cluster);
    cm_cluster.cluster().info()->configUpdateStats().warming_state_.set(0);
  };

  Cluster& cluster = cm_cluster.cluster();
  cluster.info()->configUpdateStats().warming_state_.set(1);
  if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
    // Remove the previous cluster before the cluster object is destroyed.
    primary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
    cluster.initialize(initialize_cb);
  } else {
    ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
    // Remove the previous cluster before the cluster object is destroyed.
    secondary_init_clusters_.insert_or_assign(cm_cluster.cluster().info()->name(), &cm_cluster);
    if (started_secondary_initialize_) {
      // This can happen if we get a second CDS update that adds new clusters after we have
      // already started secondary init. In this case, just immediately initialize.
      cluster.initialize(initialize_cb);
    }
  }

  ENVOY_LOG(debug, "cm init: adding: cluster={} primary={} secondary={}", cluster.info()->name(),
            primary_init_clusters_.size(), secondary_init_clusters_.size());
}

void ClusterManagerInitHelper::onClusterInit(ClusterManagerCluster& cluster) {
  ASSERT(state_ != State::AllClustersInitialized);
  per_cluster_init_callback_(cluster);
  removeCluster(cluster);
}

void ClusterManagerInitHelper::removeCluster(ClusterManagerCluster& cluster) {
  if (state_ == State::AllClustersInitialized) {
    return;
  }

  // There is a remote edge case where we can remove a cluster via CDS that has not yet been
  // initialized. When called via the remove cluster API this code catches that case.
  absl::flat_hash_map<std::string, ClusterManagerCluster*>* cluster_map;
  if (cluster.cluster().initializePhase() == Cluster::InitializePhase::Primary) {
    cluster_map = &primary_init_clusters_;
  } else {
    ASSERT(cluster.cluster().initializePhase() == Cluster::InitializePhase::Secondary);
    cluster_map = &secondary_init_clusters_;
  }

  // It is possible that the cluster we are removing has already been initialized, and is not
  // present in the initializer map. If so, this is fine as a CDS update may happen for a
  // cluster with the same name. See the case "UpdateAlreadyInitialized" of the
  // target //test/common/upstream:cluster_manager_impl_test.
  auto iter = cluster_map->find(cluster.cluster().info()->name());
  if (iter != cluster_map->end() && iter->second == &cluster) {
    cluster_map->erase(iter);
  }

  ENVOY_LOG(debug, "cm init: init complete: cluster={} primary={} secondary={}",
            cluster.cluster().info()->name(), primary_init_clusters_.size(),
            secondary_init_clusters_.size());
  maybeFinishInitialize();
}
3. If all primary clusters have initialized, check whether secondary cluster initialization is underway
A non-empty secondary_init_clusters_ means the secondary clusters have not yet started initializing or have not finished. If started_secondary_initialize_ is false at this point, initialization has not started, and initializeSecondaryClusters is called to actually begin it.
  // If we are still waiting for secondary clusters to initialize, see if we need to first call
  // initialize on them. This is only done once.
  ENVOY_LOG(debug, "maybe finish initialize secondary init clusters empty: {}",
            secondary_init_clusters_.empty());
  if (!secondary_init_clusters_.empty()) {
    if (!started_secondary_initialize_) {
      ENVOY_LOG(info, "cm init: initializing secondary clusters");
      // If the first CDS response doesn't have any primary cluster, ClusterLoadAssignment
      // should be already paused by CdsApiImpl::onConfigUpdate(). Need to check that to
      // avoid double pause ClusterLoadAssignment.
      Config::ScopedResume maybe_resume_eds_leds_sds;
      if (cm_.adsMux()) {
        const std::vector<std::string> paused_xds_types{
            Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(),
            Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>(),
            Config::getTypeUrl<envoy::extensions::transport_sockets::tls::v3::Secret>()};
        maybe_resume_eds_leds_sds = cm_.adsMux()->pause(paused_xds_types);
      }
      initializeSecondaryClusters();
    }
    return;
  }
4. Initialize CDS; until it responds, Envoy does not have the full set of clusters and cannot initialize the secondary clusters
  // At this point, if we are doing static init, and we have CDS, start CDS init. Otherwise, move
  // directly to initialized.
  started_secondary_initialize_ = false;
  ENVOY_LOG(debug, "maybe finish initialize cds api ready: {}", cds_ != nullptr);
  if (state_ == State::WaitingToStartSecondaryInitialization && cds_) {
    ENVOY_LOG(info, "cm init: initializing cds");
    state_ = State::WaitingToStartCdsInitialization;
    cds_->initialize();
  } else {
    ENVOY_LOG(info, "cm init: all clusters initialized");
    state_ = State::AllClustersInitialized;
    if (initialized_callback_) {
      initialized_callback_();
    }
  }
Once CDS has initialized, it sends an xDS request to the control plane to fetch all clusters. When all clusters have been received, the callback registered on CDS fires; inside that callback maybeFinishInitialize is triggered again, and execution reaches the logic of step 3.
void ClusterManagerInitHelper::setCds(CdsApi* cds) {
  ASSERT(state_ == State::Loading);
  cds_ = cds;
  if (cds_) {
    cds_->setInitializedCb([this]() -> void {
      ASSERT(state_ == State::WaitingToStartCdsInitialization);
      state_ = State::CdsInitialized;
      maybeFinishInitialize();
    });
  }
}
Synchronizing clusters across threads
After each cluster finishes initializing, its callback invokes the method below to perform additional per-cluster setup. This setup registers further callbacks and finally triggers a thread-local cluster update, ensuring every worker thread holds the latest cluster contents.
void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster)
At runtime, when a cluster finishes dynamic initialization or is updated, all thread-local state must be updated so that every thread can see the latest cluster:
void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
                                                      ThreadLocalClusterUpdateParams&& params);
Set the cluster's AddedOrUpdated bit to mark it as updated:
  bool add_or_update_cluster = false;
  if (!cm_cluster.addedOrUpdated()) {
    add_or_update_cluster = true;
    cm_cluster.setAddedOrUpdated();
  }
Then build the parameters the update needs: update-hosts params, locality weights, the overprovisioning factor, and so on. Each thread's cluster PrioritySet is updated from these parameters.
  for (auto& per_priority : params.per_priority_update_params_) {
    const auto& host_set =
        cm_cluster.cluster().prioritySet().hostSetsPerPriority()[per_priority.priority_];
    per_priority.update_hosts_params_ = HostSetImpl::updateHostsParams(*host_set);
    per_priority.locality_weights_ = host_set->localityWeights();
    per_priority.weighted_priority_health_ = host_set->weightedPriorityHealth();
    per_priority.overprovisioning_factor_ = host_set->overprovisioningFactor();
  }
Finally, each thread's ThreadLocalClusterManagerImpl is retrieved and updated in place:
  tls_.runOnAllThreads(
      [info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster,
       load_balancer_factory, map = std::move(host_map),
       cluster_initialization_object = std::move(cluster_initialization_object),
       drop_overload](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
        ASSERT(cluster_manager.has_value(),
               "Expected the ThreadLocalClusterManager to be set during ClusterManagerImpl creation.");

        // Cluster Manager here provided by the particular thread, it will provide
        // this allowing to make the relevant change.
        if (const bool defer_unused_clusters =
                cluster_initialization_object != nullptr &&
                !cluster_manager->thread_local_clusters_.contains(info->name()) &&
                !Envoy::Thread::MainThread::isMainThread();
            defer_unused_clusters) {
          // Save the cluster initialization object.
          ENVOY_LOG(debug, "Deferring add or update for TLS cluster {}", info->name());
          cluster_manager->thread_local_deferred_clusters_[info->name()] =
              cluster_initialization_object;

          // Invoke similar logic of onClusterAddOrUpdate.
          ThreadLocalClusterCommand command =
              [&cluster_manager, cluster_name = info->name()]() -> ThreadLocalCluster& {
            // If we have multiple callbacks only the first one needs to use the
            // command to initialize the cluster.
            auto existing_cluster_entry = cluster_manager->thread_local_clusters_.find(cluster_name);
            if (existing_cluster_entry != cluster_manager->thread_local_clusters_.end()) {
              return *existing_cluster_entry->second;
            }

            auto* cluster_entry = cluster_manager->initializeClusterInlineIfExists(cluster_name);
            ASSERT(cluster_entry != nullptr, "Deferred clusters initiailization should not fail.");
            return *cluster_entry;
          };
          for (auto cb_it = cluster_manager->update_callbacks_.begin();
               cb_it != cluster_manager->update_callbacks_.end();) {
            // The current callback may remove itself from the list, so a handle for
            // the next item is fetched before calling the callback.
            auto curr_cb_it = cb_it;
            ++cb_it;
            (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
          }

        } else {
          // Broadcast
          ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
          if (add_or_update_cluster) {
            if (cluster_manager->thread_local_clusters_.contains(info->name())) {
              ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
            } else {
              ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
            }

            new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
                                                                          load_balancer_factory);
            cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
            cluster_manager->local_stats_.clusters_inflated_.set(
                cluster_manager->thread_local_clusters_.size());
          }

          if (cluster_manager->thread_local_clusters_[info->name()]) {
            cluster_manager->thread_local_clusters_[info->name()]->setDropOverload(drop_overload);
          }
          for (const auto& per_priority : params.per_priority_update_params_) {
            cluster_manager->updateClusterMembership(
                info->name(), per_priority.priority_, per_priority.update_hosts_params_,
                per_priority.locality_weights_, per_priority.hosts_added_,
                per_priority.hosts_removed_, per_priority.weighted_priority_health_,
                per_priority.overprovisioning_factor_, map);
          }

          if (new_cluster != nullptr) {
            ThreadLocalClusterCommand command = [&new_cluster]() -> ThreadLocalCluster& {
              return *new_cluster;
            };
            for (auto cb_it = cluster_manager->update_callbacks_.begin();
                 cb_it != cluster_manager->update_callbacks_.end();) {
              // The current callback may remove itself from the list, so a handle for
              // the next item is fetched before calling the callback.
              auto curr_cb_it = cb_it;
              ++cb_it;
              (*curr_cb_it)->onClusterAddOrUpdate(info->name(), command);
            }
          }
        }
      });
Router Analysis
Envoy matches requests against the rules in route_config:
- Match on Host, Path, and Method.
- Route the request to the specified Cluster.
routes:
- match:
    prefix: "/api"
  route:
    cluster: backend_service
After traffic has been directed to a cluster, a host must be picked from the cluster's host list by load balancing.
- Envoy supports multiple load-balancing policies:
  - Round Robin
  - Random
  - Least Request
  - Ring Hash (consistent hashing)
- The load-balancing policy is defined in the Cluster configuration:
clusters:
- name: backend_service
  type: EDS
  lb_policy: ROUND_ROBIN
  load_assignment:
    cluster_name: backend_service
    endpoints:
    - lb_endpoints:
      - endpoint:
          address:
            socket_address:
              address: 10.0.0.1
              port_value: 8080