Project author: kumuluz

Project description:
KumuluzEE AMQP provides support for the Advanced Message Queuing Protocol (AMQP) and for brokers that implement it, such as RabbitMQ and ActiveMQ.
Language: Java
Repository: git://github.com/kumuluz/kumuluzee-amqp.git
Created: 2019-01-30T07:31:47Z
Project community: https://github.com/kumuluz/kumuluzee-amqp

License: Other


KumuluzEE AMQP

KumuluzEE AMQP project for development of messaging applications.

KumuluzEE AMQP enables you to easily send and receive messages over the AMQP protocol. Currently, it includes a RabbitMQ-based implementation. For details on the broker itself, see the RabbitMQ documentation.

Usage

You can enable KumuluzEE AMQP RabbitMQ support by adding the following dependency to pom.xml:

    <dependency>
        <groupId>com.kumuluz.ee.amqp</groupId>
        <artifactId>kumuluzee-amqp-rabbitmq</artifactId>
        <version>${kumuluzee-amqp-rabbitmq.version}</version>
    </dependency>

Installing RabbitMQ

To use RabbitMQ, you first need to install the RabbitMQ broker. See the RabbitMQ installation guide for instructions.

Configuration

To configure RabbitMQ, create a configuration file at resources/config.yaml.
In it you list your RabbitMQ hosts and their settings. Each key below is shown as key: Type - default value.

    kumuluzee:
      amqp:
        rabbitmq:
          hosts: List
            name: String (required)
            url: String (required) - null
            password: String - null
            username: String - null
            port: Integer - null
            automaticRecoveryEnabled: Boolean - null
            channelRpcTimeout: Integer - null
            channelShouldCheckRpcResponseType: Boolean - null
            connectionTimeout: Integer - null
            enableHostnameVerification: Boolean - null
            handshakeTimeout: Integer - null
            networkRecoveryInterval: Integer - null
            requestedChannelMax: Integer - null
            requestedFrameMax: Integer - null
            requestedHeartbeat: Integer - null
            shutdownTimeout: Integer - null
            topologyRecoveryEnabled: Boolean - null
            uri: String - null
            virtualHost: String - null
            clientProperties: Map<String, Object> - null
            exchanges: List
              name: String (required)
              type: String [fanout, direct, topic, headers] - fanout
              durable: Boolean - false
              autoDelete: Boolean - false
              arguments: Map<String, Object> - null
            queues:
              name: String (required)
              exclusive: Boolean - false
              durable: Boolean - false
              autoDelete: Boolean - false
              arguments: Map<String, Object> - null
          properties: List
            name: String (required)
            contentType: String - null
            contentEncoding: String - null
            headers: Map<String, Object> - null
            deliveryMode: Integer - null
            priority: Integer - null
            corelationId: String - null
            replyTo: String - null
            expiration: String - null
            messageId: String - null
            timestamp: Boolean - null
            type: String - null
            userId: String - null
            appId: String - null
            clusterId: String - null
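
As a concrete illustration of the schema, a minimal config.yaml might look like the sketch below. It reuses the host, exchange, queue and property names that appear in the examples later in this document. The guest/guest credentials are RabbitMQ's defaults for a local broker. The placement of `properties` directly under `rabbitmq`, and the specific values chosen for `contentType` and `deliveryMode`, are assumptions made for illustration.

```yaml
kumuluzee:
  amqp:
    rabbitmq:
      hosts:
        # Host with a direct exchange
        - name: MQtest
          url: localhost
          username: guest
          password: guest
          exchanges:
            - name: directExchange
              type: direct
        # Host with a plain queue and no exchange
        - name: MQtest2
          url: localhost
          queues:
            - name: testQueue
      # Named property set that a producer can refer to (placement assumed)
      properties:
        - name: testProperty
          contentType: application/json
          deliveryMode: 2   # 2 marks a message as persistent in AMQP
```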

Connection

You can also create a connection in code, using parameters that are not available in config.yaml.

Annotate a class or a method with @AMQPConnection. On a class, every method that returns a map is considered. On a method, the method itself must return a map. In the method, create a new connection to the broker using the ConnectionFactory provided by RabbitMQ.

The method has to return a Map<String, Connection>, where the String key is the name of the connection. You can then configure exchanges and queues for that name in config.yaml. All other host parameters in config.yaml are ignored for such a connection.

    @AMQPConnection
    public Map<String, Connection> localhostConnection() {
        Map<String, Connection> localhost = new HashMap<>();
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("localhost");
        try {
            // Register the connection under the name used in config.yaml.
            // It is added only if it was actually created.
            localhost.put("MQtest", connectionFactory.newConnection());
        } catch (IOException | TimeoutException e) {
            log.severe("Connection could not be created");
        }
        return localhost;
    }

The matching config.yaml entry:

    kumuluzee:
      amqp:
        rabbitmq:
          hosts:
            - name: MQtest
              port: 9000 #ignored
              automaticRecoveryEnabled: true #ignored
              exchanges:
                - name: directExchange
                  type: direct

The connection to the server is managed by the framework and does not need to be closed. If you want to manage connections yourself, use the RabbitConnection class, which lets you create and close connections.

Channel

You can obtain a RabbitMQ Channel by injecting it with @AMQPChannel. You can then use this channel to send and receive messages through the RabbitMQ client API.

    @AMQPChannel(host: String - "")

    @Inject
    @AMQPChannel("hostName")
    private Channel channel;

Consuming messages

To listen to queues, annotate a method with @AMQPConsumer.
The first parameter of the annotated method is an object of the type you expect to receive. The second parameter is optional and must be of type MessageInfo, which gives you details about the received message.
If the message body cannot be deserialized to the expected type, the method is not invoked and an error is logged.

    @AMQPConsumer(
        host: String - "",
        exchange: String - "",
        key: String[] - {""},
        prefetch: int - 100,
        autoAck: boolean - true
    )

    @AMQPConsumer(...)
    public void listenToDirectExchange(
        message: any,
        info: MessageInfo - optional
    ) {
        ...
    }

Send messages

You can send messages by annotating a method with @AMQPProducer. The method must return an object, which is then sent to the consumer.

    @AMQPProducer(
        host: String - "",
        exchange: String - "",
        key: String[] - {""},
        properties: String - ""
    )

    @AMQPProducer(host="MQtest", exchange = "directExchange", key = "object")
    public ExampleObject sendObject() {
        ExampleObject exampleObject = new ExampleObject();
        exampleObject.setContent("I'm just an object.");
        return exampleObject;
    }

You can also return a Message object, on which you can set the host, exchange, keys, body and properties, including properties that cannot be defined in config.yaml. Keep in mind that values set on the Message override the annotation parameters.

    @AMQPProducer
    public Message sendFullMessage() {
        Message message = new Message();
        ExampleObject exampleObject = new ExampleObject();
        exampleObject.setContent("I'm an object in a special message");
        if (Math.random() < 0.5) {
            message.host("MQtest")
                    .key(new String[]{"object"})
                    .exchange("directExchange")
                    .basicProperties(MessageProperties.BASIC);
        } else {
            message.host("MQtest2")
                    .key(new String[]{"testQueue"})
                    .basicProperties("testProperty");
        }
        return message.body(exampleObject);
    }
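
The precedence rule between a Message and the annotation can be pictured with the small standalone sketch below. It is not KumuluzEE AMQP code. The class and method names (`ProducerOverrideSketch`, `pick`, `pickKey`) are hypothetical, and it assumes the override is applied field by field, so that a value left unset on the Message falls back to the annotation value.

```java
import java.util.Arrays;

/**
 * Illustrative only: models the documented rule that values set on a
 * Message take precedence over the @AMQPProducer annotation values.
 * None of these names are part of KumuluzEE AMQP.
 */
class ProducerOverrideSketch {

    /** Returns the Message value when it is set, otherwise the annotation value. */
    static String pick(String fromMessage, String fromAnnotation) {
        return (fromMessage != null && !fromMessage.isEmpty()) ? fromMessage : fromAnnotation;
    }

    /** Same rule for routing keys, which are arrays. */
    static String[] pickKey(String[] fromMessage, String[] fromAnnotation) {
        return (fromMessage != null && fromMessage.length > 0) ? fromMessage : fromAnnotation;
    }

    public static void main(String[] args) {
        // Annotation values: host = "MQtest", exchange = "directExchange", key = "object"
        // Message values:    host = "MQtest2", key = {"testQueue"}, exchange not set
        String host = pick("MQtest2", "MQtest");
        String exchange = pick(null, "directExchange");
        String[] key = pickKey(new String[]{"testQueue"}, new String[]{"object"});

        // prints "MQtest2 / directExchange / [testQueue]"
        System.out.println(host + " / " + exchange + " / " + Arrays.toString(key));
    }
}
```

Whether the library falls back per field in exactly this way is not stated in this document, so it is safest to set every routing value in one place, either on the annotation or on the Message.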

To send a message to a specific queue, remove the exchange from the annotation and use the key as the queue name.

    @AMQPProducer(host="MQtest2", key = "testQueue")
    public Message sendToQueue() {
        Message message = new Message();
        ExampleObject exampleObject = new ExampleObject();
        exampleObject.setContent("I'm an object in a message");
        return message.body(exampleObject).basicProperties(MessageProperties.BASIC);
    }
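
Using the key as a queue name matches how AMQP's default exchange works: a message published with an empty exchange name is routed to the queue whose name equals the routing key. The toy in-memory model below illustrates that rule next to direct-exchange routing. It is a sketch for intuition only, not the RabbitMQ client API and not how KumuluzEE AMQP is implemented. The class `DefaultExchangeSketch` and the queue name `objects` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of AMQP routing, for illustration only. */
class DefaultExchangeSketch {

    // queue name -> delivered message bodies
    private final Map<String, List<String>> queues = new HashMap<>();
    // "exchange|bindingKey" -> names of bound queues (direct exchanges only)
    private final Map<String, List<String>> bindings = new HashMap<>();

    void declareQueue(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    void bind(String exchange, String bindingKey, String queue) {
        bindings.computeIfAbsent(exchange + "|" + bindingKey, k -> new ArrayList<>()).add(queue);
    }

    void publish(String exchange, String key, String body) {
        List<String> targets = exchange.isEmpty()
                // Default exchange: the key is interpreted as a queue name.
                ? Collections.singletonList(key)
                // Direct exchange: deliver to queues bound with exactly this key.
                : bindings.getOrDefault(exchange + "|" + key, Collections.emptyList());
        for (String queue : targets) {
            List<String> messages = queues.get(queue);
            if (messages != null) {
                messages.add(body); // messages for undeclared queues are dropped
            }
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("testQueue");
        broker.declareQueue("objects");
        broker.bind("directExchange", "object", "objects");

        broker.publish("", "testQueue", "to a queue");              // no exchange, key = queue name
        broker.publish("directExchange", "object", "via exchange"); // exchange + routing key

        System.out.println(broker.messagesIn("testQueue")); // prints [to a queue]
        System.out.println(broker.messagesIn("objects"));   // prints [via exchange]
    }
}
```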

Sample

You can start by using the sample code.

Changelog

Recent changes can be viewed on the GitHub Releases page.

Contribute

See the contributing docs.

When submitting an issue, please follow the guidelines.

When submitting a bugfix, write a test that exposes the bug and fails before applying your fix. Submit the test
alongside the fix.

When submitting a new feature, add tests that cover the feature.

License

MIT