扩展版RPC框架-自定义协议
扩展版RPC框架-自定义协议
扩展核心
【1】自定义协议格式梳理(协议消息、相关协议数据字段枚举)
【2】基于Vert.x的TCP实现(参考基于Vert.x的HTTP实现思路进行构建):先从demo(server、client)理解TCP协议的请求响应,然后在RPC框架中引入(服务提供者:server引用自定义的TcpServerHandler;服务消费者在ServerProxy中按照TCP协议规则处理响应)
【3】解决半包、粘包问题:使用Vert.x的RecordParse
【4】装饰者模式的场景应用:对半包、粘包方法进行封装(基于Handler进行装饰对buffer进行处理,引入TcpBufferHandlerWrapper)、修改ServiceProxy中TCP响应处理(将响应处理方法放在VertxTcpClient实现)
需求分析
RPC协议梳理
目前构建的RPC框架使用Vert x的HttpServer作为服务提供者的服务器,代码实现比较简单,其底层网络传输使用的是HTTP协议。
有时候可能会把HTTP和RPC理解为同一类技术,但HTTP只是RPC框架网络传输的一种可选方式罢了。
此处思考一个问题:使用HTTP协议会有什么问题么?或者说,有没有更好的选择?
一般情况下,RPC框架会比较注重性能,而HTTP协议中的头部信息、请求响应格式较“重”,会影响网络传输性能。
举个例子,利用浏览器网络控制台随便查看一个请求, 能看到大量的请求和响应标头。
通过自定义一套PRC协议:利用TCP等传输层协议,自定义请求响应结构,实现性能更高、更为灵活、更安全的RPC框架。
RPC协议设计方案
自定义RPC协议可以分为2大核心部分:自定义网络传输、自定义消息结构
【1】网络传输设计
网络传输设计的目标是:选择-个能够高性能通信的网络协议和传输方式。
HTTP协议的头信息是比较大的,会影响传输性能。但其实除了这点外, HTTP 本身属于无状态协议,这意味着每个HTTP请求都是独立的,每次请求响应都要重新建立和关闭连接,也会影响性能。考虑到这点,在HTTP/1.1引入了持久连接(Keep-Alive) , 允许在单个TCP连接上发送多个HTTP请求和响应,避兔了每次请求都要重新建立和关闭连接的开销。虽然如此,HTTP 本身是应用层协议,目前现在设计的RPC协议也是应用层协议,性能肯定是不如底层(传输层)的TCP协议要高的。所以我们想要追求更高的性能,还是选择使用TCP协议完成网络传输,有更多的自主设计空间。
【2】消息结构设计
消息结构设计的目标是:用最少的空间传递要的信息。
(1)如何使用最少的空间呢?
之前接触到的数据类型可能都是整型、长整型、浮点数类型等等,这些类型其实都比较“重”,占用的字节数较多。比如整型要用4个字节、32 个bit位
在自定义消息结构时,想要节省空间,就要尽可能使用更轻量的类型,比如byte字节类型,只占用1字节、8个bit位。但需要注意的是,Java 中实现bit位运算拼接相对比较麻烦,所以权衡开发成本,在设计消息结构时,尽量给每个数据凑到整个字节。
(2)消息内需要哪些信息呢?
可以从之前的HTTP请求方式中,分析HTTP请求结构,得到RPC消息所需的信息:
- 魔数:作用是安全校验,防止服务器处理了非框架发来的乱七八糟的消息(类似HTTPS的安全证书)
- 版本号:保证请求和响应的一致性(类似HTTP协议有1.0/2.0 等版本)
- 列化方式:来告诉服务端和客户端如何解析数据(类似HTTP的Content- Type内容类型)
- 类型:标识是请求还是响应?或者是心跳检测等其他用途。(类似 HTTP有请求头和响应头)
- 状态:如果是响应,记录响应的结果(类似HTTP的200状态代码)
此外,还需要有请求id,唯一标识某个请求,因为TCP是双向通信的,需要有个唯一标识来追踪每个请求。
请求体:要发送body内容数据,类似于之前HTTP请求中发送的RpcRequest
如果是HTTP这种协议,有专门的key/value结构,很容易找到完整的body数据。但基于TCP协议,想要获取到完整的body内容数据,就需要一些“小心思”了,因为TCP协议本身会存在半包和粘包问题,每次传输的数据可能是不完整的、所以需要在消息头中新增一个字段请求体数据长度,保证能够完整地获取body内容信息。
基于以上的思考,可以得到最终的消息结构设计,如下图:
实际上,这些数据应该是紧凑的,请求头信息总长17个字节。也就是说,上述消息结构, 本质上就是拼接在一起的一个字节数组。后续实现时,需要有消息编码器和消息解码器,编码器先new一个空的Buffer缓冲区,然后按照顺序向缓冲区依次写入这些数据;解码器在读取时也按照顺序依次读取,就能还原出编码前的数据。
通过这种约定的方式,我们就不用记录头信息了。比如magic魔数,不用存储“magic"这个字符串,而是读取第一个字节(前8bit) 就能获取到。
如果学过Redis底层,会发现很多数据结构都是这种设计。(如果是第一次设计协议,或者经验不足,可以先去学一下优秀开源框架的协议设计,这样不会说毫无头绪)例如参考Dubbo的协议设计:
实现步骤
构建步骤说明
【1】新建protocol包,存放所有和自定义协议相关的代码(协议消息类ProtocolMessage、协议常量类ProtocolConstant、消息字段枚举类ProtocolMessageStatusEnum、消息类型枚举ProtocolMessageTypeEnum、序列化器枚举ProtocolMessageSerializerEnum)
【2】网络传输相关:server.tcp包
1.protocol包
类 | 说明 |
---|---|
ProtocolMessage | 协议消息类 |
ProtocolConstant | 协议常量类 |
ProtocolMessageStatusEnum | 协议消息字段枚举类 |
ProtocolMessageTypeEnum | 协议消息类型枚举 |
ProtocolMessageSerializerEnum | 序列化器枚举 |
ProtocolMessage
ProtocolMessage:将消息头单独封装为一个内部类,消息体可以使用泛型类型
ProtocolConstant
ProtocolConstant:记录了和自定义协议有关的关键信息,例如消息头、魔数、版本号信息等
ProtocolMessageStatusEnum
ProtocolMessageStatusEnum:消息字段枚举类相关:协议状态枚举(暂定成功、请求失败、响应失败)
ProtocolMessageTypeEnum
ProtocolMessageTypeEnum:协议消息类型枚举(请求、响应、心跳、其他等)
ProtocolMessageSerializerEnum
ProtocolMessageSerializerEnum:序列化器枚举(和RPC框架支持的序列化器对应)
2.server.tcp包
目前RPC框架使用了高性能的Vert.x作为网络传输服务器,之前用的是HttpServer。同样,Vert.x 也支持TCP服务器,相比于Netty或者自写Socket代码,更加简单易用。
类 | 说明 |
---|---|
VertxTcpServer | TCP服务器实现:类似于VertxHttpServer实现 |
VertxTcpClient | TCP客户端实现:请求服务端 |
测试说明:先后启动服务端、客户端,随后查看控制台输出信息
3.编码器和解码器
基于上面的实现中,Vert.x的TCP服务器收发的消息是Buffer 类型,不能直接写入一一个对象。因此,需要自定义一个编码器和解码器,将Java的消息对象和Buffer进行相互转换。
http请求响应处理:从body处理器中获取到body字节数组,再通过序列化(反序列化)得到RpcRequest/RpcResponse对象。使用TCP服务器后调整为从Buffer中获取字节数组,然后再转码为RpcRequest/RpcResponse对象(相关处理流程都是可以复用的)
实现:protocol/ProtocolMessageEncoder、protocol/ProtocolMessageDecoder
类 | 说明 |
---|---|
ProtocolMessageEncoder | 消息编码器 |
ProtocolMessageDecoder | 消息解码器 |
ProtocolMessageEncoder
/**
* 协议消息编码器
*/
public class ProtocolMessageEncoder {
/**
* 编码
* @param protocolMessage
* @return
* @throws IOException
*/
public static Buffer encode(ProtocolMessage<?> protocolMessage) throws IOException {
if (protocolMessage == null || protocolMessage.getHeader() == null) {
return Buffer.buffer();
}
ProtocolMessage.Header header = protocolMessage.getHeader();
// 依次向缓冲区写入字节
Buffer buffer = Buffer.buffer();
buffer.appendByte(header.getMagic());
buffer.appendByte(header.getVersion());
buffer.appendByte(header.getSerializer());
buffer.appendByte(header.getType());
buffer.appendByte(header.getStatus());
buffer.appendLong(header.getRequestId());
// 获取序列化器
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
// 写入 body 长度和数据
buffer.appendInt(bodyBytes.length);
buffer.appendBytes(bodyBytes);
return buffer;
}
}
ProtocolMessageDecoder
/**
* 协议消息解码器
*/
public class ProtocolMessageDecoder {
/**
* 解码
* @param buffer
* @return
* @throws IOException
*/
public static ProtocolMessage<?> decode(Buffer buffer) throws IOException {
// 分别从指定位置读出 Buffer
ProtocolMessage.Header header = new ProtocolMessage.Header();
byte magic = buffer.getByte(0);
// 校验魔数
if (magic != ProtocolConstant.PROTOCOL_MAGIC) {
throw new RuntimeException("消息 magic 非法");
}
header.setMagic(magic);
header.setVersion(buffer.getByte(1));
header.setSerializer(buffer.getByte(2));
header.setType(buffer.getByte(3));
header.setStatus(buffer.getByte(4));
header.setRequestId(buffer.getLong(5));
header.setBodyLength(buffer.getInt(13));
// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
// 解析消息体
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化消息的协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
if (messageTypeEnum == null) {
throw new RuntimeException("序列化消息的类型不存在");
}
switch (messageTypeEnum) {
case REQUEST:
RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
return new ProtocolMessage<>(header, request);
case RESPONSE:
RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
return new ProtocolMessage<>(header, response);
case HEART_BEAT:
case OTHERS:
default:
throw new RuntimeException("暂不支持该消息类型");
}
}
}
测试
4.请求处理器(服务提供者)
可以使用ntty的pipeline组合多个handler (比如编码=>解码=>请求1响应处理)
请求处理器的作用是接受请求,然后通过反射调用服务实现类。类似之前的HttpServerHandler,开发一个TcpServerHandler用于处理请求。
和HttpServerHandler的区别:只是在获取请求、写入响应的方式上,需要调用上述编码器和解码器。通过实现Vert.x提供的Handler<NetSocket>
接口,可以定义TCP请求处理器
TcpServerHandler
public class TcpServerHandler implements Handler<NetSocket> {
/**
* 处理请求
*
* @param netSocket the event to handle
*/
@Override
public void handle(NetSocket netSocket) {
//处理连接
netSocket.handler(buffer -> {
//接受请求,解码
ProtocolMessage<RpcRequest> protocolMessage;
try {
protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
} catch (IOException e) {
throw new RuntimeException(" 协议消息解码错误");
}
RpcRequest rpcRequest = protocolMessage.getBody();
//处理请求
//构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
try {
//获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
//封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType(); rpcResponse.setMessage(" ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}
// 发送响应,编码
ProtocolMessage.Header header = protocolMessage.getHeader();
header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey();
ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
try {
Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
netSocket.write(encode);
} catch (IOException e) {
throw new RuntimeException(" 协议消息编码错误");
}
});
}
}
5.请求发送(服务消费者)
ServiceProxy
调整服务代理ServiceProxy消费者发送请求的代码,将http调整tcp请求
核心TCP处理思路(关注核心,后处理业务逻辑)
public class ServiceProxy implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(xxxx, result -> {
// ....... 业务逻辑处理完成响应(解码请求信息进行处理、编码响应数据).......
responseFuture.complete(rpcResponseProtocolMessage.getBody());
});
// 阻塞,直到响应完成才会继续向下执行
RpcResponse rpcResponse = responseFuture.get();
// 关闭TCP连接
netClient.close();
}
}
测试
修改CoreProviderSample服务提供者,将原有http服务启动调整为tcp服务启动(此处需注意需要修改VertxTcpServer的处理器为自定义的TcpServerHandler,统一请求和响应的编码解码,否则如果消费者那边ServiceProxy做了处理而VertxTcpServer还是默认之前写的测试方法,就会出错(例如验证魔法值不合法等等))
测试:CoreProviderSample服务提供者、EasyConsumerSample服务消费者
此处的CoreProviderSample需要将原有http启动方式调整为TCP启动,随后先后启动服务提供者、服务消费者再次连接尝试
// 启动web服务(从RPC框架中的全局配置中获取端口)
// HttpServer httpServer = new VertxHttpServer();
// httpServer.doStart(RpcApplication.getRpcConfig().getServerPort());
// TCP协议方式启动
VertxTcpServer vertxTcpServer = new VertxTcpServer();
vertxTcpServer.doStart(RpcApplication.getRpcConfig().getServerPort());
启动测试,主要关注服务消费者可否正常获取到对应的参数信息即可。如果出现异常则一步步进行排查
6.扩展问题
半包和粘包问题
如果上述步骤启动测试访问出现问题,则可能考虑一个点——粘包半包问题
【粘包半包】基本概念
使用TCP协议网络通讯,可能会出现粘包和半包问题,可以从实际例子理解:
【1】理想情况下,如果客户端连续2次发送消息,场景分析如下
# 客户端发送消息
// 第1次发送
hello noob!hello noob!hello noob!hello noob!
// 第2次发送
hello noob!hello noob!hello noob!hello noob!
# 服务端接收消息(可能存在情况:半包-每次收到的数据更少)
// 第1次接收
hello noob!hello noob!
// 第2次接收
hello noob!hello noob!hello noob!
# 服务端接收消息(可能存在情况:粘包-每次收到的数据更多)
// 第3次接收
hello noob!hello noob!hello noob!hello noob!hello noob!hello noob!
【2】实际案例分析
编写代码:VertxTcpClientTest、VertxTcpServerTest进行测试
/**
* Vertx TCP 服务器
*/
@Slf4j
public class VertxTcpServerTest {
public void doStart(int port) {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 创建 TCP 服务器
NetServer server = vertx.createNetServer();
// 处理请求
server.connectHandler(socket->{
// 处理连接
socket.handler(buffer -> {
/*
// 处理接收到的字节数组
byte[] requestData = buffer.getBytes();
// 自定义字节数组处理逻辑(例如解析请求、调用服务、构造响应等)
byte[] responseData = handleRequeset(requestData);
// 发送响应(向连接到服务器的客户端发送数据,数据格式为Buffer(Vertx提供的字节数组缓冲区实现))
socket.write(Buffer.buffer(responseData));
*/
String testMessage = "hello noob!hello noob!hello noob!hello noob!";
int messageLength = testMessage.getBytes().length;
int bufferLength = buffer.getBytes().length;
if(bufferLength<messageLength){
System.out.println("半包,length="+bufferLength);
return;
}
if(bufferLength>messageLength){
System.out.println("粘包,length="+bufferLength);
return;
}
String str = new String(buffer.getBytes(0, messageLength));
System.out.println(str);
if(testMessage.equals(str)){
System.out.println("数据接收正常");
}
});
});
// 启动 TCP 服务器并监听指定端口
server.listen(port, result -> {
if (result.succeeded()) {
log.info("TCP server started on port " + port);
} else {
log.info("Failed to start TCP server: " + result.cause());
}
});
}
/**
* 编写处理请求逻辑(结合实际业务场景编写)
* @param requestData
* @return
*/
private byte[] handleRequeset(byte[] requestData) {
return "hello Vertx Server".getBytes();
}
public static void main(String[] args) {
new VertxTcpServerTest().doStart(8888);
}
}
/**
* Vertx TCP 请求客户端
*/
public class VertxTcpClientTest {
public static void main(String[] args) {
new VertxTcpClientTest().start();
}
/**
* 发送请求
*/
public void start(){
// 创建Vert.x实例
Vertx vertx = Vertx.vertx();
vertx.createNetClient().connect(8888, "localhost", result -> {
if (result.succeeded()) {
System.err.println("connect to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
// 发送数据(模拟发送1000次请求)
for(int i=0;i<1000;i++){
socket.write("hello noob!hello noob!hello noob!hello noob!");
}
// 接收响应
socket.handler(buffer -> {
System.out.println("received response from server: " + buffer.toString());
});
} else {
System.err.println("fail to connect to TCP server");
}
});
}
}
如果直接启动服务端,随后启动客户端会发现控制台没有打印信息(如果设置断点一步步放行则可看到对应的输出信息),socker编程启动,但是控制台不打印日志可能是有许多问题导致(日志框架、缓冲问题等),此处也可能是一个低级错误,例如本地启动服务端、客户端,如果启动客户端,对应socker监听应该要查看的是服务端的日志输出(即VertxTcpClientTest启动测试,应该对应查看请求响应服务的日志VertxTcpServerTest,不要混淆了概念)
如何解决半包?
解决半包的核心思路是:在消息头中设置请求体的长度,服务端接收时,判断每次消息的长度是否符合预期,不完整就不读,留到下一次接收到消息时再读取。
if(buffer==null||buffer.length()==0){
throw new RuntimeException("消息buffer为空");
}
if(buffer.getBytes().length<ProtocolConstant.MESSAGE_HEADER_LENGTH){
throw new RuntimeException("出现半包问题");
}
如何解决粘包?
解决粘包的核心思路:每次只读取指定长度的数据,超过长度的留着下一次接收到信息再读取
// 解决粘包问题,只读指定长度的数据
byte[] bodyBytes=buffer.getBytes(17,17+header.getBodyLength());
实现思路看上去简单,但是实现还是比较麻烦,要记录每次接收到的消息位置,维护字节数字缓存。因此考虑使用Vert.x解决半包和粘包问题
Vert.x解决半包和粘包
在Vert.x框架中,可以使用内置的RecordParser 完美解决半包粘包,它的作用是:保证下次读取到特定长度的字符(先不要急着直接修改业务代码,而是先学会该类库的使用,跑通测试流程,再引入到自己的业务代码中)
RecordParser读取固定长度消息
/**
* Vertx TCP 服务器
*/
@Slf4j
public class VertxTcpServerTestByParse {
public void doStart(int port) {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 创建 TCP 服务器
NetServer server = vertx.createNetServer();
// 处理请求
server.connectHandler(socket->{
// 处理连接
socket.handler(buffer -> {
String testMessage = "hello noob!hello noob!hello noob!hello noob!";
int messageLength = testMessage.getBytes().length;
// 构造parse
RecordParser parser = RecordParser.newFixed(messageLength);
parser.setOutput(new Handler<Buffer>() {
@Override
public void handle(Buffer buffer) {
String str = new String(buffer.getBytes());
System.out.println(str);
if(testMessage.equals(str)){
System.out.println("数据接收正常");
}
}
});
// 使用parse
socket.handler(parser);
});
});
// 启动 TCP 服务器并监听指定端口
server.listen(port, result -> {
if (result.succeeded()) {
log.info("TCP server started on port " + port);
} else {
log.info("Failed to start TCP server: " + result.cause());
}
});
}
}
实际运用中,消息体的长度是不固定的,所以要通过调整RecordParser的固定长度(变长)来解决。此处可以考虑将读取完整的消息拆分为2次:
【1】先完整读取请求头信息,由于请求头信息长度是固定的,可以使用RecordParser 保证每次都完整读取
【2】再根据请求头长度信息更改RecordParser 的固定长度,保证完整获取到请求体
/**
* Vertx TCP 服务器
*/
@Slf4j
public class VertxTcpServerTestByParse {
public void doStart(int port) {
// 创建 Vert.x 实例
Vertx vertx = Vertx.vertx();
// 创建 TCP 服务器
NetServer server = vertx.createNetServer();
// 处理请求
server.connectHandler(socket -> {
// 处理连接
socket.handler(buffer -> {
// 构造parse
RecordParser parser = RecordParser.newFixed(8);
parser.setOutput(new Handler<Buffer>() {
// 初始化
int size = -1;
// 一次完整的读取(头+体)
Buffer resultBuffer = Buffer.buffer();
@Override
public void handle(Buffer buffer) {
if (-1 == size) {
// 读取消息体长度
size = buffer.getInt(4);
parser.fixedSizeMode(size);
// 写入头信息到结果
resultBuffer.appendBuffer(buffer);
} else {
// 写入体信息到结果
resultBuffer.appendBuffer(buffer);
System.out.println(resultBuffer.toString());
// 重置一轮
parser.fixedSizeMode(8);
size = -1;
resultBuffer = Buffer.buffer();
}
// System.out.println(resultBuffer);
}
});
// 使用parse
socket.handler(parser);
});
});
// 启动 TCP 服务器并监听指定端口
server.listen(port, result -> {
if (result.succeeded()) {
log.info("TCP server started on port " + port);
} else {
log.info("Failed to start TCP server: " + result.cause());
}
});
}
/**
* 编写处理请求逻辑(结合实际业务场景编写)
* @param requestData
* @return
*/
private byte[] handleRequest(byte[] requestData) {
return "hello Vertx Server".getBytes();
}
public static void main(String[] args) {
new VertxTcpServerTestByParse().doStart(8888);
}
}
/**
* Vertx TCP 请求客户端
*/
public class VertxTcpClientTestByParse {
public static void main(String[] args) {
new VertxTcpClientTestByParse().start();
}
/**
* 发送请求
*/
public void start() {
// 创建Vert.x实例
Vertx vertx = Vertx.vertx();
vertx.createNetClient().connect(8888, "localhost", result -> {
if (result.succeeded()) {
System.err.println("connect to TCP server");
io.vertx.core.net.NetSocket socket = result.result();
// 发送数据(模拟发送1000次请求)
for (int i = 0; i < 1000; i++) {
Buffer buffer = Buffer.buffer();
String str = "hello noob!hello noob!hello noob!hello noob!";
buffer.appendInt(0);
buffer.appendInt(str.getBytes().length);
buffer.appendBytes(str.getBytes());
socket.write(buffer);
}
// 接收响应
socket.handler(buffer -> {
System.out.println("received response from server: " + buffer.toString());
});
} else {
System.err.println("fail to connect to TCP server");
}
});
}
}
装饰者模式
装饰者模式封装半包、粘包处理器,使用RecordParser对原有的Bufer处理器的能力进行增强。装饰者模式可以简单理解为给对象穿装备,增强对象的能力。
TcpBufferHandlerWrapper
在server.tcp包下新建TcpBufferHandlerWrapper类,实现并增强Handler<Buffer>
接口
TcpServerHandler
修改TcpServerHandler处理逻辑,使用TcpBufferHandlerWrapper封装之前处理请求的代码(请求逻辑不变,修改部分引用)
public class TcpServerHandler implements Handler<NetSocket> {
/**
* 处理请求
* @param netSocket the event to handle
*/
@Override
public void handle(NetSocket netSocket) {
TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
// ------ 处理逻辑 ------
});
netSocket.handler(bufferHandlerWrapper);
}
}
VertxTcpClient、ServiceProxy代码优化
此前将所有的发送请求、处理响应代码写到了ServiceProxy中,使得这个类看起来臃肿不堪,可以做个优化,将所有的请求响应逻辑提取出来,封装为单独的VertxTcpClient类
相当于将ServiceProxy中TCP处理响应的业务逻辑抽离出来,放到VertxTcpClient中
/**
* Vertx TCP 请求客户端
*/
public class VertxTcpClient {
/**
* 发送请求封装
*
* @param rpcRequest
* @param serviceMetaInfo
* @return
* @throws InterruptedException
* @throws ExecutionException
*/
public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo serviceMetaInfo) throws InterruptedException, ExecutionException {
// 发送 TCP 请求
Vertx vertx = Vertx.vertx();
NetClient netClient = vertx.createNetClient();
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(serviceMetaInfo.getServicePort(), serviceMetaInfo.getServiceHost(),
result -> {
if (!result.succeeded()) {
System.err.println("Failed to connect to TCP server");
return;
}
NetSocket socket = result.result();
// 发送数据,构造消息
ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
// 生成全局请求 ID
header.setRequestId(IdUtil.getSnowflakeNextId());
protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
// 编码请求
try {
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
socket.write(encodeBuffer);
} catch (IOException e) {
throw new RuntimeException("协议消息编码错误");
}
// 接收响应
TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(
buffer -> {
try {
ProtocolMessage<RpcResponse> rpcResponseProtocolMessage =
(ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
responseFuture.complete(rpcResponseProtocolMessage.getBody());
} catch (IOException e) {
throw new RuntimeException("协议消息解码错误");
}
}
);
socket.handler(bufferHandlerWrapper);
});
RpcResponse rpcResponse = responseFuture.get();
// 记得关闭连接
netClient.close();
return rpcResponse;
}
}
简化ServiceProx服务调用:
RpcResponse rpcResponse = VertxTcpClient.doRequest(rpcRequest,selectedServiceMetaInfo);
return rpcResponse.getData();
启动测试:再次启动CoreProviderSample、EasyConsumerSample访问测试
扩展说明
扩展说明
(1)自己定义一个占用空间更少的RPC协议的消息结构
参考思路:序列化方式字段目前占用了8 bit,但其实总共就几种序列化方式,能否只占用4bit?其他字段也可以按照这种方式思考
思考问题:为什么tcpServer不提供个server接口,或者和httpServer共用接口?
替换这两个服务器(协议实现)涉及的改动点非常多,比如RPC协议、请求处理器等,不是直接能通过配置就替换的,而且RPC框架一般也不需要替换底层的协议,只使用TCP会更好
在系统设计时,按需设计、灵活应用,而不要无脑应用