欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 科技 > 名人名企 > springboot、springcloudnacos、netty-socketio实现im集群弹性伸缩和节点上下线监听

springboot、springcloudnacos、netty-socketio实现im集群弹性伸缩和节点上下线监听

2025/4/24 5:47:06 来源:https://blog.csdn.net/zsj777/article/details/144664471  浏览:    关键词:springboot、springcloudnacos、netty-socketio实现im集群弹性伸缩和节点上下线监听

1、im-server 所有节点都注册到nacos服务中,使用nacos服务端2.4.3,客户端1.4.6,

spring-cloud-starter-alibaba-nacos-discovery版本  2021.1

2、im-listener 监听 im-server的上线和下线事件

3、springcloudalibaba  nacos监听服务上线和下线

配置文件

spring:redis:redisson:file:classpath:redisson.yamlapplication:# 应用名称name: im-listenercloud:nacos:discovery:# 服务注册地址server-addr: xxx.xxx.xxx.xx:xxnamespace: cb329a1e-c20b-495a-885f-72076fc90d5f#心跳间隔。时间单位:毫秒。heart-beat-interval: 1000#心跳暂停。时间单位:毫秒。 即nacos服务端40秒收不到微服务客户端心跳,会将该微服务客户端注册的实例设为不健康heart-beat-timeout: 4000#Ip删除超时。时间单位:秒。即服务端90秒收不到客户端心跳,会将该微服务客户端注册的实例删除ip-delete-timeout: 9000#nacos 账号username: XXXXXXXXX# nacos 密码password: YYYYYYYYYYregister-enabled: false # 注意:该服务无需注册到注册中心上,只用于获取注册中心上的服务信息就行了config:# 相同配置,本地优先override-none: trueserver:port: 8080im:port: 9092

代码

package com.yh.im.config;import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;/*** nacos 客户端 服务监听变化。当服务下线和上线的时候能够收到通知* @Date 2023/11/27 12:00*/
@Component
@Slf4j
public class NacosDiscoveryListener {private final Set<String> subscribedServices = ConcurrentHashMap.newKeySet();@Resourceprivate NacosServiceManager nacosServiceManager;private static Map<String, Map<String, Boolean>> instanceHealthStatus = new ConcurrentHashMap<>();/*** 构造一个事件监听器,主要作用是监听服务实例变化** @return EventListener*/private EventListener buildEventListener() {return event -> {if (event instanceof NamingEvent) {NamingEvent namingEvent = (NamingEvent) event;log.error("服务实例变化:{}", JSON.toJSONString(namingEvent));String serviceName = namingEvent.getServiceName();if (!instanceHealthStatus.containsKey(serviceName)) {ConcurrentHashMap<String, Boolean> instanceMap = new ConcurrentHashMap<>();instanceHealthStatus.put(serviceName, instanceMap);List<Instance> newInstance = namingEvent.getInstances();newInstance.forEach(instance -> {String instanceKey = instance.getIp() + ":" + instance.getPort();instanceMap.put(instanceKey, instance.isHealthy());log.error("服务首次上线: {} -> {}", serviceName, instanceKey);});return;}List<ServiceInfo> allServiceInstances = getAllServiceInstances();int instanceTotal = allServiceInstances.stream().mapToInt(serviceInfo -> Integer.parseInt(serviceInfo.getClusters())).sum();Map<String, Boolean> serviceMap = instanceHealthStatus.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());Set<String> oldInstanceKeys = new HashSet<>(serviceMap.keySet());List<Instance> newInstance = namingEvent.getInstances();Set<String> newInstanceKeys = newInstance.stream().map(instance -> instance.getIp() + ":" + instance.getPort()).collect(Collectors.toSet());int oldSize = serviceMap.size();int newSize = namingEvent.getInstances().size();// 服务实例没有增减,只是状态变化if (oldSize == newSize) {newInstance.forEach((instance) -> {String instanceKey = instance.getIp() + ":" + instance.getPort();if (instance.isHealthy() != serviceMap.get(instanceKey)) {if (instance.isHealthy()) {log.error("服务上线: {} -> {}", serviceName, instanceKey);} else {log.error("服务下线: {} -> {}", serviceName, instanceKey);}serviceMap.put(instanceKey, instance.isHealthy());}});}// 下线实例if (oldSize > newSize) {newInstanceKeys.forEach(oldInstanceKeys::remove);oldInstanceKeys.forEach(instanceKey -> log.error("服务下线: {} -> {}", serviceName, instanceKey));} else {// 上线实例newInstanceKeys.removeAll(oldInstanceKeys);StringBuffer noticeTitle = new StringBuffer("服务上线通知");newInstanceKeys.forEach(instanceKey -> {String message = String.format("[%s-%s]", serviceName, instanceKey);log.info(message);noticeTitle.append(message).append(",");});}}};}/*** 定时获取服务列表,然后根据获取到的服务名,进行订阅,* nacos客户端目前不能订阅所有服务,只能手动的订阅* 也可以不用定时需要的时候通过getAllServiceInstances获取*/@Scheduled(fixedDelay = 5000)public void reportServices() {List<String> services = null;try {Properties properties = new Properties();NamingService namingService = nacosServiceManager.getNamingService(properties);services = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();services.forEach(serviceName -> {if (!subscribedServices.contains(serviceName)) {try {namingService.subscribe(serviceName, buildEventListener());subscribedServices.add(serviceName);} catch (NacosException e) {log.error("订阅服务失败", e);}}});} catch (NacosException e) {log.error("获取服务列表失败", e);}}/*** 获取所有服务实例* @return  服务实例列表*/public List<ServiceInfo> getAllServiceInstances() {List<ServiceInfo> serviceInfos = new ArrayList<>();try {Properties properties = new Properties();NamingService namingService = nacosServiceManager.getNamingService(properties);List<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();for (String serviceName : services) {List<Instance> onlineInstances = namingService.selectInstances(serviceName, true);// 下线服务暂时不用关注List<Instance> offlineInstances = namingService.selectInstances(serviceName, false);serviceInfos.add(new ServiceInfo(serviceName, String.valueOf(onlineInstances.size())));}} catch (NacosException e) {e.printStackTrace();}return serviceInfos;}}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词