EOX GitLab Instance

Commit 287397e9 authored by Fabian Schindler's avatar Fabian Schindler
Browse files

Restructuring queue

Adding testing via gitlab CI
parent bbddc9fd
image: python:latest
before_script:
- pip install -r requirements_dev.txt
- pip install .
test:
services:
- redis:latest
script:
- pytest
publish:
script:
- pip install twine
- python setup.py sdist bdist_wheel
- TWINE_PASSWORD=${CI_JOB_TOKEN} TWINE_USERNAME=gitlab-ci-token python -m twine upload --repository-url https://gitlab.eox.at/api/v4/projects/${CI_PROJECT_ID}/packages/pypi dist/*
only:
- tags
......@@ -9,4 +9,6 @@ Sphinx==1.8.5
twine==1.14.0
Click==7.0
pytest==4.6.5
pytest-runner==5.1
\ No newline at end of file
pytest-runner==5.1
redis==3.5.3
aioredis==1.3.1
......@@ -19,22 +19,21 @@ test_requirements = ['pytest>=3', ]
setup(
author="Fabian Schindler",
author_email='fabian.schindler@eox.at',
python_requires='>=3.5',
python_requires='>=3.6',
classifiers=[
'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Natural Language :: English',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
],
description="Python Boilerplate contains all the boilerplate you need to create a Python package.",
description="",
entry_points={
'console_scripts': [
'vsq=vsq.cli:main',
'vsq-sync=vsq.queue:cli',
],
},
install_requires=requirements,
......@@ -47,7 +46,11 @@ setup(
setup_requires=setup_requirements,
test_suite='tests',
tests_require=test_requirements,
url='https://github.com/constantinius/vsq',
extras_require={
'sync': ['redis'],
'async': ['aioredis'],
},
url='https://gitlab.eox.at/esa/prism/vsq',
version='0.0.1',
zip_safe=False,
)
......@@ -8,19 +8,22 @@ from vsq.queue import (
Queue, TaskFailedException, TaskTimeoutException, TaskStatus
)
@pytest.fixture
def queue():
def redis():
redis = Redis(
'localhost',
6379,
)
yield redis
redis.flushall()
@pytest.fixture
def queue(redis):
yield Queue(
'name',
redis,
# fakeredis.FakeStrictRedis()
)
redis.flushall()
def test_basic(queue):
......@@ -87,9 +90,30 @@ def test_error(queue):
pass
with pytest.raises(TaskFailedException):
task.get()
task.get(0.25)
def test_timeouts(queue):
with pytest.raises(TaskTimeoutException):
queue.get_task(0.25)
def test_pubsub(redis):
p = redis.pubsub()
# redis.publish('test', 'message1')
p.subscribe('test')
redis.publish('test', 'message2')
redis.publish('test', 'message3')
print(p.get_message(timeout=1.1))
print(p.get_message(True, timeout=1.1))
print(p.get_message(True, timeout=1.1))
"""Top-level package for View Server Task Queue."""
# ------------------------------------------------------------------------------
#
# Project: vsq <https://gitlab.eox.at/esa/prism/vsq>
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
# ------------------------------------------------------------------------------
# Copyright (C) 2021 EOX IT Services GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ------------------------------------------------------------------------------
__author__ = """Fabian Schindler"""
__email__ = 'fabian.schindler@eox.at'
......
# ------------------------------------------------------------------------------
#
# Project: vsq <https://gitlab.eox.at/esa/prism/vsq>
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
# ------------------------------------------------------------------------------
# Copyright (C) 2021 EOX IT Services GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
#
# Project: vsq <https://gitlab.eox.at/esa/prism/vsq>
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
# ------------------------------------------------------------------------------
# Copyright (C) 2021 EOX IT Services GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ------------------------------------------------------------------------------
import enum
from datetime import datetime, timezone
from typing import Dict, List, Union
......@@ -22,8 +49,7 @@ class TaskFailedException(Exception):
class TaskTimeoutException(Exception):
def __init__(self, error):
self.error = error
pass
class TaskStatus(enum.Enum):
......
# ------------------------------------------------------------------------------
#
# Project: vsq <https://gitlab.eox.at/esa/prism/vsq>
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
# ------------------------------------------------------------------------------
# Copyright (C) 2021 EOX IT Services GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies of this Software or works derived from this Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
# ------------------------------------------------------------------------------
import logging.config
def setup_logging(debug=False):
logging.config.dictConfig({
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'brief': {
'format': '%(levelname)s %(name)s: %(message)s'
}
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'level': 'DEBUG' if debug else 'INFO',
'formatter': 'brief',
}
},
'root': {
'handlers': ['console'],
'level': 'DEBUG' if debug else 'INFO',
}
})
# ------------------------------------------------------------------------------
#
# Project: vsq <https://github.com/geopython/pygeofilter>
# Project: vsq <https://gitlab.eox.at/esa/prism/vsq>
# Authors: Fabian Schindler <fabian.schindler@eox.at>
#
# ------------------------------------------------------------------------------
......@@ -25,21 +25,79 @@
# THE SOFTWARE.
# ------------------------------------------------------------------------------
import sys
import enum
import json
from dataclasses import dataclass, field, asdict
from datetime import datetime
from uuid import uuid4
from weakref import ref
from typing import Optional
from typing import Optional, Any
from traceback import format_tb
import logging
from importlib import import_module
from time import time
import redis
from redis import Redis, TimeoutError
import click
from .common import (
TaskStatus, MessageType, now, QueueClosedError,
TaskFailedException, TaskTimeoutException
)
from .logging import setup_logging
logger = logging.getLogger(__name__)
class ResponseChannel:
def send_response(self, response: bytes):
raise NotImplementedError()
def wait_for_response(self, timeout: float = None):
raise NotImplementedError()
class ListResponseChannel(ResponseChannel):
def __init__(self, redis: Redis, channel_name: str,
expires: int = 120):
self.redis = redis
self.channel_name = channel_name
self.expires = expires
def send_response(self, response: bytes):
self.redis.lpush(self.channel_name, response)
if self.expires is not None:
self.redis.expire(
self.channel_name, self.expires
)
def wait_for_response(self, timeout: float = None):
_, response = self.redis.brpop(self.channel_name, timeout)
return response
class PubSubResponseChannel(ResponseChannel):
def __init__(self, redis: Redis, channel_name: str):
self.redis = redis
self.channel_name = channel_name
self.pubsub = redis.pubsub()
self.pubsub.subscribe(channel_name)
def send_response(self, response: bytes):
self.redis.publish(self.channel_name, response)
def wait_for_response(self, timeout: float = None):
while timeout is None or timeout > 0.0:
current = time()
message = self.pubsub.get_message(timeout=timeout)
if message['type'] == 'message':
value = json.loads(message['data'])
return value
if timeout is not None:
timeout -= time() - current
@dataclass
......@@ -47,7 +105,7 @@ class Task:
"""
"""
queue_ref: ref['Queue']
response_channel: ResponseChannel
status: TaskStatus = TaskStatus.ACCEPTED
id: str = field(default_factory=lambda: uuid4().hex)
message: Optional[MessageType] = None
......@@ -76,43 +134,53 @@ class Task:
'traceback': format_tb(traceback),
}
queue = self.queue_ref()
if not queue:
raise QueueClosedError()
queue.send_task_response(self)
self.response_channel.send_response(self.encode())
return True
def encode(self):
def default(o):
print(o)
if isinstance(o, datetime):
return o.isoformat()
# elif isinstance(o, traceback):
# return format_tb(o)
# elif issubclass(o, Exception):
# return str(o)
elif isinstance(o, Exception):
return str(o)
elif isinstance(o, TaskStatus):
return o.value
dct = asdict(self)
del dct['queue_ref']
dct = {
'status': self.status,
'id': self.id,
'message': self.message,
'result': self.result,
'error': self.error,
'created': self.created,
'started': self.started,
'finished': self.finished,
}
return json.dumps(dct, default=default)
@classmethod
def decode(cls, queue_ref, raw_value):
def decode(cls, response_channel, raw_value) -> 'Task':
values = json.loads(raw_value)
started = (
datetime.fromisoformat(values['started'])
if values['started'] else
None
)
finished = (
datetime.fromisoformat(values['finished'])
if values['finished'] else
None
)
return cls(
queue_ref,
response_channel,
status=TaskStatus(values['status']),
id=values['id'],
message=values['message'],
result=values['result'],
error=values['error'],
created=datetime.fromisoformat(values['created']),
started=datetime.fromisoformat(values['started']) if values['started'] else None,
finished=datetime.fromisoformat(values['finished']) if values['finished'] else None,
started=started,
finished=finished,
)
# Producer API
......@@ -123,11 +191,9 @@ class Task:
Optionally a timeout can be specified to abort when a certain
time has passed. This raises a ``TaskTimeoutException``
"""
queue = self.queue_ref()
if not queue:
raise QueueClosedError()
received = queue.wait_for_task_response(self, timeout)
raw = self.response_channel.wait_for_response(timeout)
received = self.decode(self.response_channel, raw)
if received.status == TaskStatus.FAILED:
raise TaskFailedException(received.error)
......@@ -149,77 +215,125 @@ class ResponseScheme(enum.Enum):
class Queue:
def __init__(self, queue_name: str, connection: redis.Redis,
def __init__(self, queue_name: str, redis: Redis,
message_scheme: MessageScheme = MessageScheme.LPUSH_RPOP,
response_scheme: ResponseScheme = ResponseScheme.LPUSH_RPOP,
response_channel_template: str = 'response_{id}',
response_channel_expires: Optional[int] = 120):
self.queue_name = queue_name
self.connection = connection
self.redis = redis
self.message_scheme = message_scheme
self.response_scheme = response_scheme
self.response_channel_template = response_channel_template
self.response_channel_expires = response_channel_expires
def _get_response_channel(self, task):
channel_name = self.response_channel_template.format(
id=task.id,
)
if self.response_scheme == ResponseScheme.LPUSH_RPOP:
return ListResponseChannel(self.redis, channel_name)
elif self.response_scheme == ResponseScheme.PUBSUB:
return PubSubResponseChannel(self.redis, channel_name)
def put(self, message: MessageType, msg_id: str = None) -> Task:
if msg_id is not None:
task = Task(ref(self), id=msg_id, message=message)
task = Task(None, id=msg_id, message=message)
else:
task = Task(ref(self), message=message)
task = Task(None, message=message)
task.response_channel = self._get_response_channel(task)
encoded = task.encode()
if self.message_scheme in (MessageScheme.LPUSH_RPOP, MessageScheme.LPUSH_LPOP):
self.connection.lpush(self.queue_name, encoded)
self.redis.lpush(self.queue_name, encoded)
elif self.message_scheme in (MessageScheme.RPUSH_RPOP, MessageScheme.RPUSH_LPOP):
self.connection.rpush(self.queue_name, encoded)
self.redis.rpush(self.queue_name, encoded)
elif self.message_scheme == MessageScheme.SADD_SPOP:
self.connection.sadd(self.queue_name, encoded)
self.redis.sadd(self.queue_name, encoded)
return task
def get_task(self, timeout: float = None) -> Task:
try:
if self.message_scheme in (MessageScheme.RPUSH_LPOP, MessageScheme.LPUSH_LPOP):
_, raw_value = self.connection.blpop(self.queue_name)
result = self.redis.blpop(self.queue_name, timeout)
elif self.message_scheme in (MessageScheme.RPUSH_RPOP, MessageScheme.LPUSH_RPOP):
_, raw_value = self.connection.brpop(self.queue_name)
result = self.redis.brpop(self.queue_name, timeout)
elif self.message_scheme == MessageScheme.SADD_SPOP:
_, raw_value = self.connection.spop(self.queue_name)
result = self.redis.spop(self.queue_name, timeout)
return Task.decode(ref(self), raw_value)
except redis.TimeoutError:
if result is None:
raise TaskTimeoutException()
_, raw_value = result
task = Task.decode(None, raw_value)
task.response_channel = self._get_response_channel(task)
return task
except TimeoutError:
raise TaskTimeoutException()
def __iter__(self):
while True:
yield self.get_task()
def wait_for_task_response(self, task, timeout: float = None) -> Task:
channel_name = self.response_channel_template.format(
id=task.id,
)
if self.response_scheme == ResponseScheme.LPUSH_RPOP:
_, raw_value = self.connection.brpop(channel_name, timeout)
return Task.decode(ref(self), raw_value)
else:
raise NotImplementedError(
f'Response scheme {self.response_scheme.name} is not '
'implemented'
)
def send_task_response(self, task):
channel_name = self.response_channel_template.format(
id=task.id,
)
if self.response_scheme == ResponseScheme.LPUSH_RPOP:
self.connection.lpush(channel_name, task.encode())
if self.response_channel_expires is not None:
self.connection.expire(
channel_name, self.response_channel_expires
)
@click.group()
@click.option('--host', type=str)
@click.option('--port', show_default=True, default=6379, type=int)
@click.option('--debug', type=bool)
@click.pass_context
def cli(ctx, host, port, debug):
ctx.ensure_object(dict)
ctx.obj['redis'] = Redis(host=host, port=port)
ctx.obj['debug'] = debug
setup_logging(debug)
return 0
@cli.command()
@click.argument('name', type=str)
@click.argument('handler', type=str)
@click.pass_context
def daemon(ctx, name, handler):
"""Start a task daemon listening on the specified queue"""
handler_mod, _, handler_name = handler.rpartition('.')
handler_func = getattr(import_module(handler_mod), handler_name)
queue = Queue(name, ctx.obj['redis'])
logger.debug(f"waiting for tasks on queue '{name}'...")
for task in queue:
with task:
task.result = handler_func(task.message)
@cli.command()
@click.argument('name', type=str)
@click.argument('value', type=str)
@click.option('-j', '--json', 'as_json', is_flag=True, type=bool)
@click.option('-w', '--wait', is_flag=True, type=bool)
@click.pass_context
def message(ctx, name, value, as_json=False, wait=False):
"""Send a message to the specified queue"""
message = value
if as_json:
message = json.loads(message)
queue = Queue(name, ctx.obj['redis'])
task = queue.put(message)
if wait:
try:
result = task.get()
print(result)
return 0
except TaskFailedException as e:
logger.exception(e)
return 1
else:
raise NotImplementedError(
f'Response scheme {self.response_scheme.name} is not '
'implemented'
)
if __name__ == "__main__":
sys.exit(cli()) # pragma: no cover