欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > Laravel+swoole 实现websocket长链接

Laravel + Swoole 实现 WebSocket 长连接

2024/10/25 0:33:49 来源:https://blog.csdn.net/qq_32835907/article/details/140485680  浏览:    关键词:Laravel+swoole 实现websocket长链接

需要使用 swoole 扩展

我使用的是 swoole 5.x

start 方法启动服务 和 定时器

调小 listenQueue 定时器的间隔(示例代码中为 1000 毫秒)可以降低消息推送延迟

定时器会自动从 Redis 队列(SocketService:listenQueue)中取出消息,并推送给对应的在线用户

testMessage 方法测试给指定用户推送消息

使用 Laravel console 命令启动:php artisan app:wsServer

<?php

namespace App\Console\Commands;

use App\Services\SocketService;
use Illuminate\Console\Command;

/**
 * Artisan command that starts the WebSocket service.
 *
 * Run with: php artisan app:wsServer
 */
class WsServer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'app:wsServer';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Command description';

    /**
     * Execute the console command.
     *
     * Builds a SocketService and calls its start() method.
     */
    public function handle()
    {
        $socketService = new SocketService();
        $socketService->start();
    }
}

socket 服务实现代码

<?php

namespace App\Services;

use Swoole\WebSocket\Server;
use Swoole\Timer;
use Illuminate\Support\Facades\Redis;
use RedisException;
use Swoole\Http\Request;

/**
 * WebSocket push service built on a Swoole WebSocket server.
 *
 * Clients connect with a `user_id` query parameter and must send
 * `{"type":"ping"}` frames; connections with no heartbeat for
 * HEARTBEAT_TIMEOUT seconds are dropped. Other code pushes to a user by
 * appending a serialized array to the Redis list QUEUE_KEY (see testMessage()).
 *
 * Connection state ($links / $cmds) lives in this object's memory.
 * NOTE(review): this presumably assumes all callbacks and timers run in one
 * process — confirm the server's worker configuration.
 */
class SocketService
{
    /** Redis list that carries outbound messages. */
    private const QUEUE_KEY = 'SocketService:listenQueue';

    /** Seconds without a heartbeat before a connection is dropped. */
    private const HEARTBEAT_TIMEOUT = 60;

    /** Upper bound on queue messages handled per timer tick, so one tick cannot run unbounded. */
    private const QUEUE_BATCH_SIZE = 100;

    /** Equals the literal 320 previously passed to json_encode(). */
    private const JSON_FLAGS = JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE;

    /** @var int Default listen port, used when APP_SOCKET_PORT is not set. */
    public $port = 9501;

    /** @var Server */
    public $server;

    /** @var \Illuminate\Support\Collection Entries shaped as ['fd' => ..., 'user_id' => int, 'updated_at' => 'Y-m-d H:i:s']. */
    public $links;

    /** @var array Pending link commands: ['operate' => 'open'|'close'|'heartbeat', 'fd' => ..., ...]. */
    public $cmds = [];

    /**
     * Create the server on 0.0.0.0 and register the open/message/close callbacks.
     */
    public function __construct()
    {
        $this->links = collect([]);
        // env() yields a string when the variable is set; the port is an integer.
        $this->server = new Server("0.0.0.0", (int) env('APP_SOCKET_PORT', $this->port));

        $this->server->on('open', function (Server $server, Request $request) {
            $this->open($server, $request);
        });
        $this->server->on('message', function (Server $server, $frame) {
            $this->message($server, $frame);
        });
        $this->server->on('close', function (Server $server, $fd) {
            $this->close($server, $fd);
        });
    }

    /**
     * Register both timers, then start the server.
     */
    public function start()
    {
        $this->linkManage();
        $this->listenQueue();
        $this->server->start();
    }

    /**
     * Write a timestamped log line to standard output.
     *
     * @param mixed  $message Arrays and objects are JSON-encoded first.
     * @param string $level   Label printed after the timestamp.
     */
    public function print($message, $level = 'info')
    {
        if (is_array($message) || is_object($message)) {
            $message = json_encode($message, self::JSON_FLAGS);
        }

        echo "[" . date("Y-m-d H:i:s") . "] " . $level . ' ' . $message . "\n";
    }

