欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > 基于Spring的消息推送实战(Websocket和前端轮询实现)

基于Spring的消息推送实战(Websocket和前端轮询实现)

2024/12/1 0:37:51 来源:https://blog.csdn.net/xingyuemengjin/article/details/141868036  浏览:    关键词:基于Spring的消息推送实战(Websocket和前端轮询实现)

基于Spring的消息推送实战(Websocket和前端轮询实现)

        本文介绍了基于Spring的消息推送实现方法,主要介绍了websocket实时消息推送方法(ServerEndpoint方式实现),以及前端客户端轮询方式的消息推送。

一、消息推送

       常见的消息推送方式有轮询、websocket、jpush等。

       传统http协议需要客户端发起请求,不能服务端进行推送,且建立tcp连接需要多次握手(tcp三次握手建立连接,四次握手关闭连接),是单工通道,请求头也需要大量信息,造成资源浪费。

        Websocket仅需一次握手就可以在客户端和服务器之间建立长时间、全双工、双向交互的连接通道,且请求头报文可以压缩传输,减轻资源浪费,支持服务器主动向客户端发送消息。

二、应用场景

通常用在实时同步性要求高的场景,如:

1、实时数据监控和推送:WebSocket可以用于实时监控和推送数据,例如股票行情,实时交通信息、天气、预警等。服务器可以将最新的数据实时推送给客户端,使用户能够实时获取并展示最新的数据。

2、即时聊天:WebSocket可以实现实时的消息传递,使得即时聊天应用能够实时地将消息推送给在线用户,实现实时的聊天体验。

3、多人协作应用:WebSocket可以用于多人协作应用程序,例如实时协作编辑器、白板应用等。多个用户可以在同一文档或画布上实时协作、彼此之间的更改和操作可以实时同步。

三、WebSocket实现

1、两种实现方法

Websocket可以通过ServerEndpointExporter和registerWebSocketHandlers两种不同的方式实现。

使用@ServerEndpoint注解来定义WebSocket端点;

registerWebSocketHandlers可以指定一个WebSocketHandler实例添加自定义的拦截器或使用Spring的注解驱动(如@MessageMapping)

2、依赖引入

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

3、WebSocket处理器方法及端点定义

@ServerEndpoint指定端点地址;

onOpen、onMssage、onError、onClose为接受信息不同状态的处理方法,可向前端客户端发送消息
 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.Semaphore;/*** websocket 消息处理**/
@Component
@ServerEndpoint("/websocket/message")
public class WebSocketServer
{/*** WebSocketServer 日志控制器*/private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketServer.class);/*** 默认最多允许同时在线人数100*/public static int socketMaxOnlineCount = 100;private static Semaphore socketSemaphore = new Semaphore(socketMaxOnlineCount);/*** 连接建立成功调用的方法*/@OnOpenpublic void onOpen(Session session) throws Exception{boolean semaphoreFlag = false;// 尝试获取信号量semaphoreFlag = SemaphoreUtils.tryAcquire(socketSemaphore);if (!semaphoreFlag){// 未获取到信号量LOGGER.error("\n 当前在线人数超过限制数- {}", socketMaxOnlineCount);WebSocketUsers.sendMessageToUserByText(session, "当前在线人数超过限制数:" + socketMaxOnlineCount);session.close();}else{// 添加用户WebSocketUsers.put(session.getId(), session);LOGGER.info("\n 建立连接 - {}", session.getId());LOGGER.info("\n 当前人数 - {}", WebSocketUsers.getUsers().size());WebSocketUsers.sendMessageToUserByText(session, "连接成功");}}/*** 连接关闭时处理*/@OnClosepublic void onClose(Session session){LOGGER.info("\n 关闭连接 - {}", session);// 移除用户WebSocketUsers.remove(session.getId());// 获取到信号量则需释放SemaphoreUtils.release(socketSemaphore);}/*** 抛出异常时处理*/@OnErrorpublic void onError(Session session, Throwable exception) throws Exception{if (session.isOpen()){// 关闭连接session.close();}String sessionId = session.getId();LOGGER.info("\n 连接异常 - {}", sessionId);LOGGER.info("\n 异常信息 - {}", exception);// 移出用户WebSocketUsers.remove(sessionId);// 获取到信号量则需释放SemaphoreUtils.release(socketSemaphore);}/*** 服务器接收到客户端消息时调用的方法*/@OnMessagepublic void onMessage(String message, Session session){String msg = message.replace("你", "我").replace("吗", "");WebSocketUsers.sendMessageToUserByText(session, msg);}
}

4、处理子方法


 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import javax.websocket.Session;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** websocket 客户端用户集** @author Inspur*/
