feat: add Netty and Java NIO interview questions
Added 3 comprehensive interview documents covering Netty and Java NIO: **1. Netty Core Principles (Netty核心原理.md)** - Core components: Channel, EventLoop, ChannelPipeline, ByteBuf - Reactor threading model (single/multi-threaded, master-slave) - ByteBuf vs Java NIO ByteBuffer - Zero-copy implementation (4 approaches) - ChannelPipeline and ChannelHandler - TCP sticky/unpacking problem solutions - Heartbeat mechanism **2. Java NIO Core Principles (Java NIO核心原理.md)** - NIO vs BIO comparison - Three core components: Channel, Buffer, Selector - Selector multiplexing mechanism - Channel vs Stream differences - Buffer core attributes and usage - Non-blocking I/O implementation - Zero-copy with transferTo and MappedByteBuffer **3. Netty Practice Scenarios (Netty实战场景.md)** - High-performance RPC framework design - WebSocket server implementation - Million-connection IM system architecture - Memory leak detection and resolution - Graceful shutdown implementation - Heartbeat and reconnection mechanisms Each document includes: - Detailed problem descriptions - Complete code examples (Java) - Architecture diagrams - Best practices - Performance optimization tips - P7-level bonus points Total: 3 documents, covering: - Theoretical foundations - Practical implementations - Production scenarios - Performance tuning - Common pitfalls Suitable for backend P7 interview preparation. Generated with [Claude Code](https://claude.ai/code) via [Happy](https://happy.engineering) Co-Authored-By: Claude <noreply@anthropic.com> Co-Authored-By: Happy <yesreply@happy.engineering>
This commit is contained in:
8
.obsidian/workspace.json
vendored
8
.obsidian/workspace.json
vendored
@@ -196,6 +196,9 @@
|
|||||||
},
|
},
|
||||||
"active": "16a7ce8de420dd10",
|
"active": "16a7ce8de420dd10",
|
||||||
"lastOpenFiles": [
|
"lastOpenFiles": [
|
||||||
|
"10-中间件/Netty实战场景.md",
|
||||||
|
"10-中间件/Java NIO核心原理.md",
|
||||||
|
"10-中间件/Netty核心原理.md",
|
||||||
"10-中间件",
|
"10-中间件",
|
||||||
"16-LeetCode Hot 100/从前序与中序遍历序列构造二叉树.md",
|
"16-LeetCode Hot 100/从前序与中序遍历序列构造二叉树.md",
|
||||||
"16-LeetCode Hot 100/路径总和.md",
|
"16-LeetCode Hot 100/路径总和.md",
|
||||||
@@ -222,15 +225,12 @@
|
|||||||
"16-LeetCode Hot 100",
|
"16-LeetCode Hot 100",
|
||||||
"00-项目概述/项目概述.md",
|
"00-项目概述/项目概述.md",
|
||||||
"00-项目概述",
|
"00-项目概述",
|
||||||
"questions/04-消息队列/消息队列_RocketMQ_Kafka.md",
|
|
||||||
"questions/05-并发编程/ConcurrentHashMap原理.md",
|
|
||||||
"questions/15-简历面试",
|
"questions/15-简历面试",
|
||||||
"questions/14-Web3与区块链",
|
"questions/14-Web3与区块链",
|
||||||
"12-面试技巧",
|
"12-面试技巧",
|
||||||
"08-算法与数据结构",
|
"08-算法与数据结构",
|
||||||
"questions/13-Golang语言",
|
"questions/13-Golang语言",
|
||||||
"questions/12-面试技巧",
|
"questions/12-面试技巧",
|
||||||
"questions/11-运维",
|
"questions/11-运维"
|
||||||
"questions/10-中间件"
|
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
527
10-中间件/Java NIO核心原理.md
Normal file
527
10-中间件/Java NIO核心原理.md
Normal file
@@ -0,0 +1,527 @@
|
|||||||
|
# Java NIO 核心原理
|
||||||
|
|
||||||
|
## 问题
|
||||||
|
|
||||||
|
1. 什么是 NIO?与 BIO 的区别是什么?
|
||||||
|
2. NIO 的三大核心组件是什么?
|
||||||
|
3. 什么是 Selector?如何实现多路复用?
|
||||||
|
4. 什么是 Channel?与 Stream 的区别?
|
||||||
|
5. 什么是 Buffer?如何理解 Buffer 的核心属性?
|
||||||
|
6. NIO 如何实现非阻塞 I/O?
|
||||||
|
7. 什么是零拷贝?如何实现?
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 标准答案
|
||||||
|
|
||||||
|
### 1. NIO vs BIO
|
||||||
|
|
||||||
|
#### **对比表**
|
||||||
|
|
||||||
|
| 特性 | BIO (Blocking I/O) | NIO (Non-blocking I/O) |
|
||||||
|
|------|-------------------|----------------------|
|
||||||
|
| **I/O 模型** | 阻塞 | 非阻塞 |
|
||||||
|
| **线程模型** | 每连接一线程 | Reactor 模式 |
|
||||||
|
| **并发能力** | 低 | 高 |
|
||||||
|
| **编程复杂度** | 简单 | 复杂 |
|
||||||
|
| **数据操作** | Stream | Channel + Buffer |
|
||||||
|
| **适用场景** | 连接数少、高延迟 | 连接数多、高并发 |
|
||||||
|
|
||||||
|
#### **代码对比**
|
||||||
|
|
||||||
|
**BIO 实现**:
|
||||||
|
```java
|
||||||
|
// 传统 BIO - 阻塞式
|
||||||
|
ServerSocket serverSocket = new ServerSocket(8080);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// 阻塞等待连接
|
||||||
|
Socket socket = serverSocket.accept();
|
||||||
|
|
||||||
|
// 每个连接一个线程
|
||||||
|
new Thread(() -> {
|
||||||
|
try {
|
||||||
|
BufferedReader reader = new BufferedReader(
|
||||||
|
new InputStreamReader(socket.getInputStream()));
|
||||||
|
|
||||||
|
// 阻塞读取数据
|
||||||
|
String line = reader.readLine();
|
||||||
|
|
||||||
|
// 处理数据...
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}).start();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**NIO 实现**:
|
||||||
|
```java
|
||||||
|
// NIO - 非阻塞式
|
||||||
|
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||||
|
serverChannel.configureBlocking(false);
|
||||||
|
serverChannel.bind(new InetSocketAddress(8080));
|
||||||
|
|
||||||
|
Selector selector = Selector.open();
|
||||||
|
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// 非阻塞等待事件
|
||||||
|
int readyCount = selector.select();
|
||||||
|
|
||||||
|
Set<SelectionKey> readyKeys = selector.selectedKeys();
|
||||||
|
for (SelectionKey key : readyKeys) {
|
||||||
|
if (key.isAcceptable()) {
|
||||||
|
// 处理连接
|
||||||
|
}
|
||||||
|
if (key.isReadable()) {
|
||||||
|
// 处理读
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 2. NIO 三大核心组件
|
||||||
|
|
||||||
|
```
|
||||||
|
┌────────────────────────────────────────┐
|
||||||
|
│ Java NIO 架构 │
|
||||||
|
├────────────────────────────────────────┤
|
||||||
|
│ │
|
||||||
|
│ ┌─────────┐ ┌─────────┐ │
|
||||||
|
│ │Channel │◄──►│ Buffer │ │
|
||||||
|
│ └────┬────┘ └─────────┘ │
|
||||||
|
│ │ │
|
||||||
|
│ ▼ │
|
||||||
|
│ ┌─────────┐ │
|
||||||
|
│ │Selector │ │
|
||||||
|
│ │(多路复用)│ │
|
||||||
|
│ └─────────┘ │
|
||||||
|
│ │
|
||||||
|
└────────────────────────────────────────┘
|
||||||
|
|
||||||
|
Channel: 数据通道(双向)
|
||||||
|
Buffer: 数据容器
|
||||||
|
Selector: 多路复用器
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **核心组件关系**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 1. 打开 Channel
|
||||||
|
FileChannel channel = FileChannel.open(Paths.get("data.txt"));
|
||||||
|
|
||||||
|
// 2. 分配 Buffer
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||||
|
|
||||||
|
// 3. 读取数据到 Buffer
|
||||||
|
channel.read(buffer);
|
||||||
|
|
||||||
|
// 4. 切换读写模式
|
||||||
|
buffer.flip();
|
||||||
|
|
||||||
|
// 5. 处理数据
|
||||||
|
while (buffer.hasRemaining()) {
|
||||||
|
byte b = buffer.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6. 清空 Buffer
|
||||||
|
buffer.clear();
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 3. Selector 多路复用
|
||||||
|
|
||||||
|
#### **工作原理**
|
||||||
|
|
||||||
|
```
|
||||||
|
Selector 多路复用模型:
|
||||||
|
|
||||||
|
┌──────────────────────────────────┐
|
||||||
|
│ Selector │
|
||||||
|
│ │
|
||||||
|
│ select() - 阻塞等待就绪事件 │
|
||||||
|
│ │
|
||||||
|
│ ┌────┐ ┌────┐ ┌────┐ │
|
||||||
|
│ │Ch1 │ │Ch2 │ │Ch3 │ ... │
|
||||||
|
│ │就绪│ │就绪│ │就绪│ │
|
||||||
|
│ └────┘ └────┘ └────┘ │
|
||||||
|
└──────────────────────────────────┘
|
||||||
|
▲
|
||||||
|
│ one thread
|
||||||
|
│
|
||||||
|
┌─────────┴──────────┐
|
||||||
|
│ Event Loop │
|
||||||
|
│ - select() │
|
||||||
|
│ - process() │
|
||||||
|
│ - repeat │
|
||||||
|
└────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **SelectionKey 事件类型**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 四种事件类型
|
||||||
|
int OP_ACCEPT = 1 << 4; // 连接就绪(ServerSocketChannel)
|
||||||
|
int OP_CONNECT = 1 << 3; // 连接完成(SocketChannel)
|
||||||
|
int OP_READ = 1 << 0; // 读就绪
|
||||||
|
int OP_WRITE = 1 << 2; // 写就绪
|
||||||
|
|
||||||
|
// 注册事件
|
||||||
|
SelectionKey key = channel.register(selector,
|
||||||
|
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
|
||||||
|
|
||||||
|
// 判断事件类型
|
||||||
|
if (key.isAcceptable()) { // 连接事件
|
||||||
|
if (key.isReadable()) { // 读事件
|
||||||
|
if (key.isWritable()) { // 写事件
|
||||||
|
if (key.isConnectable()) { // 连接完成事件
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **完整示例**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// NIO Server
|
||||||
|
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||||
|
serverChannel.configureBlocking(false);
|
||||||
|
serverChannel.bind(new InetSocketAddress(8080));
|
||||||
|
|
||||||
|
Selector selector = Selector.open();
|
||||||
|
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// 阻塞等待事件(超时 1 秒)
|
||||||
|
int readyCount = selector.select(1000);
|
||||||
|
|
||||||
|
if (readyCount == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 获取就绪的 SelectionKey
|
||||||
|
Set<SelectionKey> readyKeys = selector.selectedKeys();
|
||||||
|
Iterator<SelectionKey> iterator = readyKeys.iterator();
|
||||||
|
|
||||||
|
while (iterator.hasNext()) {
|
||||||
|
SelectionKey key = iterator.next();
|
||||||
|
iterator.remove(); // 移除已处理的 key
|
||||||
|
|
||||||
|
if (!key.isValid()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理连接事件
|
||||||
|
if (key.isAcceptable()) {
|
||||||
|
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
||||||
|
SocketChannel channel = server.accept();
|
||||||
|
channel.configureBlocking(false);
|
||||||
|
channel.register(selector, SelectionKey.OP_READ);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理读事件
|
||||||
|
if (key.isReadable()) {
|
||||||
|
SocketChannel channel = (SocketChannel) key.channel();
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||||
|
int bytesRead = channel.read(buffer);
|
||||||
|
|
||||||
|
if (bytesRead == -1) {
|
||||||
|
// 连接关闭
|
||||||
|
key.cancel();
|
||||||
|
channel.close();
|
||||||
|
} else {
|
||||||
|
// 处理数据
|
||||||
|
buffer.flip();
|
||||||
|
byte[] data = new byte[buffer.remaining()];
|
||||||
|
buffer.get(data);
|
||||||
|
System.out.println("Received: " + new String(data));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 4. Channel vs Stream
|
||||||
|
|
||||||
|
#### **核心区别**
|
||||||
|
|
||||||
|
| 特性 | Stream (IO) | Channel (NIO) |
|
||||||
|
|------|-------------|---------------|
|
||||||
|
| **方向** | 单向(读/写) | 双向(读+写) |
|
||||||
|
| **阻塞** | 阻塞 | 可配置阻塞/非阻塞 |
|
||||||
|
| **缓冲** | 直接操作流 | 必须通过 Buffer |
|
||||||
|
| **性能** | 较低 | 高(零拷贝) |
|
||||||
|
|
||||||
|
#### **Channel 类型**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 1. FileChannel - 文件通道
|
||||||
|
FileChannel fileChannel = FileChannel.open(Paths.get("data.txt"),
|
||||||
|
StandardOpenOption.READ, StandardOpenOption.WRITE);
|
||||||
|
|
||||||
|
// 2. SocketChannel - TCP Socket
|
||||||
|
SocketChannel socketChannel = SocketChannel.open();
|
||||||
|
socketChannel.configureBlocking(false);
|
||||||
|
socketChannel.connect(new InetSocketAddress("localhost", 8080));
|
||||||
|
|
||||||
|
// 3. ServerSocketChannel - TCP Server
|
||||||
|
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||||
|
serverChannel.bind(new InetSocketAddress(8080));
|
||||||
|
|
||||||
|
// 4. DatagramChannel - UDP
|
||||||
|
DatagramChannel datagramChannel = DatagramChannel.open();
|
||||||
|
datagramChannel.bind(new InetSocketAddress(8080));
|
||||||
|
|
||||||
|
// 5. Pipe.SinkChannel / Pipe.SourceChannel - 管道
|
||||||
|
Pipe pipe = Pipe.open();
|
||||||
|
Pipe.SinkChannel sinkChannel = pipe.sink();
|
||||||
|
Pipe.SourceChannel sourceChannel = pipe.source();
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **FileChannel 示例**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 文件复制(传统方式)
|
||||||
|
FileChannel sourceChannel = FileChannel.open(Paths.get("source.txt"));
|
||||||
|
FileChannel destChannel = FileChannel.open(Paths.get("dest.txt"),
|
||||||
|
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
||||||
|
|
||||||
|
// 方法1: 使用 Buffer
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||||
|
while (sourceChannel.read(buffer) != -1) {
|
||||||
|
buffer.flip();
|
||||||
|
destChannel.write(buffer);
|
||||||
|
buffer.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 方法2: 直接传输(零拷贝)
|
||||||
|
sourceChannel.transferTo(0, sourceChannel.size(), destChannel);
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 5. Buffer 核心属性
|
||||||
|
|
||||||
|
#### **Buffer 结构**
|
||||||
|
|
||||||
|
```
|
||||||
|
Buffer 核心属性:
|
||||||
|
|
||||||
|
┌───────┬──────────┬──────────┬─────────┐
|
||||||
|
│ 0 │position │ limit │capacity │
|
||||||
|
│ │ │ │ │
|
||||||
|
│ 已处理 可读/写数据 │ 不可访问 │
|
||||||
|
│ 数据 │ │ │
|
||||||
|
└───────┴──────────┴──────────┴─────────┘
|
||||||
|
↑ ↑ ↑
|
||||||
|
mark position limit
|
||||||
|
|
||||||
|
操作方法:
|
||||||
|
- mark(): 标记当前 position
|
||||||
|
- reset(): 恢复到 mark 位置
|
||||||
|
- clear(): position=0, limit=capacity
|
||||||
|
- flip(): limit=position, position=0 (读→写)
|
||||||
|
- rewind(): position=0, limit 不变
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **Buffer 使用流程**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 1. 分配 Buffer
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||||
|
|
||||||
|
// 初始状态: position=0, limit=1024, capacity=1024
|
||||||
|
|
||||||
|
// 2. 写入数据
|
||||||
|
buffer.putInt(123);
|
||||||
|
buffer.putLong(456L);
|
||||||
|
buffer.put("Hello".getBytes());
|
||||||
|
|
||||||
|
// 写入后: position=17, limit=1024
|
||||||
|
|
||||||
|
// 3. 切换到读模式
|
||||||
|
buffer.flip();
|
||||||
|
|
||||||
|
// flip 后: position=0, limit=17
|
||||||
|
|
||||||
|
// 4. 读取数据
|
||||||
|
while (buffer.hasRemaining()) {
|
||||||
|
byte b = buffer.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取后: position=17, limit=17
|
||||||
|
|
||||||
|
// 5. 清空 Buffer
|
||||||
|
buffer.clear();
|
||||||
|
|
||||||
|
// clear 后: position=0, limit=1024, capacity=1024
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **Buffer 类型**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 基本类型 Buffer
|
||||||
|
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
|
||||||
|
CharBuffer charBuffer = CharBuffer.allocate(1024);
|
||||||
|
ShortBuffer shortBuffer = ShortBuffer.allocate(1024);
|
||||||
|
IntBuffer intBuffer = IntBuffer.allocate(1024);
|
||||||
|
LongBuffer longBuffer = LongBuffer.allocate(1024);
|
||||||
|
FloatBuffer floatBuffer = FloatBuffer.allocate(1024);
|
||||||
|
DoubleBuffer doubleBuffer = DoubleBuffer.allocate(1024);
|
||||||
|
|
||||||
|
// 直接内存 vs 堆内存
|
||||||
|
ByteBuffer heapBuffer = ByteBuffer.allocate(1024); // 堆内存
|
||||||
|
ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024); // 直接内存
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 6. 非阻塞 I/O 实现
|
||||||
|
|
||||||
|
#### **阻塞 vs 非阻塞**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 阻塞模式(默认)
|
||||||
|
SocketChannel channel = SocketChannel.open();
|
||||||
|
channel.configureBlocking(true); // 阻塞
|
||||||
|
channel.connect(new InetSocketAddress("localhost", 8080));
|
||||||
|
// 阻塞直到连接建立
|
||||||
|
|
||||||
|
// 非阻塞模式
|
||||||
|
SocketChannel channel = SocketChannel.open();
|
||||||
|
channel.configureBlocking(false); // 非阻塞
|
||||||
|
channel.connect(new InetSocketAddress("localhost", 8080));
|
||||||
|
// 立即返回
|
||||||
|
|
||||||
|
while (!channel.finishConnect()) {
|
||||||
|
// 连接未完成,做其他事
|
||||||
|
System.out.println("Connecting...");
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **非阻塞读写**
|
||||||
|
|
||||||
|
```java
|
||||||
|
SocketChannel channel = SocketChannel.open();
|
||||||
|
channel.configureBlocking(false);
|
||||||
|
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||||
|
|
||||||
|
// 非阻塞读
|
||||||
|
int bytesRead = channel.read(buffer);
|
||||||
|
if (bytesRead == 0) {
|
||||||
|
// 没有数据可用
|
||||||
|
} else if (bytesRead == -1) {
|
||||||
|
// 连接已关闭
|
||||||
|
} else {
|
||||||
|
// 读取到数据
|
||||||
|
buffer.flip();
|
||||||
|
// 处理数据...
|
||||||
|
}
|
||||||
|
|
||||||
|
// 非阻塞写
|
||||||
|
buffer.clear();
|
||||||
|
buffer.put("Hello".getBytes());
|
||||||
|
buffer.flip();
|
||||||
|
|
||||||
|
int bytesWritten = channel.write(buffer);
|
||||||
|
if (bytesWritten == 0) {
|
||||||
|
// 缓冲区满,稍后重试
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 7. 零拷贝实现
|
||||||
|
|
||||||
|
#### **传统 I/O vs 零拷贝**
|
||||||
|
|
||||||
|
```
|
||||||
|
传统 I/O(4 次拷贝,4 次上下文切换):
|
||||||
|
1. 磁盘 → 内核缓冲区 (DMA)
|
||||||
|
2. 内核缓冲区 → 用户缓冲区 (CPU)
|
||||||
|
3. 用户缓冲区 → Socket 缓冲区 (CPU)
|
||||||
|
4. Socket 缓冲区 → 网卡 (DMA)
|
||||||
|
|
||||||
|
零拷贝(2 次拷贝,2 次上下文切换):
|
||||||
|
1. 磁盘 → 内核缓冲区 (DMA)
|
||||||
|
2. 内核缓冲区 → 网卡 (DMA)
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **transferTo 实现**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 文件传输零拷贝
|
||||||
|
FileChannel sourceChannel = FileChannel.open(Paths.get("source.txt"));
|
||||||
|
FileChannel destChannel = FileChannel.open(Paths.get("dest.txt"),
|
||||||
|
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
||||||
|
|
||||||
|
// 直接在内核空间传输
|
||||||
|
long position = 0;
|
||||||
|
long count = sourceChannel.size();
|
||||||
|
sourceChannel.transferTo(position, count, destChannel);
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **MappedByteBuffer(内存映射)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 内存映射文件
|
||||||
|
FileChannel channel = FileChannel.open(Paths.get("data.txt"),
|
||||||
|
StandardOpenOption.READ, StandardOpenOption.WRITE);
|
||||||
|
|
||||||
|
// 映射到内存
|
||||||
|
MappedByteBuffer mappedBuffer = channel.map(
|
||||||
|
FileChannel.MapMode.READ_WRITE, // 读写模式
|
||||||
|
0, // 起始位置
|
||||||
|
channel.size() // 映射大小
|
||||||
|
);
|
||||||
|
|
||||||
|
// 直接操作内存(零拷贝)
|
||||||
|
mappedBuffer.putInt(0, 123);
|
||||||
|
int value = mappedBuffer.getInt(0);
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## P7 加分项
|
||||||
|
|
||||||
|
### 深度理解
|
||||||
|
- **多路复用原理**:理解 select、poll、epoll 的区别
|
||||||
|
- **零拷贝原理**:理解 DMA、用户空间、内核空间
|
||||||
|
- **内存管理**:堆内存 vs 直接内存,GC 影响
|
||||||
|
|
||||||
|
### 实战经验
|
||||||
|
- **高并发场景**:Netty、Mina、Vert.x 框架使用
|
||||||
|
- **文件处理**:大文件读写、内存映射文件
|
||||||
|
- **网络编程**:自定义协议、粘包处理
|
||||||
|
|
||||||
|
### 性能优化
|
||||||
|
- **Buffer 复用**:使用对象池减少 GC
|
||||||
|
- **直接内存**:减少一次拷贝,但分配/释放成本高
|
||||||
|
- **批量操作**:vectorized I/O(scatter/gather)
|
||||||
|
|
||||||
|
### 常见问题
|
||||||
|
1. **Buffer 泄漏**:直接内存未释放
|
||||||
|
2. **select 空转**:CPU 100% 问题
|
||||||
|
3. **epoll 空轮询**:Linux kernel bug
|
||||||
|
4. **文件描述符耗尽**:未关闭 Channel
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 总结
|
||||||
|
|
||||||
|
Java NIO 的核心优势:
|
||||||
|
1. **高性能**:非阻塞 I/O、零拷贝
|
||||||
|
2. **高并发**:单线程处理多连接
|
||||||
|
3. **灵活性**:可配置阻塞/非阻塞
|
||||||
|
4. **扩展性**:适合大规模分布式系统
|
||||||
|
|
||||||
|
**最佳实践**:
|
||||||
|
- 高并发场景优先使用 NIO(Netty)
|
||||||
|
- 大文件传输使用 transferTo
|
||||||
|
- 理解 Buffer 的读写模式切换
|
||||||
|
- 注意资源释放(Channel、Buffer)
|
||||||
|
- 监控文件描述符使用
|
||||||
803
10-中间件/Netty实战场景.md
Normal file
803
10-中间件/Netty实战场景.md
Normal file
@@ -0,0 +1,803 @@
|
|||||||
|
# Netty 实战场景与最佳实践
|
||||||
|
|
||||||
|
## 问题
|
||||||
|
|
||||||
|
1. 如何设计一个高性能的 RPC 框架?
|
||||||
|
2. 如何实现 WebSocket 服务器?
|
||||||
|
3. 如何实现百万连接的 IM 系统?
|
||||||
|
4. 如何处理 Netty 的内存泄漏?
|
||||||
|
5. 如何实现优雅关闭?
|
||||||
|
6. 如何设计心跳和重连机制?
|
||||||
|
7. 如何实现分布式 Session 共享?
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 标准答案
|
||||||
|
|
||||||
|
### 1. 高性能 RPC 框架设计
|
||||||
|
|
||||||
|
#### **架构设计**
|
||||||
|
|
||||||
|
```
|
||||||
|
┌──────────────────────────────────────────┐
|
||||||
|
│ RPC 框架架构 │
|
||||||
|
├──────────────────────────────────────────┤
|
||||||
|
│ Consumer Side │
|
||||||
|
│ ┌──────────┐ ┌──────────┐ │
|
||||||
|
│ │ Proxy │ │ Serializer│ │
|
||||||
|
│ │ (动态代理)│ │ (Protobuf) │ │
|
||||||
|
│ └────┬─────┘ └──────────┘ │
|
||||||
|
│ │ │
|
||||||
|
│ ▼ │
|
||||||
|
│ ┌─────────────────────────────────┐ │
|
||||||
|
│ │ Netty Client │ │
|
||||||
|
│ │ - Reconnect │ │
|
||||||
|
│ │ - Timeout │ │
|
||||||
|
│ │ - Heartbeat │ │
|
||||||
|
│ └──────────────┬──────────────────┘ │
|
||||||
|
│ │ │
|
||||||
|
│ │ TCP │
|
||||||
|
│ │ │
|
||||||
|
│ ┌──────────────▼──────────────────┐ │
|
||||||
|
│ │ Netty Server │ │
|
||||||
|
│ │ - Boss/Worker Groups │ │
|
||||||
|
│ │ - Pipeline │ │
|
||||||
|
│ │ - Handler Chain │ │
|
||||||
|
│ └──────────────┬──────────────────┘ │
|
||||||
|
│ │ │
|
||||||
|
│ ┌─────────┴─────────┐ │
|
||||||
|
│ ▼ ▼ │
|
||||||
|
│ ┌─────────┐ ┌──────────┐ │
|
||||||
|
│ │Service │ │ Registry │ │
|
||||||
|
│ │(实现) │ │ (服务发现)│ │
|
||||||
|
│ └─────────┘ └──────────┘ │
|
||||||
|
└──────────────────────────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **协议设计**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// RPC 协议格式
|
||||||
|
┌───────────┬───────────┬──────────┬──────────┐
|
||||||
|
│ Magic │ Length │ Type │ Body │
|
||||||
|
│ Number │ (4 bytes) │ (1 byte) │ │
|
||||||
|
│ (4 bytes) │ │ │ │
|
||||||
|
└───────────┴───────────┴──────────┴──────────┘
|
||||||
|
|
||||||
|
// 实现
|
||||||
|
public class RpcProtocol {
|
||||||
|
|
||||||
|
private static final int MAGIC_NUMBER = 0xCAFEBABE;
|
||||||
|
private static final int HEADER_LENGTH = 9;
|
||||||
|
|
||||||
|
public static ByteBuf encode(byte[] body, byte type) {
|
||||||
|
ByteBuf buffer = Unpooled.buffer(HEADER_LENGTH + body.length);
|
||||||
|
|
||||||
|
// Magic Number (4 bytes)
|
||||||
|
buffer.writeInt(MAGIC_NUMBER);
|
||||||
|
|
||||||
|
// Length (4 bytes)
|
||||||
|
buffer.writeInt(body.length);
|
||||||
|
|
||||||
|
// Type (1 byte)
|
||||||
|
buffer.writeByte(type);
|
||||||
|
|
||||||
|
// Body
|
||||||
|
buffer.writeBytes(body);
|
||||||
|
|
||||||
|
return buffer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static byte[] decode(ByteBuf buffer) {
|
||||||
|
// 检查 Magic Number
|
||||||
|
if (buffer.readInt() != MAGIC_NUMBER) {
|
||||||
|
throw new IllegalArgumentException("Invalid magic number");
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取 Length
|
||||||
|
int length = buffer.readInt();
|
||||||
|
|
||||||
|
// 读取 Type
|
||||||
|
byte type = buffer.readByte();
|
||||||
|
|
||||||
|
// 读取 Body
|
||||||
|
byte[] body = new byte[length];
|
||||||
|
buffer.readBytes(body);
|
||||||
|
|
||||||
|
return body;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **服务端实现**
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Component
|
||||||
|
public class RpcServer {
|
||||||
|
|
||||||
|
private EventLoopGroup bossGroup;
|
||||||
|
private EventLoopGroup workerGroup;
|
||||||
|
private Channel serverChannel;
|
||||||
|
|
||||||
|
public void start(int port) throws InterruptedException {
|
||||||
|
bossGroup = new NioEventLoopGroup(1);
|
||||||
|
workerGroup = new NioEventLoopGroup();
|
||||||
|
|
||||||
|
try {
|
||||||
|
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||||
|
bootstrap.group(bossGroup, workerGroup)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.option(ChannelOption.SO_BACKLOG, 1024)
|
||||||
|
.option(ChannelOption.SO_REUSEADDR, true)
|
||||||
|
.childOption(ChannelOption.TCP_NODELAY, true)
|
||||||
|
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||||
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) {
|
||||||
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
|
|
||||||
|
// 编解码器
|
||||||
|
pipeline.addLast("frameDecoder",
|
||||||
|
new LengthFieldBasedFrameDecoder(8192, 4, 4, 0, 0));
|
||||||
|
pipeline.addLast("frameEncoder",
|
||||||
|
new LengthFieldPrepender(4));
|
||||||
|
|
||||||
|
// 协议编解码
|
||||||
|
pipeline.addLast("decoder", new RpcDecoder());
|
||||||
|
pipeline.addLast("encoder", new RpcEncoder());
|
||||||
|
|
||||||
|
// 心跳检测
|
||||||
|
pipeline.addLast("idleStateHandler",
|
||||||
|
new IdleStateHandler(60, 0, 0));
|
||||||
|
pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
|
||||||
|
|
||||||
|
// 业务处理器
|
||||||
|
pipeline.addLast("handler", new RpcServerHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 绑定端口并启动
|
||||||
|
ChannelFuture future = bootstrap.bind(port).sync();
|
||||||
|
serverChannel = future.channel();
|
||||||
|
|
||||||
|
System.out.println("RPC Server started on port " + port);
|
||||||
|
|
||||||
|
// 等待服务器 socket 关闭
|
||||||
|
serverChannel.closeFuture().sync();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() {
|
||||||
|
if (serverChannel != null) {
|
||||||
|
serverChannel.close();
|
||||||
|
}
|
||||||
|
if (workerGroup != null) {
|
||||||
|
workerGroup.shutdownGracefully();
|
||||||
|
}
|
||||||
|
if (bossGroup != null) {
|
||||||
|
bossGroup.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@PreDestroy
|
||||||
|
public void destroy() {
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **客户端实现**
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Component
|
||||||
|
public class RpcClient {
|
||||||
|
|
||||||
|
private EventLoopGroup workerGroup;
|
||||||
|
private Channel channel;
|
||||||
|
|
||||||
|
public void connect(String host, int port) throws InterruptedException {
|
||||||
|
workerGroup = new NioEventLoopGroup();
|
||||||
|
|
||||||
|
try {
|
||||||
|
Bootstrap bootstrap = new Bootstrap();
|
||||||
|
bootstrap.group(workerGroup)
|
||||||
|
.channel(NioSocketChannel.class)
|
||||||
|
.option(ChannelOption.TCP_NODELAY, true)
|
||||||
|
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
|
||||||
|
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||||
|
.handler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) {
|
||||||
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
|
|
||||||
|
// 编解码器
|
||||||
|
pipeline.addLast("frameDecoder",
|
||||||
|
new LengthFieldBasedFrameDecoder(8192, 4, 4, 0, 0));
|
||||||
|
pipeline.addLast("frameEncoder",
|
||||||
|
new LengthFieldPrepender(4));
|
||||||
|
|
||||||
|
// 协议编解码
|
||||||
|
pipeline.addLast("decoder", new RpcDecoder());
|
||||||
|
pipeline.addLast("encoder", new RpcEncoder());
|
||||||
|
|
||||||
|
// 心跳检测
|
||||||
|
pipeline.addLast("idleStateHandler",
|
||||||
|
new IdleStateHandler(0, 30, 0));
|
||||||
|
pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
|
||||||
|
|
||||||
|
// 业务处理器
|
||||||
|
pipeline.addLast("handler", new RpcClientHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 连接服务器
|
||||||
|
ChannelFuture future = bootstrap.connect(host, port).sync();
|
||||||
|
channel = future.channel();
|
||||||
|
|
||||||
|
System.out.println("RPC Client connected to " + host + ":" + port);
|
||||||
|
|
||||||
|
// 等待连接关闭
|
||||||
|
channel.closeFuture().sync();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() {
|
||||||
|
if (channel != null) {
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
if (workerGroup != null) {
|
||||||
|
workerGroup.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public <T> T sendRequest(RpcRequest request, Class<T> returnType) {
|
||||||
|
// 发送请求并等待响应
|
||||||
|
RpcClientHandler handler = channel.pipeline()
|
||||||
|
.get(RpcClientHandler.class);
|
||||||
|
|
||||||
|
return handler.sendRequest(request, returnType);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 2. WebSocket 服务器实现
|
||||||
|
|
||||||
|
#### **WebSocket 握手机制**
|
||||||
|
|
||||||
|
```
|
||||||
|
Client → Server: HTTP GET (Upgrade: websocket)
|
||||||
|
Server → Client: HTTP 101 (Switching Protocols)
|
||||||
|
Client → Server: WebSocket Frame
|
||||||
|
Server → Client: WebSocket Frame
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **服务器实现**
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Component
|
||||||
|
public class WebSocketServer {
|
||||||
|
|
||||||
|
private EventLoopGroup bossGroup;
|
||||||
|
private EventLoopGroup workerGroup;
|
||||||
|
|
||||||
|
public void start(int port) throws InterruptedException {
|
||||||
|
bossGroup = new NioEventLoopGroup(1);
|
||||||
|
workerGroup = new NioEventLoopGroup();
|
||||||
|
|
||||||
|
try {
|
||||||
|
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||||
|
bootstrap.group(bossGroup, workerGroup)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) {
|
||||||
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
|
|
||||||
|
// HTTP 编解码器
|
||||||
|
pipeline.addLast("httpCodec", new HttpServerCodec());
|
||||||
|
pipeline.addLast("httpAggregator",
|
||||||
|
new HttpObjectAggregator(65536));
|
||||||
|
|
||||||
|
// WebSocket 握手处理器
|
||||||
|
pipeline.addLast("handshakeHandler",
|
||||||
|
new WebSocketServerProtocolHandler("/ws"));
|
||||||
|
|
||||||
|
// 自定义 WebSocket 处理器
|
||||||
|
pipeline.addLast("handler", new WebSocketHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
ChannelFuture future = bootstrap.bind(port).sync();
|
||||||
|
System.out.println("WebSocket Server started on port " + port);
|
||||||
|
|
||||||
|
future.channel().closeFuture().sync();
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() {
|
||||||
|
if (workerGroup != null) {
|
||||||
|
workerGroup.shutdownGracefully();
|
||||||
|
}
|
||||||
|
if (bossGroup != null) {
|
||||||
|
bossGroup.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **WebSocket Handler**
|
||||||
|
|
||||||
|
```java
|
||||||
|
@ChannelHandler.Sharable
|
||||||
|
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
|
||||||
|
|
||||||
|
// 存储所有连接的 Channel
|
||||||
|
private static final ChannelGroup channels =
|
||||||
|
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handlerAdded(ChannelHandlerContext ctx) {
|
||||||
|
channels.add(ctx.channel());
|
||||||
|
System.out.println("Client connected: " + ctx.channel().id());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handlerRemoved(ChannelHandlerContext ctx) {
|
||||||
|
channels.remove(ctx.channel());
|
||||||
|
System.out.println("Client disconnected: " + ctx.channel().id());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void channelRead0(ChannelHandlerContext ctx,
|
||||||
|
TextWebSocketFrame msg) {
|
||||||
|
// 接收消息
|
||||||
|
String message = msg.text();
|
||||||
|
System.out.println("Received: " + message);
|
||||||
|
|
||||||
|
// 广播消息给所有客户端
|
||||||
|
channels.writeAndFlush(new TextWebSocketFrame(
|
||||||
|
"Server: " + message
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||||
|
cause.printStackTrace();
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 3. 百万连接 IM 系统
|
||||||
|
|
||||||
|
#### **架构优化**
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Configuration
|
||||||
|
public class ImServerConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public EventLoopGroup bossGroup() {
|
||||||
|
// 1 个线程处理 Accept
|
||||||
|
return new NioEventLoopGroup(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public EventLoopGroup workerGroup() {
|
||||||
|
// 根据 CPU 核心数设置 I/O 线程数
|
||||||
|
// 通常设置为 CPU 核心数 * 2
|
||||||
|
int threads = Runtime.getRuntime().availableProcessors() * 2;
|
||||||
|
return new NioEventLoopGroup(threads);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ServerBootstrap serverBootstrap(
|
||||||
|
EventLoopGroup bossGroup,
|
||||||
|
EventLoopGroup workerGroup) {
|
||||||
|
|
||||||
|
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||||
|
bootstrap.group(bossGroup, workerGroup)
|
||||||
|
.channel(NioServerSocketChannel.class)
|
||||||
|
.option(ChannelOption.SO_BACKLOG, 8192) // 增大连接队列
|
||||||
|
.option(ChannelOption.SO_REUSEADDR, true)
|
||||||
|
.childOption(ChannelOption.TCP_NODELAY, true)
|
||||||
|
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||||
|
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024) // 32KB 发送缓冲
|
||||||
|
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 32KB 接收缓冲
|
||||||
|
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) {
|
||||||
|
ChannelPipeline pipeline = ch.pipeline();
|
||||||
|
|
||||||
|
// 连接数限制
|
||||||
|
pipeline.addLast("connectionLimiter",
|
||||||
|
new ConnectionLimiter(1000000));
|
||||||
|
|
||||||
|
// 流量整形
|
||||||
|
pipeline.addLast("trafficShaping",
|
||||||
|
new ChannelTrafficShapingHandler(
|
||||||
|
1024 * 1024, // 读限制 1MB/s
|
||||||
|
1024 * 1024 // 写限制 1MB/s
|
||||||
|
));
|
||||||
|
|
||||||
|
// 协议编解码
|
||||||
|
pipeline.addLast("frameDecoder",
|
||||||
|
new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 0));
|
||||||
|
pipeline.addLast("frameEncoder",
|
||||||
|
new LengthFieldPrepender(4));
|
||||||
|
|
||||||
|
// IM 协议处理器
|
||||||
|
pipeline.addLast("imHandler", new ImServerHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return bootstrap;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **连接限流**
|
||||||
|
|
||||||
|
```java
|
||||||
|
public class ConnectionLimiter extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
|
private final int maxConnections;
|
||||||
|
private final AtomicInteger activeConnections = new AtomicInteger(0);
|
||||||
|
|
||||||
|
public ConnectionLimiter(int maxConnections) {
|
||||||
|
this.maxConnections = maxConnections;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
if (activeConnections.get() >= maxConnections) {
|
||||||
|
// 连接数超限,关闭新连接
|
||||||
|
ctx.writeAndFlush(new TextWebSocketFrame(
|
||||||
|
"Server is full, please try again later"
|
||||||
|
)).addListener(ChannelFutureListener.CLOSE);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int count = activeConnections.incrementAndGet();
|
||||||
|
System.out.println("Active connections: " + count);
|
||||||
|
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) {
|
||||||
|
int count = activeConnections.decrementAndGet();
|
||||||
|
System.out.println("Active connections: " + count);
|
||||||
|
|
||||||
|
ctx.fireChannelInactive();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 4. 内存泄漏排查与解决
|
||||||
|
|
||||||
|
#### **常见泄漏场景**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// ❌ 错误示例 1: ByteBuf 未释放
|
||||||
|
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
|
||||||
|
byte[] data = new byte[buf.readableBytes()];
|
||||||
|
buf.readBytes(data);
|
||||||
|
// 忘记释放 buf
|
||||||
|
}
|
||||||
|
|
||||||
|
// ✅ 正确做法
|
||||||
|
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
|
||||||
|
try {
|
||||||
|
byte[] data = new byte[buf.readableBytes()];
|
||||||
|
buf.readBytes(data);
|
||||||
|
// 处理数据...
|
||||||
|
} finally {
|
||||||
|
ReferenceCountUtil.release(buf); // 释放 ByteBuf
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ❌ 错误示例 2: Handler 未移除
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) {
|
||||||
|
// 添加长生命周期的 Handler
|
||||||
|
ctx.pipeline().addLast("longLived", new LongLivedHandler());
|
||||||
|
}
|
||||||
|
|
||||||
|
// ✅ 正确做法
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) {
|
||||||
|
// 使用 @Sharable 注解
|
||||||
|
ctx.pipeline().addLast("longLived", SharableHandler.INSTANCE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ❌ 错误示例 3: Channel 引用未释放
|
||||||
|
public class ChannelHolder {
|
||||||
|
private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public static void addChannel(String id, Channel channel) {
|
||||||
|
CHANNELS.put(id, channel); // 永久持有引用
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ✅ 正确做法
|
||||||
|
public class ChannelHolder {
|
||||||
|
private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public static void addChannel(String id, Channel channel) {
|
||||||
|
CHANNELS.put(id, channel);
|
||||||
|
|
||||||
|
// 监听关闭事件,自动移除
|
||||||
|
channel.closeFuture().addListener((ChannelFutureListener) future -> {
|
||||||
|
CHANNELS.remove(id);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **内存泄漏检测**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 启用内存泄漏检测
|
||||||
|
// JVM 参数: -Dio.netty.leakDetection.level=paranoid
|
||||||
|
|
||||||
|
public class LeakDetection {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
// 检测级别
|
||||||
|
// DISABLED - 禁用
|
||||||
|
// SIMPLE - 采样检测(1%)
|
||||||
|
// ADVANCED - 采样检测,记录创建堆栈
|
||||||
|
// PARANOID - 全量检测
|
||||||
|
|
||||||
|
// 创建资源时使用
|
||||||
|
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024);
|
||||||
|
|
||||||
|
// 使用资源泄漏检测器访问器
|
||||||
|
ResourceLeakDetector<ByteBuf> detector =
|
||||||
|
new ResourceLeakDetector<ByteBuf>(ByteBuf.class,
|
||||||
|
ResourceLeakDetector.Level.PARANOID);
|
||||||
|
|
||||||
|
// 包装对象
|
||||||
|
ByteBuf wrappedBuffer = detector.wrap(buffer, "test-buffer");
|
||||||
|
|
||||||
|
// 使用完成后
|
||||||
|
wrappedBuffer.release(); // 如果未释放,会记录堆栈并警告
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 5. 优雅关闭实现
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Component
|
||||||
|
public class NettyGracefulShutdown implements SmartLifecycle {
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ServerBootstrap serverBootstrap;
|
||||||
|
|
||||||
|
private Channel serverChannel;
|
||||||
|
private volatile boolean running = false;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
running = true;
|
||||||
|
// 启动服务器...
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
if (!running) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
System.out.println("Shutting down Netty server gracefully...");
|
||||||
|
|
||||||
|
// 1. 停止接受新连接
|
||||||
|
if (serverChannel != null) {
|
||||||
|
serverChannel.close().syncUninterruptibly();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. 等待现有请求处理完成(超时 30 秒)
|
||||||
|
EventLoopGroup workerGroup = serverBootstrap.config().group();
|
||||||
|
Future<?> shutdownFuture = workerGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 等待优雅关闭完成
|
||||||
|
shutdownFuture.await(60, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
if (shutdownFuture.isSuccess()) {
|
||||||
|
System.out.println("Netty server shutdown successfully");
|
||||||
|
} else {
|
||||||
|
System.err.println("Netty server shutdown timeout, force close");
|
||||||
|
workerGroup.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
workerGroup.shutdownNow();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. 关闭 Boss Group
|
||||||
|
EventLoopGroup bossGroup = serverBootstrap.config().childGroup();
|
||||||
|
bossGroup.shutdownGracefully();
|
||||||
|
|
||||||
|
running = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isRunning() {
|
||||||
|
return running;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 6. 心跳与重连机制
|
||||||
|
|
||||||
|
#### **心跳处理器**
|
||||||
|
|
||||||
|
```java
|
||||||
|
@ChannelHandler.Sharable
|
||||||
|
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
|
private static final ByteBuf HEARTBEAT_BEAT =
|
||||||
|
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||||
|
if (evt instanceof IdleStateEvent) {
|
||||||
|
IdleStateEvent event = (IdleStateEvent) evt;
|
||||||
|
|
||||||
|
if (event.state() == IdleState.READER_IDLE) {
|
||||||
|
// 读空闲,可能对方已挂掉
|
||||||
|
System.out.println("Read idle, closing connection");
|
||||||
|
ctx.close();
|
||||||
|
|
||||||
|
} else if (event.state() == IdleState.WRITER_IDLE) {
|
||||||
|
// 写空闲,发送心跳
|
||||||
|
System.out.println("Write idle, sending heartbeat");
|
||||||
|
ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate());
|
||||||
|
|
||||||
|
} else if (event.state() == IdleState.ALL_IDLE) {
|
||||||
|
// 读写空闲
|
||||||
|
System.out.println("All idle, sending heartbeat");
|
||||||
|
ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
// 处理心跳响应
|
||||||
|
if ("HEARTBEAT".equals(msg)) {
|
||||||
|
// 心跳包,不处理
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 业务消息
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **重连机制**
|
||||||
|
|
||||||
|
```java
|
||||||
|
@Component
|
||||||
|
public class ReconnectClient {
|
||||||
|
|
||||||
|
private EventLoopGroup workerGroup;
|
||||||
|
private Channel channel;
|
||||||
|
private ScheduledExecutorService scheduler;
|
||||||
|
|
||||||
|
public void connect(String host, int port) {
|
||||||
|
workerGroup = new NioEventLoopGroup();
|
||||||
|
scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
|
||||||
|
doConnect(host, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doConnect(String host, int port) {
|
||||||
|
Bootstrap bootstrap = new Bootstrap();
|
||||||
|
bootstrap.group(workerGroup)
|
||||||
|
.channel(NioSocketChannel.class)
|
||||||
|
.option(ChannelOption.TCP_NODELAY, true)
|
||||||
|
.handler(new ChannelInitializer<SocketChannel>() {
|
||||||
|
@Override
|
||||||
|
protected void initChannel(SocketChannel ch) {
|
||||||
|
ch.pipeline()
|
||||||
|
.addLast("idleStateHandler",
|
||||||
|
new IdleStateHandler(0, 10, 0))
|
||||||
|
.addLast("heartbeatHandler", new HeartbeatHandler())
|
||||||
|
.addLast("handler", new ClientHandler());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 连接服务器
|
||||||
|
ChannelFuture future = bootstrap.connect(host, port);
|
||||||
|
|
||||||
|
future.addListener((ChannelFutureListener) f -> {
|
||||||
|
if (f.isSuccess()) {
|
||||||
|
System.out.println("Connected to server");
|
||||||
|
channel = f.channel();
|
||||||
|
} else {
|
||||||
|
System.out.println("Connection failed, reconnect in 5 seconds...");
|
||||||
|
|
||||||
|
// 5 秒后重连
|
||||||
|
scheduler.schedule(() -> doConnect(host, port), 5, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void shutdown() {
|
||||||
|
if (channel != null) {
|
||||||
|
channel.close();
|
||||||
|
}
|
||||||
|
if (workerGroup != null) {
|
||||||
|
workerGroup.shutdownGracefully();
|
||||||
|
}
|
||||||
|
if (scheduler != null) {
|
||||||
|
scheduler.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## P7 加分项
|
||||||
|
|
||||||
|
### 深度理解
|
||||||
|
- **Reactor 模式**:理解主从 Reactor 多线程模型
|
||||||
|
- **零拷贝原理**:理解用户空间和内核空间
|
||||||
|
- **内存管理**:ByteBuf 的池化、引用计数、泄漏检测
|
||||||
|
|
||||||
|
### 实战经验
|
||||||
|
- **高并发调优**:EventLoopGroup 线程数、TCP 参数调优
|
||||||
|
- **监控告警**:连接数监控、流量监控、延迟监控
|
||||||
|
- **故障排查**:内存泄漏、CPU 100%、连接泄漏
|
||||||
|
|
||||||
|
### 架构设计
|
||||||
|
- **协议设计**:自定义协议、编解码器设计
|
||||||
|
- **集群部署**:负载均衡、服务发现
|
||||||
|
- **容错机制**:重连、熔断、限流
|
||||||
|
|
||||||
|
### 性能优化
|
||||||
|
1. **减少内存拷贝**:使用 CompositeByteBuf
|
||||||
|
2. **复用对象**:使用 ByteBuf 对象池
|
||||||
|
3. **优化 TCP 参数**:TCP_NODELAY、SO_KEEPALIVE
|
||||||
|
4. **流量控制**:ChannelTrafficShapingHandler
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 总结
|
||||||
|
|
||||||
|
Netty 实战核心要点:
|
||||||
|
1. **高性能**:Reactor 模型、零拷贝、内存池
|
||||||
|
2. **高可靠**:心跳、重连、优雅关闭
|
||||||
|
3. **可扩展**:Pipeline 机制、自定义协议
|
||||||
|
4. **可监控**:连接数、流量、延迟监控
|
||||||
|
|
||||||
|
**最佳实践**:
|
||||||
|
- 合理设置 EventLoopGroup 线程数
|
||||||
|
- 使用池化的 ByteBuf,及时释放
|
||||||
|
- 添加心跳和重连机制
|
||||||
|
- 实现优雅关闭
|
||||||
|
- 做好监控和告警
|
||||||
|
- 定期排查内存泄漏
|
||||||
539
10-中间件/Netty核心原理.md
Normal file
539
10-中间件/Netty核心原理.md
Normal file
@@ -0,0 +1,539 @@
|
|||||||
|
# Netty 核心原理
|
||||||
|
|
||||||
|
## 问题
|
||||||
|
|
||||||
|
1. Netty 的核心组件有哪些?
|
||||||
|
2. 什么是 EventLoop?线程模型是怎样的?
|
||||||
|
3. 什么是 ByteBuf?与 Java NIO 的 ByteBuffer 有什么区别?
|
||||||
|
4. Netty 的零拷贝是如何实现的?
|
||||||
|
5. 什么是 ChannelPipeline?ChannelHandler 是如何工作的?
|
||||||
|
6. Netty 如何解决 TCP 粘包/拆包问题?
|
||||||
|
7. Netty 的心跳机制是如何实现的?
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 标准答案
|
||||||
|
|
||||||
|
### 1. Netty 核心组件
|
||||||
|
|
||||||
|
#### **核心组件架构**
|
||||||
|
|
||||||
|
```
|
||||||
|
┌─────────────────────────────────────────────┐
|
||||||
|
│ Netty 架构 │
|
||||||
|
├─────────────────────────────────────────────┤
|
||||||
|
│ Channel │
|
||||||
|
│ ├─ EventLoop (线程模型) │
|
||||||
|
│ ├─ ChannelPipeline (处理器链) │
|
||||||
|
│ │ └─ ChannelHandler (业务处理器) │
|
||||||
|
│ └─ ByteBuf (内存管理) │
|
||||||
|
├─────────────────────────────────────────────┤
|
||||||
|
│ Bootstrap (启动类) │
|
||||||
|
│ ├─ ServerBootstrap (服务端) │
|
||||||
|
│ └─ Bootstrap (客户端) │
|
||||||
|
└─────────────────────────────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **关键组件说明**
|
||||||
|
|
||||||
|
**1. Channel**
|
||||||
|
```java
|
||||||
|
// Channel 是 Netty 的网络操作抽象类
|
||||||
|
Channel channel = ...;
|
||||||
|
|
||||||
|
// 核心方法
|
||||||
|
channel.writeAndFlush(msg); // 写数据并刷新
|
||||||
|
channel.close(); // 关闭连接
|
||||||
|
channel.eventLoop(); // 获取 EventLoop
|
||||||
|
```
|
||||||
|
|
||||||
|
**2. EventLoop**
|
||||||
|
```java
|
||||||
|
// EventLoop 负责处理 I/O 操作
|
||||||
|
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // Accept
|
||||||
|
EventLoopGroup workerGroup = new NioEventLoopGroup(); // I/O
|
||||||
|
|
||||||
|
// bossGroup 只负责监听并接受连接
|
||||||
|
// workerGroup 负责 I/O 读写
|
||||||
|
```
|
||||||
|
|
||||||
|
**3. ChannelPipeline**
|
||||||
|
```java
|
||||||
|
// Pipeline 是处理器链
|
||||||
|
ChannelPipeline pipeline = channel.pipeline();
|
||||||
|
|
||||||
|
// 添加处理器
|
||||||
|
pipeline.addLast("decoder", new Decoder());
|
||||||
|
pipeline.addLast("encoder", new Encoder());
|
||||||
|
pipeline.addLast("handler", new BusinessHandler());
|
||||||
|
```
|
||||||
|
|
||||||
|
**4. ByteBuf**
|
||||||
|
```java
|
||||||
|
// ByteBuf 是 Netty 的字节容器
|
||||||
|
ByteBuf buffer = Unpooled.buffer(1024);
|
||||||
|
|
||||||
|
// 写入数据
|
||||||
|
buffer.writeInt(123);
|
||||||
|
buffer.writeBytes("Hello".getBytes());
|
||||||
|
|
||||||
|
// 读取数据
|
||||||
|
int value = buffer.readInt();
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 2. EventLoop 与线程模型
|
||||||
|
|
||||||
|
#### **Reactor 线程模型**
|
||||||
|
|
||||||
|
Netty 采用 **Reactor 模式**,支持三种实现:
|
||||||
|
|
||||||
|
```
|
||||||
|
1. Reactor 单线程模型
|
||||||
|
┌──────────────┐
|
||||||
|
│ Reactor │
|
||||||
|
│ (单线程) │
|
||||||
|
│ │
|
||||||
|
│ - Accept │
|
||||||
|
│ - Read │
|
||||||
|
│ - Decode │
|
||||||
|
│ - Process │
|
||||||
|
│ - Encode │
|
||||||
|
│ - Send │
|
||||||
|
└──────────────┘
|
||||||
|
|
||||||
|
2. Reactor 多线程模型
|
||||||
|
┌──────────┐ ┌──────────────────┐
|
||||||
|
│ Reactor │────>│ Worker Threads │
|
||||||
|
│ (主线程) │ │ (线程池) │
|
||||||
|
│ │ │ │
|
||||||
|
│ - Accept │ │ - Read/Write │
|
||||||
|
│ │ │ - Decode/Encode │
|
||||||
|
│ │ │ - Process │
|
||||||
|
└──────────┘ └──────────────────┘
|
||||||
|
|
||||||
|
3. 主从 Reactor 多线程模型(Netty 默认)
|
||||||
|
┌──────────┐ ┌──────────────────┐
|
||||||
|
│ Main │ │ Sub Reactors │
|
||||||
|
│ Reactor │────>│ (线程池) │
|
||||||
|
│ │ │ │
|
||||||
|
│ - Accept │ │ - Read/Write │
|
||||||
|
│ │ │ - Decode/Encode │
|
||||||
|
│ │ │ - Process │
|
||||||
|
└──────────┘ └──────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **EventLoop 工作原理**
|
||||||
|
|
||||||
|
```java
|
||||||
|
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||||
|
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||||
|
|
||||||
|
// 每个 EventLoop:
|
||||||
|
// 1. 维护一个 Selector
|
||||||
|
// 2. 维护一个任务队列
|
||||||
|
// 3. 维护一个线程
|
||||||
|
|
||||||
|
// 工作流程:
|
||||||
|
// - 线程启动后,无限循环执行:
|
||||||
|
// 1. select() - 查询就绪的 Channel
|
||||||
|
// 2. processSelectedKeys() - 处理 I/O 事件
|
||||||
|
// 3. runAllTasks() - 执行任务队列中的任务
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **任务调度**
|
||||||
|
|
||||||
|
```java
|
||||||
|
Channel channel = ...;
|
||||||
|
|
||||||
|
// 在 EventLoop 线程中执行任务
|
||||||
|
channel.eventLoop().execute(() -> {
|
||||||
|
System.out.println("Task in EventLoop thread");
|
||||||
|
});
|
||||||
|
|
||||||
|
// 定时任务
|
||||||
|
channel.eventLoop().schedule(() -> {
|
||||||
|
System.out.println("Delayed task");
|
||||||
|
}, 1, TimeUnit.SECONDS);
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 3. ByteBuf vs ByteBuffer
|
||||||
|
|
||||||
|
#### **对比表**
|
||||||
|
|
||||||
|
| 特性 | Java NIO ByteBuffer | Netty ByteBuf |
|
||||||
|
|------|---------------------|---------------|
|
||||||
|
| **长度** | 固定长度,扩容复杂 | 动态扩容 |
|
||||||
|
| **读写模式** | flip() 切换 | 独立的读写索引 |
|
||||||
|
| **引用计数** | 不支持 | 支持(池化) |
|
||||||
|
| **零拷贝** | 不支持 | 支持(Composite) |
|
||||||
|
| **性能** | 较低 | 高(对象池优化) |
|
||||||
|
|
||||||
|
#### **ByteBuf 三个重要指针**
|
||||||
|
|
||||||
|
```
|
||||||
|
ByteBuf 结构:
|
||||||
|
┌───────┬──────────┬──────────┬─────────┐
|
||||||
|
│ 0 │ readerIndex│ writerIndex│ capacity│
|
||||||
|
│ │ │ │ │
|
||||||
|
│ discard│ readable│ writable │ │
|
||||||
|
│ bytes │ bytes │ bytes │ │
|
||||||
|
└───────┴──────────┴──────────┴─────────┘
|
||||||
|
|
||||||
|
操作:
|
||||||
|
- mark() / reset(): 标记和重置
|
||||||
|
- readerIndex(): 读指针
|
||||||
|
- writerIndex(): 写指针
|
||||||
|
- capacity(): 容量
|
||||||
|
- maxCapacity(): 最大容量
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **使用示例**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 创建 ByteBuf
|
||||||
|
ByteBuf buffer = Unpooled.buffer(1024);
|
||||||
|
|
||||||
|
// 写入数据
|
||||||
|
buffer.writeBytes("Hello".getBytes());
|
||||||
|
buffer.writeInt(123);
|
||||||
|
|
||||||
|
// 读取数据(不移动读指针)
|
||||||
|
byte[] data = new byte[5];
|
||||||
|
buffer.getBytes(buffer.readerIndex(), data);
|
||||||
|
|
||||||
|
// 读取数据(移动读指针)
|
||||||
|
buffer.readBytes(5);
|
||||||
|
|
||||||
|
// 引用计数(池化)
|
||||||
|
buffer.retain(); // 引用计数 +1
|
||||||
|
buffer.release(); // 引用计数 -1
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 4. 零拷贝实现
|
||||||
|
|
||||||
|
#### **零拷贝的三种实现**
|
||||||
|
|
||||||
|
**1. CompositeByteBuf(组合缓冲区)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 将多个 ByteBuf 组合成一个逻辑上的 ByteBuf
|
||||||
|
ByteBuf header = Unpooled.buffer(10);
|
||||||
|
ByteBuf body = Unpooled.buffer(100);
|
||||||
|
|
||||||
|
// 零拷贝组合
|
||||||
|
CompositeByteBuf message = Unpooled.wrappedBuffer(header, body);
|
||||||
|
|
||||||
|
// 物理上不复制数据,只是逻辑组合
|
||||||
|
```
|
||||||
|
|
||||||
|
**2. wrappedBuffer(包装)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 包装字节数组,不复制
|
||||||
|
byte[] bytes = "Hello Netty".getBytes();
|
||||||
|
ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
|
||||||
|
|
||||||
|
// 底层数据共享,无拷贝
|
||||||
|
```
|
||||||
|
|
||||||
|
**3. slice(切片)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
ByteBuf buffer = Unpooled.buffer(100);
|
||||||
|
|
||||||
|
// 切片,共享底层内存
|
||||||
|
ByteBuf sliced = buffer.slice(0, 50);
|
||||||
|
|
||||||
|
// 修改会互相影响
|
||||||
|
sliced.setByte(0, (byte) 1);
|
||||||
|
```
|
||||||
|
|
||||||
|
**4. FileChannel.transferTo(文件传输)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 文件传输零拷贝
|
||||||
|
FileChannel sourceChannel = ...;
|
||||||
|
FileChannel destChannel = ...;
|
||||||
|
|
||||||
|
// 直接在内核空间传输,不经过用户空间
|
||||||
|
sourceChannel.transferTo(position, count, destChannel);
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 5. ChannelPipeline 与 ChannelHandler
|
||||||
|
|
||||||
|
#### **Pipeline 工作流程**
|
||||||
|
|
||||||
|
```
|
||||||
|
入站事件 (Inbound):
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌─────────────────────────────────────┐
|
||||||
|
│ ChannelPipeline │
|
||||||
|
│ │
|
||||||
|
│ ┌─────────┐ ┌─────────┐ ┌──────┐│
|
||||||
|
│ │Decoder1 │→ │Decoder2 │→ │Handler││
|
||||||
|
│ └─────────┘ └─────────┘ └──────┘│
|
||||||
|
│ ▲ ▲ ▲ │
|
||||||
|
│ └────────────┴─────────────┘ │
|
||||||
|
│ (数据流) │
|
||||||
|
└─────────────────────────────────────┘
|
||||||
|
|
||||||
|
出站事件 (Outbound):
|
||||||
|
│
|
||||||
|
▼
|
||||||
|
┌─────────────────────────────────────┐
|
||||||
|
│ ChannelPipeline │
|
||||||
|
│ │
|
||||||
|
│ ┌──────┐ ┌─────────┐ ┌─────────┐│
|
||||||
|
│ │Handler│→ │Encoder1 │→ │Encoder2 ││
|
||||||
|
│ └──────┘ └─────────┘ └─────────┘│
|
||||||
|
│ ▲ ▲ ▲ │
|
||||||
|
│ └────────────┴─────────────┘ │
|
||||||
|
│ (数据流) │
|
||||||
|
└─────────────────────────────────────┘
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **ChannelHandler 接口**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 入站处理器
|
||||||
|
@ChannelHandler.Sharable
|
||||||
|
public class InboundHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
// 处理读事件
|
||||||
|
System.out.println("Received: " + msg);
|
||||||
|
ctx.fireChannelRead(msg); // 传递给下一个 Handler
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelActive(ChannelHandlerContext ctx) {
|
||||||
|
// 连接建立
|
||||||
|
System.out.println("Client connected");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelInactive(ChannelHandlerContext ctx) {
|
||||||
|
// 连接断开
|
||||||
|
System.out.println("Client disconnected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 出站处理器
|
||||||
|
public class OutboundHandler extends ChannelOutboundHandlerAdapter {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void write(ChannelHandlerContext ctx, Object msg,
|
||||||
|
ChannelPromise promise) {
|
||||||
|
// 处理写事件
|
||||||
|
System.out.println("Sending: " + msg);
|
||||||
|
ctx.write(msg, promise);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **Handler 生命周期**
|
||||||
|
|
||||||
|
```java
|
||||||
|
public interface ChannelHandler {
|
||||||
|
|
||||||
|
// 添加到 Pipeline
|
||||||
|
void handlerAdded(ChannelHandlerContext ctx);
|
||||||
|
|
||||||
|
// 从 Pipeline 移除
|
||||||
|
void handlerRemoved(ChannelHandlerContext ctx);
|
||||||
|
|
||||||
|
// 发生异常
|
||||||
|
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause);
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 6. TCP 粘包/拆包解决方案
|
||||||
|
|
||||||
|
#### **问题原因**
|
||||||
|
|
||||||
|
```
|
||||||
|
TCP 粘包:
|
||||||
|
发送方: [包1] [包2] [包3]
|
||||||
|
接收方: [包1+包2+包3] // 粘在一起
|
||||||
|
|
||||||
|
TCP 拆包:
|
||||||
|
发送方: [大包]
|
||||||
|
接收方: [片段1] [片段2] // 被拆分
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **Netty 提供的解码器**
|
||||||
|
|
||||||
|
**1. FixedLengthFrameDecoder(固定长度)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 每个消息固定 10 字节
|
||||||
|
pipeline.addLast("framer", new FixedLengthFrameDecoder(10));
|
||||||
|
```
|
||||||
|
|
||||||
|
**2. LineBasedFrameDecoder(分隔符)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 按换行符分割
|
||||||
|
pipeline.addLast("framer", new LineBasedFrameDecoder(8192));
|
||||||
|
```
|
||||||
|
|
||||||
|
**3. DelimiterBasedFrameDecoder(自定义分隔符)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 自定义分隔符
|
||||||
|
ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
|
||||||
|
pipeline.addLast("framer",
|
||||||
|
new DelimiterBasedFrameDecoder(8192, delimiter));
|
||||||
|
```
|
||||||
|
|
||||||
|
**4. LengthFieldBasedFrameDecoder(长度字段)**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 消息格式: [长度(4字节)] [数据]
|
||||||
|
pipeline.addLast("framer",
|
||||||
|
new LengthFieldBasedFrameDecoder(
|
||||||
|
8192, // maxFrameLength
|
||||||
|
0, // lengthFieldOffset
|
||||||
|
4, // lengthFieldLength
|
||||||
|
0, // lengthAdjustment
|
||||||
|
0 // initialBytesToStrip
|
||||||
|
));
|
||||||
|
```
|
||||||
|
|
||||||
|
**5. 自定义解码器**
|
||||||
|
|
||||||
|
```java
|
||||||
|
public class CustomDecoder extends ByteToMessageDecoder {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void decode(ChannelHandlerContext ctx,
|
||||||
|
ByteBuf in, List<Object> out) {
|
||||||
|
// 检查是否有足够数据
|
||||||
|
if (in.readableBytes() < 4) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 标记读指针
|
||||||
|
in.markReaderIndex();
|
||||||
|
|
||||||
|
// 读取长度
|
||||||
|
int length = in.readInt();
|
||||||
|
|
||||||
|
// 检查数据是否完整
|
||||||
|
if (in.readableBytes() < length) {
|
||||||
|
in.resetReaderIndex();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 读取数据
|
||||||
|
byte[] data = new byte[length];
|
||||||
|
in.readBytes(data);
|
||||||
|
|
||||||
|
out.add(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 7. 心跳机制
|
||||||
|
|
||||||
|
#### **IdleStateHandler 实现**
|
||||||
|
|
||||||
|
```java
|
||||||
|
// 读写空闲检测
|
||||||
|
pipeline.addLast("idleStateHandler",
|
||||||
|
new IdleStateHandler(
|
||||||
|
30, // 读空闲时间(秒)
|
||||||
|
0, // 写空闲时间
|
||||||
|
0 // 读写空闲时间
|
||||||
|
));
|
||||||
|
|
||||||
|
// 心跳处理器
|
||||||
|
pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
|
||||||
|
```
|
||||||
|
|
||||||
|
#### **心跳处理器**
|
||||||
|
|
||||||
|
```java
|
||||||
|
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||||
|
if (evt instanceof IdleStateEvent) {
|
||||||
|
IdleStateEvent event = (IdleStateEvent) evt;
|
||||||
|
|
||||||
|
if (event.state() == IdleState.READER_IDLE) {
|
||||||
|
// 读空闲,关闭连接
|
||||||
|
System.out.println("Read idle, close connection");
|
||||||
|
ctx.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||||
|
// 处理心跳包
|
||||||
|
if ("PING".equals(msg)) {
|
||||||
|
ctx.writeAndFlush("PONG");
|
||||||
|
} else {
|
||||||
|
ctx.fireChannelRead(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## P7 加分项
|
||||||
|
|
||||||
|
### 深度理解
|
||||||
|
- **Reactor 模式**:理解单线程、多线程、主从多线程三种模型
|
||||||
|
- **零拷贝原理**:理解用户空间和内核空间的数据拷贝
|
||||||
|
- **内存管理**:ByteBuf 的池化、引用计数、内存泄漏
|
||||||
|
|
||||||
|
### 实战经验
|
||||||
|
- **高并发场景**:EventLoopGroup 线程数调优
|
||||||
|
- **内存调优**:ByteBuf 分配器选择(堆内存 vs 直接内存)
|
||||||
|
- **性能优化**:避免 Handler 链过长、减少对象创建
|
||||||
|
|
||||||
|
### 架构设计
|
||||||
|
- **协议设计**:自定义协议、编解码器设计
|
||||||
|
- **异常处理**:优雅关闭、重连机制
|
||||||
|
- **监控告警**:流量监控、连接数监控、延迟监控
|
||||||
|
|
||||||
|
### 常见问题
|
||||||
|
1. **内存泄漏**:ByteBuf 未释放
|
||||||
|
2. **线程安全**:共享状态的同步
|
||||||
|
3. **连接池**:Channel 复用与管理
|
||||||
|
4. **背压处理**:流量控制与慢消费
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 总结
|
||||||
|
|
||||||
|
Netty 的核心优势:
|
||||||
|
1. **高性能**:零拷贝、内存池、Reactor 模式
|
||||||
|
2. **高可靠性**:心跳机制、重连机制、优雅关闭
|
||||||
|
3. **易扩展**:Pipeline 机制、丰富的 Handler
|
||||||
|
4. **成熟稳定**:广泛应用在 Dubbo、gRPC、WebSocket
|
||||||
|
|
||||||
|
**最佳实践**:
|
||||||
|
- 合理设置 EventLoopGroup 线程数
|
||||||
|
- 使用池化的 ByteBuf
|
||||||
|
- 及时释放 ByteBuf,避免内存泄漏
|
||||||
|
- 使用 LengthFieldBasedFrameDecoder 处理粘包
|
||||||
|
- 添加心跳和重连机制
|
||||||
|
- 做好监控和日志
|
||||||
Reference in New Issue
Block a user