    /**
     * Every 100 ms: apply all pending link commands, then drop stale connections.
     */
    public function linkManage()
    {
        Timer::tick(100, function () {
            // Drain the whole queue; handling a single command per tick makes
            // registration and cleanup lag behind when several clients act at once.
            while (($cmd = array_shift($this->cmds)) !== null) {
                $this->applyCommand($cmd);
            }

            $this->dropStaleLinks();
        });
    }

    /**
     * Apply one open/close/heartbeat command to the link table.
     */
    private function applyCommand(array $cmd): void
    {
        switch ($cmd['operate'] ?? null) {
            case 'open':
                // A new connection starts out as active.
                $this->links->push([
                    "fd" => $cmd['fd'],
                    "user_id" => intval($cmd['user_id'] ?? 0),
                    'updated_at' => date("Y-m-d H:i:s"),
                ]);
                $this->print("添加客户端:fd = " . json_encode($cmd, self::JSON_FLAGS));
                break;

            case 'close':
                $this->links = $this->links
                    ->reject(fn ($link) => $link['fd'] === $cmd['fd'])
                    ->values();
                $this->print("删除客户端:fd = " . json_encode($cmd, self::JSON_FLAGS));
                break;

            case 'heartbeat':
                $now = date("Y-m-d H:i:s");
                $this->links = $this->links->map(function ($link) use ($cmd, $now) {
                    if ($link['fd'] === $cmd['fd']) {
                        $link['updated_at'] = $now;
                    }

                    return $link;
                });
                break;
        }
    }

    /**
     * Remove links whose last heartbeat is older than HEARTBEAT_TIMEOUT,
     * disconnecting those that are still established.
     */
    private function dropStaleLinks(): void
    {
        $deadline = time() - self::HEARTBEAT_TIMEOUT;

        $this->links = $this->links
            ->filter(function ($link) use ($deadline) {
                if (strtotime($link['updated_at']) >= $deadline) {
                    return true;
                }

                $this->print("长时间未心跳,删除客户端:fd = " . json_encode($link, self::JSON_FLAGS));
                if ($this->server->isEstablished($link['fd'])) {
                    $this->disconnect($link['fd'], '未进行心跳');
                }

                return false;
            })
            ->values();
    }

    /**
     * Every 1000 ms: pop queued messages from Redis and push them to the target user.
     *
     * Producer side, for reference:
     * Redis::rpush("SocketService:listenQueue", serialize([...]))
     */
    public function listenQueue()
    {
        Timer::tick(1000, function () {
            try {
                // Handle a bounded batch per tick rather than a single message,
                // so throughput is not capped at one message per second.
                for ($handled = 0; $handled < self::QUEUE_BATCH_SIZE; $handled++) {
                    $element = Redis::lpop(self::QUEUE_KEY);
                    if (! $element) {
                        break;
                    }

                    $this->print("listenQueue 有新的信息哦:" . $element);

                    try {
                        // Payloads are plain arrays; never instantiate objects from queue data.
                        $data = unserialize($element, ['allowed_classes' => false]);
                    } catch (\Throwable $e) {
                        // A malformed payload must not escape the timer callback.
                        $this->print("队列消息解析失败:" . $e->getMessage(), 'error');
                        continue;
                    }

                    if (is_array($data) && ! empty($data['user_id'])) {
                        $this->pushToUser($data);
                    }
                }
            } catch (RedisException $e) {
                // NOTE(review): kept from the original as the reconnect attempt —
                // confirm this call actually re-establishes the connection.
                Redis::connect();
            }
        });
    }

