摘要:
本示例演示了一个基本的服务端5分钟定时向客户端app推送消息的WebSocket机制。服务端使用WebSocket协议接受客户端的订阅和取消订阅请求,并根据客户端的订阅状态发送实时消息。服务端记录并打印带有时间戳的日志,以监控订阅活动。客户端可以接收来自服务端的消息,并根据需要处理这些消息。
实现流程
1.初始化WebSocket服务端
创建并启动WebSocket服务端,准备接收客户端的连接请求。
/*** 心跳通常是指客户端或服务器定期发送一个小型的、空的消息以保持连接的活动状态。它用于检测连接是否仍然有效,并防止连接由于长时间没有活动而被关闭。** 推送是指服务器主动向客户端发送实际的数据或消息。服务器可以根据特定的业务逻辑或事件触发,将数据推送给客户端,而不需要客户端发起请求。*/package com.fadi.power.netpowerpushserviceimport io.ktor.application.*import io.ktor.http.cio.websocket.*import io.ktor.routing.*import io.ktor.server.engine.embeddedServerimport io.ktor.server.netty.Nettyimport io.ktor.websocket.WebSocketsimport io.ktor.websocket.webSocketimport kotlinx.coroutines.launchimport java.util.concurrent.atomic.AtomicIntegerimport kotlinx.coroutines.delayimport kotlinx.coroutines.isActiveimport java.util.concurrent.ConcurrentHashMapimport java.text.SimpleDateFormatimport java.util.Calendarfun main() {embeddedServer(Netty, port = 8080, module = Application::module).start(wait = true)}fun Application.module() {install(WebSockets)routing {val connections = AtomicInteger(0)// 使用 ConcurrentHashMap 以支持线程安全的读写操作val clients = ConcurrentHashMap<WebSocketSession, Boolean>()webSocket("/ws") {connections.incrementAndGet()println("Client connected. Total connections: ${connections.get()}")// 默认情况下,新客户端愿意接收 push 消息clients[this] = truetry {// 启动一个协程用于定时向客户端发送消息val pushJob = launch {while (isActive) {delay(300000) // 等待5分钟clients.forEach { (client, wantsPush) ->if (client.isActive && wantsPush) {val currentDateTime = Calendar.getInstance().timeval formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)client.send("Server push message at ${formattedDateTime}")}}}}for (frame in incoming) {
2.客户端连接和设置是否推送处理
接受客户端连接,并为每个客户端创建一个会话。
companion object {const val HEARTBEAT_MESSAGE = "Heartbeat"const val SUBSCRIBE_MESSAGE = "subscribe"const val UNSUBSCRIBE_MESSAGE = "unsubscribe"}override fun onOpen(handshakedata: ServerHandshake?) {// 连接成功,发送数据或执行其他操作noteWebSocketClientOpen()send("Hello, Server!")Log.d(Config.TAG, "WebSocketClient onOpen: Hello, Server!")// 【不使用自带的心跳,使用自建Alarm定时】启动定时任务发送心跳消息// startHeartbeat()}// 设置是否推送fun setEnablePush(enable: Boolean) {if (enable) {send(SUBSCRIBE_MESSAGE)} else {send(UNSUBSCRIBE_MESSAGE)}}
3.消息接收与处理
接收来自客户端的消息,并根据消息内容做出响应。
@Overridepublic void onMessage(WebSocket conn, String message) {System.out.println("Received message from client: " + message);// 处理客户端发送的消息}
4.订阅与取消订阅逻辑
根据客户端发送的指令更新订阅状态,并记录带时间戳的日志。
for (frame in incoming) {when (frame) {is Frame.Text -> {val receivedText = frame.readText()println("Received message: $receivedText")when (receivedText) {"unsubscribe" -> {val currentDateTime = Calendar.getInstance().timeval formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)println("[$formattedDateTime] unsubscribe: 关闭推送")}"subscribe" -> {clients[this] = trueval currentDateTime = Calendar.getInstance().timeval formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)println("[$formattedDateTime] subscribe: 开启推送")}else -> send("Server received: $receivedText")}}else -> {}}}
5.推送消息
向已订阅的客户端发送推送消息。
// 启动一个协程用于定时向客户端发送消息val pushJob = launch {while (isActive) {delay(300000) // 等待5分钟clients.forEach { (client, wantsPush) ->if (client.isActive && wantsPush) {val currentDateTime = Calendar.getInstance().timeval formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)client.send("Server push message at ${formattedDateTime}")}}}}
6.客户端接受消息
override fun onMessage(message: String?) {// 接收到服务器发送的消息,执行相应的处理逻辑message?.let {Log.d(Config.TAG, "WebSocketClient onMessage: Received message: $it")}}
7. 日志验证
7.1 服务端
服务端支持动态开启和关闭推送

7.2 app端
App收到定时5分钟的服务器push消息

