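Project author: Nepxion

Project description:
Nepxion Coroutine is a coroutine-driven distribution framework based on Kilim + Promise JDeferred + Zookeeper + Spring Boot. It supports Nepxion Thunder, Dubbo, and Motan, and aggregates distributed coroutine calls that integrate RPC invocations through rule-based configuration.
Language: Java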
项目地址: git://github.com/Nepxion/Coroutine.git


Nepxion Coroutine

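Nepxion Coroutine is an aggregation framework for distributed coroutine calls, built on Kilim + Promise JDeferred + Zookeeper + Spring Boot. It stores aggregation rules and sends notifications when rules change at runtime.

Introduction

  • Designed around microservice framework concepts
  • Supports synchronous and asynchronous calls
  • Supports serial and parallel calls
  • Supports local, distributed (including Thunder, Dubbo, Motan, and others), and mixed chained calls
  • Supports nested rules and sub-rule calls
  • Supports local and distributed rule references
  • Supports call-chain tracing
  • After an exception is caught, handles termination of the chained call intelligently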

Architecture

Coroutine working scenario diagram

Coroutine architecture diagram

Coroutine chained-call diagram

Rules

    <?xml version="1.0" encoding="UTF-8"?>
    <coroutine>
        <!-- Rule definition -->
        <!-- 1. The rule category is the name of the category node in the registry, and the rule name is the name of the rule node that stores the rule content; for a local rule, both can be chosen freely. Example call: CoroutineManager.load().startSync("rule category", "rule name"...) -->
        <!-- 2. A coroutine node may contain several rule nodes, distinguished by version; the rule with the highest version number drives execution, and version numbers must be globally unique -->
        <rule version="1">
            <!-- Rule component definition -->
            <!-- Rule components support local references and remote distributed references (for example, a Dubbo interface) -->
            <!-- 1. Local reference of a rule component, resolved through class reflection -->
            <!-- class defines the class; the class attribute is the fully qualified class name, for example class="com.nepxion.coroutine.test.service.impl.AServiceImpl" -->
            <!-- index is the index number; it must be unique within the current rule -->
            <!-- method defines the method; the method attribute is the method name -->
            <!-- parameterTypes defines the parameter types; if an interface or class has overloaded methods (same method name, different parameter types), the parameter types must be given to tell them apart -->
            <component>
                <class class="com.nepxion.coroutine.test.service.impl.AServiceImpl">
                    <method index="1" method="doA" parameterTypes="java.lang.String,int"></method>
                    <method index="2" method="doA" parameterTypes="java.lang.String"></method>
                </class>
                <class class="com.nepxion.coroutine.test.service.impl.BServiceImpl">
                    <method index="3" method="doB"></method>
                </class>
            </component>
            <!-- 2. Remote distributed injection of a rule component, resolved through interface injection -->
            <!-- applicationContext is a standard Spring XML path, for example applicationContext="classpath*:cApplicationContext.xml"; the applicationContext XML file name must be globally unique -->
            <!-- id is the Spring bean id; it must be globally unique -->
            <!-- index is the index number; it must be unique within the current rule -->
            <!-- method defines the method; the method attribute is the method name -->
            <!-- parameterTypes defines the parameter types; if an interface or class has overloaded methods (same method name, different parameter types), the parameter types must be given to tell them apart -->
            <component applicationContext="classpath*:cApplicationContext.xml">
                <class id="cService">
                    <method index="4" method="doC"></method>
                </class>
            </component>
            <component applicationContext="classpath*:dApplicationContext.xml">
                <class id="dService">
                    <method index="5" method="doD"></method>
                </class>
            </component>
            <!-- Sub-rule dependency definition; several dependency nodes may exist -->
            <!-- 1. A sub-rule must not reference its own parent rule, otherwise an infinite loop results. For example, parent rule A references sub-rule B, and sub-rule B references parent rule A again -->
            <dependency index="5" category="Category A" rule="Rule A-1" chain="a" timeout="5000"></dependency>
            <dependency index="6" category="Category A" rule="Rule A-2" chain="b" timeout="5000"></dependency>
            <dependency index="7" category="Category B" rule="Rule B-1" chain="c" timeout="5000"></dependency>
            <dependency index="8" category="Category B" rule="Rule B-2" chain="d" timeout="5000"></dependency>
            <!-- Chained call definition -->
            <!-- Several chains can be defined. The caller passes the chain name; if no name is configured, pass null -->
            <!-- 1. The index list of a parallel (when) step does not need to be ordered -->
            <!-- 2. The index list of a serial (then) step is order-sensitive -->
            <chain name="x">
                <then index="1,2"></then>
                <when index="3,4"></when>
                <then index="5,6,7,8"></then>
            </chain>
            <chain name="y">
                <then index="1,2"></then>
                <when index="3,4"></when>
                <then index="5,6,7,8"></then>
            </chain>
        </rule>
    </coroutine>
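
The rule comments say that local components are resolved through class reflection and that parameterTypes separates overloaded methods. The sketch below shows one way such a lookup can work with plain JDK reflection. `MethodResolverSketch`, `DemoService`, `toClass`, and `resolve` are hypothetical names chosen for illustration; this is not the framework's actual resolver.

```java
import java.lang.reflect.Method;

final class MethodResolverSketch {
    // Hypothetical stand-in for a service class with two overloads of doA.
    public static class DemoService {
        public String doA(String name, int count) { return name + "#" + count; }
        public String doA(String name) { return name; }
    }

    // Map one entry of a parameterTypes list to a Class, covering a few primitives.
    static Class<?> toClass(String typeName) throws ClassNotFoundException {
        switch (typeName) {
            case "int": return int.class;
            case "long": return long.class;
            case "boolean": return boolean.class;
            case "double": return double.class;
            default: return Class.forName(typeName);
        }
    }

    // Pick the overload whose parameter types match the comma-separated list.
    static Method resolve(Class<?> owner, String methodName, String parameterTypes) throws Exception {
        String[] names = parameterTypes.isEmpty() ? new String[0] : parameterTypes.split(",");
        Class<?>[] types = new Class<?>[names.length];
        for (int i = 0; i < names.length; i++) {
            types[i] = toClass(names[i].trim());
        }
        return owner.getMethod(methodName, types);
    }

    public static void main(String[] args) throws Exception {
        DemoService service = new DemoService();
        Method twoArgs = resolve(DemoService.class, "doA", "java.lang.String,int");
        Method oneArg = resolve(DemoService.class, "doA", "java.lang.String");
        System.out.println(twoArgs.invoke(service, "x", 2));
        System.out.println(oneArg.invoke(service, "x"));
    }
}
```

Without the parameter type list, `getMethod` could not tell the two `doA` overloads apart, which is why the rule format asks for parameterTypes in that case.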

Examples

Asynchronous call

    // chainName, index, and LOG come from the surrounding code (see the entry point below)
    CoroutineManager.load().startAsync("PayRoute", "Rule", chainName, new String[] { "input" }, false, new CoroutineCallback<CoroutineResult<Object>>() {
        @Override
        public void onResult(CoroutineResult<Object> result) {
            LOG.info("Async callback result: thread index={}, id={}, result={}", index, result.getId(), result.getResult());
        }

        @Override
        public void onError(Exception exception) {
            LOG.error("Async callback exception", exception);
        }
    });

Synchronous call

    // chainName, index, and LOG come from the surrounding code (see the entry point below)
    try {
        CoroutineResult<Object> result = CoroutineManager.load().startSync("PayRoute", "Rule", chainName, new String[] { "input" }, 3000, false);
        LOG.info("Sync call result: thread index={}, id={}, result={}", index, result.getId(), result.getResult());
    } catch (Exception e) {
        LOG.error("Sync call exception", e);
    }
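
To make the difference between the two calling styles concrete, here is a minimal sketch built only on the JDK's `CompletableFuture`. `InvocationSketch`, its `Callback` interface, and the `slow` helper are hypothetical and merely mirror the shape of the calls above. Treating the 3000 passed to startSync as a timeout in milliseconds is an assumption; the source does not state it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class InvocationSketch {
    // Hypothetical callback with the same two methods the examples above implement.
    interface Callback<T> {
        void onResult(T result);
        void onError(Exception exception);
    }

    // Asynchronous style: return at once and report the outcome through the callback.
    static <T> CompletableFuture<Void> startAsync(Supplier<T> task, Callback<T> callback) {
        return CompletableFuture.supplyAsync(task).handle((result, error) -> {
            if (error != null) {
                callback.onError(new Exception(error));
            } else {
                callback.onResult(result);
            }
            return null;
        });
    }

    // Synchronous style: block the caller until a result arrives or the timeout expires.
    static <T> T startSync(Supplier<T> task, long timeoutMillis) throws Exception {
        return CompletableFuture.supplyAsync(task).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // A deliberately slow task used to show the timeout path.
    static String slow() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "late";
    }

    public static void main(String[] args) throws Exception {
        startAsync(() -> "async result", new Callback<String>() {
            @Override
            public void onResult(String result) { System.out.println("callback: " + result); }
            @Override
            public void onError(Exception exception) { System.out.println("callback error: " + exception); }
        }).join();

        System.out.println("sync: " + startSync(() -> "sync result", 3000));

        try {
            startSync(InvocationSketch::slow, 50);
        } catch (TimeoutException e) {
            System.out.println("sync call timed out");
        }
    }
}
```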

Local invocation

See the coroutine-test project

Define rule 1

    <?xml version="1.0" encoding="UTF-8"?>
    <coroutine>
        <rule version="1">
            <component>
                <class class="com.nepxion.coroutine.test.service.impl.AServiceImpl">
                    <method index="1" method="doThen"></method>
                    <method index="2" method="doWhen"></method>
                    <method index="3" method="doMerge"></method>
                </class>
                <class class="com.nepxion.coroutine.test.service.impl.BServiceImpl">
                    <method index="4" method="doThen"></method>
                    <method index="5" method="doWhen"></method>
                    <method index="6" method="doMerge"></method>
                </class>
                <class class="com.nepxion.coroutine.test.service.impl.CServiceImpl">
                    <method index="7" method="doThen"></method>
                    <method index="8" method="doWhen"></method>
                    <method index="9" method="doMerge"></method>
                </class>
                <class class="com.nepxion.coroutine.test.service.impl.DServiceImpl">
                    <method index="10" method="doThen"></method>
                    <method index="11" method="doWhen"></method>
                    <method index="12" method="doMerge"></method>
                </class>
            </component>
            <dependency index="13" category="PayRoute" rule="SubRule" chain="chain2-2" file="rule2.xml" timeout="5000"></dependency>
            <chain name="chain1-1">
                <then index="1,4"></then>
                <when index="8,11"></when>
                <then index="12,1,13"></then>
            </chain>
            <chain name="chain1-2">
                <then index="1,4"></then>
                <when index="8,11"></when>
                <then index="12,1,13"></then>
            </chain>
        </rule>
    </coroutine>
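
The chains in this rule alternate serial (then) and parallel (when) steps. As a rough illustration of those semantics, the sketch below runs two serial steps, fans out into two parallel steps, and merges their results. It uses the JDK's `CompletableFuture` rather than Kilim or JDeferred, and `ChainSketch` with its helpers is a hypothetical construction, not the framework's scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ChainSketch {
    // Serial step: feed the previous result into the next function.
    static CompletableFuture<String> then(CompletableFuture<String> previous, Function<String, String> step) {
        return previous.thenApplyAsync(step);
    }

    // Parallel step: run every function on the same input and collect the results in declaration order.
    static CompletableFuture<List<String>> when(CompletableFuture<String> previous, List<Function<String, String>> steps) {
        return previous.thenCompose(input -> {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (Function<String, String> step : steps) {
                futures.add(CompletableFuture.supplyAsync(() -> step.apply(input)));
            }
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .thenApply(ignored -> {
                        List<String> results = new ArrayList<>();
                        for (CompletableFuture<String> future : futures) {
                            results.add(future.join());
                        }
                        return results;
                    });
        });
    }

    // then(A, B) followed by when(C, D) followed by a merge of the parallel results.
    static String run(String input) {
        CompletableFuture<String> start = CompletableFuture.completedFuture(input);
        CompletableFuture<String> serial = then(then(start, s -> s + " -> A"), s -> s + " -> B");
        List<Function<String, String>> parallel = new ArrayList<>();
        parallel.add(s -> s + " -> C");
        parallel.add(s -> s + " -> D");
        return when(serial, parallel)
                .thenApply(list -> "(" + String.join(" , ", list) + ")")
                .join();
    }

    public static void main(String[] args) {
        // prints "(Start -> A -> B -> C , Start -> A -> B -> D)"
        System.out.println(run("Start"));
    }
}
```

Both parallel branches receive the same serial result, and the merge step sees them as a list, much as the doMerge methods in the rule receive a collection of results.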

Define rule 2

    <?xml version="1.0" encoding="UTF-8"?>
    <coroutine>
        <rule version="1">
            <component>
                <class class="com.nepxion.coroutine.test.service.impl.AServiceImpl">
                    <method index="1" method="doThen"></method>
                    <method index="2" method="doWhen"></method>
                    <method index="3" method="doMerge"></method>
                </class>
                <class class="com.nepxion.coroutine.test.service.impl.BServiceImpl">
                    <method index="4" method="doThen"></method>
                    <method index="5" method="doWhen"></method>
                    <method index="6" method="doMerge"></method>
                </class>
                <class class="com.nepxion.coroutine.test.service.impl.CServiceImpl">
                    <method index="7" method="doThen"></method>
                    <method index="8" method="doWhen"></method>
                    <method index="9" method="doMerge"></method>
                </class>
                <class class="com.nepxion.coroutine.test.service.impl.DServiceImpl">
                    <method index="10" method="doThen"></method>
                    <method index="11" method="doWhen"></method>
                    <method index="12" method="doMerge"></method>
                </class>
            </component>
            <chain name="chain2-1">
                <when index="2,5"></when>
                <then index="9,10"></then>
                <when index="8,11"></when>
                <then index="3,4"></then>
            </chain>
            <chain name="chain2-2">
                <then index="1,4,7,10"></then>
            </chain>
        </rule>
    </coroutine>
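
Rule 1 depends on SubRule from rule2.xml, and rule 2 declares no dependency, so the pair is free of the parent/sub-rule loop that the rule comments warn about. A check for that situation could look like the following depth-first search. `DependencyCycleSketch` and the "category/rule" keys are hypothetical; the source does not say whether the framework performs such a check itself.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class DependencyCycleSketch {
    // True if following dependencies from 'rule' leads back to a rule already on the current path.
    static boolean hasCycle(String rule, Map<String, List<String>> dependencies) {
        return visit(rule, dependencies, new HashSet<>(), new HashSet<>());
    }

    private static boolean visit(String rule, Map<String, List<String>> dependencies,
                                 Set<String> onPath, Set<String> finished) {
        if (onPath.contains(rule)) {
            return true; // the rule is reached again while still being explored
        }
        if (finished.contains(rule)) {
            return false; // already explored completely without finding a loop
        }
        onPath.add(rule);
        for (String child : dependencies.getOrDefault(rule, Collections.emptyList())) {
            if (visit(child, dependencies, onPath, finished)) {
                return true;
            }
        }
        onPath.remove(rule);
        finished.add(rule);
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dependencies = new HashMap<>();
        dependencies.put("PayRoute/Rule", Arrays.asList("PayRoute/SubRule"));
        System.out.println(hasCycle("PayRoute/Rule", dependencies)); // prints "false"

        // A sub-rule that points back at its parent closes the loop.
        dependencies.put("PayRoute/SubRule", Arrays.asList("PayRoute/Rule"));
        System.out.println(hasCycle("PayRoute/Rule", dependencies)); // prints "true"
    }
}
```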

Entry point

    package com.nepxion.coroutine.test;

    /**
     * <p>Title: Nepxion Coroutine</p>
     * <p>Description: Nepxion Coroutine For Distribution</p>
     * <p>Copyright: Copyright (c) 2017</p>
     * <p>Company: Nepxion</p>
     * @author Haojun Ren
     * @version 1.0
     */

    import java.util.Timer;
    import java.util.TimerTask;

    import org.junit.Test;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.nepxion.coroutine.common.callback.CoroutineCallback;
    import com.nepxion.coroutine.data.entity.CoroutineResult;
    import com.nepxion.coroutine.framework.core.CoroutineManager;

    public class CoroutineTest {
        private static final Logger LOG = LoggerFactory.getLogger(CoroutineTest.class);

        @Test
        public void testRemote() throws Exception {
            // Make sure Zookeeper holds the corresponding rules (run the relevant methods of CoroutineRuleRegistryTest)
            // Load from the remote registry
            // Open the connection to the remote registry
            CoroutineManager.start();
            // Parse the remote rule (sub-rule references are supported)
            CoroutineManager.parseRemote("PayRoute", "Rule");
            // Chain names come from the XML configuration
            invokeAsync("chain1-1");
            invokeSync("chain1-2");
            System.in.read();
        }

        @Test
        public void testLocalRule1() throws Exception {
            // Load from local files
            // Parse the local rule (sub-rule references are not supported)
            CoroutineManager.parseLocal("PayRoute", "Rule", "rule1.xml");
            // Chain names come from the XML configuration
            invokeAsync("chain1-1");
            invokeSync("chain1-2");
            System.in.read();
        }

        @Test
        public void testLocalRule2() throws Exception {
            // Load from local files
            // Parse the local rule (sub-rule references are not supported)
            CoroutineManager.parseLocal("PayRoute", "Rule", "rule2.xml");
            // Chain names come from the XML configuration
            invokeAsync("chain2-1");
            invokeSync("chain2-2");
            System.in.read();
        }

        public void invokeAsync(final String chainName) {
            Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {
                public void run() {
                    for (int i = 0; i < 1; i++) {
                        final int index = i % 5;
                        CoroutineManager.load().startAsync("PayRoute", "Rule", chainName, new String[] { "Start[" + index + "]" }, false, new CoroutineCallback<CoroutineResult<Object>>() {
                            @Override
                            public void onResult(CoroutineResult<Object> result) {
                                LOG.info("Async callback result: thread index={}, id={}, result={}", index, result.getId(), result.getResult());
                            }

                            @Override
                            public void onError(Exception exception) {
                                LOG.error("Async callback exception", exception);
                            }
                        });
                    }
                    LOG.info("------------------------------------------------------------");
                }
            }, 0, 20000);
        }

        public void invokeSync(final String chainName) {
            Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {
                public void run() {
                    for (int i = 0; i < 1; i++) {
                        final int index = i % 5;
                        try {
                            CoroutineResult<Object> result = CoroutineManager.load().startSync("PayRoute", "Rule", chainName, new String[] { "Start[" + index + "]" }, 3000, false);
                            LOG.info("Sync call result: thread index={}, id={}, result={}", index, result.getId(), result.getResult());
                        } catch (Exception e) {
                            LOG.error("Sync call exception", e);
                        }
                    }
                    LOG.info("------------------------------------------------------------");
                }
            }, 0, 20000);
        }
    }

Output

    2017-12-23 19:13:43.641 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.data.cache.CoroutineCache:39] - Daemon thread for scanning cache starts...
    2017-12-23 19:13:43.656 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=1, categoryName=PayRoute, ruleName=Rule, chainName=chain1-2, class=com.nepxion.coroutine.test.service.impl.AServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=3 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.656 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=1, categoryName=PayRoute, ruleName=Rule, chainName=chain1-1, class=com.nepxion.coroutine.test.service.impl.AServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=3 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.660 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=4, categoryName=PayRoute, ruleName=Rule, chainName=chain1-2, class=com.nepxion.coroutine.test.service.impl.BServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.660 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=4, categoryName=PayRoute, ruleName=Rule, chainName=chain1-1, class=com.nepxion.coroutine.test.service.impl.BServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.666 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.common.thread.ThreadPoolFactory:38] - Thread pool executor is created, threadName=Promise-192.168.1.3-thread, corePoolSize=64, maximumPoolSize=128, keepAliveTime=900000, allowCoreThreadTimeOut=false
    2017-12-23 19:13:43.669 INFO [Promise-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=8, categoryName=PayRoute, ruleName=Rule, chainName=chain1-2, class=com.nepxion.coroutine.test.service.impl.CServiceImpl, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.669 INFO [Promise-192.168.1.3-thread-2][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=11, categoryName=PayRoute, ruleName=Rule, chainName=chain1-2, class=com.nepxion.coroutine.test.service.impl.DServiceImpl, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.669 INFO [Promise-192.168.1.3-thread-3][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=11, categoryName=PayRoute, ruleName=Rule, chainName=chain1-1, class=com.nepxion.coroutine.test.service.impl.DServiceImpl, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.669 INFO [Promise-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=8, categoryName=PayRoute, ruleName=Rule, chainName=chain1-1, class=com.nepxion.coroutine.test.service.impl.CServiceImpl, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.675 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=12, categoryName=PayRoute, ruleName=Rule, chainName=chain1-2, class=com.nepxion.coroutine.test.service.impl.DServiceImpl, method=doMerge, parameterTypes=com.nepxion.coroutine.data.entity.CoroutineList, returnType=java.lang.String, spentTime=0 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.675 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=12, categoryName=PayRoute, ruleName=Rule, chainName=chain1-1, class=com.nepxion.coroutine.test.service.impl.DServiceImpl, method=doMerge, parameterTypes=com.nepxion.coroutine.data.entity.CoroutineList, returnType=java.lang.String, spentTime=0 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.675 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=1, categoryName=PayRoute, ruleName=Rule, chainName=chain1-2, class=com.nepxion.coroutine.test.service.impl.AServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.675 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=1, categoryName=PayRoute, ruleName=Rule, chainName=chain1-1, class=com.nepxion.coroutine.test.service.impl.AServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.677 INFO [Coroutine-192.168.1.3-thread-2][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=1, categoryName=PayRoute, ruleName=SubRule, chainName=chain2-2, class=com.nepxion.coroutine.test.service.impl.AServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.677 INFO [Coroutine-192.168.1.3-thread-3][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=1, categoryName=PayRoute, ruleName=SubRule, chainName=chain2-2, class=com.nepxion.coroutine.test.service.impl.AServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.678 INFO [Coroutine-192.168.1.3-thread-2][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=4, categoryName=PayRoute, ruleName=SubRule, chainName=chain2-2, class=com.nepxion.coroutine.test.service.impl.BServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.678 INFO [Coroutine-192.168.1.3-thread-3][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=4, categoryName=PayRoute, ruleName=SubRule, chainName=chain2-2, class=com.nepxion.coroutine.test.service.impl.BServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.678 INFO [Coroutine-192.168.1.3-thread-2][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=7, categoryName=PayRoute, ruleName=SubRule, chainName=chain2-2, class=com.nepxion.coroutine.test.service.impl.CServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.678 INFO [Coroutine-192.168.1.3-thread-3][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=7, categoryName=PayRoute, ruleName=SubRule, chainName=chain2-2, class=com.nepxion.coroutine.test.service.impl.CServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=0 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.679 INFO [Coroutine-192.168.1.3-thread-3][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=10, categoryName=PayRoute, ruleName=SubRule, chainName=chain2-2, class=com.nepxion.coroutine.test.service.impl.DServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=1 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.679 INFO [Coroutine-192.168.1.3-thread-2][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=10, categoryName=PayRoute, ruleName=SubRule, chainName=chain2-2, class=com.nepxion.coroutine.test.service.impl.DServiceImpl, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=1 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.680 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:61] - Serial completed, referenceType=dependencyReference, index=13, categoryName=PayRoute, ruleName=SubRule, chainName=chain1-2, returnType=com.nepxion.coroutine.data.entity.CoroutineResult, spentTime=4 ms, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    2017-12-23 19:13:43.680 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:61] - Serial completed, referenceType=dependencyReference, index=13, categoryName=PayRoute, ruleName=SubRule, chainName=chain1-1, returnType=com.nepxion.coroutine.data.entity.CoroutineResult, spentTime=4 ms, id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    2017-12-23 19:13:43.692 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.test.CoroutineTest$1$1:80] - Async callback result: thread index=0, id=com.nepxion.coroutine.data.entity.CoroutineId@3ee322f7[
    id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    categoryName=PayRoute
    ruleName=Rule
    ], result=com.nepxion.coroutine.data.entity.CoroutineResult@719c38e3[
    id=com.nepxion.coroutine.data.entity.CoroutineId@14874a4f[
    id=98c84565-ca73-42b8-a2e8-da4dee6df22a
    categoryName=PayRoute
    ruleName=SubRule
    ]
    timestamp=0
    result=(Start[0] -> A[0] -> B[0] -> C[0] -> D[0] , Start[0] -> A[0] -> B[0] -> D[0] -> D[0]) -> A[0]) -> A[0]) -> B[0]) -> C[0]) -> D[0])
    exception=<null>
    ]
    2017-12-23 19:13:43.692 INFO [Timer-1][com.nepxion.coroutine.test.CoroutineTest$2:103] - Sync call result: thread index=0, id=com.nepxion.coroutine.data.entity.CoroutineId@320ca662[
    id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    categoryName=PayRoute
    ruleName=Rule
    ], result=com.nepxion.coroutine.data.entity.CoroutineResult@475af95a[
    id=com.nepxion.coroutine.data.entity.CoroutineId@3ce1ece3[
    id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375
    categoryName=PayRoute
    ruleName=SubRule
    ]
    timestamp=0
    result=(Start[0] -> A[0] -> B[0] -> C[0] -> D[0] , Start[0] -> A[0] -> B[0] -> D[0] -> D[0]) -> A[0]) -> A[0]) -> B[0]) -> C[0]) -> D[0])
    exception=<null>
    ]
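
Every monitor line in this output ends with an id field, and the same id appears on all steps of one invocation, including the steps run inside the sub-rule. That makes it possible to reconstruct a call chain from interleaved log lines. The sketch below groups the index values by id with a regular expression. `TraceGroupingSketch` is a hypothetical helper, and the sample lines are abbreviated versions of the ones shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TraceGroupingSketch {
    // Captures the step index and the trailing 36-character invocation id of a monitor line.
    private static final Pattern MONITOR_LINE =
            Pattern.compile("index=(\\d+),.*\\bid=([0-9a-f-]{36})\\s*$");

    // Group step indexes by invocation id, keeping the order in which the lines were logged.
    static Map<String, List<Integer>> indexesById(List<String> lines) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher matcher = MONITOR_LINE.matcher(line);
            if (matcher.find()) {
                grouped.computeIfAbsent(matcher.group(2), key -> new ArrayList<>())
                        .add(Integer.parseInt(matcher.group(1)));
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> lines = new ArrayList<>();
        lines.add("Serial completed, index=1, chainName=chain1-2, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375");
        lines.add("Serial completed, index=1, chainName=chain1-1, id=98c84565-ca73-42b8-a2e8-da4dee6df22a");
        lines.add("Serial completed, index=4, chainName=chain1-2, id=6fcb1456-fc34-49a0-9ad4-832fd5f3f375");
        indexesById(lines).forEach((id, indexes) -> System.out.println(id + " -> " + indexes));
    }
}
```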

Distributed invocation

Coroutine calls across the Dubbo and Thunder frameworks on Spring Boot, aggregating distributed APIs

Define the rule

    <?xml version="1.0" encoding="UTF-8"?>
    <coroutine>
        <rule version="1">
            <component applicationContext="classpath*:dubbo-client-context-coroutine.xml">
                <class id="aService">
                    <method index="1" method="doThen"></method>
                    <method index="2" method="doWhen"></method>
                    <method index="3" method="doMerge"></method>
                </class>
                <class id="bService">
                    <method index="4" method="doThen"></method>
                    <method index="5" method="doWhen"></method>
                    <method index="6" method="doMerge"></method>
                </class>
            </component>
            <component applicationContext="classpath*:thunder-client-context-coroutine.xml">
                <class id="cService">
                    <method index="7" method="doThen"></method>
                    <method index="8" method="doWhen"></method>
                    <method index="9" method="doMerge"></method>
                </class>
                <class id="dService">
                    <method index="10" method="doThen"></method>
                    <method index="11" method="doWhen"></method>
                    <method index="12" method="doMerge"></method>
                </class>
            </component>
            <chain>
                <when index="2,5"></when>
                <then index="9,10"></then>
                <when index="8,11"></when>
                <then index="3,4"></then>
            </chain>
        </rule>
    </coroutine>
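
Each rule file in this document declares a single rule node with version="1", while the rule comments state that a coroutine node may hold several rule nodes and that the highest version drives execution. The sketch below shows that selection with the JDK's DOM parser. `RuleVersionSketch` is a hypothetical helper, and the inline XML is made up for the example; how the framework itself picks the version is not shown in the source.

```java
import java.io.StringReader;

import javax.xml.parsers.DocumentBuilderFactory;

import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

final class RuleVersionSketch {
    // Return the rule element with the largest integer version attribute, or null if there is none.
    static Element highestVersionRule(String xml) throws Exception {
        Document document = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .parse(new InputSource(new StringReader(xml)));
        NodeList rules = document.getDocumentElement().getElementsByTagName("rule");
        Element best = null;
        int bestVersion = Integer.MIN_VALUE;
        for (int i = 0; i < rules.getLength(); i++) {
            Element rule = (Element) rules.item(i);
            int version = Integer.parseInt(rule.getAttribute("version"));
            if (version > bestVersion) {
                bestVersion = version;
                best = rule;
            }
        }
        return best;
    }

    public static void main(String[] args) throws Exception {
        String xml = "<coroutine>"
                + "<rule version=\"1\"><chain name=\"old\"/></rule>"
                + "<rule version=\"3\"><chain name=\"new\"/></rule>"
                + "<rule version=\"2\"><chain name=\"mid\"/></rule>"
                + "</coroutine>";
        // prints "3"
        System.out.println(highestVersionRule(xml).getAttribute("version"));
    }
}
```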

Entry point

Run DubboServerApplication.java under coroutine-spring-boot-dubbo-server-example

Run ThunderServerApplication.java under coroutine-spring-boot-thunder-server-example

Run CoroutineClientApplication.java under coroutine-spring-boot-client-example

    package com.nepxion.coroutine;

    /**
     * <p>Title: Nepxion Coroutine</p>
     * <p>Description: Nepxion Coroutine For Distribution</p>
     * <p>Copyright: Copyright (c) 2017</p>
     * <p>Company: Nepxion</p>
     * @author Haojun Ren
     * @version 1.0
     */

    import java.util.Timer;
    import java.util.TimerTask;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.context.embedded.EmbeddedServletContainerFactory;
    import org.springframework.boot.context.embedded.tomcat.TomcatEmbeddedServletContainerFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;

    import com.nepxion.coroutine.common.callback.CoroutineCallback;
    import com.nepxion.coroutine.data.entity.CoroutineResult;
    import com.nepxion.coroutine.framework.core.CoroutineManager;

    @SpringBootApplication
    @ComponentScan(basePackages = { "com.nepxion.coroutine" })
    public class CoroutineClientApplication {
        private static final Logger LOG = LoggerFactory.getLogger(CoroutineClientApplication.class);

        public static void main(String[] args) throws Exception {
            SpringApplication.run(CoroutineClientApplication.class, args);
            // invokeRemote();
            invokeLocal();
        }

        public static void invokeRemote() throws Exception {
            // Make sure Zookeeper holds the corresponding rules (run the relevant methods of CoroutineRuleRegistry)
            // Load from the remote registry
            // Open the connection to the remote registry
            CoroutineManager.start();
            // Parse the remote rule (sub-rule references are supported)
            CoroutineManager.parseRemote("Distribution PayRoute", "Distribution Rule");
            invokeAsync();
            invokeSync();
        }

        public static void invokeLocal() throws Exception {
            // Load from local files
            // Parse the local rule (sub-rule references are not supported)
            CoroutineManager.parseLocal("Distribution PayRoute", "Distribution Rule", "rule.xml");
            invokeAsync();
            invokeSync();
        }

        public static void invokeAsync() {
            Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {
                public void run() {
                    for (int i = 0; i < 1; i++) {
                        final int index = i % 5;
                        CoroutineManager.load().startAsync("Distribution PayRoute", "Distribution Rule", null, new String[] { "Start[" + index + "]" }, false, new CoroutineCallback<CoroutineResult<Object>>() {
                            @Override
                            public void onResult(CoroutineResult<Object> result) {
                                LOG.info("Async callback result: thread index={}, id={}, result={}", index, result.getId(), result.getResult());
                            }

                            @Override
                            public void onError(Exception exception) {
                                LOG.error("Async callback exception", exception);
                            }
                        });
                    }
                    LOG.info("------------------------------------------------------------");
                }
            }, 0, 20000);
        }

        public static void invokeSync() {
            Timer timer = new Timer();
            timer.scheduleAtFixedRate(new TimerTask() {
                public void run() {
                    for (int i = 0; i < 1; i++) {
                        final int index = i % 5;
                        try {
                            CoroutineResult<Object> result = CoroutineManager.load().startSync("Distribution PayRoute", "Distribution Rule", null, new String[] { "Start[" + index + "]" }, 3000, false);
                            LOG.info("Sync call result: thread index={}, id={}, result={}", index, result.getId(), result.getResult());
                        } catch (Exception e) {
                            LOG.error("Sync call exception", e);
                        }
                    }
                    LOG.info("------------------------------------------------------------");
                }
            }, 0, 20000);
        }

        @Bean
        public EmbeddedServletContainerFactory createEmbeddedServletContainerFactory() {
            TomcatEmbeddedServletContainerFactory tomcatFactory = new TomcatEmbeddedServletContainerFactory();
            tomcatFactory.setPort(9081);
            return tomcatFactory;
        }
    }

Output

    2017-12-23 19:20:28.905 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.common.thread.ThreadPoolFactory:38] - Thread pool executor is created, threadName=Promise-192.168.1.3-thread, corePoolSize=64, maximumPoolSize=128, keepAliveTime=900000, allowCoreThreadTimeOut=false
    2017-12-23 19:20:29.052 INFO [Promise-192.168.1.3-thread-2][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=5, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=bService, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=136 ms, id=956ab727-0e18-48aa-8334-1453c139fa23
    2017-12-23 19:20:29.052 INFO [Promise-192.168.1.3-thread-3][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=5, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=bService, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=136 ms, id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    2017-12-23 19:20:29.052 INFO [Promise-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=2, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=aService, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=136 ms, id=956ab727-0e18-48aa-8334-1453c139fa23
    2017-12-23 19:20:29.052 INFO [Promise-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=2, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=aService, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=136 ms, id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    2017-12-23 19:20:29.199 INFO [ClientAffinityThreadFactory][com.nepxion.thunder.common.thread.ThreadPoolFactory:106] - Thread pool executor is created, threadName=Thunder-reference-192.168.1.3:6010-thread, corePoolSize=32, maximumPoolSize=64, keepAliveTime=900000, allowCoreThreadTimeOut=false
    2017-12-23 19:20:29.200 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=9, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=cService, method=doMerge, parameterTypes=java.util.List, returnType=java.lang.String, spentTime=143 ms, id=956ab727-0e18-48aa-8334-1453c139fa23
    2017-12-23 19:20:29.201 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=9, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=cService, method=doMerge, parameterTypes=java.util.List, returnType=java.lang.String, spentTime=143 ms, id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    2017-12-23 19:20:29.206 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=10, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=dService, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=3 ms, id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    2017-12-23 19:20:29.206 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=10, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=dService, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=3 ms, id=956ab727-0e18-48aa-8334-1453c139fa23
    2017-12-23 19:20:29.213 INFO [Promise-192.168.1.3-thread-4][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=8, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=cService, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=5 ms, id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    2017-12-23 19:20:29.214 INFO [Promise-192.168.1.3-thread-5][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=8, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=cService, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=6 ms, id=956ab727-0e18-48aa-8334-1453c139fa23
    2017-12-23 19:20:29.214 INFO [Promise-192.168.1.3-thread-7][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=11, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=dService, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=6 ms, id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    2017-12-23 19:20:29.215 INFO [Promise-192.168.1.3-thread-6][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Parallel completed, referenceType=componentReference, index=11, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=dService, method=doWhen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=7 ms, id=956ab727-0e18-48aa-8334-1453c139fa23
    2017-12-23 19:20:29.220 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=3, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=aService, method=doMerge, parameterTypes=java.util.List, returnType=java.lang.String, spentTime=5 ms, id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    2017-12-23 19:20:29.220 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=3, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=aService, method=doMerge, parameterTypes=java.util.List, returnType=java.lang.String, spentTime=5 ms, id=956ab727-0e18-48aa-8334-1453c139fa23
    2017-12-23 19:20:29.222 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=4, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=bService, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=2 ms, id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    2017-12-23 19:20:29.222 INFO [Coroutine-192.168.1.3-thread-0][com.nepxion.coroutine.monitor.log.LogMonitorLauncher:55] - Serial completed, referenceType=componentReference, index=4, categoryName=Distribution PayRoute, ruleName=Distribution Rule, chainName=null, classId=bService, method=doThen, parameterTypes=java.lang.String, returnType=java.lang.String, spentTime=2 ms, id=956ab727-0e18-48aa-8334-1453c139fa23
    2017-12-23 19:20:29.223 INFO [Timer-1][com.nepxion.coroutine.CoroutineClientApplication$2:96] - Sync call result: thread index=0, id=com.nepxion.coroutine.data.entity.CoroutineId@2295f70c[
    id=956ab727-0e18-48aa-8334-1453c139fa23
    categoryName=Distribution PayRoute
    ruleName=Distribution Rule
    ], result=((Start[0] -> A[0] -> C[0] , Start[0] -> B[0] -> C[0]) -> D[0]) -> C[0]) -> A[0]) , (Start[0] -> A[0] -> C[0] , Start[0] -> B[0] -> C[0]) -> D[0]) -> D[0]) -> A[0])) -> B[0]))
    2017-12-23 19:20:29.223 INFO [Coroutine-192.168.1.3-thread-1][com.nepxion.coroutine.CoroutineClientApplication$1$1:73] - Async callback result: thread index=0, id=com.nepxion.coroutine.data.entity.CoroutineId@51baebd3[
    id=b22a77b6-0776-49a1-a6bb-e15268ffd0c1
    categoryName=Distribution PayRoute
    ruleName=Distribution Rule
    ], result=((Start[0] -> A[0] -> C[0] , Start[0] -> B[0] -> C[0]) -> D[0]) -> C[0]) -> A[0]) , (Start[0] -> A[0] -> C[0] , Start[0] -> B[0] -> C[0]) -> D[0]) -> D[0]) -> A[0])) -> B[0]))

Contact me

WeChat, DingTalk, official account, and documentation

Star trend chart

Stargazers over time