Coverage for src/tests/core/events/test_consumer.py: 100%

20 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Tests for the Redis Consumer.""" 

2 

3import asyncio 

4 

5import pytest 

6 

7from kwai_core.events.consumer import RedisConsumer 

8from kwai_core.events.stream import RedisMessage, RedisStream 

9 

10 

11pytestmark = pytest.mark.bus 

12 

13 

async def test_consumer(stream: RedisStream):
    """Test that the consumer passes stream messages to its callback.

    One message is added to the stream before the consumer task is started
    and one after it. The consumer task is then awaited for at most four
    seconds, after which the callback must have been invoked exactly twice.

    Args:
        stream: The Redis stream (provided as a fixture) to publish to and
            consume from.
    """
    group_name = "kwai_test_consumer_group"
    await stream.create_group(group_name)
    await stream.add(RedisMessage(data={"text": "Hello consumer!"}))

    # Count invocations in a closure variable that starts at 0, so the final
    # assertion reports a clear failure even when the callback never ran
    # (an attribute on the function would not exist yet in that case).
    call_count = 0

    def out(message: RedisMessage) -> bool:
        nonlocal call_count
        call_count += 1
        return True

    consumer = RedisConsumer(stream, group_name, out)
    task = asyncio.create_task(consumer.consume("kwai_test_consumer"))
    try:
        await stream.add(RedisMessage(data={"text": "Hello runnable consumer!"}))
        try:
            # wait_for cancels the task itself when the timeout expires.
            await asyncio.wait_for(task, 4)
        except asyncio.TimeoutError:
            # A timeout is tolerated here: only the number of callback
            # invocations matters for this test.
            pass

        assert call_count == 2, "The callback should be called twice"
    finally:
        # Always signal the consumer to stop, also when the assertion fails.
        consumer.cancel()