Examples

Hello World

import asyncio
import asynqp


@asyncio.coroutine
def hello_world():
    """
    Sends a 'hello world' message and then reads it from the queue.
    """
    # connect to the RabbitMQ broker
    connection = yield from asynqp.connect('localhost', 5672, username='guest', password='guest')

    # Open a communications channel
    channel = yield from connection.open_channel()

    # Create a queue and an exchange on the broker
    exchange = yield from channel.declare_exchange('test.exchange', 'direct')
    queue = yield from channel.declare_queue('test.queue')

    # Bind the queue to the exchange, so the queue will get messages published to the exchange
    yield from queue.bind(exchange, 'routing.key')

    # If you pass in a dict it will be automatically converted to JSON
    msg = asynqp.Message({'hello': 'world'})
    exchange.publish(msg, 'routing.key')

    # Synchronously get a message from the queue
    received_message = yield from queue.get()
    print(received_message.json())  # get JSON from incoming messages easily

    # Acknowledge a delivered message
    received_message.ack()

    yield from channel.close()
    yield from connection.close()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(hello_world())

Reconnecting

'''
Example async consumer and publisher that will reconnect
automatically when a connection to rabbitmq is broken and
restored.
Note that no attempt is made to re-send messages that are
generated while the connection is down.
'''
import asyncio
import asynqp
from asyncio.futures import InvalidStateError


# Global variables are ugly, but this is a simple example
CHANNELS = []
CONNECTION = None
CONSUMER = None
PRODUCER = None


@asyncio.coroutine
def setup_connection(loop):
    # connect to the RabbitMQ broker
    connection = yield from asynqp.connect('localhost',
                                           5672,
                                           username='guest',
                                           password='guest')
    return connection


@asyncio.coroutine
def setup_exchange_and_queue(connection):
    # Open a communications channel
    channel = yield from connection.open_channel()

    # Create a queue and an exchange on the broker
    exchange = yield from channel.declare_exchange('test.exchange', 'direct')
    queue = yield from channel.declare_queue('test.queue')

    # Save a reference to each channel so we can close it later
    CHANNELS.append(channel)

    # Bind the queue to the exchange, so the queue will get messages published to the exchange
    yield from queue.bind(exchange, 'routing.key')

    return exchange, queue


@asyncio.coroutine
def setup_consumer(connection):
    # callback will be called each time a message is received from the queue
    def callback(msg):
        print('Received: {}'.format(msg.body))
        msg.ack()

    _, queue = yield from setup_exchange_and_queue(connection)

    # connect the callback to the queue
    consumer = yield from queue.consume(callback)
    return consumer


@asyncio.coroutine
def setup_producer(connection):
    '''
    The producer will live as an asyncio.Task
    to stop it call Task.cancel()
    '''
    exchange, _ = yield from setup_exchange_and_queue(connection)

    count = 0
    while True:
        msg = asynqp.Message('Message #{}'.format(count))
        exchange.publish(msg, 'routing.key')
        yield from asyncio.sleep(1)
        count += 1


@asyncio.coroutine
def start(loop):
    '''
    Creates a connection, starts the consumer and producer.
    If it fails, it will attempt to reconnect after waiting
    1 second
    '''
    global CONNECTION
    global CONSUMER
    global PRODUCER
    try:
        CONNECTION = yield from setup_connection(loop)
        CONSUMER = yield from setup_consumer(CONNECTION)
        PRODUCER = loop.create_task(setup_producer(CONNECTION))
    # Multiple exceptions may be thrown, ConnectionError, OsError
    except Exception:
        print('failed to connect, trying again.')
        yield from asyncio.sleep(1)
        loop.create_task(start(loop))


@asyncio.coroutine
def stop():
    '''
    Cleans up connections, channels, consumers and producers
    when the connection is closed.
    '''
    global CHANNELS
    global CONNECTION
    global PRODUCER
    global CONSUMER

    yield from CONSUMER.cancel()  # this is a coroutine
    PRODUCER.cancel()  # this is not

    for channel in CHANNELS:
        yield from channel.close()
    CHANNELS = []

    if CONNECTION is not None:
        try:
            yield from CONNECTION.close()
        except InvalidStateError:
            pass  # could be automatically closed, so this is expected
        CONNECTION = None


def connection_lost_handler(loop, context):
    '''
    Here we setup a custom exception handler to listen for
    ConnectionErrors.

    The exceptions we can catch follow this inheritance scheme

        - ConnectionError - base
            |
            - asynqp.exceptions.ConnectionClosedError - connection closed properly
                |
                - asynqp.exceptions.ConnectionLostError - closed unexpectedly
    '''
    exception = context.get('exception')
    if isinstance(exception, asynqp.exceptions.ConnectionClosedError):
        print('Connection lost -- trying to reconnect')
        # close everything before recpnnecting
        close_task = loop.create_task(stop())
        asyncio.wait_for(close_task, None)
        # reconnect
        loop.create_task(start(loop))
    else:
        # default behaviour
        loop.default_exception_handler(context)


loop = asyncio.get_event_loop()
loop.set_exception_handler(connection_lost_handler)
loop.create_task(start(loop))
loop.run_forever()