EOX GitLab Instance

Commit 7170d37d authored by Fabian Schindler's avatar Fabian Schindler
Browse files

Adding tests for pubsub response channel

Fix message decoding for pubsub response channel
parent 37ba2525
Pipeline #13185 passed with stage
in 39 seconds
......@@ -5,7 +5,8 @@ import pytest
from redis import Redis
from vsq.queue import (
Queue, TaskFailedException, TaskTimeoutException, TaskStatus
Queue, TaskFailedException, TaskTimeoutException,
TaskStatus, ResponseScheme,
)
......@@ -27,6 +28,15 @@ def queue(redis):
)
@pytest.fixture
def queue_pubsub(redis):
yield Queue(
'name',
redis,
response_scheme=ResponseScheme.PUBSUB,
)
def test_basic(queue):
# producer
msg = {
......@@ -63,6 +73,42 @@ def test_basic(queue):
assert result == {'result': True}
def test_basic_pubsub(queue_pubsub):
# producer
msg = {
'type': 'test',
'arg': 1
}
orig_task = queue_pubsub.put(msg)
prev_id = orig_task.id
assert orig_task.status == TaskStatus.ACCEPTED
assert orig_task.id
assert orig_task.message == msg
assert orig_task.result is None
assert orig_task.error is None
assert isinstance(orig_task.created, datetime)
assert orig_task.started is None
assert orig_task.finished is None
# consumer
with queue_pubsub.get_task() as received_task:
assert received_task.status == TaskStatus.PROCESSING
assert received_task.id == prev_id
assert received_task.message == msg
assert received_task.result is None
assert received_task.error is None
assert isinstance(received_task.created, datetime)
assert isinstance(received_task.started, datetime)
assert received_task.finished is None
received_task.result = {'result': True}
# producer - result
result = orig_task.get()
assert result == {'result': True}
def test_for(queue):
# producer
queue.put(1)
......@@ -84,37 +130,30 @@ def test_error(queue):
task = queue.put(1)
# consumer
try:
with queue.get_task() as received_task:
raise ValueError(f'Unexpected value {received_task.message}')
except ValueError:
pass
with pytest.raises(TaskFailedException):
task.get(0.25)
def test_timeouts(queue):
with pytest.raises(TaskTimeoutException):
queue.get_task(0.25)
def test_pubsub(redis):
p = redis.pubsub()
# redis.publish('test', 'message1')
p.subscribe('test')
def test_error_pubsub(queue_pubsub):
# producer
task = queue_pubsub.put(1)
redis.publish('test', 'message2')
# consumer
with queue_pubsub.get_task() as received_task:
raise ValueError(f'Unexpected value {received_task.message}')
redis.publish('test', 'message3')
with pytest.raises(TaskFailedException):
task.get(0.25)
print(p.get_message(timeout=1.1))
print(p.get_message(True, timeout=1.1))
print(p.get_message(True, timeout=1.1))
def test_timeout(queue):
with pytest.raises(TaskTimeoutException):
queue.get_task(0.25)
def test_timeout_pubsub(queue_pubsub):
with pytest.raises(TaskTimeoutException):
queue_pubsub.get_task(0.25)
......@@ -43,7 +43,7 @@ import click
from .common import (
TaskStatus, MessageType, now, QueueClosedError,
TaskFailedException, TaskTimeoutException
TaskFailedException, TaskTimeoutException,
)
from .logging import setup_logging
......@@ -93,8 +93,7 @@ class PubSubResponseChannel(ResponseChannel):
current = time()
message = self.pubsub.get_message(timeout=timeout)
if message['type'] == 'message':
value = json.loads(message['data'])
return value
return message['data']
if timeout is not None:
timeout -= time() - current
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment