欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > 服务端给客户端push消息的demo的实现流程

服务端给客户端push消息的demo的实现流程

2024/10/26 0:21:21 来源:https://blog.csdn.net/su749520/article/details/142834279  浏览:    关键词:服务端给客户端push消息的demo的实现流程

摘要:

本示例演示了一个基本的服务端5分钟定时向客户端app推送消息的WebSocket机制。服务端使用WebSocket协议接受客户端的订阅和取消订阅请求,并根据客户端的订阅状态发送实时消息。服务端记录并打印带有时间戳的日志,以监控订阅活动。客户端可以接收来自服务端的消息,并根据需要处理这些消息。

实现流程

1.初始化WebSocket服务端

创建并启动WebSocket服务端,准备接收客户端的连接请求。

/** * 心跳通常是指客户端或服务器定期发送一个小型的、空的消息以保持连接的活动状态。它用于检测连接是否仍然有效,并防止连接由于长时间没有活动而被关闭。 * * 推送是指服务器主动向客户端发送实际的数据或消息。服务器可以根据特定的业务逻辑或事件触发,将数据推送给客户端,而不需要客户端发起请求。 */package com.fadi.power.netpowerpushserviceimport io.ktor.application.*import io.ktor.http.cio.websocket.*import io.ktor.routing.*import io.ktor.server.engine.embeddedServerimport io.ktor.server.netty.Nettyimport io.ktor.websocket.WebSocketsimport io.ktor.websocket.webSocketimport kotlinx.coroutines.launchimport java.util.concurrent.atomic.AtomicIntegerimport kotlinx.coroutines.delayimport kotlinx.coroutines.isActiveimport java.util.concurrent.ConcurrentHashMapimport java.text.SimpleDateFormatimport java.util.Calendarfun main() {    embeddedServer(Netty, port = 8080, module = Application::module).start(wait = true)}fun Application.module() {    install(WebSockets)    routing {        val connections = AtomicInteger(0)        // 使用 ConcurrentHashMap 以支持线程安全的读写操作        val clients = ConcurrentHashMap<WebSocketSession, Boolean>()        webSocket("/ws") {            connections.incrementAndGet()            println("Client connected. Total connections: ${connections.get()}")            // 默认情况下,新客户端愿意接收 push 消息            clients[this] = true            try {                // 启动一个协程用于定时向客户端发送消息                val pushJob = launch {                    while (isActive) {                        delay(300000) // 等待5分钟                        clients.forEach { (client, wantsPush) ->                            if (client.isActive && wantsPush) {                                val currentDateTime = Calendar.getInstance().time                                val formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)                                client.send("Server push message at ${formattedDateTime}")                            }                        }                    }                }                for (frame in incoming) {

2.客户端连接和设置是否推送处理

接受客户端连接,并为每个客户端创建一个会话。

companion object {        const val HEARTBEAT_MESSAGE = "Heartbeat"        const val SUBSCRIBE_MESSAGE = "subscribe"        const val UNSUBSCRIBE_MESSAGE = "unsubscribe"    }        override fun onOpen(handshakedata: ServerHandshake?) {        // 连接成功,发送数据或执行其他操作        noteWebSocketClientOpen()        send("Hello, Server!")        Log.d(Config.TAG, "WebSocketClient onOpen: Hello, Server!")        // 【不使用自带的心跳,使用自建Alarm定时】启动定时任务发送心跳消息        // startHeartbeat()    }    // 设置是否推送    fun setEnablePush(enable: Boolean) {        if (enable) {            send(SUBSCRIBE_MESSAGE)        } else {            send(UNSUBSCRIBE_MESSAGE)        }    }

3.消息接收与处理

接收来自客户端的消息,并根据消息内容做出响应。

@Overridepublic void onMessage(WebSocket conn, String message) {    System.out.println("Received message from client: " + message);    // 处理客户端发送的消息}

4.订阅与取消订阅逻辑

根据客户端发送的指令更新订阅状态,并记录带时间戳的日志。

for (frame in incoming) {    when (frame) {        is Frame.Text -> {            val receivedText = frame.readText()            println("Received message: $receivedText")            when (receivedText) {                "unsubscribe" -> {                    val currentDateTime = Calendar.getInstance().time                    val formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)                    println("[$formattedDateTime] unsubscribe: 关闭推送")                }                "subscribe" -> {                    clients[this] = true                    val currentDateTime = Calendar.getInstance().time                    val formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)                    println("[$formattedDateTime] subscribe: 开启推送")                }                else -> send("Server received: $receivedText")            }        }        else -> {}    }}

5.推送消息

向已订阅的客户端发送推送消息。

// 启动一个协程用于定时向客户端发送消息val pushJob = launch {    while (isActive) {        delay(300000) // 等待5分钟        clients.forEach { (client, wantsPush) ->            if (client.isActive && wantsPush) {                val currentDateTime = Calendar.getInstance().time                val formattedDateTime = SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(currentDateTime)                client.send("Server push message at ${formattedDateTime}")            }        }    }}

6.客户端接受消息

override fun onMessage(message: String?) {        // 接收到服务器发送的消息,执行相应的处理逻辑        message?.let {            Log.d(Config.TAG, "WebSocketClient onMessage: Received message: $it")        }    }

7. 日志验证

7.1 服务端

服务端支持动态开启和关闭推送

0

7.2 app端

App收到定时5分钟的服务器push消息

0

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com