Project author: rabbitmq

Project description:
Erlang AMQP 1.0 client
Primary language: Erlang
Project URL: git://github.com/rabbitmq/rabbitmq-amqp1.0-client.git
Created: 2016-12-20T16:22:53Z
Project community: https://github.com/rabbitmq/rabbitmq-amqp1.0-client

License: Other


Erlang AMQP 1.0 client

This was migrated to https://github.com/rabbitmq/rabbitmq-server

This repository has been moved into the main unified RabbitMQ "monorepo", and all open issues were transferred with it. The source is now under /deps/amqp10_client.

Overview

This is an Erlang client for the AMQP 1.0 protocol.

Its primary purpose is to be used in RabbitMQ-related projects, but it is a
generic client that was tested with at least 4 implementations of AMQP 1.0.

If you are looking for an Erlang client for AMQP 0-9-1 — a completely different
protocol despite the name — consider this one.

Project Maturity and Status

This client is used in the cross-protocol version of the RabbitMQ Shovel plugin. It is not 100%
feature complete, but it is moderately mature and has been tested against at least three AMQP 1.0
servers: RabbitMQ, Azure ServiceBus, and ActiveMQ.

This client library is not officially supported by VMware at this time.

Usage

Connection Settings

The connection_config map contains various configuration properties.

    -type address() :: inet:socket_address() | inet:hostname().

    -type connection_config() ::
        #{container_id => binary(), % mandatory
          %% must provide a list of addresses or a single address
          addresses => [address()],
          address => address(),
          %% defaults to 5672, mandatory for TLS
          port => inet:port_number(),
          % the DNS name of the target host
          % required by some vendors such as Azure ServiceBus
          hostname => binary(),
          tls_opts => {secure_port, [ssl:ssl_option()]}, % optional
          notify => pid(), % pid to receive protocol notifications; defaults to self()
          max_frame_size => non_neg_integer(), % incoming max frame size
          idle_time_out => non_neg_integer(), % heartbeat
          sasl => none | anon | {plain, User :: binary(), Password :: binary()},
          % set this to a negative value to allow a sender to "overshoot" the flow
          % control by this margin
          transfer_limit_margin => 0 | neg_integer()
         }.

TLS

TLS is enabled by setting the tls_opts connection configuration property.
Currently the only valid value is {secure_port, [ssl_option]}, where the port
specified only accepts TLS. It is possible that TLS negotiation as described
in the AMQP 1.0 protocol will be supported in the future. If no value is provided
for tls_opts then a plain socket will be used.
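
As an illustration of the tls_opts property, here is a minimal sketch of a TLS-enabled configuration map. The host name, CA certificate path, and credentials are hypothetical placeholders, and port 5671 is only the conventional AMQP-over-TLS port, so adjust all of them to match your broker. The verify and cacertfile entries are standard options of the Erlang ssl application.

```erlang
%% Hypothetical values throughout: replace host, port, CA file and credentials.
TlsConf = #{address => "broker.example.com",
            %% the port must be given explicitly for TLS
            port => 5671,
            %% DNS name of the target host (required by some vendors)
            hostname => <<"broker.example.com">>,
            container_id => <<"tls-container">>,
            %% a TLS-only port plus a list of ssl options
            tls_opts => {secure_port,
                         [{verify, verify_peer},
                          {cacertfile, "/path/to/ca_certificate.pem"}]},
            sasl => {plain, <<"guest">>, <<"guest">>}}.
```

A map like this would be passed to amqp10_client:open_connection/1 in the same way as the plain configuration in the basic example.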

Basic Example

    %% this will connect to a localhost node
    {ok, Hostname} = inet:gethostname(),
    Port = 5672, % default AMQP port
    User = <<"guest">>,
    Password = <<"guest">>,
    %% create a configuration map
    OpnConf = #{address => Hostname,
                port => Port,
                container_id => <<"test-container">>,
                sasl => {plain, User, Password}},
    {ok, Connection} = amqp10_client:open_connection(OpnConf),
    {ok, Session} = amqp10_client:begin_session(Connection),
    SenderLinkName = <<"test-sender">>,
    {ok, Sender} = amqp10_client:attach_sender_link(Session, SenderLinkName, <<"a-queue-maybe">>),
    %% wait for credit to be received
    receive
        {amqp10_event, {link, Sender, credited}} -> ok
    after 2000 ->
        exit(credited_timeout)
    end,
    %% create a new message using a delivery-tag and a body, and indicate
    %% its settlement status (true meaning no disposition confirmation
    %% will be sent by the receiver).
    OutMsg = amqp10_msg:new(<<"my-tag">>, <<"my-body">>, true),
    ok = amqp10_client:send_msg(Sender, OutMsg),
    ok = amqp10_client:detach_link(Sender),
    %% create a receiver link
    {ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"test-receiver">>, <<"a-queue-maybe">>),
    %% grant some credit to the remote sender but don't auto-renew it
    ok = amqp10_client:flow_link_credit(Receiver, 5, never),
    %% wait for a delivery
    receive
        {amqp10_msg, Receiver, InMsg} -> ok
    after 2000 ->
        exit(delivery_timeout)
    end,
    ok = amqp10_client:close_connection(Connection).

Events

The amqp10_client API is mostly asynchronous with respect to the AMQP 1.0
protocol. Functions such as amqp10_client:open_connection typically return
after the Open frame has been successfully written to the socket rather than
waiting until the remote end replies with its own Open frame. The client
notifies the caller of various internal/async events using amqp10_event
messages. In the example above, when the remote replies with its Open frame,
a message of the following format is sent:

    {amqp10_event, {connection, ConnectionPid, opened}}

When the connection is closed an event is issued as such:

    {amqp10_event, {connection, ConnectionPid, {closed, Why}}}

Why is either normal or a description of the error that occurred
and resulted in the closure of the connection.
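
Because open_connection returns before the remote Open frame arrives, a caller that needs a fully opened connection has to wait for the opened event itself. The following is a minimal sketch of such a wait. The module name amqp10_await_sketch and the function await_opened/2 are hypothetical helpers, not part of the client, and the sketch assumes that the pid carried in the event is the connection pid returned by open_connection and that the calling process is the one receiving notifications.

```erlang
-module(amqp10_await_sketch).
-export([await_opened/2]).

%% Block until the given connection reports `opened`, reports
%% `{closed, Why}`, or Timeout milliseconds pass without either event.
await_opened(Connection, Timeout) ->
    receive
        {amqp10_event, {connection, Connection, opened}} ->
            ok;
        {amqp10_event, {connection, Connection, {closed, Why}}} ->
            {error, {closed, Why}}
    after Timeout ->
        {error, timeout}
    end.
```

Matching on the already-bound Connection variable means that events belonging to other connections stay in the mailbox for their own handlers.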

Sessions and links emit similar events in the same format.

    %% success events
    {amqp10_event, {connection, ConnectionPid, opened}}
    {amqp10_event, {session, SessionPid, begun}}
    {amqp10_event, {link, LinkRef, attached}}

    %% error events
    {amqp10_event, {connection, ConnectionPid, {closed, Why}}}
    {amqp10_event, {session, SessionPid, {ended, Why}}}
    {amqp10_event, {link, LinkRef, {detached, Why}}}

In addition the client may notify the initiator of certain protocol
events such as a receiver running out of credit or credit being available
to a sender.

    %% no more credit available to sender
    {amqp10_event, {link, Sender, credit_exhausted}}
    %% sender credit received
    {amqp10_event, {link, Sender, credited}}

Other events may be declared as necessary. Hence it makes sense for a user
of the client to handle all {amqp10_event, _} events to ensure unexpected
messages aren't kept around in the mailbox.
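
One way to follow this advice is a catch-all handler that consumes every amqp10_event message, including ones it does not recognise. The sketch below is only an illustration: the module name amqp10_event_sketch, the functions drain_events/0 and describe/1, and the atoms they return are hypothetical and not part of the client. It uses only the event shapes listed above.

```erlang
-module(amqp10_event_sketch).
-export([drain_events/0, describe/1]).

%% Remove every amqp10_event currently in the caller's mailbox and return
%% a description of each, in arrival order. Other messages are left alone.
drain_events() ->
    drain_events([]).

drain_events(Acc) ->
    receive
        {amqp10_event, Event} ->
            drain_events([describe(Event) | Acc])
    after 0 ->
        lists:reverse(Acc)
    end.

%% Map the documented events to short terms; the final clause catches
%% any event that may be added later so it never lingers in the mailbox.
describe({connection, _Pid, opened})          -> connection_opened;
describe({connection, _Pid, {closed, Why}})   -> {connection_closed, Why};
describe({session, _Pid, begun})              -> session_begun;
describe({session, _Pid, {ended, Why}})       -> {session_ended, Why};
describe({link, _Ref, attached})              -> link_attached;
describe({link, _Ref, {detached, Why}})       -> {link_detached, Why};
describe({link, _Ref, credit_exhausted})      -> credit_exhausted;
describe({link, _Ref, credited})              -> credited;
describe(Other)                               -> {unhandled, Other}.
```

A long-running process would more likely handle these messages in its own receive loop or gen_server handle_info callback; the important part is the final catch-all clause.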