在现代网页应用中，一个强大的 WebSocket 服务器就像是一个高效的调度中心，能够处理成千上万的并发连接。记得在一个直播平台项目中，我们通过优化 WebSocket 服务器架构，成功支持了 10 万用户同时在线。今天，我想和大家分享如何使用 Node.js 构建高性能的 WebSocket 服务器。
服务器架构设计
一个优秀的 WebSocket 服务器需要考虑以下几个关键点：
- 连接管理
- 消息处理
- 错误处理
- 性能优化
- 集群扩展
让我们从基础架构开始：
// server.js
const WebSocket = require('ws')
const Redis = require('ioredis')
const cluster = require('cluster')
const numCPUs = require('os').cpus().length// Redis 客户端
// crypto.randomUUID() gives every connection its own id.
const { randomUUID } = require('crypto')

// Redis clients. Once a connection issues SUBSCRIBE it enters subscriber mode
// and may only run subscription commands (plus PING/QUIT), so PUBLISH needs a
// second connection: `redis` publishes, `redisSubscriber` only subscribes.
const redis = new Redis({
  host: 'localhost',
  port: 6379,
  // Reconnect after 50 ms per attempt, capped at 2 s.
  retryStrategy: (times) => Math.min(times * 50, 2000)
})
const redisSubscriber = redis.duplicate()

/**
 * WebSocket server that authenticates clients with a Bearer token during the
 * HTTP upgrade and relays channel messages through Redis pub/sub, so clients
 * connected to different processes can exchange messages.
 */
class WebSocketServer {
  /**
   * @param {object} [options]
   * @param {number|string} [options.port=8080] - Port to listen on.
   * @param {number} [options.shutdownTimeout=10000] - ms to wait for a graceful
   *   shutdown before the process is forced to exit.
   */
  constructor(options = {}) {
    this.port = options.port || 8080
    this.shutdownTimeout = options.shutdownTimeout ?? 10000
    // connectionId -> { ws, user, channels: Set<string>, lastSeen: number }
    this.clients = new Map()
    // channel -> Set<connectionId> of subscribers on this process
    this.channels = new Map()
    this.closing = false
    this.initialize()
  }

  /** Creates the ws server and attaches the server and Redis listeners. */
  initialize() {
    this.wss = new WebSocket.Server({
      port: this.port,
      // Authenticate during the HTTP upgrade, before the socket is accepted.
      verifyClient: this.verifyClient.bind(this),
      clientTracking: true
    })
    this.setupServerEvents()
    this.setupRedis()
    console.log(`WebSocket Server is running on port ${this.port}`)
  }

  /**
   * ws `verifyClient` hook (two-argument, asynchronous form). Accepts the
   * upgrade only for a valid Bearer token and stores the resolved user on
   * `info.req.user` for handleConnection.
   */
  verifyClient(info, callback) {
    const token = this.parseToken(info.req)
    if (!token) {
      callback(false, 401, 'Unauthorized')
      return
    }
    this.verifyToken(token)
      .then((user) => {
        // A falsy user would crash handleConnection on `user.id`.
        if (!user) {
          callback(false, 401, 'Invalid token')
          return
        }
        info.req.user = user
        callback(true)
      })
      .catch((error) => {
        console.error('Auth error:', error)
        callback(false, 401, 'Invalid token')
      })
  }

  /** Returns the token from an `Authorization: Bearer <token>` header, or null. */
  parseToken(req) {
    const auth = req.headers['authorization']
    if (!auth) return null
    const [type, token] = auth.split(' ')
    return type === 'Bearer' && token ? token : null
  }

  /**
   * Resolves a token to a user object that has at least an `id`.
   * Placeholder: every token maps to the same test user. Replace with real
   * verification before production use.
   */
  async verifyToken(token) {
    return { id: 'user-1', name: 'Test User' }
  }

