interview/10-中间件/Netty实战场景.md
yasinshaw 7aa971f511 feat: add Netty and Java NIO interview questions
Added 3 comprehensive interview documents covering Netty and Java NIO:

**1. Netty Core Principles (Netty核心原理.md)**
- Core components: Channel, EventLoop, ChannelPipeline, ByteBuf
- Reactor threading model (single/multi-threaded, master-slave)
- ByteBuf vs Java NIO ByteBuffer
- Zero-copy implementation (4 approaches)
- ChannelPipeline and ChannelHandler
- TCP sticky/unpacking problem solutions
- Heartbeat mechanism

**2. Java NIO Core Principles (Java NIO核心原理.md)**
- NIO vs BIO comparison
- Three core components: Channel, Buffer, Selector
- Selector multiplexing mechanism
- Channel vs Stream differences
- Buffer core attributes and usage
- Non-blocking I/O implementation
- Zero-copy with transferTo and MappedByteBuffer

**3. Netty Practice Scenarios (Netty实战场景.md)**
- High-performance RPC framework design
- WebSocket server implementation
- Million-connection IM system architecture
- Memory leak detection and resolution
- Graceful shutdown implementation
- Heartbeat and reconnection mechanisms

Each document includes:
- Detailed problem descriptions
- Complete code examples (Java)
- Architecture diagrams
- Best practices
- Performance optimization tips
- P7-level bonus points

Total: 3 documents, covering:
- Theoretical foundations
- Practical implementations
- Production scenarios
- Performance tuning
- Common pitfalls

Suitable for backend P7 interview preparation.

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-03-06 10:20:11 +08:00


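Netty Practical Scenarios and Best Practices

Questions

  1. How do you design a high-performance RPC framework?
  2. How do you implement a WebSocket server?
  3. How do you build an IM system that holds a million connections?
  4. How do you handle memory leaks in Netty?
  5. How do you implement graceful shutdown?
  6. How do you design heartbeat and reconnection mechanisms?
  7. How do you implement distributed session sharing?

Standard Answers

1. High-Performance RPC Framework Design

Architecture Design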

┌──────────────────────────────────────────┐
│         RPC Framework Architecture       │
├──────────────────────────────────────────┤
│  Consumer Side                           │
│  ┌──────────┐  ┌────────────┐            │
│  │ Proxy    │  │ Serializer │            │
│  │ (dynamic)│  │ (Protobuf) │            │
│  └────┬─────┘  └────────────┘            │
│       │                                  │
│       ▼                                  │
│  ┌─────────────────────────────────┐     │
│  │     Netty Client                │     │
│  │  - Reconnect                    │     │
│  │  - Timeout                      │     │
│  │  - Heartbeat                    │     │
│  └──────────────┬──────────────────┘     │
│                 │                        │
│                 │ TCP                    │
│                 │                        │
│  ┌──────────────▼──────────────────┐     │
│  │     Netty Server                │     │
│  │  - Boss/Worker Groups           │     │
│  │  - Pipeline                     │     │
│  │  - Handler Chain                │     │
│  └──────────────┬──────────────────┘     │
│                 │                        │
│       ┌─────────┴─────────┐              │
│       ▼                   ▼              │
│  ┌─────────┐       ┌───────────┐         │
│  │ Service │       │ Registry  │         │
│  │ (impl)  │       │(discovery)│         │
│  └─────────┘       └───────────┘         │
└──────────────────────────────────────────┘

Protocol Design

// RPC protocol frame format
┌──────────────┬───────────┬──────────┬────────────────┐
│ Magic Number │ Length    │ Type     │ Body           │
│ (4 bytes)    │ (4 bytes) │ (1 byte) │ (Length bytes) │
└──────────────┴───────────┴──────────┴────────────────┘

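// Implementation
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RpcProtocol {

    private static final int MAGIC_NUMBER = 0xCAFEBABE;
    // Magic (4) + Length (4) + Type (1)
    private static final int HEADER_LENGTH = 9;

    public static ByteBuf encode(byte[] body, byte type) {
        ByteBuf buffer = Unpooled.buffer(HEADER_LENGTH + body.length);

        buffer.writeInt(MAGIC_NUMBER);  // Magic Number (4 bytes)
        buffer.writeInt(body.length);   // Length (4 bytes): body size only
        buffer.writeByte(type);         // Type (1 byte)
        buffer.writeBytes(body);        // Body

        return buffer;
    }

    // Expects one complete frame, e.g. as delivered by a frame decoder.
    public static byte[] decode(ByteBuf buffer) {
        if (buffer.readableBytes() < HEADER_LENGTH) {
            throw new IllegalArgumentException("Incomplete header");
        }

        // Check the magic number
        if (buffer.readInt() != MAGIC_NUMBER) {
            throw new IllegalArgumentException("Invalid magic number");
        }

        // Read the body length
        int length = buffer.readInt();

        // Read the type; this simplified decoder consumes it but does not return it
        byte type = buffer.readByte();

        // Reject a length that is negative or larger than what was received
        if (length < 0 || length > buffer.readableBytes()) {
            throw new IllegalArgumentException("Invalid body length: " + length);
        }

        // Read the body
        byte[] body = new byte[length];
        buffer.readBytes(body);

        return body;
    }
}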
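
To make the framing rule concrete without depending on Netty, here is a minimal JDK-only sketch of the same 9-byte header built on java.nio.ByteBuffer. The class name FrameSketch and the method tryDecode are illustrative assumptions, not part of the original code. It returns null when the buffer does not yet hold a whole frame, which is one way a decoder can cope with TCP delivering partial or coalesced frames.

```java
import java.nio.ByteBuffer;

// Hypothetical JDK-only sketch of the header layout above (not Netty code).
final class FrameSketch {
    static final int MAGIC = 0xCAFEBABE;
    static final int HEADER_LENGTH = 9; // magic (4) + length (4) + type (1)

    /** Encodes one frame: magic, body length, type, body (big-endian). */
    static byte[] encode(byte type, byte[] body) {
        ByteBuffer out = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        out.putInt(MAGIC).putInt(body.length).put(type).put(body);
        return out.array();
    }

    /**
     * Tries to read one frame body starting at the buffer's current position.
     * Returns null and restores the position if the frame is still incomplete.
     */
    static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < HEADER_LENGTH) {
            return null; // header not fully received yet
        }
        in.mark();
        if (in.getInt() != MAGIC) {
            in.reset();
            throw new IllegalArgumentException("Invalid magic number");
        }
        int length = in.getInt();
        in.get(); // type byte, ignored in this sketch
        if (length < 0) {
            in.reset();
            throw new IllegalArgumentException("Negative body length");
        }
        if (in.remaining() < length) {
            in.reset(); // body not fully received; wait for more bytes
            return null;
        }
        byte[] body = new byte[length];
        in.get(body);
        return body;
    }
}
```

Calling tryDecode in a loop until it returns null drains every complete frame from an accumulation buffer and leaves any trailing partial frame in place for the next read.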

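Server Implementation

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import jakarta.annotation.PreDestroy; // javax.annotation.PreDestroy before Spring 6
import org.springframework.stereotype.Component;

@Component
public class RpcServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    // Blocks the calling thread until the server channel is closed
    public void start(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();

                        // Frame decoder: the length field sits at offset 4 and is 4 bytes.
                        // It counts only the body, so lengthAdjustment = 1 adds the Type byte.
                        pipeline.addLast("frameDecoder",
                            new LengthFieldBasedFrameDecoder(8192, 4, 4, 1, 0));
                        // No LengthFieldPrepender here: RpcEncoder is assumed to write the
                        // full header itself, and an extra prefix would displace the magic number.

                        // Protocol codec
                        pipeline.addLast("decoder", new RpcDecoder());
                        pipeline.addLast("encoder", new RpcEncoder());

                        // Heartbeat detection: close after 60 s without inbound data
                        pipeline.addLast("idleStateHandler",
                            new IdleStateHandler(60, 0, 0));
                        pipeline.addLast("heartbeatHandler", new HeartbeatHandler());

                        // Business handler
                        pipeline.addLast("handler", new RpcServerHandler());
                    }
                });

            // Bind the port and start
            ChannelFuture future = bootstrap.bind(port).sync();
            serverChannel = future.channel();

            System.out.println("RPC Server started on port " + port);

            // Wait until the server socket is closed
            serverChannel.closeFuture().sync();

        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }

    @PreDestroy
    public void destroy() {
        shutdown();
    }
}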

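Client Implementation

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;

@Component
public class RpcClient {

    private EventLoopGroup workerGroup;
    private volatile Channel channel;

    public void connect(String host, int port) throws InterruptedException {
        workerGroup = new NioEventLoopGroup();
        boolean connected = false;

        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();

                        // Frame decoder: length field at offset 4, 4 bytes long;
                        // lengthAdjustment = 1 accounts for the Type byte before the body.
                        pipeline.addLast("frameDecoder",
                            new LengthFieldBasedFrameDecoder(8192, 4, 4, 1, 0));
                        // No LengthFieldPrepender: RpcEncoder is assumed to write the full header.

                        // Protocol codec
                        pipeline.addLast("decoder", new RpcDecoder());
                        pipeline.addLast("encoder", new RpcEncoder());

                        // Heartbeat: send one after 30 s without outbound data
                        pipeline.addLast("idleStateHandler",
                            new IdleStateHandler(0, 30, 0));
                        pipeline.addLast("heartbeatHandler", new HeartbeatHandler());

                        // Business handler
                        pipeline.addLast("handler", new RpcClientHandler());
                    }
                });

            // Connect to the server
            channel = bootstrap.connect(host, port).sync().channel();
            connected = true;

            System.out.println("RPC Client connected to " + host + ":" + port);

        } finally {
            // Return once connected so that callers can use sendRequest();
            // release resources here only if the connection attempt failed.
            if (!connected) {
                shutdown();
            }
        }
    }

    public void shutdown() {
        if (channel != null) {
            channel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public <T> T sendRequest(RpcRequest request, Class<T> returnType) {
        // Send the request and wait for the response.
        // RpcClientHandler.sendRequest is not shown in this document.
        RpcClientHandler handler = channel.pipeline()
            .get(RpcClientHandler.class);

        return handler.sendRequest(request, returnType);
    }
}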
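
The client above delegates "send and wait for the response" to a handler method that is not shown. One common way to implement that step is to tag each request with an ID and park a future until the matching response arrives. The sketch below is a JDK-only illustration under that assumption; the class PendingRequests and its methods are hypothetical names, and it requires Java 9+ for CompletableFuture.orTimeout.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: correlates responses with in-flight requests by ID.
final class PendingRequests<R> {
    private final ConcurrentHashMap<Long, CompletableFuture<R>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    /**
     * Registers a request and returns its ID. The caller writes the ID into
     * the outgoing frame and then waits on the future.
     */
    long register(CompletableFuture<R> future, long timeoutMillis) {
        long id = nextId.incrementAndGet();
        pending.put(id, future);
        // Fail the future on timeout and always drop the map entry once it
        // completes, so abandoned requests cannot accumulate.
        future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
              .whenComplete((result, error) -> pending.remove(id));
        return id;
    }

    /** Called when a response arrives. Returns false for unknown or expired IDs. */
    boolean complete(long id, R response) {
        CompletableFuture<R> future = pending.remove(id);
        return future != null && future.complete(response);
    }

    int size() {
        return pending.size();
    }
}
```

The business thread would block on future.get() (or chain callbacks), while the inbound handler calls complete(id, response) from the event loop, so the event loop itself never blocks.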

2. WebSocket Server Implementation

WebSocket Handshake

Client → Server: HTTP GET (Upgrade: websocket)
Server → Client: HTTP 101 (Switching Protocols)
Client → Server: WebSocket Frame
Server → Client: WebSocket Frame
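
Part of the 101 response is the Sec-WebSocket-Accept header, which the server derives from the client's Sec-WebSocket-Key as specified in RFC 6455. Netty's WebSocketServerProtocolHandler performs this step internally; the JDK-only sketch below (the class name WebSocketAccept is an illustrative assumption) only shows what the computation looks like.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Illustrative sketch of the RFC 6455 accept-key derivation.
final class WebSocketAccept {
    // Fixed GUID defined by RFC 6455
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Returns Base64(SHA-1(key + GUID)), the value for Sec-WebSocket-Accept. */
    static String acceptFor(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                (secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 must be available on every Java platform
            throw new IllegalStateException(e);
        }
    }
}
```

With the sample key from RFC 6455, acceptFor("dGhlIHNhbXBsZSBub25jZQ==") yields "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=".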

Server Implementation

@Component
public class WebSocketServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public void start(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();

                        // HTTP codec and aggregator (the handshake starts as plain HTTP)
                        pipeline.addLast("httpCodec", new HttpServerCodec());
                        pipeline.addLast("httpAggregator",
                            new HttpObjectAggregator(65536));

                        // WebSocket handshake and control-frame handling
                        pipeline.addLast("handshakeHandler",
                            new WebSocketServerProtocolHandler("/ws"));

                        // Custom WebSocket handler
                        pipeline.addLast("handler", new WebSocketHandler());
                    }
                });

            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("WebSocket Server started on port " + port);

            future.channel().closeFuture().sync();

        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }
}

WebSocket Handler

@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    // Holds every connected Channel; closed channels are removed automatically
    private static final ChannelGroup channels =
        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        channels.add(ctx.channel());
        System.out.println("Client connected: " + ctx.channel().id());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        channels.remove(ctx.channel());
        System.out.println("Client disconnected: " + ctx.channel().id());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) {
        // Incoming message
        String message = msg.text();
        System.out.println("Received: " + message);

        // Broadcast to all connected clients
        channels.writeAndFlush(new TextWebSocketFrame(
            "Server: " + message
        ));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

3. Million-Connection IM System

Architecture Optimization

@Configuration
public class ImServerConfig {

    @Bean
    public EventLoopGroup bossGroup() {
        // One thread handles accept
        return new NioEventLoopGroup(1);
    }

    @Bean
    public EventLoopGroup workerGroup() {
        // Size the I/O thread pool from the CPU core count;
        // a common choice is cores * 2
        int threads = Runtime.getRuntime().availableProcessors() * 2;
        return new NioEventLoopGroup(threads);
    }

    @Bean
    public ServerBootstrap serverBootstrap(
            EventLoopGroup bossGroup,
            EventLoopGroup workerGroup) {

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 8192)  // larger accept queue
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)  // 32 KB send buffer
            .childOption(ChannelOption.SO_RCVBUF, 32 * 1024)  // 32 KB receive buffer
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();

                    // Connection limit
                    pipeline.addLast("connectionLimiter",
                        new ConnectionLimiter(1000000));

                    // Per-channel traffic shaping; the constructor takes
                    // (writeLimit, readLimit) in bytes per second
                    pipeline.addLast("trafficShaping",
                        new ChannelTrafficShapingHandler(
                            1024 * 1024,  // write limit: 1 MB/s
                            1024 * 1024   // read limit: 1 MB/s
                        ));

                    // Length-prefixed framing
                    pipeline.addLast("frameDecoder",
                        new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 0));
                    pipeline.addLast("frameEncoder",
                        new LengthFieldPrepender(4));

                    // IM protocol handler
                    pipeline.addLast("imHandler", new ImServerHandler());
                }
            });

        return bootstrap;
    }
}
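
The 32 KB buffer sizes above matter because they are multiplied by the connection count. A rough worst-case budget can be computed as follows; this is only back-of-the-envelope arithmetic, assuming every connection fills both buffers at once. The OS typically allocates socket buffers lazily and may adjust the requested sizes, so real usage is usually far lower. BufferBudget is a hypothetical helper name.

```java
// Hypothetical helper for estimating kernel socket-buffer memory.
final class BufferBudget {

    /** Upper bound in bytes if every connection filled both of its buffers. */
    static long worstCaseBytes(long connections, int sndBufBytes, int rcvBufBytes) {
        return Math.multiplyExact(connections, (long) sndBufBytes + rcvBufBytes);
    }

    /** Converts bytes to GiB (2^30 bytes). */
    static double toGiB(long bytes) {
        return bytes / (1024.0 * 1024.0 * 1024.0);
    }
}
```

For 1,000,000 connections with 32 KB send and 32 KB receive buffers, worstCaseBytes returns 65,536,000,000 bytes, roughly 61 GiB, which is why per-connection buffer sizes are kept small at this scale.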

Connection Limiting

public class ConnectionLimiter extends ChannelInboundHandlerAdapter {

    // Shared by all instances: the pipeline creates one limiter per channel,
    // so a per-instance counter would never rise above 1.
    private static final AtomicInteger ACTIVE_CONNECTIONS = new AtomicInteger(0);

    private final int maxConnections;

    public ConnectionLimiter(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Count once per connection (channelActive), not once per message (channelRead)
        int count = ACTIVE_CONNECTIONS.incrementAndGet();

        if (count > maxConnections) {
            // Over the limit: reject the new connection.
            // channelInactive still fires for it and undoes the increment.
            ctx.close();
            return;
        }

        System.out.println("Active connections: " + count);

        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        int count = ACTIVE_CONNECTIONS.decrementAndGet();
        System.out.println("Active connections: " + count);

        ctx.fireChannelInactive();
    }
}
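
A separate "check, then increment" sequence on a shared counter can let two connections pass the check at the same moment. An alternative is to reserve a slot with a compare-and-set loop so the count can never exceed the limit. The sketch below is JDK-only and independent of Netty; ConnectionGate is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical admission gate: reserves connection slots atomically.
final class ConnectionGate {
    private final int max;
    private final AtomicInteger active = new AtomicInteger();

    ConnectionGate(int max) {
        this.max = max;
    }

    /** Reserves one slot; returns false when the gate is already full. */
    boolean tryAcquire() {
        while (true) {
            int current = active.get();
            if (current >= max) {
                return false;
            }
            // Succeeds only if no other thread changed the count in between
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Frees one slot; call exactly once per successful tryAcquire. */
    void release() {
        active.decrementAndGet();
    }

    int active() {
        return active.get();
    }
}
```

In a handler, tryAcquire would be called when the channel becomes active and release when it becomes inactive, remembering per channel whether the acquire succeeded.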

4. Memory Leak Detection and Resolution

Common Leak Scenarios

// ❌ Wrong example 1: the ByteBuf is never released
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
    byte[] data = new byte[buf.readableBytes()];
    buf.readBytes(data);
    // buf is never released
}

// ✅ Correct
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
    try {
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        // Process the data...
    } finally {
        ReferenceCountUtil.release(buf);  // release the ByteBuf
    }
}

// ❌ Wrong example 2: a new long-lived handler is created for every connection
public void channelActive(ChannelHandlerContext ctx) {
    // Adds a long-lived handler instance per channel
    ctx.pipeline().addLast("longLived", new LongLivedHandler());
}

// ✅ Correct
public void channelActive(ChannelHandlerContext ctx) {
    // Reuse one stateless instance annotated with @Sharable
    ctx.pipeline().addLast("longLived", SharableHandler.INSTANCE);
}

// ❌ Wrong example 3: Channel references are never dropped
public class ChannelHolder {
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();

    public static void addChannel(String id, Channel channel) {
        CHANNELS.put(id, channel);  // reference is held forever
    }
}

// ✅ Correct
public class ChannelHolder {
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();

    public static void addChannel(String id, Channel channel) {
        CHANNELS.put(id, channel);

        // Remove the entry automatically when the channel closes.
        // remove(key, value) avoids evicting a newer channel stored under the same id.
        channel.closeFuture().addListener((ChannelFutureListener) future -> {
            CHANNELS.remove(id, channel);
        });
    }
}
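
All of the ByteBuf cases above rest on reference counting: a buffer starts with a count of 1, retain() increments it, release() decrements it, and the memory is returned when the count reaches 0. The JDK-only sketch below imitates those semantics to make them easy to experiment with. RefCountedSketch is a hypothetical class, not Netty's implementation, and it throws IllegalStateException where Netty uses its own exception type.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical imitation of reference-counting semantics (not Netty code).
final class RefCountedSketch {
    private final AtomicInteger refCnt = new AtomicInteger(1); // a new object starts at 1
    private volatile boolean deallocated;

    /** Adds one reference; fails if the object was already freed. */
    RefCountedSketch retain() {
        while (true) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    /** Drops one reference; returns true only for the call that frees the object. */
    boolean release() {
        while (true) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    deallocated = true; // a real buffer would return its memory here
                    return true;
                }
                return false;
            }
        }
    }

    int refCnt() {
        return refCnt.get();
    }

    boolean isDeallocated() {
        return deallocated;
    }
}
```

A leak in this model is simply an object whose count never reaches 0; releasing too often is the opposite bug and fails fast.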

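Memory Leak Detection

// Enable leak detection with a JVM flag:
//   -Dio.netty.leakDetection.level=paranoid

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ResourceLeakDetector;

public class LeakDetection {
    public static void main(String[] args) {
        // Detection levels:
        //   DISABLED - no detection
        //   SIMPLE   - samples a small fraction of buffers (about 1%)
        //   ADVANCED - sampled, and also records where the buffer was accessed
        //   PARANOID - tracks every buffer; intended for tests and debugging

        // Same effect as the JVM flag; set it before any buffer is allocated
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

        // Buffers obtained from the allocator are tracked automatically,
        // so no manual wrapping is needed
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024);
        try {
            buffer.writeInt(42);
        } finally {
            // If this release is missing, Netty logs a LEAK error
            // after the buffer has been garbage-collected
            buffer.release();
        }
    }
}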

5. Graceful Shutdown Implementation

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@Component
public class NettyGracefulShutdown implements SmartLifecycle {

    @Autowired
    private ServerBootstrap serverBootstrap;

    private Channel serverChannel;
    private volatile boolean running = false;

    @Override
    public void start() {
        running = true;
        // Start the server and assign serverChannel here...
    }

    @Override
    public void stop() {
        if (!running) {
            return;
        }

        System.out.println("Shutting down Netty server gracefully...");

        // 1. Stop accepting new connections
        if (serverChannel != null) {
            serverChannel.close().syncUninterruptibly();
        }

        // 2. Let in-flight requests finish (2 s quiet period, 30 s timeout).
        //    childGroup() is the worker group; group() is the boss group.
        EventLoopGroup workerGroup = serverBootstrap.config().childGroup();
        Future<?> shutdownFuture = workerGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS);

        try {
            // Wait for the graceful shutdown to complete
            shutdownFuture.await(60, TimeUnit.SECONDS);

            if (shutdownFuture.isSuccess()) {
                System.out.println("Netty server shutdown successfully");
            } else {
                System.err.println("Netty server shutdown timeout, force close");
                workerGroup.shutdownNow();
            }

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workerGroup.shutdownNow();
        }

        // 3. Shut down the boss group
        EventLoopGroup bossGroup = serverBootstrap.config().group();
        bossGroup.shutdownGracefully();

        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}
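
The same "stop intake, drain, then force" sequence applies to any business thread pool that sits behind the Netty handlers. The JDK-only sketch below shows it for a plain ExecutorService; GracefulStop is a hypothetical helper, and it is an analogue of the Netty shutdown above rather than a replacement for it.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: staged shutdown of a JDK thread pool.
final class GracefulStop {

    /** Returns true if all tasks finished in time, false if shutdown was forced. */
    static boolean stop(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            // Timed out: interrupt workers and discard whatever is still queued
            List<Runnable> dropped = pool.shutdownNow();
            System.err.println("Forced shutdown, dropped " + dropped.size() + " queued tasks");
            return false;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Shutting such a pool down after the server channel is closed but before the worker group stops gives queued business tasks a chance to write their responses.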

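6. Heartbeat and Reconnection Mechanisms

Heartbeat Handler

@ChannelHandler.Sharable
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    private static final ByteBuf HEARTBEAT_BEAT =
        Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (!(evt instanceof IdleStateEvent)) {
            // Not an idle event: pass it on to the next handler
            super.userEventTriggered(ctx, evt);
            return;
        }

        IdleStateEvent event = (IdleStateEvent) evt;

        if (event.state() == IdleState.READER_IDLE) {
            // Nothing received for too long; the peer may be dead
            System.out.println("Read idle, closing connection");
            ctx.close();

        } else if (event.state() == IdleState.WRITER_IDLE) {
            // Nothing sent for too long; send a heartbeat
            System.out.println("Write idle, sending heartbeat");
            ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate())
                .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

        } else if (event.state() == IdleState.ALL_IDLE) {
            // Neither read nor write activity
            System.out.println("All idle, sending heartbeat");
            ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate())
                .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Assumes an upstream decoder has already turned the frame into a String;
        // a raw ByteBuf would never equal "HEARTBEAT"
        if ("HEARTBEAT".equals(msg)) {
            // Heartbeat: swallow it. release() is a no-op for non-reference-counted messages
            ReferenceCountUtil.release(msg);
            return;
        }

        // Business message
        ctx.fireChannelRead(msg);
    }
}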
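
The idle events consumed above come from comparing "time since last read" and "time since last write" against configured thresholds. The sketch below models that decision in plain Java with an injected clock value so it can be tested deterministically. IdleTracker is a hypothetical class, not the IdleStateHandler implementation; as with IdleStateHandler, a threshold of 0 disables the corresponding check.

```java
// Hypothetical model of idle detection with explicit timestamps (any unit).
final class IdleTracker {
    enum State { ACTIVE, READER_IDLE, WRITER_IDLE }

    private final long readerIdleTime;
    private final long writerIdleTime;
    private long lastRead;
    private long lastWrite;

    IdleTracker(long readerIdleTime, long writerIdleTime, long now) {
        this.readerIdleTime = readerIdleTime;
        this.writerIdleTime = writerIdleTime;
        this.lastRead = now;
        this.lastWrite = now;
    }

    void onRead(long now) {
        lastRead = now;
    }

    void onWrite(long now) {
        lastWrite = now;
    }

    /**
     * Reader idleness is checked first because it suggests a dead peer,
     * whereas writer idleness only means a heartbeat is due.
     */
    State check(long now) {
        if (readerIdleTime > 0 && now - lastRead >= readerIdleTime) {
            return State.READER_IDLE;
        }
        if (writerIdleTime > 0 && now - lastWrite >= writerIdleTime) {
            return State.WRITER_IDLE;
        }
        return State.ACTIVE;
    }
}
```

With thresholds of 60 (read) and 30 (write), a connection that sends a heartbeat every 30 time units but never hears back is reported as READER_IDLE at 60, which matches the server closing silent connections.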

Reconnection Mechanism

@Component
public class ReconnectClient {

    private EventLoopGroup workerGroup;
    private volatile Channel channel;
    private ScheduledExecutorService scheduler;
    private volatile boolean stopped = false;

    public void connect(String host, int port) {
        workerGroup = new NioEventLoopGroup();
        scheduler = Executors.newSingleThreadScheduledExecutor();

        doConnect(host, port);
    }

    private void doConnect(String host, int port) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline()
                        .addLast("idleStateHandler",
                            new IdleStateHandler(0, 10, 0))
                        .addLast("heartbeatHandler", new HeartbeatHandler())
                        .addLast("handler", new ClientHandler());
                }
            });

        // Connect asynchronously
        ChannelFuture future = bootstrap.connect(host, port);

        future.addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                System.out.println("Connected to server");
                channel = f.channel();

                // Also reconnect when an established connection drops later
                channel.closeFuture().addListener(
                    (ChannelFutureListener) cf -> scheduleReconnect(host, port));
            } else {
                System.out.println("Connection failed, reconnect in 5 seconds...");
                scheduleReconnect(host, port);
            }
        });
    }

    private void scheduleReconnect(String host, int port) {
        if (stopped) {
            return;  // no reconnect after shutdown()
        }
        // Retry in 5 seconds
        scheduler.schedule(() -> doConnect(host, port), 5, TimeUnit.SECONDS);
    }

    public void shutdown() {
        stopped = true;
        if (channel != null) {
            channel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (scheduler != null) {
            scheduler.shutdown();
        }
    }
}
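
The client above retries at a fixed 5-second interval. When many clients lose the same server at once, a fixed interval makes them all reconnect in lockstep, so a capped exponential backoff is a common alternative. The sketch below shows that technique, which is not what the original code does; Backoff and delayMillis are hypothetical names.

```java
// Hypothetical helper: capped exponential backoff for reconnect delays.
final class Backoff {
    private final long baseMillis;
    private final long maxMillis;

    Backoff(long baseMillis, long maxMillis) {
        if (baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException("require 0 < baseMillis <= maxMillis");
        }
        this.baseMillis = baseMillis;
        this.maxMillis = maxMillis;
    }

    /** Delay before retry number `attempt` (0-based): base * 2^attempt, capped at max. */
    long delayMillis(int attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must be >= 0");
        }
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Double without overflowing, and never exceed the cap
            delay = (delay <= maxMillis / 2) ? delay * 2 : maxMillis;
        }
        return delay;
    }
}
```

A reconnect loop would keep an attempt counter, pass delayMillis(attempt) to the scheduler instead of the constant 5 seconds, and reset the counter to 0 after a successful connect. Adding random jitter on top of the computed delay is a frequent refinement.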

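P7 Bonus Points

Deep Understanding

  • Reactor pattern: understand the master-slave Reactor multi-threaded model
  • Zero-copy: understand user space versus kernel space
  • Memory management: ByteBuf pooling, reference counting, leak detection

Hands-on Experience

  • High-concurrency tuning: EventLoopGroup thread counts, TCP parameter tuning
  • Monitoring and alerting: connection count, traffic, latency
  • Troubleshooting: memory leaks, 100% CPU, connection leaks

Architecture Design

  • Protocol design: custom protocols, codec design
  • Cluster deployment: load balancing, service discovery
  • Fault tolerance: reconnection, circuit breaking, rate limiting

Performance Optimization

  1. Reduce memory copies: use CompositeByteBuf
  2. Reuse objects: use pooled ByteBuf
  3. Tune TCP options: TCP_NODELAY, SO_KEEPALIVE
  4. Flow control: ChannelTrafficShapingHandler

Summary

Key points for Netty in practice:

  1. High performance: Reactor model, zero-copy, memory pooling
  2. High reliability: heartbeat, reconnection, graceful shutdown
  3. Extensibility: Pipeline mechanism, custom protocols
  4. Observability: connection count, traffic, and latency monitoring

Best Practices

  • Size the EventLoopGroup thread count sensibly
  • Use pooled ByteBuf and release it promptly
  • Add heartbeat and reconnection mechanisms
  • Implement graceful shutdown
  • Set up monitoring and alerting
  • Check for memory leaks regularly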