Erlang RabbitMQ Client library
Overview
This guide covers an Erlang client for RabbitMQ (AMQP 0-9-1).
This user guide assumes that the reader is familiar with basic concepts of AMQP 0-9-1.
Refer to guides on connections, channels, queues, publishers, and consumers to learn about those key RabbitMQ concepts in more details.
Some topics covered in this guide include
- How to add a dependency on this client
- Usage basics
- How to connect to and disconnect from a RabbitMQ node
- How to open a channel and perform protocol operations on it
- How to define a topology: declare queues, exchanges and bindings
- Publishing and consumption of messages
- Handling of returned messages
and more.
Dependency
The client library is named amqp_client
and distributed via Hex.pm
together with its key dependency, rabbit-common
.
Below are dependency snippets to be used with popular build tools: Mix, Rebar 3 and erlang.mk.
Mix
{:rabbit_common, "~> 3.8"}
Rebar 3
{rabbit_common, "&version-erlang-client;"}
erlang.mk
dep_rabbit_common = hex &version-erlang-client;
Basics
The basic usage of the client follows these broad steps:
- Make sure the
amqp_client
Erlang application is started - Establish a connection to a RabbitMQ node
- Open a new channel on the connection
- Execute AMQP 0-9-1 commands with a channel such as declaring exchanges and queues, defining bindings between them, publishing messages, registering consumers (subscribing), and so on
- Register optional event handlers such as returned message handler
- When no longer required, close the channel and the connection
The amqp_client Application
RabbitMQ Erlang client is an Erlang application named amqp_client
.
As with any Erlang application, to begin using the client it's necessary to first make sure it is started:
application:ensure_started(amqp_client).
Key Modules and Concepts
The main two modules in the client library are:
amqp_connection
, which is used to open a connection to a RabbitMQ node and open channels on itamqp_channel
, which exposes most AMQP 0-9-1 operations such as queue declaration or consumer registration
Once a connection has been established and successfully authenticated,
and a channel has been opened, an application will typically use the
amqp_channel:call/{2,3}
and amqp_channel:cast/{2,3}
functions
together with AMQP 0-9-1 protocol method records to perform most operations.
Several additional modules make it possible for applications to react to certain events. They will be covered later in this guide.
The library is made up of two layers:
- A high level logical layer that follows the AMQP 0-9-1 protocol and operation execution model
- A low-level protocol implementation layer that is responsible for communicating with RabbitMQ nodes
Network Connection Types
AMQP 0-9-1 clients connect to RabbitMQ using TCP. One AMQP 0-9-1 connection uses one TCP connection under the hood. However, the Erlang client is unique in that it provides an alternative way of communicating with RabbitMQ nodes.
Network Client
Much like other clients, this library provides a TCP-based client that uses a TCP connection to transfer serialised protocol frames to the server. This client is called the network client and most applications should use.
To use the network client, start a connection using amqp_connection:start/1
with the
parameter set to an #amqp_params_network
record.
Direct (Erlang Distribution) Client
Alternatively, Erlang distribution connections can be used instead of separate TCP connections. This communication method assumes that the application that uses the client runs on the same Erlang cluster as RabbitMQ nodes.
The use of direct client should be limited to applications that are deployed side by side with RabbitMQ. Shovel and Federation plugins are two examples of such applications.
In most other cases, developers should prefer the more traditional network client covered above. It will be easier to reason about for operators and developers not familiar with Erlang.
To use the direct driver, start a connection using amqp_connection:start/1
with the
parameter set to an #amqp_params_direct
record.
Including Header Files
The Erlang client uses a number of record definitions which you will encounter in this guide. These records fall into two broad categories:
- Generated AMQP 0-9-1 method definitions
- Definitions of data structures that are commonly used throughout the client
To gain access to these records, you need to include the amqp_client.hrl in every module that uses the Erlang client:
-include("amqp_client.hrl").
Connecting to RabbitMQ
The amqp_connection
module is used to start a connection to a RabbitMQ node.
In this example we will use a network connection, which is the recommended
option for most use cases:
{ok, Connection} = amqp_connection:start(#amqp_params_network{})
This function returns an {ok, Connection}
pair, where Connection
is the
pid of a process that maintains a permanent connection.
This pid will be used to open channels on the connection and close the connection.
In case of an error, the above call returns an {error, Error}
pair.
The #amqp_params_network
record sets the following default values:
Parameter | Default Value |
username | guest |
password | guest |
virtual_host | / |
host | localhost |
port | 5672 |
channel_max | 2047 |
frame_max | 0 |
heartbeat | 0 |
ssl_options | none |
auth_mechanisms | [fun amqp_auth_mechanisms:plain/3, fun amqp_auth_mechanisms:amqplain/3] |
client_properties | [] |
These values are only the defaults that will work with an out of the box RabbitMQ node running on the same host. If the target node or the environment has been configured differently, these values can be overridden to match the actual deployment scenario.
TLS options can also be specified globally using the
ssl_options
environment key for the amqp_client
application.
They will be merged with the TLS parameters from the URI (the latter will take precedence).
Direct (Erlang Distribution) Client
Applications that are deployed inside the same Erlang cluster as the RabbitMQ, such as RabbitMQ plugins, can start a direct connection that bypasses network serialisation and relies on Erlang distribution for data transfers.
To start a direct connection, use amqp_connection:start/1
with the parameter
set to an #amqp_params_direct
record:
{ok, Connection} = amqp_connection:start(#amqp_params_direct{})
Credentials are optional for direct connections, since Erlang distribution relies on a shared secret, the Erlang cookie, for authentication.
If a username and password are provided then they will be used for authentication and made available to authentication backends.
If only a username is supplied, then the user is considered trusted and logged in unconditionally.
If neither username nor password are provided, then the connection will be considered to be from a fully trusted user which can connect to any virtual host and has full permissions.
The #amqp_params_direct
record sets the following default values:
Parameter | Default Value |
username | none |
password | none |
virtual_host | / |
node | node() |
client_properties | [] |
Connecting to RabbitMQ Using an AMQP URI
Instead of working with records such #amqp_params_network
directly,
AMQP URIs may be used.
The amqp_uri:parse/1
function is provided for this purpose.
It parses an URI and returns the equivalent #amqp_params_network
or #amqp_params_direct
record.
Diverging from the spec, if the hostname is omitted, the
connection is assumed to be direct and an #amqp_params_direct{}
record is returned. In addition to the standard host, port, user,
password and vhost parameters, extra parameters may be specified
via the query string (e.g. "?heartbeat=5" to configure a heartbeat timeout).
Creating Channels
Once a connection has been established, use the amqp_connection
module
to open one or more channels that will be used
to define the topology, publish and consume messages:
{ok, Channel} = amqp_connection:open_channel(Connection)
This function takes the pid of the connection process and returns
a {ok, Channel}
pair, where Channel
is a pid that represents
a channel and will be used to execute protocol commands.
Using AMQP 0-9-1 Methods (Protocol Operations)
The client library's primary way of interacting with RabbitMQ nodes is by sending and handling AMQP 0-9-1 methods (also referred to as "commands" in this guide) that are represented by records.
The client tries to use sensible default values for each record.
For example, when using the #'exchange.declare'{}
method to declare a transient exchange,
it is sufficient to only specify a name:
#'exchange.declare'{exchange = <<"my_exchange">>}
The above example is equivalent to this:
#'exchange.declare'{exchange = <<"my_exchange">>,
type = <<"direct">>,
passive = false,
durable = false,
auto_delete = false,
internal = false,
nowait = false,
arguments = []}
Defining a Topology: Exchanges, Queues, Bindings
Once a channel has been established, the amqp_channel
module can
be used to manage the fundamental objects within AMQP, namely
exchanges and queues. The following function creates an exchange
called my_exchange, which by default, is the direct exchange:
Declare = #'exchange.declare'{exchange = <<"my_exchange">>},
#'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare)
Similarly, a transient queue called my_queue
is created by this code:
Declare = #'queue.declare'{queue = <<"my_queue">>},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)
To declare a durable queue:
Declare = #'queue.declare'{
queue = <<"my_queue">>,
durable = true
},
#'queue.declare_ok'{} = amqp_channel:call(Channel, Declare)
In some cases an application wants to use a transient queue and is not interested in the actual name
of the queue. In this case, it is possible to let the broker generate a random name for a
queue. To do this, use a #'queue.declare'{}
method and leave the
queue attribute undefined. Specifying a blank string for queue name
would have the same effect.
#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{})
The server will generate a queue name unique in this cluster and return this name as part of the acknowledgement.
Bindings
To create a routing rule from an exchange to a queue, the
#'queue.bind'{}
command is used:
Binding = #'queue.bind'{queue = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.bind_ok'{} = amqp_channel:call(Channel, Binding)
When this routing rule is no longer required, this route can be
deleted using the #'queue.unbind'{}
command:
Binding = #'queue.unbind'{queue = Queue,
exchange = Exchange,
routing_key = RoutingKey},
#'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
Deleting Entities
An exchange can be deleted by the #'exchange.delete'{}
command:
Delete = #'exchange.delete'{exchange = <<"my_exchange">>},
#'exchange.delete_ok'{} = amqp_channel:call(Channel, Delete)
Similarly, a queue is deleted using the #'queue.delete'{}
command:
Delete = #'queue.delete'{queue = <<"my_queue">>},
#'queue.delete_ok'{} = amqp_channel:call(Channel, Delete)
Synchronous and Asynchronous Protocol Methods, Calls and Casts
Note that the above examples use amqp_channel:call/2
.
This is because they use synchronous AMQP 0-9-1 methods that produce a response
(unlike a group of methods called asynchronous methods).
It is generally advisable to use amqp_channel:call/{2,3}
for synchronous methods,
rather than amqp_channel:cast/{2,3}
, even though both functions work with both
sync and async methods.
One difference between the two functions is that amqp_channel:call/{2,3}
blocks the calling
process until the reply comes back from the server (for sync methods) or the method
has been sent on the wire (for async methods), whereas amqp_channel:cast/{2,3}
returns an 'ok' immediately.
Thus, only by using amqp_channel:call/{2,3}
can we verify that the server
has acknowledged our command.
Publishing Messages
To publish a message to an exchange with a particular routing key,
the #'basic.publish'{}
method.
Messages are represented using the #amqp_msg{}
record:
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload})
By default, the properties field of the #amqp_msg{}
record contains
a minimal set of message properties as a #'P_basic'{}
properties record.
If an application needs to override any of the defaults, for example,
to send persistent messages, the #amqp_msg{}
needs to
be constructed accordingly:
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = X, routing_key = Key},
Props = #'P_basic'{delivery_mode = 2}, %% persistent message
Msg = #amqp_msg{props = Props, payload = Payload},
amqp_channel:cast(Channel, Publish, Msg)
Full list of message properties can be found in the Publishers guide.
The AMQP 0-9-1 #'basic.publish'
method is asynchronous:
the server will not send a response to it. However, clients can opt in
to have unroutable messages returned to them.
This is described in the section on return message handlers.
The above example does not use Publisher Confirms.
To await for all outstanding publishes to be confirmed after publishing
a batch of messages, use amqp_channel:wait_for_confirms/2
function.
It will return a true
if all outstanding publishes were successfully confirmed
or a timeout
if timeout has occurred.
Note that waiting after every published message is extremely inefficient and unnecessary. A more optimal way would be to publish a batch of messages and await their confirmation. If some publishes were not confirmed in time, the entire last batch can be republished.
Consumers: Subscribing To Queues Using the "Push API"
Applications can subscribe to be delivered messages routed to a queue. This "push API" is the recommended way of consuming messages (the other being polling, which should be avoided when possible).
To add a consumer to a queue (subscribe to a queue), the
#'basic.consume'{}
method is used in one of two ways:
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:subscribe(Channel, #'basic.consume'{queue = Q}, Consumer)
or
%% A consumer process is not provided so the calling
%% process (`self()`) will be the consumer
#'basic.consume_ok'{consumer_tag = Tag} =
amqp_channel:call(Channel, #'basic.consume'{queue = Q})
The consumer argument is the pid of a process to which the client library
will deliver messages.
This can be an arbitrary Erlang process, including the process that initiated
the connection.
The #'basic.consume_ok'{}
return contains a consumer tag. The tag is a consumer
(subscription) identifier that is used to cancel the consumer.
This is used at a later point in time to cancel the consumer.
This notification is sent both to the process that created the subscription
(as the return value to amqp_channel:subscribe/3
) and
as a message to the consumer process.
When a consumer process is subscribed to a queue, it will receive messages in its mailbox. An example receive loop looks like this:
loop(Channel) ->
receive
%% This is the first message received
#'basic.consume_ok'{} ->
loop(Channel);
%% This is received when the subscription is cancelled
#'basic.cancel_ok'{} ->
ok;
%% A delivery
{#'basic.deliver'{delivery_tag = Tag}, Content} ->
%% Do something with the message payload
%% (some work here)
%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
%% Loop
loop(Channel)
end.
In the above example, the process consumes the consumer registration (subscription) notification and then proceeds to wait for delivery messages to arrive in its process mailbox.
When messages are received, the loop does something useful with the message and
sends an acknowledgement back to the server.
If the consumer is cancelled, a cancellation notification will be sent to the
consumer process. In this scenario, the receive loop just
exits. If the application does not wish to explicitly acknowledge
message receipts, it can use automatic acknowledgement mode.
For that, set the no_ack
property of #'basic.consume'
record
to true
. When in automatic acknowledgement mode, consumers do not
acknowledge deliveries: RabbitMQ will consider them delivered
immediately after sending them down the connection.
Cancelling a Consumer
To cancel a consumer, use the consumer tag returned
with the #'basic.consume_ok'{}
response:
amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag})
A cancelled consumer may still receive "in flight" deliveries, e.g. those currently in TCP buffers at the time of consumer cancellation. However, eventually — and usually shortly after — consumer cancellation there will be no further deliveries to its handling process.
Implementation of Consumers
The channel uses a module implementing the amqp_gen_consumer
behaviour to determine how it should handle consumer events.
Effectively, this module handles client-side
consumer registration and ensures routing of deliveries to the appropriate consumers.
For instance, the default consumer module, amqp_selective_consumer
,
keeps track of which processes are subscribed to which queues and routes deliveries appropriately;
in addition, if the channel gives it a delivery for an unknown
consumer, it will pass it to a default consumer, should one be registered.
By contrast, amqp_direct_consumer
simply forwards all the
messages it receives from the channel to its only registered consumer.
The consumer module for a channel is chosen when the channel is
opened by setting the second parameter to amqp_connection:open_channel/2
.
The consumer module implements the amqp_gen_consumer
behaviour and thus implements functions to handle
receiving basic.consume
, basic.consume_ok
, basic.cancel
, basic.cancel_ok
methods as well
delivery of published messages.
Closing Channels And The Connection
When a channel is no longer required, a client should close it.
This is achieved using amqp_channel:close/1
:
amqp_channel:close(Channel)
To close the connection, amqp_connection:close/1
is used:
amqp_connection:close(Connection)
Closing a connection will automatically implicitly close all channels on that connection.
Both the #'channel.close' and #'connection.close' commands
take the arguments reply_code
(an integer) and reply_text
(a binary),
which can be set by the client depending on the reason why
the channel or connection is being closed.
In most cases the reply_code
should set to 200 to indicate a normal shutdown.
The reply_text
attribute is just an arbitrary string, that the server
may or may not log. If a client wants to set to a different reply
code and/or text, it can use the overloaded functions
amqp_channel:close/3
and amqp_connection:close/3
respectively.
Delivery Flow Control
By default, there is no flow control within a channel other than normal TCP back-pressure. A consumer can set the size of the prefetch buffer that the broker will maintain for outstanding unacknowledged messages on a single channel. This is achieved using the #'basic.qos' command:
amqp_channel:call(Channel, #'basic.qos'{prefetch_count = Prefetch})
Applications are recommended to use a prefetch. Learn more in the Publisher Confirms and Consumer Acknowledgements guide.
Blocked Connections
When a node detects that it is below a certain available resource threshold, it may choose to stop reading from publishers' network sockets.
RabbitMQ supports a mechanism to allow clients to be told this has taken place.
Use amqp_connection:register_blocked_handler/2
giving the
pid of a process to which #'connection.blocked'{}
and
#'connection.unblocked'{}
should may be sent.
Handling Returned Messages
The broker will return undeliverable messages back to the
originating client. These are messages published either with the
immediate or mandatory flags set. In order for the application to
get notified of a return, it must register a callback process
that can process #'basic.return'{}
frames.
Here is an example of unrouteable message handling:
amqp_channel:register_return_handler(Channel, self()),
amqp_channel:call(Channel, #'exchange.declare'{exchange = X}),
Publish = #'basic.publish'{exchange = X, routing_key = SomeKey,
mandatory = true},
amqp_channel:call(Channel, Publish, #amqp_msg{payload = Payload}),
receive
{BasicReturn, Content} ->
#'basic.return'{reply_text = <<"unroutable">>, exchange = X} = BasicReturn
%% Do something with the returned message
end
Receiving Messages Using the "Fetch API"
It is also possible to retrieve individual messages on demand ("pull API" a.k.a. polling). This approach to consumption is highly inefficient as it is effectively polling and applications repeatedly have to ask for results even if the vast majority of the requests yield no results. Therefore using this approach is highly discouraged.
This is achieved using the #'basic.get'{}
command:
Get = #'basic.get'{queue = Q, no_ack = true},
{#'basic.get_ok'{}, Content} = amqp_channel:call(Channel, Get),
#amqp_msg{payload = Payload} = Content
The payload that is returned is an Erlang binary, and it is up to the application to decode it, as the structure of this content is opaque to both client library and the server.
If the queue were empty when the #'basic.get'{}
command was
invoked, then the channel will return an #'basic.get_empty'
result, as illustrated here:
#'basic.get_empty'{} = amqp_channel:call(Channel, Get)
Note that the previous example sets the no_ack flag on the
#'basic.get'{}
command. This tells the broker that the receiver
will not send an acknowledgement of the message. In doing so, the
broker can absolve itself of the responsibility for delivery -
once it believes it has delivered a message, then it is free to
assume that consuming application has taken responsibility for
it. In general, a lot of applications will not want these
semantics, rather, they will want to explicitly acknowledge the
receipt of a message. This is done with the #'basic.ack'
command, where the no_ack field is turned off by default:
Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),
%% Do something with the message payload.......and then ack it
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag})
Notice that the #'basic.ack'{}
method was sent using
amqp_channel:cast/2
instead of amqp_channel:call/2
. This is
because acknowledgements are entirely asynchronous and the server
will not produce a response for them.
A Basic Example
Below is a complete example of basic usage of the library. For the sake of simplicity it does not use publisher confirms and uses a polling consumer which performs manual acknowledgements.
-module(amqp_example).
-include("amqp_client.hrl").
-compile([export_all]).
test() ->
%% Start a network connection
{ok, Connection} = amqp_connection:start(#amqp_params_network{}),
%% Open a channel on the connection
{ok, Channel} = amqp_connection:open_channel(Connection),
%% Declare a queue
#'queue.declare_ok'{queue = Q}
= amqp_channel:call(Channel, #'queue.declare'{}),
%% Publish a message
Payload = <<"foobar">>,
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}),
%% Poll for a message
Get = #'basic.get'{queue = Q},
{#'basic.get_ok'{delivery_tag = Tag}, Content}
= amqp_channel:call(Channel, Get),
%% Do something with the message payload
%% (some work here)
%% Ack the message
amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag}),
%% Close the channel
amqp_channel:close(Channel),
%% Close the connection
amqp_connection:close(Connection),
ok.
In this example, a queue is created with a server generated name and a message is published directly to the queue. This makes use of the fact that every queue is bound to the default exchange via its own queue name. The message is then dequeued and acknowledged.
Compiling Code with Client as a Dependency
The client build process produces two deployment archives:
- amqp_client.ez, which contains all of the client library modules
- rabbit_common.ez, which contains the common modules from the server that are required at run-time
Both dependencies can be provisioned using build tools such as Rebar 3 or erlang.mk.
For the sake of an example. let's assume that the dependency management tool
used compiles dependencies under the ./deps
directory.
Then to compile the example code manually, erlc
can be used with ERL_LIBS
pointing to the
./deps
directory:
ERL_LIBS=deps erlc -o ebin amqp_example.erl
And then to run your application you could set the Erlang run-time like this:
ERL_LIBS=deps erl -pa ebin
# => Erlang/OTP 23 [erts-11.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:16]
# =>
# => Eshell V11.0 (abort with ^G)
# => 1> amqp_example:test().
# => ok
# => 2>