  /** Wires connection and error events, and graceful shutdown on SIGTERM. */
  setupServerEvents() {
    this.wss.on('connection', (ws, req) => {
      this.handleConnection(ws, req)
    })
    this.wss.on('error', (error) => {
      console.error('Server error:', error)
    })
    process.on('SIGTERM', () => {
      this.shutdown()
    })
  }

  /** Registers an authenticated connection and sends it a welcome frame. */
  handleConnection(ws, req) {
    const user = req.user
    // One id per connection. Keying by user.id let a second connection of the
    // same user overwrite the first; the first one's close then deleted the
    // second one's entry and subscriptions.
    const clientId = randomUUID()
    this.clients.set(clientId, {
      ws,
      user,
      channels: new Set(),
      lastSeen: Date.now()
    })
    console.log(`Client connected: ${clientId} (user ${user.id})`)
    this.setupClientEvents(ws, clientId)
    this.sendToClient(ws, {
      type: 'welcome',
      data: {
        clientId,
        userId: user.id,
        timestamp: Date.now()
      }
    })
  }

  /** Attaches the per-socket listeners. */
  setupClientEvents(ws, clientId) {
    ws.on('message', (message) => {
      this.handleMessage(clientId, message)
    })
    ws.on('close', () => {
      this.handleClose(clientId)
    })
    ws.on('error', (error) => {
      this.handleError(clientId, error)
    })
    // Client pings and pongs answering cleanup()'s pings both prove liveness.
    ws.on('ping', () => {
      this.handlePing(clientId)
    })
    ws.on('pong', () => {
      this.touch(clientId)
    })
  }

  /** Refreshes a client's liveness timestamp. */
  touch(clientId) {
    const client = this.clients.get(clientId)
    if (client) client.lastSeen = Date.now()
  }

  /** Parses a JSON frame `{ type, channel, data }` and dispatches it by `type`. */
  handleMessage(clientId, message) {
    const client = this.clients.get(clientId)
    if (!client) {
      console.error(`Client not found: ${clientId}`)
      return
    }
    // Any traffic counts as activity for cleanup().
    client.lastSeen = Date.now()

    let data
    try {
      data = JSON.parse(String(message))
    } catch (error) {
      console.error('Message handling error:', error)
      this.sendError(clientId, 'Invalid message format')
      return
    }
    if (data === null || typeof data !== 'object') {
      this.sendError(clientId, 'Invalid message format')
      return
    }

    try {
      switch (data.type) {
        case 'subscribe':
          this.handleSubscribe(clientId, data.channel)
          break
        case 'unsubscribe':
          this.handleUnsubscribe(clientId, data.channel)
          break
        case 'publish':
          this.handlePublish(clientId, data.channel, data.data)
          break
        default:
          console.warn(`Unknown message type: ${data.type}`)
      }
    } catch (error) {
      console.error('Message handling error:', error)
      this.sendError(clientId, 'Internal error occurred')
    }
  }

  /** Channel names must be non-empty strings, since they are used as Redis channels. */
  isValidChannel(channel) {
    return typeof channel === 'string' && channel.length > 0
  }

  /** Subscribes a client to a channel and confirms with a 'subscribed' frame. */
  handleSubscribe(clientId, channel) {
    const client = this.clients.get(clientId)
    if (!client) return
    if (!this.isValidChannel(channel)) {
      this.sendError(clientId, 'Invalid channel')
      return
    }
    client.channels.add(channel)
    let subscribers = this.channels.get(channel)
    if (!subscribers) {
      subscribers = new Set()
      this.channels.set(channel, subscribers)
      // First local subscriber: start receiving this channel from Redis.
      redisSubscriber.subscribe(channel).catch((error) => {
        console.error(`Redis subscribe error (${channel}):`, error)
      })
    }
    subscribers.add(clientId)
    this.sendToClient(client.ws, {
      type: 'subscribed',
      channel
    })
  }

