欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 房产 > 家装 > Vert.x学习(二)—— TCP服务端、客户端和HTTP服务端、客户端

Vert.x学习(二)—— TCP服务端、客户端和HTTP服务端、客户端

2025/4/19 1:52:56 来源:https://blog.csdn.net/m0_73095030/article/details/147245086  浏览:    关键词:Vert.x学习(二)—— TCP服务端、客户端和HTTP服务端、客户端

TCP 服务端和客户端

  • 与Event Bus的区别?

    • 通信对象:客户端-服务器通信面向外部客户端,事件总线主要用于内部 Verticle 间的通信,但可通过桥接(SockJS)扩展到外部。
    • 通信模式:客户端-服务器通信通常是请求-响应,事件总线支持更灵活的模式,如发布-订阅、请求-响应和点对点。
    • 集成性:事件总线提供统一的内部消息系统,桥接后可与外部客户端无缝集成,而客户端-服务器通信需要单独处理协议。
  • TCP服务端

    • 若您在Verticle内创建了 TCP 服务端和客户端, 它们将会在Verticle撤销时自动关闭

    • 任意一个TCP服务端中的处理器总是在相同的Event-Loop线程上执行。

      这意味着如果您在多核的服务器上运行,并且只部署了一个实例, 那么您的服务器上最多只能使用一个核,此时需要部署更多的服务器实例,比如通过部署多个Verticle的方式,在对应的start方法中开启TCP服务器并进行监听(但实际上它内部仅维护一个服务器实例。当传入新的连接时, 它以轮询的方式将其分发给任意一个连接处理器处理)。

    • TCP服务端创建:

      //服务端配置,setRegisterWriteHandler用于将每个socket注册到event bus中,意味着Verticle可以向socket的地址(通过socket.writeHandlerId()方法获取)发送Buffer数据,但这个过程只能在本地服务器进行
      NetServerOptions options = new NetServerOptions().setRegisterWriteHandler(true);
      //创建TCP服务端
      NetServer server = vertx.createNetServer(options);
      //让服务端监听localhost:1234
      server.listen(1234, "localhost", res -> {if (res.succeeded()) {} else {}
      });
      //连接建立完时收到通知,配置connectHandler
      server.connectHandler(socket -> {//从socket(buffer)中读取数据socket.handler(buffer -> {//异步写入数据到socket中socket.write("hello")});//在socket关闭时收到通知,配置closeHandlersocket.closeHandler(v -> {...});//在socket异常时收到通知,配置exceptionHandlersocket.exceptionHandler(e -> {...});//获取socket地址socket.localAddress();//获取连接的另一方地址socket.remoteAddress();//发送文件和classpath中的资源socket.sendFile("myfile.dat");//升级到SSL/TLS连接,前提必须为服务器或客户端配置SSL/TLS才能正常工作socket.upgradeToSsl();
      });
      //关闭TCP服务端
      server.close(res -> {if (res.succeeded()) {} else {}
      });
      
  • TCP客户端

    • TCP客户端创建:

      //配置超时时间、初始连接的重连次数和重连间隔
      NetClientOptions options = new NetClientOptions().setConnectTimeout(10000).setReconnectAttempts(10).setReconnectInterval(500);
      //创建TCP客户端
      NetClient client = vertx.createNetClient(options);
      
    • 发起请求:

      client.connect(4321, "localhost", res -> {if (res.succeeded()) {NetSocket socket = res.result();}else {}
      });
      

