點選上方“芋道原始碼”,選擇“置頂公眾號”
技術文章第一時間送達!
原始碼精品專欄
-
RpcRequest 和 RpcResponse
-
Socket傳輸
-
Netty 傳輸
-
同步與非同步 阻塞與非阻塞
-
總結
RPC 被稱為“遠端過程呼叫”,表明瞭一個方法呼叫會跨越網路,跨越行程,所以傳輸層是不可或缺的。一說到網路傳輸,一堆名詞就蹦了出來:TCP、UDP、HTTP,同步 or 非同步,阻塞 or 非阻塞,長連線 or 短連線…
本文介紹兩種傳輸層的實現:使用 Socket 和使用 Netty。前者實現的是阻塞式的通訊,是一個較為簡單的傳輸層實現方式,藉此可以瞭解傳輸層的工作原理及工作內容;後者是非阻塞式的,在一般的 RPC 場景下,效能會表現的很好,所以被很多開源 RPC 框架作為傳輸層的實現方式。
RpcRequest 和 RpcResponse
傳輸層傳輸的主要物件其實就是這兩個類,它們封裝了請求 id,方法名,方法引數,傳回值,異常等 RPC 呼叫中需要的一系列資訊。
public class RpcRequest implements Serializable {
private String interfaceName;
private String methodName;
private String parametersDesc;
private Object[] arguments;
private Map attachments;
private int retries = 0;
private long requestId;
private byte rpcProtocolVersion;
}
public class RpcResponse implements Serializable {
private Object value;
private Exception exception;
private long requestId;
private long processTime;
private int timeout;
private Map attachments;// rpc協議版本相容時可以回傳一些額外的資訊
private byte rpcProtocolVersion;
}
Socket傳輸
Server
public class RpcServerSocketProvider {
public static void main(String[] args) throws Exception {
//序列化層實現參考之前的章節
Serialization serialization = new Hessian2Serialization();
ServerSocket serverSocket = new ServerSocket(8088);
ExecutorService executorService = Executors.newFixedThreadPool(10);
while (true) {
final Socket socket = serverSocket.accept();
executorService.execute(() -> {
try {
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
try {
DataInputStream dis = new DataInputStream(is);
int length = dis.readInt();
byte[] requestBody = new byte[length];
dis.read(requestBody);
//反序列化requestBody => RpcRequest
RpcRequest rpcRequest = serialization.deserialize(requestBody, RpcRequest.class);
//反射呼叫生成響應 並組裝成 rpcResponse
RpcResponse rpcResponse = invoke(rpcRequest);
//序列化rpcResponse => responseBody
byte[] responseBody = serialization.serialize(rpcResponse);
DataOutputStream dos = new DataOutputStream(os);
dos.writeInt(responseBody.length);
dos.write(responseBody);
dos.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
is.close();
os.close();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
public static RpcResponse invoke(RpcRequest rpcRequest) {
//模擬反射呼叫
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(rpcRequest.getRequestId());
//... some operation
return rpcResponse;
}
}
Client
public class RpcSocketConsumer {
public static void main(String[] args) throws Exception {
//序列化層實現參考之前的章節
Serialization serialization = new Hessian2Serialization();
Socket socket = new Socket("localhost", 8088);
InputStream is = socket.getInputStream();
OutputStream os = socket.getOutputStream();
//封裝rpc請求
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(12345L);
//序列化 rpcRequest => requestBody
byte[] requestBody = serialization.serialize(rpcRequest);
DataOutputStream dos = new DataOutputStream(os);
dos.writeInt(requestBody.length);
dos.write(requestBody);
dos.flush();
DataInputStream dis = new DataInputStream(is);
int length = dis.readInt();
byte[] responseBody = new byte[length];
dis.read(responseBody);
//反序列化 responseBody => rpcResponse
RpcResponse rpcResponse = serialization.deserialize(responseBody, RpcResponse.class);
is.close();
os.close();
socket.close();
System.out.println(rpcResponse.getRequestId());
}
}
dis.readInt() 和 dis.read(byte[] bytes) 決定了使用 Socket 通訊是一種阻塞式的操作,報文頭+報文體的傳輸格式是一種常見的格式,除此之外,使用特殊的字元如空行也可以劃分出報文結構。在示例中,我們使用一個 int(4位元組)來傳遞報問題的長度,之後傳遞報文體,在複雜的通訊協議中,報文頭除了儲存報文體還會額外儲存一些資訊,包括協議名稱,版本,心跳標識等。
在網路傳輸中,只有位元組能夠被識別,所以我們在開頭引入了 Serialization 介面,負責完成 RpcRequest 和 RpcResponse 與位元組的相互轉換。(Serialization 的工作機制可以參考之前的文章)
使用 Socket 通訊可以發現:每次 Server 處理 Client 請求都會從執行緒池中取出一個執行緒來處理請求,這樣的開銷對於一般的 Rpc 呼叫是不能夠接受的,而 Netty 一類的網路框架便派上了用場。
Netty 傳輸
Server 和 ServerHandler
public class RpcNettyProvider {
public static void main(String[] args) throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 建立並初始化 Netty 服務端 Bootstrap 物件
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new RpcDecoder(RpcRequest.class)); // 解碼 RPC 請求
pipeline.addLast(new RpcEncoder(RpcResponse.class)); // 編碼 RPC 響應
pipeline.addLast(new RpcServerHandler()); // 處理 RPC 請求
}
});
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind("127.0.0.1", 8087).sync();
// 關閉 RPC 伺服器
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcRequest> {
@Override
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
RpcResponse rpcResponse = invoke(request);
// 寫入 RPC 響應物件並自動關閉連線
ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE);
}
private RpcResponse invoke(RpcRequest rpcRequest) {
//模擬反射呼叫
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setRequestId(rpcRequest.getRequestId());
//... some operation
return rpcResponse;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Client 和 ClientHandler
public class RpcNettyConsumer {
public static void main(String[] args) throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
// 建立並初始化 Netty 客戶端 Bootstrap 物件
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new RpcEncoder(RpcRequest.class)); // 編碼 RPC 請求
pipeline.addLast(new RpcDecoder(RpcResponse.class)); // 解碼 RPC 響應
pipeline.addLast(new RpcClientHandler()); // 處理 RPC 響應
}
});
bootstrap.option(ChannelOption.TCP_NODELAY, true);
// 連線 RPC 伺服器
ChannelFuture future = bootstrap.connect("127.0.0.1", 8087).sync();
// 寫入 RPC 請求資料並關閉連線
Channel channel = future.channel();
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(123456L);
channel.writeAndFlush(rpcRequest).sync();
channel.closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
@Override
public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception {
System.out.println(response.getRequestId());//處理響應
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
使用 Netty 的好處是很方便地實現了非阻塞式的呼叫,關鍵部分都給出了註釋。上述的程式碼雖然很多,並且和我們熟悉的 Socket 通訊程式碼大相徑庭,但大多數都是 Netty 的模板程式碼,啟動伺服器,配置編解碼器等。真正的 RPC 封裝操作大多集中在 Handler 的 channelRead 方法(負責讀取)以及 channel.writeAndFlush 方法(負責寫入)中。
public class RpcEncoder extends MessageToByteEncoder {
private Class> genericClass;
Serialization serialization = new Hessian2Serialization();
public RpcEncoder(Class> genericClass) {
this.genericClass = genericClass;
}
@Override
public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
if (genericClass.isInstance(in)) {
byte[] data = serialization.serialize(in);
out.writeInt(data.length);
out.writeBytes(data);
}
}
}
public class RpcDecoder extends ByteToMessageDecoder {
private Class> genericClass;
public RpcDecoder(Class> genericClass) {
this.genericClass = genericClass;
}
Serialization serialization = new Hessian2Serialization();
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List) throws Exception {
if (in.readableBytes() < 4) {
return;
}
in.markReaderIndex();
int dataLength = in.readInt();
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
byte[] data = new byte[dataLength];
in.readBytes(data);
out.add(serialization.deserialize(data, genericClass));
}
}
使用 Netty 不能保證傳回的位元組大小,所以需要加上 in.readableBytes() < 4 這樣的判斷,以及 in.markReaderIndex() 這樣的標記,用來區分報文頭和報文體。
同步與非同步 阻塞與非阻塞
這兩組傳輸特性經常被拿來做對比,很多文章聲稱 Socket 是同步阻塞的,Netty 是非同步非阻塞,其實有點問題。
其實這兩組並沒有必然的聯絡,同步阻塞,同步非阻塞,非同步非阻塞都有可能(同步非阻塞倒是沒見過),而大多數使用 Netty 實現的 RPC 呼叫其實應當是同步非阻塞的(當然一般 RPC 也支援非同步非阻塞)。
同步和非同步關註的是訊息通訊機制
所謂同步,就是在發出一個呼叫時,在沒有得到結果之前,該呼叫就不傳回。但是一旦呼叫傳回,就得到傳回值了。
換句話說,就是由呼叫者主動等待這個呼叫的結果。而非同步則是相反,呼叫在發出之後,這個呼叫就直接傳回了,所以沒有傳回結果。換句話說,當一個非同步過程呼叫發出後,呼叫者不會立刻得到結果。而是在呼叫發出後,被呼叫者透過狀態、通知來通知呼叫者,或透過回呼函式處理這個呼叫。
如果需要 RPC 呼叫傳回一個結果,該結果立刻被使用,那意味著著大機率需要是一個同步呼叫。如果不關心其傳回值,則可以將其做成非同步介面,以提升效率。
阻塞和非阻塞關註的是程式在等待呼叫結果(訊息,傳回值)時的狀態.
阻塞呼叫是指呼叫結果傳回之前,當前執行緒會被掛起。呼叫執行緒只有在得到結果之後才會傳回。
非阻塞呼叫指在不能立刻得到結果之前,該呼叫不會阻塞當前執行緒。
在上述的例子中可以看出 Socket 通訊我們顯示宣告了一個包含10個執行緒的執行緒池,每次請求到來,分配一個執行緒,等待客戶端傳遞報文頭和報文體的行為都會阻塞該執行緒,可以見得其整體是阻塞的。而在 Netty 通訊的例子中,每次請求並沒有分配一個執行緒,而是透過 Handler 的方式處理請求(聯想 NIO 中 Selector),是非阻塞的。
使用同步非阻塞方式的通訊機制並不一定同步阻塞式的通訊強,所謂沒有最好,只有更合適,而一般的同步非阻塞 通訊適用於 1.網路連線數量多 2.每個連線的io不頻繁 的場景,與 RPC 呼叫較為契合。而成熟的 RPC 框架的傳輸層和協議層通常也會提供多種選擇,以應對不同的場景。
總結
本文堆砌了一些程式碼,而難點主要是對 Socket 的理解,和 Netty 框架的掌握。Netty 的學習有一定的門檻,但實際需要掌握的知識點其實並不多(僅僅針對 RPC 框架所涉及的知識點而言),學習 Netty ,個人推薦《Netty IN ACTION》以及 https://waylau.gitbooks.io/netty-4-user-guide/Getting%20Started/Before%20Getting%20Started.html 該網站的例子。
參考資料:
http://javatar.iteye.com/blog/1123915 – 梁飛
https://gitee.com/huangyong/rpc – 黃勇
666. 彩蛋
如果你對 RPC 併發感興趣,歡迎加入我的知識一起交流。
目前在知識星球(https://t.zsxq.com/2VbiaEu)更新瞭如下 Dubbo 原始碼解析如下:
01. 除錯環境搭建
02. 專案結構一覽
03. API 配置(一)之應用
04. API 配置(二)之服務提供者
05. API 配置(三)之服務消費者
06. 屬性配置
07. XML 配置
08. 核心流程一覽
…
一共 60 篇++