Coverage for src/pythia/event_stream/redis.py: 85%
16 statements
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
1"""Redis-backed event stream storage."""
3from __future__ import annotations
5import json
7from redis import Redis
9from pythia.event_stream.base import Backend as Base
12class Backend(Base):
13 """Simple backend to post messages using :meth:`Redis.xadd`."""
15 _client: Redis | None = None
17 @property
18 def client(self) -> Redis:
19 """Redis producer lazy-loader.
21 Returns:
22 Initialized producer, guaranteed to be both ping-connected.
24 """
25 if not self._client: 25 ↛ 26line 25 didn't jump to line 26, because the condition on line 25 was never true
26 self.connect()
27 return self._client # type: ignore
29 def connect(self) -> None:
30 """Instantiate a Redis client.
32 This method is in charge of creating the client and making sure
33 it is properly connected via ping.
35 """
36 self._client = Redis.from_url(self.uri)
37 self._client.ping()
39 def post(self, data) -> None:
40 """Make the redis client send data via :meth:`Redis.xadd`.
42 Args:
43 data: the data to append. Can be any python object.
45 """
47 self.client.xadd(self.stream, fields={"data": json.dumps(data)})