public class WebSocketUsers
{/*** WebSocketUsers 日志控制器*/private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketUsers.class);/*** 用户集*/private static Map<String, Session> USERS = new ConcurrentHashMap<String, Session>();/*** 存储用户** @param key 唯一键* @param session 用户信息*/public static void put(String key, Session session){USERS.put(key, session);}/*** 移除用户** @param session 用户信息** @return 移除结果*/public static boolean remove(Session session){String key = null;boolean flag = USERS.containsValue(session);if (flag){Set<Map.Entry<String, Session>> entries = USERS.entrySet();for (Map.Entry<String, Session> entry : entries){Session value = entry.getValue();if (value.equals(session)){key = entry.getKey();break;}}}else{return true;}return remove(key);}/*** 移出用户** @param key 键*/public static boolean remove(String key){LOGGER.info("\n 正在移出用户 - {}", key);Session remove = USERS.remove(key);if (remove != null){boolean containsValue = USERS.containsValue(remove);LOGGER.info("\n 移出结果 - {}", containsValue ? "失败" : "成功");return containsValue;}else{return true;}}/*** 获取在线用户列表** @return 返回用户集合*/public static Map<String, Session> getUsers(){return USERS;}/*** 群发消息文本消息** @param message 消息内容*/public static void sendMessageToUsersByText(String message){Collection<Session> values = USERS.values();for (Session value : values){sendMessageToUserByText(value, message);}}/*** 发送文本消息** @param userName 自己的用户名* @param message 消息内容*/public static void sendMessageToUserByText(Session session, String message){if (session != null){try{session.getBasicRemote().sendText(message);}catch (IOException e){LOGGER.error("\n[发送消息异常]", e);}}else{LOGGER.info("\n[你已离线]");}}
}

5、信号量限流工具类

Semaphore 就是一个信号量,它的作用是限制某段代码块的并发数。Semaphore有一个构造函数,可以传入一个 int 型整数 n,表示某段代码最多只有 n 个线程可以访问,如果超出了 n,那么请等待,等到某个线程执行完毕这段代码块,下一个线程再进入。由此可以看出如果 Semaphore 构造函数中


 

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.Semaphore;/*** 信号量相关处理** @author Inspur*/
public class SemaphoreUtils
{/*** SemaphoreUtils 日志控制器*/private static final Logger LOGGER = LoggerFactory.getLogger(SemaphoreUtils.class);/*** 获取信号量** @param semaphore* @return*/public static boolean tryAcquire(Semaphore semaphore){boolean flag = false;try{flag = semaphore.tryAcquire();}catch (Exception e){LOGGER.error("获取信号量异常", e);}return flag;}/*** 释放信号量** @param semaphore*/public static void release(Semaphore semaphore){try{semaphore.release();}catch (Exception e){LOGGER.error("释放信号量异常", e);}}
}

6、开启websocket(WebSocket.java)

package com.inspur.framework.websocket;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;/*** websocket 配置** @author Inspur*/
@Configuration
public class WebSocketConfig
{@Beanpublic ServerEndpointExporter serverEndpointExporter(){return new ServerEndpointExporter();}
}

7、SpringSecurity开启白名单

public class SecurityConfig extends WebSecurityConfigurerAdapter {protected void configure(HttpSecurity httpSecurity) throws Exception {.antMatchers("/websocket/**").permitAll()}}

8、后台推送消息

后台业务中,服务器向前端推送消息

WebSocketUsers.sendMessageToUsersByText("message消息示例!");

9、前台接收消息

(1)websocket地址配置

和@ServerEndpoint配置的地址一致

文件.env.development

# websocket 后台地址
VUE_APP_WEBSOCKET_URL = 'ws://127.0.0.1:8290/pro-service/websocket/message'

(2)前台依赖

文件package.json

"reconnecting-websocket": "^4.4.0",

(3)全局地址

Utils/webSocket.js’

export default {connectUrl: process.env.NODE_ENV === 'development' ? process.env.VUE_APP_WEBSOCKET_URL:`${location.protocol === 'https' ? 'wss' : 'ws'}://${location.host}`+process.env.VUE_APP_WEBSOCKET_URL
}

(4)接受消息

在Navbar.vue里,该组件在任何页面都存在

import {Notification} from "element-ui";
import moment from "moment";
import Vue from "vue";
import Router from "vue-router";import WebSocket from "@/utils/webSocket";mounted() {this.ws = new ReconnectingWebSocket(WebSocket.connectUrl);
console.log(WebSocket.connectUrl)
const self = this;
this.ws.onopen = function (event) {
};
this.ws.onmessage = function (event) {if (event.data != null && event.data != "连接成功") {Notification.success(event.data)}
};
this.ws.onclose = function (event) {// Notification.info("已经关闭连接")
};}

四、轮询方案

前端采用定时器+navbar推送展示消息

1、Interval定时器轮询

navbar的mounted方法,添加定时器

import {Notification} from "element-ui";mounted() {// 每隔1小时执行一次this.timer = setInterval(()=>{// 向后台服务器请求方法//eg:this.$store.dispatch('Method1')},1000 * 60 * 60)
},
beforeDestroy() {//清除定时器clearInterval(this.timer);
},

2、请求方法设置

可以直接在定时器里请求,也可异步请求

import { Notification } from 'element-ui'State:{maintainOrderToApproveCount: 0,
maintainOrderToApproveList: [],}mutations: {SET_MAINTAIN_ORDER_TO_APPROVE_LIST: (state, maintainOrderToApproveList) => {state.maintainOrderToApproveList = maintainOrderToApproveList
},SET_MAINTAIN_ORDER_TO_APPROVE_COUNT: (state, maintainOrderToApproveCount) => {state.maintainOrderToApproveCount = maintainOrderToApproveCount
},}actions: {// 请求方法
Method1({commit}) {return new Promise((resolve, reject) => {getMyApprovalOrderList().then(res => {if (res.code === 200) {commit('SET_MAINTAIN_ORDER_TO_APPROVE_COUNT', res.data.length)commit('SET_MAINTAIN_ORDER_TO_APPROVE_LIST', res.data.length)Notification.warning({title: '您有待审批的工单未完成',message: '尊敬的用户您好,您有'+res.data.length+'条工单待审批处理,请及时审批',duration: 5000})}resolve()}).catch(error => {reject(error)})})
},}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com