Coverage for src/tests/core/events/test_stream.py: 100%

29 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2024-01-01 00:00 +0000

1"""Tests for the Redis streams.""" 

2 

3import pytest 

4 

5from kwai_core.events.stream import RedisMessage, RedisStream 

6 

7 

# Module-level pytest marker: applies the ``bus`` mark to every test in this file.
pytestmark = pytest.mark.bus

9 

10 

async def test_add(stream: RedisStream):
    """Add a message to the stream and check the returned message's id.

    The id of the message returned by ``stream.add`` must differ from ``"*"``.
    """
    outgoing = RedisMessage(data={"text": "Hello World!"})
    stored = await stream.add(outgoing)
    assert stored.id != "*", "There should be a message id"

15 

16 

async def test_read(stream: RedisStream):
    """Read a message from the stream with ``last_id="0"`` and check its text.

    NOTE(review): presumably relies on the message added by ``test_add``
    still being in the stream -- confirm against the ``stream`` fixture.
    """
    received = await stream.read(last_id="0")
    assert received is not None, "There should be a message"

    payload = received.data
    assert "text" in payload, "There should be a text key in the message data"
    assert payload["text"] == "Hello World!", (
        "There should be 'Hello World!' in the message content"
    )

25 

26 

async def test_info(stream: RedisStream):
    """Check that ``stream.info()`` returns something other than ``None``."""
    stream_info = await stream.info()
    assert stream_info is not None, "There should be a result"

31 

32 

async def test_create_group(stream: RedisStream):
    """Create the group ``kwai_test_group`` and check that ``True`` is returned."""
    group_name = "kwai_test_group"
    created = await stream.create_group(group_name)
    assert created is True, "The group should be created"

37 

38 

async def test_get_groups(stream: RedisStream):
    """Fetch all groups of the stream and check ``kwai_test_group`` is among them.

    NOTE(review): presumably relies on ``test_create_group`` having run
    before this test -- confirm test ordering / fixture scope.
    """
    all_groups = await stream.get_groups()
    assert all_groups is not None, "There should be a result"
    assert "kwai_test_group" in all_groups, "kwai_test_group should exist"

44 

45 

async def test_get_group(stream: RedisStream):
    """Fetch the group ``kwai_test_group`` by name and check the returned name."""
    wanted = "kwai_test_group"
    found = await stream.get_group(wanted)
    assert found is not None, "There should be a group"
    assert found.name == "kwai_test_group", "There should be a kwai_test_group"

51 

52 

async def test_consume(stream: RedisStream):
    """Test consuming a message from a stream.

    A message is added to the stream first; ``stream.consume`` is then called
    with ``"kwai_test_group"`` and ``"kwai_test_group.c1"`` (presumably the
    group name and the consumer name -- confirm against
    ``RedisStream.consume``) and must return a message.
    """
    await stream.add(RedisMessage(data={"text": "Hello Consuming World!"}))
    message = await stream.consume("kwai_test_group", "kwai_test_group.c1")
    assert message is not None, "There should be a message"