Http服务端和客户端

  • Http服务端

    • Http服务端创建:

      //记录网络活动,用于调试
      HttpServerOptions options = new HttpServerOptions().setLogActivity(true);
      //创建Http服务端
      HttpServer server = vertx.createHttpServer(options);
      //处理不合法的请求,比如请求头过大
      server.invalidRequestHandler(HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER);
      
    • 监听地址:

      server.listen(8080, "localhoSt", res -> {if (res.succeeded()) {} else {}
      });
      
    • 配置请求处理器:

      //记录网络活动,用于调试
      HttpServerOptions options = new HttpServerOptions().setLogActivity(true);
      //创建Http服务端
      HttpServer server = vertx.createHttpServer(options);
      //处理不合法的请求,比如请求头过大
      server.invalidRequestHandler(HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER);
      //配置请求处理器,在读取完请求头后调用(因此没有请求体)
      server.requestHandler(request -> {//可以从request中读取uri,path,params和headers等其他信息。。。//接收请求体:设置处理器,每次请求体的一小块数据收到时,该处理器都会被调用,避免请求体数据过大耗尽服务器可用内存Buffer totalBuffer = Buffer.buffer();request.handler(buffer -> {totalBuffer.appendBuffer(buffer);});//在读取完请求体后被调用request.endHandler(v -> {System.out.println("请求体长度:" + totalBuffer.length());//建立response响应HttpServerResponse response = request.response();//响应数据response.setStatusCode(200);//设置响应头response.putHeader("content-type", "text/html");//异步写入响应体,原理是将写操作进入队列response.write("Hello world");//在队列为空时结束响应并写入响应体response.end("响应结束");//设置响应异常处理器response.exceptionHandler(e -> {});});//设置请求异常处理器request.exceptionHandler(e -> {});
      });
      
    • 如果你确认不会传输很大的数据,则可以通过bodyHandler直接获取整个请求体: request.bodyHandler(totalBuffer -> {});

    • 配置HTTP/2 服务端:

      HttpServerOptions options = new HttpServerOptions()//ALPN是一个TLS的扩展,它在客户端和服务器开始交换数据之前协商协议.setUseAlpn(true).setSsl(true).setKeyStoreOptions(new JksOptions().setPath("/path/to/my/keystore"));
      

      当服务器接受 HTTP/2 连接时,它会向客户端发送其 初始设置 。 定义客户端如何使用连接,服务器的默认初始设置为:

      • getMaxConcurrentStreams :限制单个连接上可以同时打开的流(stream)数量
      • 其他默认的 HTTP/2 的设置
    • 处理HTML表单:

      • 使用 application/x-www-form-urlencodedmultipart/form-data 这两种 content-type 来提交 HTML 表单。

      • 对于使用 URL 编码过的表单,表单属性会被编码在URL中,如同普通查询参数一样。

      • 对于 multipart 类型的表单,它会被编码在请求体中,在整个请求体被 完全读取之前不可用

      • 若您想要读取 multipart 表单的属性,需要在读取请求体 之前 调用 setExpectMultipart 方法, 然后在整个请求体都被读取后,您可以使用 formAttributes 方法来读取表单属性。

        server.requestHandler(request -> {request.setExpectMultipart(true);request.endHandler(v -> {MultiMap formAttributes = request.formAttributes();});
        });
        
    • 处理文件上传:

      • Vert.x 可以处理以 multipart 编码形式上传的的文件

      • 在读取请求体 之前 调用 setExpectMultipart 方法,并对请求设置 uploadHandler

      • 当服务器每次接收到上传请求时, uploadHandler将被调用一次

        server.requestHandler(request -> {request.setExpectMultipart(true);request.uploadHandler(upload -> {//将文件上传到服务器磁盘的某个地方upload.streamToFileSystem("directory/" + upload.filename());//上传的文件可能很大,不会在单个缓冲区中包含整个数据,所以设置处理器分批接收upload.handler(chunk -> {System.out.println("length:" + chunk.length());});});
        });
        
    • 处理cookies:

      • 管理cookie

        server.requestHandler(request -> {//获取cookieCookie key = request.getCookie("key");//移除cookierequest.response().removeCookie("key");//添加cookierequest.response().addCookie(Cookie.cookie("name", "value"));
        });
        
      • 给Cookie设置SameSite(限制站点发送):cookie.setSameSite(CookieSameSite.LAX);

        • None - 允许在跨域请求和非跨域请求中发送
        • Strict - 只能在同站点的请求中发送
        • Lax - 在跨域的子请求(例如调用加载图像或iframe)不发送该Cookie, 但当用户从外部站点导航到URL时将发送该Cookie, 例如通过链接打开。
    • 处理压缩体:Vert.x 可以处理在客户端通过 deflategzipbrotli 算法压缩过的请求体信息。在创建服务器时调用HttpServerOptions对象的 setDecompressionSupported(true) ,并确保在类路径classpath上存在有 Brotli4j类库。

    • 响应结束(调用end方法)后,vertx不会自动关闭keep-alive的连接,可以手动调用close方法关闭底层的TCP连接,或调用HttpServerOptions对象的setIdleTimeout方法配置空闲多少时间后自动关闭

    • 分块响应数据(HTTP/2流无效):response.setChunked(true);当处于分块模式时,每次调用任意一个 write 方法将导致新的 HTTP 块被写出(如果不分块则会在响应结束前一直缓存在内存中)

    • 响应文件:reponse.sendFile(Stirng name,long offset,long length)

    • 管道式(流式)响应:将输出流写入到响应中,这样可以直接往输出流写入数据达到流式传输的效果,不需要手动调用end方法结束响应

      response.setChunked(true);
      request.pipeTo(response);
      
    • 接收/写入自定义 HTTP/2 帧:HTTP/2 帧以二进制压缩格式存放内容。可以交错发送,然后根据每个帧头的数据流标识符重新组装。因此不受流量控制限制,会被立即发送或接收。

      //接收
      request.customFrameHandler(frame -> {System.out.println("Received a frame type=" + frame.type() +" payload" + frame.payload().toString());
      });
      //写入
      int frameType = 40;
      int frameStatus = 10;
      Buffer payload = Buffer.buffer("some data");// 向客户端发送一帧
      response.writeCustomFrame(frameType, frameStatus, payload);
      
    • 流重置

      HTTP/1.x 不允许请求或响应流执行清除重置,服务器需要接受整个响应。

      HTTP/2 在请求/响应期间随时支持流重置,默认会发送 NO_ERROR (0)错误代码

      request.response().reset();
      

      在流重置完成时收到通知:

      request.response().exceptionHandler(err -> {if (err instanceof StreamResetException) {StreamResetException reset = (StreamResetException) err;System.out.println("Stream reset " + reset.getCode());}
      });
      
    • 服务器推送(Server Push):HTTP/2 可以为单个客户端请求并行发送多个响应

      eg:当服务器准备推送响应时,推送响应处理器会被调用,并会发送响应(因此需要在响应结束之前调用 push 方法)。推送响应处理器可能会接收到失败,如:客户端可能取消推送,因为在缓存中已经包含了 main.js, 不再需要它

      // 准备响应时会推送main.js到客户端
      response.push(HttpMethod.GET, "/main.js", ar -> {if (ar.succeeded()) {// 服务器准备推送响应HttpServerResponse pushedResponse = ar.result();// 发送main.js响应pushedResponse.putHeader("content-type", "application/json").end("alert(\"Push response hello\")");} else {System.out.println("Could not push client resource " + ar.cause());}
      });// 响应请求的资源内容
      response.sendFile("<html><head><script src=\"/main.js\"></script></head><body></body></html>");
      
    • HTTP压缩:

      启动HTTP压缩后。服务器将自动检查客户端请求头中是否包含了 Accept-Encoding ,若找到,将使用所支持的压缩算法之一(gzip/deflate )自动压缩响应正文。

      设置压缩级别:压缩可以减少网络流量,但是CPU密集度会更高,所以通过调整压缩密度解决

      //支持压缩
      options.setCompressionSupported(true);
      //设置压缩界别
      options.setCompressionLevel(6);
      //设置压缩算法
      GzipOptions gzip = StandardCompressionOptions.gzip(6, 15, 8);
      options.addCompressor(gzip);
      

      关闭压缩:response.putHeader(HttpHeaders.CONTENT_ENCODING, HttpHeaders.IDENTITY)

  • HTTP客户端

    • HTTP客户端创建:

      HttpClient client = vertx.createHttpClient();
      
    • 配置连接池

      //启动长连接keep-alive
      options.setKeepAlive(false);
      //连接池的最大连接数
      options.setMaxPoolSize(10);
      //设置空闲连接超时时间
      options.setKeepAliveTimeout(100)
      
    • 配置管道:管道意味着在在不等待响应的情况下,在同一个连接发送另一个请求

      //启用管道
      options.setPipelining(true);
      //单个连接的管道的请求数限制
      options.setPipeliningLimit(2);
      
    • 配置HTTP/2客户端

      HttpClientOptions options = new HttpClientOptions()//启动HTTP/2协议.setProtocolVersion(HttpVersion.HTTP_2)//.setSsl(true)//使用ALPN启动TLS.setUseAlpn(true).setTrustAll(true)//记录网络活动日志.setLogActivity(true);
      
    • HTTP/2多路复用配置

      HTTP/1 的长连接和管道,实现了在一个TCP连接上不等待响应,而连续发送多次请求,但由于处理响应的顺序性,会出现队头阻塞的问题(如果一个请求处理耗时过长,会阻塞管道中的后续请求)。

      HTTP/2 基于流和帧的特性,让每个HTTP请求和响应分解为多个可以乱序发送的帧,这些帧通过流ID重新组装,使得能在一个TCP连接上同时发送多个请求和响应,即多路复用。

      options//限制每个连接的多路复用流数量.setHttp2MultiplexingLimit(10)//设置HTTP/2连接池.setHttp2MaxPoolSize(3);
      
    • 发起请求:

      client.request(HttpMethod.GET,8080, "localhost", "/some-uri", ar -> {//配置请求处理器if (ar.succeeded()) {//获取请求HttpClientRequest request = ar.result();//设置超时时间request.setTimeout(5000);//设置请求头MultiMap headers = HttpHeaders.set("content-type", "application/json");request.headers().adfdAll(headers);request.putHeader("other-header", "ailu");//send:将整个请求的数据发送到目标服务器,并代表请求已经完成,不能再写入额外数据。//发送请求1,携带String类型的请求体request.send("requestBody",ar1 -> {if (ar1.succeeded()) {HttpClientResponse response = ar1.result();}});//发送请求2,携带Buffer类型的请求体request.send(Buffer.buffer("requestBody"),ar1 -> {});//发送请求3,携带实现ReadStream类型的请求体request.send(stream,ar1 -> {});}
      });
      
    • 可提前配置要请求的主机/端口

      HttpClientOptions options = new HttpClientOptions().setDefaultHost("localhost").setDefaultPort(8080);
      
    • 使用write写入请求体,通过end结束请求,若不使用HTTP分块,则必须在写入请求前设置Content-Length头

      //设置分块模式
      request.setChunked(true);
      request.write("data1","UTF-8")
      request.write(Buffer.buffer("data2"))
      //结束请求
      request.end(body);
      
    • 使用流式请求将磁盘上的文件管送到HTTP 请求体中

      request.setChunked(true);
      asyncFile.pipeTo(request);
      
    • 写HTTP/2帧

      int frameType = 40;
      int frameStatus = 10;
      Buffer payload = Buffer.buffer("some data");// 发送一帧到服务器
      request.writeCustomFrame(frameType, frameStatus, payload);
      
    • 流重置:

      request.reset();
      //在流重置完成时您将会收到通知
      request.exceptionHandler(err -> {if (err instanceof StreamResetException) {StreamResetException reset = (StreamResetException) err;System.out.println("Stream reset " + reset.getCode());}
      });
      
    • 流式响应到WriteSteam中:response.pipeTo(WriteStream ws)

    • 读取响应体:如果响应中包含响应体,那么响应体可能会在读取完header后,以多个分片的形式到达,即当响应体的某部分(数据)到达时,handler 方法绑定的回调函数将会被调用

      response.handler(buffer -> {});
      //整个响应体被完全读取时,endHandler就会被调用。
      response.endHandler(v -> {});
      

      或者直接处理所有响应体数据

      response.body(ar -> {if (ar.succeeded()) {Buffer body = ar.result();}
      });
      
    • 请求和响应组合使用:

      HttpClient 客户端有意地避免返回 Future<HttpClientResponse> , 因为如果在 event-loop 之外设置 Future 的完成处理器可能会导致线程竞争。

      //会交给event loop线程执行
      Future<HttpClientResponse> get = client.get("some-uri");//假设此事件不在event-loop中,则阻塞主线程往下执行
      Thread.sleep(100);get.onSuccess(response -> {// 响应事件在event loop线程可能已经发生response.body(ar -> {});
      });
      

      HttpClientRequest 的使用限制在一个verticle的范围内, 因为Verticle会确保按顺序处理事件。

      vertx.deployVerticle(() -> new AbstractVerticle() {@Overridepublic void start() {HttpClient client = vertx.createHttpClient();Future<HttpClientResponse> get = client.get("some-uri");//此事件在event-loop中,阻塞event loop线程(不建议这么做)Thread.sleep(100);get.onSuccess(response -> {// 响应事件不会提前发生,因为Verticle会保证按顺序处理事件response.body(ar -> {});});}
      }, new DeploymentOptions());
      

      verticle外使用HttpClient进行交互时,可以安全地使用“组合”(compose)。

      Future<JsonObject> future = client.request(HttpMethod.GET, "some-uri").compose(request -> request.send().compose(response -> {return response.body().map(buffer -> buffer.toJsonObject());}}));
      future.onSuccess(json -> {})
      
    • 从响应中读取cookie:response.cookies()

    • 创建HTTP隧道

      client.request(HttpMethod.CONNECT, "some-uri").onSuccess(request -> {// 连接到服务器request.connect(ar -> {if (ar.succeeded()) {HttpClientResponse response = ar.result();if (response.statusCode() != 200) {// 某些原因连接失败} else {// HTTP隧道创建成功,原始数据将传输到缓冲区NetSocket socket = response.netSocket();}}});
      });
      
    • 客户端(HTTP/2)接收推送资源

      // 设置一个推送处理器来感知服务器推送的任何资源
      request.pushHandler(pushedRequest -> {//如果不想收到推送,可重置流if (pushedRequest.path().equals("/main.js")) {pushedRequest.reset();} System.out.println("Server pushed " + pushedRequest.path());// 为响应设置处理器pushedRequest.response().onComplete(pushedResponse -> {System.out.println("The response for the pushed request");});
      });
      
    • 接收自定义HTTP/2帧

      response.customFrameHandler(frame -> {System.out.println("Received a frame type=" + frame.type() +" payload" + frame.payload().toString());
      });
      
    • 启用压缩

      创建客户端时使用 setTryUseCompression 设置配置项启用压缩,请求头包含Accept-Encoding 头,其值为可支持的压缩算法,如Accept-Encoding: gzip, deflate

  • HTTP连接

    • 服务端连接

      //获取服务端上的请求连接
      HttpConnection connection = request.connection();
      //设置连接处理器,在任意连接建立时得到通知
      HttpServer server = vertx.createHttpServer(http2Options);
      server.connectionHandler(connection -> {});
      
    • 客户端连接

      //获取客户端上的请求连接
      HttpConnection connection = request.connection();
      //设置连接处理器,在连接建立时通知
      client.connectionHandler(connection -> {});
      
    • 连接配置(HTTP/2):每个Endpoint(端点)必须遵守连接另一端的发送设置,即建立连接时,客户端和服务端会交换初始配置

      //初始配置
      options.setInitialSettings(new Http2Settings().setMaxConcurrentStreams(100));
      //连接建立后,可随时更改设置
      connection.updateSettings();
      //在另一端更新设置后,这端会收到对应的远程设置
      connection.remoteSettingsHandler(settings -> {});
      
    • Ping(HTTP/2):确定连接往返时间或检查连接有效性

      //ping端
      connection.ping(data, pong -> {});//被ping端
      connection.pingHandler(ping -> {});
      
    • 连接关闭(HTTP/2)

      shutdown:客户端停止发送新请求,并发送 {@literal GOAWAY} 帧到服务端,要求其停止创建流并停止推送响应,直到当前所有流关闭,然后关闭连接。connection.shutdown()

      goAway:只发送 {@literal GOAWAY} 帧告诉远程连接停止创建流,但没有计划关闭连接。connection.shutdown()

      shutdownHandler:当所有流已经关闭 或 接收到 {@literal GOAWAY} 帧时被调用

      connection.shutdownHandler(v -> {// 所有流被关闭时,关闭连接connection.close();
      });
      

      close:关闭连接,对于HTTP/1,会关闭底层Socket,对于HTTP/2,在连接关闭前发送 {@literal GOAWAY} 帧

  • 扩展

    • 水平扩展-客户端共享:多个verticle实例共享一个HTTP客户端

      HttpClient client = vertx.createHttpClient(new HttpClientOptions().setShared(true));
      vertx.deployVerticle(() -> new AbstractVerticle() {@Overridepublic void start() throws Exception {// 使用client}
      }, new DeploymentOptions().setInstances(4));
      
    • 水平扩展-服务端共享:当多个 HTTP 服务端在同一个端口上监听时,Vert.x 会使用轮询策略顺序委托给其中一个服务端

  • 启用HTTPS

    client.request(new RequestOptions().setHost("localhost").setPort(8080).setURI("/").setSsl(true), ar1 -> {});
    

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词