项目作者: tohodog

项目描述 :
rpc zookeeper netty pool
高级语言: Java
项目地址: git://github.com/tohodog/QSRPC.git
创建时间: 2019-04-29T03:32:01Z
项目社区:https://github.com/tohodog/QSRPC

开源协议:Apache License 2.0

下载


logo


一个基于 nacos / zookeeper 自动注册扩展服务、使用 netty 长连接池的高性能轻量级RPC框架

netty nacos zk License

  • 使用 nacos / zookeeper 服务发现, 自动注册扩展服务
  • 使用长连接TCP池, netty 作为网络IO, 支持全双工通信, 高性能
  • 消息发送支持异步/同步
  • 自动选择符合 action 节点服务器, 支持权重分发消息
  • 支持 snappy, gzip 压缩
  • 可进行二次封装开发, 远程调用, 消息路由负载均衡等等
  • 欢迎学习交流~见[QSRPC项目技术选型及简介]

ad

Maven

  1. <dependency>
  2. <groupId>com.github.tohodog</groupId>
  3. <artifactId>qsrpc</artifactId>
  4. <version>1.3.0</version>
  5. </dependency>

Demo

First configured
nacos
/
zookeeper

application.properties

  1. #nacos
  2. qsrpc.nacos.addr=192.168.0.100:8848
  3. #qsrpc.nacos.srvname=qsrpc
  4. #zookeeper
  5. #qsrpc.zk.ips=127.0.0.1:2181
  6. #qsrpc.zk.path=/qsrpc
  7. #node server
  8. qsrpc.node.ip=127.0.0.1
  9. qsrpc.node.port=19980
  10. qsrpc.node.action=user,order
  11. #qsrpc.node.weight=1
  12. #qsrpc.node.zip=snappy/gzip
  13. #qsrpc.connect.timeout=60000

Node

  1. //open node server 1 (read application.properties)
  2. NodeInfo nodeInfo = NodeRegistry.buildNode();
  3. //sync callback
  4. NodeLauncher.start(nodeInfo, new MessageListener() {
  5. @Override
  6. public byte[] onMessage(Async async, byte[] message) {
  7. return ("Hello! node1 callback -" + new String(message)).getBytes();
  8. }
  9. });
  10. // open node server 2
  11. ServerConfig.RPC_CONFIG.setNacosAddr("192.168.0.100:8848");
  12. ServerConfig.RPC_CONFIG.setNacosServiceName("qsrpc");
  13. // ServerConfig.RPC_CONFIG.setZkIps("127.0.0.1:2181");
  14. // ServerConfig.RPC_CONFIG.setZkPath("/qsrpc");
  15. NodeInfo nodeInfo2 = new NodeInfo();
  16. nodeInfo2.setAction("order");//node server action
  17. nodeInfo2.setIp("127.0.0.1");//node server ip
  18. nodeInfo2.setPort(8848);//nodeserver port
  19. nodeInfo2.setWeight(2);//request weight
  20. //async callback
  21. NodeLauncher.start(nodeInfo2, new MessageListener() {
  22. @Override
  23. public byte[] onMessage(final Async async, final byte[] message) {
  24. new Thread(new Runnable() {
  25. @Override
  26. public void run() {
  27. async.callBack(("Hello! node2 callback -" + new String(message)).getBytes());
  28. }
  29. }).start();
  30. return null;
  31. }
  32. });

Client

  1. //async
  2. for (int i = 0; i < 9; i++) {
  3. //Send byte[] based on action
  4. RPCClientManager.getInstance().sendAsync("user", "user".getBytes(),
  5. new Callback<byte[]>() {
  6. @Override
  7. public void handleResult(byte[] result) {
  8. System.out.println("send [user] Result: " + new String(result));
  9. }
  10. @Override
  11. public void handleError(Throwable error) {
  12. error.printStackTrace();
  13. }
  14. });
  15. }
  16. System.out.println("send [user] Done");
  17. //sync
  18. for (int i = 0; i < 9; i++) {
  19. Thread.sleep(1000);
  20. byte[] msg_cb = RPCClientManager.getInstance().sendSync("order", "order".getBytes());
  21. System.out.println("send [order] Result: " + new String(msg_cb));
  22. }
  23. System.out.println("send [order] Done");
  24. //future
  25. CallFuture<byte[]> callFuture = RPCClientManager.getInstance().sendAsync("user", "user".getBytes());
  26. System.out.println("send [user] FutureResult: " + new String(callFuture.get()));

Test

Run TestConcurrent.java (Don’t open the console and 360 antivirus etc.)

CPU request time qps
i3-8100(4-core/4-thread) 100w(8-thread) 7817ms 127926
i7-8700(6-core/12-thread) 100w(8-thread) 3010ms 332225

在4核自发自收的情况下有12万+的并发数,实际会更高 测试截图1 测试截图2

