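Netty in Practice: Scenarios and Best Practices
Questions
- How do you design a high-performance RPC framework?
- How do you implement a WebSocket server?
- How do you build an IM system that handles a million connections?
- How do you deal with memory leaks in Netty?
- How do you implement graceful shutdown?
- How do you design heartbeat and reconnection mechanisms?
- How do you share sessions across a distributed deployment?
Standard Answers
1. High-Performance RPC Framework Design
Architecture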
┌──────────────────────────────────────────┐
│        RPC Framework Architecture        │
├──────────────────────────────────────────┤
│  Consumer Side                           │
│  ┌─────────────────┐  ┌──────────────┐   │
│  │ Proxy           │  │ Serializer   │   │
│  │ (dynamic proxy) │  │ (Protobuf)   │   │
│  └────────┬────────┘  └──────────────┘   │
│           │                              │
│           ▼                              │
│  ┌────────────────────────────────────┐  │
│  │ Netty Client                       │  │
│  │ - Reconnect                        │  │
│  │ - Timeout                          │  │
│  │ - Heartbeat                        │  │
│  └─────────────────┬──────────────────┘  │
│                    │                     │
│                    │ TCP                 │
│                    │                     │
│  ┌─────────────────▼──────────────────┐  │
│  │ Netty Server                       │  │
│  │ - Boss/Worker Groups               │  │
│  │ - Pipeline                         │  │
│  │ - Handler Chain                    │  │
│  └─────────────────┬──────────────────┘  │
│                    │                     │
│          ┌─────────┴─────────┐           │
│          ▼                   ▼           │
│   ┌─────────────┐     ┌─────────────┐    │
│   │ Service     │     │ Registry    │    │
│   │ (impl)      │     │ (discovery) │    │
│   └─────────────┘     └─────────────┘    │
└──────────────────────────────────────────┘
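Protocol Design
// RPC protocol frame format (multi-byte fields are big-endian, the ByteBuf default)
┌──────────────┬───────────┬──────────┬────────────────┐
│ Magic Number │  Length   │   Type   │      Body      │
│  (4 bytes)   │ (4 bytes) │ (1 byte) │ (Length bytes) │
└──────────────┴───────────┴──────────┴────────────────┘
// Implementation
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class RpcProtocol {
    private static final int MAGIC_NUMBER = 0xCAFEBABE;
    private static final int HEADER_LENGTH = 9; // 4 (magic) + 4 (length) + 1 (type)

    public static ByteBuf encode(byte[] body, byte type) {
        ByteBuf buffer = Unpooled.buffer(HEADER_LENGTH + body.length);
        buffer.writeInt(MAGIC_NUMBER);  // Magic number (4 bytes)
        buffer.writeInt(body.length);   // Body length (4 bytes)
        buffer.writeByte(type);         // Message type (1 byte)
        buffer.writeBytes(body);        // Body
        return buffer;
    }

    // Expects one complete frame, for example the output of a frame decoder
    public static byte[] decode(ByteBuf buffer) {
        if (buffer.readableBytes() < HEADER_LENGTH) {
            throw new IllegalArgumentException("Incomplete header");
        }
        // Check the magic number
        if (buffer.readInt() != MAGIC_NUMBER) {
            throw new IllegalArgumentException("Invalid magic number");
        }
        // Read the body length
        int length = buffer.readInt();
        // Read the type to advance the reader index; it is not used here
        byte type = buffer.readByte();
        if (length < 0 || length > buffer.readableBytes()) {
            throw new IllegalArgumentException("Invalid body length: " + length);
        }
        // Read the body
        byte[] body = new byte[length];
        buffer.readBytes(body);
        return body;
    }
}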
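The frame layout above can be checked without Netty on the classpath. The following is a minimal sketch using the JDK's `java.nio.ByteBuffer`, assuming big-endian fields and a Length field that counts only the body; the class name `FrameLayoutSketch` and its method names are hypothetical and not part of the original code.

```java
import java.nio.ByteBuffer;

final class FrameLayoutSketch {
    static final int MAGIC = 0xCAFEBABE;
    static final int HEADER_LENGTH = 9; // 4 (magic) + 4 (length) + 1 (type)

    /** Builds one frame: magic, body length, type, body. */
    static byte[] encode(byte type, byte[] body) {
        // ByteBuffer is big-endian by default, like Netty's ByteBuf
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);
        buf.putInt(MAGIC).putInt(body.length).put(type).put(body);
        return buf.array();
    }

    /** Returns the body, or null when the frame is still incomplete. */
    static byte[] decodeBody(byte[] frame) {
        if (frame.length < HEADER_LENGTH) {
            return null; // header not fully received yet
        }
        ByteBuffer buf = ByteBuffer.wrap(frame);
        if (buf.getInt() != MAGIC) {
            throw new IllegalArgumentException("Invalid magic number");
        }
        int length = buf.getInt();
        buf.get(); // skip the type byte
        if (length < 0 || buf.remaining() < length) {
            return null; // body not fully received yet
        }
        byte[] body = new byte[length];
        buf.get(body);
        return body;
    }
}
```

Returning `null` for a short frame mirrors what a frame decoder does when it waits for more bytes, which is the usual answer to the TCP sticky/split packet problem.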
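Server Implementation
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy; // jakarta.annotation.PreDestroy on Spring 6+

@Component
public class RpcServer {
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    // Blocks until the server channel is closed
    public void start(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .option(ChannelOption.SO_REUSEADDR, true)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Frame decoder for the protocol above: the length field starts at
                        // offset 4 (after the magic number) and is 4 bytes wide;
                        // lengthAdjustment = 1 covers the type byte between length and body
                        pipeline.addLast("frameDecoder",
                            new LengthFieldBasedFrameDecoder(8192, 4, 4, 1, 0));
                        // No LengthFieldPrepender: RpcEncoder is assumed to write the full
                        // header (for example via RpcProtocol.encode); a second length
                        // prefix would push the magic number away from offset 0
                        // Protocol codec
                        pipeline.addLast("decoder", new RpcDecoder());
                        pipeline.addLast("encoder", new RpcEncoder());
                        // Idle detection: fire an event after 60 s without reads
                        pipeline.addLast("idleStateHandler",
                            new IdleStateHandler(60, 0, 0));
                        pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
                        // Business handler
                        pipeline.addLast("handler", new RpcServerHandler());
                    }
                });
            // Bind the port and start
            ChannelFuture future = bootstrap.bind(port).sync();
            serverChannel = future.channel();
            System.out.println("RPC Server started on port " + port);
            // Wait until the server socket is closed
            serverChannel.closeFuture().sync();
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }

    @PreDestroy
    public void destroy() {
        shutdown();
    }
}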
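Client Implementation
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;

@Component
public class RpcClient {
    private EventLoopGroup workerGroup;
    // volatile: written by the connecting thread, read by threads calling sendRequest
    private volatile Channel channel;

    // Blocks until the connection is closed, so call it from a dedicated thread
    public void connect(String host, int port) throws InterruptedException {
        workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // Frame decoder: length field at offset 4, 4 bytes wide;
                        // lengthAdjustment = 1 covers the type byte before the body
                        pipeline.addLast("frameDecoder",
                            new LengthFieldBasedFrameDecoder(8192, 4, 4, 1, 0));
                        // No LengthFieldPrepender: RpcEncoder is assumed to write the
                        // full header (for example via RpcProtocol.encode)
                        // Protocol codec
                        pipeline.addLast("decoder", new RpcDecoder());
                        pipeline.addLast("encoder", new RpcEncoder());
                        // Idle detection: fire an event after 30 s without writes
                        pipeline.addLast("idleStateHandler",
                            new IdleStateHandler(0, 30, 0));
                        pipeline.addLast("heartbeatHandler", new HeartbeatHandler());
                        // Business handler
                        pipeline.addLast("handler", new RpcClientHandler());
                    }
                });
            // Connect to the server
            ChannelFuture future = bootstrap.connect(host, port).sync();
            channel = future.channel();
            System.out.println("RPC Client connected to " + host + ":" + port);
            // Wait until the connection is closed
            channel.closeFuture().sync();
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (channel != null) {
            channel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }

    public <T> T sendRequest(RpcRequest request, Class<T> returnType) {
        // Send the request and wait for the response
        // (RpcClientHandler.sendRequest is not shown in this document)
        RpcClientHandler handler = channel.pipeline()
            .get(RpcClientHandler.class);
        return handler.sendRequest(request, returnType);
    }
}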
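The client above delegates "send and wait" to `RpcClientHandler.sendRequest`, which is not shown. One common way to pair a response with its request on a shared connection is a map of pending futures keyed by request ID. The sketch below illustrates that idea with JDK classes only; `PendingRequests` and its methods are hypothetical names, the request ID is assumed to travel in the message body, and `orTimeout` requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

final class PendingRequests {
    private final ConcurrentHashMap<Long, CompletableFuture<Object>> pending =
            new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    /** Allocates a request ID that is unique for this connection. */
    long nextRequestId() {
        return nextId.incrementAndGet();
    }

    /** Call before writing the request; the caller blocks on or chains the future. */
    CompletableFuture<Object> register(long requestId, long timeoutMillis) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        pending.put(requestId, future);
        // Fail the future on timeout, and always drop the map entry once it
        // completes so that abandoned requests do not accumulate
        future.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
              .whenComplete((result, error) -> pending.remove(requestId));
        return future;
    }

    /** Call from the inbound handler when a response arrives. */
    boolean complete(long requestId, Object response) {
        CompletableFuture<Object> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    int size() {
        return pending.size();
    }
}
```

A response that arrives after its request timed out finds no entry and is simply dropped, which is why `complete` returns a boolean instead of throwing.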
2. WebSocket Server Implementation
WebSocket Handshake
Client → Server: HTTP GET (Upgrade: websocket)
Server → Client: HTTP 101 (Switching Protocols)
Client → Server: WebSocket Frame
Server → Client: WebSocket Frame
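In the 101 response, the server proves it understood the upgrade by returning a `Sec-WebSocket-Accept` header derived from the client's `Sec-WebSocket-Key`, as defined in RFC 6455. Netty's WebSocket handshake support performs this step internally; the sketch below only illustrates the computation, and the class name `WebSocketAcceptSketch` is hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

final class WebSocketAcceptSketch {
    // Fixed GUID defined by RFC 6455
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    /** Computes Base64(SHA-1(key + GUID)) for the Sec-WebSocket-Accept header. */
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // SHA-1 is a required algorithm on every Java platform
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }
}
```

With the sample key from RFC 6455, `acceptKey("dGhlIHNhbXBsZSBub25jZQ==")` returns `s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`.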
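Server Implementation
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import org.springframework.stereotype.Component;

@Component
public class WebSocketServer {
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    // Blocks until the server channel is closed
    public void start(int port) throws InterruptedException {
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ChannelPipeline pipeline = ch.pipeline();
                        // HTTP codec (the handshake starts as an HTTP request)
                        pipeline.addLast("httpCodec", new HttpServerCodec());
                        // Aggregate HTTP message parts, up to 64 KiB
                        pipeline.addLast("httpAggregator",
                            new HttpObjectAggregator(65536));
                        // WebSocket handshake and control-frame handling on path /ws
                        pipeline.addLast("handshakeHandler",
                            new WebSocketServerProtocolHandler("/ws"));
                        // Custom WebSocket handler
                        pipeline.addLast("handler", new WebSocketHandler());
                    }
                });
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("WebSocket Server started on port " + port);
            future.channel().closeFuture().sync();
        } finally {
            shutdown();
        }
    }

    public void shutdown() {
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
    }
}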
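WebSocket Handler
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    // Holds every connected Channel
    private static final ChannelGroup channels =
        new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        channels.add(ctx.channel());
        System.out.println("Client connected: " + ctx.channel().id());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        // DefaultChannelGroup also drops closed channels on its own;
        // the explicit remove keeps the intent visible
        channels.remove(ctx.channel());
        System.out.println("Client disconnected: " + ctx.channel().id());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx,
                                TextWebSocketFrame msg) {
        // Receive the message
        String message = msg.text();
        System.out.println("Received: " + message);
        // Broadcast it to all clients
        channels.writeAndFlush(new TextWebSocketFrame(
            "Server: " + message
        ));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}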
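3. Million-Connection IM System
Architecture Tuning
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ImServerConfig {
    @Bean
    public EventLoopGroup bossGroup() {
        // One thread handles accept
        return new NioEventLoopGroup(1);
    }

    @Bean
    public EventLoopGroup workerGroup() {
        // Size the I/O threads from the CPU core count.
        // cores * 2 is the usual choice and also what NioEventLoopGroup() uses by default
        int threads = Runtime.getRuntime().availableProcessors() * 2;
        return new NioEventLoopGroup(threads);
    }

    @Bean
    public ServerBootstrap serverBootstrap(
            EventLoopGroup bossGroup,
            EventLoopGroup workerGroup) {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 8192) // larger accept queue
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) // 32 KB send buffer
            .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // 32 KB receive buffer
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Connection limit
                    pipeline.addLast("connectionLimiter",
                        new ConnectionLimiter(1000000));
                    // Per-channel traffic shaping; the constructor takes
                    // (writeLimit, readLimit) in bytes per second
                    pipeline.addLast("trafficShaping",
                        new ChannelTrafficShapingHandler(
                            1024 * 1024, // write limit: 1 MB/s
                            1024 * 1024  // read limit: 1 MB/s
                        ));
                    // Length-prefixed framing: 4-byte length field at offset 0
                    pipeline.addLast("frameDecoder",
                        new LengthFieldBasedFrameDecoder(8192, 0, 4, 0, 0));
                    pipeline.addLast("frameEncoder",
                        new LengthFieldPrepender(4));
                    // IM protocol handler
                    pipeline.addLast("imHandler", new ImServerHandler());
                }
            });
        return bootstrap;
    }
}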
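Connection Limiting
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionLimiter extends ChannelInboundHandlerAdapter {
    // Static so the count is shared by all instances: the channel initializer
    // creates one handler per connection
    private static final AtomicInteger ACTIVE_CONNECTIONS = new AtomicInteger(0);
    private final int maxConnections;
    // Whether this channel holds a slot (the handler instance is per channel)
    private boolean counted;

    public ConnectionLimiter(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    // Count connections in channelActive, not channelRead: channelRead fires
    // once per message, which would count a single connection many times
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // Increment first, then check, so concurrent connects cannot overshoot
        if (ACTIVE_CONNECTIONS.incrementAndGet() > maxConnections) {
            ACTIVE_CONNECTIONS.decrementAndGet();
            // Over the limit: close the new connection. Nothing is written first,
            // because no protocol encoder is guaranteed at this point in the pipeline
            ctx.close();
            return;
        }
        counted = true;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // Only release a slot that was actually taken
        if (counted) {
            ACTIVE_CONNECTIONS.decrementAndGet();
        }
        ctx.fireChannelInactive();
    }
}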
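A connection cap is only reliable if taking a slot is a single atomic step; reading the counter and incrementing it in two separate steps lets concurrent connections overshoot the limit. The sketch below shows the idea with a compare-and-set loop, using only the JDK. `ConnectionSlots` and its methods are hypothetical names for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ConnectionSlots {
    private final int max;
    private final AtomicInteger active = new AtomicInteger();

    ConnectionSlots(int max) {
        this.max = max;
    }

    /** Takes a slot if one is free; never lets the count exceed max. */
    boolean tryAcquire() {
        while (true) {
            int current = active.get();
            if (current >= max) {
                return false; // full: the caller should reject the connection
            }
            // Succeeds only if no other thread changed the count in between
            if (active.compareAndSet(current, current + 1)) {
                return true;
            }
        }
    }

    /** Returns a slot; call exactly once for each successful tryAcquire. */
    void release() {
        active.decrementAndGet();
    }

    int active() {
        return active.get();
    }
}
```

In a handler, `tryAcquire` would be called when a connection becomes active and `release` when it becomes inactive, but only for connections that actually obtained a slot.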
4. Detecting and Fixing Memory Leaks
Common Leak Scenarios
// ❌ Wrong, example 1: the ByteBuf is never released
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
    byte[] data = new byte[buf.readableBytes()];
    buf.readBytes(data);
    // buf is never released
}
// ✅ Correct
public void handler(ChannelHandlerContext ctx, ByteBuf buf) {
    try {
        byte[] data = new byte[buf.readableBytes()];
        buf.readBytes(data);
        // Process the data...
    } finally {
        ReferenceCountUtil.release(buf); // release the ByteBuf
    }
}
// ❌ Wrong, example 2: the handler is never removed
public void channelActive(ChannelHandlerContext ctx) {
    // Adds a new long-lived handler for every connection
    ctx.pipeline().addLast("longLived", new LongLivedHandler());
}
// ✅ Correct
public void channelActive(ChannelHandlerContext ctx) {
    // Reuse a single stateless instance annotated with @ChannelHandler.Sharable
    ctx.pipeline().addLast("longLived", SharableHandler.INSTANCE);
}
// ❌ Wrong, example 3: the Channel reference is never released
public class ChannelHolder {
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();

    public static void addChannel(String id, Channel channel) {
        CHANNELS.put(id, channel); // the reference is held forever
    }
}
// ✅ Correct
public class ChannelHolder {
    private static final Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();

    public static void addChannel(String id, Channel channel) {
        CHANNELS.put(id, channel);
        // Listen for the close event and remove the entry automatically.
        // remove(id, channel) leaves a newer channel registered under the same id alone
        channel.closeFuture().addListener((ChannelFutureListener) future -> {
            CHANNELS.remove(id, channel);
        });
    }
}
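The first scenario in the listing above depends on reference counting: a buffer starts with a count of 1, each `retain` adds one, each `release` removes one, and the memory is returned when the count reaches 0. The following is a simplified model of that contract using only the JDK. It is not Netty's implementation; `RefCountedSketch` and the `deallocator` callback are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class RefCountedSketch {
    private final AtomicInteger refCnt = new AtomicInteger(1); // the creator holds one reference
    private final Runnable deallocator;

    RefCountedSketch(Runnable deallocator) {
        this.deallocator = deallocator;
    }

    /** Adds a reference; fails if the object was already deallocated. */
    RefCountedSketch retain() {
        while (true) {
            int n = refCnt.get();
            if (n <= 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(n, n + 1)) {
                return this;
            }
        }
    }

    /** Drops a reference; returns true when this call deallocated the object. */
    boolean release() {
        while (true) {
            int n = refCnt.get();
            if (n <= 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(n, n - 1)) {
                if (n == 1) {
                    deallocator.run(); // last reference gone: free the resource
                    return true;
                }
                return false;
            }
        }
    }

    int refCnt() {
        return refCnt.get();
    }
}
```

A leak corresponds to an object that becomes unreachable while its count is still above 0; a double release corresponds to the exception thrown here.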
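Leak Detection
// Enable leak detection with a JVM flag: -Dio.netty.leakDetection.level=paranoid
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ResourceLeakDetector;

public class LeakDetection {
    public static void main(String[] args) {
        // Detection levels:
        // DISABLED - no detection
        // SIMPLE   - sampled (about 1% of buffers), reports only that a leak occurred (default)
        // ADVANCED - sampled, also records where the leaked buffer was accessed
        // PARANOID - like ADVANCED, but tracks every buffer (for testing, not production)

        // Programmatic equivalent of the JVM flag; set it before allocating buffers
        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

        // Buffers obtained from an allocator are tracked automatically,
        // so no manual wrapping is needed
        ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1024);
        try {
            buffer.writeInt(42); // use the buffer...
        } finally {
            // If this call is missing, Netty logs a "LEAK: ByteBuf.release() was not
            // called..." error once the unreleased buffer is garbage-collected
            buffer.release();
        }
    }
}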
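5. Graceful Shutdown
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

@Component
public class NettyGracefulShutdown implements SmartLifecycle {
    @Autowired
    private ServerBootstrap serverBootstrap;
    private Channel serverChannel;
    private volatile boolean running = false;

    @Override
    public void start() {
        running = true;
        // Start the server and assign serverChannel here...
    }

    @Override
    public void stop() {
        if (!running) {
            return;
        }
        System.out.println("Shutting down Netty server gracefully...");
        // 1. Stop accepting new connections
        if (serverChannel != null) {
            serverChannel.close().syncUninterruptibly();
        }
        // 2. Let in-flight requests finish: 2 s quiet period, 30 s hard timeout.
        //    config().childGroup() is the worker group; config().group() is the boss group
        EventLoopGroup workerGroup = serverBootstrap.config().childGroup();
        Future<?> shutdownFuture = workerGroup.shutdownGracefully(2, 30, TimeUnit.SECONDS);
        try {
            // Wait for the graceful shutdown to complete
            shutdownFuture.await(60, TimeUnit.SECONDS);
            if (shutdownFuture.isSuccess()) {
                System.out.println("Netty server shutdown successfully");
            } else {
                System.err.println("Netty server shutdown timeout, force close");
                workerGroup.shutdownNow(); // deprecated in Netty 4.1; last resort only
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            workerGroup.shutdownNow();
        }
        // 3. Shut down the boss group
        EventLoopGroup bossGroup = serverBootstrap.config().group();
        bossGroup.shutdownGracefully();
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}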
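The stop sequence above is a two-phase shutdown: first refuse new work and let in-flight work finish within a deadline, then force termination if the deadline passes. As an analogy only, the same pattern is sketched below with a plain JDK `ExecutorService`; it does not use Netty's API, and `TwoPhaseShutdown` is a hypothetical name.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class TwoPhaseShutdown {
    /** Returns true if the pool terminated within the allowed time. */
    static boolean shutdown(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // phase 1: reject new tasks, keep running queued ones
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // phase 2: interrupt workers, drop queued tasks
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

The ordering matters in both settings: stop the intake first, otherwise new work keeps arriving while the drain deadline is running.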
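6. Heartbeat and Reconnection
Heartbeat Handler
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

@ChannelHandler.Sharable
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {
    // Shared, unreleasable payload; each write sends a duplicate with its own indices
    private static final ByteBuf HEARTBEAT_BEAT =
        Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                // Nothing read for too long: the peer may be dead
                System.out.println("Read idle, closing connection");
                ctx.close();
            } else if (event.state() == IdleState.WRITER_IDLE) {
                // Nothing written for too long: send a heartbeat
                System.out.println("Write idle, sending heartbeat");
                ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate());
            } else if (event.state() == IdleState.ALL_IDLE) {
                // Neither read nor written
                System.out.println("All idle, sending heartbeat");
                ctx.writeAndFlush(HEARTBEAT_BEAT.duplicate());
            }
        } else {
            // Pass other user events along the pipeline
            ctx.fireUserEventTriggered(evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Assumes an upstream decoder (for example StringDecoder) has already turned
        // the frame into a String; a raw ByteBuf never equals "HEARTBEAT"
        if ("HEARTBEAT".equals(msg)) {
            // Heartbeat: consume it here and release it (a no-op for a String)
            ReferenceCountUtil.release(msg);
            return;
        }
        // Business message
        ctx.fireChannelRead(msg);
    }
}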
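The idle events handled above come from comparing the time of the last read and the last write against configured limits. The sketch below is a simplified model of that bookkeeping with an explicit clock value, so it can be tested deterministically. It is not how `IdleStateHandler` is implemented; `IdleTracker` is a hypothetical name, and a limit of 0 is taken to mean "disabled", matching the convention of the `IdleStateHandler` constructor arguments.

```java
final class IdleTracker {
    enum State { ACTIVE, READER_IDLE, WRITER_IDLE }

    private final long readerIdleMillis; // 0 disables the read check
    private final long writerIdleMillis; // 0 disables the write check
    private long lastRead;
    private long lastWrite;

    IdleTracker(long readerIdleMillis, long writerIdleMillis, long now) {
        this.readerIdleMillis = readerIdleMillis;
        this.writerIdleMillis = writerIdleMillis;
        this.lastRead = now;
        this.lastWrite = now;
    }

    void onRead(long now)  { lastRead = now; }
    void onWrite(long now) { lastWrite = now; }

    /** Reports which limit, if any, has been reached at time now. */
    State check(long now) {
        if (readerIdleMillis > 0 && now - lastRead >= readerIdleMillis) {
            return State.READER_IDLE; // peer silent: typically close the connection
        }
        if (writerIdleMillis > 0 && now - lastWrite >= writerIdleMillis) {
            return State.WRITER_IDLE; // we are silent: typically send a heartbeat
        }
        return State.ACTIVE;
    }
}
```

Choosing a write-idle limit shorter than the peer's read-idle limit (30 s against 60 s in the RPC example) gives a healthy connection a heartbeat before the other side gives up on it.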
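Reconnection
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.springframework.stereotype.Component;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
public class ReconnectClient {
    private EventLoopGroup workerGroup;
    private volatile Channel channel;
    private ScheduledExecutorService scheduler;
    // Set by shutdown() so that a deliberate close does not trigger a reconnect
    private volatile boolean stopped;

    public void connect(String host, int port) {
        workerGroup = new NioEventLoopGroup();
        scheduler = Executors.newSingleThreadScheduledExecutor();
        doConnect(host, port);
    }

    private void doConnect(String host, int port) {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(workerGroup)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) {
                    ch.pipeline()
                        .addLast("idleStateHandler",
                            new IdleStateHandler(0, 10, 0))
                        .addLast("heartbeatHandler", new HeartbeatHandler())
                        .addLast("handler", new ClientHandler());
                }
            });
        // Connect to the server
        ChannelFuture future = bootstrap.connect(host, port);
        future.addListener((ChannelFutureListener) f -> {
            if (f.isSuccess()) {
                System.out.println("Connected to server");
                channel = f.channel();
                // Also reconnect when an established connection drops later
                channel.closeFuture().addListener(
                    (ChannelFutureListener) c -> scheduleReconnect(host, port));
            } else {
                System.out.println("Connection failed, reconnect in 5 seconds...");
                scheduleReconnect(host, port);
            }
        });
    }

    // Retry after 5 seconds unless the client was shut down
    private void scheduleReconnect(String host, int port) {
        if (!stopped) {
            scheduler.schedule(() -> doConnect(host, port), 5, TimeUnit.SECONDS);
        }
    }

    public void shutdown() {
        stopped = true;
        if (channel != null) {
            channel.close();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        if (scheduler != null) {
            scheduler.shutdown();
        }
    }
}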
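The client above retries at a fixed 5-second interval. When many clients lose the same server at once, a fixed interval makes them all retry in lockstep; a common alternative is exponential backoff with an upper bound. The sketch below computes such a delay; it is an optional variation rather than part of the original client, and `BackoffSketch` with its parameters is a hypothetical name. Random jitter is left out to keep the result deterministic.

```java
final class BackoffSketch {
    /**
     * Delay before retry number attempt (0-based): base * 2^attempt, capped at capMillis.
     */
    static long delayMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 0 || baseMillis <= 0 || capMillis < baseMillis) {
            throw new IllegalArgumentException("invalid backoff parameters");
        }
        int shift = Math.min(attempt, 30); // keep the shift small enough to reason about
        // base * 2^shift > cap  is equivalent to  base > cap >> shift,
        // and checking it this way avoids overflow
        if (baseMillis > (capMillis >> shift)) {
            return capMillis;
        }
        return baseMillis << shift;
    }
}
```

With a 5-second base and a 60-second cap, successive attempts wait 5, 10, 20, 40, and then 60 seconds. The attempt counter would be reset to 0 after a successful connection.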
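P7 Bonus Points
In-Depth Understanding
- Reactor pattern: understand the master-slave (boss/worker) multi-threaded Reactor model
- Zero-copy: understand user space versus kernel space and which copies are avoided
- Memory management: ByteBuf pooling, reference counting, and leak detection
Hands-On Experience
- High-concurrency tuning: EventLoopGroup thread counts and TCP parameters
- Monitoring and alerting: connection counts, traffic, and latency
- Troubleshooting: memory leaks, 100% CPU, and connection leaks
Architecture Design
- Protocol design: custom protocols and codec design
- Cluster deployment: load balancing and service discovery
- Fault tolerance: reconnection, circuit breaking, and rate limiting
Performance Optimization
- Reduce memory copies: use CompositeByteBuf
- Reuse objects: use pooled ByteBufs
- Tune TCP options: TCP_NODELAY, SO_KEEPALIVE
- Flow control: ChannelTrafficShapingHandler
Summary
Key points for Netty in practice:
- High performance: Reactor model, zero-copy, memory pooling
- High reliability: heartbeat, reconnection, graceful shutdown
- Extensibility: the Pipeline mechanism and custom protocols
- Observability: monitoring of connection counts, traffic, and latency
Best practices:
- Size EventLoopGroup thread counts sensibly
- Use pooled ByteBufs and release them promptly
- Add heartbeat and reconnection mechanisms
- Implement graceful shutdown
- Set up monitoring and alerting
- Check for memory leaks regularly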