RabbitMQ python example with expire and reject

Based on: http://www.rabbitmq.com/tutorials/tutorial-two-python.html

This example demonstrates how to reject jobs that can not be done at moment on current worker, handling retries count and job expiration

send.py

#!/usr/bin/env python
import pika
import time
import datetime
import json
import sys

count = int(sys.argv[1]) # read from command line arguments count of jobs to create
queue = 'retries' # queue name

''' example of more robust pika.ConnectionParameters
host='localhost',
port=5672,
virtual_host='/',
credentials=pika.credentials.PlainCredentials(
	username='guest',
	password='guest'
)
'''
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True) # durable=True - makes queue persistent

for i in range(1, count + 1):
	message = "item %d" % i
	timestamp = time.time()
	now = datetime.datetime.now()
	expire = 1000 * int((now.replace(hour=23, minute=59, second=59, microsecond=999999) - now).total_seconds())
	headers = { # example how headers can be used
		'hello': 'world',
		'created': int(timestamp)
	}
	data = { # example hot to transfer objects rather than string using json.dumps and json.loads
		'keyword': message,
		'domain': message,
		'created': int(timestamp),
		'expire': expire
	}
	channel.basic_publish(
		exchange='',
		routing_key=queue,
		body=json.dumps(data), # must be string
		properties=pika.BasicProperties(
			delivery_mode=2, # makes persistent job
			priority=0, # default priority
			timestamp=timestamp, # timestamp of job creation
			expiration=str(expire), # job expiration (milliseconds from now), must be string, handled by rabbitmq
			headers=headers
		))
	print "[>] Sent %r" % message

connection.close()

worker.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pika
import time
import datetime
import json
import sys

stop_word = sys.argv[1]
max_retries = 3
queue = 'retries'

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)

print '[*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
	#print properties.headers.get('hello')
	data = json.loads(body)
	print "[>] Received '%s' (try: %d)" % (data.get('keyword'), 1 + int(properties.priority))

	if properties.priority >= max_retries - 1: # example handling retries
		ch.basic_ack(delivery_tag=method.delivery_tag)
		print "[!] '%s' rejected after %d retries" % (data.get('keyword'), 1 + int(properties.priority))
	else:
		try:
			if data.get('keyword') == stop_word: # example - rejeceting job
				raise Exception('Stop word detected')

			time.sleep(len(data.get('keyword')))
			ch.basic_ack(delivery_tag=method.delivery_tag)
			print "[+] Done"

		except:
			timestamp = time.time()
			now = datetime.datetime.now()
			expire = 1000 * int((now.replace(hour=23, minute=59, second=59, microsecond=999999) - now).total_seconds())

			# to reject job we create new one with other priority and expiration
			channel.basic_publish(exchange='', routing_key=queue, body=json.dumps(data),
				properties=pika.BasicProperties(delivery_mode=2, priority=int(properties.priority) + 1, timestamp=timestamp, expiration=str(expire), headers=properties.headers))
			# also do not forget to send back acknowledge about job
			ch.basic_ack(delivery_tag=method.delivery_tag)
			print "[!] Rejected, going to sleep for a while"
			time.sleep(10)

	print

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue)

try:
	channel.start_consuming()
except KeyboardInterrupt:
	channel.stop_consuming();

connection.close()

How to run

python send.py 6
[>] Sent 'item 1'
[>] Sent 'item 2'
[>] Sent 'item 3'
[>] Sent 'item 4'
[>] Sent 'item 5'
[>] Sent 'item 6'

python worker.py "item 3"
[*] Waiting for messages. To exit press CTRL+C
[>] Received 'item 1' (try: 1)
[+] Done

[>] Received 'item 3' (try: 1)
[!] Rejected, going to sleep for a while

[>] Received 'item 5' (try: 1)
[+] Done

python worker.py "item 5"
[*] Waiting for messages. To exit press CTRL+C
[>] Received 'item 2' (try: 1)
[+] Done

[>] Received 'item 4' (try: 1)
[+] Done

[>] Received 'item 6' (try: 1)
[+] Done

[>] Received 'item 3' (try: 2)
[+] Done