欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 新闻 > 社会 > Ribbon客户端负载均衡策略测试及其改进

Ribbon客户端负载均衡策略测试及其改进

2025/1/3 8:04:05 来源:https://blog.csdn.net/qq_16127313/article/details/142904732  浏览:    关键词:Ribbon客户端负载均衡策略测试及其改进

文章目录

  • 一、目的概述
  • 二、验证步骤
    • 1、源码下载
    • 2、导入IDE
    • 3、运行前修改配置
    • 4、策略说明
    • 5、修改策略
  • 三、最终结论
  • 四、改进措施
    • 1. 思路分析
    • 2. 核心代码
    • 3. 测试页面

一、目的概述

为了验证Ribbon客户端负载均衡策略在负载节点失效的情况下,是否具有故障转移的功能,进行了以下代码验证!

二、验证步骤

1、源码下载

git clone https://gitee.com/00fly/microservice-all-in-one.git

https://gitee.com/00fly/microservice-all-in-one/tree/master/ribbon-demo-simple

2、导入IDE

在这里插入图片描述

3、运行前修改配置

根据调用关系,我们需要启动2个user服务,为了方便调试我们这边分别启动8081、8082端口的user服务,并在movie模块中,设置负载节点地址为:127.0.0.1:8081,127.0.0.1:8082

微服务movie
微服务user
微服务user

eclipse为例简要说明
查看环境配置
在这里插入图片描述
打开Dashboard,选择Duplicate config
在这里插入图片描述
选择open Config
在这里插入图片描述
选择Profile设置为dev

在这里插入图片描述
全部启动
在这里插入图片描述
docker部署相对简单,编排文件为
https://gitee.com/00fly/microservice-all-in-one/blob/master/ribbon-demo-simple/docker/docker-compose.yml

version: '3.8'
services:#负载均衡节点ribbon-user-simple-0:image: registry.cn-shanghai.aliyuncs.com/00fly/ribbon-user-simple:0.0.1container_name: ribbon-user-simple-0deploy:resources:limits:cpus: '1'memory: 200Mreservations:memory: 180Mrestart: on-failurelogging:driver: json-fileoptions:max-size: 5mmax-file: '1'#负载均衡节点ribbon-user-simple-1:image: registry.cn-shanghai.aliyuncs.com/00fly/ribbon-user-simple:0.0.1container_name: ribbon-user-simple-1deploy:resources:limits:cpus: '1'memory: 200Mreservations:memory: 180Mrestart: on-failurelogging:driver: json-fileoptions:max-size: 5mmax-file: '1'#调用方ribbon-movie-simple:image: registry.cn-shanghai.aliyuncs.com/00fly/ribbon-movie-simple:0.0.1container_name: ribbon-movie-simpledeploy:resources:limits:cpus: '1'memory: 200Mreservations:memory: 180Mports:- 8090:8082environment:USER_SERVERS: ribbon-user-simple-0:8081,ribbon-user-simple-1:8081restart: on-failurelogging:driver: json-fileoptions:max-size: 5mmax-file: '1'

4、策略说明

  • RandomRule 实现从服务实例清单中随机选择一个服务实例的功能。
  • RoundRobinRule 实现了按照线性轮询的方式依次选择每个服务实例的功能。
  • RetryRule 实现了一个具备重试机制的实例选择功能。
  • WeightedResponseTimeRule是对 RoundRobinRule 的拓展,增加了根据实例的运行情况来计算权重,并根据权重来挑选实例。
  • ClientConfigEnableRoundRobinRule 通过继承该策略,在子类中做一些高级策略时有可能会存在一些无法实施的情况,那么就可以用父类的实现作为备选(线性轮询机制)。
  • BestAvailableRule 通过遍历负载均衡器中维护的所有服务实例,会过滤掉故障的实例,并找出并发请求数最小的一个,所以该策略的特性是可选出最空闲的实例。
  • PredicateBasedRule 先通过子类实现中的 Predicate 逻辑来过滤一部分服务实例,然后再以线性轮询的方式从过滤后的实例清单中选出一个。
  • AvailabilityFilteringRule 通过线性抽样的方式直接尝试寻找可用且较空闲的实例来使用。
  • ZoneAvoidanceRule 根据负载情况选择可用区

5、修改策略

修改这边的负载均衡策略在这里插入图片描述
打开页面
在这里插入图片描述
停止8081或8082端口服务,重新调试,返回结果如下:
在这里插入图片描述

三、最终结论

RandomRule、RoundRobinRule 策略不具备故障转移能力
RetryRule、WeightedResponseTimeRule等虽然具有故障转移,但是故障转移的时间太长,并且故障恢复后,重新选中该恢复的节点所需时间也较长。

