项目作者: devimteam

项目描述 :
Golang AMQP wrapper
高级语言: Go
项目地址: git://github.com/devimteam/amqp.git
创建时间: 2018-02-21T10:26:54Z
项目社区:https://github.com/devimteam/amqp

开源协议:MIT License

下载


AMQP

Golang AMQP wrapper is a library that wraps amqp.

Check out the docs.

Features

  • Automatically reconnects to the broker and redeclares exchanges and queues.
  • Controls the channel lifecycle: opens new channels under high load and closes unused ones.
  • Declarative style.
    1. client, err := amqp.NewClient(
    2. conn.DefaultConnector("amqp://localhost:5672",
    3. conn.WithLogger(lg), // We want to know connection status and errors.
    4. ),
    5. amqp.TemporaryExchange("example-exchange"), // Declare exchanges and queues.
    6. amqp.PersistentExchanges(
    7. "exchange-one",
    8. "exchange-two",
    9. "exchange-three",
    10. ),
    11. amqp.PersistentQueues(
    12. "queue for one",
    13. "queue for two",
    14. "second queue for two",
    15. ),
    16. amqp.Exchange{
    17. Name: "declare directly",
    18. },
    19. amqp.Queue{
    20. Name: "", // left empty, broker generates name for you.
    21. },
    22. amqp.Binding{ // do not forget to bind queue to exchange.
    23. Exchange: "exchange-one",
    24. Queue: "queue for one",
    25. },
    26. amqp.WithLogger{Logger: lg}, // We want to know AMQP protocol errors.
    27. )
  • Encoding and decoding are hidden inside.
    • Use the Codec interface for your own format.
    • XML, JSON and Protocol Buffers (protobuf) are already registered.
  • Tons of options.
    • Min and max opened channels per publisher/subscriber.
    • Limit receiving messages.
    • Any amount of data formats.
    • Fill all message fields as you wish.
    • And many more…
  • Everything from AMQP may be used directly.

Contributing

We are waiting for your issue or pull request.

Example

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "strconv"
  6. "time"
  7. "github.com/devimteam/amqp"
  8. "github.com/devimteam/amqp/conn"
  9. "github.com/devimteam/amqp/logger"
  10. )
  11. // Data that we want to deal with.
  12. type Comment struct {
  13. Id string
  14. Message string
  15. }
  16. func main() {
  17. ch := make(chan []interface{})
  18. // Listens for errors and writes them to stdout.
  19. go func() {
  20. for l := range ch {
  21. fmt.Println(l...)
  22. }
  23. }()
  24. lg := logger.NewChanLogger(ch) // Logger interface identical to go-kit Logger.
  25. client, err := amqp.NewClient(
  26. conn.DefaultConnector("amqp://localhost:5672",
  27. conn.WithLogger(lg), // We want to know connection status and errors.
  28. ),
  29. amqp.TemporaryExchange("example-exchange"), // Declare exchanges and queues.
  30. amqp.WithLogger{Logger:lg}, // We want to know AMQP protocol errors.
  31. )
  32. if err != nil {
  33. panic(err)
  34. }
  35. subscr := client.Subscriber()
  36. // context used here as closing mechanism.
  37. eventChan := subscr.SubscribeToExchange(context.Background(),"example-exchange", Comment{}, amqp.Consumer{})
  38. go func() {
  39. for event := range eventChan {
  40. fmt.Println(event.Data) // do something with events
  41. }
  42. }()
  43. pubsr:=client.Publisher()
  44. for i := 0; i < 10; i++ {
  45. // Prepare your data before publishing
  46. comment := Comment{
  47. Id: strconv.Itoa(i),
  48. Message: "message " + strconv.Itoa(i),
  49. }
  50. // Context used here for passing data to `before` functions.
  51. err := pubsr.Publish(context.Background(), "example-exchange", comment, amqp.Publish{})
  52. if err != nil {
  53. panic(err)
  54. }
  55. time.Sleep(time.Millisecond * 500)
  56. }
  57. time.Sleep(time.Second * 5) // wait for all messages to be delivered.
  58. }