欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 建筑 > 后端消息推送方案方案(轮询,长轮询,websocket,SSE)

后端消息推送方案方案(轮询,长轮询,websocket,SSE)

2024/10/23 23:30:08 来源:https://blog.csdn.net/2301_79969279/article/details/143165519  浏览:    关键词:后端消息推送方案方案(轮询,长轮询,websocket,SSE)

1.轮询:

轮询是一种客户端定期向服务器发送HTTP请求,服务器实时返回数据给浏览器,用以检查是否有新的数据或更新的方式。客户端会设置一个固定的时间间隔,不停地向服务器发起HTTP请求,无论是否有新数据返回,都会获取响应。

适用场景

轮询适用于以下几种场景:

  1. 无需实时更新:系统不需要实时获取数据,只需要间歇性地同步,例如股票价格更新或新闻客户端的刷新。
  2. 轻量级更新:服务器资源有限,无法承受高并发的长连接。

客户端代码:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>Polling Example</title><script>function fetchData() {fetch('http://localhost:8080/polling').then(response => response.json()).then(data => {document.getElementById("data").innerText = data.message;}).catch(error => console.error('Error:', error));}// 每5秒发起一次轮询请求setInterval(fetchData, 5000);</script>
</head>
<body><h1>Polling Example</h1><div id="data">Waiting for data...</div>
</body>
</html>

服务端:

1.添加依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>

2.后端代码实现:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;@RestController
public class PollingController {private String data = "Initial data";// 模拟每10秒钟更新一次数据public PollingController() {new Thread(() -> {while (true) {try {Thread.sleep(10000); // 每10秒更新数据data = "Updated data at " + LocalTime.now();} catch (InterruptedException e) {e.printStackTrace();}}}).start();}@GetMapping("/polling")public Map<String, String> getData() {Map<String, String> response = new HashMap<>();response.put("message", data);return response;}
}
缺点
  • 高资源消耗:如果轮询间隔太短,服务器可能会承受大量无效请求。
  • 时效性差:数据的更新不是实时的,而是基于设定的轮询间隔。

2.长轮询:

定义

长轮询是一种改进的轮询方式。客户端发出请求后,服务器保持连接(会阻塞请求)直到有新的数据产生才返回响应。一旦有新数据,服务器响应客户端,并在客户端处理完数据后,客户端立即再次发起请求,维持类似“推”的效果。

适用场景
  1. 需要近实时数据:应用场景需要快速更新数据,例如聊天系统、通知提醒等。
  2. 减少不必要的请求:相比于传统轮询,长轮询能够减少无效请求。

服务端:

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;@RestController
public class LongPollingController {private String data = "Initial data";private final ConcurrentLinkedQueue<CompletableFuture<Map<String, String>>> waitingClients = new ConcurrentLinkedQueue<>();// 模拟每15秒钟更新一次数据public LongPollingController() {new Thread(() -> {while (true) {try {Thread.sleep(15000); // 每15秒更新数据data = "Updated data at " + LocalTime.now();notifyClients();} catch (InterruptedException e) {e.printStackTrace();}}}).start();}@GetMapping("/long-polling")public CompletableFuture<Map<String, String>> getData() {CompletableFuture<Map<String, String>> future = new CompletableFuture<>();waitingClients.add(future);return future;}// 通知等待的客户端private void notifyClients() {List<CompletableFuture<Map<String, String>>> clientsToNotify = new ArrayList<>(waitingClients);waitingClients.clear();for (CompletableFuture<Map<String, String>> future : clientsToNotify) {future.complete(Map.of("message", data));}}
}