Future

  • Support nacos…
  • AIO…

QSRPC项目技术选型及简介

1.TCP通信

1.1 连接模式:

 本项目tcp通信使用长连接+全双工通信(两边可以同时收/发消息),可以保证更大的吞吐量/更少的连接数资源占用,理论上使用一个tcp连接即可满足通信(详见pool),如果使用http/1.1协议的请求-响应模式,同一个连接在同一个时刻只能有一个消息进行传输,如果有大量请求将会阻塞或者需要开更多tcp连接来解决

1.2 协议:

TCP 长度 消息ID 协议号 加密/压缩 内容 包尾
Byte 4 4 1 1(4bit+4bit) n 2

 首先,使用长连接那就需要解决tcp粘包问题,常见的两种方式:

  • 包头长度:优点最简单,也是最高效的,缺点是无法感知数据包错误,会导致后续所有包错乱
  • 特定包尾:优点能感知包错误,不影响后续包,缺点需要遍历所有字节,且不能与包内容冲突


     综上,本框架使用的是包头长度+特定包尾,结合了两者优点,避免了缺点,高效实用,检测到包错误会自动断开.
    没有使用校检码转码等,因为需要考虑实际情况,内网里出错概率非常低,出错了也能重连,对于RPC框架追求性能来说是合适的,即使是外网,后续有需求可以增加校验加密协议


     其次,因为支持全双工那就需要解决消息回调问题,本协议使用了一个消息ID,由客户端生成,服务端返回消息带上;由于发送和接收是非连续的,所以客户端需要维护一个回调池,以ID为key,value为此次请求的context(callback),因为是异步的,请求有可能没有响应,所以池需要有超时机制

1.3 压缩/加密:

 当出现带宽不足而CPU性能有余时,压缩就派上用场了,用时间换空间。目前支持了snappy/gzip两种压缩,snappy应用于google的rpc上,具有高速压缩速度和合理的压缩率,gzip速度次于snappy,但压缩率较高,根据实际情况配置,前提必须是带宽出现瓶颈/要求,否则不需要开启压缩

 加密功能计划中(加盐位算法)

1.4 IO框架:

网络IO目前是基于netty搭建的,支持nio,zero-copy等特性,由于本框架连接模式使用长连接,连接数固定且较少,所以本框架性能对于IO模式(BIO/NIO/AIO)并不是很敏感,netty对于http,iot服务这种有大量连接数的优势就很大了

2. Tcp pool

 前面说了一个tcp连接即可支撑通信,为啥又用pool了呢,原因有两个:1. netty工作线程对于同一个连接使用同一个线程来处理的,所以如果客户端发送大量请求时,服务端只有一个线程在处理导致性能问题,起初是想服务端再把消息分发到线程池,但后续测试发现此操作在高并发下会导致延迟增大,因为又把消息放回线程池排队了。2. 相对于一条tcp链接,使用pool会更加灵活,且连接数也很少,并没有性能影响; 本框架还基于pool实现了一个[请求-响应]的通信模式


 客户端Pool的maxIdle(maxActive)=服务节点配置的CPU线程数
2=服务节点netty的工作线程数,pool采用FIFO先行先出的策略,可以保证在高并发下均匀的使用tcp连接,服务端就不用再次分发消息了

3. 服务注册发现

 分布式系统中都需要一个配置/服务中心,才能进行统一管理.本框架目前使用zookeeper(已支持nacos)进行服务注册,zookeeper是使用类似文件目录的结构,每个目录都可以存一个data

 节点注册是使用[IP:PROT_NAME_TIME]作为目录名,data存了节点的json数据,创建模式为EPHEMERAL_SEQUENTIAL(断开后会删除该目录),这样就达到了自动监听节点上下线的效果,加入时间戳是为了解决当节点快速重启时,注册了两个目录,便于进行区分处理

 客户端通过watch目录变化信息,从而获取到所有服务节点信息,同步一个副本到本地Map里(需加上读写锁),客户端就可以实现高效调用对应的服务了

Log

v1.3.0(2021-11-25)

  • 重构服务发现模块
  • 优化nacos,使用index,增量拉取节点信息
  • 升级依赖
  • 其他优化…

    v1.2.0(2021-04-16)

  • 支持Nacos 2.0
  • 优化zk服务发现性能
  • 支持代码配置参数
  • 其他优化…

    v1.1.2(2020-11-26)

  • 支持根据IP选择指定节点
  • 池增加驱逐机制
  • 优化选择节点性能
  • 其他优化…

    v1.1.1(2020-11-22)

  • Support compress
  • Optimization pool log test…

v1.0.1(2019-09-26)

  • Support future get
  • Optimization

    v1.0.0(2019-09-19)

  • Open sourse

Other

  • 有问题请Add issues
  • 如果项目对你有帮助的话欢迎starsvg