项目作者: mryingjie

项目描述 :
consumer 支持从配置文件中直接获取消费指定时间后生产的消息
高级语言: Java
项目地址: git://github.com/mryingjie/spring-boot-starter-rocketmq.git
创建时间: 2019-06-20T02:59:08Z
项目社区:https://github.com/mryingjie/spring-boot-starter-rocketmq

开源协议:Apache License 2.0

下载


spring-boot-starter-rocketmq

中文

License

Help developers quickly integrate RocketMQ in Spring Boot. Support the Spring Message specification to facilitate developers to quickly switch from other MQ to RocketMQ.

Features:

  • synchronous transmission
  • synchronous ordered transmission
  • asynchronous transmission
  • asynchronous ordered transmission
  • orderly consume
  • concurrently consume(broadcasting/clustering)
  • one-way transmission
  • transaction transmission
  • Pull consume

Quick Start

Here are some key points listed, the complete example, please refer to: rocketmq-demo

  1. <!--add dependency in pom.xml-->
  2. <dependency>
  3. <groupId>org.apache.rocketmq</groupId>
  4. <artifactId>spring-boot-starter-rocketmq</artifactId>
  5. <version>1.0.0</version>
  6. </dependency>

Produce Message

  1. ## application.properties
  2. spring.rocketmq.name-server=127.0.0.1:9876
  3. spring.rocketmq.producer.group=my-group

Note:

Maybe you need change 127.0.0.1:9876 with your real NameServer address for RocketMQ

  1. @SpringBootApplication
  2. public class ProducerApplication implements CommandLineRunner{
  3. @Resource
  4. private RocketMQTemplate rocketMQTemplate;
  5. public static void main(String[] args){
  6. SpringApplication.run(ProducerApplication.class, args);
  7. }
  8. public void run(String... args) throws Exception {
  9. rocketMQTemplate.convertAndSend("test-topic-1", "Hello, World!");
  10. rocketMQTemplate.send("test-topic-1", MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
  11. rocketMQTemplate.convertAndSend("test-topic-2", new OrderPaidEvent("T_001", new BigDecimal("88.00")));
  12. // rocketMQTemplate.destroy(); // notes: once rocketMQTemplate be destroyed, you can not send any message again with this rocketMQTemplate
  13. }
  14. @Data
  15. @AllArgsConstructor
  16. public class OrderPaidEvent implements Serializable{
  17. private String orderId;
  18. private BigDecimal paidMoney;
  19. }
  20. }

More relevant configurations for produce:

  1. spring.rocketmq.producer.retry-times-when-send-async-failed=0
  2. spring.rocketmq.producer.send-msg-timeout=300000
  3. spring.rocketmq.producer.compress-msg-body-over-howmuch=4096
  4. spring.rocketmq.producer.max-message-size=4194304
  5. spring.rocketmq.producer.retry-another-broker-when-not-store-ok=false
  6. spring.rocketmq.producer.retry-times-when-send-failed=2

Consume Message

  1. ## application.properties
  2. spring.rocketmq.name-server=127.0.0.1:9876
  3. spring.rocketmq.topic=topic
  4. spring.rocketmq.consumerGroup=group01
  5. spring.rocketmq.consumeTime=2019-06-20 10:10:00

Note:

Maybe you need change 127.0.0.1:9876 with your real NameServer address for RocketMQ
If you want consume message starting at specified time ,Plase must set consumeTime for @RocketMQMessageListener. If you do not set it,
Consumer will consume message from the offset of the last consumption of this consumer group.

  1. @SpringBootApplication
  2. public class ConsumerApplication{
  3. public static void main(String[] args){
  4. SpringApplication.run(ConsumerApplication.class, args);
  5. }
  6. @Slf4j
  7. @Service
  8. @RocketMQMessageListener(topic = "${spring.rocketmq.topic}", consumerGroup = "${spring.rocketmq.consumerGroup}",consumeTime = "${spring.rocketmq.consumeTime}")
  9. public class MyConsumer1 implements RocketMQListener<String>{
  10. public void onMessage(String message) {
  11. log.info("received message: {}", message);
  12. }
  13. }
  14. @Slf4j
  15. @Service
  16. @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2" consumeTime = "2019-06-20 10:10:00")
  17. public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
  18. public void onMessage(OrderPaidEvent orderPaidEvent) {
  19. log.info("received orderPaidEvent: {}", orderPaidEvent);
  20. }
  21. }
  22. }