客户端:

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>Long Polling Example</title><script>function fetchData() {fetch('http://localhost:8080/long-polling').then(response => response.json()).then(data => {document.getElementById("data").innerText = data.message;// 收到数据后立即再次发起请求fetchData();}).catch(error => console.error('Error:', error));}// 初次发起请求fetchData();</script>
</head>
<body><h1>Long Polling Example</h1><div id="data">Waiting for data...</div>
</body>
</html>
优点
  • 减少无效请求:只有在有新数据时才会返回响应,避免了传统轮询中的频繁无效请求。
  • 近实时更新:由于服务器只在有新数据时才返回,可以实现准实时的数据同步。
缺点
  • 长时间保持连接:虽然不如WebSocket那样一直保持连接,但在某些场景下可能会导致服务器的连接资源被大量占用。
  • 不适合高频更新的场景:如果数据更新频繁,长轮询的频繁重新连接可能反而成为负担。

使用场景对比

  1. 普通轮询:
    • 场景:例如一个新闻站点,每隔10分钟获取一次最新的文章。
    • 优点:实现简单,服务器压力相对小。
    • 缺点:即使没有新数据也会发起请求,浪费资源。
  1. 长轮询:
    • 场景:例如聊天应用,需要近实时更新消息。
    • 优点:数据更新近实时,无需过多无效请求。
    • 缺点:可能会占用较多服务器资源。

注意事项

  1. 网络超时:在长轮询中,客户端的请求会保持较长时间,因此需要确保客户端与服务器的超时设置合适。
  2. 负载问题:长轮询虽然比普通轮询更节省资源,但对于高并发场景下,服务器的连接数可能很快耗尽,需要合理设计资源管理机制,或者考虑使用WebSocket等更合适的技术。
  3. 重连机制:无论是普通轮询还是长轮询,都需要设计合理的重连机制,确保客户端在请求失败后能继续请求而不会中断。

轮询 (Polling)

  • 请求频率要适当:避免频繁轮询导致服务器压力过大。可以根据业务需要调整轮询间隔时间。
  • 数据无效请求:普通轮询容易出现大量无效请求,尤其在数据更新不频繁的情况下。

长轮询 (Long Polling)

  • 服务器端超时设置:为了防止资源长时间占用,设置合理的超时时间是关键。如果服务器长时间不响应,需要确保客户端有良好的错误处理和重连机制。
  • 并发处理:如果多个客户端同时发起长轮询请求,后端需要合理管理这些连接,避免资源耗尽。

SSE(Server-Sent Events)服务器发送事件

SSE在服务器和浏览器之间打开了一个单向通道

服务器响应的不再是一次性的数据包,而是text/event-stream类型的数据流信息

服务器在数据变更将数据流式传输到客户端

SSE 原理

  • SSE允许服务器通过一个持久的HTTP连接,不断向客户端推送更新。客户端只需要建立一次连接,服务器就可以不断推送数据,而客户端会持续接收数据。
  • 场景:适用于实时通知、股票价格、社交媒体推送、消息系统等需要频繁数据更新的场景。

Java 后端实现(Spring Boot)

依赖配置(pom.xml)

确保在pom.xml中配置Spring Web依赖:


<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>
</dependencies>
后端代码实现
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;import java.time.Duration;
import java.time.LocalTime;
import java.util.Map;
import java.util.stream.Stream;@RestController
public class SSEController {// 每隔5秒推送一次数据@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<Map<String, String>> streamData() {return Flux.fromStream(Stream.generate(() -> {String data = "SSE data at " + LocalTime.now();return Map.of("message", data);})).delayElements(Duration.ofSeconds(5)); // 每5秒推送一次}
}
  • 说明
    • Flux 是 Spring WebFlux 提供的响应式流类型,能够持续推送数据。
    • MediaType.TEXT_EVENT_STREAM_VALUE 表示返回的内容是SSE的流数据格式。

前端实现(HTML + JavaScript)

html复制代码
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>SSE Example</title><script>document.addEventListener('DOMContentLoaded', function () {const eventSource = new EventSource('http://localhost:8080/sse');eventSource.onmessage = function(event) {const data = JSON.parse(event.data);document.getElementById("data").innerText = data.message;};eventSource.onerror = function(error) {console.error('Error:', error);};});</script>
</head>
<body><h1>SSE Example</h1><div id="data">Waiting for server updates...</div>
</body>
</html>
  • 说明
    • EventSource 对象用于接收来自服务器的事件流。它自动保持与服务器的连接,断开时会自动重新连接。
    • onmessage 处理服务器发来的消息,并更新页面数据。

SSE 与轮询/长轮询的区别

  • 单向通信:SSE是单向通信,服务器主动推送数据给客户端,而客户端不需要频繁请求服务器。相比轮询更高效。
  • 自动重连:SSE有内置的自动重连机制,客户端与服务器的连接断开时,会自动重新建立连接。
  • 持久连接:客户端和服务器保持持久的HTTP连接,服务器可以在任意时间推送数据,而不需要等待客户端请求。

SSE 运行方式

  1. 启动后端:运行Spring Boot项目,服务监听8080端口。
  2. 打开前端页面:加载HTML文件,等待服务器推送数据,每隔5秒页面将自动更新。

注意事项

  1. 浏览器兼容性:SSE广泛支持,但不适用于IE。现代浏览器(如Chrome、Firefox、Safari)支持良好。
  2. 连接限制:默认情况下,浏览器会限制与同一个服务器的最大连接数,确保SSE不会超过该限制。
  3. 网络超时:如果网络连接不稳定,SSE可能会触发自动重连,注意客户端的错误处理。
  4. 数据流限制:SSE适用于轻量级、频率不太高的推送。如果需要高频、双向通信,可以考虑使用WebSocket。

总结:

  • 轮询:适用于简单、不频繁更新的数据获取。
  • 长轮询:适用于需要实时获取数据,但服务器更新频率相对较高的场景。
  • SSE:适用于单向、频繁的实时数据更新,比如实时通知、消息推送等。

选择技术时,需要根据具体业务需求、数据更新频率和系统负载来选择合适的解决方案。

以及websocket:

WebSocket是一种基于TCP连接上进行全双工通信的协议:

全双工:允许数据在两个方向上同时传输

半双工:允许数据在两个方向上传输,但是同一个时间段内只允许一个方向上传输

websocket协议建立在tcp协议的基础上,所以服务器端也容易实现,不同的语言都有支持

tcp协议室全双工协议,http协议基于它,但是设计成了单向的

websocket没有同源限制

1.引入坐标

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>

2.编写配置类,扫描有@ServerEndpoint注解的Bean

package com.cetide.chat.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {/*** 开启WebSocket支持, 自动注册所有使用@ServerEndpoint注解声明的Websocket endpoint* @return*/@Beanpublic ServerEndpointExporter webSocketServer() {return new ServerEndpointExporter();}
}

3.创建消息接收类

package com.cetide.chat.model;public class Message {/*** 消息内容*/private String message;/*** 消息类型*/public boolean flag;/*** 接收者*/private String toName;public String getMessage() {return message;}public void setMessage(String message) {this.message = message;}public boolean isFlag() {return flag;}public void setFlag(boolean flag) {this.flag = flag;}public String getToName() {return toName;}public void setToName(String toName) {this.toName = toName;}
}

4.消息处理类

package com.cetide.chat.model;public class ResultMessage {private boolean isSystem;private Object message;private String fromName;public boolean isSystem() {return isSystem;}public void setSystem(boolean system) {isSystem = system;}public Object getMessage() {return message;}public void setMessage(Object message) {this.message = message;}public String getFromName() {return fromName;}public void setFromName(String fromName) {this.fromName = fromName;}
}

5.消息处理工具

package com.cetide.chat.util;import com.alibaba.fastjson.JSON;
import com.cetide.chat.model.ResultMessage;public class MessageUtils {public static String getMessage(boolean isSystemMessage, String fromName, Object message){ResultMessage resultMessage = new ResultMessage();resultMessage.setSystem(isSystemMessage);resultMessage.setMessage(message);if (fromName != null){resultMessage.setFromName(fromName);}return JSON.toJSONString(resultMessage);}
}

6.创建GetHttpSessionConfig进行保存各个用户信息

package com.cetide.chat.config;import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;public class GetHttpSessionConfig extends ServerEndpointConfig.Configurator {@Overridepublic void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {//获取HttpSession对象HttpSession httpSession = (HttpSession)request.getHttpSession();// 将HttpSession对象放入到ServerEndpointConfig的userProperties中if (httpSession == null){throw new RuntimeException("HttpSession is null");}sec.getUserProperties().put(HttpSession.class.getName(),httpSession);}
}

7.创建消息处理服务

package com.cetide.chat.ws;import com.alibaba.fastjson.JSON;
import com.cetide.chat.config.GetHttpSessionConfig;
import com.cetide.chat.model.Message;
import com.cetide.chat.util.MessageUtils;
import org.springframework.stereotype.Component;import javax.servlet.http.HttpSession;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;@ServerEndpoint(value = "/chat", configurator = GetHttpSessionConfig.class)
@Component
public class ChatEndpoint {private static final String SYSTEM_NAME = "系统";private static final String SYSTEM_MESSAGE = "欢迎来到聊天室";private static final String SYSTEM_MESSAGE_LEAVE = "用户离开";private static final String SYSTEM_MESSAGE_ENTER = "用户进入";private static final String SYSTEM_MESSAGE_ERROR = "系统错误";private static final String SYSTEM_MESSAGE_RECEIVE = "收到消息";private static final String SYSTEM_MESSAGE_SEND = "发送消息";private static final String SYSTEM_MESSAGE_SEND_ERROR = "发送消息失败";private static final String SYSTEM_MESSAGE_SEND_SUCCESS = "发送消息成功";private static final String SYSTEM_MESSAGE_SEND_TO_USER_ERROR = "发送消息给用户失败";private static final String SYSTEM_MESSAGE_SEND_TO_USER_SUCCESS = "发送消息给用户成功";private static final String SYSTEM_MESSAGE_SEND_TO_ALL_ERROR = "发送消息给所有人失败";private static final String SYSTEM_MESSAGE_SEND_TO_ALL_SUCCESS = "发送消息给所有人成功";private static final String SYSTEM_MESSAGE_SEND_TO_ = "发送消息给";private static final String SYSTEM_MESSAGE_SEND_TO_ALL = "发送消息给所有人";private static final String SYSTEM_MESSAGE_SEND_TO_USER = "发送消息给用户";private HttpSession httpSession;/*** ConcurrentHashMap线程安全的hashmap用来存储用户信息*/private static final Map<String, Session> onlineUsers = new ConcurrentHashMap<>();/*** 连接建立成功调用的方法** @param session*/@OnOpenpublic void onOpen(Session session, EndpointConfig endpointConfig) {//1. 将session进行保存this.httpSession = (HttpSession) endpointConfig.getUserProperties().get(HttpSession.class.getName());onlineUsers.put((String) this.httpSession.getAttribute("userName"), session);//2. 广播消息,需要将登录的所有用户推送给用户String message = MessageUtils.getMessage(true, null, getFriends());broadcastAllUsers(message);}private Set getFriends() {return onlineUsers.keySet();}//session.setAttribute("userName",user.getUserName())private void broadcastAllUsers(String message) {onlineUsers.forEach((userName, session) -> {session.getAsyncRemote().sendText(message);});try {Set<Map.Entry<String, Session>> entries = onlineUsers.entrySet();for (Map.Entry<String, Session> entry : entries) {//获取到所有用户对应的session兑现Session session = entry.getValue();//发送消息session.getBasicRemote().sendText(message);}} catch (Exception e) {System.out.println(SYSTEM_MESSAGE_SEND_TO_ALL_ERROR);}}/*** 收到客户端消息后调用的方法** @param message*/@OnMessagepublic void onMessage(String message) {try {//1. 将消息推送给指定的用户Message parse = JSON.parseObject(message, Message.class);String toName = parse.getToName();String msg = parse.getMessage();//2. 获取消息接收方的用户名Session objSession = onlineUsers.get(toName);String toMsg = MessageUtils.getMessage(false, (String) this.httpSession.getAttribute("userName"), msg);objSession.getBasicRemote().sendText(toMsg);} catch (Exception e) {System.out.println(SYSTEM_MESSAGE_SEND_TO_USER_ERROR);}}/*** 连接关闭调用的方法** @param session*/@OnClosepublic void onClose(Session session) {//1. 移除用户onlineUsers.remove(this.httpSession.getAttribute("userName"));//2. 广播消息String message = MessageUtils.getMessage(true, null, getFriends());broadcastAllUsers(message);}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com