引言
实时通信技术在现代 Web 应用中扮演着核心角色,而 WebSocket 作为其中的关键技术,已成为即时通讯(IM)系统不可或缺的一部分。Cowboy,这个基于 Erlang/OTP 的轻量级 HTTP 服务器框架,以其强大且用户友好的 WebSocket 功能,为开发者提供了构建高效 IM 应用的利器。本文将深入分析如何利用 Cowboy WebSocket 来打造高性能的即时通讯解决方案。
WebSocket 基础
WebSocket 创造了一个全双工通信通道,允许服务器与客户端进行实时双向数据交换,非常适合需要即时反馈的应用场景,如在线聊天和实时游戏。
Websocket Handler 架构
IMBoy 的 websocket_handler.erl
模块通过实现 cowboy_websocket
行为来管理 WebSocket 连接。以下是关键组件的概览:
init/2
:初始化请求处理。websocket_init/1
:WebSocket 连接建立后的初始化操作。websocket_handle/2
:处理 WebSocket 接收到的消息。websocket_info/2
:处理从其他进程发送到 WebSocket 进程的消息。terminate/3
:关闭 WebSocket 连接时的资源清理。
websocket_handler.erl
代码解析
以下是对 websocket_handler.erl
代码片段的解析:
1. 模块定义与行为引入:
-module(websocket_handler).
-behavior(cowboy_websocket).
定义了名为 websocket_handler
的模块,并引入了名为 cowboy_websocket
的 behavior。
2. 导出函数:
-export([init/2]).
-export([websocket_init/1]).
-export([websocket_handle/2]).
-export([websocket_info/2]).
-export([terminate/3]).
这些函数分别对应 WebSocket 生命周期的不同阶段。
3. WebSocket 初始化握手 (init/2
):
在此阶段,我们从请求中提取关键信息(如设备 ID、版本号等),并根据这些信息配置 WebSocket。
3.1 客户端连接频率控制:
首先,检查客户端设备 ID 的连接频率。配置文件中设定了每秒 2 次、每分钟 20 次的限制。
case throttle:check(throttle_ws, DID) of{limit_exceeded, _, _} ->imboy_log:warning("DeviceID ~p exceeded api limit", [DID]),Req = cowboy_req:reply(429, Req0),{ok, Req, State0};_ ->% ... 代码省略
end.
3.2 WebSocket 子协议升级:
频率检查通过后,检查 sec-websocket-protocol
请求头,确保其为非空列表。IMBoy 采用列表中的第一个元素(例如 “text”),并设置响应头。
check_subprotocols([H | _Tail], Req0) ->Req = cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, H, Req0),{cowboy_websocket, Req}.
3.3 校验 Authorization:
子协议检查通过后,校验 authorization
请求头中的 JWT 令牌。验证成功后,将当前用户 UID 写入状态参数 State
,供后续使用。
auth_after(Uid, Req, State, Opt) ->Timeout = idle_timeout(Uid),{cowboy_websocket, Req, State#{current_uid => Uid}, Opt#{idle_timeout := Timeout}}.
3.4 动态设置 WebSocket 的 idle timeout 值:
设想根据客户端网络状况动态计算 idle timeout 值(该功能尚未实现,但值得期待)。
% 设置用户 WebSocket 超时时间,默认为 60 秒
% Cowboy 默认在 128 秒后关闭空闲连接,此处设置为 60000
idle_timeout(_Uid) ->60000.
4. 连接初始化 (websocket_init/1
):
一旦 WebSocket 连接建立,可以执行一些初始化操作,例如记录用户上线、获取离线消息等。
5. 消息处理 (websocket_handle/2
):
在此处理客户端发送的各种消息。例如,对于 ping 消息,回复 pong;对于文本消息,根据消息类型调用相应的处理函数。
5.1 客户端消息确认方法:
- 消息格式为
CLIENT_ACK,type,msgid,did
,例如前缀"CLIENT_ACK,"
后跟消息类型、消息唯一 ID 和设备 ID。 - 检查缓存系统中是否有相关消息的计时器引用,如果有,取消计时器并删除缓存。
- 根据消息类型清理离线消息。
相关代码如下:
% 客户端确认消息
% 格式:CLIENT_ACK,type,msgid,did
websocket_handle({text, <<"CLIENT_ACK,", Tail/binary>>}, State) ->CurrentUid = maps:get(current_uid, State),try binary:split(Tail, <<",">>, [global]) of[Type, MsgId, DID] ->Key = {CurrentUid, DID, MsgId},% 缓存设置在 message_ds:send_next/5 中case imboy_cache:get(Key) ofundefined ->ok;{ok, TimerRef} ->erlang:cancel_timer(TimerRef),imboy_cache:flush(Key)end,% ... 根据消息类型处理
end.
5.2 处理 WebSocket 消息:
根据接收到的文本消息类型,调用不同的逻辑处理函数。
websocket_handle({text, Msg}, State) ->% ... 解码消息、获取当前用户 UID% 根据消息类型分发处理逻辑case cowboy_bstr:to_lower(Type) of<<"c2c">> -> % 单聊消息websocket_logic:c2c(MsgId, CurrentUid, Data);% ... 其他消息类型处理end;
% ... 其他处理分支
6. 信息处理 (websocket_info/2
):
处理从 Erlang 系统发送到 WebSocket 进程的消息,例如超时消息或关闭连接请求。
- 处理超时消息:
websocket_info({timeout, _Ref, Msg}, State) ->{reply, {text, Msg}, State, hibernate};
当超时发生时,回复文本消息,并保持挂起状态以节省资源。
- 服务端主动关闭连接处理:
websocket_info({close, CloseCode, Reason}, State) ->{reply, {close, CloseCode, Reason}, State};
websocket_info(stop, State) ->{stop, State};
7. 连接终止 (terminate/3
):
在连接终止时,根据关闭原因执行清理操作,如记录用户下线。
terminate(Reason, _Req, State) ->% ... 执行清理操作
end;
WebSocket vs AMQP vs MQTT
在选择适合 IM 应用的协议时,需考虑以下因素:
- 实时性:WebSocket 提供最低延迟和最高实时性。
- 复杂性:AMQP 提供丰富消息模式,但配置和实现较复杂。
- 轻量级:MQTT 适合资源受限环境,但全双工通信受限。
结论
Cowboy WebSocket 提供了高效、简洁的方法来实现实时 Web 通信,特别适合需要快速交互的 IM 应用。通过深入理解其实现原理和生命周期管理,开发者可以构建高性能的实时通信系统。
希望通过本文的分析和代码示例,能帮助不同经验水平的开发者更好地理解和使用 Cowboy WebSocket,从而在项目中实现高效、稳定的实时通信功能。
欢迎关注 IMBoy 开源项目 https://gitee.com/imboy-pub。