  /**
   * Removes a client from a channel without notifying it. Drops the Redis
   * subscription once no local subscriber is left.
   */
  leaveChannel(clientId, channel) {
    const client = this.clients.get(clientId)
    if (client) client.channels.delete(channel)
    const subscribers = this.channels.get(channel)
    if (!subscribers) return
    subscribers.delete(clientId)
    if (subscribers.size === 0) {
      this.channels.delete(channel)
      redisSubscriber.unsubscribe(channel).catch((error) => {
        console.error(`Redis unsubscribe error (${channel}):`, error)
      })
    }
  }

  /** Unsubscribes a client from a channel and confirms with an 'unsubscribed' frame. */
  handleUnsubscribe(clientId, channel) {
    const client = this.clients.get(clientId)
    if (!client) return
    this.leaveChannel(clientId, channel)
    this.sendToClient(client.ws, {
      type: 'unsubscribed',
      channel
    })
  }

  /**
   * Authorization hook for publishing. The default allows every authenticated
   * client. TODO: override or subclass to enforce real per-channel rules.
   */
  canPublish(client, channel) {
    return true
  }

  /** Publishes a client's payload to Redis so every process can fan it out. */
  handlePublish(clientId, channel, data) {
    const client = this.clients.get(clientId)
    if (!client) return
    if (!this.isValidChannel(channel)) {
      this.sendError(clientId, 'Invalid channel')
      return
    }
    if (!this.canPublish(client, channel)) {
      this.sendError(clientId, 'No permission to publish')
      return
    }
    const payload = JSON.stringify({
      from: clientId,
      userId: client.user.id,
      data,
      timestamp: Date.now()
    })
    redis.publish(channel, payload).catch((error) => {
      console.error(`Redis publish error (${channel}):`, error)
      this.sendError(clientId, 'Failed to publish message')
    })
  }

  /**
   * Handles a ping frame from the client. ws already replies with a pong by
   * itself, so only liveness is recorded here.
   */
  handlePing(clientId) {
    this.touch(clientId)
  }

  /** Drops a disconnected client and all of its channel subscriptions. */
  handleClose(clientId) {
    const client = this.clients.get(clientId)
    if (!client) return
    // Copy first because leaveChannel mutates client.channels. No confirmation
    // frames are sent: the socket is already closing.
    for (const channel of [...client.channels]) {
      this.leaveChannel(clientId, channel)
    }
    this.clients.delete(clientId)
    console.log(`Client disconnected: ${clientId}`)
  }

  /** Logs a socket error and tells the client if its socket is still open. */
  handleError(clientId, error) {
    console.error(`Client error (${clientId}):`, error)
    this.sendError(clientId, 'Internal error occurred')
  }

