Compare commits


2 Commits

Author SHA1 Message Date
7aa971f511 feat: add Netty and Java NIO interview questions
Added 3 comprehensive interview documents covering Netty and Java NIO:

**1. Netty Core Principles (Netty核心原理.md)**
- Core components: Channel, EventLoop, ChannelPipeline, ByteBuf
- Reactor threading model (single/multi-threaded, master-slave)
- ByteBuf vs Java NIO ByteBuffer
- Zero-copy implementation (4 approaches)
- ChannelPipeline and ChannelHandler
- TCP sticky/unpacking problem solutions
- Heartbeat mechanism

**2. Java NIO Core Principles (Java NIO核心原理.md)**
- NIO vs BIO comparison
- Three core components: Channel, Buffer, Selector
- Selector multiplexing mechanism
- Channel vs Stream differences
- Buffer core attributes and usage
- Non-blocking I/O implementation
- Zero-copy with transferTo and MappedByteBuffer

**3. Netty Practice Scenarios (Netty实战场景.md)**
- High-performance RPC framework design
- WebSocket server implementation
- Million-connection IM system architecture
- Memory leak detection and resolution
- Graceful shutdown implementation
- Heartbeat and reconnection mechanisms

Each document includes:
- Detailed problem descriptions
- Complete code examples (Java)
- Architecture diagrams
- Best practices
- Performance optimization tips
- P7-level bonus points

Total: 3 documents, covering:
- Theoretical foundations
- Practical implementations
- Production scenarios
- Performance tuning
- Common pitfalls

Suitable for backend P7 interview preparation.

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>
2026-03-06 10:20:11 +08:00
b773a4fa83 vault backup: 2026-03-06 10:18:18 2026-03-06 10:18:18 +08:00
4 changed files with 1875 additions and 6 deletions


@@ -194,8 +194,12 @@
"obsidian-git:Open Git source control": false
}
},
-	"active": "fcbc762a80282002",
+	"active": "16a7ce8de420dd10",
"lastOpenFiles": [
+	"10-中间件/Netty实战场景.md",
+	"10-中间件/Java NIO核心原理.md",
+	"10-中间件/Netty核心原理.md",
"10-中间件",
"16-LeetCode Hot 100/从前序与中序遍历序列构造二叉树.md",
"16-LeetCode Hot 100/路径总和.md",
"16-LeetCode Hot 100/对称二叉树.md",
@@ -221,16 +225,12 @@
"16-LeetCode Hot 100",
"00-项目概述/项目概述.md",
"00-项目概述",
-	"questions/04-消息队列/消息队列_RocketMQ_Kafka.md",
-	"questions/05-并发编程/ConcurrentHashMap原理.md",
-	"questions/02-数据库/MySQL主从延迟.md",
"questions/15-简历面试",
"questions/14-Web3与区块链",
"12-面试技巧",
"08-算法与数据结构",
"questions/13-Golang语言",
"questions/12-面试技巧",
-	"questions/11-运维",
-	"questions/10-中间件"
+	"questions/11-运维"
]
}


@@ -0,0 +1,527 @@
# Java NIO Core Principles
## Questions
1. What is NIO, and how does it differ from BIO?
2. What are the three core NIO components?
3. What is a Selector, and how does it implement multiplexing?
4. What is a Channel, and how does it differ from a Stream?
5. What is a Buffer, and what do its core attributes mean?
6. How does NIO implement non-blocking I/O?
7. What is zero-copy, and how is it implemented?
---
## Reference Answers
### 1. NIO vs BIO
#### **Comparison**
| Feature | BIO (Blocking I/O) | NIO (Non-blocking I/O) |
|------|-------------------|----------------------|
| **I/O model** | Blocking | Non-blocking |
| **Threading model** | One thread per connection | Reactor pattern |
| **Concurrency** | Low | High |
| **Programming complexity** | Simple | Complex |
| **Data abstraction** | Stream | Channel + Buffer |
| **Best for** | Few connections, latency-tolerant | Many connections, high concurrency |
#### **Code comparison**
**BIO version**
```java
// Traditional BIO - blocking
ServerSocket serverSocket = new ServerSocket(8080);
while (true) {
    // Block until a connection arrives
    Socket socket = serverSocket.accept();
    // One thread per connection
    new Thread(() -> {
        try {
            BufferedReader reader = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));
            // Blocking read
            String line = reader.readLine();
            // Process the data...
        } catch (IOException e) {
            e.printStackTrace();
        }
    }).start();
}
```
**NIO version**
```java
// NIO - non-blocking
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
    // Wait for ready events (one thread serves all channels)
    int readyCount = selector.select();
    Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove(); // keys must be removed once handled
        if (key.isAcceptable()) {
            // Handle the new connection
        }
        if (key.isReadable()) {
            // Handle the read
        }
    }
}
```
---
### 2. The Three Core NIO Components
```
┌────────────────────────────────────────┐
│          Java NIO architecture         │
├────────────────────────────────────────┤
│                                        │
│   ┌─────────┐    ┌─────────┐           │
│   │ Channel │◄──►│ Buffer  │           │
│   └────┬────┘    └─────────┘           │
│        │                               │
│        ▼                               │
│   ┌───────────────┐                    │
│   │   Selector    │                    │
│   │ (multiplexer) │                    │
│   └───────────────┘                    │
│                                        │
└────────────────────────────────────────┘
Channel:  data conduit (bidirectional)
Buffer:   data container
Selector: multiplexer
```
#### **How the components interact**
```java
// 1. Open a Channel
FileChannel channel = FileChannel.open(Paths.get("data.txt"));
// 2. Allocate a Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 3. Read data into the Buffer
channel.read(buffer);
// 4. Flip from write mode to read mode
buffer.flip();
// 5. Process the data
while (buffer.hasRemaining()) {
    byte b = buffer.get();
}
// 6. Clear the Buffer for reuse
buffer.clear();
```
---
### 3. Selector Multiplexing
#### **How it works**
```
Selector multiplexing model:
┌──────────────────────────────────┐
│            Selector              │
│                                  │
│  select() - block until ready    │
│                                  │
│  ┌─────┐  ┌─────┐  ┌─────┐       │
│  │ Ch1 │  │ Ch2 │  │ Ch3 │ ...   │
│  │ready│  │ready│  │ready│       │
│  └─────┘  └─────┘  └─────┘       │
└──────────────────────────────────┘
          │ one thread
 ┌────────┴───────────┐
 │     Event Loop     │
 │  - select()        │
 │  - process()       │
 │  - repeat          │
 └────────────────────┘
```
#### **SelectionKey event types**
```java
// The four event types (values from SelectionKey)
int OP_READ    = 1 << 0; // read ready
int OP_WRITE   = 1 << 2; // write ready
int OP_CONNECT = 1 << 3; // connect finished (SocketChannel)
int OP_ACCEPT  = 1 << 4; // accept ready (ServerSocketChannel)
// Register interest in events
SelectionKey key = channel.register(selector,
    SelectionKey.OP_READ | SelectionKey.OP_WRITE);
// Test which event fired
key.isAcceptable();  // accept event
key.isReadable();    // read event
key.isWritable();    // write event
key.isConnectable(); // connect-finished event
```
#### **Complete example**
```java
// NIO server
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
    // Block waiting for events (1-second timeout)
    int readyCount = selector.select(1000);
    if (readyCount == 0) {
        continue;
    }
    // Fetch the ready SelectionKeys
    Set<SelectionKey> readyKeys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = readyKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove(); // remove the key once handled
        if (!key.isValid()) {
            continue;
        }
        // Accept event
        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel = server.accept();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        }
        // Read event
        if (key.isReadable()) {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int bytesRead = channel.read(buffer);
            if (bytesRead == -1) {
                // Peer closed the connection
                key.cancel();
                channel.close();
            } else {
                // Process the data
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                System.out.println("Received: " + new String(data));
            }
        }
    }
}
```
---
### 4. Channel vs Stream
#### **Key differences**
| Feature | Stream (IO) | Channel (NIO) |
|------|-------------|---------------|
| **Direction** | One-way (read or write) | Two-way (read and write) |
| **Blocking** | Blocking | Configurable blocking/non-blocking |
| **Buffering** | Operates on the stream directly | Must go through a Buffer |
| **Performance** | Lower | Higher (zero-copy) |
#### **Channel types**
```java
// 1. FileChannel - files
FileChannel fileChannel = FileChannel.open(Paths.get("data.txt"),
    StandardOpenOption.READ, StandardOpenOption.WRITE);
// 2. SocketChannel - TCP client
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost", 8080));
// 3. ServerSocketChannel - TCP server
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(8080));
// 4. DatagramChannel - UDP
DatagramChannel datagramChannel = DatagramChannel.open();
datagramChannel.bind(new InetSocketAddress(8080));
// 5. Pipe.SinkChannel / Pipe.SourceChannel - in-process pipe
Pipe pipe = Pipe.open();
Pipe.SinkChannel sinkChannel = pipe.sink();
Pipe.SourceChannel sourceChannel = pipe.source();
```
#### **FileChannel example**
```java
// File copy
FileChannel sourceChannel = FileChannel.open(Paths.get("source.txt"));
FileChannel destChannel = FileChannel.open(Paths.get("dest.txt"),
    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
// Option 1: via a Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (sourceChannel.read(buffer) != -1) {
    buffer.flip();
    destChannel.write(buffer);
    buffer.clear();
}
// Option 2: direct transfer (zero-copy)
sourceChannel.transferTo(0, sourceChannel.size(), destChannel);
```
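One caveat worth knowing: `transferTo` may transfer fewer bytes than requested (the count is platform-limited), so robust code loops until everything has moved. A minimal JDK-only sketch; the file names are illustrative:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChannelCopy {
    // Copies src to dst with transferTo, looping over partial transfers.
    static void copy(Path src, Path dst) throws IOException {
        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst,
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                     StandardOpenOption.TRUNCATE_EXISTING)) {
            long position = 0;
            long size = in.size();
            while (position < size) {
                // transferTo returns the number of bytes actually moved
                position += in.transferTo(position, size - position, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.writeString(Path.of("source.txt"), "hello zero-copy");
        copy(src, Path.of("dest.txt"));
        System.out.println(Files.readString(Path.of("dest.txt"))); // prints "hello zero-copy"
    }
}
```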
---
### 5. Buffer Core Attributes
#### **Buffer layout**
```
Buffer core attributes:
┌─────────┬────────────┬──────────┬──────────┐
│ 0       │  position  │  limit   │ capacity │
│         │            │          │          │
│ consumed│  readable/ │  out of  │          │
│ data    │  writable  │  bounds  │          │
└─────────┴────────────┴──────────┴──────────┘
    ↑          ↑            ↑
   mark     position      limit
Operations:
- mark():   remember the current position
- reset():  jump back to the mark
- clear():  position=0, limit=capacity
- flip():   limit=position, position=0 (switch write → read)
- rewind(): position=0, limit unchanged
```
#### **Buffer usage flow**
```java
// 1. Allocate a Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// initial state: position=0, limit=1024, capacity=1024
// 2. Write data
buffer.putInt(123);
buffer.putLong(456L);
buffer.put("Hello".getBytes());
// after writing: position=17, limit=1024
// 3. Flip to read mode
buffer.flip();
// after flip: position=0, limit=17
// 4. Read data
while (buffer.hasRemaining()) {
    byte b = buffer.get();
}
// after reading: position=17, limit=17
// 5. Clear the Buffer
buffer.clear();
// after clear: position=0, limit=1024, capacity=1024
```
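The `mark()`/`reset()` pair from the operation list above is the one step the flow does not show. A minimal sketch, re-reading one byte by restoring a marked position:

```java
import java.nio.ByteBuffer;

public class MarkResetDemo {
    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.wrap("HEADERbody".getBytes());
        buffer.get(new byte[6]);   // consume "HEADER"; position = 6
        buffer.mark();             // remember position 6
        byte first = buffer.get(); // reads 'b'; position = 7
        buffer.reset();            // back to the mark; position = 6
        System.out.println((char) buffer.get()); // prints 'b' again
    }
}
```

Note that `clear()`, `flip()`, and `rewind()` all discard the mark, so a `reset()` after any of them throws `InvalidMarkException`.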
#### **Buffer types**
```java
// One Buffer class per primitive type
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
CharBuffer charBuffer = CharBuffer.allocate(1024);
ShortBuffer shortBuffer = ShortBuffer.allocate(1024);
IntBuffer intBuffer = IntBuffer.allocate(1024);
LongBuffer longBuffer = LongBuffer.allocate(1024);
FloatBuffer floatBuffer = FloatBuffer.allocate(1024);
DoubleBuffer doubleBuffer = DoubleBuffer.allocate(1024);
// Direct memory vs heap memory
ByteBuffer heapBuffer = ByteBuffer.allocate(1024);         // heap
ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024); // direct (off-heap)
```
---
### 6. Implementing Non-blocking I/O
#### **Blocking vs non-blocking**
```java
// Blocking mode (the default)
SocketChannel blockingChannel = SocketChannel.open();
blockingChannel.configureBlocking(true); // blocking
blockingChannel.connect(new InetSocketAddress("localhost", 8080));
// blocks until the connection is established

// Non-blocking mode
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false); // non-blocking
channel.connect(new InetSocketAddress("localhost", 8080));
// returns immediately
while (!channel.finishConnect()) {
    // connection not ready yet - do other work
    System.out.println("Connecting...");
}
```
#### **Non-blocking reads and writes**
```java
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
// Non-blocking read
int bytesRead = channel.read(buffer);
if (bytesRead == 0) {
    // no data available right now
} else if (bytesRead == -1) {
    // connection closed
} else {
    // data was read
    buffer.flip();
    // process the data...
}
// Non-blocking write
buffer.clear();
buffer.put("Hello".getBytes());
buffer.flip();
int bytesWritten = channel.write(buffer);
if (bytesWritten == 0) {
    // socket send buffer full - retry later
}
```
---
### 7. Zero-Copy
#### **Traditional I/O vs zero-copy**
```
Traditional I/O (4 copies, 4 context switches):
1. disk → kernel buffer (DMA)
2. kernel buffer → user buffer (CPU)
3. user buffer → socket buffer (CPU)
4. socket buffer → NIC (DMA)
Zero-copy (2 copies, 2 context switches):
1. disk → kernel buffer (DMA)
2. kernel buffer → NIC (DMA)
```
#### **transferTo**
```java
// Zero-copy file transfer
FileChannel sourceChannel = FileChannel.open(Paths.get("source.txt"));
FileChannel destChannel = FileChannel.open(Paths.get("dest.txt"),
    StandardOpenOption.CREATE, StandardOpenOption.WRITE);
// Data moves inside the kernel, never entering user space
long position = 0;
long count = sourceChannel.size();
sourceChannel.transferTo(position, count, destChannel);
```
#### **MappedByteBuffer (memory mapping)**
```java
// Memory-mapped file
FileChannel channel = FileChannel.open(Paths.get("data.txt"),
    StandardOpenOption.READ, StandardOpenOption.WRITE);
// Map the file into memory
MappedByteBuffer mappedBuffer = channel.map(
    FileChannel.MapMode.READ_WRITE, // read-write mode
    0,                              // start offset
    channel.size()                  // mapped size
);
// Operate on memory directly (no copy)
mappedBuffer.putInt(0, 123);
int value = mappedBuffer.getInt(0);
```
---
## P7-Level Bonus Points
### Deep understanding
- **Multiplexing internals**: the differences between select, poll, and epoll
- **Zero-copy internals**: DMA, user space vs kernel space
- **Memory management**: heap vs direct memory, GC impact
### Hands-on experience
- **High concurrency**: the Netty, Mina, and Vert.x frameworks
- **File processing**: large-file I/O, memory-mapped files
- **Network programming**: custom protocols, sticky-packet handling
### Performance tuning
- **Buffer reuse**: pool buffers to reduce GC pressure
- **Direct memory**: saves one copy, but allocation/release is expensive
- **Batch operations**: vectored I/O (scatter/gather)
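The scatter/gather point deserves one concrete line of code: a gathering write hands the kernel an array of buffers to send in a single call. A minimal sketch using only the JDK; the file name is illustrative:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class GatheringWriteDemo {
    public static void main(String[] args) throws IOException {
        ByteBuffer header = ByteBuffer.wrap("HDR:".getBytes());
        ByteBuffer body = ByteBuffer.wrap("payload".getBytes());
        try (FileChannel ch = FileChannel.open(Path.of("out.bin"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            // Gathering write: both buffers go out in one call
            ch.write(new ByteBuffer[]{header, body});
        }
    }
}
```

The mirror-image scattering read, `read(ByteBuffer[])`, fills the buffers in order - handy for splitting a fixed-size header from its payload without a copy.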
### Common pitfalls
1. **Buffer leaks**: direct memory never released
2. **select busy-spin**: 100% CPU
3. **epoll spurious wakeups**: a known Linux kernel bug
4. **File descriptor exhaustion**: Channels never closed
---
## Summary
Core strengths of Java NIO:
1. **High performance**: non-blocking I/O, zero-copy
2. **High concurrency**: one thread serves many connections
3. **Flexibility**: configurable blocking/non-blocking
4. **Scalability**: suits large distributed systems
**Best practices**
- Prefer NIO (Netty) for high-concurrency workloads
- Use transferTo for large file transfers
- Understand the Buffer read/write mode switch
- Release resources (Channels, Buffers) promptly
- Monitor file descriptor usage


@@ -0,0 +1,803 @@
# Netty in Practice: Scenarios and Best Practices
## Questions
1. How do you design a high-performance RPC framework?
2. How do you implement a WebSocket server?
3. How do you architect an IM system for a million connections?
4. How do you deal with memory leaks in Netty?
5. How do you implement graceful shutdown?
6. How do you design heartbeat and reconnection mechanisms?
7. How do you share sessions in a distributed deployment?
---
## Reference Answers
### 1. High-Performance RPC Framework Design
#### **Architecture**
```
┌──────────────────────────────────────────┐
│        RPC framework architecture        │
├──────────────────────────────────────────┤
│ Consumer side                            │
│  ┌──────────┐     ┌────────────┐         │
│  │  Proxy   │     │ Serializer │         │
│  │ (dynamic)│     │ (Protobuf) │         │
│  └────┬─────┘     └────────────┘         │
│       │                                  │
│       ▼                                  │
│  ┌─────────────────────────────────┐     │
│  │        Netty Client             │     │
│  │  - Reconnect                    │     │
│  │  - Timeout                      │     │
│  │  - Heartbeat                    │     │
│  └──────────────┬──────────────────┘     │
│                 │                        │
│                 │ TCP                    │
│                 │                        │
│  ┌──────────────▼──────────────────┐     │
│  │        Netty Server             │     │
│  │  - Boss/Worker Groups           │     │
│  │  - Pipeline                     │     │
│  │  - Handler Chain                │     │
│  └──────────────┬──────────────────┘     │
│                 │                        │
│       ┌─────────┴─────────┐              │
│       ▼                   ▼              │
│  ┌─────────┐        ┌───────────┐        │
│  │ Service │        │ Registry  │        │
│  │ (impl)  │        │(discovery)│        │
│  └─────────┘        └───────────┘        │
└──────────────────────────────────────────┘
```
#### **Protocol design**
```java
// RPC wire format:
// +-----------------+------------------+---------------+------+
// | Magic (4 bytes) | Length (4 bytes) | Type (1 byte) | Body |
// +-----------------+------------------+---------------+------+
public class RpcProtocol {
    private static final int MAGIC_NUMBER = 0xCAFEBABE;
    private static final int HEADER_LENGTH = 9;

    public static ByteBuf encode(byte[] body, byte type) {
        ByteBuf buffer = Unpooled.buffer(HEADER_LENGTH + body.length);
        // Magic number (4 bytes)
        buffer.writeInt(MAGIC_NUMBER);
        // Length (4 bytes)
        buffer.writeInt(body.length);
        // Type (1 byte)
        buffer.writeByte(type);
        // Body
        buffer.writeBytes(body);
        return buffer;
    }

    public static byte[] decode(ByteBuf buffer) {
        // Verify the magic number
        if (buffer.readInt() != MAGIC_NUMBER) {
            throw new IllegalArgumentException("Invalid magic number");
        }
        // Length
        int length = buffer.readInt();
        // Type
        byte type = buffer.readByte();
        // Body
        byte[] body = new byte[length];
        buffer.readBytes(body);
        return body;
    }
}
```
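The frame layout can be exercised without Netty using a plain `ByteBuffer`; the `encode`/`decode` below are hypothetical stand-ins that mirror the RpcProtocol logic, useful for unit-testing the wire format in isolation:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatDemo {
    static final int MAGIC = 0xCAFEBABE;

    static ByteBuffer encode(byte[] body, byte type) {
        ByteBuffer buf = ByteBuffer.allocate(9 + body.length);
        buf.putInt(MAGIC).putInt(body.length).put(type).put(body);
        buf.flip(); // switch to read mode for the consumer
        return buf;
    }

    static byte[] decode(ByteBuffer buf) {
        if (buf.getInt() != MAGIC) {
            throw new IllegalArgumentException("bad magic");
        }
        int length = buf.getInt();
        byte type = buf.get(); // frame type, unused in this sketch
        byte[] body = new byte[length];
        buf.get(body);
        return body;
    }

    public static void main(String[] args) {
        byte[] payload = "ping".getBytes();
        byte[] roundTrip = decode(encode(payload, (byte) 1));
        System.out.println(Arrays.equals(payload, roundTrip)); // prints true
    }
}
```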
#### **Server implementation**
```java
@Component
public class RpcServer {
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    public void start(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Framing
                        pipeline.addLast("frameDecoder",
                            new LengthFieldBasedFrameDecoder(8192, 4, 4, 0, 0));
                        pipeline.addLast("frameEncoder",
                            new LengthFieldPrepender(4));
                        // Protocol codec
                        pipeline.addLast("decoder", new RpcDecoder());
                        pipeline.addLast("encoder", new RpcEncoder());
                        // Idle detection / heartbeat
                        pipeline.addLast("idleStateHandler",
                            new IdleStateHandler(60, 0, 0));
                        pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
                        // Business handler
                        pipeline.addLast("handler", new RpcServerHandler());
                    }
                });
            // Bind the port and start
            ChannelFuture future = bootstrap.bind(port).sync();
            serverChannel = future.channel();
            System.out.println("RPC Server started on port " + port);
            // Block until the server socket closes
            serverChannel.closeFuture().sync();
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }

    @PreDestroy
    public void destroy() {
        shutdown();
    }
}
```
#### **Client implementation**
```java
@Component
public class RpcClient {
    private EventLoopGroup workerGroup;
    private Channel channel;

    public void connect(String host, int port) throws InterruptedException {
        workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Framing
                        pipeline.addLast("frameDecoder",
                            new LengthFieldBasedFrameDecoder(8192, 4, 4, 0, 0));
                        pipeline.addLast("frameEncoder",
                            new LengthFieldPrepender(4));
                        // Protocol codec
                        pipeline.addLast("decoder", new RpcDecoder());
                        pipeline.addLast("encoder", new RpcEncoder());
                        // Idle detection / heartbeat
                        pipeline.addLast("idleStateHandler",
                            new IdleStateHandler(0, 30, 0));
                        pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
                        // Business handler
                        pipeline.addLast("handler", new RpcClientHandler());
                    }
                });
            // Connect to the server
            ChannelFuture future = bootstrap.connect(host, port).sync();
            channel = future.channel();
            System.out.println("RPC Client connected to " + host + ":" + port);
            // Block until the connection closes
            channel.closeFuture().sync();
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (channel != null) {
            channel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public <T> T sendRequest(RpcRequest request, Class<T> returnType) {
        // Send a request and wait for the response
        RpcClientHandler handler = channel.pipeline()
            .get(RpcClientHandler.class);
        return handler.sendRequest(request, returnType);
    }
}
```
---
### 2. WebSocket Server
#### **WebSocket handshake**
```
Client → Server: HTTP GET (Upgrade: websocket)
Server → Client: HTTP 101 (Switching Protocols)
Client → Server: WebSocket frames
Server → Client: WebSocket frames
```
#### **Server implementation**
```java
@Component
public class WebSocketServer {
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    public void start(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // HTTP codec
                        pipeline.addLast("httpCodec", new HttpServerCodec());
                        pipeline.addLast("httpAggregator",
                            new HttpObjectAggregator(65536));
                        // WebSocket handshake handler
                        pipeline.addLast("handshakeHandler",
                            new WebSocketServerProtocolHandler("/ws"));
                        // Custom WebSocket handler
                        pipeline.addLast("handler", new WebSocketHandler());
                    }
                });
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("WebSocket Server started on port " + port);
            future.channel().closeFuture().sync();
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }
}
```
#### **WebSocket Handler**
```java
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    // Track every connected Channel
    private static final ChannelGroup channels =
        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        channels.add(ctx.channel());
        System.out.println("Client connected: " + ctx.channel().id());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        channels.remove(ctx.channel());
        System.out.println("Client disconnected: " + ctx.channel().id());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) {
        // Incoming message
        String message = msg.text();
        System.out.println("Received: " + message);
        // Broadcast to every connected client
        channels.writeAndFlush(new TextWebSocketFrame(
            "Server: " + message
        ));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
```
---
### 3. An IM System at a Million Connections
#### **Architecture tuning**
```java
@Configuration
public class ImServerConfig {
    @Bean
    public EventLoopGroup bossGroup() {
        // One thread is enough for accepting connections
        return new NioEventLoopGroup(1);
    }

    @Bean
    public EventLoopGroup workerGroup() {
        // Size I/O threads from the CPU core count,
        // commonly cores * 2
        int threads = Runtime.getRuntime().availableProcessors() * 2;
        return new NioEventLoopGroup(threads);
    }

    @Bean
    public ServerBootstrap serverBootstrap(
            EventLoopGroup bossGroup,
            EventLoopGroup workerGroup) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 8192)          // larger accept queue
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) // 32 KB send buffer
            .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 32 KB receive buffer
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Connection cap
                    pipeline.addLast("connectionLimiter",
                        new ConnectionLimiter(1000000));
                    // Traffic shaping
                    pipeline.addLast("trafficShaping",
                        new ChannelTrafficShapingHandler(
                            1024 * 1024, // write limit 1 MB/s
                            1024 * 1024  // read limit 1 MB/s
                        ));
                    // Framing
                    pipeline.addLast("frameDecoder",
                        new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 0));
                    pipeline.addLast("frameEncoder",
                        new LengthFieldPrepender(4));
                    // IM protocol handler
                    pipeline.addLast("imHandler", new ImServerHandler());
                }
            });
        return bootstrap;
    }
}
```
```
#### **连接限流**
```java
public class ConnectionLimiter extends ChannelInboundHandlerAdapter {
private final int maxConnections;
private final AtomicInteger activeConnections = new AtomicInteger(0);
public ConnectionLimiter(int maxConnections) {
this.maxConnections = maxConnections;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (activeConnections.get() >= maxConnections) {
// 连接数超限,关闭新连接
ctx.writeAndFlush(new TextWebSocketFrame(
"Server is full, please try again later"
)).addListener(ChannelFutureListener.CLOSE);
return;
}
int count = activeConnections.incrementAndGet();
System.out.println("Active connections: " + count);
ctx.fireChannelRead(msg);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
int count = activeConnections.decrementAndGet();
System.out.println("Active connections: " + count);
ctx.fireChannelInactive();
}
}
```
---
### 4. Finding and Fixing Memory Leaks
#### **Common leak patterns**
```java
// ❌ Wrong 1: ByteBuf never released
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
    byte[] data = new byte[buf.readableBytes()];
    buf.readBytes(data);
    // forgot to release buf
}
// ✅ Correct
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
    try {
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        // process the data...
    } finally {
        ReferenceCountUtil.release(buf); // release the ByteBuf
    }
}
// ❌ Wrong 2: a new long-lived handler added per connection
public void channelActive(ChannelHandlerContext ctx) {
    // allocates a fresh handler on every connection
    ctx.pipeline().addLast("longLived", new LongLivedHandler());
}
// ✅ Correct
public void channelActive(ChannelHandlerContext ctx) {
    // share one stateless @Sharable instance instead
    ctx.pipeline().addLast("longLived", SharableHandler.INSTANCE);
}
// ❌ Wrong 3: Channel references held forever
public class ChannelHolder {
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
    public static void addChannel(String id, Channel channel) {
        CHANNELS.put(id, channel); // reference is never dropped
    }
}
// ✅ Correct
public class ChannelHolder {
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
    public static void addChannel(String id, Channel channel) {
        CHANNELS.put(id, channel);
        // remove automatically when the channel closes
        channel.closeFuture().addListener((ChannelFutureListener) future -> {
            CHANNELS.remove(id);
        });
    }
}
```
#### **Leak detection**
```java
// Enable leak detection via the JVM flag:
//   -Dio.netty.leakDetection.level=paranoid
// or programmatically:
public class LeakDetection {
    public static void main(String[] args) {
        // Levels:
        // DISABLED - off
        // SIMPLE   - sampled detection (~1% of buffers)
        // ADVANCED - sampled, records the allocation stack
        // PARANOID - every buffer is tracked
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024);
        // ... use the buffer ...
        buffer.release(); // if a tracked buffer is garbage-collected
                          // without release(), Netty logs a LEAK
                          // warning with the allocation stack
    }
}
```
---
### 5. Graceful Shutdown
```java
@Component
public class NettyGracefulShutdown implements SmartLifecycle {
    @Autowired
    private ServerBootstrap serverBootstrap;
    private Channel serverChannel;
    private volatile boolean running = false;

    @Override
    public void start() {
        running = true;
        // start the server...
    }

    @Override
    public void stop() {
        if (!running) {
            return;
        }
        System.out.println("Shutting down Netty server gracefully...");
        // 1. Stop accepting new connections
        if (serverChannel != null) {
            serverChannel.close().syncUninterruptibly();
        }
        // 2. Let in-flight requests finish (quiet period 2s, timeout 30s).
        //    Note: config().childGroup() is the worker group;
        //    config().group() is the boss group.
        EventLoopGroup workerGroup = serverBootstrap.config().childGroup();
        Future<?> shutdownFuture = workerGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS);
        try {
            // Wait for the graceful shutdown to complete
            shutdownFuture.await(60, TimeUnit.SECONDS);
            if (shutdownFuture.isSuccess()) {
                System.out.println("Netty server shutdown successfully");
            } else {
                System.err.println("Netty server shutdown timeout, force close");
                workerGroup.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workerGroup.shutdownNow();
        }
        // 3. Shut down the boss group
        EventLoopGroup bossGroup = serverBootstrap.config().group();
        bossGroup.shutdownGracefully();
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}
```
---
### 6. Heartbeat and Reconnection
#### **Heartbeat handler**
```java
@ChannelHandler.Sharable
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    private static final ByteBuf HEARTBEAT =
        Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // Read idle: the peer is probably gone
                System.out.println("Read idle, closing connection");
                ctx.close();
            } else if (event.state() == IdleState.WRITER_IDLE) {
                // Write idle: send a heartbeat
                System.out.println("Write idle, sending heartbeat");
                ctx.writeAndFlush(HEARTBEAT.duplicate());
            } else if (event.state() == IdleState.ALL_IDLE) {
                // Both idle: send a heartbeat
                System.out.println("All idle, sending heartbeat");
                ctx.writeAndFlush(HEARTBEAT.duplicate());
            }
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Swallow heartbeat frames (msg arrives here as a ByteBuf,
        // so compare its content rather than the object itself)
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if ("HEARTBEAT".equals(buf.toString(CharsetUtil.UTF_8))) {
                buf.release();
                return;
            }
        }
        // Business message: pass it along
        ctx.fireChannelRead(msg);
    }
}
```
```
#### **重连机制**
```java
@Component
public class ReconnectClient {
private EventLoopGroup workerGroup;
private Channel channel;
private ScheduledExecutorService scheduler;
public void connect(String host, int port) {
workerGroup = new NioEventLoopGroup();
scheduler = Executors.newSingleThreadScheduledExecutor();
doConnect(host, port);
}
private void doConnect(String host, int port) {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("idleStateHandler",
new IdleStateHandler(0, 10, 0))
.addLast("heartbeatHandler", new HeartbeatHandler())
.addLast("handler", new ClientHandler());
}
});
// 连接服务器
ChannelFuture future = bootstrap.connect(host, port);
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
System.out.println("Connected to server");
channel = f.channel();
} else {
System.out.println("Connection failed, reconnect in 5 seconds...");
// 5 秒后重连
scheduler.schedule(() -> doConnect(host, port), 5, TimeUnit.SECONDS);
}
});
}
public void shutdown() {
if (channel != null) {
channel.close();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
if (scheduler != null) {
scheduler.shutdown();
}
}
}
```
---
## P7-Level Bonus Points
### Deep understanding
- **Reactor pattern**: the master-slave multi-threaded Reactor model
- **Zero-copy internals**: user space vs kernel space
- **Memory management**: ByteBuf pooling, reference counting, leak detection
### Hands-on experience
- **High-concurrency tuning**: EventLoopGroup sizing, TCP parameter tuning
- **Monitoring and alerting**: connection counts, traffic, latency
- **Troubleshooting**: memory leaks, 100% CPU, connection leaks
### Architecture design
- **Protocol design**: custom protocols, codec design
- **Cluster deployment**: load balancing, service discovery
- **Fault tolerance**: reconnection, circuit breaking, rate limiting
### Performance tuning
1. **Fewer memory copies**: use CompositeByteBuf
2. **Object reuse**: use pooled ByteBuf allocators
3. **TCP tuning**: TCP_NODELAY, SO_KEEPALIVE
4. **Traffic control**: ChannelTrafficShapingHandler
---
## Summary
Key takeaways for Netty in production:
1. **Performance**: Reactor model, zero-copy, memory pooling
2. **Reliability**: heartbeats, reconnection, graceful shutdown
3. **Extensibility**: the Pipeline mechanism, custom protocols
4. **Observability**: connection, traffic, and latency monitoring
**Best practices**
- Size EventLoopGroup threads appropriately
- Use pooled ByteBufs and release them promptly
- Add heartbeat and reconnection mechanisms
- Implement graceful shutdown
- Put monitoring and alerting in place
- Audit for memory leaks regularly


@@ -0,0 +1,539 @@
# Netty Core Principles
## Questions
1. What are Netty's core components?
2. What is an EventLoop, and what does the threading model look like?
3. What is ByteBuf, and how does it differ from Java NIO's ByteBuffer?
4. How does Netty implement zero-copy?
5. What is a ChannelPipeline, and how do ChannelHandlers work?
6. How does Netty solve TCP sticky/split packets?
7. How does Netty implement heartbeats?
---
## Reference Answers
### 1. Netty Core Components
#### **Component architecture**
```
┌─────────────────────────────────────────────┐
│              Netty architecture             │
├─────────────────────────────────────────────┤
│  Channel                                    │
│    ├─ EventLoop (threading model)           │
│    ├─ ChannelPipeline (handler chain)       │
│    │    └─ ChannelHandler (business logic)  │
│    └─ ByteBuf (memory management)           │
├─────────────────────────────────────────────┤
│  Bootstrap (startup classes)                │
│    ├─ ServerBootstrap (server side)         │
│    └─ Bootstrap (client side)               │
└─────────────────────────────────────────────┘
```
#### **Key components**
**1. Channel**
```java
// Channel is Netty's abstraction over a network connection
Channel channel = ...;
// Core methods
channel.writeAndFlush(msg); // write and flush
channel.close();            // close the connection
channel.eventLoop();        // the EventLoop bound to this Channel
```
**2. EventLoop**
```java
// EventLoops perform the I/O work
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // accepts connections
EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles I/O
// bossGroup only listens for and accepts connections
// workerGroup performs the actual reads and writes
```
**3. ChannelPipeline**
```java
// The Pipeline is a chain of handlers
ChannelPipeline pipeline = channel.pipeline();
// Add handlers
pipeline.addLast("decoder", new Decoder());
pipeline.addLast("encoder", new Encoder());
pipeline.addLast("handler", new BusinessHandler());
```
**4. ByteBuf**
```java
// ByteBuf is Netty's byte container
ByteBuf buffer = Unpooled.buffer(1024);
// Write
buffer.writeInt(123);
buffer.writeBytes("Hello".getBytes());
// Read
int value = buffer.readInt();
```
---
### 2. EventLoop and the Threading Model
#### **Reactor threading models**
Netty is built on the **Reactor pattern** and supports three variants:
```
1. Single-threaded Reactor
┌──────────────┐
│   Reactor    │
│ (one thread) │
│              │
│  - Accept    │
│  - Read      │
│  - Decode    │
│  - Process   │
│  - Encode    │
│  - Send      │
└──────────────┘
2. Multi-threaded Reactor
┌───────────┐     ┌──────────────────┐
│  Reactor  │────>│  Worker Threads  │
│ (main     │     │  (thread pool)   │
│  thread)  │     │                  │
│  - Accept │     │  - Read/Write    │
│           │     │  - Decode/Encode │
│           │     │  - Process       │
└───────────┘     └──────────────────┘
3. Master-slave multi-threaded Reactor (Netty's default)
┌───────────┐     ┌──────────────────┐
│  Main     │     │  Sub Reactors    │
│  Reactor  │────>│  (thread pool)   │
│           │     │                  │
│  - Accept │     │  - Read/Write    │
│           │     │  - Decode/Encode │
│           │     │  - Process       │
└───────────┘     └──────────────────┘
```
#### **How an EventLoop works**
```java
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
// Each EventLoop owns:
// 1. one Selector
// 2. one task queue
// 3. one thread
// Once started, the thread loops forever:
// 1. select()              - poll for ready Channels
// 2. processSelectedKeys() - handle the I/O events
// 3. runAllTasks()         - drain the task queue
```
#### **Task scheduling**
```java
Channel channel = ...;
// Run a task on the EventLoop thread
channel.eventLoop().execute(() -> {
    System.out.println("Task in EventLoop thread");
});
// Scheduled task
channel.eventLoop().schedule(() -> {
    System.out.println("Delayed task");
}, 1, TimeUnit.SECONDS);
```
---
### 3. ByteBuf vs ByteBuffer
#### **对比表**
| 特性 | Java NIO ByteBuffer | Netty ByteBuf |
|------|---------------------|---------------|
| **长度** | 固定长度,扩容复杂 | 动态扩容 |
| **读写模式** | flip() 切换 | 独立的读写索引 |
| **引用计数** | 不支持 | 支持(池化) |
| **零拷贝** | 不支持 | 支持Composite |
| **性能** | 较低 | 高(对象池优化) |
#### **ByteBuf's Three Key Indices**
```
ByteBuf layout:
┌─────────────┬────────────────┬────────────────┬──────────┐
│ discardable │    readable    │    writable    │          │
│    bytes    │     bytes      │     bytes      │          │
└─────────────┴────────────────┴────────────────┴──────────┘
0         readerIndex      writerIndex             capacity

Operations:
- markReaderIndex() / resetReaderIndex(): mark and restore the read index
- readerIndex(): current read index
- writerIndex(): current write index
- capacity(): current capacity
- maxCapacity(): maximum capacity
```
#### **Usage Example**
```java
// Create a ByteBuf
ByteBuf buffer = Unpooled.buffer(1024);
// Write data
buffer.writeBytes("Hello".getBytes());
buffer.writeInt(123);
// Read data without moving readerIndex (absolute get)
byte[] data = new byte[5];
buffer.getBytes(buffer.readerIndex(), data);
// Read data, advancing readerIndex (returns a new ByteBuf)
buffer.readBytes(5);
// Reference counting (pooled buffers)
buffer.retain();  // refCnt +1
buffer.release(); // refCnt -1
```
---
### 4. Zero-Copy Implementation
#### **Four Zero-Copy Techniques**
**1. CompositeByteBuf (composite buffer)**
```java
// Combine several ByteBufs into one logical ByteBuf
ByteBuf header = Unpooled.buffer(10);
ByteBuf body = Unpooled.buffer(100);
// Zero-copy composition: addComponents does not copy the data
CompositeByteBuf message = Unpooled.compositeBuffer();
message.addComponents(true, header, body);
// The components are combined logically, never physically copied
```
**2. wrappedBuffer (wrapping)**
```java
// Wrap an existing byte array without copying it
byte[] bytes = "Hello Netty".getBytes();
ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
// The underlying array is shared; no copy is made
```
**3. slice (slicing)**
```java
ByteBuf buffer = Unpooled.buffer(100);
// A slice shares the underlying memory
ByteBuf sliced = buffer.slice(0, 50);
// Writes through either view are visible in the other
sliced.setByte(0, (byte) 1);
```
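Java NIO's `ByteBuffer` offers the same sharing semantics via `wrap()` and `slice()`, which makes the "shared memory, no copy" behavior easy to see with the JDK alone (a small demo, not Netty code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ShareDemo {
    public static void main(String[] args) {
        byte[] bytes = "Hello".getBytes(StandardCharsets.US_ASCII);
        ByteBuffer wrapped = ByteBuffer.wrap(bytes); // wraps the array, no copy
        ByteBuffer slice = wrapped.slice();          // shares the same backing array
        slice.put(0, (byte) 'J');                    // write through the slice...
        System.out.println(new String(bytes, StandardCharsets.US_ASCII)); // ...changes the original
    }
}
```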
**4. FileChannel.transferTo (file transfer)**
```java
// Zero-copy file transfer
FileChannel sourceChannel = ...;
FileChannel destChannel = ...;
// Data moves inside the kernel, never entering user space
sourceChannel.transferTo(position, count, destChannel);
```
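Since `sourceChannel`/`destChannel` are elided above, here is a self-contained JDK sketch using temp files; the loop accounts for `transferTo` possibly transferring fewer bytes than requested:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferDemo {
    public static void main(String[] args) throws IOException {
        // Temp files stand in for the elided source/dest channels above.
        Path src = Files.createTempFile("transfer-src", ".bin");
        Path dst = Files.createTempFile("transfer-dst", ".bin");
        Files.write(src, "Hello Netty".getBytes());

        try (FileChannel in = FileChannel.open(src, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
            long pos = 0, size = in.size();
            // transferTo may move fewer bytes than asked, so loop until done.
            while (pos < size) {
                pos += in.transferTo(pos, size - pos, out);
            }
        }
        System.out.println(new String(Files.readAllBytes(dst)));
        Files.delete(src);
        Files.delete(dst);
    }
}
```

On Linux this maps to `sendfile`, so the payload never crosses into user space.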
---
### 5. ChannelPipeline and ChannelHandler
#### **How the Pipeline Works**
```
Inbound events:
┌─────────────────────────────────────┐
│          ChannelPipeline            │
│                                     │
│  ┌─────────┐  ┌─────────┐  ┌───────┐│
│  │Decoder1 │→ │Decoder2 │→ │Handler││
│  └─────────┘  └─────────┘  └───────┘│
│       ▲            ▲           ▲    │
│       └────────────┴───────────┘    │
│              (data flow)            │
└─────────────────────────────────────┘

Outbound events:
┌─────────────────────────────────────┐
│          ChannelPipeline            │
│                                     │
│  ┌───────┐  ┌─────────┐  ┌─────────┐│
│  │Handler│→ │Encoder1 │→ │Encoder2 ││
│  └───────┘  └─────────┘  └─────────┘│
│      ▲           ▲            ▲     │
│      └───────────┴────────────┘     │
│              (data flow)            │
└─────────────────────────────────────┘
```
#### **ChannelHandler Interfaces**
```java
// Inbound handler
@ChannelHandler.Sharable
public class InboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Handle a read event
        System.out.println("Received: " + msg);
        ctx.fireChannelRead(msg); // pass the message to the next handler
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Connection established
        System.out.println("Client connected");
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // Connection closed
        System.out.println("Client disconnected");
    }
}
// Outbound handler
public class OutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg,
                      ChannelPromise promise) {
        // Handle a write event
        System.out.println("Sending: " + msg);
        ctx.write(msg, promise);
    }
}
```
#### **Handler Lifecycle**
```java
public interface ChannelHandler {
    // Called when the handler is added to a Pipeline
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    // Called when the handler is removed from a Pipeline
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    // Called when an exception is raised
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
```
---
### 6. Solving TCP Sticky/Split Packets (Message Framing)
#### **Why It Happens**
TCP is a byte stream with no built-in message boundaries, so the receiver must re-impose framing itself:
```
Sticky packets (merging):
sender:   [pkt1] [pkt2] [pkt3]
receiver: [pkt1+pkt2+pkt3]        // merged into one read
Split packets (fragmentation):
sender:   [large packet]
receiver: [fragment1] [fragment2] // split across reads
```
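The fix in every case is to restore boundaries at the application layer. A JDK-only sketch of length-prefix framing, the same idea the decoders below implement (names are illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FramingDemo {
    // Frame each message as [length(4 bytes)][payload], then concatenate them,
    // producing the kind of merged byte run the receiver actually sees.
    static byte[] encode(String... msgs) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String m : msgs) {
            byte[] p = m.getBytes();
            out.write(ByteBuffer.allocate(4).putInt(p.length).array());
            out.write(p);
        }
        return out.toByteArray();
    }

    // Recover message boundaries from the raw stream using the length field.
    static List<String> decode(byte[] stream) {
        ByteBuffer buf = ByteBuffer.wrap(stream);
        List<String> msgs = new ArrayList<>();
        while (buf.remaining() >= 4) {
            int len = buf.getInt();
            if (buf.remaining() < len) break; // incomplete frame: wait for more bytes
            byte[] p = new byte[len];
            buf.get(p);
            msgs.add(new String(p));
        }
        return msgs;
    }

    public static void main(String[] args) throws IOException {
        byte[] wire = encode("hello", "netty"); // two sends, possibly merged by TCP
        System.out.println(decode(wire));
    }
}
```

The "incomplete frame" branch is exactly what the mark/reset dance in the custom decoder below handles.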
#### **Decoders Provided by Netty**
**1. FixedLengthFrameDecoder (fixed length)**
```java
// Every message is exactly 10 bytes
pipeline.addLast("framer", new FixedLengthFrameDecoder(10));
```
**2. LineBasedFrameDecoder (newline-delimited)**
```java
// Split frames on line breaks (max frame length 8192)
pipeline.addLast("framer", new LineBasedFrameDecoder(8192));
```
**3. DelimiterBasedFrameDecoder (custom delimiter)**
```java
// Split frames on a custom delimiter
ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
pipeline.addLast("framer",
new DelimiterBasedFrameDecoder(8192, delimiter));
```
**4. LengthFieldBasedFrameDecoder (length field)**
```java
// Message format: [length (4 bytes)][payload]
pipeline.addLast("framer",
new LengthFieldBasedFrameDecoder(
8192, // maxFrameLength
0, // lengthFieldOffset
4, // lengthFieldLength
0, // lengthAdjustment
0 // initialBytesToStrip
));
```
**5. Custom Decoder**
```java
public class CustomDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx,
                          ByteBuf in, List<Object> out) {
        // Not enough bytes for the length field yet
        if (in.readableBytes() < 4) {
            return;
        }
        // Mark the read index so we can rewind
        in.markReaderIndex();
        // Read the length field
        int length = in.readInt();
        // If the body hasn't fully arrived, rewind and wait for more bytes
        if (in.readableBytes() < length) {
            in.resetReaderIndex();
            return;
        }
        // Read the complete frame
        byte[] data = new byte[length];
        in.readBytes(data);
        out.add(data);
    }
}
```
---
### 7. Heartbeat Mechanism
#### **Implementation with IdleStateHandler**
```java
// Idle-state detection
pipeline.addLast("idleStateHandler",
    new IdleStateHandler(
        30,  // reader idle time (seconds)
        0,   // writer idle time (0 = disabled)
        0    // all-idle time (0 = disabled)
    ));
// Heartbeat handler
pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
```
#### **Heartbeat Handler**
```java
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // No reads for too long: close the connection
                System.out.println("Read idle, close connection");
                ctx.close();
            }
        }
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Answer heartbeat probes
        if ("PING".equals(msg)) {
            ctx.writeAndFlush("PONG");
        } else {
            ctx.fireChannelRead(msg);
        }
    }
}
```
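IdleStateHandler's core idea, timestamp the last read and let a timer compare the elapsed time against a threshold, can be sketched with the JDK scheduler alone (illustrative names; millisecond timeouts for brevity):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IdleCheckDemo {
    private volatile long lastReadNanos = System.nanoTime();

    // Would be called from channelRead: every inbound message resets the clock.
    void onRead() { lastReadNanos = System.nanoTime(); }

    public static void main(String[] args) throws Exception {
        IdleCheckDemo conn = new IdleCheckDemo();
        CountDownLatch fired = new CountDownLatch(1);
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(200);
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // Periodic check: has "now - last read" exceeded the reader-idle timeout?
        timer.scheduleAtFixedRate(() -> {
            if (System.nanoTime() - conn.lastReadNanos > timeoutNanos && fired.getCount() > 0) {
                System.out.println("reader idle, would close connection");
                fired.countDown();
            }
        }, 50, 50, TimeUnit.MILLISECONDS);
        conn.onRead();  // simulate one inbound message
        fired.await();  // the idle event fires roughly 200 ms after the last read
        timer.shutdownNow();
    }
}
```

Netty implements the same idea with per-Channel scheduled tasks on the EventLoop rather than a separate timer thread.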
---
## P7 Bonus Points
### Deep Understanding
- **Reactor pattern**: know the single-threaded, multi-threaded, and master-slave models
- **Zero-copy principles**: understand data copies between user space and kernel space
- **Memory management**: ByteBuf pooling, reference counting, and leak detection
### Hands-On Experience
- **High-concurrency tuning**: sizing EventLoopGroup thread counts
- **Memory tuning**: choosing a ByteBuf allocator (heap vs direct memory)
- **Performance**: keep handler chains short and minimize object allocation
### Architecture Design
- **Protocol design**: custom protocols and codec design
- **Failure handling**: graceful shutdown and reconnection
- **Monitoring and alerting**: traffic, connection counts, latency
### Common Pitfalls
1. **Memory leaks**: unreleased ByteBufs
2. **Thread safety**: synchronizing shared handler state
3. **Connection pooling**: Channel reuse and management
4. **Backpressure**: flow control and slow consumers
---
## Summary
Netty's core strengths:
1. **High performance**: zero-copy, memory pooling, the Reactor pattern
2. **High reliability**: heartbeats, reconnection, graceful shutdown
3. **Extensibility**: the Pipeline mechanism and a rich set of handlers
4. **Maturity**: widely used in Dubbo, gRPC, and WebSocket servers
**Best practices**:
- Size EventLoopGroup thread counts appropriately
- Prefer pooled ByteBuf allocators
- Release ByteBufs promptly to avoid memory leaks
- Use LengthFieldBasedFrameDecoder to handle message framing
- Add heartbeat and reconnection mechanisms
- Instrument with monitoring and logging