TCP 服务端和客户端
-
与Event Bus的区别?
- 通信对象:客户端-服务器通信面向外部客户端,事件总线主要用于内部 Verticle 间的通信,但可通过桥接(SockJS)扩展到外部。
- 通信模式:客户端-服务器通信通常是请求-响应,事件总线支持更灵活的模式,如发布-订阅、请求-响应和点对点。
- 集成性:事件总线提供统一的内部消息系统,桥接后可与外部客户端无缝集成,而客户端-服务器通信需要单独处理协议。
-
TCP服务端
-
若您在Verticle内创建了 TCP 服务端和客户端, 它们将会在Verticle撤销时自动关闭
-
任意一个TCP服务端中的处理器总是在相同的Event-Loop线程上执行。
这意味着如果您在多核的服务器上运行,并且只部署了一个实例, 那么您的服务器上最多只能使用一个核,此时需要部署更多的服务器实例,比如通过部署多个Verticle的方式,在对应的start方法中开启TCP服务器并进行监听(但实际上它内部仅维护一个服务器实例。当传入新的连接时, 它以轮询的方式将其分发给任意一个连接处理器处理)。
-
TCP服务端创建:
//服务端配置,setRegisterWriteHandler用于将每个socket注册到event bus中,意味着Verticle可以向socket的地址(通过socket.writeHandlerId()方法获取)发送Buffer数据,但这个过程只能在本地服务器进行 NetServerOptions options = new NetServerOptions().setRegisterWriteHandler(true); //创建TCP服务端 NetServer server = vertx.createNetServer(options); //让服务端监听localhost:1234 server.listen(1234, "localhost", res -> {if (res.succeeded()) {} else {} }); //连接建立完时收到通知,配置connectHandler server.connectHandler(socket -> {//从socket(buffer)中读取数据socket.handler(buffer -> {//异步写入数据到socket中socket.write("hello")});//在socket关闭时收到通知,配置closeHandlersocket.closeHandler(v -> {...});//在socket异常时收到通知,配置exceptionHandlersocket.exceptionHandler(e -> {...});//获取socket地址socket.localAddress();//获取连接的另一方地址socket.remoteAddress();//发送文件和classpath中的资源socket.sendFile("myfile.dat");//升级到SSL/TLS连接,前提必须为服务器或客户端配置SSL/TLS才能正常工作socket.upgradeToSsl(); }); //关闭TCP服务端 server.close(res -> {if (res.succeeded()) {} else {} });
-
-
TCP客户端
-
TCP客户端创建:
//配置超时时间、初始连接的重连次数和重连间隔 NetClientOptions options = new NetClientOptions().setConnectTimeout(10000).setReconnectAttempts(10).setReconnectInterval(500); //创建TCP客户端 NetClient client = vertx.createNetClient(options);
-
发起请求:
client.connect(4321, "localhost", res -> {if (res.succeeded()) {NetSocket socket = res.result();}else {} });
-
Http服务端和客户端
-
Http服务端
-
Http服务端创建:
//记录网络活动,用于调试 HttpServerOptions options = new HttpServerOptions().setLogActivity(true); //创建Http服务端 HttpServer server = vertx.createHttpServer(options); //处理不合法的请求,比如请求头过大 server.invalidRequestHandler(HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER);
-
监听地址:
server.listen(8080, "localhoSt", res -> {if (res.succeeded()) {} else {} });
-
配置请求处理器:
//记录网络活动,用于调试 HttpServerOptions options = new HttpServerOptions().setLogActivity(true); //创建Http服务端 HttpServer server = vertx.createHttpServer(options); //处理不合法的请求,比如请求头过大 server.invalidRequestHandler(HttpServerRequest.DEFAULT_INVALID_REQUEST_HANDLER); //配置请求处理器,在读取完请求头后调用(因此没有请求体) server.requestHandler(request -> {//可以从request中读取uri,path,params和headers等其他信息。。。//接收请求体:设置处理器,每次请求体的一小块数据收到时,该处理器都会被调用,避免请求体数据过大耗尽服务器可用内存Buffer totalBuffer = Buffer.buffer();request.handler(buffer -> {totalBuffer.appendBuffer(buffer);});//在读取完请求体后被调用request.endHandler(v -> {System.out.println("请求体长度:" + totalBuffer.length());//建立response响应HttpServerResponse response = request.response();//响应数据response.setStatusCode(200);//设置响应头response.putHeader("content-type", "text/html");//异步写入响应体,原理是将写操作进入队列response.write("Hello world");//在队列为空时结束响应并写入响应体response.end("响应结束");//设置响应异常处理器response.exceptionHandler(e -> {});});//设置请求异常处理器request.exceptionHandler(e -> {}); });
-
如果你确认不会传输很大的数据,则可以通过bodyHandler直接获取整个请求体:
request.bodyHandler(totalBuffer -> {});
-
配置HTTP/2 服务端:
HttpServerOptions options = new HttpServerOptions()//ALPN是一个TLS的扩展,它在客户端和服务器开始交换数据之前协商协议.setUseAlpn(true).setSsl(true).setKeyStoreOptions(new JksOptions().setPath("/path/to/my/keystore"));
当服务器接受 HTTP/2 连接时,它会向客户端发送其
初始设置
。 定义客户端如何使用连接,服务器的默认初始设置为:getMaxConcurrentStreams
:限制单个连接上可以同时打开的流(stream)数量- 其他默认的 HTTP/2 的设置
-
处理HTML表单:
-
使用
application/x-www-form-urlencoded
或multipart/form-data
这两种 content-type 来提交 HTML 表单。 -
对于使用 URL 编码过的表单,表单属性会被编码在URL中,如同普通查询参数一样。
-
对于 multipart 类型的表单,它会被编码在请求体中,在整个请求体被 完全读取之前不可用
-
若您想要读取 multipart 表单的属性,需要在读取请求体 之前 调用
setExpectMultipart
方法, 然后在整个请求体都被读取后,您可以使用formAttributes
方法来读取表单属性。server.requestHandler(request -> {request.setExpectMultipart(true);request.endHandler(v -> {MultiMap formAttributes = request.formAttributes();}); });
-
-
处理文件上传:
-
Vert.x 可以处理以 multipart 编码形式上传的的文件
-
在读取请求体 之前 调用
setExpectMultipart
方法,并对请求设置uploadHandler
-
当服务器每次接收到上传请求时, uploadHandler将被调用一次
server.requestHandler(request -> {request.setExpectMultipart(true);request.uploadHandler(upload -> {//将文件上传到服务器磁盘的某个地方upload.streamToFileSystem("directory/" + upload.filename());//上传的文件可能很大,不会在单个缓冲区中包含整个数据,所以设置处理器分批接收upload.handler(chunk -> {System.out.println("length:" + chunk.length());});}); });
-
-
处理cookies:
-
管理cookie
server.requestHandler(request -> {//获取cookieCookie key = request.getCookie("key");//移除cookierequest.response().removeCookie("key");//添加cookierequest.response().addCookie(Cookie.cookie("name", "value")); });
-
给Cookie设置SameSite(限制站点发送):
cookie.setSameSite(CookieSameSite.LAX);
- None - 允许在跨域请求和非跨域请求中发送
- Strict - 只能在同站点的请求中发送
- Lax - 在跨域的子请求(例如调用加载图像或iframe)不发送该Cookie, 但当用户从外部站点导航到URL时将发送该Cookie, 例如通过链接打开。
-
-
处理压缩体:Vert.x 可以处理在客户端通过 deflate 、 gzip 或 brotli 算法压缩过的请求体信息。在创建服务器时调用HttpServerOptions对象的
setDecompressionSupported(true)
,并确保在类路径classpath上存在有 Brotli4j类库。 -
响应结束(调用end方法)后,vertx不会自动关闭keep-alive的连接,可以手动调用close方法关闭底层的TCP连接,或调用
HttpServerOptions
对象的setIdleTimeout
方法配置空闲多少时间后自动关闭 -
分块响应数据(HTTP/2流无效):
response.setChunked(true);
当处于分块模式时,每次调用任意一个write
方法将导致新的 HTTP 块被写出(如果不分块则会在响应结束前一直缓存在内存中) -
响应文件:
reponse.sendFile(Stirng name,long offset,long length)
-
管道式(流式)响应:将输出流写入到响应中,这样可以直接往输出流写入数据达到流式传输的效果,不需要手动调用end方法结束响应
response.setChunked(true); request.pipeTo(response);
-
接收/写入自定义 HTTP/2 帧:HTTP/2 帧以二进制压缩格式存放内容。可以交错发送,然后根据每个帧头的数据流标识符重新组装。因此不受流量控制限制,会被立即发送或接收。
//接收 request.customFrameHandler(frame -> {System.out.println("Received a frame type=" + frame.type() +" payload" + frame.payload().toString()); }); //写入 int frameType = 40; int frameStatus = 10; Buffer payload = Buffer.buffer("some data");// 向客户端发送一帧 response.writeCustomFrame(frameType, frameStatus, payload);
-
流重置
HTTP/1.x 不允许请求或响应流执行清除重置,服务器需要接受整个响应。
HTTP/2 在请求/响应期间随时支持流重置,默认会发送
NO_ERROR
(0)错误代码request.response().reset();
在流重置完成时收到通知:
request.response().exceptionHandler(err -> {if (err instanceof StreamResetException) {StreamResetException reset = (StreamResetException) err;System.out.println("Stream reset " + reset.getCode());} });
-
服务器推送(Server Push):HTTP/2 可以为单个客户端请求并行发送多个响应
eg:当服务器准备推送响应时,推送响应处理器会被调用,并会发送响应(因此需要在响应结束之前调用
push
方法)。推送响应处理器可能会接收到失败,如:客户端可能取消推送,因为在缓存中已经包含了main.js
, 不再需要它// 准备响应时会推送main.js到客户端 response.push(HttpMethod.GET, "/main.js", ar -> {if (ar.succeeded()) {// 服务器准备推送响应HttpServerResponse pushedResponse = ar.result();// 发送main.js响应pushedResponse.putHeader("content-type", "application/json").end("alert(\"Push response hello\")");} else {System.out.println("Could not push client resource " + ar.cause());} });// 响应请求的资源内容 response.sendFile("<html><head><script src=\"/main.js\"></script></head><body></body></html>");
-
HTTP压缩:
启动HTTP压缩后。服务器将自动检查客户端请求头中是否包含了
Accept-Encoding
,若找到,将使用所支持的压缩算法之一(gzip/deflate )自动压缩响应正文。设置压缩级别:压缩可以减少网络流量,但是CPU密集度会更高,所以通过调整压缩密度解决
//支持压缩 options.setCompressionSupported(true); //设置压缩界别 options.setCompressionLevel(6); //设置压缩算法 GzipOptions gzip = StandardCompressionOptions.gzip(6, 15, 8); options.addCompressor(gzip);
关闭压缩:
response.putHeader(HttpHeaders.CONTENT_ENCODING, HttpHeaders.IDENTITY)
-
-
HTTP客户端
-
HTTP客户端创建:
HttpClient client = vertx.createHttpClient();
-
配置连接池
//启动长连接keep-alive options.setKeepAlive(false); //连接池的最大连接数 options.setMaxPoolSize(10); //设置空闲连接超时时间 options.setKeepAliveTimeout(100)
-
配置管道:管道意味着在在不等待响应的情况下,在同一个连接上发送另一个请求。
//启用管道 options.setPipelining(true); //单个连接的管道的请求数限制 options.setPipeliningLimit(2);
-
配置HTTP/2客户端
HttpClientOptions options = new HttpClientOptions()//启动HTTP/2协议.setProtocolVersion(HttpVersion.HTTP_2)//.setSsl(true)//使用ALPN启动TLS.setUseAlpn(true).setTrustAll(true)//记录网络活动日志.setLogActivity(true);
-
HTTP/2多路复用配置
HTTP/1 的长连接和管道,实现了在一个TCP连接上不等待响应,而连续发送多次请求,但由于处理响应的顺序性,会出现队头阻塞的问题(如果一个请求处理耗时过长,会阻塞管道中的后续请求)。
HTTP/2 基于流和帧的特性,让每个HTTP请求和响应分解为多个可以乱序发送的帧,这些帧通过流ID重新组装,使得能在一个TCP连接上同时发送多个请求和响应,即多路复用。
options//限制每个连接的多路复用流数量.setHttp2MultiplexingLimit(10)//设置HTTP/2连接池.setHttp2MaxPoolSize(3);
-
发起请求:
client.request(HttpMethod.GET,8080, "localhost", "/some-uri", ar -> {//配置请求处理器if (ar.succeeded()) {//获取请求HttpClientRequest request = ar.result();//设置超时时间request.setTimeout(5000);//设置请求头MultiMap headers = HttpHeaders.set("content-type", "application/json");request.headers().adfdAll(headers);request.putHeader("other-header", "ailu");//send:将整个请求的数据发送到目标服务器,并代表请求已经完成,不能再写入额外数据。//发送请求1,携带String类型的请求体request.send("requestBody",ar1 -> {if (ar1.succeeded()) {HttpClientResponse response = ar1.result();}});//发送请求2,携带Buffer类型的请求体request.send(Buffer.buffer("requestBody"),ar1 -> {});//发送请求3,携带实现ReadStream类型的请求体request.send(stream,ar1 -> {});} });
-
可提前配置要请求的主机/端口
HttpClientOptions options = new HttpClientOptions().setDefaultHost("localhost").setDefaultPort(8080);
-
使用write写入请求体,通过end结束请求,若不使用HTTP分块,则必须在写入请求前设置Content-Length头
//设置分块模式 request.setChunked(true); request.write("data1","UTF-8") request.write(Buffer.buffer("data2")) //结束请求 request.end(body);
-
使用流式请求将磁盘上的文件管送到HTTP 请求体中
request.setChunked(true); asyncFile.pipeTo(request);
-
写HTTP/2帧
int frameType = 40; int frameStatus = 10; Buffer payload = Buffer.buffer("some data");// 发送一帧到服务器 request.writeCustomFrame(frameType, frameStatus, payload);
-
流重置:
request.reset(); //在流重置完成时您将会收到通知 request.exceptionHandler(err -> {if (err instanceof StreamResetException) {StreamResetException reset = (StreamResetException) err;System.out.println("Stream reset " + reset.getCode());} });
-
流式响应到WriteSteam中:
response.pipeTo(WriteStream ws)
-
读取响应体:如果响应中包含响应体,那么响应体可能会在读取完header后,以多个分片的形式到达,即当响应体的某部分(数据)到达时,
handler
方法绑定的回调函数将会被调用response.handler(buffer -> {}); //整个响应体被完全读取时,endHandler就会被调用。 response.endHandler(v -> {});
或者直接处理所有响应体数据
response.body(ar -> {if (ar.succeeded()) {Buffer body = ar.result();} });
-
请求和响应组合使用:
HttpClient 客户端有意地避免返回
Future<HttpClientResponse>
, 因为如果在 event-loop 之外设置 Future 的完成处理器可能会导致线程竞争。//会交给event loop线程执行 Future<HttpClientResponse> get = client.get("some-uri");//假设此事件不在event-loop中,则阻塞主线程往下执行 Thread.sleep(100);get.onSuccess(response -> {// 响应事件在event loop线程可能已经发生response.body(ar -> {}); });
将
HttpClientRequest
的使用限制在一个verticle的范围内, 因为Verticle会确保按顺序处理事件。vertx.deployVerticle(() -> new AbstractVerticle() {@Overridepublic void start() {HttpClient client = vertx.createHttpClient();Future<HttpClientResponse> get = client.get("some-uri");//此事件在event-loop中,阻塞event loop线程(不建议这么做)Thread.sleep(100);get.onSuccess(response -> {// 响应事件不会提前发生,因为Verticle会保证按顺序处理事件response.body(ar -> {});});} }, new DeploymentOptions());
在verticle外使用HttpClient进行交互时,可以安全地使用“组合”(compose)。
Future<JsonObject> future = client.request(HttpMethod.GET, "some-uri").compose(request -> request.send().compose(response -> {return response.body().map(buffer -> buffer.toJsonObject());}})); future.onSuccess(json -> {})
-
从响应中读取cookie:
response.cookies()
-
创建HTTP隧道
client.request(HttpMethod.CONNECT, "some-uri").onSuccess(request -> {// 连接到服务器request.connect(ar -> {if (ar.succeeded()) {HttpClientResponse response = ar.result();if (response.statusCode() != 200) {// 某些原因连接失败} else {// HTTP隧道创建成功,原始数据将传输到缓冲区NetSocket socket = response.netSocket();}}}); });
-
客户端(HTTP/2)接收推送资源
// 设置一个推送处理器来感知服务器推送的任何资源 request.pushHandler(pushedRequest -> {//如果不想收到推送,可重置流if (pushedRequest.path().equals("/main.js")) {pushedRequest.reset();} System.out.println("Server pushed " + pushedRequest.path());// 为响应设置处理器pushedRequest.response().onComplete(pushedResponse -> {System.out.println("The response for the pushed request");}); });
-
接收自定义HTTP/2帧
response.customFrameHandler(frame -> {System.out.println("Received a frame type=" + frame.type() +" payload" + frame.payload().toString()); });
-
启用压缩
创建客户端时使用
setTryUseCompression
设置配置项启用压缩,请求头包含Accept-Encoding
头,其值为可支持的压缩算法,如Accept-Encoding: gzip, deflate
-
-
HTTP连接
-
服务端连接
//获取服务端上的请求连接 HttpConnection connection = request.connection(); //设置连接处理器,在任意连接建立时得到通知 HttpServer server = vertx.createHttpServer(http2Options); server.connectionHandler(connection -> {});
-
客户端连接
//获取客户端上的请求连接 HttpConnection connection = request.connection(); //设置连接处理器,在连接建立时通知 client.connectionHandler(connection -> {});
-
连接配置(HTTP/2):每个Endpoint(端点)必须遵守连接另一端的发送设置,即建立连接时,客户端和服务端会交换初始配置
//初始配置 options.setInitialSettings(new Http2Settings().setMaxConcurrentStreams(100)); //连接建立后,可随时更改设置 connection.updateSettings(); //在另一端更新设置后,这端会收到对应的远程设置 connection.remoteSettingsHandler(settings -> {});
-
Ping(HTTP/2):确定连接往返时间或检查连接有效性
//ping端 connection.ping(data, pong -> {});//被ping端 connection.pingHandler(ping -> {});
-
连接关闭(HTTP/2)
shutdown:客户端停止发送新请求,并发送
{@literal GOAWAY} 帧
到服务端,要求其停止创建流并停止推送响应,直到当前所有流关闭,然后关闭连接。connection.shutdown()
goAway:只发送
{@literal GOAWAY} 帧
告诉远程连接停止创建流,但没有计划关闭连接。connection.shutdown()
shutdownHandler:当所有流已经关闭 或 接收到
{@literal GOAWAY} 帧
时被调用connection.shutdownHandler(v -> {// 所有流被关闭时,关闭连接connection.close(); });
close:关闭连接,对于HTTP/1,会关闭底层Socket,对于HTTP/2,在连接关闭前发送
{@literal GOAWAY} 帧
-
-
扩展
-
水平扩展-客户端共享:多个verticle实例共享一个HTTP客户端
HttpClient client = vertx.createHttpClient(new HttpClientOptions().setShared(true)); vertx.deployVerticle(() -> new AbstractVerticle() {@Overridepublic void start() throws Exception {// 使用client} }, new DeploymentOptions().setInstances(4));
-
水平扩展-服务端共享:当多个 HTTP 服务端在同一个端口上监听时,Vert.x 会使用轮询策略顺序委托给其中一个服务端
-
-
启用HTTPS
client.request(new RequestOptions().setHost("localhost").setPort(8080).setURI("/").setSsl(true), ar1 -> {});