Pure python AMQP 0.9.1 asynchronous client library
OTHER License
.. image:: https://coveralls.io/repos/github/mosquito/aiormq/badge.svg?branch=master :target: https://coveralls.io/github/mosquito/aiormq?branch=master :alt: Coveralls
.. image:: https://img.shields.io/pypi/status/aiormq.svg :target: https://github.com/mosquito/aiormq :alt: Status
.. image:: https://github.com/mosquito/aiormq/workflows/tests/badge.svg :target: https://github.com/mosquito/aiormq/actions?query=workflow%3Atests :alt: Build status
.. image:: https://img.shields.io/pypi/v/aiormq.svg :target: https://pypi.python.org/pypi/aiormq/ :alt: Latest Version
.. image:: https://img.shields.io/pypi/wheel/aiormq.svg :target: https://pypi.python.org/pypi/aiormq/
.. image:: https://img.shields.io/pypi/pyversions/aiormq.svg :target: https://pypi.python.org/pypi/aiormq/
.. image:: https://img.shields.io/pypi/l/aiormq.svg :target: https://github.com/mosquito/aiormq/blob/master/LICENSE.md
aiormq is a pure python AMQP client library.
.. contents:: Table of contents
Connecting by URL
amqp example: amqp://user:[email protected]/vhost
secure amqp example: amqps://user:[email protected]/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0
Buffered queue for received frames
Only PLAIN
_ auth mechanism support
Publisher confirms
_ support
Transactions
_ support
Channel based asynchronous locks
.. note:: AMQP 0.9.1 requires serialize sending for some frame types on the channel. e.g. Content body must be following after content header. But frames might be sent asynchronously on another channels.
Tracking unroutable messages (Use connection.channel(on_return_raises=False) for disabling)
Full SSL/TLS support, using your choice of:
amqps://
url query parameters:
cafile=
- string contains path to ca certificate filecapath=
- string contains path to ca certificatescadata=
- base64 encoded ca certificate datakeyfile=
- string contains path to key filecertfile=
- string contains path to certificate fileno_verify_ssl
- boolean disables certificates validationcontext=
SSLContext
_ keyword argument to connect()
.Python type hints
_
Uses pamqp
_ as an AMQP 0.9.1 frame encoder/decoder
.. _Publisher confirms: https://www.rabbitmq.com/confirms.html .. _Transactions: https://www.rabbitmq.com/semantics.html .. _PLAIN: https://www.rabbitmq.com/authentication.html .. _type hints: https://docs.python.org/3/library/typing.html .. _pamqp: https://pypi.org/project/pamqp/ .. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext
Simple consumer
.. code-block:: python
import asyncio
import aiormq
async def on_message(message):
"""
on_message doesn't necessarily have to be defined as async.
Here it is to show that it's possible.
"""
print(f" [x] Received message {message!r}")
print(f"Message body is: {message.body!r}")
print("Before sleep!")
await asyncio.sleep(5) # Represents async I/O operations
print("After sleep!")
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
# Declaring queue
declare_ok = await channel.queue_declare('helo')
consume_ok = await channel.basic_consume(
declare_ok.queue, on_message, no_ack=True
)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()
Simple publisher
.. code-block:: python :name: test_simple_publisher
import asyncio
from typing import Optional
import aiormq
from aiormq.abc import DeliveredMessage
MESSAGE: Optional[DeliveredMessage] = None
async def main():
global MESSAGE
body = b'Hello World!'
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost//")
# Creating a channel
channel = await connection.channel()
declare_ok = await channel.queue_declare("hello", auto_delete=True)
# Sending the message
await channel.basic_publish(body, routing_key='hello')
print(f" [x] Sent {body}")
MESSAGE = await channel.basic_get(declare_ok.queue)
print(f" [x] Received message from {declare_ok.queue!r}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
assert MESSAGE is not None
assert MESSAGE.routing_key == "hello"
assert MESSAGE.body == b'Hello World!'
Create new task
.. code-block:: python
import sys
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
body = b' '.join(sys.argv[1:]) or b"Hello World!"
# Sending the message
await channel.basic_publish(
body,
routing_key='task_queue',
properties=aiormq.spec.Basic.Properties(
delivery_mode=1,
)
)
print(f" [x] Sent {body!r}")
await connection.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Simple worker
.. code-block:: python
import asyncio
import aiormq
import aiormq.abc
async def on_message(message: aiormq.abc.DeliveredMessage):
print(f" [x] Received message {message!r}")
print(f" Message body is: {message.body!r}")
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.basic_qos(prefetch_count=1)
# Declaring queue
declare_ok = await channel.queue_declare('task_queue', durable=True)
# Start listening the queue with name 'task_queue'
await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Publisher
.. code-block:: python
import sys
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.exchange_declare(
exchange='logs', exchange_type='fanout'
)
body = b' '.join(sys.argv[1:]) or b"Hello World!"
# Sending the message
await channel.basic_publish(
body, routing_key='info', exchange='logs'
)
print(f" [x] Sent {body!r}")
await connection.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Subscriber
.. code-block:: python
import asyncio
import aiormq
import aiormq.abc
async def on_message(message: aiormq.abc.DeliveredMessage):
print(f"[x] {message.body!r}")
await message.channel.basic_ack(
message.delivery.delivery_tag
)
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.basic_qos(prefetch_count=1)
await channel.exchange_declare(
exchange='logs', exchange_type='fanout'
)
# Declaring queue
declare_ok = await channel.queue_declare(exclusive=True)
# Binding the queue to the exchange
await channel.queue_bind(declare_ok.queue, 'logs')
# Start listening the queue with name 'task_queue'
await channel.basic_consume(declare_ok.queue, on_message)
loop = asyncio.get_event_loop()
loop.create_task(main())
# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(' [*] Waiting for logs. To exit press CTRL+C')
loop.run_forever()
Direct consumer
.. code-block:: python
import sys
import asyncio
import aiormq
import aiormq.abc
async def on_message(message: aiormq.abc.DeliveredMessage):
print(f" [x] {message.delivery.routing_key!r}:{message.body!r}"
await message.channel.basic_ack(
message.delivery.delivery_tag
)
async def main():
# Perform connection
connection = aiormq.Connection("amqp://guest:guest@localhost/")
await connection.connect()
# Creating a channel
channel = await connection.channel()
await channel.basic_qos(prefetch_count=1)
severities = sys.argv[1:]
if not severities:
sys.stderr.write(f"Usage: {sys.argv[0]} [info] [warning] [error]\n")
sys.exit(1)
# Declare an exchange
await channel.exchange_declare(
exchange='logs', exchange_type='direct'
)
# Declaring random queue
declare_ok = await channel.queue_declare(durable=True, auto_delete=True)
for severity in severities:
await channel.queue_bind(
declare_ok.queue, 'logs', routing_key=severity
)
# Start listening the random queue
await channel.basic_consume(declare_ok.queue, on_message)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Emitter
.. code-block:: python
import sys
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.exchange_declare(
exchange='logs', exchange_type='direct'
)
body = (
b' '.join(arg.encode() for arg in sys.argv[2:])
or
b"Hello World!"
)
# Sending the message
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'
await channel.basic_publish(
body, exchange='logs', routing_key=routing_key,
properties=aiormq.spec.Basic.Properties(
delivery_mode=1
)
)
print(f" [x] Sent {body!r}")
await connection.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Publisher
.. code-block:: python
import sys
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.exchange_declare('topic_logs', exchange_type='topic')
routing_key = (
sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
)
body = (
b' '.join(arg.encode() for arg in sys.argv[2:])
or
b"Hello World!"
)
# Sending the message
await channel.basic_publish(
body, exchange='topic_logs', routing_key=routing_key,
properties=aiormq.spec.Basic.Properties(
delivery_mode=1
)
)
print(f" [x] Sent {body!r}")
await connection.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Consumer
.. code-block:: python
import asyncio
import sys
import aiormq
import aiormq.abc
async def on_message(message: aiormq.abc.DeliveredMessage):
print(f" [x] {message.delivery.routing_key!r}:{message.body!r}")
await message.channel.basic_ack(
message.delivery.delivery_tag
)
async def main():
# Perform connection
connection = await aiormq.connect(
"amqp://guest:guest@localhost/", loop=loop
)
# Creating a channel
channel = await connection.channel()
await channel.basic_qos(prefetch_count=1)
# Declare an exchange
await channel.exchange_declare('topic_logs', exchange_type='topic')
# Declaring queue
declare_ok = await channel.queue_declare('task_queue', durable=True)
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write(
f"Usage: {sys.argv[0]} [binding_key]...\n"
)
sys.exit(1)
for binding_key in binding_keys:
await channel.queue_bind(
declare_ok.queue, 'topic_logs', routing_key=binding_key
)
# Start listening the queue with name 'task_queue'
await channel.basic_consume(declare_ok.queue, on_message)
loop = asyncio.get_event_loop()
loop.create_task(main())
# we enter a never-ending loop that waits for
# data and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
RPC server
.. code-block:: python
import asyncio
import aiormq
import aiormq.abc
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
async def on_message(message:aiormq.abc.DeliveredMessage):
n = int(message.body.decode())
print(f" [.] fib({n})")
response = str(fib(n)).encode()
await message.channel.basic_publish(
response, routing_key=message.header.properties.reply_to,
properties=aiormq.spec.Basic.Properties(
correlation_id=message.header.properties.correlation_id
),
)
await message.channel.basic_ack(message.delivery.delivery_tag)
print('Request complete')
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
# Declaring queue
declare_ok = await channel.queue_declare('rpc_queue')
# Start listening the queue with name 'hello'
await channel.basic_consume(declare_ok.queue, on_message)
loop = asyncio.get_event_loop()
loop.create_task(main())
# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()
RPC client
.. code-block:: python
import asyncio
import uuid
import aiormq
import aiormq.abc
class FibonacciRpcClient:
def __init__(self):
self.connection = None # type: aiormq.Connection
self.channel = None # type: aiormq.Channel
self.callback_queue = ''
self.futures = {}
self.loop = loop
async def connect(self):
self.connection = await aiormq.connect("amqp://guest:guest@localhost/")
self.channel = await self.connection.channel()
declare_ok = await self.channel.queue_declare(
exclusive=True, auto_delete=True
)
await self.channel.basic_consume(declare_ok.queue, self.on_response)
self.callback_queue = declare_ok.queue
return self
async def on_response(self, message: aiormq.abc.DeliveredMessage):
future = self.futures.pop(message.header.properties.correlation_id)
future.set_result(message.body)
async def call(self, n):
correlation_id = str(uuid.uuid4())
future = loop.create_future()
self.futures[correlation_id] = future
await self.channel.basic_publish(
str(n).encode(), routing_key='rpc_queue',
properties=aiormq.spec.Basic.Properties(
content_type='text/plain',
correlation_id=correlation_id,
reply_to=self.callback_queue,
)
)
return int(await future)
async def main():
fibonacci_rpc = await FibonacciRpcClient().connect()
print(" [x] Requesting fib(30)")
response = await fibonacci_rpc.call(30)
print(r" [.] Got {response!r}")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())