1、sse(Server-Sent Events)
SSE
是一种允许服务器向浏览器推送实时更新的技术。它基于HTTP协议,是一种单向的通信方式
- 单向通信
- 基于HTTP
- 自动重连(内置了自动重连机制,当连接断开时,浏览器会自动尝试重新连接)
1.1 简易版:vue+python
vue:
<template><div class="container"><div id="app"><h1>SSE Stream Output</h1><div v-for="message in messages" :key="message">{{ message }}</div></div><el-button @click="sendMsg">send message</el-button></div>
</template><script setup lang="ts">import { ref } from 'vue';const messages = ref<any>([]);const sendMsg = () => {const eventSource = new EventSource('http://localhost:5000/stream');eventSource.onmessage = event => {messages.value.push(event.data);};};
</script><style scoped></style>
python:
from flask import Flask, Response
from flask_cors import CORS
import timeapp = Flask(__name__)
CORS(app)
def generate_stream():while True:time.sleep(1)yield f"data: 当前时间: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n"@app.route('/stream',methods=['GET', 'POST'])
def stream():data = request.jsonfile = data.get('file')print(file)return Response(generate_stream(), mimetype='text/event-stream')if __name__ == '__main__':app.run(debug=True)
1.2 post请求版本:vue(@microsoft/fetch-event-source)+python
安装:npm install @microsoft/fetch-event-source
<template><div class="container"><div id="app"><h1>SSE Stream Output</h1><div v-for="message in messages" :key="message">{{ message }}</div></div><el-button @click="sendMsg">send message</el-button></div><el-button @click="stop">stop</el-button>
</template><script setup lang="ts">import { ref } from 'vue';import { fetchEventSource } from '@microsoft/fetch-event-source';const messages = ref<any>([]);const sendMsg = () => {// const eventSource = new EventSource('http://localhost:5000/stream');// eventSource.onmessage = event => {// messages.value.push(event.data);// };connectToEventStream();};let controller: any = null;function connectToEventStream() {if (controller) {controller.abort();controller = null;}//没有的话就到这一步controller = new AbortController();let eventSource: any = fetchEventSource(`http://localhost:5000/stream`, {method: 'POST',headers: {'Content-Type': 'application/json',},body: JSON.stringify({file: 'testFile',userId: '1234',}),signal: controller.signal,openWhenHidden: true,onmessage(event: any) {console.log(event, 'event');messages.value.push(event.data);},onerror() {controller.abort();eventSource.close();},});}// 停止function stop() {if (controller) {controller.abort();controller = null;}}
</script><style scoped></style>
2.websocket
WebSocket
是一种网络通信协议,它允许在客户端(如浏览器)和服务器之间建立全双工(双向)的持久连接。
特点:
- 全双工通信:客户端和服务器可以同时发送和接收数据。
- 持久连接:连接建立后,客户端和服务器可以保持长时间的通信,而不需要频繁地重新建立连接。
- 低延迟:数据可以实时传输,适合实时聊天、在线游戏、股票行情等场景。
- 基于 TCP:WebSocket 是基于 TCP 协议的,确保数据的可靠传输。
- 轻量级:相比 HTTP,WebSocket 的协议头更小,减少了通信开销。
websocket.js
export default class WebSocketClient {constructor(url, options = {}) {this.url = url; // WebSocket 服务器地址this.options = {maxReconnectAttempts: 5, // 最大重连次数reconnectInterval: 5000, // 重连间隔(毫秒)heartbeatInterval: 30000, // 心跳间隔(毫秒)...options, // 允许自定义配置};this.socket = null; // WebSocket 实例this.reconnectAttempts = 0; // 当前重连次数this.heartbeatTimer = null; // 心跳定时器this.messageListeners = []; // 消息监听器this.openListeners = []; // 连接成功监听器this.closeListeners = []; // 连接关闭监听器this.errorListeners = []; // 错误监听器this.init();}// 初始化 WebSocketinit() {this.socket = new WebSocket(this.url);// 监听连接成功this.socket.onopen = () => {console.log("WebSocket 连接成功");this.reconnectAttempts = 0; // 重置重连次数this.startHeartbeat(); // 启动心跳检测this.emitOpen(); // 触发连接成功事件};// 监听服务器消息this.socket.onmessage = (event) => {const data = event.data;console.log("收到服务器消息:", data);this.emitMessage(data); // 触发消息事件// 如果是心跳响应,则重置心跳检测if (data === "pong") {this.resetHeartbeat();}};// 监听连接关闭this.socket.onclose = () => {console.log("WebSocket 连接已关闭");this.stopHeartbeat(); // 停止心跳检测this.emitClose(); // 触发连接关闭事件this.handleReconnect(); // 尝试重连};// 监听连接错误this.socket.onerror = (error) => {console.error("WebSocket 错误:", error);this.stopHeartbeat(); // 停止心跳检测this.emitError(error); // 触发错误事件this.handleReconnect(); // 尝试重连};}// 启动心跳检测startHeartbeat() {this.heartbeatTimer = setInterval(() => {if (this.socket && this.socket.readyState === WebSocket.OPEN) {this.socket.send("ping"); // 发送心跳消息console.log("发送心跳: ping");}}, this.options.heartbeatInterval);}// 重置心跳检测resetHeartbeat() {this.stopHeartbeat();this.startHeartbeat();}// 停止心跳检测stopHeartbeat() {if (this.heartbeatTimer) {clearInterval(this.heartbeatTimer);this.heartbeatTimer = null;}}// 处理重连handleReconnect() {if (this.reconnectAttempts < this.options.maxReconnectAttempts) {this.reconnectAttempts++;console.log(`尝试重连,第 ${this.reconnectAttempts} 次`);setTimeout(() => {this.init(); // 重新初始化 WebSocket}, this.options.reconnectInterval);} else {console.error("已达到最大重连次数,停止重连");}}// 发送消息send(message) {if (this.socket && this.socket.readyState === WebSocket.OPEN) {this.socket.send(JSON.stringify(message));} else {console.error("WebSocket 未连接,无法发送消息");}}// 关闭连接close() {if (this.socket) {this.socket.close();}this.stopHeartbeat();}// 添加消息监听器onMessage(listener) {this.messageListeners.push(listener);}// 添加连接成功监听器onOpen(listener) {this.openListeners.push(listener);}// 添加连接关闭监听器onClose(listener) {this.closeListeners.push(listener);}// 添加错误监听器onError(listener) {this.errorListeners.push(listener);}// 触发消息事件emitMessage(data) {this.messageListeners.forEach((listener) => listener(data));}// 触发连接成功事件emitOpen() {this.openListeners.forEach((listener) => listener());}// 触发连接关闭事件emitClose() {this.closeListeners.forEach((listener) => listener());}// 触发错误事件emitError(error) {this.errorListeners.forEach((listener) => listener(error));}
}
websocket.vue
<template><div class="container"><div><h1>Stream Output</h1><ul><li v-for="(message, index) in messages" :key="index">{{ message }}</li></ul></div><el-input v-model="inputVal"></el-input><el-button @click="sendMsg">send message</el-button></div>
</template><script setup>import { ref, onMounted } from 'vue';import WebSocketClient from './websocket';const messages = ref([]);const inputVal = ref('');const sendMsg = () => {const wsClient = new WebSocketClient('ws://localhost:8000/ws', {maxReconnectAttempts: 3, // 自定义最大重连次数reconnectInterval: 3000, // 自定义重连间隔heartbeatInterval: 20000, // 自定义心跳间隔});wsClient.onOpen(() => {console.log('WebSocket 连接成功');wsClient.send({input:inputVal.value,info:'消息'})});wsClient.onMessage(data => {console.log('收到消息:', data);messages.value.push(data)});};
</script>
python
from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
import asyncio
app = FastAPI()# WebSocket 路由
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):await websocket.accept() # 接受客户端连接try:while True:# 接收客户端消息data = await websocket.receive_text()print(f"收到客户端消息: {data}")# 模拟流式响应for i in range(5):response = f"服务器响应: {data} - {i + 1}"await websocket.send_text(response) # 发送消息给客户端await asyncio.sleep(1) # 模拟延迟except Exception as e:print(f"WebSocket 连接异常: {e}")finally:await websocket.close() # 关闭连接# 启动服务
if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)