  /** JSON-encodes `data` and sends it when the socket is open. */
  sendToClient(ws, data) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify(data))
    }
  }

  /** Sends an `{ type: 'error', message }` frame to a client. */
  sendError(clientId, message) {
    const client = this.clients.get(clientId)
    if (client) {
      this.sendToClient(client.ws, {
        type: 'error',
        message
      })
    }
  }

  /** Fans Redis channel messages out to local subscribers and logs Redis errors. */
  setupRedis() {
    redisSubscriber.on('message', (channel, message) => {
      try {
        const subscribers = this.channels.get(channel)
        if (!subscribers) return
        const data = JSON.parse(message)
        // Serialize once per message rather than once per recipient.
        const frame = JSON.stringify({
          type: 'message',
          channel,
          data: data.data,
          from: data.from,
          userId: data.userId,
          timestamp: data.timestamp
        })
        for (const clientId of subscribers) {
          const client = this.clients.get(clientId)
          if (client && client.ws.readyState === WebSocket.OPEN) {
            client.ws.send(frame)
          }
        }
      } catch (error) {
        console.error('Redis message handling error:', error)
      }
    })

    redis.on('error', (error) => {
      console.error('Redis error:', error)
    })
    redisSubscriber.on('error', (error) => {
      console.error('Redis error:', error)
    })
  }

  /**
   * Terminates clients not heard from within `timeout` ms and pings the rest.
   * Their pongs refresh `lastSeen`, which keeps browser clients (which cannot
   * send pings) alive.
   * @param {number} [timeout=60000]
   */
  cleanup(timeout = 60000) {
    const now = Date.now()
    for (const [clientId, client] of this.clients) {
      if (now - client.lastSeen > timeout) {
        console.log(`Cleaning up inactive client: ${clientId}`)
        client.ws.terminate()
        this.handleClose(clientId)
      } else if (client.ws.readyState === WebSocket.OPEN) {
        client.ws.ping()
      }
    }
  }

  /** Closes all clients, the server and both Redis connections, then exits. */
  shutdown() {
    if (this.closing) return
    this.closing = true
    console.log('Shutting down WebSocket server...')
    // Force the exit if clients never complete the close handshake.
    setTimeout(() => process.exit(1), this.shutdownTimeout).unref()

    for (const client of this.clients.values()) {
      client.ws.close(1001, 'Server shutting down')
    }

    this.wss.close(() => {
      console.log('WebSocket server closed')
      Promise.allSettled([redis.quit(), redisSubscriber.quit()]).then(() => {
        console.log('Redis connection closed')
        process.exit(0)
      })
    })
  }
}
// Cluster mode: the primary forks one worker per CPU core and replaces workers
// that die. Each worker runs its own WebSocketServer on the shared port.
// `isPrimary` (Node 16+) replaces the deprecated `isMaster`.
if (cluster.isPrimary ?? cluster.isMaster) {
  console.log(`Master ${process.pid} is running`)
  let shuttingDown = false

  for (let i = 0; i < numCPUs; i++) {
    cluster.fork()
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died (code ${code}, signal ${signal})`)
    if (!shuttingDown) {
      // Replace workers that die while the cluster is running.
      cluster.fork()
      return
    }
    // During shutdown, exit once every worker has terminated.
    if (Object.values(cluster.workers).every((w) => w.isDead())) {
      process.exit(0)
    }
  })

  // Forward SIGTERM so each worker can shut down gracefully (WebSocketServer
  // listens for SIGTERM), and stop respawning while that happens.
  process.on('SIGTERM', () => {
    shuttingDown = true
    for (const worker of Object.values(cluster.workers)) {
      worker.process.kill('SIGTERM')
    }
  })
} else {
  // Worker process: serve WebSocket connections.
  const server = new WebSocketServer({
    port: process.env.PORT || 8080
  })

  // Periodically drop inactive clients (see WebSocketServer#cleanup).
  setInterval(() => {
    server.cleanup()
  }, 30000)

  console.log(`Worker ${process.pid} started`)
}
连接管理
实现可靠的连接管理机制：
// connection-manager.js

/**
 * Tracks WebSocket connections by id, with per-connection statistics and
 * metadata, and named groups of connections for broadcasting.
 */
class ConnectionManager {
  constructor() {
    // id -> { connection, groups: Set<string>, metadata: Map, stats }
    this.connections = new Map()
    // group -> Set<id>
    this.groups = new Map()
  }

  /**
   * Registers a connection. Re-registering an existing id first removes the
   * old entry so no stale group memberships are left behind.
   * @param {string} id
   * @param {{readyState: number, send: Function}} connection - a `ws` WebSocket
   *   or an object with the same readyState/send shape.
   */
  addConnection(id, connection) {
    if (this.connections.has(id)) {
      this.removeConnection(id)
    }
    this.connections.set(id, {
      connection,
      groups: new Set(),
      // A Map, not a plain object, so keys like 'constructor' or '__proto__'
      // never resolve to Object.prototype members.
      metadata: new Map(),
      stats: {
        messagesReceived: 0,
        messagesSent: 0,
        bytesReceived: 0,
        bytesSent: 0,
        connectedAt: Date.now()
      }
    })
  }

  /** Removes a connection together with its membership in every group. */
  removeConnection(id) {
    const conn = this.connections.get(id)
    if (!conn) return
    // Copy first because removeFromGroup mutates conn.groups.
    for (const group of [...conn.groups]) {
      this.removeFromGroup(id, group)
    }
    this.connections.delete(id)
  }

  /** Adds a registered connection to a group; returns false for unknown ids. */
  addToGroup(connectionId, group) {
    const conn = this.connections.get(connectionId)
    if (!conn) return false
    let members = this.groups.get(group)
    if (!members) {
      members = new Set()
      this.groups.set(group, members)
    }
    members.add(connectionId)
    conn.groups.add(group)
    return true
  }

  /** Removes a connection from a group; groups left empty are deleted. */
  removeFromGroup(connectionId, group) {
    const members = this.groups.get(group)
    if (members) {
      members.delete(connectionId)
      if (members.size === 0) {
        this.groups.delete(group)
      }
    }
    const conn = this.connections.get(connectionId)
    if (conn) {
      conn.groups.delete(group)
    }
  }

  /**
   * Sends `message` to every open member of `group` except `excludeId`.
   * @returns {number} how many connections the message was sent to.
   */
  broadcastToGroup(group, message, excludeId = null) {
    const members = this.groups.get(group)
    if (!members) return 0
    let sent = 0
    for (const id of members) {
      if (id !== excludeId && this.sendTo(id, message)) {
        sent++
      }
    }
    return sent
  }

  /**
   * JSON-encodes `message` and sends it when the connection is open.
   * @returns {boolean} whether the message was sent.
   */
  sendTo(id, message) {
    const conn = this.connections.get(id)
    // 1 is the OPEN readyState (WebSocket.OPEN in both `ws` and browsers); it is
    // compared directly because this module does not import `ws`.
    if (!conn || conn.connection.readyState !== 1) return false
    const data = JSON.stringify(message)
    // JSON.stringify(undefined) yields undefined: nothing to send.
    if (data === undefined) return false
    conn.connection.send(data)
    conn.stats.messagesSent++
    // UTF-8 byte count; data.length would count UTF-16 code units.
    conn.stats.bytesSent += Buffer.byteLength(data)
    return true
  }

  /**
   * Records an incoming message in the connection's receive statistics.
   * Call it from the socket's 'message' listener.
   * @param {string} id
   * @param {string|Buffer|ArrayBuffer|Buffer[]} data - the raw message.
   */
  recordReceived(id, data) {
    const conn = this.connections.get(id)
    if (!conn) return
    conn.stats.messagesReceived++
    conn.stats.bytesReceived += Array.isArray(data)
      ? data.reduce((total, chunk) => total + chunk.length, 0)
      : Buffer.byteLength(data)
  }

  /** Returns the live stats object of a connection, or null. */
  getStats(id) {
    const conn = this.connections.get(id)
    return conn ? conn.stats : null
  }

  /** Returns the ids in a group (empty array for unknown groups). */
  getGroupMembers(group) {
    return Array.from(this.groups.get(group) || [])
  }

  /** Returns the groups a connection belongs to (empty array for unknown ids). */
  getConnectionGroups(id) {
    const conn = this.connections.get(id)
    return conn ? Array.from(conn.groups) : []
  }

  /** Stores a metadata value on a connection; unknown ids are ignored. */
  setMetadata(id, key, value) {
    const conn = this.connections.get(id)
    if (conn) {
      conn.metadata.set(key, value)
    }
  }

  /** Reads a metadata value: undefined for unset keys, null for unknown ids. */
  getMetadata(id, key) {
    const conn = this.connections.get(id)
    return conn ? conn.metadata.get(key) : null
  }
}
消息队列集成
使用 Redis 实现消息队列。注意发布和订阅要使用两个独立的连接：进入订阅模式的 Redis 连接只能执行订阅相关的命令，不能再执行 PUBLISH：
// message-queue.js
const Redis = require('ioredis')

/**
 * Redis-backed pub/sub helper. Publishing and subscribing use separate
 * connections because a connection in subscriber mode cannot PUBLISH.
 * Payloads travel as JSON `{ message, timestamp }`.
 */
class MessageQueue {
  /** @param {object} [options] - ioredis options, used for both connections. */
  constructor(options = {}) {
    this.publisher = new Redis(options)
    this.subscriber = new Redis(options)
    // channel -> handler(message, timestamp); one handler per channel
    this.handlers = new Map()
    this.subscriber.on('message', (channel, message) => {
      this.handleMessage(channel, message)
    })
  }

  /**
   * Publishes `message` (any JSON-serializable value) on `channel`.
   * @returns {Promise<boolean>} false if serialization or PUBLISH failed.
   */
  async publish(channel, message) {
    try {
      const data = JSON.stringify({
        message,
        timestamp: Date.now()
      })
      await this.publisher.publish(channel, data)
      return true
    } catch (error) {
      console.error('Publish error:', error)
      return false
    }
  }

  /**
   * Subscribes to `channel`. A later call for the same channel replaces the handler.
   * @param {string} channel
   * @param {(message: *, timestamp: number) => (void|Promise<void>)} handler
   * @returns {Promise<boolean>} false if `handler` is not a function or SUBSCRIBE failed.
   */
  async subscribe(channel, handler) {
    if (typeof handler !== 'function') {
      console.error('Subscribe error:', new TypeError('handler must be a function'))
      return false
    }
    try {
      await this.subscriber.subscribe(channel)
      this.handlers.set(channel, handler)
      return true
    } catch (error) {
      console.error('Subscribe error:', error)
      return false
    }
  }

  /**
   * Unsubscribes from `channel` and forgets its handler.
   * @returns {Promise<boolean>} false if UNSUBSCRIBE failed.
   */
  async unsubscribe(channel) {
    try {
      await this.subscriber.unsubscribe(channel)
      this.handlers.delete(channel)
      return true
    } catch (error) {
      console.error('Unsubscribe error:', error)
      return false
    }
  }

  /** Decodes a raw Redis message and passes it to the channel's handler. */
  handleMessage(channel, message) {
    const handler = this.handlers.get(channel)
    if (!handler) return
    try {
      const data = JSON.parse(message)
      // Promise.resolve also covers async handlers, whose rejections would
      // otherwise go unhandled.
      Promise.resolve(handler(data.message, data.timestamp)).catch((error) => {
        console.error('Message handling error:', error)
      })
    } catch (error) {
      console.error('Message handling error:', error)
    }
  }

  /** Closes both connections, attempting each even if the other fails. */
  async close() {
    const results = await Promise.allSettled([
      this.publisher.quit(),
      this.subscriber.quit()
    ])
    const failure = results.find((result) => result.status === 'rejected')
    if (failure) throw failure.reason
  }
}
心跳检测
实现可靠的心跳检测机制。浏览器端的 WebSocket API 无法主动发送 ping 帧，因此心跳由服务器发起：定期发送 ping，超时仍未收到 pong 的连接将被终止：
// heartbeat.js

/**
 * Server-initiated heartbeat: pings every registered connection on a fixed
 * interval and terminates connections that have not answered with a pong
 * within `timeout` ms.
 */
class HeartbeatManager {
  /**
   * @param {object} [options]
   * @param {number} [options.interval=30000] - ms between checks/pings.
   * @param {number} [options.timeout=60000] - ms without a pong before termination.
   */
  constructor(options = {}) {
    this.interval = options.interval || 30000
    this.timeout = options.timeout || 60000
    // id -> { connection, lastPing, pingCount, onPong }
    this.connections = new Map()
    this.timer = null
    this.start()
  }

  /** Starts the periodic check; calling it while already running is a no-op. */
  start() {
    if (this.timer) return
    this.timer = setInterval(() => {
      this.check()
    }, this.interval)
  }

  /** Stops the periodic check. */
  stop() {
    if (this.timer) {
      clearInterval(this.timer)
      this.timer = null
    }
  }

  /**
   * Starts tracking a connection (an EventEmitter exposing readyState, ping()
   * and terminate(), e.g. a `ws` WebSocket). Re-adding an id replaces the old
   * registration and detaches its pong listener.
   */
  add(id, connection) {
    this.remove(id)
    const onPong = () => {
      this.handlePong(id)
    }
    this.connections.set(id, {
      connection,
      lastPing: Date.now(),
      pingCount: 0,
      onPong
    })
    connection.on('pong', onPong)
  }

  /** Stops tracking a connection and removes the pong listener added by add(). */
  remove(id) {
    const data = this.connections.get(id)
    if (!data) return
    data.connection.removeListener('pong', data.onPong)
    this.connections.delete(id)
  }

  /** Terminates timed-out connections and pings the open ones. */
  check() {
    const now = Date.now()
    // OPEN readyState (WebSocket.OPEN); compared directly because this module
    // does not import `ws`.
    const OPEN = 1
    for (const [id, data] of this.connections) {
      if (now - data.lastPing > this.timeout) {
        console.log(`Connection timeout: ${id}`)
        data.connection.terminate()
        this.remove(id)
        continue
      }
      if (data.connection.readyState === OPEN) {
        data.connection.ping()
        data.pingCount++
      }
    }
  }

  /** Records a pong: refreshes the liveness timestamp and resets the unanswered-ping count. */
  handlePong(id) {
    const data = this.connections.get(id)
    if (data) {
      data.lastPing = Date.now()
      data.pingCount = 0
    }
  }

  /** Returns `{ lastPing, pingCount, isAlive }` for a tracked connection, or null. */
  getStatus(id) {
    const data = this.connections.get(id)
    if (!data) return null
    return {
      lastPing: data.lastPing,
      pingCount: data.pingCount,
      isAlive: Date.now() - data.lastPing <= this.timeout
    }
  }
}
错误处理
实现全面的错误处理机制：
// error-handler.js

/**
 * Maps errors to client-facing response objects by their `type` property,
 * falling back to a generic 500 response.
 */
class ErrorHandler {
  constructor() {
    // error type -> (error, context) => response
    this.handlers = new Map()
  }

  /** Registers the response builder for errors whose `type` equals `type`. */
  register(type, handler) {
    this.handlers.set(type, handler)
  }

  /**
   * Builds the response for `error` using the handler registered for
   * `error.type`, or defaultHandler when none matches.
   * @param {*} error - usually an Error with a `type`; null/undefined are tolerated.
   * @param {object} [context] - extra information passed to the handler.
   */
  handle(error, context = {}) {
    // `throw null` and non-object throws have no `type`.
    const handler = this.handlers.get(error?.type) || this.defaultHandler
    try {
      return handler.call(this, error, context)
    } catch (handlerError) {
      // handle() usually runs inside a catch block; a faulty handler must not
      // escape from there, so fall back to the generic response.
      console.error('Error handler failed:', handlerError)
      return this.defaultHandler(error, context)
    }
  }

  /** Fallback: logs the error and returns a generic 500 response. */
  defaultHandler(error, context) {
    console.error('Unhandled error:', error)
    return {
      type: 'error',
      message: 'Internal server error',
      code: 500
    }
  }
}
// Error type: carries a `type` used for ErrorHandler dispatch and a status-like `code`.
class WebSocketError extends Error {
  /**
   * @param {string} type - dispatch key, e.g. 'auth' or 'validation'.
   * @param {string} message
   * @param {number} [code=500]
   * @param {{cause?: *}} [options] - forwarded to Error, e.g. `{ cause }`.
   */
  constructor(type, message, code = 500, options) {
    super(message, options)
    // Without this, logs and stack traces label the error as a plain "Error".
    this.name = 'WebSocketError'
    this.type = type
    this.code = code
  }
}
// Usage example
const errorHandler = new ErrorHandler()

// Register response builders for known error types.
errorHandler.register('auth', () => ({
  type: 'error',
  message: 'Authentication failed',
  code: 401
}))

errorHandler.register('validation', () => ({
  type: 'error',
  message: 'Invalid message format',
  code: 400
}))

// Turn a thrown error into a response for the client.
try {
  throw new WebSocketError('auth', 'Invalid token')
} catch (error) {
  const response = errorHandler.handle(error, { clientId: 'user-1' })
  console.log(response)
}
性能监控
实现性能监控系统：
// monitor.js

/**
 * In-process metric recorder. count/total/min/max cover every sample ever
 * recorded; p95/p99 are computed over the most recent `maxSamples` samples
 * of each metric, so memory stays bounded in long-running servers.
 */
class PerformanceMonitor {
  /**
   * @param {object} [options]
   * @param {number} [options.maxSamples=10000] - samples kept per metric for percentiles (min 1).
   */
  constructor(options = {}) {
    this.maxSamples = Math.max(1, options.maxSamples ?? 10000)
    this.metrics = new Map()
    this.startTime = Date.now()
  }

  /** Records one sample of metric `name`. */
  record(name, value) {
    let metric = this.metrics.get(name)
    if (!metric) {
      metric = {
        count: 0,
        total: 0,
        min: Infinity,
        max: -Infinity,
        values: [],
        // next slot to overwrite once `values` is full (ring buffer)
        next: 0
      }
      this.metrics.set(name, metric)
    }
    metric.count++
    metric.total += value
    metric.min = Math.min(metric.min, value)
    metric.max = Math.max(metric.max, value)
    if (metric.values.length < this.maxSamples) {
      metric.values.push(value)
    } else {
      metric.values[metric.next] = value
      metric.next = (metric.next + 1) % this.maxSamples
    }
  }

  /**
   * Nearest-rank percentile: the smallest sample such that at least
   * `percent`% of the samples are <= it.
   * @param {number[]} sorted - samples in ascending order, non-empty.
   * @param {number} percent - 0..100.
   */
  static percentile(sorted, percent) {
    // Multiply before dividing so e.g. 95 * 100 / 100 is exact before ceil.
    const rank = Math.ceil((percent * sorted.length) / 100)
    return sorted[Math.min(sorted.length, Math.max(1, rank)) - 1]
  }

  /** Returns `{ count, min, max, avg, p95, p99 }` for a metric, or null if unknown. */
  getStats(name) {
    const metric = this.metrics.get(name)
    if (!metric) return null
    const sorted = [...metric.values].sort((a, b) => a - b)
    return {
      count: metric.count,
      min: metric.min,
      max: metric.max,
      avg: metric.total / metric.count,
      p95: PerformanceMonitor.percentile(sorted, 95),
      p99: PerformanceMonitor.percentile(sorted, 99)
    }
  }

  /** Returns stats for every metric, keyed by metric name. */
  getAllStats() {
    // fromEntries defines own properties, so a metric named '__proto__' is safe.
    return Object.fromEntries(
      Array.from(this.metrics.keys(), (name) => [name, this.getStats(name)])
    )
  }

  /** Discards all metrics and restarts the uptime clock. */
  reset() {
    this.metrics.clear()
    this.startTime = Date.now()
  }

  /** Milliseconds since construction or the last reset(). */
  getUptime() {
    return Date.now() - this.startTime
  }
}
// Usage example
const monitor = new PerformanceMonitor()

// Measures how long handling one message takes and records it in milliseconds.
function processMessage(message) {
  const start = process.hrtime.bigint()
  // ...handle the message here...
  const durationMs = Number(process.hrtime.bigint() - start) / 1e6
  monitor.record('messageProcessing', durationMs)
}
// Print a snapshot of every recorded metric once a minute.
setInterval(function logPerformanceStats() {
  const snapshot = monitor.getAllStats()
  console.log('Performance Stats:', snapshot)
}, 60 * 1000)
写在最后
通过这篇文章，我们详细探讨了如何使用 Node.js 构建高性能的 WebSocket 服务器。从基础架构到性能优化，我们不仅关注了功能实现，更注重了实际应用中的各种挑战。
记住，一个优秀的 WebSocket 服务器需要在功能、性能和可靠性之间找到平衡。在实际开发中，我们要根据具体需求选择合适的实现方案，确保服务器能够稳定高效地运行。
如果觉得这篇文章对你有帮助，别忘了点个赞 👍