點選上方“芋道原始碼”,選擇“置頂公眾號”
技術文章第一時間送達!
原始碼精品專欄
前言
Netty是一個高效能、非同步事件驅動的NIO框架,提供了對TCP、UDP和檔案傳輸的支援,作為一個非同步NIO框架,Netty的所有IO操作都是非同步非阻塞的,透過Future-Listener機制,使用者可以方便的主動獲取或者透過通知機制獲得IO操作結果。
作為當前最流行的NIO框架,Netty在網際網路領域、大資料分散式計算領域、遊戲行業、通訊行業等獲得了廣泛的應用,一些業界著名的開源元件也基於Netty構建,比如RPC框架、zookeeper等。
不熟悉NIO的可以先看看下麵兩篇文章
那麼,Netty效能為啥這麼高?主要是因為其內部Reactor模型的實現。
Reactor模型
Netty中的Reactor模型主要由多路復用器(Acceptor)、事件分發器(Dispatcher)、事件處理器(Handler)組成,可以分為三種。
1、單執行緒模型:所有I/O操作都由一個執行緒完成,即多路復用、事件分發和處理都是在一個Reactor執行緒上完成的。
對於一些小容量應用場景,可以使用單執行緒模型。但是對於高負載、大併發的應用卻不合適,主要原因如下:
-
一個執行緒同時處理成百上千的鏈路,效能上無法支撐,即便CPU負荷達到100%,也無法滿足海量訊息的編碼、解碼、讀取和傳送;
-
當負載過重後,處理速度將變慢,這會導致大量客戶端連線超時,超時之後往往會進行重發,最終會導致大量訊息積壓和處理超時,成為系統的效能瓶頸;
-
一旦單執行緒意外跑飛,或者進入死迴圈,會導致整個系統通訊模組不可用,不能接收和處理外部訊息,造成節點故障,可靠性不高。
2、多執行緒模型:為瞭解決單執行緒模型存在的一些問題,演化而來的Reactor執行緒模型。
多執行緒模型的特點:
-
有專門一個Acceptor執行緒用於監聽服務端,接收客戶端的TCP連線請求;
-
網路IO的讀寫操作由一個NIO執行緒池負責,執行緒池可以採用標準的JDK執行緒池實現,包含一個任務佇列和N個可用的執行緒,由這些NIO執行緒負責訊息的讀取、解碼、編碼和傳送;
-
一個NIO執行緒可以同時處理多條鏈路,但是一個鏈路只能對應一個NIO執行緒,防止發生併發操作問題。
在絕大多數場景下,Reactor多執行緒模型都可以滿足效能需求;但是,在極特殊應用場景中,一個NIO執行緒負責監聽和處理所有的客戶端連線可能會存在效能問題。例如百萬客戶端併發連線,或者服務端需要對客戶端的握手訊息進行安全認證,認證本身非常損耗效能。在這類場景下,單獨一個Acceptor執行緒可能會存在效能不足問題,為瞭解決效能問題,產生了第三種Reactor執行緒模型-主從Reactor多執行緒模型。
3、主從多執行緒模型:採用多個reactor,每個reactor都在自己單獨的執行緒裡執行。如果是多核,則可以同時響應多個客戶端的請求,一旦鏈路建立成功就將鏈路註冊到負責I/O讀寫的SubReactor執行緒池上。
事實上,Netty的執行緒模型並非固定不變,在啟動輔助類中建立不同的EventLoopGroup實體並透過適當的引數配置,就可以支援上述三種Reactor執行緒模型。正是因為Netty對Reactor執行緒模型的支援提供了靈活的定製能力,所以可以滿足不同業務場景的效能需求。
示例程式碼
以下是server和client的示例程式碼,其中使用的是 Netty 4.x,先看看如何實現,後續會針對各個模組進行深入分析。
server 程式碼實現
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(port).sync(); // (5)
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new EchoServer(port).run();
}
}
EchoServerHandler 實現
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoServerHandler.class.getName());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}
}
1、NioEventLoopGroup 是用來處理I/O操作的執行緒池,Netty對 EventLoopGroup 介面針對不同的傳輸協議提供了不同的實現。在本例子中,需要實體化兩個NioEventLoopGroup,通常第一個稱為“boss”,用來accept客戶端連線,另一個稱為“worker”,處理客戶端資料的讀寫操作。
2、ServerBootstrap 是啟動服務的輔助類,有關socket的引數可以透過ServerBootstrap進行設定。
3、這裡指定NioServerSocketChannel類初始化channel用來接受客戶端請求。
4、通常會為新SocketChannel透過新增一些handler,來設定ChannelPipeline。ChannelInitializer 是一個特殊的handler,其中initChannel方法可以為SocketChannel 的pipeline新增指定handler。
5、透過系結埠8080,就可以對外提供服務了。
client 程式碼實現
public class EchoClient {
private final String host;
private final int port;
private final int firstMessageSize;
public EchoClient(String host, int port, int firstMessageSize) {
this.host = host;
this.port = port;
this.firstMessageSize = firstMessageSize;
}
public void run() throws Exception {
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
//new LoggingHandler(LogLevel.INFO),
new EchoClientHandler(firstMessageSize));
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
final String host = args[0];
final int port = Integer.parseInt(args[1]);
final int firstMessageSize;
if (args.length == 3) {
firstMessageSize = Integer.parseInt(args[2]);
} else {
firstMessageSize = 256;
}
new EchoClient(host, port, firstMessageSize).run();
}
}
EchoClientHandler 實現
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(
EchoClientHandler.class.getName());
private final ByteBuf firstMessage;
/**
* Creates a client-side handler.
*/
public EchoClientHandler(int firstMessageSize) {
if (firstMessageSize <= 0) {
throw new IllegalArgumentException("firstMessageSize: " + firstMessageSize);
}
firstMessage = Unpooled.buffer(firstMessageSize);
for (int i = 0; i < firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);
ctx.close();
}