Coverage for src/pythia/event_stream/redis.py: 85%

16 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2022-10-07 19:27 +0000

1"""Redis-backed event stream storage.""" 

2 

3from __future__ import annotations 

4 

5import json 

6 

7from redis import Redis 

8 

9from pythia.event_stream.base import Backend as Base 

10 

11 

12class Backend(Base): 

13 """Simple backend to post messages using :meth:`Redis.xadd`.""" 

14 

15 _client: Redis | None = None 

16 

17 @property 

18 def client(self) -> Redis: 

19 """Redis producer lazy-loader. 

20 

21 Returns: 

22 Initialized producer, guaranteed to be both ping-connected. 

23 

24 """ 

25 if not self._client: 25 ↛ 26line 25 didn't jump to line 26, because the condition on line 25 was never true

26 self.connect() 

27 return self._client # type: ignore 

28 

29 def connect(self) -> None: 

30 """Instantiate a Redis client. 

31 

32 This method is in charge of creating the client and making sure 

33 it is properly connected via ping. 

34 

35 """ 

36 self._client = Redis.from_url(self.uri) 

37 self._client.ping() 

38 

39 def post(self, data) -> None: 

40 """Make the redis client send data via :meth:`Redis.xadd`. 

41 

42 Args: 

43 data: the data to append. Can be any python object. 

44 

45 """ 

46 

47 self.client.xadd(self.stream, fields={"data": json.dumps(data)})