欢迎来到尧图网

客户服务 关于我们

您的位置:首页 > 汽车 > 新车 > 【Netty】客户端功能完善

【Netty】客户端功能完善

2025/3/20 7:00:05 来源:https://blog.csdn.net/weixin_66907051/article/details/146369218  浏览:    关键词:【Netty】客户端功能完善

超时控制

public class RequestTimeoutManager {private final HashedWheelTimer timer = new HashedWheelTimer();private final ConcurrentMap<Long, Timeout> pendingRequests = new ConcurrentHashMap<>();public void addRequest(long requestId, long timeoutMillis, Runnable timeoutCallback) {Timeout timeout = timer.newTimeout(timeout -> {pendingRequests.remove(requestId);timeoutCallback.run();}, timeoutMillis, TimeUnit.MILLISECONDS);pendingRequests.put(requestId, timeout);}public void removeRequest(long requestId) {Timeout timeout = pendingRequests.remove(requestId);if (timeout != null) {timeout.cancel();}}
}public class RequestFuture<T> {private final CompletableFuture<T> future = new CompletableFuture<>();private final long requestId;private final long timeoutMillis;private final RequestTimeoutManager timeoutManager;public RequestFuture(long requestId, long timeoutMillis, RequestTimeoutManager timeoutManager) {this.requestId = requestId;this.timeoutMillis = timeoutMillis;this.timeoutManager = timeoutManager;setupTimeout();}private void setupTimeout() {timeoutManager.addRequest(requestId, timeoutMillis, () -> future.completeExceptionally(new TimeoutException("Request timeout after " + timeoutMillis + "ms")));}public CompletableFuture<T> getFuture() {return future;}public void complete(T result) {timeoutManager.removeRequest(requestId);future.complete(result);}
}public class NettyClientHandler {private final RequestTimeoutManager timeoutManager = new RequestTimeoutManager();private static final long DEFAULT_TIMEOUT = 3000; // 3秒public CompletableFuture<LoginResponse> login(LoginRequest request) {long requestId = generateRequestId();RequestFuture<LoginResponse> future = new RequestFuture<>(requestId, DEFAULT_TIMEOUT, timeoutManager);// 发送请求channel.writeAndFlush(request);return future.getFuture();}// 使用示例public void example() {LoginRequest request = new LoginRequest();login(request).thenAccept(response -> {// 处理响应}).exceptionally(throwable -> {if (throwable instanceof TimeoutException) {// 处理超时}return null;});}
}
超时机制优点
  1. 多层次超时控制:连接建立、请求响应超时、整队不同请求设置不同超时时间
  2. 使用HashedWheelTimer实现定时器,内存占用下、时间精度可控
  3. 异常处理机制,超时自动触发异常,支持异步处理超时情况,方便进行重试

心跳

public class HeartbeatHandler extends ChannelDuplexHandler {private static final int READER_IDLE_TIME = 15; // 读超时时间private static final int WRITER_IDLE_TIME = 5;  // 写超时时间private static final int ALL_IDLE_TIME = 20;    // 总超时时间private final AtomicInteger lostHeartbeatCount = new AtomicInteger(0);private final ConnectionManager connectionManager;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {ctx.pipeline().addBefore(ctx.name(), "idleStateHandler",new IdleStateHandler(READER_IDLE_TIME, WRITER_IDLE_TIME, ALL_IDLE_TIME, TimeUnit.SECONDS));}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;switch (event.state()) {case READER_IDLE:handleReaderIdle(ctx);break;case WRITER_IDLE:handleWriterIdle(ctx);break;case ALL_IDLE:handleAllIdle(ctx);break;}}}private void handleReaderIdle(ChannelHandlerContext ctx) {int count = lostHeartbeatCount.incrementAndGet();if (count > 3) {  // 连续3次没收到心跳,判定连接断开log.warn("连续{}次未收到心跳,关闭连接", count);ctx.close();connectionManager.scheduleReconnect();} else {log.warn("读空闲超时,第{}次", count);}}private void handleWriterIdle(ChannelHandlerContext ctx) {// 发送心跳包HeartbeatMessage heartbeat = new HeartbeatMessage();ctx.writeAndFlush(heartbeat).addListener(future -> {if (!future.isSuccess()) {log.error("发送心跳包失败", future.cause());ctx.close();}});}private void handleAllIdle(ChannelHandlerContext ctx) {log.warn("全局空闲超时,关闭连接");ctx.close();connectionManager.scheduleReconnect();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof HeartbeatMessage) {// 收到心跳响应,重置计数器lostHeartbeatCount.set(0);}ctx.fireChannelRead(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("心跳处理异常", cause);ctx.close();}
}