各种策略的表现。大家可以自行研究测试。

四、改进措施

1. 思路分析

  • 采用多线程,多个节点同时检测,返回最快响应的节点
  • 采用多线程,定义超时时间,返回超时时间之内有响应的节点, 后续根据规则选择1个节点

2. 核心代码

NodeController.java


import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;import com.itmuch.cloud.study.user.entity.User;import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;@Slf4j
@Api(tags = "负载均衡节点")
@RestController
@RequestMapping("/node")
public class NodeController
{@Autowiredprivate WebClient webClient;@Value("${microservice-ribbon-user.ribbon.listOfServers}")private List<String> listOfServers;private ExecutorService executorService = Executors.newFixedThreadPool(10);@ApiOperation("查询用户")@GetMapping("/user/{id}")public List<User> findById(@PathVariable Long id)throws InterruptedException{// WebClient支持异步List<User> users = new CopyOnWriteArrayList<User>();listOfServers.stream().forEach(hostWithPort -> webClient.get().uri(String.format("http://%s/%s", hostWithPort, id))// URI.acceptCharset(StandardCharsets.UTF_8).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).subscribe(resp -> users.add(resp)));int index = 0;while (users.isEmpty() && (index++) < 100){TimeUnit.MILLISECONDS.sleep(10);log.info("index:{}, waitting......", index);}if (users.isEmpty()){throw new RuntimeException("查询超时,无返回值");}return users;}@ApiOperation("查询用户 by execute")@GetMapping("/v0/user/{id}")public List<User> findByExecute(@PathVariable Long id)throws InterruptedException{// List<User> users = new ArrayList<User>();// TODO ArrayList users一定概率有null值// 原因:通过new ArrayList<>()初始化的大小是0,首次插入触发扩容,并发可能导致出现null值List<User> users = new CopyOnWriteArrayList<User>();listOfServers.stream().forEach(hostWithPort -> executorService.execute(() -> webClient.get().uri(String.format("http://%s/%s", hostWithPort, id))// URI.acceptCharset(StandardCharsets.UTF_8).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).subscribe(resp -> users.add(resp))));int index = 0;while (users.isEmpty() && (index++) < 100){TimeUnit.MILLISECONDS.sleep(10);log.info("index:{}, waitting......", index);}if (users.isEmpty()){throw new RuntimeException("查询超时,无返回值");}return users;}@ApiOperation("查询用户 by submit")@GetMapping("/v1/user/{id}")public List<User> findBySubmit(@PathVariable Long id)throws InterruptedException{List<User> users = new CopyOnWriteArrayList<User>();listOfServers.stream().forEach(hostWithPort -> executorService.submit(() -> webClient.get().uri(String.format("http://%s/%s", hostWithPort, id))// URI.acceptCharset(StandardCharsets.UTF_8).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).subscribe(resp -> users.add(resp)), users));int index = 0;while (users.isEmpty() && (index++) < 100){TimeUnit.MILLISECONDS.sleep(10);log.info("index:{}, waitting......", index);}if (users.isEmpty()){throw new RuntimeException("查询超时,无返回值");}return users;}@ApiOperation("查询用户 by invokeAny")@GetMapping("/v2/user/{id}")public User findByInvokeAny(@PathVariable Long id)throws InterruptedException, ExecutionException, TimeoutException{return executorService.invokeAny(listOfServers.stream().map(hostWithPort -> new Callable<User>(){@Overridepublic User call(){Mono<User> mono = webClient.get().uri(String.format("http://%s/%s", hostWithPort, id))// URI.acceptCharset(StandardCharsets.UTF_8).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class);return mono.block();}}).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS);}@ApiOperation("查询用户 by invokeAll")@GetMapping("/v3/user/{id}")public List<User> findByInvokeAll(@PathVariable Long id)throws InterruptedException{List<Future<User>> futures = executorService.invokeAll(listOfServers.stream().map(hostWithPort -> new Callable<User>(){@Overridepublic User call(){Mono<User> mono = webClient.get().uri(String.format("http://%s/%s", hostWithPort, id))// URI.acceptCharset(StandardCharsets.UTF_8).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class);return mono.block();}}).collect(Collectors.toList()), 1000, TimeUnit.MILLISECONDS);List<User> users = new ArrayList<User>();for (Future<User> future : futures){try{users.add(future.get());}catch (Exception e){log.error(e.getMessage(), e);}}return users;}
}

3. 测试页面

在这里插入图片描述


有任何问题和建议,都可以向我提问讨论,大家一起进步,谢谢!

-over-

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com