Project author: blokur

Project description:
High level Go library on top of amqp (Golang)
Primary language: Go
Project URL: git://github.com/blokur/harego.git
Created: 2020-07-23T15:51:53Z
Project community: https://github.com/blokur/harego

License: Apache License 2.0


Harego

High-level library on top of amqp.

  1. Description
  2. Usage
  3. Development

Description

A harego.Consumer/harego.Publisher is a concurrency-safe queue manager for
RabbitMQ, and a high-level implementation on top of the amqp library. A
Consumer/Publisher creates one or more workers for publishing/consuming
messages. The default values are chosen to make the Consumer/Publisher a
durable queue working with the default exchange and topic kind. A
Consumer/Publisher can be configured by passing the provided ConfigFunc
functions to the NewConsumer/NewPublisher constructors.

The Consume() method calls the provided HandlerFunc with the next available
message on the next available worker. The first return value of the
HandlerFunc decides what happens to the message. The second return value is
a duration: the Consume worker waits for that long before acting on the
acknowledgement.

You can increase the number of workers by passing Workers(n) to the
NewConsumer/NewPublisher constructors.

When the Close() method is called, all connections are closed and the
Consumer/Publisher can no longer be used. Create a new object to do more work.

Note

This library is in beta phase and the API might change until we reach a stable
release.

Usage

Consumer

The only requirement for the NewConsumer function is a Connector to connect to
the broker when needed:

    // to use an address:
    harego.NewConsumer(harego.URLConnector(address))
    // to use an amqp connection:
    harego.NewConsumer(harego.AMQPConnector(conn))

The connector is also used when the connection is lost, so the Consumer can
initiate a new connection.

In this setup the myqueue queue is bound to the myexchange exchange, and the
handler is called for each message read from this queue:

    consumer, err := harego.NewConsumer(harego.URLConnector(address),
        harego.ExchangeName("myexchange"),
        harego.QueueName("myqueue"),
    )
    // handle the error.
    err = consumer.Consume(ctx, func(msg *amqp.Delivery) (harego.AckType, time.Duration) {
        return harego.AckTypeAck, 0
    })
    // handle the error.

You can create multiple workers in the above example to handle multiple
messages concurrently:

    consumer, err := harego.NewConsumer(harego.URLConnector(address),
        harego.ExchangeName("myexchange"),
        harego.QueueName("myqueue"),
        harego.Workers(20),
    )
    // handle the error.
    err = consumer.Consume(ctx, func(msg *amqp.Delivery) (harego.AckType, time.Duration) {
        return harego.AckTypeAck, 0
    })
    // handle the error.

The handler can be running for up to 20 messages concurrently, and the Ack is
sent for each message separately.

Publisher

In this setup the message is sent to the myexchange exchange:

    publisher, err := harego.NewPublisher(harego.URLConnector(address),
        harego.ExchangeName("myexchange"),
    )
    // handle the error.
    err = publisher.Publish(&amqp.Publishing{
        Body: []byte(msg),
    })
    // handle the error.

Delays

If the returned duration is 0, the acknowledgement is sent to the broker
immediately. Otherwise the Consume function sleeps for that duration before
sending it. Note that the delay blocks the worker running the handler for this
duration, so you need enough workers to handle the next available messages.

Requeueing

If you return harego.AckTypeRequeue from the handler, the message is sent
back to the same queue. This means the message will be consumed again only
after all the messages currently in the queue are consumed.

Development

Prerequisites

This project supports Go >= 1.21. To run targets from the Makefile you
need to install GNU make. You also need Docker installed for integration tests.

Warning

Please make sure you are using Go toolchain version >= 1.21.9 to avoid
some vulnerabilities in older versions.

If you have asdf installed, you can add the following line to the
.tool-versions file in the root of the project to get the latest toolchain:

    golang 1.22.3

To install the dependencies:

    make dependencies

This also installs reflex to help with the development process.

Running Tests

Note that these make targets are not strictly required for running tests.
You can still run go test ./... directly. Just make sure you add
-tags=integration in order to run the integration tests.

To watch for file changes and run the unit tests:

    make unittest
    # or to run them with race flag:
    make unittest_race

There is also an integration_test target for running integration tests.

Make Examples

    make unittest
    make unittest run=TestMyTest # runs a specific test with regexp
    make unittest dir=./db/... # runs tests in a package
    make unittest dir=./db/... run=TestSomethingElse
    make unittest flags="-race -count=2"

Please see the Makefile for more targets.

Mocks

To generate mocks, run:

    make mocks

RabbitMQ

For convenience you can trigger the integration_deps target to set up the
required RabbitMQ instance:

    make integration_deps