    /**
     * Push one queued message to every live connection of $data['user_id'].
     *
     * @param array $data Queue payload with keys user_id, type, message, data.
     */
    private function pushToUser(array $data): void
    {
        $links = $this->links->where("user_id", $data['user_id'])->values()->all();
        if (empty($links)) {
            $this->print("没有在线用户:user_id = " . json_encode($data, self::JSON_FLAGS));
        }

        foreach ($links as $link) {
            if (! $this->server->isEstablished($link['fd'])) {
                // The connection is gone; let the link-management timer remove it.
                $this->cmds[] = ['operate' => 'close', 'fd' => $link['fd']];
                continue;
            }

            try {
                // Build the message payload, then push it.
                $message = $this->makeMessage(
                    $data['data'] ?? [],
                    $data['type'] ?? "",
                    $data['message'] ?? ""
                );
                $this->runPush($link['fd'], $message);
            } catch (\Throwable $e) {
                $this->print("数据推送异常:" . json_encode(
                    [$e->getMessage(), $e->getLine(), $e->getFile()],
                    self::JSON_FLAGS
                ));
            }
        }
    }

    /**
     * Handle a new connection: require `user_id` in the query string, queue the
     * link for registration and send a "connectionSuccessful" message.
     *
     * NOTE(review): user_id is taken from the query string as-is; nothing here
     * verifies that the client is that user — confirm authentication happens elsewhere.
     *
     * @return true|void true when the connection was rejected.
     */
    public function open(Server $server, Request $request)
    {
        $params = $request->get;
        if (empty($params['user_id'])) {
            $this->disconnect($request->fd, '缺少用户信息');
            return true;
        }

        $this->cmds[] = ['operate' => 'open', 'fd' => $request->fd, 'user_id' => $params['user_id']];

        // Build the message payload, then push it.
        $message = $this->makeMessage(['fd' => $request->fd], "connectionSuccessful", "连接成功");
        $this->runPush($request->fd, $message);

        $this->print("server: handshake success with fd{$request->fd} ");
    }

    /**
     * Handle an incoming frame: answer `{"type":"ping"}` with a pong and record
     * the heartbeat; log any other JSON object/array; ignore everything else.
     *
     * @param mixed $frame Frame object exposing fd, data, opcode and finish.
     */
    public function message(Server $server, $frame)
    {
        $data = json_decode($frame->data, true);
        if (! is_array($data)) {
            return;
        }

        if (($data['type'] ?? null) === "ping") {
            $this->cmds[] = ['operate' => 'heartbeat', 'fd' => $frame->fd];
            $this->server->push($frame->fd, json_encode(["type" => "pong"], self::JSON_FLAGS));
            return;
        }

        $this->print("receive from {$frame->fd}:{$frame->data},opcode:{$frame->opcode},fin:{$frame->finish} ");
    }

    /**
     * Handle a closed connection by queueing its removal from the link table.
     */
    public function close(Server $server, $fd)
    {
        $this->cmds[] = ['operate' => 'close', 'fd' => $fd];
        $this->print("client {$fd} closed ");
    }

    /**
     * Send a raw string frame to a connection.
     */
    public function push($fd, string $data)
    {
        $this->server->push($fd, $data);
    }

    /**
     * Close a connection with the given reason and close code.
     */
    public function disconnect(int $fd, string $reason = '', int $code = SWOOLE_WEBSOCKET_CLOSE_NORMAL)
    {
        $this->server->disconnect($fd, $code, $reason);
    }

    /**
     * Build the message envelope sent to clients.
     *
     * @return array{type: mixed, message: mixed, data: array}
     */
    public function makeMessage(array $data, $type = "", $message = "")
    {
        return ['type' => $type, "message" => $message, "data" => $data];
    }

    /**
     * Log a message and push it, JSON-encoded, to a connection.
     */
    public function runPush($fd, $message)
    {
        $payload = json_encode($message, self::JSON_FLAGS);

        $this->print("推送消息: {$fd} - " . $payload);
        $this->server->push($fd, $payload);
    }

    /**
     * Queue a test message for a user.
     *
     * Example: App\Services\SocketService::testMessage( 92 )
     *
     * @param mixed $user_id
     * @return void
     */
    public static function testMessage($user_id)
    {
        Redis::rpush(self::QUEUE_KEY, serialize([
            "user_id" => $user_id,
            "type" => "testMessage",
            "message" => "测试消息",
            "data" => ["hello world!"],
        ]));
    }
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com