Coverage for src/tests/core/events/test_consumer.py: 100%
20 statements
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2024-01-01 00:00 +0000
1"""Tests for the Redis Consumer."""
3import asyncio
5import pytest
7from kwai_core.events.consumer import RedisConsumer
8from kwai_core.events.stream import RedisMessage, RedisStream
11pytestmark = pytest.mark.bus
14async def test_consumer(stream: RedisStream):
15 """Test the consumer."""
16 await stream.create_group("kwai_test_consumer_group")
17 await stream.add(RedisMessage(data={"text": "Hello consumer!"}))
19 def out(message: RedisMessage) -> bool:
20 out.counter = getattr(out, "counter", 0) + 1
21 return True
23 consumer = RedisConsumer(stream, "kwai_test_consumer_group", out)
24 task = asyncio.create_task(consumer.consume("kwai_test_consumer"))
25 await stream.add(RedisMessage(data={"text": "Hello runnable consumer!"}))
26 try:
27 await asyncio.wait_for(task, 4)
28 except asyncio.TimeoutError:
29 pass
31 # noinspection PyUnresolvedReferences
32 assert out.counter == 2, "The callback should be called twice"
34 consumer.cancel()