More relevant configurations for consume:

see: RocketMQMessageListener

FAQ

  1. How to connected many nameserver on production environment?

    spring.rocketmq.name-server support the configuration of multiple nameserver, separated by ;. For example: 172.19.0.1: 9876; 172.19.0.2: 9876

  2. When was rocketMQTemplate destroyed?

    Developers do not need to manually execute the rocketMQTemplate.destroy () method when using rocketMQTemplate to send a message in the project, androcketMQTemplate will be destroyed automatically when the spring container is destroyed.

  3. start exception:Caused by: org.apache.rocketmq.client.exception.MQClientException: The consumer group[xxx] has been created before, specify another name please

    RocketMQ in the design do not want a consumer to deal with multiple types of messages at the same time, so the same consumerGroup consumer responsibility should be the same, do not do different things (that is, consumption of multiple topics). Suggested consumerGroup andtopic one correspondence.

  4. How is the message content body being serialized and deserialized?

    RocketMQ’s message body is stored as byte []. When the business system message content body if it is java.lang.String type, unified in accordance withutf-8 code into byte []; If the business system message content is not java.lang.String Type, then use jackson-databind serialized into the JSON format string, and then unified in accordance withutf-8 code into byte [].

  5. How do I specify the tags for topic?

    RocketMQ best practice recommended: an application as much as possible with one Topic, the message sub-type with tags to identify,tags can be set by the application free.

    When you use rocketMQTemplate to send a message, set the destination of the message by setting thedestination parameter of the send method. The destination format is topicName:tagName, : Precedes the name of the topic, followed by the tags name.

    Note:

    tags looks a complex, but when sending a message , the destination can only specify one topic under a tag, can not specify multiple.

  6. How do I set the message’s key when sending a message?

    You can send a message by overloading method like xxxSend(String destination, Message<?> msg, ...), setting headers of msg. for example:

    1. Message<?> message = MessageBuilder.withPayload(payload).setHeader(MessageConst.PROPERTY_KEYS, msgId).build();
    2. rocketMQTemplate.send("topic-test", message);

    Similarly, you can also set the message FLAG,WAIT_STORE_MSG_OK and some other user-defined other header information according to the above method.

    Note:

    In the case of converting Spring’s Message to RocketMQ’s Message, to prevent the header information from conflicting with RocketMQ’s system properties, the prefix USERS_ was added in front of all header names. So if you want to get a custom message header when consuming, please pass through the key at the beginning of USERS_ in the header.

  7. When consume message, in addition to get the message payload, but also want to get RocketMQ message of other system attributes, how to do?

    Consumers in the realization of RocketMQListener interface, only need to be generic for theMessageExt can, so in the onMessage method will receive RocketMQ native ‘MessageExt` message.

    1. @Slf4j
    2. @Service
    3. @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    4. public class MyConsumer2 implements RocketMQListener<MessageExt>{
    5. public void onMessage(MessageExt messageExt) {
    6. log.info("received messageExt: {}", messageExt);
    7. }
    8. }
  8. How do I specify where consumers start consuming messages?

    The default consume offset please refer: RocketMQ FAQ.
    To customize the consumer’s starting location, simply add a RocketMQPushConsumerLifecycleListener interface implementation to the consumer class. Examples are as follows:

    1. @Slf4j
    2. @Service
    3. @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    4. public class MyConsumer1 implements RocketMQListener<String>, RocketMQPushConsumerLifecycleListener {
    5. @Override
    6. public void onMessage(String message) {
    7. log.info("received message: {}", message);
    8. }
    9. @Override
    10. public void prepareStart(final DefaultMQPushConsumer consumer) {
    11. // set consumer consume message from now
    12. consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
    13. consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
    14. }
    15. }

    Similarly, any other configuration on DefaultMQPushConsumer can be done in the same way as above.