超时控制
public class RequestTimeoutManager {private final HashedWheelTimer timer = new HashedWheelTimer();private final ConcurrentMap<Long, Timeout> pendingRequests = new ConcurrentHashMap<>();public void addRequest(long requestId, long timeoutMillis, Runnable timeoutCallback) {Timeout timeout = timer.newTimeout(timeout -> {pendingRequests.remove(requestId);timeoutCallback.run();}, timeoutMillis, TimeUnit.MILLISECONDS);pendingRequests.put(requestId, timeout);}public void removeRequest(long requestId) {Timeout timeout = pendingRequests.remove(requestId);if (timeout != null) {timeout.cancel();}}
}public class RequestFuture<T> {private final CompletableFuture<T> future = new CompletableFuture<>();private final long requestId;private final long timeoutMillis;private final RequestTimeoutManager timeoutManager;public RequestFuture(long requestId, long timeoutMillis, RequestTimeoutManager timeoutManager) {this.requestId = requestId;this.timeoutMillis = timeoutMillis;this.timeoutManager = timeoutManager;setupTimeout();}private void setupTimeout() {timeoutManager.addRequest(requestId, timeoutMillis, () -> future.completeExceptionally(new TimeoutException("Request timeout after " + timeoutMillis + "ms")));}public CompletableFuture<T> getFuture() {return future;}public void complete(T result) {timeoutManager.removeRequest(requestId);future.complete(result);}
}public class NettyClientHandler {private final RequestTimeoutManager timeoutManager = new RequestTimeoutManager();private static final long DEFAULT_TIMEOUT = 3000; // 3秒public CompletableFuture<LoginResponse> login(LoginRequest request) {long requestId = generateRequestId();RequestFuture<LoginResponse> future = new RequestFuture<>(requestId, DEFAULT_TIMEOUT, timeoutManager);// 发送请求channel.writeAndFlush(request);return future.getFuture();}// 使用示例public void example() {LoginRequest request = new LoginRequest();login(request).thenAccept(response -> {// 处理响应}).exceptionally(throwable -> {if (throwable instanceof TimeoutException) {// 处理超时}return null;});}
}
超时机制优点
- 多层次超时控制:连接建立、请求响应超时、整队不同请求设置不同超时时间
- 使用
HashedWheelTimer
实现定时器,内存占用下、时间精度可控 - 异常处理机制,超时自动触发异常,支持异步处理超时情况,方便进行重试
心跳
public class HeartbeatHandler extends ChannelDuplexHandler {private static final int READER_IDLE_TIME = 15; // 读超时时间private static final int WRITER_IDLE_TIME = 5; // 写超时时间private static final int ALL_IDLE_TIME = 20; // 总超时时间private final AtomicInteger lostHeartbeatCount = new AtomicInteger(0);private final ConnectionManager connectionManager;@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {ctx.pipeline().addBefore(ctx.name(), "idleStateHandler",new IdleStateHandler(READER_IDLE_TIME, WRITER_IDLE_TIME, ALL_IDLE_TIME, TimeUnit.SECONDS));}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) {if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;switch (event.state()) {case READER_IDLE:handleReaderIdle(ctx);break;case WRITER_IDLE:handleWriterIdle(ctx);break;case ALL_IDLE:handleAllIdle(ctx);break;}}}private void handleReaderIdle(ChannelHandlerContext ctx) {int count = lostHeartbeatCount.incrementAndGet();if (count > 3) { // 连续3次没收到心跳,判定连接断开log.warn("连续{}次未收到心跳,关闭连接", count);ctx.close();connectionManager.scheduleReconnect();} else {log.warn("读空闲超时,第{}次", count);}}private void handleWriterIdle(ChannelHandlerContext ctx) {// 发送心跳包HeartbeatMessage heartbeat = new HeartbeatMessage();ctx.writeAndFlush(heartbeat).addListener(future -> {if (!future.isSuccess()) {log.error("发送心跳包失败", future.cause());ctx.close();}});}private void handleAllIdle(ChannelHandlerContext ctx) {log.warn("全局空闲超时,关闭连接");ctx.close();connectionManager.scheduleReconnect();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {if (msg instanceof HeartbeatMessage) {// 收到心跳响应,重置计数器lostHeartbeatCount.set(0);}ctx.fireChannelRead(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("心跳处理异常", cause);ctx.close();}
}
需要配合
IdleStateHandler
处理器使用
支持多级超时检测
- 读空闲超时:检测服务器是否存活
- 写空闲超时:触发心跳包发送
- 全局空闲超时:作为最后的保障
渐进式处理
- 连续多次心跳失败才断开连接
- 支持重连机制
状态管理
- 使用原子技术器记录失败次数
- 收到心跳响应时重置计数
- 异常情况自动断开重连
可配置性
- 超时时间可配置
- 失败重试次数可配置
- 心跳间隔可配置
流量控制
public class CustomTrafficShapingHandler extends ChannelDuplexHandler {private final GlobalTrafficShapingHandler globalTrafficHandler;public CustomTrafficShapingHandler(EventLoopGroup group,long writeLimit, // 写入速率限制,单位为字节/秒long readLimit, // 读取速率限制,单位为字节/秒long checkInterval // 检查间隔,单位为毫秒) {this.globalTrafficHandler = new GlobalTrafficShapingHandler(group, writeLimit, readLimit, checkInterval);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 当收到数据时,通过令牌桶算法控制读取速率globalTrafficHandler.channelRead(ctx, msg);}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {// 当发送数据时,通过令牌桶算法控制发送速率globalTrafficHandler.write(ctx, msg, promise);}
}
GlobalTrafficShapingHandler
是netty
提供的流量整形处理器,主要通过领票捅算法来实现流量控制,工作原理:
- 令牌桶的算法
- 系统按照设定的速率往桶里放入令牌
- 每次读写操作都需要消耗令牌
- 如果桶里没有足够的令牌,操作会被延迟执行
- 使用示例:
// 配置:写入限制 1MB/s,读取限制 1MB/s,检查间隔 1秒
CustomTrafficShapingHandler handler = new CustomTrafficShapingHandler(group,1 * 1024 * 1024, // 写入限制:1MB/s1 * 1024 * 1024, // 读取限制:1MB/s1000 // 检查间隔:1秒
);
异常重试机制
public class RetryPolicy {private final int maxAttempts;private final long initialRetryDelay;public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> action) {return executeWithRetry(action, 0);}private <T> CompletableFuture<T> executeWithRetry(Supplier<CompletableFuture<T>> action, int attempt) {return action.get().exceptionally(throwable -> {if (attempt < maxAttempts && isRetryable(throwable)) {return executeWithRetry(action, attempt + 1).join();}throw new CompletionException(throwable);});}
}
优点
- 集中处理所有异常
- 不同类型异常有不同的处理策略
- 支持自动重连机制
- 异常日志记录
- 优雅处理连接断开
优雅关闭
public class ShutdownHook {private final NettyClient client;private final ExecutorService executorService;public void registerShutdownHook() {Runtime.getRuntime().addShutdownHook(new Thread(() -> {client.shutdown();executorService.shutdown();try {executorService.awaitTermination(5, TimeUnit.SECONDS);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}));}
}
监控指标收集
public class MetricsHandler extends ChannelDuplexHandler {private final MeterRegistry registry;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Timer.Sample sample = Timer.start(registry);try {ctx.fireChannelRead(msg);} finally {sample.stop(registry.timer("netty.request.time"));registry.counter("netty.request.count").increment();}}
}
SSL/TLS支持
public class SslContextFactory {public static SslContext createSslContext() {return SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();}
}