
Commit 9bd367d0 authored by Fabian Schindler

Implementing async queue using aioredis

Adding tests and improving README
parent 7170d37d
Pipeline #13221 failed in 30 seconds
============================
vsq - View Server Task Queue
============================
.. image:: https://img.shields.io/pypi/v/vsq.svg
        :target: https://pypi.python.org/pypi/vsq

.. image:: https://img.shields.io/travis/constantinius/vsq.svg
        :target: https://travis-ci.com/constantinius/vsq

.. image:: https://readthedocs.org/projects/vsq/badge/?version=latest
        :target: https://vsq.readthedocs.io/en/latest/?version=latest
        :alt: Documentation Status

* Free software: MIT license
* Documentation: https://vsq.readthedocs.io.

Installation
------------
Install with pip
.. code-block:: bash
pip install vsq
Usage
-----
Producer:
.. code-block:: python
from redis import Redis
from vsq.queue import Queue
# create a queue instance
queue = Queue('queue_name', Redis(...))
# send a message, which is anything that can be encoded as JSON.
# a Task object is returned.
task = queue.put({
'type': 'myMessageType',
'payload': ['some', 'values'],
})
# some metadata of that task
print(task.id, task.status, task.created)
# wait for the task result, ideally with a timeout
result = task.get(timeout=120)
Consumer:
.. code-block:: python
from redis import Redis
from vsq.queue import Queue
# create a queue instance
queue = Queue('queue_name', Redis(...))
# get a single task (with or without timeout):
task = queue.get_task(timeout=None)
# a task offers an error capturing context manager API
with task:
...
task.result = {
'some': 'result'
}
# a queue can be iterated over to listen for an arbitrary number of messages
for task in queue:
with task:
...
task.result = {
'some': 'result'
}
Async Queue
...........
``vsq`` provides an async interface mirroring the synchronous one:
.. code-block:: python
import aioredis
from vsq.aqueue import Queue
# create a queue instance
queue = Queue('queue_name', aioredis.from_url('redis://host:6379'))
# send a message, which is anything that can be encoded as JSON.
# a Task object is returned. This is an asynchronous method call
task = await queue.put({
'type': 'myMessageType',
'payload': ['some', 'values'],
})
# some metadata of that task
print(task.id, task.status, task.created)
# wait for the task result, ideally with a timeout. Also this is
# asynchronous
result = await task.get(timeout=120)
Consumer:
.. code-block:: python
import aioredis
from vsq.aqueue import Queue
# create a queue instance
queue = Queue('queue_name', aioredis.from_url('redis://host:6379'))
# get a single task (with or without timeout):
task = await queue.get_task(timeout=None)
# a task offers an error capturing context manager API
async with task:
...
task.result = {
'some': 'result'
}
# a queue can be iterated over to listen for an arbitrary number of messages
async for task in queue:
async with task:
...
task.result = {
'some': 'result'
}
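
If the handler raises inside the ``async with task:`` block, the task is marked as
failed and the error is sent back over the response channel, so the producer's
``task.get()`` raises. A minimal sketch of handling this on the producer side (a
sketch only; the exception names are the ones exported by ``vsq.aqueue`` and the
timeout behaviour follows the ``Task.get`` docstring):

.. code-block:: python

    import aioredis

    from vsq.aqueue import Queue, TaskFailedException, TaskTimeoutException

    queue = Queue('queue_name', aioredis.from_url('redis://host:6379'))
    task = await queue.put({'type': 'myMessageType', 'payload': ['some', 'values']})

    try:
        # wait at most two minutes for the consumer to report a result
        result = await task.get(timeout=120)
    except TaskFailedException as error:
        # the consumer raised inside its ``async with task:`` block; the captured
        # error details (type, value, traceback) are attached to the exception
        print('task failed:', error)
    except TaskTimeoutException:
        print('no result within the timeout')
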
Features
--------
``vsq`` provides a command line interface to start a task daemon or to
dispatch messages.
.. code-block:: bash
# start a task daemon with a given handler function
python -m vsq.queue daemon myq mymodule.somehandler
# send a json encoded message
python -m vsq.queue message myq --json '{"type": "message", "args": "abc"}' --wait
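
The ``daemon`` command imports the handler from its dotted path and calls it with
the decoded message of every task; the return value becomes the task result, and
raising inside the handler marks the task as failed. A minimal sketch of such a
handler (``mymodule``/``somehandler`` are just the placeholder names used in the
command above):

.. code-block:: python

    # mymodule.py -- hypothetical handler module matching the command above
    def somehandler(message):
        # ``message`` is the decoded JSON payload sent by the producer
        if message.get('type') == 'message':
            return {'echo': message['args']}
        # raising marks the task as failed and reports the error to the producer
        raise ValueError(f"unhandled message type: {message.get('type')}")
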
Credits
-------
@@ -11,4 +11,5 @@ Click==7.0
pytest==4.6.5
pytest-runner==5.1
redis==3.5.3
aioredis==2.0.0a1
pytest-asyncio==0.15.1
\ No newline at end of file
import os
from datetime import datetime
import pytest
import aioredis
from vsq.aqueue import (
Queue, TaskFailedException, TaskTimeoutException,
TaskStatus, ResponseScheme,
)
@pytest.fixture
async def redis():
redis = await aioredis.from_url(
f"redis://{os.environ.get('REDIS_HOST', 'localhost')}:6379"
)
yield redis
await redis.flushall()
@pytest.fixture
def queue(redis):
yield Queue(
'name',
redis,
)
@pytest.fixture
def queue_pubsub(redis):
yield Queue(
'name',
redis,
response_scheme=ResponseScheme.PUBSUB,
)
@pytest.mark.asyncio
async def test_basic(queue):
# producer
msg = {
'type': 'test',
'arg': 1
}
orig_task = await queue.put(msg)
prev_id = orig_task.id
assert orig_task.status == TaskStatus.ACCEPTED
assert orig_task.id
assert orig_task.message == msg
assert orig_task.result is None
assert orig_task.error is None
assert isinstance(orig_task.created, datetime)
assert orig_task.started is None
assert orig_task.finished is None
# consumer
received_task = await queue.get_task()
async with received_task:
assert received_task.status == TaskStatus.PROCESSING
assert received_task.id == prev_id
assert received_task.message == msg
assert received_task.result is None
assert received_task.error is None
assert isinstance(received_task.created, datetime)
assert isinstance(received_task.started, datetime)
assert received_task.finished is None
received_task.result = {'result': True}
# producer - result
result = await orig_task.get()
assert result == {'result': True}
@pytest.mark.asyncio
async def test_basic_pubsub(queue_pubsub):
# producer
msg = {
'type': 'test',
'arg': 1
}
orig_task = await queue_pubsub.put(msg)
prev_id = orig_task.id
assert orig_task.status == TaskStatus.ACCEPTED
assert orig_task.id
assert orig_task.message == msg
assert orig_task.result is None
assert orig_task.error is None
assert isinstance(orig_task.created, datetime)
assert orig_task.started is None
assert orig_task.finished is None
# consumer
received_task = await queue_pubsub.get_task()
async with received_task:
assert received_task.status == TaskStatus.PROCESSING
assert received_task.id == prev_id
assert received_task.message == msg
assert received_task.result is None
assert received_task.error is None
assert isinstance(received_task.created, datetime)
assert isinstance(received_task.started, datetime)
assert received_task.finished is None
received_task.result = {'result': True}
# producer - result
result = await orig_task.get()
assert result == {'result': True}
@pytest.mark.asyncio
async def test_for(queue):
# producer
await queue.put(1)
await queue.put(2)
await queue.put(3)
# consumer
messages = []
async for task in queue:
messages.append(task.message)
if len(messages) == 3:
break
assert messages == [1, 2, 3]
@pytest.mark.asyncio
async def test_error(queue):
# producer
task = await queue.put(1)
# consumer
received_task = await queue.get_task()
async with received_task:
raise ValueError(f'Unexpected value {received_task.message}')
with pytest.raises(TaskFailedException):
await task.get(0.25)
@pytest.mark.asyncio
async def test_error_pubsub(queue_pubsub):
# producer
task = await queue_pubsub.put(1)
# consumer
received_task = await queue_pubsub.get_task()
async with received_task:
raise ValueError(f'Unexpected value {received_task.message}')
with pytest.raises(TaskFailedException):
await task.get(0.25)
@pytest.mark.asyncio
async def test_timeout(queue):
with pytest.raises(TaskTimeoutException):
await queue.get_task(0.25)
@pytest.mark.asyncio
async def test_timeout_pubsub(queue_pubsub):
with pytest.raises(TaskTimeoutException):
await queue_pubsub.get_task(0.25)
@@ -24,3 +24,264 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ------------------------------------------------------------------------------
import sys
import enum
import json
from dataclasses import dataclass
from typing import Optional
from traceback import format_tb
import logging
from importlib import import_module
from time import time
import asyncio
from asyncio import TimeoutError
from aioredis import Redis
import click
from .common import (
TaskStatus, MessageType, now,
TaskFailedException, TaskTimeoutException,
TaskCommon,
)
from .logging import setup_logging
logger = logging.getLogger(__name__)
class ResponseChannel:
async def send_response(self, response: bytes):
raise NotImplementedError()
async def wait_for_response(self, timeout: float = None):
raise NotImplementedError()
class ListResponseChannel(ResponseChannel):
def __init__(self, redis: Redis, channel_name: str,
expires: int = 120):
self.redis = redis
self.channel_name = channel_name
self.expires = expires
async def send_response(self, response: bytes):
await self.redis.lpush(self.channel_name, response)
if self.expires is not None:
await self.redis.expire(
self.channel_name, self.expires
)
    async def wait_for_response(self, timeout: float = None):
        response = await self.redis.brpop(
            self.channel_name, timeout=timeout
        )
        if response is None:
            # BRPOP returns None when the timeout expires without a message
            raise TaskTimeoutException()
        _, response = response
        return response
class PubSubResponseChannel(ResponseChannel):
def __init__(self, redis: Redis, channel_name: str, pubsub):
self.redis = redis
self.channel_name = channel_name
self.pubsub = pubsub
async def send_response(self, response: bytes):
await self.redis.publish(self.channel_name, response)
    async def wait_for_response(self, timeout: float = None):
        while timeout is None or timeout > 0.0:
            current = time()
            message = await self.pubsub.get_message(timeout=timeout)
            # get_message returns None when nothing arrived within the poll interval
            if message is not None and message['type'] == 'message':
                return message['data']
            if timeout is not None:
                timeout -= time() - current
        # the deadline passed without a response
        raise TaskTimeoutException()
@dataclass
class Task(TaskCommon):
response_channel: Optional[ResponseChannel] = None
# Consumer API
async def __aenter__(self):
self.status = TaskStatus.PROCESSING
self.started = now()
return self
async def __aexit__(self, etype, value, traceback):
if etype is None:
self.status = TaskStatus.DONE
self.finished = now()
else:
self.status = TaskStatus.FAILED
self.error = {
'type': str(etype),
'value': str(value),
'traceback': format_tb(traceback),
}
await self.response_channel.send_response(self.encode())
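        # returning True suppresses the exception in the worker, so a consumer
        # loop keeps running; the captured error travels back to the producer
        # on the response channel instead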
return True
# Producer API
async def get(self, timeout: float = None) -> MessageType:
""" Wait for the task result and return its result. Raise a
``TaskFailedException`` if the task failed.
Optionally a timeout can be specified to abort when a certain
time has passed. This raises a ``TaskTimeoutException``
"""
raw = await self.response_channel.wait_for_response(timeout)
received = self.decode(raw)
if received.status == TaskStatus.FAILED:
raise TaskFailedException(received.error)
return received.result
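
# The message scheme selects which Redis commands are used to enqueue and
# dequeue messages: the LPUSH/RPUSH + LPOP/RPOP combinations give list-based
# FIFO or LIFO queues, while SADD_SPOP keeps messages in an unordered set.
# The response scheme selects how results travel back to the producer
# (a per-task list or a pub/sub channel).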
class MessageScheme(enum.Enum):
LPUSH_RPOP = 'LPUSH_RPOP'
LPUSH_LPOP = 'LPUSH_LPOP'
RPUSH_LPOP = 'RPUSH_LPOP'
RPUSH_RPOP = 'RPUSH_RPOP'
SADD_SPOP = 'SADD_SPOP'
class ResponseScheme(enum.Enum):
LPUSH_RPOP = 'LPUSH_RPOP'
PUBSUB = 'PUBSUB'
class Queue:
def __init__(self, queue_name: str, redis: Redis,
message_scheme: MessageScheme = MessageScheme.LPUSH_RPOP,
response_scheme: ResponseScheme = ResponseScheme.LPUSH_RPOP,
response_channel_template: str = 'response_{id}',
response_channel_expires: Optional[int] = 120):
self.queue_name = queue_name
self.redis = redis
self.message_scheme = message_scheme
self.response_scheme = response_scheme
self.response_channel_template = response_channel_template
self.response_channel_expires = response_channel_expires
async def _get_response_channel(self, task):
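        # every task gets its own response channel, named by filling the task
        # id into ``response_channel_template`` (default: ``response_{id}``)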
channel_name = self.response_channel_template.format(
id=task.id,
)
if self.response_scheme == ResponseScheme.LPUSH_RPOP:
return ListResponseChannel(self.redis, channel_name)
elif self.response_scheme == ResponseScheme.PUBSUB:
pubsub = self.redis.pubsub()
await pubsub.subscribe(channel_name)
return PubSubResponseChannel(self.redis, channel_name, pubsub)
async def put(self, message: MessageType, msg_id: str = None) -> Task:
if msg_id is not None:
task = Task(id=msg_id, message=message)
else:
task = Task(message=message)
task.response_channel = await self._get_response_channel(task)
encoded = task.encode()
if self.message_scheme in (MessageScheme.LPUSH_RPOP, MessageScheme.LPUSH_LPOP):
await self.redis.lpush(self.queue_name, encoded)
elif self.message_scheme in (MessageScheme.RPUSH_RPOP, MessageScheme.RPUSH_LPOP):
await self.redis.rpush(self.queue_name, encoded)
elif self.message_scheme == MessageScheme.SADD_SPOP:
await self.redis.sadd(self.queue_name, encoded)
return task
async def get_task(self, timeout: float = None) -> Task:
try:
if self.message_scheme in (MessageScheme.RPUSH_LPOP, MessageScheme.LPUSH_LPOP):
result = await self.redis.blpop(
self.queue_name, timeout=timeout
)
elif self.message_scheme in (MessageScheme.RPUSH_RPOP, MessageScheme.LPUSH_RPOP):
result = await self.redis.brpop(
self.queue_name, timeout=timeout
)
            elif self.message_scheme == MessageScheme.SADD_SPOP:
                # SPOP has no blocking variant, so the timeout cannot be applied here
                result = await self.redis.spop(self.queue_name)
            if result is None:
                raise TaskTimeoutException()
            # BLPOP/BRPOP return a (key, value) pair, SPOP returns the raw value
            raw_value = result[1] if isinstance(result, (tuple, list)) else result
task = Task.decode(raw_value)
task.response_channel = await self._get_response_channel(task)
return task
except TimeoutError:
raise TaskTimeoutException()
async def __aiter__(self):
while True:
yield await self.get_task()
@click.group()
@click.option('--host', type=str)
@click.option('--port', show_default=True, default=6379, type=int)
@click.option('--debug', type=bool)
@click.pass_context
def cli(ctx, host, port, debug):
ctx.ensure_object(dict)
ctx.obj['redis'] = Redis(host=host, port=port)
ctx.obj['debug'] = debug
setup_logging(debug)
return 0
@cli.command()
@click.argument('name', type=str)
@click.argument('handler', type=str)
@click.pass_context
def daemon(ctx, name, handler):
"""Start a task daemon listening on the specified queue"""
handler_mod, _, handler_name = handler.rpartition('.')
handler_func = getattr(import_module(handler_mod), handler_name)
queue = Queue(name, ctx.obj['redis'])
logger.debug(f"waiting for tasks on queue '{name}'...")
    async def run():
        # the queue is asynchronous, so it has to be consumed with ``async for``
        async for task in queue:
            async with task:
                task.result = handler_func(task.message)

    asyncio.run(run())
@cli.command()
@click.argument('name', type=str)
@click.argument('value', type=str)
@click.option('-j', '--json', 'as_json', is_flag=True, type=bool)
@click.option('-w', '--wait', is_flag=True, type=bool)
@click.pass_context
def message(ctx, name, value, as_json=False, wait=False):
"""Send a message to the specified queue"""
message = value
if as_json:
message = json.loads(message)
queue = Queue(name, ctx.obj['redis'])
    async def run():
        # ``put`` and ``get`` are coroutines on the async queue
        task = await queue.put(message)
        if wait:
            try:
                result = await task.get()
                print(result)
                return 0
            except TaskFailedException as e:
                logger.exception(e)
                return 1
        return 0

    return asyncio.run(run())
if __name__ == "__main__":
sys.exit(cli()) # pragma: no cover
@@ -26,9 +26,11 @@
# ------------------------------------------------------------------------------
import enum
from dataclasses import dataclass, field
from datetime import datetime, timezone
import json
from typing import Dict, List, Union, Optional
from uuid import uuid4
MessageType = Union[
str, int, float, List['MessageType'], Dict[str, 'MessageType']
@@ -57,3 +59,63 @@ class TaskStatus(enum.Enum):
PROCESSING = 'PROCESSING'
DONE = 'DONE'
FAILED = 'FAILED'
@dataclass