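Developing a robust application, be it a message publisher or a message consumer, involves dealing with multiple kinds of failures: protocol exceptions, network failures, broker failures and so on. Correct error handling and recovery is not easy. This guide explains how the library helps you deal with issues like initial connection failures, authentication failures and channel-level exceptions, as well as how to use automatic connection recovery, automatic topology recovery and recovery hooks.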
This work is licensed under a Creative Commons Attribution 3.0 Unported License (including images and stylesheets). The source is available on GitHub.
This guide covers Langohr 3.6.x.
When applications connect to the broker, they need to handle connection failures. Networks are not 100% reliable. Even with modern system configuration tools like Chef or Puppet, misconfigurations happen, and the broker might also be down. Error detection should happen as early as possible.
langohr.core/connect will raise java.net.ConnectException or java.net.UnknownHostException if a connection fails. Code that catches the exception can log the issue or retry the connection attempt. Because initial connection failures are often due to misconfiguration or a network outage, reconnecting to the same endpoint (hostname, port, vhost combination) may result in the same issue over and over.
(require '[langohr.core :as rmq])
(rmq/connect {:host "127.0.0.1" :port 2887})
;; throws java.net.ConnectException due to incorrect port
(rmq/connect {:host "asd88asd.megacorp.com"})
;; throws java.net.UnknownHostException due to incorrect host
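The catch-and-retry approach described above can be sketched as a small helper. This is a minimal illustration, not part of Langohr: connect-with-retry, its option names and its defaults are hypothetical. It takes the connection attempt as a zero-argument function and gives up after a fixed number of attempts, since retrying a misconfigured endpoint forever rarely helps.

```clojure
(import '[java.net ConnectException UnknownHostException])

(defn connect-with-retry
  "Hypothetical helper (not a Langohr function). Calls connect-fn, a
  zero-argument function that opens a connection, up to max-attempts times,
  sleeping delay-ms between attempts. Returns the connection, or rethrows
  the last exception once all attempts have failed."
  [connect-fn {:keys [max-attempts delay-ms]
               :or   {max-attempts 3 delay-ms 1000}}]
  (loop [attempt 1]
    ;; recur is not allowed inside a catch clause, so the outcome of the
    ;; attempt is captured as a map first and inspected afterwards.
    (let [outcome (try
                    {:conn (connect-fn)}
                    (catch ConnectException e {:error e})
                    (catch UnknownHostException e {:error e}))]
      (cond
        (contains? outcome :conn)
        (:conn outcome)

        (< attempt max-attempts)
        (do
          (println (format "Connection attempt %d failed: %s"
                           attempt
                           (.getMessage ^Exception (:error outcome))))
          (Thread/sleep (long delay-ms))
          (recur (inc attempt)))

        :else
        (throw (:error outcome))))))

;; Usage with Langohr (requires a reachable broker to succeed):
(comment
  (require '[langohr.core :as rmq])
  (connect-with-retry #(rmq/connect {:host "127.0.0.1"})
                      {:max-attempts 5 :delay-ms 2000}))
```

Passing the connection attempt as a function keeps the helper independent of the connection settings and makes it easy to exercise without a broker.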
Another reason why a connection may fail is authentication failure. Handling authentication failure is very similar to handling initial TCP connection failure:
(rmq/connect {:uri "amqp://sdfoiu:sd899937@hub.megacorp.local:5672/%2F"})
;; throws com.rabbitmq.client.PossibleAuthenticationFailureException
;; due to invalid credentials
In case you are wondering why the exception name has "possible" in it: the AMQP 0.9.1 spec requires broker implementations to simply close the TCP connection without sending any more data when an exception (such as authentication failure) occurs before the AMQP connection is open. In practice, however, when the broker closes the TCP connection after it has been established but before the AMQP connection is open, it means that authentication has failed.
RabbitMQ 3.2 introduces authentication failure notifications, which Langohr supports. When connecting to RabbitMQ 3.2 or later, Langohr will raise com.rabbitmq.client.AuthenticationFailureException when it receives a proper authentication failure notification.
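Both exceptions can be handled in one place. The sketch below assumes that AuthenticationFailureException is a subclass of PossibleAuthenticationFailureException in the RabbitMQ Java client, which is why the more specific class is caught first. The function name connect-or-explain and the shape of the returned map are hypothetical and only serve the illustration.

```clojure
(require '[langohr.core :as rmq])
(import '[com.rabbitmq.client
          AuthenticationFailureException
          PossibleAuthenticationFailureException])

(defn connect-or-explain
  "Hypothetical helper (not a Langohr function). Tries to connect with the
  given settings and returns a map describing the outcome instead of
  letting authentication exceptions propagate."
  [settings]
  (try
    {:status :connected :connection (rmq/connect settings)}
    ;; Explicit notification (RabbitMQ 3.2 or later). This is the more
    ;; specific class, so it has to be caught first.
    (catch AuthenticationFailureException e
      {:status :authentication-failed :message (.getMessage e)})
    ;; The broker closed the TCP connection before the AMQP connection
    ;; was open, which in practice means invalid credentials.
    (catch PossibleAuthenticationFailureException e
      {:status :possible-authentication-failure :message (.getMessage e)})))

;; Usage (requires a reachable broker):
(comment
  (connect-or-explain {:uri "amqp://sdfoiu:sd899937@hub.megacorp.local:5672/%2F"}))
```

Unlike TCP-level failures, retrying with the same credentials is unlikely to help here, so reporting the failure is usually the better reaction.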
Detecting network connection failures is nearly useless if an application cannot recover from them. Recovery is the hard part in "error handling and recovery". Fortunately, the recovery process for many applications follows one simple scheme that Langohr can perform automatically for you.
When automatic recovery is enabled and Langohr detects a TCP connection failure, it will try to reconnect every 5 seconds. Currently there is no limit on the number of reconnection attempts.
To completely disable automatic connection recovery, pass :automatically-recover as false to langohr.core/connect.
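As a brief illustration, the option goes into the same settings map as the host or port. The wrapper function connect-without-recovery is a hypothetical name used only for this sketch.

```clojure
(require '[langohr.core :as rmq])

(defn connect-without-recovery
  "Hypothetical helper (not a Langohr function). Connects using the given
  settings with automatic connection recovery turned off, so the
  application has to handle connection loss itself."
  [settings]
  (rmq/connect (assoc settings :automatically-recover false)))

;; Usage (requires a reachable broker):
(comment
  (connect-without-recovery {:host "127.0.0.1"}))
```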
To determine whether a connection uses automatic recovery, use langohr.core/automatic-recovery-enabled?:
(require '[langohr.core :as rmq])
(let [c (rmq/connect)]
  (rmq/automatic-recovery-enabled? c))
;= true
Many applications use the same recovery strategy: re-open channels and then, for every channel, re-declare exchanges and queues and restore bindings and consumers.
Langohr provides a feature known as "automatic topology recovery" that performs these steps after connection recovery, while taking care of some of the more tricky details such as recovery of server-named queues with consumers.
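To recover your topology, for every channel Langohr will re-declare exchanges (except for predefined ones), re-declare queues, and recover all bindings and consumers.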
Server-named queues will be declared with new names and their bindings and consumers will be updated accordingly.
Langohr will not track inter-channel dependencies, e.g. when a server-named queue was declared on channel 10 but is consumed from on channel 20. This means that for automatic topology recovery to work, all operations on a queue (declaration, binding, consuming messages, etc.) must happen on the same channel. Otherwise the queue may not have been declared yet by the time another channel recovers and tries to use it.
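The single-channel rule can be illustrated with a short sketch that declares a server-named queue, binds it and consumes from it on one channel. It assumes that langohr.queue/declare-server-named returns the generated queue name as a string, as it does in Langohr 3.x. The function names handler and start-fanout-consumer are hypothetical.

```clojure
(require '[langohr.channel   :as lch]
         '[langohr.queue     :as lq]
         '[langohr.consumers :as lc])

(defn handler
  "Prints every delivered message body."
  [ch metadata ^bytes payload]
  (println "Received:" (String. payload "UTF-8")))

(defn start-fanout-consumer
  "Hypothetical helper (not a Langohr function). Declares a server-named
  queue, binds it to amq.fanout and starts a consumer, all on the same
  channel, so that topology recovery can restore the queue, the binding
  and the consumer together. Returns the channel."
  [conn]
  (let [ch (lch/open conn)
        ;; assumption: returns the broker-generated queue name
        q  (lq/declare-server-named ch {:exclusive true})]
    (lq/bind ch q "amq.fanout")
    (lc/subscribe ch q handler {:auto-ack true})
    ch))

;; Usage (requires a reachable broker):
(comment
  (require '[langohr.core :as rmq])
  (start-fanout-consumer (rmq/connect)))
```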
To disable topology recovery, pass :automatically-recover-topology as false to langohr.core/connect. Langohr will then only recover connections and channels (given that automatic recovery in general is not disabled).
It is possible to use recovery hooks (callbacks) to recover a topology manually.
Callbacks are registered on connections and channels using langohr.core/on-recovery:
(rmq/on-recovery ch (fn [ch]
                      (start-consumer ch q)))
During recovery, the callback is invoked after the connection and its channels have been recovered, and is passed the connection or channel it was registered with.
The following example demonstrates how a queue with a consumer is recovered using a recovery hook:
(ns clojurewerkz.langohr.examples.recovery.example1
  (:gen-class)
  (:require [langohr.core      :as rmq]
            [langohr.channel   :as lch]
            [langohr.queue     :as lq]
            [langohr.exchange  :as lx]
            [langohr.consumers :as lc]
            [langohr.basic     :as lb])
  (:import java.io.IOException
           com.rabbitmq.client.AlreadyClosedException))

(defn message-handler
  [ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
  (println (format "[consumer] Received a message: %s"
                   (String. payload "UTF-8"))))

(defn start-consumer
  [ch ^String q]
  (lq/declare ch q {:exclusive false :auto-delete false})
  (lc/subscribe ch q message-handler {:auto-ack true}))

(defn -main
  [& args]
  (let [conn (rmq/connect {:automatically-recover true :automatically-recover-topology false})
        ch   (lch/open conn)
        q    "langohr.examples.recovery.example1.q"
        x    ""]
    (start-consumer ch q)
    (rmq/on-recovery ch (fn [ch]
                          (start-consumer ch q)))
    ;; publish messages that are routed to langohr.examples.recovery.example1.q
    ;; and demonstrate consumer recovery
    (while true
      (Thread/sleep 1000)
      (try
        (lb/publish ch x q "hello")
        (catch AlreadyClosedException ace
          (comment "Happens when you publish while the connection is down"))
        (catch IOException ioe
          (comment "ditto"))))))
Channel-level exceptions are more common than connection-level ones and often indicate issues applications can recover from (such as consuming from or trying to delete a queue that does not exist).
With Langohr, channel-level exceptions are raised as Java exceptions (IOException or ShutdownSignalException) that provide access to the underlying channel.close method information.
Shutdown exceptions can be inspected using functions in the langohr.shutdown namespace:
(require '[langohr.queue :as lhq])
(require '[langohr.shutdown :as lh])
(try
  ;; bind a non-existent queue
  (lhq/bind ch "ugggggh" "amq.fanout")
  ;; IOException lives in java.io, not java.net
  (catch java.io.IOException ioe
    (lh/soft-error? ioe)
    ;= true, it's possible to recover from this exception
    (lh/initiated-by-broker? ioe)
    ;= true, RabbitMQ closed the channel
    (lh/initiated-by-application? ioe)
    ;= false
    (println (lh/reason-of ioe))
    (println (lh/channel-of ioe))
    (println (lh/connection-of ioe))))
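In AMQP 0.9.1 a channel-level exception closes the channel it occurred on, so recovering from one usually means opening a new channel. A common way to keep such an error away from the channel the application works on is to run the risky operation on a short-lived channel. The following is a minimal sketch of that idea: queue-exists? is a hypothetical helper, and it assumes that a passive declaration of a missing queue surfaces as an IOException.

```clojure
(require '[langohr.channel :as lch]
         '[langohr.queue   :as lq])
(import 'java.io.IOException)

(defn queue-exists?
  "Hypothetical helper (not a Langohr function). Checks for a queue with a
  passive declaration on a temporary channel, so that a channel-level
  exception does not close a channel the application still needs.
  Note that any other IOException (e.g. a lost connection) also
  yields false in this simplified version."
  [conn ^String q]
  (let [ch     (lch/open conn)
        found? (try
                 (lq/declare-passive ch q)
                 true
                 (catch IOException _
                   ;; the broker has already closed the temporary channel
                   false))]
    ;; only close the channel if the broker has not done so already
    (when found?
      (lch/close ch))
    found?))

;; Usage (requires a reachable broker):
(comment
  (require '[langohr.core :as rmq])
  (queue-exists? (rmq/connect) "ugggggh"))
```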
A few channel-level exceptions are common and deserve more attention.