# Netty in Practice: Scenarios and Best Practices

## Questions

1. How would you design a high-performance RPC framework?
2. How do you implement a WebSocket server?
3. How do you build an IM system that handles a million connections?
4. How do you deal with memory leaks in Netty?
5. How do you implement graceful shutdown?
6. How do you design heartbeat and reconnection mechanisms?
7. How do you share sessions across a distributed deployment?

---

## Reference Answers

### 1. Designing a High-Performance RPC Framework

#### **Architecture**

```
┌────────────────────────────────────────────┐
│               RPC Framework                │
├────────────────────────────────────────────┤
│ Consumer side                              │
│ ┌─────────────────┐  ┌─────────────────┐   │
│ │ Proxy           │  │ Serializer      │   │
│ │ (dynamic proxy) │  │ (Protobuf)      │   │
│ └────────┬────────┘  └─────────────────┘   │
│          │                                 │
│          ▼                                 │
│ ┌────────────────────────────────────────┐ │
│ │ Netty Client                           │ │
│ │  - Reconnect                           │ │
│ │  - Timeout                             │ │
│ │  - Heartbeat                           │ │
│ └───────────────────┬────────────────────┘ │
│                     │                      │
│                     │ TCP                  │
│                     │                      │
│ ┌───────────────────▼────────────────────┐ │
│ │ Netty Server                           │ │
│ │  - Boss/Worker groups                  │ │
│ │  - Pipeline                            │ │
│ │  - Handler chain                       │ │
│ └───────────────────┬────────────────────┘ │
│           ┌─────────┴──────────┐           │
│           ▼                    ▼           │
│    ┌─────────────┐      ┌─────────────┐    │
│    │ Service     │      │ Registry    │    │
│    │ (impl)      │      │ (discovery) │    │
│    └─────────────┘      └─────────────┘    │
└────────────────────────────────────────────┘
```

#### **Protocol Design**

RPC frame layout (the Length field holds the body length only; the 9-byte header is not included):

```
┌───────────┬───────────┬──────────┬──────────┐
│   Magic   │  Length   │   Type   │   Body   │
│  Number   │ (4 bytes) │ (1 byte) │          │
│ (4 bytes) │           │          │          │
└───────────┴───────────┴──────────┴──────────┘
```

```java
// Encode/decode helpers for the frame layout above
public class RpcProtocol {

    private static final int MAGIC_NUMBER = 0xCAFEBABE;
    private static final int HEADER_LENGTH = 9; // magic (4) + length (4) + type (1)

    public static ByteBuf encode(byte[] body, byte type) {
        ByteBuf buffer = Unpooled.buffer(HEADER_LENGTH + body.length);
        buffer.writeInt(MAGIC_NUMBER);  // Magic number (4 bytes)
        buffer.writeInt(body.length);   // Length of the body only (4 bytes)
        buffer.writeByte(type);         // Message type (1 byte)
        buffer.writeBytes(body);        // Body
        return buffer;
    }

    // Expects exactly one complete frame, e.g. as emitted by LengthFieldBasedFrameDecoder
    public static byte[] decode(ByteBuf buffer) {
        // Check the magic number
        if (buffer.readInt() != MAGIC_NUMBER) {
            throw new IllegalArgumentException("Invalid magic number");
        }
        // Read the body length
        int length = buffer.readInt();
        // Skip the type byte; a full decoder would dispatch on it
        buffer.readByte();
        // Read the body
        byte[] body = new byte[length];
        buffer.readBytes(body);
        return body;
    }
}
```

#### **Server Implementation**

```java
@Component
public class RpcServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    // Note: start() blocks until the server channel closes, so call it from a dedicated thread
    public void start(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();

                            // Framing: the length field starts at offset 4 (after the magic
                            // number) and is 4 bytes long; lengthAdjustment = 1 covers the type
                            // byte between it and the body. RpcEncoder is assumed to write the
                            // whole header itself, so no LengthFieldPrepender is added (it would
                            // put a second length field in front of the magic number).
                            pipeline.addLast("frameDecoder",
                                    new LengthFieldBasedFrameDecoder(8192, 4, 4, 1, 0));

                            // Protocol codec
                            pipeline.addLast("decoder", new RpcDecoder());
                            pipeline.addLast("encoder", new RpcEncoder());

                            // Heartbeat detection: close the connection after 60 s without reads
                            pipeline.addLast("idleStateHandler", new IdleStateHandler(60, 0, 0));
                            pipeline.addLast("heartbeatHandler", new HeartbeatHandler());

                            // Business handler
                            pipeline.addLast("handler", new RpcServerHandler());
                        }
                    });

            // Bind the port and start
            ChannelFuture future = bootstrap.bind(port).sync();
            serverChannel = future.channel();
            System.out.println("RPC Server started on port " + port);

            // Wait until the server socket is closed
            serverChannel.closeFuture().sync();
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }

    @PreDestroy
    public void destroy() {
        shutdown();
    }
}
```

#### **Client Implementation**

```java
@Component
public class RpcClient {

    private EventLoopGroup workerGroup;
    private Channel channel;

    public void connect(String host, int port) throws InterruptedException {
        workerGroup = new NioEventLoopGroup();
        boolean connected = false;

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();

                            // Framing: same header layout as on the server
                            pipeline.addLast("frameDecoder",
                                    new LengthFieldBasedFrameDecoder(8192, 4, 4, 1, 0));

                            // Protocol codec
                            pipeline.addLast("decoder", new RpcDecoder());
                            pipeline.addLast("encoder", new RpcEncoder());

                            // Heartbeat: send a ping after 30 s without writes
                            pipeline.addLast("idleStateHandler", new IdleStateHandler(0, 30, 0));
                            pipeline.addLast("heartbeatHandler", new HeartbeatHandler());

                            // Business handler
                            pipeline.addLast("handler", new RpcClientHandler());
                        }
                    });

            // Connect to the server. The client must not block on closeFuture() here:
            // connect() would never return and sendRequest() could never be called.
            channel = bootstrap.connect(host, port).sync().channel();
            connected = true;
            System.out.println("RPC Client connected to " + host + ":" + port);
        } finally {
            if (!connected) {
                shutdown(); // release the event loop if the connection attempt failed
            }
        }
    }

    public void shutdown() {
        if (channel != null) {
            channel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public <T> T sendRequest(RpcRequest request, Class<T> returnType) {
        // Send the request and wait for the response
        RpcClientHandler handler = channel.pipeline().get(RpcClientHandler.class);
        return handler.sendRequest(request, returnType);
    }
}
```

---

### 2. Implementing a WebSocket Server

#### **WebSocket Handshake**

```
Client → Server: HTTP GET (Upgrade: websocket)
Server → Client: HTTP 101 (Switching Protocols)
Client → Server: WebSocket frame
Server → Client: WebSocket frame
```

#### **Server Implementation**

```java
@Component
public class WebSocketServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public void start(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();

                            // HTTP codec: the handshake starts as an ordinary HTTP request
                            pipeline.addLast("httpCodec", new HttpServerCodec());
                            // Aggregates HTTP messages into a FullHttpRequest (max 64 KB)
                            pipeline.addLast("httpAggregator", new HttpObjectAggregator(65536));

                            // Performs the handshake on /ws and handles Close/Ping/Pong frames
                            pipeline.addLast("handshakeHandler",
                                    new WebSocketServerProtocolHandler("/ws"));

                            // Custom WebSocket handler
                            pipeline.addLast("handler", new WebSocketHandler());
                        }
                    });

            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("WebSocket Server started on port " + port);

            future.channel().closeFuture().sync();
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }
}
```

#### **WebSocket Handler**

```java
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // All connected channels; a DefaultChannelGroup also drops closed channels automatically
    private static final ChannelGroup channels =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // Runs before the handshake completes; to broadcast only to upgraded clients, add the
        // channel when WebSocketServerProtocolHandler.HandshakeComplete reaches userEventTriggered
        channels.add(ctx.channel());
        System.out.println("Client connected: " + ctx.channel().id());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        channels.remove(ctx.channel());
        System.out.println("Client disconnected: " + ctx.channel().id());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        // Receive the message
        String message = msg.text();
        System.out.println("Received: " + message);

        // Broadcast it to every connected client
        channels.writeAndFlush(new TextWebSocketFrame("Server: " + message));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```

---

### 3. An IM System with a Million Connections

#### **Architecture Tuning**

```java
@Configuration
public class ImServerConfig {

    @Bean
    public EventLoopGroup bossGroup() {
        // One thread is enough to accept connections
        return new NioEventLoopGroup(1);
    }

    @Bean
    public EventLoopGroup workerGroup() {
        // Size the I/O threads from the CPU core count;
        // 2 x cores is also Netty's default
        int threads = Runtime.getRuntime().availableProcessors() * 2;
        return new NioEventLoopGroup(threads);
    }

    @Bean
    public ServerBootstrap serverBootstrap(EventLoopGroup bossGroup, EventLoopGroup workerGroup) {
        // One shared limiter instance, so the count covers every connection on this server
        ConnectionLimiter connectionLimiter = new ConnectionLimiter(1_000_000);

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 8192)            // larger accept queue
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)   // 32 KB send buffer
                .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)   // 32 KB receive buffer
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();

                        // Connection limit (the same @Sharable instance for every channel)
                        pipeline.addLast("connectionLimiter", connectionLimiter);

                        // Per-channel traffic shaping; the constructor order is (writeLimit, readLimit)
                        pipeline.addLast("trafficShaping", new ChannelTrafficShapingHandler(
                                1024 * 1024,  // write limit: 1 MB/s
                                1024 * 1024   // read limit: 1 MB/s
                        ));

                        // Framing: 4-byte length prefix at offset 0, stripped before the IM handler
                        pipeline.addLast("frameDecoder",
                                new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 4));
                        pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));

                        // IM protocol handler
                        pipeline.addLast("imHandler", new ImServerHandler());
                    }
                });
        return bootstrap;
    }
}
```

#### **Connection Limiting**

```java
// Counts connections (not messages) and must be shared by all channels, hence @Sharable
@ChannelHandler.Sharable
public class ConnectionLimiter extends ChannelInboundHandlerAdapter {

    // Marks admitted channels, so rejected ones are not decremented when they close
    private static final AttributeKey<Boolean> COUNTED =
            AttributeKey.valueOf("connectionLimiter.counted");

    private final int maxConnections;
    private final AtomicInteger activeConnections = new AtomicInteger(0);

    public ConnectionLimiter(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        int count = activeConnections.incrementAndGet();
        if (count > maxConnections) {
            // Over the limit: undo the increment and close the new connection
            // (send a protocol-level "server full" message first if the IM protocol has one)
            activeConnections.decrementAndGet();
            ctx.close();
            return;
        }
        ctx.channel().attr(COUNTED).set(Boolean.TRUE);
        System.out.println("Active connections: " + count);
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if (Boolean.TRUE.equals(ctx.channel().attr(COUNTED).getAndSet(null))) {
            int count = activeConnections.decrementAndGet();
            System.out.println("Active connections: " + count);
        }
        ctx.fireChannelInactive();
    }
}
```

---

### 4. Diagnosing and Fixing Memory Leaks

#### **Common Leak Scenarios**

```java
// ❌ Wrong, example 1: the ByteBuf is never released
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
    byte[] data = new byte[buf.readableBytes()];
    buf.readBytes(data);
    // buf is never released
}

// ✅ Correct: release in finally
// (SimpleChannelInboundHandler does this automatically after channelRead0)
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
    try {
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        // process the data...
    } finally {
        ReferenceCountUtil.release(buf); // release the ByteBuf
    }
}

// ❌ Wrong, example 2: a dynamically added handler that is never removed
public void channelActive(ChannelHandlerContext ctx) {
    // Adds a new long-lived handler instance for every connection; it (and everything
    // it references) stays in the pipeline until the channel closes
    ctx.pipeline().addLast("longLived", new LongLivedHandler());
}

// ✅ Better: reuse one stateless @Sharable instance, and remove dynamically added
// handlers with ctx.pipeline().remove(...) once they are no longer needed
public void channelActive(ChannelHandlerContext ctx) {
    ctx.pipeline().addLast("longLived", SharableHandler.INSTANCE);
}

// ❌ Wrong, example 3: Channel references are never released
public class ChannelHolder {
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();

    public static void addChannel(String id, Channel channel) {
        CHANNELS.put(id, channel); // holds the reference forever
    }
}

// ✅ Correct
public class ChannelHolder {
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();

    public static void addChannel(String id, Channel channel) {
        CHANNELS.put(id, channel);
        // Remove the entry automatically when the channel closes; remove(id, channel)
        // leaves a newer channel registered under the same id untouched
        channel.closeFuture().addListener((ChannelFutureListener) future ->
                CHANNELS.remove(id, channel));
    }
}
```

#### **Leak Detection**

```java
// Enable leak detection with a JVM flag:
//   -Dio.netty.leakDetection.level=paranoid
// or programmatically, before any buffers are allocated (shown below).
public class LeakDetection {
    public static void main(String[] args) {
        // Detection levels:
        //   DISABLED - off
        //   SIMPLE   - samples about 1% of buffers, reports that a leak occurred (default)
        //   ADVANCED - samples about 1% of buffers, also records where the buffer was accessed
        //   PARANOID - like ADVANCED, but tracks every buffer (high overhead, for testing)
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

        // No manual wrapping is needed: with detection enabled, the allocator
        // returns leak-aware buffers itself
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024);

        // ... use the buffer ...

        // Release when done. If this call is missing, a "LEAK:" error with the
        // recorded access points is logged after the buffer is garbage-collected.
        buffer.release();
    }
}
```

---

### 5. Graceful Shutdown

```java
@Component
public class NettyGracefulShutdown implements SmartLifecycle {

    @Autowired
    private ServerBootstrap serverBootstrap;

    private Channel serverChannel;
    private volatile boolean running = false;

    @Override
    public void start() {
        running = true;
        // Start the server and keep the Channel returned by bind() in serverChannel...
    }

    @Override
    public void stop() {
        if (!running) {
            return;
        }

        System.out.println("Shutting down Netty server gracefully...");

        // 1. Stop accepting new connections
        if (serverChannel != null) {
            serverChannel.close().syncUninterruptibly();
        }

        // 2. Let in-flight requests finish: 2 s quiet period, 30 s timeout.
        //    config().childGroup() is the worker group; config().group() is the boss group.
        EventLoopGroup workerGroup = serverBootstrap.config().childGroup();
        Future<?> shutdownFuture =  // io.netty.util.concurrent.Future
                workerGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS);

        try {
            // Wait for the graceful shutdown to complete
            if (shutdownFuture.await(60, TimeUnit.SECONDS) && shutdownFuture.isSuccess()) {
                System.out.println("Netty server shutdown successfully");
            } else {
                System.err.println("Netty server shutdown timed out, forcing close");
                workerGroup.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workerGroup.shutdownNow();
        }

        // 3. Shut down the boss group
        EventLoopGroup bossGroup = serverBootstrap.config().group();
        bossGroup.shutdownGracefully();

        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}
```

---

### 6. Heartbeat and Reconnection

#### **Heartbeat Handler**

```java
@ChannelHandler.Sharable
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    // Shared, unreleasable payload; each write sends a duplicate() with its own indexes
    private static final ByteBuf HEARTBEAT_BEAT =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // Nothing read for too long: the peer is probably gone
                System.out.println("Read idle, closing connection");
                ctx.close();
            } else if (event.state() == IdleState.WRITER_IDLE) {
                // Nothing written for a while: send a heartbeat
                System.out.println("Write idle, sending heartbeat");
                ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate());
            } else if (event.state() == IdleState.ALL_IDLE) {
                // Neither read nor written for a while
                System.out.println("All idle, sending heartbeat");
                ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate());
            }
        } else {
            // Pass all other user events on to the next handler
            ctx.fireUserEventTriggered(evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Heartbeat reply; this String comparison assumes a StringDecoder earlier in the pipeline
        if ("HEARTBEAT".equals(msg)) {
            // Heartbeat packet: consume it without further processing
            return;
        }
        // Business message
        ctx.fireChannelRead(msg);
    }
}
```

#### **Reconnection**

```java
@Component
public class ReconnectClient {

    private EventLoopGroup workerGroup;
    private Channel channel;
    private ScheduledExecutorService scheduler;

    public void connect(String host, int port) {
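        workerGroup = new NioEventLoopGroup();
        scheduler = Executors.newSingleThreadScheduledExecutor();
        doConnect(host, port);
    }

    private void doConnect(String host, int port) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline()
                                // Send a heartbeat after 10 s without writes
                                .addLast("idleStateHandler", new IdleStateHandler(0, 10, 0))
                                .addLast("heartbeatHandler", new HeartbeatHandler())
                                .addLast("handler", new ClientHandler());
                    }
                });

        // Connect to the server
        ChannelFuture future = bootstrap.connect(host, port);
        future.addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                System.out.println("Connected to server");
                channel = f.channel();
                // Also reconnect when an established connection drops later on
                channel.closeFuture().addListener(
                        (ChannelFutureListener) closed -> scheduleReconnect(host, port));
            } else {
                System.out.println("Connection failed");
                scheduleReconnect(host, port);
            }
        });
    }

    private void scheduleReconnect(String host, int port) {
        if (scheduler.isShutdown()) {
            return; // shutdown() has been called: stop reconnecting
        }
        System.out.println("Reconnecting in 5 seconds...");
        scheduler.schedule(() -> doConnect(host, port), 5, TimeUnit.SECONDS);
    }

    public void shutdown() {
        // Stop the scheduler first, so closing the channel does not trigger another reconnect
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
        if (channel != null) {
            channel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
```

---

## P7 Bonus Points

### In-Depth Understanding

- **Reactor pattern**: understand the main/sub Reactor multithreaded model (the boss group accepts, the worker group handles I/O)
- **Zero-copy**: understand user space vs. kernel space
- **Memory management**: ByteBuf pooling, reference counting, leak detection

### Hands-On Experience

- **High-concurrency tuning**: EventLoopGroup thread counts, TCP parameter tuning
- **Monitoring and alerting**: connection counts, traffic, latency
- **Troubleshooting**: memory leaks, 100% CPU, connection leaks

### Architecture Design

- **Protocol design**: custom protocols, codec design
- **Cluster deployment**: load balancing, service discovery
- **Fault tolerance**: reconnection, circuit breaking, rate limiting

### Performance Optimization

1. **Fewer memory copies**: use CompositeByteBuf
2. **Object reuse**: use pooled ByteBufs (PooledByteBufAllocator)
3. **TCP parameter tuning**: TCP_NODELAY, SO_KEEPALIVE
4. **Flow control**: ChannelTrafficShapingHandler

---

## Summary

Key points of Netty in practice:

1. **High performance**: Reactor model, zero-copy, memory pooling
2. **High reliability**: heartbeats, reconnection, graceful shutdown
3. **Extensibility**: the Pipeline mechanism, custom protocols
4. **Observability**: monitoring of connections, traffic, and latency

**Best practices**:

- Size EventLoopGroup thread pools appropriately
- Use pooled ByteBufs and release them promptly
- Add heartbeat and reconnection mechanisms
- Implement graceful shutdown
- Set up monitoring and alerting
- Check for memory leaks regularly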
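Returning to the RPC frame from section 1 (magic, length, type, body): a frame decoder must know how many bytes belong to one frame. For this layout the total is 4 (magic) + 4 (length) + 1 (type) + body length, which is why `LengthFieldBasedFrameDecoder` needs `lengthFieldOffset = 4`, `lengthFieldLength = 4` and `lengthAdjustment = 1` (the type byte sits between the length field and the body). The sketch below models that bookkeeping with the JDK's `ByteBuffer` only; `RpcFrameCodec`, `Frame` and `completeFrameLength` are hypothetical names, and this is an illustration of the arithmetic, not a replacement for Netty's decoder.

```java
import java.nio.ByteBuffer;

// Hypothetical JDK-only model of the section 1 frame:
// | magic (4) | length (4) | type (1) | body (length bytes) |
final class RpcFrameCodec {
    static final int MAGIC_NUMBER = 0xCAFEBABE;
    static final int HEADER_LENGTH = 9;        // magic + length + type
    static final int MAX_FRAME_LENGTH = 8192;  // same limit as the frame decoder in section 1

    static final class Frame {
        final byte type;
        final byte[] body;

        Frame(byte type, byte[] body) {
            this.type = type;
            this.body = body;
        }
    }

    // Writes one frame; the length field counts only the body.
    static ByteBuffer encode(byte type, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putInt(MAGIC_NUMBER).putInt(body.length).put(type).put(body);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    // Size of the first frame if it is fully buffered, or -1 if more bytes are needed.
    // Same arithmetic as LengthFieldBasedFrameDecoder(8192, 4, 4, 1, 0):
    // lengthFieldOffset (4) + lengthFieldLength (4) + lengthAdjustment (1) + length
    static int completeFrameLength(ByteBuffer in) {
        if (in.remaining() < HEADER_LENGTH) {
            return -1; // header not complete yet
        }
        int length = in.getInt(in.position() + 4); // absolute read, position unchanged
        if (length < 0 || length > MAX_FRAME_LENGTH - HEADER_LENGTH) {
            throw new IllegalArgumentException("Bad frame length: " + length);
        }
        int frameLength = 4 + 4 + 1 + length;
        return in.remaining() >= frameLength ? frameLength : -1;
    }

    // Consumes exactly one complete frame from the buffer.
    static Frame decode(ByteBuffer in) {
        if (in.getInt() != MAGIC_NUMBER) {
            throw new IllegalArgumentException("Invalid magic number");
        }
        int length = in.getInt();
        byte type = in.get();
        byte[] body = new byte[length];
        in.get(body);
        return new Frame(type, body);
    }
}
```

A 5-byte body yields a 14-byte frame; with only the first 10 bytes buffered, `completeFrameLength` reports -1 and the caller waits for more data, which is exactly the situation a TCP stream produces when a frame arrives in pieces.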
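In the WebSocket handshake from section 2, the server's `HTTP 101` reply carries a `Sec-WebSocket-Accept` header. RFC 6455 defines it as Base64(SHA-1(client's `Sec-WebSocket-Key` + a fixed GUID)). `WebSocketServerProtocolHandler` computes this for you; the JDK-only sketch below (the class name `WebSocketAccept` is hypothetical) only makes that step concrete.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Computes the Sec-WebSocket-Accept value defined by RFC 6455.
final class WebSocketAccept {
    // Fixed GUID from RFC 6455 that the server appends to the client's key
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-1
            throw new IllegalStateException(e);
        }
    }
}
```

With the sample key from RFC 6455, `dGhlIHNhbXBsZSBub25jZQ==`, the result is `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`. A client that receives any other value must fail the connection, so the check confirms the server actually understood the WebSocket upgrade.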
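The connection limiter in section 3 only works when a single instance counts connections (not messages) for the whole server. That counting logic can be exercised without Netty. `ConnectionCounter` below is a hypothetical JDK-only class: `tryAcquire()` would be called from `channelActive` and `release()` from `channelInactive` of a channel that was admitted. Its compare-and-set loop never lets the counter exceed the limit, not even briefly.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only model of a server-wide connection limit:
// one instance per server, one slot per connection.
final class ConnectionCounter {
    private final int maxConnections;
    private final AtomicInteger active = new AtomicInteger();

    ConnectionCounter(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    // Call once when a connection becomes active; false means "reject and close it".
    boolean tryAcquire() {
        while (true) {
            int current = active.get();
            if (current >= maxConnections) {
                return false;
            }
            // Only one thread can move the counter from `current` to `current + 1`
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    // Call once when an admitted connection goes inactive.
    void release() {
        active.decrementAndGet();
    }

    int active() {
        return active.get();
    }
}
```

Compared with incrementing first and undoing on overflow, the CAS loop costs a retry under contention but keeps `active()` an exact upper-bounded value, which is convenient when the same number feeds a monitoring gauge.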
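The three steps of graceful shutdown in section 5 (stop accepting work, wait for in-flight work with a deadline, force-stop on timeout) follow the same order as the `shutdownAndAwaitTermination` example in the JDK's `ExecutorService` Javadoc. The sketch below applies that order to a plain `ExecutorService`; `GracefulShutdown.shutdownAndAwait` is a hypothetical helper, and unlike Netty's `shutdownGracefully(quietPeriod, timeout, unit)` it has no quiet period.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: stop / drain / force-stop, applied to a JDK ExecutorService
// instead of a Netty EventLoopGroup.
final class GracefulShutdown {

    // Returns true if all submitted work finished before the deadline.
    static boolean shutdownAndAwait(ExecutorService executor, long timeout, TimeUnit unit) {
        executor.shutdown(); // 1. reject new tasks; already-queued tasks still run
        try {
            if (executor.awaitTermination(timeout, unit)) { // 2. wait for in-flight work
                return true;
            }
            executor.shutdownNow(); // 3. deadline passed: interrupt running tasks
            return false;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // keep the caller's interrupt status
            return false;
        }
    }
}
```

A short task finishes within the deadline and the method returns `true`; a task that sleeps longer than the timeout is interrupted by `shutdownNow()` and the method returns `false`, mirroring the "force close" branch in section 5.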
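`ReconnectClient` in section 6 retries every 5 seconds. A common alternative, not part of the original design, is capped exponential backoff, so that many clients do not hammer a server that is still restarting. The sketch below is a minimal version under that assumption; `BackoffPolicy` and its base/cap parameters are hypothetical.

```java
// Hypothetical capped exponential backoff: base, 2 x base, 4 x base, ... up to maxDelayMillis.
final class BackoffPolicy {
    private final long baseDelayMillis;
    private final long maxDelayMillis;

    BackoffPolicy(long baseDelayMillis, long maxDelayMillis) {
        this.baseDelayMillis = baseDelayMillis;
        this.maxDelayMillis = maxDelayMillis;
    }

    // attempt = 0 for the first retry; reset it to 0 after a successful connection.
    long delayMillis(int attempt) {
        long delay = baseDelayMillis;
        // Stop doubling once the cap is reached, which also prevents overflow
        for (int i = 0; i < attempt && delay < maxDelayMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMillis);
    }
}
```

In a reconnect listener one would schedule the next attempt with `scheduler.schedule(task, policy.delayMillis(attempt++), TimeUnit.MILLISECONDS)` and reset `attempt` once connected. With a 1 s base and a 30 s cap the delays run 1 s, 2 s, 4 s, 8 s, 16 s, then stay at 30 s; adding random jitter on top spreads out clients that lost their connection at the same moment.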