This guide is a quick tutorial that helps you to get started with the AMQP 0.9.1 specification in general and the Langohr in particular. It should take about 15 minutes to read and study the provided code examples. This guide covers:
This work is licensed under a Creative Commons Attribution 3.0 Unported License (including images and stylesheets). The source is available on Github.
This guide covers Langohr 5.1.x.
Langohr requires Clojure 1.8+. The latest stable release is recommended.
Langohr requires JDK 8 or newer.
Langohr works with RabbitMQ versions 2.x and 3.x. Some features may only be available in more recent releases (for example, 3.7.x).
Langohr is a Clojure client for RabbitMQ that embrace AMQP 0.9.1 Model. It reflects AMQP 0.9.1 protocol structure in the API and uses established terminology and support all of the RabbitMQ extensions to AMQP 0.9.1.
Langohr was designed to combine good parts of several other clients: the official RabbitMQ Java client, Bunny and March Hare. It does not, however, try to hide the protocol and RabbitMQ capabilities behind layers of DSLs and new abstractions.
Langohr may seem like a low level client but in return it is predictable, gives you access to all RabbitMQ features and freedom to design message routing scheme and error handling strategy that makes sense for you.
Here is what Langohr does not try to be:
The RabbitMQ site has a good installation guide that addresses many operating systems.
On MacOS, the fastest way to install RabbitMQ is with Homebrew:
brew install rabbitmq
then run it:
rabbitmq-server
On Debian and Ubuntu, the recommended way is to use a RabbitMQ apt repository.
For RPM-based distributions like Fedora, RHEL or CentOS, the RabbitMQ team provides an RPM package.
Langohr artifacts are released to Clojars.
[com.novemberain/langohr "5.1.0"]
Add Clojars repository definition to your pom.xml
:
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
And then the dependency:
<dependency>
<groupId>com.novemberain</groupId>
<artifactId>langohr</artifactId>
<version>5.1.0</version>
</dependency>
You can verify your installation in the REPL:
$ lein repl
user=> (require 'langohr.core)
;= nil
user=> langohr.core/*default-config*
;= {:host "localhost", :port 5672, :username "guest", :vhost "/", :password "guest"}
Let us begin with the classic "Hello, world" example. First, here is the code:
(ns clojurewerkz.langohr.examples.hello-world
(:gen-class)
(:require [langohr.core :as rmq]
[langohr.channel :as lch]
[langohr.queue :as lq]
[langohr.consumers :as lc]
[langohr.basic :as lb]))
(def ^{:const true}
default-exchange-name "")
(defn message-handler
[ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
(println (format "[consumer] Received a message: %s, delivery tag: %d, content type: %s, type: %s"
(String. payload "UTF-8") delivery-tag content-type type)))
(defn -main
[& args]
(let [conn (rmq/connect)
ch (lch/open conn)
qname "langohr.examples.hello-world"]
(println (format "[main] Connected. Channel id: %d" (.getChannelNumber ch)))
(lq/declare ch qname {:exclusive false :auto-delete true})
(lc/subscribe ch qname message-handler {:auto-ack true})
(lb/publish ch default-exchange-name qname "Hello!" {:content-type "text/plain" :type "greetings.hi"})
(Thread/sleep 2000)
(println "[main] Disconnecting...")
(rmq/close ch)
(rmq/close conn)))
This example demonstrates a very common communication scenario: application A wants to publish a message that will end up in a queue that application B listens on. In this case, the queue name is "langohr.examples.hello-world". Let us go through the code step by step:
(ns clojurewerkz.langohr.examples.hello-world
(:gen-class)
(:require [langohr.core :as rmq]
[langohr.channel :as lch]
[langohr.queue :as lq]
[langohr.exchange :as le]
[langohr.consumers :as lc]
[langohr.basic :as lb]))
defines our example app namespace that requires (loads) several Langohr namespaces:
langohr.core
langohr.channel
langohr.queue
langohr.basic
langohr.consumers
and will be compiled ahead-of-time (so we can run it).
Clojure applications are compiled to JVM bytecode. The -main
function is the entry point.
A few things is going on here:
langohr.core/connect
. We pass no arguments so connection parameters are all defaults."langohr.examples.hello-world"
(let [conn (rmq/connect)]
(comment ...))
connects to RabbitMQ at 127.0.0.1:5672
with vhost of "/"
, username
of "guest"
and password of "guest"
and returns the connection.
rmq
is an alias for langohr.core
(see the ns snippet above).
(let [conn (rmq/connect)
ch (lch/open conn)]
(comment ...))
opens a new channel. AMQP is a multi-channeled protocol (connections have one or more channels). This makes it possible for highly concurrent clients to reuse a single TCP connection for multiple "virtual connections".
lch
is an alias for langohr.channel
.
The code below
(lq/declare ch qname {:exclusive false :auto-delete true})
declares a queue on the channel that we have just opened. Consumer applications get messages from queues. We declared this queue with the "auto-delete" and "non-exclusive" parameters. Basically, this means that the queue will be deleted when our little app exits.
Now that we have a queue, we can start consuming messages from it:
(lc/subscribe ch queue-name message-handler {:auto-ack true})
We use langohr.queue/subscribe
to start a consumer. This function
returns immediately and the handler provided (message-handler
) will
be invoked for every inbound delivery.
Finally, here's the handling function:
(defn message-handler
[ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
(println (format "[consumer] Received a message: %s, delivery tag: %d, content type: %s, type: %s"
(String. payload "UTF-8") delivery-tag content-type type)))
It takes a channel the consumer uses, a Clojure map of message metadata and message payload as array of bytes. We turn it into a string and print it, as well as a few message properties.
Next we publish a message to an exchange. Exchanges receive messages that are sent by producers. Exchanges route messages to queues according to rules called bindings. In this particular example, there are no explicitly defined bindings. The exchange that we defined is known as the default exchange and it has implicit bindings to all queues.
Routing key is one of the message properties (also called
attributes). The default exchange will route the message to a queue
that has the same name as the message's routing key. This is how our
message ends up in the "langohr.examples.hello-world"
queue.
(lb/publish ch default-exchange-name qname "Hello!" {:content-type "text/plain" :type "greetings.hi"})
Then we use langohr.core/close
, a polymorphic function, to close both the channel and connection.
(rmq/close ch)
(rmq/close conn)
This diagram demonstrates the "Hello, world" example data flow:
For the sake of simplicity, both the message producer (App I) and the consumer (App II) are running in the same JVM process. Now let us move on to a little bit more sophisticated example.
Blabbr is a One-to-Many Publish/Subscribe (pubsub) Routing Example.
The previous example demonstrated how a connection to a broker is made and how to do 1:1 communication using the default exchange. Now let us take a look at another common scenario: broadcast, or multiple consumers and one producer.
A very well-known broadcast example is Twitter: every time a person tweets, followers receive a notification. Blabbr, our imaginary information network, models this scenario: every network member has a separate queue and publishes blabs to a separate exchange. Three Blabbr members, Joe, Aaron and Bob, follow the official NBA account on Blabbr to get updates about what is happening in the world of basketball. Here is the code:
(ns clojurewerkz.langohr.examples.blabbr
(:gen-class)
(:require [langohr.core :as rmq]
[langohr.channel :as lch]
[langohr.queue :as lq]
[langohr.exchange :as le]
[langohr.consumers :as lc]
[langohr.basic :as lb]))
(defn start-consumer
"Starts a consumer bound to the given topic exchange in a separate thread"
[ch topic-name username]
(let [queue-name (format "nba.newsfeeds.%s" username)
handler (fn [ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
(println (format "[consumer] %s received %s" username (String. payload "UTF-8"))))]
(lq/declare ch queue-name {:exclusive false :auto-delete true})
(lq/bind ch queue-name topic-name)
(lc/subscribe ch queue-name handler {:auto-ack true})))
(defn -main
[& args]
(let [conn (rmq/connect)
ch (lch/open conn)
ex "nba.scores"
users ["joe" "aaron" "bob"]]
(le/declare ch ex "fanout" {:durable false :auto-delete true})
(doseq [u users]
(start-consumer ch ex u))
(lb/publish ch ex "" "BOS 101, NYK 89" {:content-type "text/plain" :type "scores.update"})
(lb/publish ch ex "" "ORL 85, ATL 88" {:content-type "text/plain" :type "scores.update"})
(Thread/sleep 2000)
(rmq/close ch)
(rmq/close conn)))
In this example, opening a channel is no different to opening a channel in the previous example, however, we do one extra thing: declare an exchange:
(let [conn (rmq/connect)
ch (lch/open conn)
ex "nba.scores"]
(le/declare ch ex "fanout" {:durable false :auto-delete true}))
The exchange that we declare above is a fanout exchange. A fanout exchange delivers messages to all of the queues that are bound to it: exactly what we want in the case of Blabbr.
This piece of code
(let [queue-name (format "nba.newsfeeds.%s" username)
handler (fn [ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
(println (format "[consumer] %s received %s" username (String. payload "UTF-8"))))]
(lq/declare ch queue-name :exclusive false :auto-delete true)
(lq/bind ch queue-name topic-name)
(lc/subscribe ch queue-name handler :auto-ack true))
is similar to the subscription code that we used for message delivery previously, but also does a bit more: it sets up a binding between the queue and the exchange that was declared earlier. We need to do this to make sure that our fanout exchange routes messages to the queues of all subscribed followers.
Another difference in this example is the routing key we use for publishing: it is an empty string.
(lb/publish ch ex "" "BOS 101, NYK 89" {:content-type "text/plain" :type "scores.update"})
(lb/publish ch ex "" "ORL 85, ATL 88" {:content-type "text/plain" :type "scores.update"})
Fanout exchanges simply put a copy of the message in each queue bound to them and ignore the routing key.
A diagram for Blabbr looks like this:
Weathr is a Many-to-Many Topic Routing Example.
So far, we have seen point-to-point communication and broadcasting. Those two communication styles are possible with many protocols, for instance, HTTP handles these scenarios just fine. You may ask "what differentiates AMQP 0-9-1?" Well, next we are going to introduce you to topic exchanges and routing with patterns, one of the features that makes AMQP 0-9-1 very powerful.
Our third example involves weather condition updates. What makes it different from the previous two examples is that not all of the consumers are interested in all of the messages. People who live in Portland usually do not care about the weather in Hong Kong (unless they are visiting soon). They are much more interested in weather conditions around Portland, possibly all of Oregon and sometimes a few neighbouring states.
Our example features multiple consumer applications monitoring updates for different regions. Some are interested in updates for a specific city, others for a specific state and so on, all the way up to continents. Updates may overlap so that an update for San Diego, CA appears as an update for California, but also should show up on the North America updates list.
Here is the code:
(ns clojurewerkz.langohr.examples.weathr
(:gen-class)
(:require [langohr.core :as rmq]
[langohr.channel :as lch]
[langohr.queue :as lq]
[langohr.exchange :as le]
[langohr.consumers :as lc]
[langohr.basic :as lb]))
(def ^{:const true}
weather-exchange "weathr")
(defn start-consumer
"Starts a consumer bound to the given topic exchange in a separate thread"
[ch topic-name queue-name]
(let [queue-name' (:queue (lq/declare ch queue-name {:exclusive false :auto-delete true}))
handler (fn [ch {:keys [routing-key] :as meta} ^bytes payload]
(println (format "[consumer] Consumed '%s' from %s, routing key: %s" (String. payload "UTF-8") queue-name' routing-key)))]
(lq/bind ch queue-name' weather-exchange {:routing-key topic-name})
(lc/subscribe ch queue-name' handler {:auto-ack true})))
(defn publish-update
"Publishes a weather update"
[ch payload routing-key]
(lb/publish ch weather-exchange routing-key payload {:content-type "text/plain" :type "weather.update"}))
(defn -main
[& args]
(let [conn (rmq/connect)
ch (lch/open conn)
locations {"" "americas.north.#"
"americas.south" "americas.south.#"
"us.california" "americas.north.us.ca.*"
"us.tx.austin" "#.tx.austin"
"it.rome" "europe.italy.rome"
"asia.hk" "asia.southeast.hk.#"}]
(le/declare ch weather-exchange "topic" {:durable false :auto-delete true})
(doseq [[k v] locations]
(start-consumer ch v k))
(publish-update ch "San Diego update" "americas.north.us.ca.sandiego")
(publish-update ch "Berkeley update" "americas.north.us.ca.berkeley")
(publish-update ch "SF update" "americas.north.us.ca.sanfrancisco")
(publish-update ch "NYC update" "americas.north.us.ny.newyork")
(publish-update ch "São Paolo update" "americas.south.brazil.saopaolo")
(publish-update ch "Hong Kong update" "asia.southeast.hk.hongkong")
(publish-update ch "Kyoto update" "asia.southeast.japan.kyoto")
(publish-update ch "Shanghai update" "asia.southeast.prc.shanghai")
(publish-update ch "Rome update" "europe.italy.roma")
(publish-update ch "Paris update" "europe.france.paris")
(Thread/sleep 2000)
(rmq/close ch)
(rmq/close conn)))
The first line that is different from the Blabbr example is
(le/declare ch weather-exchange "topic" {:durable false :auto-delete true})
We use a topic exchange here. Topic exchanges are used for multicast messaging where consumers indicate which topics they are interested in (think of it as subscribing to a feed for an individual tag in your favourite blog as opposed to the full feed). Routing with a topic exchange is done by specifying a routing pattern on binding, for example:
(lq/bind ch queue-name' weather-exchange {:routing-key topic-name})
Here we bind a queue with the name of "americas.south" to the topic
exchange declared earlier using the langohr.queue/bind
function.
This means that only messages with a routing key matching
americas.south.#
will be routed to that queue. A routing pattern
consists of several words separated by dots, in a similar way to URI
path segments joined by slashes. Here are a few examples:
Now let us take a look at a few routing keys that match the "americas.south.#" pattern:
In other words, the #
part of the pattern matches 0 or more words.
For a pattern like americas.south.*
, some matching routing keys would be:
but not
so *
only matches a single word. The AMQP 0.9.1 specification says
that topic segments (words) may contain the letters A-Z and a-z and
digits 0-9.
Another novel piece in this example is the use of a server-named queue:
(let [queue-name' (.getQueue (lq/declare ch queue-name :exclusive false :auto-delete true))]
(comment ...))
To declare a server-named queue you pass queue name as an empty string
and RabbitMQ will generate a cluster-unique name for you. The name
will be returned to the client and you can access it using Java
interop on the value langohr.queue/declare
returns to you. We do
this to output queue name.
Server-named queues are commonly used for collecting replies to other messages (the "request-reply" pattern, often referred to as "RPC") or when no application needs to know the exact queue name, it just has to be unique.
When you run this example, the output will look a bit like this:
[consumer] Consumed 'San Diego update' from us.california, routing key: americas.north.us.ca.sandiego
[consumer] Consumed 'San Diego update' from amq.gen-AbT8568AYqRY1sC8uB1myf, routing key: americas.north.us.ca.sandiego
[consumer] Consumed 'Berkeley update' from us.california, routing key: americas.north.us.ca.berkeley
[consumer] Consumed 'Berkeley update' from amq.gen-AbT8568AYqRY1sC8uB1myf, routing key: americas.north.us.ca.berkeley
[consumer] Consumed 'SF update' from us.california, routing key: americas.north.us.ca.sanfrancisco
[consumer] Consumed 'SF update' from amq.gen-AbT8568AYqRY1sC8uB1myf, routing key: americas.north.us.ca.sanfrancisco
[consumer] Consumed 'NYC update' from amq.gen-AbT8568AYqRY1sC8uB1myf, routing key: americas.north.us.ny.newyork
[consumer] Consumed 'São Paolo update' from americas.south, routing key: americas.south.brazil.saopaolo
[consumer] Consumed 'Hong Kong update' from asia.hk, routing key: asia.southeast.hk.hongkong
As you can see, some messages were routed to multiple queues and some were not routed to any queues ("dead lettered"). Some queues have names we picked and some have names generated by RabbitMQ.
A (very simplistic) diagram to demonstrate topic exchange in action:
The rest of this guide will cover Langohr API design principles. if you are completely new to AMQP and RabbitMQ, feel free to skip them for now and go straight to the Wrapping Up section.
AMQP 0.9.1 operations are grouped into classes (no relation to classes in OO languages such as Java or Objective-C):
connection.*
channel.*
queue.*
exchange.*
basic.*
tx.*
With a couple of exceptions, the Langohr API follows this structure:
queue.declare
is accessible via the langohr.queue/declare
functionexchange.declare
is accessible via langohr.exchange/declare
basic.publish
is accessible via langohr.basic/publish
basic.cancel
is accessible via langohr.basic/cancel
and so on. This makes it easy to predict function names, navigate API reference and source code and communicate with developers who use other AMQP 0.9.1 clients.
The exceptions to this are:
connection.open
is exposed as langohr.core/connect
connection.close
is accessible via langohr.core/close
channel.close
is accessible via langohr.core/close
(the function is polymorphic and works on both connections and channels)AMQP 0.9.1 and messaging in general are inherently asynchronous:
applications publish messages as events happen and react to events
elsewhere by consuming messages and processing them. Per AMQP 0.9.1
specification, some AMQP methods (protocol
operations, similar to GET
and POST
in HTTP) are asynchronous (do
not block) and some are synchronous (block until a response is
received).
There are good reasons for this. For some methods, there may be no
response (e.g. publishing messages) or it may arrive at an unknown
moment in the future, possibly hours and days later. Some operations
(e.g. declaring a queue) typically finish in a few milliseconds and
applications have to wait for them to finish before they can do
anything else. In the case of the queue.declare
method, it is not
possible to start a consumer on a queue that wasn't declared.
RabbitMQ Java client and Langohr take a pragmatic approach: some API functions block, others do not. A few examples of blocking operations:
langohr.queue/declare
langohr.exchange/declare
langohr.queue/bind
langohr.queue/purge
langohr.queue/delete
langohr.basic/get
("pull API", synchronous by design)The following functions do not block the calling thread:
langohr.basic/publish
langohr.basic/ack
langohr.basic/reject
langohr.basic/nack
these lists are not complete but should give you an idea about Langohr's philosophy: operations that are performance-sensitive or inherintly asynchronous won't block, operations that are synchronous by nature or required to finish for other operations to proceed as blocking.
This offers developers a good balance of convenience of blocking function calls and good throughput for publishing.
This is the end of the tutorial. Congratulations! You have learned quite a bit about both RabbitMQ and Langohr. This is only the tip of the iceberg. RabbitMQ has many more features built into the protocol:
and so on. Other guides explain these features in depth, as well as use cases for them. To stay up to date with Langohr development, follow @clojurewerkz on Twitter and join our mailing list.
The documentation is organized as a number of guides, covering various topics.
We recommend that you read the following guides first, if possible, in this order:
Please take a moment to tell us what you think about this guide on Twitter or the Clojure RabbitMQ mailing list
Let us know what was unclear or what has not been covered. Maybe you do not like the guide style or grammar or discover spelling mistakes. Reader feedback is key to making the documentation better.