Compare commits
2 Commits
4247e0700d
...
7aa971f511
| Author | SHA1 | Date | |
|---|---|---|---|
| 7aa971f511 | |||
| b773a4fa83 |
12
.obsidian/workspace.json
vendored
12
.obsidian/workspace.json
vendored
@@ -194,8 +194,12 @@
|
||||
"obsidian-git:Open Git source control": false
|
||||
}
|
||||
},
|
||||
"active": "fcbc762a80282002",
|
||||
"active": "16a7ce8de420dd10",
|
||||
"lastOpenFiles": [
|
||||
"10-中间件/Netty实战场景.md",
|
||||
"10-中间件/Java NIO核心原理.md",
|
||||
"10-中间件/Netty核心原理.md",
|
||||
"10-中间件",
|
||||
"16-LeetCode Hot 100/从前序与中序遍历序列构造二叉树.md",
|
||||
"16-LeetCode Hot 100/路径总和.md",
|
||||
"16-LeetCode Hot 100/对称二叉树.md",
|
||||
@@ -221,16 +225,12 @@
|
||||
"16-LeetCode Hot 100",
|
||||
"00-项目概述/项目概述.md",
|
||||
"00-项目概述",
|
||||
"questions/04-消息队列/消息队列_RocketMQ_Kafka.md",
|
||||
"questions/05-并发编程/ConcurrentHashMap原理.md",
|
||||
"questions/02-数据库/MySQL主从延迟.md",
|
||||
"questions/15-简历面试",
|
||||
"questions/14-Web3与区块链",
|
||||
"12-面试技巧",
|
||||
"08-算法与数据结构",
|
||||
"questions/13-Golang语言",
|
||||
"questions/12-面试技巧",
|
||||
"questions/11-运维",
|
||||
"questions/10-中间件"
|
||||
"questions/11-运维"
|
||||
]
|
||||
}
|
||||
527
10-中间件/Java NIO核心原理.md
Normal file
527
10-中间件/Java NIO核心原理.md
Normal file
@@ -0,0 +1,527 @@
|
||||
# Java NIO 核心原理
|
||||
|
||||
## 问题
|
||||
|
||||
1. 什么是 NIO?与 BIO 的区别是什么?
|
||||
2. NIO 的三大核心组件是什么?
|
||||
3. 什么是 Selector?如何实现多路复用?
|
||||
4. 什么是 Channel?与 Stream 的区别?
|
||||
5. 什么是 Buffer?如何理解 Buffer 的核心属性?
|
||||
6. NIO 如何实现非阻塞 I/O?
|
||||
7. 什么是零拷贝?如何实现?
|
||||
|
||||
---
|
||||
|
||||
## 标准答案
|
||||
|
||||
### 1. NIO vs BIO
|
||||
|
||||
#### **对比表**
|
||||
|
||||
| 特性 | BIO (Blocking I/O) | NIO (Non-blocking I/O) |
|
||||
|------|-------------------|----------------------|
|
||||
| **I/O 模型** | 阻塞 | 非阻塞 |
|
||||
| **线程模型** | 每连接一线程 | Reactor 模式 |
|
||||
| **并发能力** | 低 | 高 |
|
||||
| **编程复杂度** | 简单 | 复杂 |
|
||||
| **数据操作** | Stream | Channel + Buffer |
|
||||
| **适用场景** | 连接数少、高延迟 | 连接数多、高并发 |
|
||||
|
||||
#### **代码对比**
|
||||
|
||||
**BIO 实现**:
|
||||
```java
|
||||
// 传统 BIO - 阻塞式
|
||||
ServerSocket serverSocket = new ServerSocket(8080);
|
||||
|
||||
while (true) {
|
||||
// 阻塞等待连接
|
||||
Socket socket = serverSocket.accept();
|
||||
|
||||
// 每个连接一个线程
|
||||
new Thread(() -> {
|
||||
try {
|
||||
BufferedReader reader = new BufferedReader(
|
||||
new InputStreamReader(socket.getInputStream()));
|
||||
|
||||
// 阻塞读取数据
|
||||
String line = reader.readLine();
|
||||
|
||||
// 处理数据...
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}).start();
|
||||
}
|
||||
```
|
||||
|
||||
**NIO 实现**:
|
||||
```java
|
||||
// NIO - 非阻塞式
|
||||
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||
serverChannel.configureBlocking(false);
|
||||
serverChannel.bind(new InetSocketAddress(8080));
|
||||
|
||||
Selector selector = Selector.open();
|
||||
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||
|
||||
while (true) {
|
||||
// 非阻塞等待事件
|
||||
int readyCount = selector.select();
|
||||
|
||||
Set<SelectionKey> readyKeys = selector.selectedKeys();
|
||||
for (SelectionKey key : readyKeys) {
|
||||
if (key.isAcceptable()) {
|
||||
// 处理连接
|
||||
}
|
||||
if (key.isReadable()) {
|
||||
// 处理读
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 2. NIO 三大核心组件
|
||||
|
||||
```
|
||||
┌────────────────────────────────────────┐
|
||||
│ Java NIO 架构 │
|
||||
├────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌─────────┐ ┌─────────┐ │
|
||||
│ │Channel │◄──►│ Buffer │ │
|
||||
│ └────┬────┘ └─────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌─────────┐ │
|
||||
│ │Selector │ │
|
||||
│ │(多路复用)│ │
|
||||
│ └─────────┘ │
|
||||
│ │
|
||||
└────────────────────────────────────────┘
|
||||
|
||||
Channel: 数据通道(双向)
|
||||
Buffer: 数据容器
|
||||
Selector: 多路复用器
|
||||
```
|
||||
|
||||
#### **核心组件关系**
|
||||
|
||||
```java
|
||||
// 1. 打开 Channel
|
||||
FileChannel channel = FileChannel.open(Paths.get("data.txt"));
|
||||
|
||||
// 2. 分配 Buffer
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
|
||||
// 3. 读取数据到 Buffer
|
||||
channel.read(buffer);
|
||||
|
||||
// 4. 切换读写模式
|
||||
buffer.flip();
|
||||
|
||||
// 5. 处理数据
|
||||
while (buffer.hasRemaining()) {
|
||||
byte b = buffer.get();
|
||||
}
|
||||
|
||||
// 6. 清空 Buffer
|
||||
buffer.clear();
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 3. Selector 多路复用
|
||||
|
||||
#### **工作原理**
|
||||
|
||||
```
|
||||
Selector 多路复用模型:
|
||||
|
||||
┌──────────────────────────────────┐
|
||||
│ Selector │
|
||||
│ │
|
||||
│ select() - 阻塞等待就绪事件 │
|
||||
│ │
|
||||
│ ┌────┐ ┌────┐ ┌────┐ │
|
||||
│ │Ch1 │ │Ch2 │ │Ch3 │ ... │
|
||||
│ │就绪│ │就绪│ │就绪│ │
|
||||
│ └────┘ └────┘ └────┘ │
|
||||
└──────────────────────────────────┘
|
||||
▲
|
||||
│ one thread
|
||||
│
|
||||
┌─────────┴──────────┐
|
||||
│ Event Loop │
|
||||
│ - select() │
|
||||
│ - process() │
|
||||
│ - repeat │
|
||||
└────────────────────┘
|
||||
```
|
||||
|
||||
#### **SelectionKey 事件类型**
|
||||
|
||||
```java
|
||||
// 四种事件类型
|
||||
int OP_ACCEPT = 1 << 4; // 连接就绪(ServerSocketChannel)
|
||||
int OP_CONNECT = 1 << 3; // 连接完成(SocketChannel)
|
||||
int OP_READ = 1 << 0; // 读就绪
|
||||
int OP_WRITE = 1 << 2; // 写就绪
|
||||
|
||||
// 注册事件
|
||||
SelectionKey key = channel.register(selector,
|
||||
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
|
||||
|
||||
// 判断事件类型
|
||||
if (key.isAcceptable()) { // 连接事件
|
||||
if (key.isReadable()) { // 读事件
|
||||
if (key.isWritable()) { // 写事件
|
||||
if (key.isConnectable()) { // 连接完成事件
|
||||
```
|
||||
|
||||
#### **完整示例**
|
||||
|
||||
```java
|
||||
// NIO Server
|
||||
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||
serverChannel.configureBlocking(false);
|
||||
serverChannel.bind(new InetSocketAddress(8080));
|
||||
|
||||
Selector selector = Selector.open();
|
||||
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
|
||||
|
||||
while (true) {
|
||||
// 阻塞等待事件(超时 1 秒)
|
||||
int readyCount = selector.select(1000);
|
||||
|
||||
if (readyCount == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 获取就绪的 SelectionKey
|
||||
Set<SelectionKey> readyKeys = selector.selectedKeys();
|
||||
Iterator<SelectionKey> iterator = readyKeys.iterator();
|
||||
|
||||
while (iterator.hasNext()) {
|
||||
SelectionKey key = iterator.next();
|
||||
iterator.remove(); // 移除已处理的 key
|
||||
|
||||
if (!key.isValid()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 处理连接事件
|
||||
if (key.isAcceptable()) {
|
||||
ServerSocketChannel server = (ServerSocketChannel) key.channel();
|
||||
SocketChannel channel = server.accept();
|
||||
channel.configureBlocking(false);
|
||||
channel.register(selector, SelectionKey.OP_READ);
|
||||
}
|
||||
|
||||
// 处理读事件
|
||||
if (key.isReadable()) {
|
||||
SocketChannel channel = (SocketChannel) key.channel();
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
int bytesRead = channel.read(buffer);
|
||||
|
||||
if (bytesRead == -1) {
|
||||
// 连接关闭
|
||||
key.cancel();
|
||||
channel.close();
|
||||
} else {
|
||||
// 处理数据
|
||||
buffer.flip();
|
||||
byte[] data = new byte[buffer.remaining()];
|
||||
buffer.get(data);
|
||||
System.out.println("Received: " + new String(data));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 4. Channel vs Stream
|
||||
|
||||
#### **核心区别**
|
||||
|
||||
| 特性 | Stream (IO) | Channel (NIO) |
|
||||
|------|-------------|---------------|
|
||||
| **方向** | 单向(读/写) | 双向(读+写) |
|
||||
| **阻塞** | 阻塞 | 可配置阻塞/非阻塞 |
|
||||
| **缓冲** | 直接操作流 | 必须通过 Buffer |
|
||||
| **性能** | 较低 | 高(零拷贝) |
|
||||
|
||||
#### **Channel 类型**
|
||||
|
||||
```java
|
||||
// 1. FileChannel - 文件通道
|
||||
FileChannel fileChannel = FileChannel.open(Paths.get("data.txt"),
|
||||
StandardOpenOption.READ, StandardOpenOption.WRITE);
|
||||
|
||||
// 2. SocketChannel - TCP Socket
|
||||
SocketChannel socketChannel = SocketChannel.open();
|
||||
socketChannel.configureBlocking(false);
|
||||
socketChannel.connect(new InetSocketAddress("localhost", 8080));
|
||||
|
||||
// 3. ServerSocketChannel - TCP Server
|
||||
ServerSocketChannel serverChannel = ServerSocketChannel.open();
|
||||
serverChannel.bind(new InetSocketAddress(8080));
|
||||
|
||||
// 4. DatagramChannel - UDP
|
||||
DatagramChannel datagramChannel = DatagramChannel.open();
|
||||
datagramChannel.bind(new InetSocketAddress(8080));
|
||||
|
||||
// 5. Pipe.SinkChannel / Pipe.SourceChannel - 管道
|
||||
Pipe pipe = Pipe.open();
|
||||
Pipe.SinkChannel sinkChannel = pipe.sink();
|
||||
Pipe.SourceChannel sourceChannel = pipe.source();
|
||||
```
|
||||
|
||||
#### **FileChannel 示例**
|
||||
|
||||
```java
|
||||
// 文件复制(传统方式)
|
||||
FileChannel sourceChannel = FileChannel.open(Paths.get("source.txt"));
|
||||
FileChannel destChannel = FileChannel.open(Paths.get("dest.txt"),
|
||||
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
||||
|
||||
// 方法1: 使用 Buffer
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
while (sourceChannel.read(buffer) != -1) {
|
||||
buffer.flip();
|
||||
destChannel.write(buffer);
|
||||
buffer.clear();
|
||||
}
|
||||
|
||||
// 方法2: 直接传输(零拷贝)
|
||||
sourceChannel.transferTo(0, sourceChannel.size(), destChannel);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 5. Buffer 核心属性
|
||||
|
||||
#### **Buffer 结构**
|
||||
|
||||
```
|
||||
Buffer 核心属性:
|
||||
|
||||
┌───────┬──────────┬──────────┬─────────┐
|
||||
│ 0 │position │ limit │capacity │
|
||||
│ │ │ │ │
|
||||
│ 已处理 可读/写数据 │ 不可访问 │
|
||||
│ 数据 │ │ │
|
||||
└───────┴──────────┴──────────┴─────────┘
|
||||
↑ ↑ ↑
|
||||
mark position limit
|
||||
|
||||
操作方法:
|
||||
- mark(): 标记当前 position
|
||||
- reset(): 恢复到 mark 位置
|
||||
- clear(): position=0, limit=capacity
|
||||
- flip(): limit=position, position=0 (读→写)
|
||||
- rewind(): position=0, limit 不变
|
||||
```
|
||||
|
||||
#### **Buffer 使用流程**
|
||||
|
||||
```java
|
||||
// 1. 分配 Buffer
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
|
||||
// 初始状态: position=0, limit=1024, capacity=1024
|
||||
|
||||
// 2. 写入数据
|
||||
buffer.putInt(123);
|
||||
buffer.putLong(456L);
|
||||
buffer.put("Hello".getBytes());
|
||||
|
||||
// 写入后: position=17, limit=1024
|
||||
|
||||
// 3. 切换到读模式
|
||||
buffer.flip();
|
||||
|
||||
// flip 后: position=0, limit=17
|
||||
|
||||
// 4. 读取数据
|
||||
while (buffer.hasRemaining()) {
|
||||
byte b = buffer.get();
|
||||
}
|
||||
|
||||
// 读取后: position=17, limit=17
|
||||
|
||||
// 5. 清空 Buffer
|
||||
buffer.clear();
|
||||
|
||||
// clear 后: position=0, limit=1024, capacity=1024
|
||||
```
|
||||
|
||||
#### **Buffer 类型**
|
||||
|
||||
```java
|
||||
// 基本类型 Buffer
|
||||
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
|
||||
CharBuffer charBuffer = CharBuffer.allocate(1024);
|
||||
ShortBuffer shortBuffer = ShortBuffer.allocate(1024);
|
||||
IntBuffer intBuffer = IntBuffer.allocate(1024);
|
||||
LongBuffer longBuffer = LongBuffer.allocate(1024);
|
||||
FloatBuffer floatBuffer = FloatBuffer.allocate(1024);
|
||||
DoubleBuffer doubleBuffer = DoubleBuffer.allocate(1024);
|
||||
|
||||
// 直接内存 vs 堆内存
|
||||
ByteBuffer heapBuffer = ByteBuffer.allocate(1024); // 堆内存
|
||||
ByteBuffer directBuffer = ByteBuffer.allocateDirect(1024); // 直接内存
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 6. 非阻塞 I/O 实现
|
||||
|
||||
#### **阻塞 vs 非阻塞**
|
||||
|
||||
```java
|
||||
// 阻塞模式(默认)
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
channel.configureBlocking(true); // 阻塞
|
||||
channel.connect(new InetSocketAddress("localhost", 8080));
|
||||
// 阻塞直到连接建立
|
||||
|
||||
// 非阻塞模式
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
channel.configureBlocking(false); // 非阻塞
|
||||
channel.connect(new InetSocketAddress("localhost", 8080));
|
||||
// 立即返回
|
||||
|
||||
while (!channel.finishConnect()) {
|
||||
// 连接未完成,做其他事
|
||||
System.out.println("Connecting...");
|
||||
}
|
||||
```
|
||||
|
||||
#### **非阻塞读写**
|
||||
|
||||
```java
|
||||
SocketChannel channel = SocketChannel.open();
|
||||
channel.configureBlocking(false);
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1024);
|
||||
|
||||
// 非阻塞读
|
||||
int bytesRead = channel.read(buffer);
|
||||
if (bytesRead == 0) {
|
||||
// 没有数据可用
|
||||
} else if (bytesRead == -1) {
|
||||
// 连接已关闭
|
||||
} else {
|
||||
// 读取到数据
|
||||
buffer.flip();
|
||||
// 处理数据...
|
||||
}
|
||||
|
||||
// 非阻塞写
|
||||
buffer.clear();
|
||||
buffer.put("Hello".getBytes());
|
||||
buffer.flip();
|
||||
|
||||
int bytesWritten = channel.write(buffer);
|
||||
if (bytesWritten == 0) {
|
||||
// 缓冲区满,稍后重试
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 7. 零拷贝实现
|
||||
|
||||
#### **传统 I/O vs 零拷贝**
|
||||
|
||||
```
|
||||
传统 I/O(4 次拷贝,4 次上下文切换):
|
||||
1. 磁盘 → 内核缓冲区 (DMA)
|
||||
2. 内核缓冲区 → 用户缓冲区 (CPU)
|
||||
3. 用户缓冲区 → Socket 缓冲区 (CPU)
|
||||
4. Socket 缓冲区 → 网卡 (DMA)
|
||||
|
||||
零拷贝(2 次拷贝,2 次上下文切换):
|
||||
1. 磁盘 → 内核缓冲区 (DMA)
|
||||
2. 内核缓冲区 → 网卡 (DMA)
|
||||
```
|
||||
|
||||
#### **transferTo 实现**
|
||||
|
||||
```java
|
||||
// 文件传输零拷贝
|
||||
FileChannel sourceChannel = FileChannel.open(Paths.get("source.txt"));
|
||||
FileChannel destChannel = FileChannel.open(Paths.get("dest.txt"),
|
||||
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
|
||||
|
||||
// 直接在内核空间传输
|
||||
long position = 0;
|
||||
long count = sourceChannel.size();
|
||||
sourceChannel.transferTo(position, count, destChannel);
|
||||
```
|
||||
|
||||
#### **MappedByteBuffer(内存映射)**
|
||||
|
||||
```java
|
||||
// 内存映射文件
|
||||
FileChannel channel = FileChannel.open(Paths.get("data.txt"),
|
||||
StandardOpenOption.READ, StandardOpenOption.WRITE);
|
||||
|
||||
// 映射到内存
|
||||
MappedByteBuffer mappedBuffer = channel.map(
|
||||
FileChannel.MapMode.READ_WRITE, // 读写模式
|
||||
0, // 起始位置
|
||||
channel.size() // 映射大小
|
||||
);
|
||||
|
||||
// 直接操作内存(零拷贝)
|
||||
mappedBuffer.putInt(0, 123);
|
||||
int value = mappedBuffer.getInt(0);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## P7 加分项
|
||||
|
||||
### 深度理解
|
||||
- **多路复用原理**:理解 select、poll、epoll 的区别
|
||||
- **零拷贝原理**:理解 DMA、用户空间、内核空间
|
||||
- **内存管理**:堆内存 vs 直接内存,GC 影响
|
||||
|
||||
### 实战经验
|
||||
- **高并发场景**:Netty、Mina、Vert.x 框架使用
|
||||
- **文件处理**:大文件读写、内存映射文件
|
||||
- **网络编程**:自定义协议、粘包处理
|
||||
|
||||
### 性能优化
|
||||
- **Buffer 复用**:使用对象池减少 GC
|
||||
- **直接内存**:减少一次拷贝,但分配/释放成本高
|
||||
- **批量操作**:vectorized I/O(scatter/gather)
|
||||
|
||||
### 常见问题
|
||||
1. **Buffer 泄漏**:直接内存未释放
|
||||
2. **select 空转**:CPU 100% 问题
|
||||
3. **epoll 空轮询**:Linux kernel bug
|
||||
4. **文件描述符耗尽**:未关闭 Channel
|
||||
|
||||
---
|
||||
|
||||
## 总结
|
||||
|
||||
Java NIO 的核心优势:
|
||||
1. **高性能**:非阻塞 I/O、零拷贝
|
||||
2. **高并发**:单线程处理多连接
|
||||
3. **灵活性**:可配置阻塞/非阻塞
|
||||
4. **扩展性**:适合大规模分布式系统
|
||||
|
||||
**最佳实践**:
|
||||
- 高并发场景优先使用 NIO(Netty)
|
||||
- 大文件传输使用 transferTo
|
||||
- 理解 Buffer 的读写模式切换
|
||||
- 注意资源释放(Channel、Buffer)
|
||||
- 监控文件描述符使用
|
||||
803
10-中间件/Netty实战场景.md
Normal file
803
10-中间件/Netty实战场景.md
Normal file
@@ -0,0 +1,803 @@
|
||||
# Netty 实战场景与最佳实践
|
||||
|
||||
## 问题
|
||||
|
||||
1. 如何设计一个高性能的 RPC 框架?
|
||||
2. 如何实现 WebSocket 服务器?
|
||||
3. 如何实现百万连接的 IM 系统?
|
||||
4. 如何处理 Netty 的内存泄漏?
|
||||
5. 如何实现优雅关闭?
|
||||
6. 如何设计心跳和重连机制?
|
||||
7. 如何实现分布式 Session 共享?
|
||||
|
||||
---
|
||||
|
||||
## 标准答案
|
||||
|
||||
### 1. 高性能 RPC 框架设计
|
||||
|
||||
#### **架构设计**
|
||||
|
||||
```
|
||||
┌──────────────────────────────────────────┐
|
||||
│ RPC 框架架构 │
|
||||
├──────────────────────────────────────────┤
|
||||
│ Consumer Side │
|
||||
│ ┌──────────┐ ┌──────────┐ │
|
||||
│ │ Proxy │ │ Serializer│ │
|
||||
│ │ (动态代理)│ │ (Protobuf) │ │
|
||||
│ └────┬─────┘ └──────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌─────────────────────────────────┐ │
|
||||
│ │ Netty Client │ │
|
||||
│ │ - Reconnect │ │
|
||||
│ │ - Timeout │ │
|
||||
│ │ - Heartbeat │ │
|
||||
│ └──────────────┬──────────────────┘ │
|
||||
│ │ │
|
||||
│ │ TCP │
|
||||
│ │ │
|
||||
│ ┌──────────────▼──────────────────┐ │
|
||||
│ │ Netty Server │ │
|
||||
│ │ - Boss/Worker Groups │ │
|
||||
│ │ - Pipeline │ │
|
||||
│ │ - Handler Chain │ │
|
||||
│ └──────────────┬──────────────────┘ │
|
||||
│ │ │
|
||||
│ ┌─────────┴─────────┐ │
|
||||
│ ▼ ▼ │
|
||||
│ ┌─────────┐ ┌──────────┐ │
|
||||
│ │Service │ │ Registry │ │
|
||||
│ │(实现) │ │ (服务发现)│ │
|
||||
│ └─────────┘ └──────────┘ │
|
||||
└──────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
#### **协议设计**
|
||||
|
||||
```java
|
||||
// RPC 协议格式
|
||||
┌───────────┬───────────┬──────────┬──────────┐
|
||||
│ Magic │ Length │ Type │ Body │
|
||||
│ Number │ (4 bytes) │ (1 byte) │ │
|
||||
│ (4 bytes) │ │ │ │
|
||||
└───────────┴───────────┴──────────┴──────────┘
|
||||
|
||||
// 实现
|
||||
public class RpcProtocol {
|
||||
|
||||
private static final int MAGIC_NUMBER = 0xCAFEBABE;
|
||||
private static final int HEADER_LENGTH = 9;
|
||||
|
||||
public static ByteBuf encode(byte[] body, byte type) {
|
||||
ByteBuf buffer = Unpooled.buffer(HEADER_LENGTH + body.length);
|
||||
|
||||
// Magic Number (4 bytes)
|
||||
buffer.writeInt(MAGIC_NUMBER);
|
||||
|
||||
// Length (4 bytes)
|
||||
buffer.writeInt(body.length);
|
||||
|
||||
// Type (1 byte)
|
||||
buffer.writeByte(type);
|
||||
|
||||
// Body
|
||||
buffer.writeBytes(body);
|
||||
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public static byte[] decode(ByteBuf buffer) {
|
||||
// 检查 Magic Number
|
||||
if (buffer.readInt() != MAGIC_NUMBER) {
|
||||
throw new IllegalArgumentException("Invalid magic number");
|
||||
}
|
||||
|
||||
// 读取 Length
|
||||
int length = buffer.readInt();
|
||||
|
||||
// 读取 Type
|
||||
byte type = buffer.readByte();
|
||||
|
||||
// 读取 Body
|
||||
byte[] body = new byte[length];
|
||||
buffer.readBytes(body);
|
||||
|
||||
return body;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### **服务端实现**
|
||||
|
||||
```java
|
||||
@Component
|
||||
public class RpcServer {
|
||||
|
||||
private EventLoopGroup bossGroup;
|
||||
private EventLoopGroup workerGroup;
|
||||
private Channel serverChannel;
|
||||
|
||||
public void start(int port) throws InterruptedException {
|
||||
bossGroup = new NioEventLoopGroup(1);
|
||||
workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.option(ChannelOption.SO_BACKLOG, 1024)
|
||||
.option(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.TCP_NODELAY, true)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
|
||||
// 编解码器
|
||||
pipeline.addLast("frameDecoder",
|
||||
new LengthFieldBasedFrameDecoder(8192, 4, 4, 0, 0));
|
||||
pipeline.addLast("frameEncoder",
|
||||
new LengthFieldPrepender(4));
|
||||
|
||||
// 协议编解码
|
||||
pipeline.addLast("decoder", new RpcDecoder());
|
||||
pipeline.addLast("encoder", new RpcEncoder());
|
||||
|
||||
// 心跳检测
|
||||
pipeline.addLast("idleStateHandler",
|
||||
new IdleStateHandler(60, 0, 0));
|
||||
pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
|
||||
|
||||
// 业务处理器
|
||||
pipeline.addLast("handler", new RpcServerHandler());
|
||||
}
|
||||
});
|
||||
|
||||
// 绑定端口并启动
|
||||
ChannelFuture future = bootstrap.bind(port).sync();
|
||||
serverChannel = future.channel();
|
||||
|
||||
System.out.println("RPC Server started on port " + port);
|
||||
|
||||
// 等待服务器 socket 关闭
|
||||
serverChannel.closeFuture().sync();
|
||||
|
||||
} finally {
|
||||
shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (serverChannel != null) {
|
||||
serverChannel.close();
|
||||
}
|
||||
if (workerGroup != null) {
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
if (bossGroup != null) {
|
||||
bossGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
shutdown();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### **客户端实现**
|
||||
|
||||
```java
|
||||
@Component
|
||||
public class RpcClient {
|
||||
|
||||
private EventLoopGroup workerGroup;
|
||||
private Channel channel;
|
||||
|
||||
public void connect(String host, int port) throws InterruptedException {
|
||||
workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
Bootstrap bootstrap = new Bootstrap();
|
||||
bootstrap.group(workerGroup)
|
||||
.channel(NioSocketChannel.class)
|
||||
.option(ChannelOption.TCP_NODELAY, true)
|
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
|
||||
.option(ChannelOption.SO_KEEPALIVE, true)
|
||||
.handler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
|
||||
// 编解码器
|
||||
pipeline.addLast("frameDecoder",
|
||||
new LengthFieldBasedFrameDecoder(8192, 4, 4, 0, 0));
|
||||
pipeline.addLast("frameEncoder",
|
||||
new LengthFieldPrepender(4));
|
||||
|
||||
// 协议编解码
|
||||
pipeline.addLast("decoder", new RpcDecoder());
|
||||
pipeline.addLast("encoder", new RpcEncoder());
|
||||
|
||||
// 心跳检测
|
||||
pipeline.addLast("idleStateHandler",
|
||||
new IdleStateHandler(0, 30, 0));
|
||||
pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
|
||||
|
||||
// 业务处理器
|
||||
pipeline.addLast("handler", new RpcClientHandler());
|
||||
}
|
||||
});
|
||||
|
||||
// 连接服务器
|
||||
ChannelFuture future = bootstrap.connect(host, port).sync();
|
||||
channel = future.channel();
|
||||
|
||||
System.out.println("RPC Client connected to " + host + ":" + port);
|
||||
|
||||
// 等待连接关闭
|
||||
channel.closeFuture().sync();
|
||||
|
||||
} finally {
|
||||
shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (channel != null) {
|
||||
channel.close();
|
||||
}
|
||||
if (workerGroup != null) {
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
|
||||
public <T> T sendRequest(RpcRequest request, Class<T> returnType) {
|
||||
// 发送请求并等待响应
|
||||
RpcClientHandler handler = channel.pipeline()
|
||||
.get(RpcClientHandler.class);
|
||||
|
||||
return handler.sendRequest(request, returnType);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 2. WebSocket 服务器实现
|
||||
|
||||
#### **WebSocket 握手机制**
|
||||
|
||||
```
|
||||
Client → Server: HTTP GET (Upgrade: websocket)
|
||||
Server → Client: HTTP 101 (Switching Protocols)
|
||||
Client → Server: WebSocket Frame
|
||||
Server → Client: WebSocket Frame
|
||||
```
|
||||
|
||||
#### **服务器实现**
|
||||
|
||||
```java
|
||||
@Component
|
||||
public class WebSocketServer {
|
||||
|
||||
private EventLoopGroup bossGroup;
|
||||
private EventLoopGroup workerGroup;
|
||||
|
||||
public void start(int port) throws InterruptedException {
|
||||
bossGroup = new NioEventLoopGroup(1);
|
||||
workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
|
||||
// HTTP 编解码器
|
||||
pipeline.addLast("httpCodec", new HttpServerCodec());
|
||||
pipeline.addLast("httpAggregator",
|
||||
new HttpObjectAggregator(65536));
|
||||
|
||||
// WebSocket 握手处理器
|
||||
pipeline.addLast("handshakeHandler",
|
||||
new WebSocketServerProtocolHandler("/ws"));
|
||||
|
||||
// 自定义 WebSocket 处理器
|
||||
pipeline.addLast("handler", new WebSocketHandler());
|
||||
}
|
||||
});
|
||||
|
||||
ChannelFuture future = bootstrap.bind(port).sync();
|
||||
System.out.println("WebSocket Server started on port " + port);
|
||||
|
||||
future.channel().closeFuture().sync();
|
||||
|
||||
} finally {
|
||||
shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (workerGroup != null) {
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
if (bossGroup != null) {
|
||||
bossGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### **WebSocket Handler**
|
||||
|
||||
```java
|
||||
@ChannelHandler.Sharable
|
||||
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
|
||||
|
||||
// 存储所有连接的 Channel
|
||||
private static final ChannelGroup channels =
|
||||
new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
||||
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) {
|
||||
channels.add(ctx.channel());
|
||||
System.out.println("Client connected: " + ctx.channel().id());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) {
|
||||
channels.remove(ctx.channel());
|
||||
System.out.println("Client disconnected: " + ctx.channel().id());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx,
|
||||
TextWebSocketFrame msg) {
|
||||
// 接收消息
|
||||
String message = msg.text();
|
||||
System.out.println("Received: " + message);
|
||||
|
||||
// 广播消息给所有客户端
|
||||
channels.writeAndFlush(new TextWebSocketFrame(
|
||||
"Server: " + message
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 3. 百万连接 IM 系统
|
||||
|
||||
#### **架构优化**
|
||||
|
||||
```java
|
||||
@Configuration
|
||||
public class ImServerConfig {
|
||||
|
||||
@Bean
|
||||
public EventLoopGroup bossGroup() {
|
||||
// 1 个线程处理 Accept
|
||||
return new NioEventLoopGroup(1);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public EventLoopGroup workerGroup() {
|
||||
// 根据 CPU 核心数设置 I/O 线程数
|
||||
// 通常设置为 CPU 核心数 * 2
|
||||
int threads = Runtime.getRuntime().availableProcessors() * 2;
|
||||
return new NioEventLoopGroup(threads);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public ServerBootstrap serverBootstrap(
|
||||
EventLoopGroup bossGroup,
|
||||
EventLoopGroup workerGroup) {
|
||||
|
||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(bossGroup, workerGroup)
|
||||
.channel(NioServerSocketChannel.class)
|
||||
.option(ChannelOption.SO_BACKLOG, 8192) // 增大连接队列
|
||||
.option(ChannelOption.SO_REUSEADDR, true)
|
||||
.childOption(ChannelOption.TCP_NODELAY, true)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
.childOption(ChannelOption.SO_SNDBUF, 32 * 1024) // 32KB 发送缓冲
|
||||
.childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 32KB 接收缓冲
|
||||
.childHandler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
|
||||
// 连接数限制
|
||||
pipeline.addLast("connectionLimiter",
|
||||
new ConnectionLimiter(1000000));
|
||||
|
||||
// 流量整形
|
||||
pipeline.addLast("trafficShaping",
|
||||
new ChannelTrafficShapingHandler(
|
||||
1024 * 1024, // 读限制 1MB/s
|
||||
1024 * 1024 // 写限制 1MB/s
|
||||
));
|
||||
|
||||
// 协议编解码
|
||||
pipeline.addLast("frameDecoder",
|
||||
new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 0));
|
||||
pipeline.addLast("frameEncoder",
|
||||
new LengthFieldPrepender(4));
|
||||
|
||||
// IM 协议处理器
|
||||
pipeline.addLast("imHandler", new ImServerHandler());
|
||||
}
|
||||
});
|
||||
|
||||
return bootstrap;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### **连接限流**
|
||||
|
||||
```java
|
||||
public class ConnectionLimiter extends ChannelInboundHandlerAdapter {
|
||||
|
||||
private final int maxConnections;
|
||||
private final AtomicInteger activeConnections = new AtomicInteger(0);
|
||||
|
||||
public ConnectionLimiter(int maxConnections) {
|
||||
this.maxConnections = maxConnections;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
if (activeConnections.get() >= maxConnections) {
|
||||
// 连接数超限,关闭新连接
|
||||
ctx.writeAndFlush(new TextWebSocketFrame(
|
||||
"Server is full, please try again later"
|
||||
)).addListener(ChannelFutureListener.CLOSE);
|
||||
return;
|
||||
}
|
||||
|
||||
int count = activeConnections.incrementAndGet();
|
||||
System.out.println("Active connections: " + count);
|
||||
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) {
|
||||
int count = activeConnections.decrementAndGet();
|
||||
System.out.println("Active connections: " + count);
|
||||
|
||||
ctx.fireChannelInactive();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 4. 内存泄漏排查与解决
|
||||
|
||||
#### **常见泄漏场景**
|
||||
|
||||
```java
|
||||
// ❌ 错误示例 1: ByteBuf 未释放
|
||||
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
|
||||
byte[] data = new byte[buf.readableBytes()];
|
||||
buf.readBytes(data);
|
||||
// 忘记释放 buf
|
||||
}
|
||||
|
||||
// ✅ 正确做法
|
||||
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
|
||||
try {
|
||||
byte[] data = new byte[buf.readableBytes()];
|
||||
buf.readBytes(data);
|
||||
// 处理数据...
|
||||
} finally {
|
||||
ReferenceCountUtil.release(buf); // 释放 ByteBuf
|
||||
}
|
||||
}
|
||||
|
||||
// ❌ 错误示例 2: Handler 未移除
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
// 添加长生命周期的 Handler
|
||||
ctx.pipeline().addLast("longLived", new LongLivedHandler());
|
||||
}
|
||||
|
||||
// ✅ 正确做法
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
// 使用 @Sharable 注解
|
||||
ctx.pipeline().addLast("longLived", SharableHandler.INSTANCE);
|
||||
}
|
||||
|
||||
// ❌ 错误示例 3: Channel 引用未释放
|
||||
public class ChannelHolder {
|
||||
private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
|
||||
|
||||
public static void addChannel(String id, Channel channel) {
|
||||
CHANNELS.put(id, channel); // 永久持有引用
|
||||
}
|
||||
}
|
||||
|
||||
// ✅ 正确做法
|
||||
public class ChannelHolder {
|
||||
private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();
|
||||
|
||||
public static void addChannel(String id, Channel channel) {
|
||||
CHANNELS.put(id, channel);
|
||||
|
||||
// 监听关闭事件,自动移除
|
||||
channel.closeFuture().addListener((ChannelFutureListener) future -> {
|
||||
CHANNELS.remove(id);
|
||||
});
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### **内存泄漏检测**
|
||||
|
||||
```java
|
||||
// 启用内存泄漏检测
|
||||
// JVM 参数: -Dio.netty.leakDetection.level=paranoid
|
||||
|
||||
public class LeakDetection {
|
||||
public static void main(String[] args) {
|
||||
// 检测级别
|
||||
// DISABLED - 禁用
|
||||
// SIMPLE - 采样检测(1%)
|
||||
// ADVANCED - 采样检测,记录创建堆栈
|
||||
// PARANOID - 全量检测
|
||||
|
||||
// 创建资源时使用
|
||||
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024);
|
||||
|
||||
// 使用资源泄漏检测器访问器
|
||||
ResourceLeakDetector<ByteBuf> detector =
|
||||
new ResourceLeakDetector<ByteBuf>(ByteBuf.class,
|
||||
ResourceLeakDetector.Level.PARANOID);
|
||||
|
||||
// 包装对象
|
||||
ByteBuf wrappedBuffer = detector.wrap(buffer, "test-buffer");
|
||||
|
||||
// 使用完成后
|
||||
wrappedBuffer.release(); // 如果未释放,会记录堆栈并警告
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 5. 优雅关闭实现
|
||||
|
||||
```java
|
||||
@Component
|
||||
public class NettyGracefulShutdown implements SmartLifecycle {
|
||||
|
||||
@Autowired
|
||||
private ServerBootstrap serverBootstrap;
|
||||
|
||||
private Channel serverChannel;
|
||||
private volatile boolean running = false;
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
running = true;
|
||||
// 启动服务器...
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (!running) {
|
||||
return;
|
||||
}
|
||||
|
||||
System.out.println("Shutting down Netty server gracefully...");
|
||||
|
||||
// 1. 停止接受新连接
|
||||
if (serverChannel != null) {
|
||||
serverChannel.close().syncUninterruptibly();
|
||||
}
|
||||
|
||||
// 2. 等待现有请求处理完成(超时 30 秒)
|
||||
EventLoopGroup workerGroup = serverBootstrap.config().group();
|
||||
Future<?> shutdownFuture = workerGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS);
|
||||
|
||||
try {
|
||||
// 等待优雅关闭完成
|
||||
shutdownFuture.await(60, TimeUnit.SECONDS);
|
||||
|
||||
if (shutdownFuture.isSuccess()) {
|
||||
System.out.println("Netty server shutdown successfully");
|
||||
} else {
|
||||
System.err.println("Netty server shutdown timeout, force close");
|
||||
workerGroup.shutdownNow();
|
||||
}
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
workerGroup.shutdownNow();
|
||||
}
|
||||
|
||||
// 3. 关闭 Boss Group
|
||||
EventLoopGroup bossGroup = serverBootstrap.config().childGroup();
|
||||
bossGroup.shutdownGracefully();
|
||||
|
||||
running = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning() {
|
||||
return running;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 6. 心跳与重连机制
|
||||
|
||||
#### **心跳处理器**
|
||||
|
||||
```java
|
||||
@ChannelHandler.Sharable
|
||||
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
private static final ByteBuf HEARTBEAT_BEAT =
|
||||
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
if (evt instanceof IdleStateEvent) {
|
||||
IdleStateEvent event = (IdleStateEvent) evt;
|
||||
|
||||
if (event.state() == IdleState.READER_IDLE) {
|
||||
// 读空闲,可能对方已挂掉
|
||||
System.out.println("Read idle, closing connection");
|
||||
ctx.close();
|
||||
|
||||
} else if (event.state() == IdleState.WRITER_IDLE) {
|
||||
// 写空闲,发送心跳
|
||||
System.out.println("Write idle, sending heartbeat");
|
||||
ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate());
|
||||
|
||||
} else if (event.state() == IdleState.ALL_IDLE) {
|
||||
// 读写空闲
|
||||
System.out.println("All idle, sending heartbeat");
|
||||
ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
// 处理心跳响应
|
||||
if ("HEARTBEAT".equals(msg)) {
|
||||
// 心跳包,不处理
|
||||
return;
|
||||
}
|
||||
|
||||
// 业务消息
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### **重连机制**
|
||||
|
||||
```java
|
||||
@Component
|
||||
public class ReconnectClient {
|
||||
|
||||
private EventLoopGroup workerGroup;
|
||||
private Channel channel;
|
||||
private ScheduledExecutorService scheduler;
|
||||
|
||||
public void connect(String host, int port) {
|
||||
workerGroup = new NioEventLoopGroup();
|
||||
scheduler = Executors.newSingleThreadScheduledExecutor();
|
||||
|
||||
doConnect(host, port);
|
||||
}
|
||||
|
||||
private void doConnect(String host, int port) {
|
||||
Bootstrap bootstrap = new Bootstrap();
|
||||
bootstrap.group(workerGroup)
|
||||
.channel(NioSocketChannel.class)
|
||||
.option(ChannelOption.TCP_NODELAY, true)
|
||||
.handler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
protected void initChannel(SocketChannel ch) {
|
||||
ch.pipeline()
|
||||
.addLast("idleStateHandler",
|
||||
new IdleStateHandler(0, 10, 0))
|
||||
.addLast("heartbeatHandler", new HeartbeatHandler())
|
||||
.addLast("handler", new ClientHandler());
|
||||
}
|
||||
});
|
||||
|
||||
// 连接服务器
|
||||
ChannelFuture future = bootstrap.connect(host, port);
|
||||
|
||||
future.addListener((ChannelFutureListener) f -> {
|
||||
if (f.isSuccess()) {
|
||||
System.out.println("Connected to server");
|
||||
channel = f.channel();
|
||||
} else {
|
||||
System.out.println("Connection failed, reconnect in 5 seconds...");
|
||||
|
||||
// 5 秒后重连
|
||||
scheduler.schedule(() -> doConnect(host, port), 5, TimeUnit.SECONDS);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
if (channel != null) {
|
||||
channel.close();
|
||||
}
|
||||
if (workerGroup != null) {
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
if (scheduler != null) {
|
||||
scheduler.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## P7 加分项
|
||||
|
||||
### 深度理解
|
||||
- **Reactor 模式**:理解主从 Reactor 多线程模型
|
||||
- **零拷贝原理**:理解用户空间和内核空间
|
||||
- **内存管理**:ByteBuf 的池化、引用计数、泄漏检测
|
||||
|
||||
### 实战经验
|
||||
- **高并发调优**:EventLoopGroup 线程数、TCP 参数调优
|
||||
- **监控告警**:连接数监控、流量监控、延迟监控
|
||||
- **故障排查**:内存泄漏、CPU 100%、连接泄漏
|
||||
|
||||
### 架构设计
|
||||
- **协议设计**:自定义协议、编解码器设计
|
||||
- **集群部署**:负载均衡、服务发现
|
||||
- **容错机制**:重连、熔断、限流
|
||||
|
||||
### 性能优化
|
||||
1. **减少内存拷贝**:使用 CompositeByteBuf
|
||||
2. **复用对象**:使用 ByteBuf 对象池
|
||||
3. **优化 TCP 参数**:TCP_NODELAY、SO_KEEPALIVE
|
||||
4. **流量控制**:ChannelTrafficShapingHandler
|
||||
|
||||
---
|
||||
|
||||
## 总结
|
||||
|
||||
Netty 实战核心要点:
|
||||
1. **高性能**:Reactor 模型、零拷贝、内存池
|
||||
2. **高可靠**:心跳、重连、优雅关闭
|
||||
3. **可扩展**:Pipeline 机制、自定义协议
|
||||
4. **可监控**:连接数、流量、延迟监控
|
||||
|
||||
**最佳实践**:
|
||||
- 合理设置 EventLoopGroup 线程数
|
||||
- 使用池化的 ByteBuf,及时释放
|
||||
- 添加心跳和重连机制
|
||||
- 实现优雅关闭
|
||||
- 做好监控和告警
|
||||
- 定期排查内存泄漏
|
||||
539
10-中间件/Netty核心原理.md
Normal file
539
10-中间件/Netty核心原理.md
Normal file
@@ -0,0 +1,539 @@
|
||||
# Netty 核心原理
|
||||
|
||||
## 问题
|
||||
|
||||
1. Netty 的核心组件有哪些?
|
||||
2. 什么是 EventLoop?线程模型是怎样的?
|
||||
3. 什么是 ByteBuf?与 Java NIO 的 ByteBuffer 有什么区别?
|
||||
4. Netty 的零拷贝是如何实现的?
|
||||
5. 什么是 ChannelPipeline?ChannelHandler 是如何工作的?
|
||||
6. Netty 如何解决 TCP 粘包/拆包问题?
|
||||
7. Netty 的心跳机制是如何实现的?
|
||||
|
||||
---
|
||||
|
||||
## 标准答案
|
||||
|
||||
### 1. Netty 核心组件
|
||||
|
||||
#### **核心组件架构**
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────┐
|
||||
│ Netty 架构 │
|
||||
├─────────────────────────────────────────────┤
|
||||
│ Channel │
|
||||
│ ├─ EventLoop (线程模型) │
|
||||
│ ├─ ChannelPipeline (处理器链) │
|
||||
│ │ └─ ChannelHandler (业务处理器) │
|
||||
│ └─ ByteBuf (内存管理) │
|
||||
├─────────────────────────────────────────────┤
|
||||
│ Bootstrap (启动类) │
|
||||
│ ├─ ServerBootstrap (服务端) │
|
||||
│ └─ Bootstrap (客户端) │
|
||||
└─────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
#### **关键组件说明**
|
||||
|
||||
**1. Channel**
|
||||
```java
|
||||
// Channel 是 Netty 的网络操作抽象类
|
||||
Channel channel = ...;
|
||||
|
||||
// 核心方法
|
||||
channel.writeAndFlush(msg); // 写数据并刷新
|
||||
channel.close(); // 关闭连接
|
||||
channel.eventLoop(); // 获取 EventLoop
|
||||
```
|
||||
|
||||
**2. EventLoop**
|
||||
```java
|
||||
// EventLoop 负责处理 I/O 操作
|
||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // Accept
|
||||
EventLoopGroup workerGroup = new NioEventLoopGroup(); // I/O
|
||||
|
||||
// bossGroup 只负责监听并接受连接
|
||||
// workerGroup 负责 I/O 读写
|
||||
```
|
||||
|
||||
**3. ChannelPipeline**
|
||||
```java
|
||||
// Pipeline 是处理器链
|
||||
ChannelPipeline pipeline = channel.pipeline();
|
||||
|
||||
// 添加处理器
|
||||
pipeline.addLast("decoder", new Decoder());
|
||||
pipeline.addLast("encoder", new Encoder());
|
||||
pipeline.addLast("handler", new BusinessHandler());
|
||||
```
|
||||
|
||||
**4. ByteBuf**
|
||||
```java
|
||||
// ByteBuf 是 Netty 的字节容器
|
||||
ByteBuf buffer = Unpooled.buffer(1024);
|
||||
|
||||
// 写入数据
|
||||
buffer.writeInt(123);
|
||||
buffer.writeBytes("Hello".getBytes());
|
||||
|
||||
// 读取数据
|
||||
int value = buffer.readInt();
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 2. EventLoop 与线程模型
|
||||
|
||||
#### **Reactor 线程模型**
|
||||
|
||||
Netty 采用 **Reactor 模式**,支持三种实现:
|
||||
|
||||
```
|
||||
1. Reactor 单线程模型
|
||||
┌──────────────┐
|
||||
│ Reactor │
|
||||
│ (单线程) │
|
||||
│ │
|
||||
│ - Accept │
|
||||
│ - Read │
|
||||
│ - Decode │
|
||||
│ - Process │
|
||||
│ - Encode │
|
||||
│ - Send │
|
||||
└──────────────┘
|
||||
|
||||
2. Reactor 多线程模型
|
||||
┌──────────┐ ┌──────────────────┐
|
||||
│ Reactor │────>│ Worker Threads │
|
||||
│ (主线程) │ │ (线程池) │
|
||||
│ │ │ │
|
||||
│ - Accept │ │ - Read/Write │
|
||||
│ │ │ - Decode/Encode │
|
||||
│ │ │ - Process │
|
||||
└──────────┘ └──────────────────┘
|
||||
|
||||
3. 主从 Reactor 多线程模型(Netty 默认)
|
||||
┌──────────┐ ┌──────────────────┐
|
||||
│ Main │ │ Sub Reactors │
|
||||
│ Reactor │────>│ (线程池) │
|
||||
│ │ │ │
|
||||
│ - Accept │ │ - Read/Write │
|
||||
│ │ │ - Decode/Encode │
|
||||
│ │ │ - Process │
|
||||
└──────────┘ └──────────────────┘
|
||||
```
|
||||
|
||||
#### **EventLoop 工作原理**
|
||||
|
||||
```java
|
||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||
|
||||
// 每个 EventLoop:
|
||||
// 1. 维护一个 Selector
|
||||
// 2. 维护一个任务队列
|
||||
// 3. 维护一个线程
|
||||
|
||||
// 工作流程:
|
||||
// - 线程启动后,无限循环执行:
|
||||
// 1. select() - 查询就绪的 Channel
|
||||
// 2. processSelectedKeys() - 处理 I/O 事件
|
||||
// 3. runAllTasks() - 执行任务队列中的任务
|
||||
```
|
||||
|
||||
#### **任务调度**
|
||||
|
||||
```java
|
||||
Channel channel = ...;
|
||||
|
||||
// 在 EventLoop 线程中执行任务
|
||||
channel.eventLoop().execute(() -> {
|
||||
System.out.println("Task in EventLoop thread");
|
||||
});
|
||||
|
||||
// 定时任务
|
||||
channel.eventLoop().schedule(() -> {
|
||||
System.out.println("Delayed task");
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 3. ByteBuf vs ByteBuffer
|
||||
|
||||
#### **对比表**
|
||||
|
||||
| 特性 | Java NIO ByteBuffer | Netty ByteBuf |
|
||||
|------|---------------------|---------------|
|
||||
| **长度** | 固定长度,扩容复杂 | 动态扩容 |
|
||||
| **读写模式** | flip() 切换 | 独立的读写索引 |
|
||||
| **引用计数** | 不支持 | 支持(池化) |
|
||||
| **零拷贝** | 不支持 | 支持(Composite) |
|
||||
| **性能** | 较低 | 高(对象池优化) |
|
||||
|
||||
#### **ByteBuf 三个重要指针**
|
||||
|
||||
```
|
||||
ByteBuf 结构:
|
||||
┌───────┬──────────┬──────────┬─────────┐
|
||||
│ 0 │ readerIndex│ writerIndex│ capacity│
|
||||
│ │ │ │ │
|
||||
│ discard│ readable│ writable │ │
|
||||
│ bytes │ bytes │ bytes │ │
|
||||
└───────┴──────────┴──────────┴─────────┘
|
||||
|
||||
操作:
|
||||
- mark() / reset(): 标记和重置
|
||||
- readerIndex(): 读指针
|
||||
- writerIndex(): 写指针
|
||||
- capacity(): 容量
|
||||
- maxCapacity(): 最大容量
|
||||
```
|
||||
|
||||
#### **使用示例**
|
||||
|
||||
```java
|
||||
// 创建 ByteBuf
|
||||
ByteBuf buffer = Unpooled.buffer(1024);
|
||||
|
||||
// 写入数据
|
||||
buffer.writeBytes("Hello".getBytes());
|
||||
buffer.writeInt(123);
|
||||
|
||||
// 读取数据(不移动读指针)
|
||||
byte[] data = new byte[5];
|
||||
buffer.getBytes(buffer.readerIndex(), data);
|
||||
|
||||
// 读取数据(移动读指针)
|
||||
buffer.readBytes(5);
|
||||
|
||||
// 引用计数(池化)
|
||||
buffer.retain(); // 引用计数 +1
|
||||
buffer.release(); // 引用计数 -1
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 4. 零拷贝实现
|
||||
|
||||
#### **零拷贝的三种实现**
|
||||
|
||||
**1. CompositeByteBuf(组合缓冲区)**
|
||||
|
||||
```java
|
||||
// 将多个 ByteBuf 组合成一个逻辑上的 ByteBuf
|
||||
ByteBuf header = Unpooled.buffer(10);
|
||||
ByteBuf body = Unpooled.buffer(100);
|
||||
|
||||
// 零拷贝组合
|
||||
CompositeByteBuf message = Unpooled.wrappedBuffer(header, body);
|
||||
|
||||
// 物理上不复制数据,只是逻辑组合
|
||||
```
|
||||
|
||||
**2. wrappedBuffer(包装)**
|
||||
|
||||
```java
|
||||
// 包装字节数组,不复制
|
||||
byte[] bytes = "Hello Netty".getBytes();
|
||||
ByteBuf buffer = Unpooled.wrappedBuffer(bytes);
|
||||
|
||||
// 底层数据共享,无拷贝
|
||||
```
|
||||
|
||||
**3. slice(切片)**
|
||||
|
||||
```java
|
||||
ByteBuf buffer = Unpooled.buffer(100);
|
||||
|
||||
// 切片,共享底层内存
|
||||
ByteBuf sliced = buffer.slice(0, 50);
|
||||
|
||||
// 修改会互相影响
|
||||
sliced.setByte(0, (byte) 1);
|
||||
```
|
||||
|
||||
**4. FileChannel.transferTo(文件传输)**
|
||||
|
||||
```java
|
||||
// 文件传输零拷贝
|
||||
FileChannel sourceChannel = ...;
|
||||
FileChannel destChannel = ...;
|
||||
|
||||
// 直接在内核空间传输,不经过用户空间
|
||||
sourceChannel.transferTo(position, count, destChannel);
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 5. ChannelPipeline 与 ChannelHandler
|
||||
|
||||
#### **Pipeline 工作流程**
|
||||
|
||||
```
|
||||
入站事件 (Inbound):
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────┐
|
||||
│ ChannelPipeline │
|
||||
│ │
|
||||
│ ┌─────────┐ ┌─────────┐ ┌──────┐│
|
||||
│ │Decoder1 │→ │Decoder2 │→ │Handler││
|
||||
│ └─────────┘ └─────────┘ └──────┘│
|
||||
│ ▲ ▲ ▲ │
|
||||
│ └────────────┴─────────────┘ │
|
||||
│ (数据流) │
|
||||
└─────────────────────────────────────┘
|
||||
|
||||
出站事件 (Outbound):
|
||||
│
|
||||
▼
|
||||
┌─────────────────────────────────────┐
|
||||
│ ChannelPipeline │
|
||||
│ │
|
||||
│ ┌──────┐ ┌─────────┐ ┌─────────┐│
|
||||
│ │Handler│→ │Encoder1 │→ │Encoder2 ││
|
||||
│ └──────┘ └─────────┘ └─────────┘│
|
||||
│ ▲ ▲ ▲ │
|
||||
│ └────────────┴─────────────┘ │
|
||||
│ (数据流) │
|
||||
└─────────────────────────────────────┘
|
||||
```
|
||||
|
||||
#### **ChannelHandler 接口**
|
||||
|
||||
```java
|
||||
// 入站处理器
|
||||
@ChannelHandler.Sharable
|
||||
public class InboundHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
// 处理读事件
|
||||
System.out.println("Received: " + msg);
|
||||
ctx.fireChannelRead(msg); // 传递给下一个 Handler
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
// 连接建立
|
||||
System.out.println("Client connected");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) {
|
||||
// 连接断开
|
||||
System.out.println("Client disconnected");
|
||||
}
|
||||
}
|
||||
|
||||
// 出站处理器
|
||||
public class OutboundHandler extends ChannelOutboundHandlerAdapter {
|
||||
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg,
|
||||
ChannelPromise promise) {
|
||||
// 处理写事件
|
||||
System.out.println("Sending: " + msg);
|
||||
ctx.write(msg, promise);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### **Handler 生命周期**
|
||||
|
||||
```java
|
||||
public interface ChannelHandler {
|
||||
|
||||
// 添加到 Pipeline
|
||||
void handlerAdded(ChannelHandlerContext ctx);
|
||||
|
||||
// 从 Pipeline 移除
|
||||
void handlerRemoved(ChannelHandlerContext ctx);
|
||||
|
||||
// 发生异常
|
||||
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause);
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 6. TCP 粘包/拆包解决方案
|
||||
|
||||
#### **问题原因**
|
||||
|
||||
```
|
||||
TCP 粘包:
|
||||
发送方: [包1] [包2] [包3]
|
||||
接收方: [包1+包2+包3] // 粘在一起
|
||||
|
||||
TCP 拆包:
|
||||
发送方: [大包]
|
||||
接收方: [片段1] [片段2] // 被拆分
|
||||
```
|
||||
|
||||
#### **Netty 提供的解码器**
|
||||
|
||||
**1. FixedLengthFrameDecoder(固定长度)**
|
||||
|
||||
```java
|
||||
// 每个消息固定 10 字节
|
||||
pipeline.addLast("framer", new FixedLengthFrameDecoder(10));
|
||||
```
|
||||
|
||||
**2. LineBasedFrameDecoder(分隔符)**
|
||||
|
||||
```java
|
||||
// 按换行符分割
|
||||
pipeline.addLast("framer", new LineBasedFrameDecoder(8192));
|
||||
```
|
||||
|
||||
**3. DelimiterBasedFrameDecoder(自定义分隔符)**
|
||||
|
||||
```java
|
||||
// 自定义分隔符
|
||||
ByteBuf delimiter = Unpooled.copiedBuffer("\t".getBytes());
|
||||
pipeline.addLast("framer",
|
||||
new DelimiterBasedFrameDecoder(8192, delimiter));
|
||||
```
|
||||
|
||||
**4. LengthFieldBasedFrameDecoder(长度字段)**
|
||||
|
||||
```java
|
||||
// 消息格式: [长度(4字节)] [数据]
|
||||
pipeline.addLast("framer",
|
||||
new LengthFieldBasedFrameDecoder(
|
||||
8192, // maxFrameLength
|
||||
0, // lengthFieldOffset
|
||||
4, // lengthFieldLength
|
||||
0, // lengthAdjustment
|
||||
0 // initialBytesToStrip
|
||||
));
|
||||
```
|
||||
|
||||
**5. 自定义解码器**
|
||||
|
||||
```java
|
||||
public class CustomDecoder extends ByteToMessageDecoder {
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext ctx,
|
||||
ByteBuf in, List<Object> out) {
|
||||
// 检查是否有足够数据
|
||||
if (in.readableBytes() < 4) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 标记读指针
|
||||
in.markReaderIndex();
|
||||
|
||||
// 读取长度
|
||||
int length = in.readInt();
|
||||
|
||||
// 检查数据是否完整
|
||||
if (in.readableBytes() < length) {
|
||||
in.resetReaderIndex();
|
||||
return;
|
||||
}
|
||||
|
||||
// 读取数据
|
||||
byte[] data = new byte[length];
|
||||
in.readBytes(data);
|
||||
|
||||
out.add(data);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 7. 心跳机制
|
||||
|
||||
#### **IdleStateHandler 实现**
|
||||
|
||||
```java
|
||||
// 读写空闲检测
|
||||
pipeline.addLast("idleStateHandler",
|
||||
new IdleStateHandler(
|
||||
30, // 读空闲时间(秒)
|
||||
0, // 写空闲时间
|
||||
0 // 读写空闲时间
|
||||
));
|
||||
|
||||
// 心跳处理器
|
||||
pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
|
||||
```
|
||||
|
||||
#### **心跳处理器**
|
||||
|
||||
```java
|
||||
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
if (evt instanceof IdleStateEvent) {
|
||||
IdleStateEvent event = (IdleStateEvent) evt;
|
||||
|
||||
if (event.state() == IdleState.READER_IDLE) {
|
||||
// 读空闲,关闭连接
|
||||
System.out.println("Read idle, close connection");
|
||||
ctx.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
// 处理心跳包
|
||||
if ("PING".equals(msg)) {
|
||||
ctx.writeAndFlush("PONG");
|
||||
} else {
|
||||
ctx.fireChannelRead(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## P7 加分项
|
||||
|
||||
### 深度理解
|
||||
- **Reactor 模式**:理解单线程、多线程、主从多线程三种模型
|
||||
- **零拷贝原理**:理解用户空间和内核空间的数据拷贝
|
||||
- **内存管理**:ByteBuf 的池化、引用计数、内存泄漏
|
||||
|
||||
### 实战经验
|
||||
- **高并发场景**:EventLoopGroup 线程数调优
|
||||
- **内存调优**:ByteBuf 分配器选择(堆内存 vs 直接内存)
|
||||
- **性能优化**:避免 Handler 链过长、减少对象创建
|
||||
|
||||
### 架构设计
|
||||
- **协议设计**:自定义协议、编解码器设计
|
||||
- **异常处理**:优雅关闭、重连机制
|
||||
- **监控告警**:流量监控、连接数监控、延迟监控
|
||||
|
||||
### 常见问题
|
||||
1. **内存泄漏**:ByteBuf 未释放
|
||||
2. **线程安全**:共享状态的同步
|
||||
3. **连接池**:Channel 复用与管理
|
||||
4. **背压处理**:流量控制与慢消费
|
||||
|
||||
---
|
||||
|
||||
## 总结
|
||||
|
||||
Netty 的核心优势:
|
||||
1. **高性能**:零拷贝、内存池、Reactor 模式
|
||||
2. **高可靠性**:心跳机制、重连机制、优雅关闭
|
||||
3. **易扩展**:Pipeline 机制、丰富的 Handler
|
||||
4. **成熟稳定**:广泛应用在 Dubbo、gRPC、WebSocket
|
||||
|
||||
**最佳实践**:
|
||||
- 合理设置 EventLoopGroup 线程数
|
||||
- 使用池化的 ByteBuf
|
||||
- 及时释放 ByteBuf,避免内存泄漏
|
||||
- 使用 LengthFieldBasedFrameDecoder 处理粘包
|
||||
- 添加心跳和重连机制
|
||||
- 做好监控和日志
|
||||
Reference in New Issue
Block a user