rpc zookeeper netty pool
一个基于 nacos / zookeeper 自动注册扩展服务、使用 netty 长连接池的高性能轻量级RPC框架
<dependency>
<groupId>com.github.tohodog</groupId>
<artifactId>qsrpc</artifactId>
<version>1.3.0</version>
</dependency>
First configured
nacos
/
zookeeper
#nacos
qsrpc.nacos.addr=192.168.0.100:8848
#qsrpc.nacos.srvname=qsrpc
#zookeeper
#qsrpc.zk.ips=127.0.0.1:2181
#qsrpc.zk.path=/qsrpc
#node server
qsrpc.node.ip=127.0.0.1
qsrpc.node.port=19980
qsrpc.node.action=user,order
#qsrpc.node.weight=1
#qsrpc.node.zip=snappy/gzip
#qsrpc.connect.timeout=60000
//open node server 1 (read application.properties)
NodeInfo nodeInfo = NodeRegistry.buildNode();
//sync callback
NodeLauncher.start(nodeInfo, new MessageListener() {
@Override
public byte[] onMessage(Async async, byte[] message) {
return ("Hello! node1 callback -" + new String(message)).getBytes();
}
});
// open node server 2
ServerConfig.RPC_CONFIG.setNacosAddr("192.168.0.100:8848");
ServerConfig.RPC_CONFIG.setNacosServiceName("qsrpc");
// ServerConfig.RPC_CONFIG.setZkIps("127.0.0.1:2181");
// ServerConfig.RPC_CONFIG.setZkPath("/qsrpc");
NodeInfo nodeInfo2 = new NodeInfo();
nodeInfo2.setAction("order");//node server action
nodeInfo2.setIp("127.0.0.1");//node server ip
nodeInfo2.setPort(8848);//nodeserver port
nodeInfo2.setWeight(2);//request weight
//async callback
NodeLauncher.start(nodeInfo2, new MessageListener() {
@Override
public byte[] onMessage(final Async async, final byte[] message) {
new Thread(new Runnable() {
@Override
public void run() {
async.callBack(("Hello! node2 callback -" + new String(message)).getBytes());
}
}).start();
return null;
}
});
//async
for (int i = 0; i < 9; i++) {
//Send byte[] based on action
RPCClientManager.getInstance().sendAsync("user", "user".getBytes(),
new Callback<byte[]>() {
@Override
public void handleResult(byte[] result) {
System.out.println("send [user] Result: " + new String(result));
}
@Override
public void handleError(Throwable error) {
error.printStackTrace();
}
});
}
System.out.println("send [user] Done");
//sync
for (int i = 0; i < 9; i++) {
Thread.sleep(1000);
byte[] msg_cb = RPCClientManager.getInstance().sendSync("order", "order".getBytes());
System.out.println("send [order] Result: " + new String(msg_cb));
}
System.out.println("send [order] Done");
//future
CallFuture<byte[]> callFuture = RPCClientManager.getInstance().sendAsync("user", "user".getBytes());
System.out.println("send [user] FutureResult: " + new String(callFuture.get()));
Run TestConcurrent.java (Don’t open the console and 360 antivirus etc.)
CPU | request | time | qps |
---|---|---|---|
i3-8100(4-core/4-thread) | 100w(8-thread) | 7817ms | 127926 |
i7-8700(6-core/12-thread) | 100w(8-thread) | 3010ms | 332225 |
在4核自发自收的情况下有12万+的并发数,实际会更高 测试截图1 测试截图2
本项目tcp通信使用长连接+全双工通信(两边可以同时收/发消息),可以保证更大的吞吐量/更少的连接数资源占用,理论上使用一个tcp连接即可满足通信(详见pool),如果使用http/1.1协议的请求-响应模式,同一个连接在同一个时刻只能有一个消息进行传输,如果有大量请求将会阻塞或者需要开更多tcp连接来解决
TCP | 长度 | 消息ID | 协议号 | 加密/压缩 | 内容 | 包尾 |
---|---|---|---|---|---|---|
Byte | 4 | 4 | 1 | 1(4bit+4bit) | n | 2 |
首先,使用长连接那就需要解决tcp粘包问题,常见的两种方式:
当出现带宽不足而CPU性能有余时,压缩就派上用场了,用时间换空间。目前支持了snappy/gzip两种压缩,snappy应用于google的rpc上,具有高速压缩速度和合理的压缩率,gzip速度次于snappy,但压缩率较高,根据实际情况配置,前提必须是带宽出现瓶颈/要求,否则不需要开启压缩
加密功能计划中(加盐位算法)
网络IO目前是基于netty搭建的,支持nio,zero-copy等特性,由于本框架连接模式使用长连接,连接数固定且较少,所以本框架性能对于IO模式(BIO/NIO/AIO)并不是很敏感,netty对于http,iot服务这种有大量连接数的优势就很大了
前面说了一个tcp连接即可支撑通信,为啥又用pool了呢,原因有两个:1. netty工作线程对于同一个连接使用同一个线程来处理的,所以如果客户端发送大量请求时,服务端只有一个线程在处理导致性能问题,起初是想服务端再把消息分发到线程池,但后续测试发现此操作在高并发下会导致延迟增大,因为又把消息放回线程池排队了。2. 相对于一条tcp链接,使用pool会更加灵活,且连接数也很少,并没有性能影响; 本框架还基于pool实现了一个[请求-响应]的通信模式
客户端Pool的maxIdle(maxActive)=服务节点配置的CPU线程数2=服务节点netty的工作线程数,pool采用FIFO先行先出的策略,可以保证在高并发下均匀的使用tcp连接,服务端就不用再次分发消息了
分布式系统中都需要一个配置/服务中心,才能进行统一管理.本框架目前使用zookeeper(已支持nacos)进行服务注册,zookeeper是使用类似文件目录的结构,每个目录都可以存一个data
节点注册是使用[IP:PROT_NAME_TIME]作为目录名,data存了节点的json数据,创建模式为EPHEMERAL_SEQUENTIAL(断开后会删除该目录),这样就达到了自动监听节点上下线的效果,加入时间戳是为了解决当节点快速重启时,注册了两个目录,便于进行区分处理
客户端通过watch目录变化信息,从而获取到所有服务节点信息,同步一个副本到本地Map里(需加上读写锁),客户端就可以实现高效调用对应的服务了