diff --git a/.obsidian/workspace.json b/.obsidian/workspace.json index 33c0425..42a08cc 100644 --- a/.obsidian/workspace.json +++ b/.obsidian/workspace.json @@ -196,6 +196,9 @@ }, "active": "16a7ce8de420dd10", "lastOpenFiles": [ + "10-中间件/Netty实战场景.md", + "10-中间件/Java NIO核心原理.md", + "10-中间件/Netty核心原理.md", "10-中间件", "16-LeetCode Hot 100/从前序与中序遍历序列构造二叉树.md", "16-LeetCode Hot 100/路径总和.md", @@ -222,15 +225,12 @@ "16-LeetCode Hot 100", "00-项目概述/项目概述.md", "00-项目概述", - "questions/04-消息队列/消息队列_RocketMQ_Kafka.md", - "questions/05-并发编程/ConcurrentHashMap原理.md", "questions/15-简历面试", "questions/14-Web3与区块链", "12-面试技巧", "08-算法与数据结构", "questions/13-Golang语言", "questions/12-面试技巧", - "questions/11-运维", - "questions/10-中间件" + "questions/11-运维" ] } \ No newline at end of file diff --git a/10-中间件/Java NIO核心原理.md b/10-中间件/Java NIO核心原理.md new file mode 100644 index 0000000..c9bb0bf --- /dev/null +++ b/10-中间件/Java NIO核心原理.md @@ -0,0 +1,527 @@ +# Java NIO 核心原理 + +## 问题 + +1. 什么是 NIO?与 BIO 的区别是什么? +2. NIO 的三大核心组件是什么? +3. 什么是 Selector?如何实现多路复用? +4. 什么是 Channel?与 Stream 的区别? +5. 什么是 Buffer?如何理解 Buffer 的核心属性? +6. NIO 如何实现非阻塞 I/O? +7. 什么是零拷贝?如何实现? + +--- + +## 标准答案 + +### 1. NIO vs BIO + +#### **对比表** + +| 特性 | BIO (Blocking I/O) | NIO (Non-blocking I/O) | +|------|-------------------|----------------------| +| **I/O 模型** | 阻塞 | 非阻塞 | +| **线程模型** | 每连接一线程 | Reactor 模式 | +| **并发能力** | 低 | 高 | +| **编程复杂度** | 简单 | 复杂 | +| **数据操作** | Stream | Channel + Buffer | +| **适用场景** | 连接数少、高延迟 | 连接数多、高并发 | + +#### **代码对比** + +**BIO 实现**: +```java +// 传统 BIO - 阻塞式 +ServerSocket serverSocket = new ServerSocket(8080); + +while (true) { + // 阻塞等待连接 + Socket socket = serverSocket.accept(); + + // 每个连接一个线程 + new Thread(() -> { + try { + BufferedReader reader = new BufferedReader( + new InputStreamReader(socket.getInputStream())); + + // 阻塞读取数据 + String line = reader.readLine(); + + // 处理数据... + } catch (IOException e) { + e.printStackTrace(); + } + }).start(); +} +``` + +**NIO 实现**: +```java +// NIO - 非阻塞式 +ServerSocketChannel serverChannel = ServerSocketChannel.open(); +serverChannel.configureBlocking(false); +serverChannel.bind(new InetSocketAddress(8080)); + +Selector selector = Selector.open(); +serverChannel.register(selector, SelectionKey.OP_ACCEPT); + +while (true) { + // 非阻塞等待事件 + int readyCount = selector.select(); + + Set readyKeys = selector.selectedKeys(); + for (SelectionKey key : readyKeys) { + if (key.isAcceptable()) { + // 处理连接 + } + if (key.isReadable()) { + // 处理读 + } + } +} +``` + +--- + +### 2. NIO 三大核心组件 + +``` +┌────────────────────────────────────────┐ +│ Java NIO 架构 │ +├────────────────────────────────────────┤ +│ │ +│ ┌─────────┐ ┌─────────┐ │ +│ │Channel │◄──►│ Buffer │ │ +│ └────┬────┘ └─────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────┐ │ +│ │Selector │ │ +│ │(多路复用)│ │ +│ └─────────┘ │ +│ │ +└────────────────────────────────────────┘ + +Channel: 数据通道(双向) +Buffer: 数据容器 +Selector: 多路复用器 +``` + +#### **核心组件关系** + +```java +// 1. 打开 Channel +FileChannel channel = FileChannel.open(Paths.get("data.txt")); + +// 2. 分配 Buffer +ByteBuffer buffer = ByteBuffer.allocate(1024); + +// 3. 读取数据到 Buffer +channel.read(buffer); + +// 4. 切换读写模式 +buffer.flip(); + +// 5. 处理数据 +while (buffer.hasRemaining()) { + byte b = buffer.get(); +} + +// 6. 清空 Buffer +buffer.clear(); +``` + +--- + +### 3. Selector 多路复用 + +#### **工作原理** + +``` +Selector 多路复用模型: + + ┌──────────────────────────────────┐ + │ Selector │ + │ │ + │ select() - 阻塞等待就绪事件 │ + │ │ + │ ┌────┐ ┌────┐ ┌────┐ │ + │ │Ch1 │ │Ch2 │ │Ch3 │ ... │ + │ │就绪│ │就绪│ │就绪│ │ + │ └────┘ └────┘ └────┘ │ + └──────────────────────────────────┘ + ▲ + │ one thread + │ + ┌─────────┴──────────┐ + │ Event Loop │ + │ - select() │ + │ - process() │ + │ - repeat │ + └────────────────────┘ +``` + +#### **SelectionKey 事件类型** + +```java +// 四种事件类型 +int OP_ACCEPT = 1 << 4; // 连接就绪(ServerSocketChannel) +int OP_CONNECT = 1 << 3; // 连接完成(SocketChannel) +int OP_READ = 1 << 0; // 读就绪 +int OP_WRITE = 1 << 2; // 写就绪 + +// 注册事件 +SelectionKey key = channel.register(selector, + SelectionKey.OP_READ | SelectionKey.OP_WRITE); + +// 判断事件类型 +if (key.isAcceptable()) { // 连接事件 +if (key.isReadable()) { // 读事件 +if (key.isWritable()) { // 写事件 +if (key.isConnectable()) { // 连接完成事件 +``` + +#### **完整示例** + +```java +// NIO Server +ServerSocketChannel serverChannel = ServerSocketChannel.open(); +serverChannel.configureBlocking(false); +serverChannel.bind(new InetSocketAddress(8080)); + +Selector selector = Selector.open(); +serverChannel.register(selector, SelectionKey.OP_ACCEPT); + +while (true) { + // 阻塞等待事件(超时 1 秒) + int readyCount = selector.select(1000); + + if (readyCount == 0) { + continue; + } + + // 获取就绪的 SelectionKey + Set readyKeys = selector.selectedKeys(); + Iterator iterator = readyKeys.iterator(); + + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + iterator.remove(); // 移除已处理的 key + + if (!key.isValid()) { + continue; + } + + // 处理连接事件 + if (key.isAcceptable()) { + ServerSocketChannel server = (ServerSocketChannel) key.channel(); + SocketChannel channel = server.accept(); + channel.configureBlocking(false); + channel.register(selector, SelectionKey.OP_READ); + } + + // 处理读事件 + if (key.isReadable()) { + SocketChannel channel = (SocketChannel) key.channel(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + int bytesRead = channel.read(buffer); + + if (bytesRead == -1) { + // 连接关闭 + key.cancel(); + channel.close(); + } else { + // 处理数据 + buffer.flip(); + byte[] data = new byte[buffer.remaining()]; + buffer.get(data); + System.out.println("Received: " + new String(data)); + } + } + } +} +``` + +--- + +### 4. Channel vs Stream + +#### **核心区别** + +| 特性 | Stream (IO) | Channel (NIO) | +|------|-------------|---------------| +| **方向** | 单向(读/写) | 双向(读+写) | +| **阻塞** | 阻塞 | 可配置阻塞/非阻塞 | +| **缓冲** | 直接操作流 | 必须通过 Buffer | +| **性能** | 较低 | 高(零拷贝) | + +#### **Channel 类型** + +```java +// 1. FileChannel - 文件通道 +FileChannel fileChannel = FileChannel.open(Paths.get("data.txt"), + StandardOpenOption.READ, StandardOpenOption.WRITE); + +// 2. SocketChannel - TCP Socket +SocketChannel socketChannel = SocketChannel.open(); +socketChannel.configureBlocking(false); +socketChannel.connect(new InetSocketAddress("localhost", 8080)); + +// 3. ServerSocketChannel - TCP Server +ServerSocketChannel serverChannel = ServerSocketChannel.open(); +serverChannel.bind(new InetSocketAddress(8080)); + +// 4. DatagramChannel - UDP +DatagramChannel datagramChannel = DatagramChannel.open(); +datagramChannel.bind(new InetSocketAddress(8080)); + +// 5. Pipe.SinkChannel / Pipe.SourceChannel - 管道 +Pipe pipe = Pipe.open(); +Pipe.SinkChannel sinkChannel = pipe.sink(); +Pipe.SourceChannel sourceChannel = pipe.source(); +``` + +#### **FileChannel 示例** + +```java +// 文件复制(传统方式) +FileChannel sourceChannel = FileChannel.open(Paths.get("source.txt")); +FileChannel destChannel = FileChannel.open(Paths.get("dest.txt"), + StandardOpenOption.CREATE, StandardOpenOption.WRITE); + +// 方法1: 使用 Buffer +ByteBuffer buffer = ByteBuffer.allocate(1024); +while (sourceChannel.read(buffer) != -1) { + buffer.flip(); + destChannel.write(buffer); + buffer.clear(); +} + +// 方法2: 直接传输(零拷贝) +sourceChannel.transferTo(0, sourceChannel.size(), destChannel); +``` + +--- + +### 5. Buffer 核心属性 + +#### **Buffer 结构** + +``` +Buffer 核心属性: + +┌───────┬──────────┬──────────┬─────────┐ +│ 0 │position │ limit │capacity │ +│ │ │ │ │ +│ 已处理 可读/写数据 │ 不可访问 │ +│ 数据 │ │ │ +└───────┴──────────┴──────────┴─────────┘ + ↑ ↑ ↑ + mark position limit + +操作方法: +- mark(): 标记当前 position +- reset(): 恢复到 mark 位置 +- clear(): position=0, limit=capacity +- flip(): limit=position, position=0 (读→写) +- rewind(): position=0, limit 不变 +``` + +#### **Buffer 使用流程** + +```java +// 1. 分配 Buffer +ByteBuffer buffer = ByteBuffer.allocate(1024); + +// 初始状态: position=0, limit=1024, capacity=1024 + +// 2. 写入数据 +buffer.putInt(123); +buffer.putLong(456L); +buffer.put("Hello".getBytes()); + +// 写入后: position=17, limit=1024 + +// 3. 切换到读模式 +buffer.flip(); + +// flip 后: position=0, limit=17 + +// 4. 读取数据 +while (buffer.hasRemaining()) { + byte b = buffer.get(); +} + +// 读取后: position=17, limit=17 + +// 5. 清空 Buffer +buffer.clear(); + +// clear 后: position=0, limit=1024, capacity=1024 +``` + +#### **Buffer 类型** + +```java +// 基本类型 Buffer +ByteBuffer byteBuffer = ByteBuffer.allocate(1024); +CharBuffer charBuffer = CharBuffer.allocate(1024); +ShortBuffer shortBuffer = ShortBuffer.allocate(1024); +IntBuffer intBuffer = IntBuffer.allocate(1024); +LongBuffer longBuffer = LongBuffer.allocate(1024); +FloatBuffer floatBuffer = FloatBuffer.allocate(1024); +DoubleBuffer doubleBuffer = DoubleBuffer.allocate(1024); + +// 直接内存 vs 堆内存 +ByteBuffer heapBuffer = ByteBuffer.allocate(1024); // 堆内存 +ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024); // 直接内存 +``` + +--- + +### 6. 非阻塞 I/O 实现 + +#### **阻塞 vs 非阻塞** + +```java +// 阻塞模式(默认) +SocketChannel channel = SocketChannel.open(); +channel.configureBlocking(true); // 阻塞 +channel.connect(new InetSocketAddress("localhost", 8080)); +// 阻塞直到连接建立 + +// 非阻塞模式 +SocketChannel channel = SocketChannel.open(); +channel.configureBlocking(false); // 非阻塞 +channel.connect(new InetSocketAddress("localhost", 8080)); +// 立即返回 + +while (!channel.finishConnect()) { + // 连接未完成,做其他事 + System.out.println("Connecting..."); +} +``` + +#### **非阻塞读写** + +```java +SocketChannel channel = SocketChannel.open(); +channel.configureBlocking(false); + +ByteBuffer buffer = ByteBuffer.allocate(1024); + +// 非阻塞读 +int bytesRead = channel.read(buffer); +if (bytesRead == 0) { + // 没有数据可用 +} else if (bytesRead == -1) { + // 连接已关闭 +} else { + // 读取到数据 + buffer.flip(); + // 处理数据... +} + +// 非阻塞写 +buffer.clear(); +buffer.put("Hello".getBytes()); +buffer.flip(); + +int bytesWritten = channel.write(buffer); +if (bytesWritten == 0) { + // 缓冲区满,稍后重试 +} +``` + +--- + +### 7. 零拷贝实现 + +#### **传统 I/O vs 零拷贝** + +``` +传统 I/O(4 次拷贝,4 次上下文切换): +1. 磁盘 → 内核缓冲区 (DMA) +2. 内核缓冲区 → 用户缓冲区 (CPU) +3. 用户缓冲区 → Socket 缓冲区 (CPU) +4. Socket 缓冲区 → 网卡 (DMA) + +零拷贝(2 次拷贝,2 次上下文切换): +1. 磁盘 → 内核缓冲区 (DMA) +2. 内核缓冲区 → 网卡 (DMA) +``` + +#### **transferTo 实现** + +```java +// 文件传输零拷贝 +FileChannel sourceChannel = FileChannel.open(Paths.get("source.txt")); +FileChannel destChannel = FileChannel.open(Paths.get("dest.txt"), + StandardOpenOption.CREATE, StandardOpenOption.WRITE); + +// 直接在内核空间传输 +long position = 0; +long count = sourceChannel.size(); +sourceChannel.transferTo(position, count, destChannel); +``` + +#### **MappedByteBuffer(内存映射)** + +```java +// 内存映射文件 +FileChannel channel = FileChannel.open(Paths.get("data.txt"), + StandardOpenOption.READ, StandardOpenOption.WRITE); + +// 映射到内存 +MappedByteBuffer mappedBuffer = channel.map( + FileChannel.MapMode.READ_WRITE, // 读写模式 + 0, // 起始位置 + channel.size() // 映射大小 +); + +// 直接操作内存(零拷贝) +mappedBuffer.putInt(0, 123); +int value = mappedBuffer.getInt(0); +``` + +--- + +## P7 加分项 + +### 深度理解 +- **多路复用原理**:理解 select、poll、epoll 的区别 +- **零拷贝原理**:理解 DMA、用户空间、内核空间 +- **内存管理**:堆内存 vs 直接内存,GC 影响 + +### 实战经验 +- **高并发场景**:Netty、Mina、Vert.x 框架使用 +- **文件处理**:大文件读写、内存映射文件 +- **网络编程**:自定义协议、粘包处理 + +### 性能优化 +- **Buffer 复用**:使用对象池减少 GC +- **直接内存**:减少一次拷贝,但分配/释放成本高 +- **批量操作**:vectorized I/O(scatter/gather) + +### 常见问题 +1. **Buffer 泄漏**:直接内存未释放 +2. **select 空转**:CPU 100% 问题 +3. **epoll 空轮询**:Linux kernel bug +4. **文件描述符耗尽**:未关闭 Channel + +--- + +## 总结 + +Java NIO 的核心优势: +1. **高性能**:非阻塞 I/O、零拷贝 +2. **高并发**:单线程处理多连接 +3. **灵活性**:可配置阻塞/非阻塞 +4. **扩展性**:适合大规模分布式系统 + +**最佳实践**: +- 高并发场景优先使用 NIO(Netty) +- 大文件传输使用 transferTo +- 理解 Buffer 的读写模式切换 +- 注意资源释放(Channel、Buffer) +- 监控文件描述符使用 diff --git a/10-中间件/Netty实战场景.md b/10-中间件/Netty实战场景.md new file mode 100644 index 0000000..5cde8e2 --- /dev/null +++ b/10-中间件/Netty实战场景.md @@ -0,0 +1,803 @@ +# Netty 实战场景与最佳实践 + +## 问题 + +1. 如何设计一个高性能的 RPC 框架? +2. 如何实现 WebSocket 服务器? +3. 如何实现百万连接的 IM 系统? +4. 如何处理 Netty 的内存泄漏? +5. 如何实现优雅关闭? +6. 如何设计心跳和重连机制? +7. 如何实现分布式 Session 共享? + +--- + +## 标准答案 + +### 1. 高性能 RPC 框架设计 + +#### **架构设计** + +``` +┌──────────────────────────────────────────┐ +│ RPC 框架架构 │ +├──────────────────────────────────────────┤ +│ Consumer Side │ +│ ┌──────────┐ ┌──────────┐ │ +│ │ Proxy │ │ Serializer│ │ +│ │ (动态代理)│ │ (Protobuf) │ │ +│ └────┬─────┘ └──────────┘ │ +│ │ │ +│ ▼ │ +│ ┌─────────────────────────────────┐ │ +│ │ Netty Client │ │ +│ │ - Reconnect │ │ +│ │ - Timeout │ │ +│ │ - Heartbeat │ │ +│ └──────────────┬──────────────────┘ │ +│ │ │ +│ │ TCP │ +│ │ │ +│ ┌──────────────▼──────────────────┐ │ +│ │ Netty Server │ │ +│ │ - Boss/Worker Groups │ │ +│ │ - Pipeline │ │ +│ │ - Handler Chain │ │ +│ └──────────────┬──────────────────┘ │ +│ │ │ +│ ┌─────────┴─────────┐ │ +│ ▼ ▼ │ +│ ┌─────────┐ ┌──────────┐ │ +│ │Service │ │ Registry │ │ +│ │(实现) │ │ (服务发现)│ │ +│ └─────────┘ └──────────┘ │ +└──────────────────────────────────────────┘ +``` + +#### **协议设计** + +```java +// RPC 协议格式 +┌───────────┬───────────┬──────────┬──────────┐ +│ Magic │ Length │ Type │ Body │ +│ Number │ (4 bytes) │ (1 byte) │ │ +│ (4 bytes) │ │ │ │ +└───────────┴───────────┴──────────┴──────────┘ + +// 实现 +public class RpcProtocol { + + private static final int MAGIC_NUMBER = 0xCAFEBABE; + private static final int HEADER_LENGTH = 9; + + public static ByteBuf encode(byte[] body, byte type) { + ByteBuf buffer = Unpooled.buffer(HEADER_LENGTH + body.length); + + // Magic Number (4 bytes) + buffer.writeInt(MAGIC_NUMBER); + + // Length (4 bytes) + buffer.writeInt(body.length); + + // Type (1 byte) + buffer.writeByte(type); + + // Body + buffer.writeBytes(body); + + return buffer; + } + + public static byte[] decode(ByteBuf buffer) { + // 检查 Magic Number + if (buffer.readInt() != MAGIC_NUMBER) { + throw new IllegalArgumentException("Invalid magic number"); + } + + // 读取 Length + int length = buffer.readInt(); + + // 读取 Type + byte type = buffer.readByte(); + + // 读取 Body + byte[] body = new byte[length]; + buffer.readBytes(body); + + return body; + } +} +``` + +#### **服务端实现** + +```java +@Component +public class RpcServer { + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + private Channel serverChannel; + + public void start(int port) throws InterruptedException { + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 1024) + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 编解码器 + pipeline.addLast("frameDecoder", + new LengthFieldBasedFrameDecoder(8192, 4, 4, 0, 0)); + pipeline.addLast("frameEncoder", + new LengthFieldPrepender(4)); + + // 协议编解码 + pipeline.addLast("decoder", new RpcDecoder()); + pipeline.addLast("encoder", new RpcEncoder()); + + // 心跳检测 + pipeline.addLast("idleStateHandler", + new IdleStateHandler(60, 0, 0)); + pipeline.addLast("heartbeatHandler", new HeartbeatHandler()); + + // 业务处理器 + pipeline.addLast("handler", new RpcServerHandler()); + } + }); + + // 绑定端口并启动 + ChannelFuture future = bootstrap.bind(port).sync(); + serverChannel = future.channel(); + + System.out.println("RPC Server started on port " + port); + + // 等待服务器 socket 关闭 + serverChannel.closeFuture().sync(); + + } finally { + shutdown(); + } + } + + public void shutdown() { + if (serverChannel != null) { + serverChannel.close(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void destroy() { + shutdown(); + } +} +``` + +#### **客户端实现** + +```java +@Component +public class RpcClient { + + private EventLoopGroup workerGroup; + private Channel channel; + + public void connect(String host, int port) throws InterruptedException { + workerGroup = new NioEventLoopGroup(); + + try { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(workerGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + .option(ChannelOption.SO_KEEPALIVE, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 编解码器 + pipeline.addLast("frameDecoder", + new LengthFieldBasedFrameDecoder(8192, 4, 4, 0, 0)); + pipeline.addLast("frameEncoder", + new LengthFieldPrepender(4)); + + // 协议编解码 + pipeline.addLast("decoder", new RpcDecoder()); + pipeline.addLast("encoder", new RpcEncoder()); + + // 心跳检测 + pipeline.addLast("idleStateHandler", + new IdleStateHandler(0, 30, 0)); + pipeline.addLast("heartbeatHandler", new HeartbeatHandler()); + + // 业务处理器 + pipeline.addLast("handler", new RpcClientHandler()); + } + }); + + // 连接服务器 + ChannelFuture future = bootstrap.connect(host, port).sync(); + channel = future.channel(); + + System.out.println("RPC Client connected to " + host + ":" + port); + + // 等待连接关闭 + channel.closeFuture().sync(); + + } finally { + shutdown(); + } + } + + public void shutdown() { + if (channel != null) { + channel.close(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + } + + public T sendRequest(RpcRequest request, Class returnType) { + // 发送请求并等待响应 + RpcClientHandler handler = channel.pipeline() + .get(RpcClientHandler.class); + + return handler.sendRequest(request, returnType); + } +} +``` + +--- + +### 2. WebSocket 服务器实现 + +#### **WebSocket 握手机制** + +``` +Client → Server: HTTP GET (Upgrade: websocket) +Server → Client: HTTP 101 (Switching Protocols) +Client → Server: WebSocket Frame +Server → Client: WebSocket Frame +``` + +#### **服务器实现** + +```java +@Component +public class WebSocketServer { + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + public void start(int port) throws InterruptedException { + bossGroup = new NioEventLoopGroup(1); + workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // HTTP 编解码器 + pipeline.addLast("httpCodec", new HttpServerCodec()); + pipeline.addLast("httpAggregator", + new HttpObjectAggregator(65536)); + + // WebSocket 握手处理器 + pipeline.addLast("handshakeHandler", + new WebSocketServerProtocolHandler("/ws")); + + // 自定义 WebSocket 处理器 + pipeline.addLast("handler", new WebSocketHandler()); + } + }); + + ChannelFuture future = bootstrap.bind(port).sync(); + System.out.println("WebSocket Server started on port " + port); + + future.channel().closeFuture().sync(); + + } finally { + shutdown(); + } + } + + public void shutdown() { + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + } +} +``` + +#### **WebSocket Handler** + +```java +@ChannelHandler.Sharable +public class WebSocketHandler extends SimpleChannelInboundHandler { + + // 存储所有连接的 Channel + private static final ChannelGroup channels = + new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + channels.add(ctx.channel()); + System.out.println("Client connected: " + ctx.channel().id()); + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + channels.remove(ctx.channel()); + System.out.println("Client disconnected: " + ctx.channel().id()); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, + TextWebSocketFrame msg) { + // 接收消息 + String message = msg.text(); + System.out.println("Received: " + message); + + // 广播消息给所有客户端 + channels.writeAndFlush(new TextWebSocketFrame( + "Server: " + message + )); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} +``` + +--- + +### 3. 百万连接 IM 系统 + +#### **架构优化** + +```java +@Configuration +public class ImServerConfig { + + @Bean + public EventLoopGroup bossGroup() { + // 1 个线程处理 Accept + return new NioEventLoopGroup(1); + } + + @Bean + public EventLoopGroup workerGroup() { + // 根据 CPU 核心数设置 I/O 线程数 + // 通常设置为 CPU 核心数 * 2 + int threads = Runtime.getRuntime().availableProcessors() * 2; + return new NioEventLoopGroup(threads); + } + + @Bean + public ServerBootstrap serverBootstrap( + EventLoopGroup bossGroup, + EventLoopGroup workerGroup) { + + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 8192) // 增大连接队列 + .option(ChannelOption.SO_REUSEADDR, true) + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) // 32KB 发送缓冲 + .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 32KB 接收缓冲 + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ChannelPipeline pipeline = ch.pipeline(); + + // 连接数限制 + pipeline.addLast("connectionLimiter", + new ConnectionLimiter(1000000)); + + // 流量整形 + pipeline.addLast("trafficShaping", + new ChannelTrafficShapingHandler( + 1024 * 1024, // 读限制 1MB/s + 1024 * 1024 // 写限制 1MB/s + )); + + // 协议编解码 + pipeline.addLast("frameDecoder", + new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 0)); + pipeline.addLast("frameEncoder", + new LengthFieldPrepender(4)); + + // IM 协议处理器 + pipeline.addLast("imHandler", new ImServerHandler()); + } + }); + + return bootstrap; + } +} +``` + +#### **连接限流** + +```java +public class ConnectionLimiter extends ChannelInboundHandlerAdapter { + + private final int maxConnections; + private final AtomicInteger activeConnections = new AtomicInteger(0); + + public ConnectionLimiter(int maxConnections) { + this.maxConnections = maxConnections; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (activeConnections.get() >= maxConnections) { + // 连接数超限,关闭新连接 + ctx.writeAndFlush(new TextWebSocketFrame( + "Server is full, please try again later" + )).addListener(ChannelFutureListener.CLOSE); + return; + } + + int count = activeConnections.incrementAndGet(); + System.out.println("Active connections: " + count); + + ctx.fireChannelRead(msg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + int count = activeConnections.decrementAndGet(); + System.out.println("Active connections: " + count); + + ctx.fireChannelInactive(); + } +} +``` + +--- + +### 4. 内存泄漏排查与解决 + +#### **常见泄漏场景** + +```java +// ❌ 错误示例 1: ByteBuf 未释放 +public void handler(ChannelHandlerContext ctx, ByteBuf buf) { + byte[] data = new byte[buf.readableBytes()]; + buf.readBytes(data); + // 忘记释放 buf +} + +// ✅ 正确做法 +public void handler(ChannelHandlerContext ctx, ByteBuf buf) { + try { + byte[] data = new byte[buf.readableBytes()]; + buf.readBytes(data); + // 处理数据... + } finally { + ReferenceCountUtil.release(buf); // 释放 ByteBuf + } +} + +// ❌ 错误示例 2: Handler 未移除 +public void channelActive(ChannelHandlerContext ctx) { + // 添加长生命周期的 Handler + ctx.pipeline().addLast("longLived", new LongLivedHandler()); +} + +// ✅ 正确做法 +public void channelActive(ChannelHandlerContext ctx) { + // 使用 @Sharable 注解 + ctx.pipeline().addLast("longLived", SharableHandler.INSTANCE); +} + +// ❌ 错误示例 3: Channel 引用未释放 +public class ChannelHolder { + private static final Map CHANNELS = new ConcurrentHashMap<>(); + + public static void addChannel(String id, Channel channel) { + CHANNELS.put(id, channel); // 永久持有引用 + } +} + +// ✅ 正确做法 +public class ChannelHolder { + private static final Map CHANNELS = new ConcurrentHashMap<>(); + + public static void addChannel(String id, Channel channel) { + CHANNELS.put(id, channel); + + // 监听关闭事件,自动移除 + channel.closeFuture().addListener((ChannelFutureListener) future -> { + CHANNELS.remove(id); + }); + } +} +``` + +#### **内存泄漏检测** + +```java +// 启用内存泄漏检测 +// JVM 参数: -Dio.netty.leakDetection.level=paranoid + +public class LeakDetection { + public static void main(String[] args) { + // 检测级别 + // DISABLED - 禁用 + // SIMPLE - 采样检测(1%) + // ADVANCED - 采样检测,记录创建堆栈 + // PARANOID - 全量检测 + + // 创建资源时使用 + ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024); + + // 使用资源泄漏检测器访问器 + ResourceLeakDetector detector = + new ResourceLeakDetector(ByteBuf.class, + ResourceLeakDetector.Level.PARANOID); + + // 包装对象 + ByteBuf wrappedBuffer = detector.wrap(buffer, "test-buffer"); + + // 使用完成后 + wrappedBuffer.release(); // 如果未释放,会记录堆栈并警告 + } +} +``` + +--- + +### 5. 优雅关闭实现 + +```java +@Component +public class NettyGracefulShutdown implements SmartLifecycle { + + @Autowired + private ServerBootstrap serverBootstrap; + + private Channel serverChannel; + private volatile boolean running = false; + + @Override + public void start() { + running = true; + // 启动服务器... + } + + @Override + public void stop() { + if (!running) { + return; + } + + System.out.println("Shutting down Netty server gracefully..."); + + // 1. 停止接受新连接 + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + + // 2. 等待现有请求处理完成(超时 30 秒) + EventLoopGroup workerGroup = serverBootstrap.config().group(); + Future shutdownFuture = workerGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS); + + try { + // 等待优雅关闭完成 + shutdownFuture.await(60, TimeUnit.SECONDS); + + if (shutdownFuture.isSuccess()) { + System.out.println("Netty server shutdown successfully"); + } else { + System.err.println("Netty server shutdown timeout, force close"); + workerGroup.shutdownNow(); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + workerGroup.shutdownNow(); + } + + // 3. 关闭 Boss Group + EventLoopGroup bossGroup = serverBootstrap.config().childGroup(); + bossGroup.shutdownGracefully(); + + running = false; + } + + @Override + public boolean isRunning() { + return running; + } +} +``` + +--- + +### 6. 心跳与重连机制 + +#### **心跳处理器** + +```java +@ChannelHandler.Sharable +public class HeartbeatHandler extends ChannelInboundHandlerAdapter { + + private static final ByteBuf HEARTBEAT_BEAT = + Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)); + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + + if (event.state() == IdleState.READER_IDLE) { + // 读空闲,可能对方已挂掉 + System.out.println("Read idle, closing connection"); + ctx.close(); + + } else if (event.state() == IdleState.WRITER_IDLE) { + // 写空闲,发送心跳 + System.out.println("Write idle, sending heartbeat"); + ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate()); + + } else if (event.state() == IdleState.ALL_IDLE) { + // 读写空闲 + System.out.println("All idle, sending heartbeat"); + ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate()); + } + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + // 处理心跳响应 + if ("HEARTBEAT".equals(msg)) { + // 心跳包,不处理 + return; + } + + // 业务消息 + ctx.fireChannelRead(msg); + } +} +``` + +#### **重连机制** + +```java +@Component +public class ReconnectClient { + + private EventLoopGroup workerGroup; + private Channel channel; + private ScheduledExecutorService scheduler; + + public void connect(String host, int port) { + workerGroup = new NioEventLoopGroup(); + scheduler = Executors.newSingleThreadScheduledExecutor(); + + doConnect(host, port); + } + + private void doConnect(String host, int port) { + Bootstrap bootstrap = new Bootstrap(); + bootstrap.group(workerGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + ch.pipeline() + .addLast("idleStateHandler", + new IdleStateHandler(0, 10, 0)) + .addLast("heartbeatHandler", new HeartbeatHandler()) + .addLast("handler", new ClientHandler()); + } + }); + + // 连接服务器 + ChannelFuture future = bootstrap.connect(host, port); + + future.addListener((ChannelFutureListener) f -> { + if (f.isSuccess()) { + System.out.println("Connected to server"); + channel = f.channel(); + } else { + System.out.println("Connection failed, reconnect in 5 seconds..."); + + // 5 秒后重连 + scheduler.schedule(() -> doConnect(host, port), 5, TimeUnit.SECONDS); + } + }); + } + + public void shutdown() { + if (channel != null) { + channel.close(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (scheduler != null) { + scheduler.shutdown(); + } + } +} +``` + +--- + +## P7 加分项 + +### 深度理解 +- **Reactor 模式**:理解主从 Reactor 多线程模型 +- **零拷贝原理**:理解用户空间和内核空间 +- **内存管理**:ByteBuf 的池化、引用计数、泄漏检测 + +### 实战经验 +- **高并发调优**:EventLoopGroup 线程数、TCP 参数调优 +- **监控告警**:连接数监控、流量监控、延迟监控 +- **故障排查**:内存泄漏、CPU 100%、连接泄漏 + +### 架构设计 +- **协议设计**:自定义协议、编解码器设计 +- **集群部署**:负载均衡、服务发现 +- **容错机制**:重连、熔断、限流 + +### 性能优化 +1. **减少内存拷贝**:使用 CompositeByteBuf +2. **复用对象**:使用 ByteBuf 对象池 +3. **优化 TCP 参数**:TCP_NODELAY、SO_KEEPALIVE +4. **流量控制**:ChannelTrafficShapingHandler + +--- + +## 总结 + +Netty 实战核心要点: +1. **高性能**:Reactor 模型、零拷贝、内存池 +2. **高可靠**:心跳、重连、优雅关闭 +3. **可扩展**:Pipeline 机制、自定义协议 +4. **可监控**:连接数、流量、延迟监控 + +**最佳实践**: +- 合理设置 EventLoopGroup 线程数 +- 使用池化的 ByteBuf,及时释放 +- 添加心跳和重连机制 +- 实现优雅关闭 +- 做好监控和告警 +- 定期排查内存泄漏 diff --git a/10-中间件/Netty核心原理.md b/10-中间件/Netty核心原理.md new file mode 100644 index 0000000..e0f78c4 --- /dev/null +++ b/10-中间件/Netty核心原理.md @@ -0,0 +1,539 @@ +# Netty 核心原理 + +## 问题 + +1. Netty 的核心组件有哪些? +2. 什么是 EventLoop?线程模型是怎样的? +3. 什么是 ByteBuf?与 Java NIO 的 ByteBuffer 有什么区别? +4. Netty 的零拷贝是如何实现的? +5. 什么是 ChannelPipeline?ChannelHandler 是如何工作的? +6. Netty 如何解决 TCP 粘包/拆包问题? +7. Netty 的心跳机制是如何实现的? + +--- + +## 标准答案 + +### 1. Netty 核心组件 + +#### **核心组件架构** + +``` +┌─────────────────────────────────────────────┐ +│ Netty 架构 │ +├─────────────────────────────────────────────┤ +│ Channel │ +│ ├─ EventLoop (线程模型) │ +│ ├─ ChannelPipeline (处理器链) │ +│ │ └─ ChannelHandler (业务处理器) │ +│ └─ ByteBuf (内存管理) │ +├─────────────────────────────────────────────┤ +│ Bootstrap (启动类) │ +│ ├─ ServerBootstrap (服务端) │ +│ └─ Bootstrap (客户端) │ +└─────────────────────────────────────────────┘ +``` + +#### **关键组件说明** + +**1. Channel** +```java +// Channel 是 Netty 的网络操作抽象类 +Channel channel = ...; + +// 核心方法 +channel.writeAndFlush(msg); // 写数据并刷新 +channel.close(); // 关闭连接 +channel.eventLoop(); // 获取 EventLoop +``` + +**2. EventLoop** +```java +// EventLoop 负责处理 I/O 操作 +EventLoopGroup bossGroup = new NioEventLoopGroup(1); // Accept +EventLoopGroup workerGroup = new NioEventLoopGroup(); // I/O + +// bossGroup 只负责监听并接受连接 +// workerGroup 负责 I/O 读写 +``` + +**3. ChannelPipeline** +```java +// Pipeline 是处理器链 +ChannelPipeline pipeline = channel.pipeline(); + +// 添加处理器 +pipeline.addLast("decoder", new Decoder()); +pipeline.addLast("encoder", new Encoder()); +pipeline.addLast("handler", new BusinessHandler()); +``` + +**4. ByteBuf** +```java +// ByteBuf 是 Netty 的字节容器 +ByteBuf buffer = Unpooled.buffer(1024); + +// 写入数据 +buffer.writeInt(123); +buffer.writeBytes("Hello".getBytes()); + +// 读取数据 +int value = buffer.readInt(); +``` + +--- + +### 2. EventLoop 与线程模型 + +#### **Reactor 线程模型** + +Netty 采用 **Reactor 模式**,支持三种实现: + +``` +1. Reactor 单线程模型 + ┌──────────────┐ + │ Reactor │ + │ (单线程) │ + │ │ + │ - Accept │ + │ - Read │ + │ - Decode │ + │ - Process │ + │ - Encode │ + │ - Send │ + └──────────────┘ + +2. Reactor 多线程模型 + ┌──────────┐ ┌──────────────────┐ + │ Reactor │────>│ Worker Threads │ + │ (主线程) │ │ (线程池) │ + │ │ │ │ + │ - Accept │ │ - Read/Write │ + │ │ │ - Decode/Encode │ + │ │ │ - Process │ + └──────────┘ └──────────────────┘ + +3. 主从 Reactor 多线程模型(Netty 默认) + ┌──────────┐ ┌──────────────────┐ + │ Main │ │ Sub Reactors │ + │ Reactor │────>│ (线程池) │ + │ │ │ │ + │ - Accept │ │ - Read/Write │ + │ │ │ - Decode/Encode │ + │ │ │ - Process │ + └──────────┘ └──────────────────┘ +``` + +#### **EventLoop 工作原理** + +```java +EventLoopGroup bossGroup = new NioEventLoopGroup(1); +EventLoopGroup workerGroup = new NioEventLoopGroup(); + +// 每个 EventLoop: +// 1. 维护一个 Selector +// 2. 维护一个任务队列 +// 3. 维护一个线程 + +// 工作流程: +// - 线程启动后,无限循环执行: +// 1. select() - 查询就绪的 Channel +// 2. processSelectedKeys() - 处理 I/O 事件 +// 3. runAllTasks() - 执行任务队列中的任务 +``` + +#### **任务调度** + +```java +Channel channel = ...; + +// 在 EventLoop 线程中执行任务 +channel.eventLoop().execute(() -> { + System.out.println("Task in EventLoop thread"); +}); + +// 定时任务 +channel.eventLoop().schedule(() -> { + System.out.println("Delayed task"); +}, 1, TimeUnit.SECONDS); +``` + +--- + +### 3. ByteBuf vs ByteBuffer + +#### **对比表** + +| 特性 | Java NIO ByteBuffer | Netty ByteBuf | +|------|---------------------|---------------| +| **长度** | 固定长度,扩容复杂 | 动态扩容 | +| **读写模式** | flip() 切换 | 独立的读写索引 | +| **引用计数** | 不支持 | 支持(池化) | +| **零拷贝** | 不支持 | 支持(Composite) | +| **性能** | 较低 | 高(对象池优化) | + +#### **ByteBuf 三个重要指针** + +``` +ByteBuf 结构: +┌───────┬──────────┬──────────┬─────────┐ +│ 0 │ readerIndex│ writerIndex│ capacity│ +│ │ │ │ │ +│ discard│ readable│ writable │ │ +│ bytes │ bytes │ bytes │ │ +└───────┴──────────┴──────────┴─────────┘ + +操作: +- mark() / reset(): 标记和重置 +- readerIndex(): 读指针 +- writerIndex(): 写指针 +- capacity(): 容量 +- maxCapacity(): 最大容量 +``` + +#### **使用示例** + +```java +// 创建 ByteBuf +ByteBuf buffer = Unpooled.buffer(1024); + +// 写入数据 +buffer.writeBytes("Hello".getBytes()); +buffer.writeInt(123); + +// 读取数据(不移动读指针) +byte[] data = new byte[5]; +buffer.getBytes(buffer.readerIndex(), data); + +// 读取数据(移动读指针) +buffer.readBytes(5); + +// 引用计数(池化) +buffer.retain(); // 引用计数 +1 +buffer.release(); // 引用计数 -1 +``` + +--- + +### 4. 零拷贝实现 + +#### **零拷贝的三种实现** + +**1. CompositeByteBuf(组合缓冲区)** + +```java +// 将多个 ByteBuf 组合成一个逻辑上的 ByteBuf +ByteBuf header = Unpooled.buffer(10); +ByteBuf body = Unpooled.buffer(100); + +// 零拷贝组合 +CompositeByteBuf message = Unpooled.wrappedBuffer(header, body); + +// 物理上不复制数据,只是逻辑组合 +``` + +**2. wrappedBuffer(包装)** + +```java +// 包装字节数组,不复制 +byte[] bytes = "Hello Netty".getBytes(); +ByteBuf buffer = Unpooled.wrappedBuffer(bytes); + +// 底层数据共享,无拷贝 +``` + +**3. slice(切片)** + +```java +ByteBuf buffer = Unpooled.buffer(100); + +// 切片,共享底层内存 +ByteBuf sliced = buffer.slice(0, 50); + +// 修改会互相影响 +sliced.setByte(0, (byte) 1); +``` + +**4. FileChannel.transferTo(文件传输)** + +```java +// 文件传输零拷贝 +FileChannel sourceChannel = ...; +FileChannel destChannel = ...; + +// 直接在内核空间传输,不经过用户空间 +sourceChannel.transferTo(position, count, destChannel); +``` + +--- + +### 5. ChannelPipeline 与 ChannelHandler + +#### **Pipeline 工作流程** + +``` +入站事件 (Inbound): + │ + ▼ +┌─────────────────────────────────────┐ +│ ChannelPipeline │ +│ │ +│ ┌─────────┐ ┌─────────┐ ┌──────┐│ +│ │Decoder1 │→ │Decoder2 │→ │Handler││ +│ └─────────┘ └─────────┘ └──────┘│ +│ ▲ ▲ ▲ │ +│ └────────────┴─────────────┘ │ +│ (数据流) │ +└─────────────────────────────────────┘ + +出站事件 (Outbound): + │ + ▼ +┌─────────────────────────────────────┐ +│ ChannelPipeline │ +│ │ +│ ┌──────┐ ┌─────────┐ ┌─────────┐│ +│ │Handler│→ │Encoder1 │→ │Encoder2 ││ +│ └──────┘ └─────────┘ └─────────┘│ +│ ▲ ▲ ▲ │ +│ └────────────┴─────────────┘ │ +│ (数据流) │ +└─────────────────────────────────────┘ +``` + +#### **ChannelHandler 接口** + +```java +// 入站处理器 +@ChannelHandler.Sharable +public class InboundHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + // 处理读事件 + System.out.println("Received: " + msg); + ctx.fireChannelRead(msg); // 传递给下一个 Handler + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + // 连接建立 + System.out.println("Client connected"); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + // 连接断开 + System.out.println("Client disconnected"); + } +} + +// 出站处理器 +public class OutboundHandler extends ChannelOutboundHandlerAdapter { + + @Override + public void write(ChannelHandlerContext ctx, Object msg, + ChannelPromise promise) { + // 处理写事件 + System.out.println("Sending: " + msg); + ctx.write(msg, promise); + } +} +``` + +#### **Handler 生命周期** + +```java +public interface ChannelHandler { + + // 添加到 Pipeline + void handlerAdded(ChannelHandlerContext ctx); + + // 从 Pipeline 移除 + void handlerRemoved(ChannelHandlerContext ctx); + + // 发生异常 + void exceptionCaught(ChannelHandlerContext ctx, Throwable cause); +} +``` + +--- + +### 6. TCP 粘包/拆包解决方案 + +#### **问题原因** + +``` +TCP 粘包: +发送方: [包1] [包2] [包3] +接收方: [包1+包2+包3] // 粘在一起 + +TCP 拆包: +发送方: [大包] +接收方: [片段1] [片段2] // 被拆分 +``` + +#### **Netty 提供的解码器** + +**1. FixedLengthFrameDecoder(固定长度)** + +```java +// 每个消息固定 10 字节 +pipeline.addLast("framer", new FixedLengthFrameDecoder(10)); +``` + +**2. LineBasedFrameDecoder(分隔符)** + +```java +// 按换行符分割 +pipeline.addLast("framer", new LineBasedFrameDecoder(8192)); +``` + +**3. DelimiterBasedFrameDecoder(自定义分隔符)** + +```java +// 自定义分隔符 +ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes()); +pipeline.addLast("framer", + new DelimiterBasedFrameDecoder(8192, delimiter)); +``` + +**4. LengthFieldBasedFrameDecoder(长度字段)** + +```java +// 消息格式: [长度(4字节)] [数据] +pipeline.addLast("framer", + new LengthFieldBasedFrameDecoder( + 8192, // maxFrameLength + 0, // lengthFieldOffset + 4, // lengthFieldLength + 0, // lengthAdjustment + 0 // initialBytesToStrip + )); +``` + +**5. 自定义解码器** + +```java +public class CustomDecoder extends ByteToMessageDecoder { + + @Override + protected void decode(ChannelHandlerContext ctx, + ByteBuf in, List out) { + // 检查是否有足够数据 + if (in.readableBytes() < 4) { + return; + } + + // 标记读指针 + in.markReaderIndex(); + + // 读取长度 + int length = in.readInt(); + + // 检查数据是否完整 + if (in.readableBytes() < length) { + in.resetReaderIndex(); + return; + } + + // 读取数据 + byte[] data = new byte[length]; + in.readBytes(data); + + out.add(data); + } +} +``` + +--- + +### 7. 心跳机制 + +#### **IdleStateHandler 实现** + +```java +// 读写空闲检测 +pipeline.addLast("idleStateHandler", + new IdleStateHandler( + 30, // 读空闲时间(秒) + 0, // 写空闲时间 + 0 // 读写空闲时间 + )); + +// 心跳处理器 +pipeline.addLast("heartbeatHandler", new HeartbeatHandler()); +``` + +#### **心跳处理器** + +```java +public class HeartbeatHandler extends ChannelInboundHandlerAdapter { + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + + if (event.state() == IdleState.READER_IDLE) { + // 读空闲,关闭连接 + System.out.println("Read idle, close connection"); + ctx.close(); + } + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + // 处理心跳包 + if ("PING".equals(msg)) { + ctx.writeAndFlush("PONG"); + } else { + ctx.fireChannelRead(msg); + } + } +} +``` + +--- + +## P7 加分项 + +### 深度理解 +- **Reactor 模式**:理解单线程、多线程、主从多线程三种模型 +- **零拷贝原理**:理解用户空间和内核空间的数据拷贝 +- **内存管理**:ByteBuf 的池化、引用计数、内存泄漏 + +### 实战经验 +- **高并发场景**:EventLoopGroup 线程数调优 +- **内存调优**:ByteBuf 分配器选择(堆内存 vs 直接内存) +- **性能优化**:避免 Handler 链过长、减少对象创建 + +### 架构设计 +- **协议设计**:自定义协议、编解码器设计 +- **异常处理**:优雅关闭、重连机制 +- **监控告警**:流量监控、连接数监控、延迟监控 + +### 常见问题 +1. **内存泄漏**:ByteBuf 未释放 +2. **线程安全**:共享状态的同步 +3. **连接池**:Channel 复用与管理 +4. **背压处理**:流量控制与慢消费 + +--- + +## 总结 + +Netty 的核心优势: +1. **高性能**:零拷贝、内存池、Reactor 模式 +2. **高可靠性**:心跳机制、重连机制、优雅关闭 +3. **易扩展**:Pipeline 机制、丰富的 Handler +4. **成熟稳定**:广泛应用在 Dubbo、gRPC、WebSocket + +**最佳实践**: +- 合理设置 EventLoopGroup 线程数 +- 使用池化的 ByteBuf +- 及时释放 ByteBuf,避免内存泄漏 +- 使用 LengthFieldBasedFrameDecoder 处理粘包 +- 添加心跳和重连机制 +- 做好监控和日志