需要配合IdleStateHandler处理器使用

支持多级超时检测
  1. 读空闲超时:检测服务器是否存活
  2. 写空闲超时:触发心跳包发送
  3. 全局空闲超时:作为最后的保障
渐进式处理
  1. 连续多次心跳失败才断开连接
  2. 支持重连机制
状态管理
  1. 使用原子技术器记录失败次数
  2. 收到心跳响应时重置计数
  3. 异常情况自动断开重连
可配置性
  1. 超时时间可配置
  2. 失败重试次数可配置
  3. 心跳间隔可配置

流量控制

public class CustomTrafficShapingHandler extends ChannelDuplexHandler {private final GlobalTrafficShapingHandler globalTrafficHandler;public CustomTrafficShapingHandler(EventLoopGroup group,long writeLimit,        // 写入速率限制,单位为字节/秒long readLimit,         // 读取速率限制,单位为字节/秒long checkInterval      // 检查间隔,单位为毫秒) {this.globalTrafficHandler = new GlobalTrafficShapingHandler(group, writeLimit, readLimit, checkInterval);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 当收到数据时,通过令牌桶算法控制读取速率globalTrafficHandler.channelRead(ctx, msg);}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {// 当发送数据时,通过令牌桶算法控制发送速率globalTrafficHandler.write(ctx, msg, promise);}
}

GlobalTrafficShapingHandlernetty提供的流量整形处理器,主要通过领票捅算法来实现流量控制,工作原理:

  1. 令牌桶的算法
    • 系统按照设定的速率往桶里放入令牌
    • 每次读写操作都需要消耗令牌
    • 如果桶里没有足够的令牌,操作会被延迟执行
  2. 使用示例:
// 配置:写入限制 1MB/s,读取限制 1MB/s,检查间隔 1秒
CustomTrafficShapingHandler handler = new CustomTrafficShapingHandler(group,1 * 1024 * 1024,  // 写入限制:1MB/s1 * 1024 * 1024,  // 读取限制:1MB/s1000              // 检查间隔:1秒
);

异常重试机制

public class RetryPolicy {private final int maxAttempts;private final long initialRetryDelay;public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> action) {return executeWithRetry(action, 0);}private <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> action, int attempt) {return action.get().exceptionally(throwable -> {if (attempt < maxAttempts && isRetryable(throwable)) {return executeWithRetry(action, attempt + 1).join();}throw new CompletionException(throwable);});}
}
优点
  1. 集中处理所有异常
  2. 不同类型异常有不同的处理策略
  3. 支持自动重连机制
  4. 异常日志记录
  5. 优雅处理连接断开

优雅关闭

public class ShutdownHook {private final NettyClient client;private final ExecutorService executorService;public void registerShutdownHook() {Runtime.getRuntime().addShutdownHook(new Thread(() -> {client.shutdown();executorService.shutdown();try {executorService.awaitTermination(5, TimeUnit.SECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}));}
}

监控指标收集

public class MetricsHandler extends ChannelDuplexHandler {private final MeterRegistry registry;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Timer.Sample sample = Timer.start(registry);try {ctx.fireChannelRead(msg);} finally {sample.stop(registry.timer("netty.request.time"));registry.counter("netty.request.count").increment();}}
}

SSL/TLS支持

public class SslContextFactory {public static SslContext createSslContext() {return SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();}
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

热搜词