欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > 【SpringCloud】 K8s的滚动更新中明明已经下掉旧Pod,还是会把流量分到了不存活的节点

【SpringCloud】 K8s的滚动更新中明明已经下掉旧Pod,还是会把流量分到了不存活的节点

2024/11/30 8:48:15 来源:https://blog.csdn.net/SSHH_ZHU/article/details/143182751  浏览:    关键词:【SpringCloud】 K8s的滚动更新中明明已经下掉旧Pod,还是会把流量分到了不存活的节点

系列文章目录


文章目录

  • 系列文章目录
  • 前言
  • 一、初步定位问题
  • 二、源码解释
    • 1.引入库
      • 核心问题代码
      • 进一步往下看【这块儿算是只是拓展了,问题其实处在上面的代码】
        • Nacos是如何实现的?
    • 如何解决
  • 总结


前言

背景:
使用了SpringCloudGateWay 和 SpringCloud 全套的组件,内网服务指之间通信和请求使用FeignClient (基于HTTP的)的一个客户端。

现况:运维已经增加了5秒的服务下线等待,并且不会让流量在打到下线的服务,但是还是有相关请求路由到了不可用的服务节点上


一、初步定位问题

org.springframework.cloud.loadbalancer.cache.LoadBalancerCacheProperties 是配置SCG从Nacos获取路由配置`表的一个缓存配置,默认是打开状态并且缓存了35秒。

由于SCG使用了LoadBanalcer请求转发 ,此配置会导致,猜测服务下线时候获取到了已经不可用的或者下线的节点会导致503、500、504或者请求超时等等…

源码断点
请添加图片描述

在这里插入图片描述

二、源码解释

1.引入库

核心问题代码

/** Copyright 2012-2020 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**      https://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.springframework.cloud.loadbalancer.core;import java.util.List;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.cache.CacheFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cloud.client.ServiceInstance;/*** A {@link ServiceInstanceListSupplier} implementation that tries retrieving* {@link ServiceInstance} objects from cache; if none found, retrieves instances using* {@link DiscoveryClientServiceInstanceListSupplier}.** @author Spencer Gibb 大佬1* @author Olga Maciaszek-Sharma 大佬2* @since 2.2.0 2.20的版本*/
public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {private static final Log log = LogFactory.getLog(CachingServiceInstanceListSupplier.class);/*** 缓存的名字*/public static final String SERVICE_INSTANCE_CACHE_NAME = CachingServiceInstanceListSupplier.class.getSimpleName()+ "Cache";/*** 缓存Flux集合*/private final Flux<List<ServiceInstance>> serviceInstances;@SuppressWarnings("unchecked")public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {super(delegate);//父类方法//在CacheFlux中寻找对应serviceName的服务,这里使用了一个CacheFlux//如何使用看这里:具体的API文档在这里【这个可能会在未来的版本会去掉】:https://projectreactor.io/docs/extra/release/api/reactor/cache/CacheFlux.htmlthis.serviceInstances = CacheFlux.lookup(key -> {// TODO: configurable cache name 【这里似乎有一个TODO代码需要处理,可以指定缓存的的名字了~可能未来版本会支持】Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);//缓存不存在说明缓存管理器有问题,这里一般情况不会进来,是一个为了逻辑完整的代码if (cache == null) {if (log.isErrorEnabled()) {log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);}return Mono.empty();}//找到了缓存,但是缓存为空,说明缓存不存在List<ServiceInstance> list = cache.get(key, List.class);if (list == null || list.isEmpty()) {return Mono.empty();}//如果存在,则Flux.just返回这个缓存列表return Flux.just(list).materialize().collectList();}, delegate.getServiceId())//如果换成没有命中从delegate获取一个数据来源从.onCacheMissResume(delegate.get().take(1))//写入缓存.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);if (cache == null) {if (log.isErrorEnabled()) {log.error("Unable to find cache for writing: " + SERVICE_INSTANCE_CACHE_NAME);}}else {cache.put(key, instances);}}).then());}@Overridepublic Flux<List<ServiceInstance>> get() {return serviceInstances;}}

进一步往下看【这块儿算是只是拓展了,问题其实处在上面的代码】

ServiceInstanceListSupplier:在SCG的实现是 ==>DiscoveryClientServiceInstanceListSupplier:
如果需要看懂,为了更好了解 需要先了解一下WebFlux和链式调用你才能看懂下面的代码,或者你只看 delegate.getInstances()这个方法也可以
源码解释

/** Copyright 2012-2020 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**      https://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package org.springframework.cloud.loadbalancer.core;import java.time.Duration;
import java.util.ArrayList;
import java.util.List;import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;import org.springframework.boot.convert.DurationStyle;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.core.env.Environment;import static org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory.PROPERTY_NAME;/*** A discovery-client-based {@link ServiceInstanceListSupplier} implementation.** @author Spencer Gibb* @author Olga Maciaszek-Sharma* @author Tim Ysewyn* @since 2.2.0*/
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {/*** Property that establishes the timeout for calls to service discovery.*/public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);private Duration timeout = Duration.ofSeconds(30);private final String serviceId;private final Flux<List<ServiceInstance>> serviceInstances;/**** 重点需要看下这个类的构造方法的serviceInstances * 这是一个Flux的集合*/public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {this.serviceId = environment.getProperty(PROPERTY_NAME);resolveTimeout(environment);//delegate.getInstances 这个真正往集合中添加实例的方法,不同SpringCloud规范了这个接口,不同的组件实现它就好了,其他的WebFlux的一些方法可以忽略。//我们处于SpringCloudGateWay中,所以SpringCloudGateWay 按照这个方式this.serviceInstances = Flux.defer(() -> Flux.just(delegate.getInstances(serviceId))).subscribeOn(Schedulers.boundedElastic()).timeout(timeout, Flux.defer(() -> {logTimeout();return Flux.just(new ArrayList<>());})).onErrorResume(error -> {logException(error);return Flux.just(new ArrayList<>());});}public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {this.serviceId = environment.getProperty(PROPERTY_NAME);resolveTimeout(environment);this.serviceInstances = Flux.defer(() -> delegate.getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> {logTimeout();return Flux.just(new ArrayList<>());})).onErrorResume(error -> {logException(error);return Flux.just(new ArrayList<>());}));}@Overridepublic String getServiceId() {return serviceId;}@Overridepublic Flux<List<ServiceInstance>> get() {return serviceInstances;}private void resolveTimeout(Environment environment) {String providedTimeout = environment.getProperty(SERVICE_DISCOVERY_TIMEOUT);if (providedTimeout != null) {timeout = DurationStyle.detectAndParse(providedTimeout);}}private void logTimeout() {if (LOG.isDebugEnabled()) {LOG.debug(String.format("Timeout occurred while retrieving instances for service %s."+ "The instances could not be retrieved during %s", serviceId, timeout));}}private void logException(Throwable error) {LOG.error(String.format("Exception occurred while retrieving instances for service %s", serviceId), error);}}
Nacos是如何实现的?

其中:实际走的是Nacos为其实现的获取Nacos实例的Reactive的实现
NacosReactiveDiscoveryClient: 核心方法 loadInstancesFromNacos()【哈哈哈 一看就是中国人写的 这方法名字】

/** Copyright 2013-2018 the original author or authors.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at**      https://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/package com.alibaba.cloud.nacos.discovery.reactive;import java.util.function.Function;import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery;
import com.alibaba.nacos.api.exception.NacosException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;/*** @author <a href="mailto:echooy.mxq@gmail.com">echooymxq</a>**/
public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient {private static final Logger log = LoggerFactory.getLogger(NacosReactiveDiscoveryClient.class);private NacosServiceDiscovery serviceDiscovery;public NacosReactiveDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {this.serviceDiscovery = nacosServiceDiscovery;}@Overridepublic String description() {return "Spring Cloud Nacos Reactive Discovery Client";}@Overridepublic Flux<ServiceInstance> getInstances(String serviceId) {return Mono.justOrEmpty(serviceId).flatMapMany(loadInstancesFromNacos()).subscribeOn(Schedulers.boundedElastic());}private Function<String, Publisher<ServiceInstance>> loadInstancesFromNacos() {return serviceId -> {try {return Flux.fromIterable(serviceDiscovery.getInstances(serviceId));}catch (NacosException e) {log.error("get service instance[{}] from nacos error!", serviceId, e);return Flux.empty();}};}@Overridepublic Flux<String> getServices() {return Flux.defer(() -> {try {return Flux.fromIterable(serviceDiscovery.getServices());}catch (Exception e) {log.error("get services from nacos server fail,", e);return Flux.empty();}}).subscribeOn(Schedulers.boundedElastic());}}

到此SPG如何从Nacos中获取服务列表就梳理完毕,同样的,服务和服务之间内网之间的调用应该也是换汤不换药


如何解决

  • 增加配置,关闭缓存,增加SCG的bootstrap.yml配置,当使用这个配置以后,就不会走上述逻辑。
spring:cloud:loadbalancer:cache:enabled: false # 是否启用缓存
  • 带来的问题

    • 不使用缓存,似乎可以解决了上述的问题,但是没有缓存似乎会对Nacos带来一定的压力。问题为甚是35s ,35s不会频繁失效不会带来”内存风暴“吗?(这块需要了解)
    • 35秒的缓存设计通常与“Refresh-ahead”策略有关。这是一种缓存预热策略,用于在数据过期之前提前刷新缓存数据,适用于那些预计在不久的将来会被频繁请求的热数据。例如,如果缓存数据的过期时间设置为60秒,刷新提前系数设置为0.5,那么在数据实际过期前的30秒(即在第35秒时),缓存就会异步刷新数据。这样做的好处是在高流量系统中,可以在下一次可能的缓存访问之前更新缓存,避免缓存失效时的突然流量峰值,从而提高系统的性能和用户体验。在实际应用中,这种策略可以确保缓存中的数据始终保持最新状态,减少因缓存失效导致的数据库压力。例如,在一些高流量的Web应用中,通过提前刷新缓存,可以避免在缓存数据过期时大量用户同时请求数据库,从而减少数据库的负载并提高响应速度。此外,缓存设计还需要考虑其他因素,如缓存大小、缓存命中率、缓存未命中率等,以确保缓存系统的整体性能和效率。不同的应用场景可能需要不同的缓存策略组合,以适应特定的读/写访问模式。例如,写密集型应用可能需要结合使用Write-Through、Write-back或Write-around策略,而读密集型应用则可能更侧重于Read-through或Refresh-ahead策略。选择合适的缓存策略对于提高系统性能和用户体验至关重要。
  • 最终解决方案
    不建议关闭这缓存,会导致Nacos压力过大,所以这边解决方案是最后修改了滚动更新时候的旧Pod的存活时间

总结

与运维人员配合修改Pod 实例的”存活窗口时间“大于缓存的35秒即可

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com