amqpclt 0.5 - versatile AMQP client
amqpclt [OPTIONS]
amqpclt is a versatile tool to interact with messaging brokers speaking AMQP and/or message queues (see messaging.queue) on disk.
It receives messages (see messaging.message) from an incoming module, optionally massaging them (i.e. filtering and/or modifying), and sends them to an outgoing module. Depending on which modules are used, the tool can perform different operations.
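The receive, massage, send flow described above can be pictured with a small sketch. This is only an illustration and not amqpclt's implementation: `run_pipeline`, `drop_empty`, the dict-based message and the "return None to drop" convention are all assumptions made for the example.

```python
from typing import Callable, Iterable, Optional

# Hypothetical stand-in for a message: a dict with a header and a body.
Message = dict


def run_pipeline(incoming: Iterable[Message],
                 check: Callable[[Message], Optional[Message]],
                 outgoing: Callable[[Message], None]) -> int:
    """Pull messages from `incoming`, massage them, push them to `outgoing`.

    Returns the number of messages that were forwarded.
    """
    forwarded = 0
    for msg in incoming:
        result = check(msg)   # may modify the message or reject it
        if result is None:    # assumption of this sketch: None means "drop"
            continue
        outgoing(result)
        forwarded += 1
    return forwarded


def drop_empty(msg: Message) -> Optional[Message]:
    # Filtering: discard messages with an empty body.
    if not msg["body"]:
        return None
    # Modifying: tag the messages that are kept.
    msg["header"]["checked"] = "true"
    return msg


source = [{"header": {}, "body": "hello"}, {"header": {}, "body": ""}]
sink = []
count = run_pipeline(source, drop_empty, sink.append)
```

Swapping the `incoming` and `outgoing` arguments for other sources and sinks is what lets one loop cover the different operations.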
Here are the supported incoming modules:
Here are the supported outgoing modules:
Here are some frequently used combinations:
See the “EXAMPLES” sections for concrete examples.
amqpclt can read its options from a configuration file. For this, the Perl Config::General module is used and the option names are the same as on the command line. For instance:
daemon = true
pidfile = /var/run/amqpclt.pid
incoming-queue = path=/var/spool/amqpclt
outgoing-broker-uri = amqp://broker.acme.com:5672/virtual_host
outgoing-broker-auth = "plain name=guest pass=guest"
Alternatively, options can be nested:
<outgoing-broker>
uri = amqp://broker.acme.com:5672/virtual_host
auth = "plain name=guest pass=guest"
</outgoing-broker>
Or even:
<outgoing>
<broker>
uri = amqp://broker.acme.com:5672/virtual_host
<auth>
scheme = plain
name = guest
pass = guest
</auth>
</broker>
</outgoing>
The options specified on the command line have precedence over the ones found in the configuration file.
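As a rough illustration of how nested blocks relate to the flat option names, and of how command-line values override file values, here is a minimal sketch. The helpers `flatten` and `effective_options` are hypothetical and do not come from amqpclt; the sketch also leaves out how an auth block maps onto the single-string form shown earlier.

```python
def flatten(options, prefix=""):
    """Turn nested option blocks into flat, dash-joined option names."""
    flat = {}
    for name, value in options.items():
        key = prefix + "-" + name if prefix else name
        if isinstance(value, dict):
            # A nested block contributes its name as a prefix.
            flat.update(flatten(value, key))
        else:
            flat[key] = value
    return flat


def effective_options(file_options, cli_options):
    """Merge both sources; command-line values take precedence."""
    merged = flatten(file_options)
    merged.update(cli_options)
    return merged


file_options = {
    "daemon": "true",
    "outgoing": {
        "broker": {"uri": "amqp://broker.acme.com:5672/virtual_host"},
    },
}
cli_options = {
    "outgoing-broker-uri": "amqp://dev-broker.acme.com:5672/virtual_host",
}
options = effective_options(file_options, cli_options)
```

With these inputs, `options["outgoing-broker-uri"]` holds the command-line value while `options["daemon"]` keeps the value from the file.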
amqpclt can be given Python code to execute on all processed messages. This can be used for different purposes:
To use callbacks, the --callback-path or --callback-code option must be used. The Python code must provide functions with the following signatures:
The code can be put in a file, on the command line or in the amqpclt configuration file, using the “here document” syntax.
Here is an example (to be put in the amqpclt configuration file) that prints on stdout a JSON array of messages:
callback-code = <<EOF
def start(self):
    self.count = 0
def check(self, msg):
    if self.count:
        sys.stdout.write(", ")
    else:
        sys.stdout.write("[")
    self.count += 1
    sys.stdout.write(msg.serialize())
    return msg
def stop(self):
    if self.count:
        sys.stdout.write("]\n")
    else:
        sys.stdout.write("[]\n")
EOF
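To try this callback logic outside amqpclt, it can be driven by a small harness. The sketch below is not part of amqpclt: `FakeMessage`, `Callback` and `run` are hypothetical, and it assumes that `serialize()` returns a JSON string.

```python
import contextlib
import io
import json
import sys


class FakeMessage:
    """Hypothetical stand-in for a messaging.message object."""

    def __init__(self, body):
        self.body = body

    def serialize(self):
        # Assumption: the real serialize() also yields a JSON string.
        return json.dumps({"body": self.body})


class Callback:
    """Same logic as the callback code above, held in a class."""

    def start(self):
        self.count = 0

    def check(self, msg):
        # Open the array before the first message, separate the others.
        sys.stdout.write(", " if self.count else "[")
        self.count += 1
        sys.stdout.write(msg.serialize())
        return msg

    def stop(self):
        # Close the array, or emit an empty one if nothing was processed.
        sys.stdout.write("]\n" if self.count else "[]\n")


def run(bodies):
    """Drive the callback over some messages and capture what it prints."""
    captured = io.StringIO()
    with contextlib.redirect_stdout(captured):
        callback = Callback()
        callback.start()
        for body in bodies:
            callback.check(FakeMessage(body))
        callback.stop()
    return captured.getvalue()


output = run(["one", "two"])
parsed = json.loads(output)
```

Parsing the captured output with `json.loads` is a quick way to confirm that the three functions together emit a well-formed array, including the empty case.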
For simple callback code that only needs the check function, it is enough to supply the “inside code”. If the function definition is missing, the supplied code will be wrapped with:
def check(self, msg):
    hdr = msg.header
    ... your code goes here ...
    return msg
This makes it possible, for instance, to remove the message-id header with something like:
$ amqpclt ... --callback-code 'del(hdr["message-id"])'
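The wrapping step can be sketched as follows. This is a guess at the mechanism and not amqpclt's actual code: `wrap_check` and `FakeMessage` are hypothetical names, and looking for the text `def check` is an assumed way of detecting that the function definition is missing.

```python
import textwrap


def wrap_check(code):
    """Wrap bare statements into a check() function unless one is defined."""
    if "def check" in code:
        # Assumed heuristic: full callback code is used as-is.
        return code
    body = textwrap.indent(code.strip("\n"), "    ")
    return ("def check(self, msg):\n"
            "    hdr = msg.header\n"
            + body + "\n"
            "    return msg\n")


class FakeMessage:
    """Hypothetical stand-in exposing only a header dictionary."""

    def __init__(self, header):
        self.header = header


# Compile the wrapped code and call the resulting function.
namespace = {}
exec(wrap_check('del(hdr["message-id"])'), namespace)
msg = FakeMessage({"message-id": "42", "destination": "/queue/app1.data"})
result = namespace["check"](None, msg)
```

Because `hdr` is bound to `msg.header` before the supplied statements run, a one-line snippet can change the header in place and the wrapper still returns the message.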
Here is an example of a configuration file for a message sender daemon (from queue to broker), forcing the persistent header to true (something which is highly recommended for reliable messaging) and setting the destination:
# define the source message queue
<incoming-queue>
path = /var/spool/sender
</incoming-queue>
# modify the message header on the fly
callback-code = <<EOF
hdr["destination"] = "/queue/app1.data"
hdr["persistent"] = "true"
EOF
# define the destination broker
<outgoing-broker>
uri = "amqp://broker.acme.com:5672/virtual_host"
</outgoing-broker>
# miscellaneous options
reliable = true
pidfile = /var/run/sender.pid
daemon = true
loop = true
remove = true
Here is an example of a configuration file for a message shoveler (from broker to broker), clearing some headers on the fly so that messages can be replayed safely:
# define the source broker
<incoming-broker>
uri = "amqp://broker.acme.com:5672/virtual_host"
</incoming-broker>
# define the subscriptions
<subscribe>
destination = /queue/app1.data
</subscribe>
<subscribe>
destination = /queue/app2.data
</subscribe>
# define the destination broker
<outgoing-broker>
uri = "amqp://dev-broker.acme.com:5672/virtual_host"
</outgoing-broker>
# modify the message destination
callback-code = <<EOF
hdr["destination"] = "/queue/dest_to_be_replayed"
EOF
Here is an example of a configuration file for a message receiver (from broker to queue):
# define the source broker
<incoming-broker>
uri = "amqp://broker.acme.com:5672/virtual_host"
<auth>
scheme = plain
name = receiver
pass = secret
</auth>
</incoming-broker>
# define the subscriptions
<subscribe>
destination = /queue/app1.data
</subscribe>
<subscribe>
destination = /queue/app2.data
</subscribe>
# define the destination message queue
<outgoing-queue>
path = /var/spool/receiver
</outgoing-queue>
# miscellaneous options
pidfile = /var/run/receiver.pid
To run it as a daemon:
$ amqpclt --conf test.conf --daemon
To drain the queues, use the configuration file above and add an inactivity timeout on the command line:
$ amqpclt --conf test.conf --timeout-inactivity 10
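The idea of draining with an inactivity timeout can be illustrated with a short sketch: keep handling messages and stop once none has arrived for the given number of seconds. This is an assumption about the behaviour, not amqpclt's code; `drain` is a hypothetical helper and the standard library `queue.Queue` merely stands in for the broker subscriptions.

```python
import queue


def drain(source, handle, inactivity):
    """Handle messages until none arrives for `inactivity` seconds."""
    handled = 0
    while True:
        try:
            # Block for at most `inactivity` seconds for the next message.
            msg = source.get(timeout=inactivity)
        except queue.Empty:
            # Nothing arrived in time: consider the source drained.
            return handled
        handle(msg)
        handled += 1


pending = queue.Queue()
for body in ("a", "b", "c"):
    pending.put(body)

seen = []
count = drain(pending, seen.append, inactivity=0.1)
```

The timer restarts after every message, so a steady stream keeps the loop alive and only a quiet period ends it.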
Callback code can also be used to tap messages, i.e. get a copy of all messages processed by amqpclt. Here is some callback code for this purpose that could for instance be merged with the shoveling code above. It also shows how to use the --callback-data option:
callback-code = <<EOF
def start(self, path, qtype="DQS"):
    self.tap_queue = queue.new({"path" : path, "type" : qtype})
def check(self, msg):
    self.tap_queue.add_message(msg)
    return msg
EOF
Callback data must be given to specify which message queue to use:
$ amqpclt --conf tap.conf --callback-data "/tmp/tap,DQS"
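The way the callback data reaches the start function can be pictured as below. This sketch assumes that the data string is split on commas and passed as positional arguments; `TapCallback` and `start_with_data` are hypothetical, and the real start function would open a message queue instead of just recording its arguments.

```python
class TapCallback:
    """Records the arguments that start() receives."""

    def start(self, path, qtype="DQS"):
        self.path = path
        self.qtype = qtype


def start_with_data(callback, data):
    """Split the comma-separated data and pass the pieces to start()."""
    args = data.split(",") if data else []
    callback.start(*args)


callback = TapCallback()
start_with_data(callback, "/tmp/tap,DQS")

# Leaving out the second value falls back to the default queue type.
default_callback = TapCallback()
start_with_data(default_callback, "/tmp/tap")
```

Under this assumption, the order of the comma-separated values must match the order of the parameters of the start function.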
Massimo Paladin <massimo.paladin@gmail.com> - Copyright (C) 2013 CERN