Source code for pythia.event_stream.redis
"""Redis-backed event stream storage."""
from __future__ import annotations
import json
from redis import Redis
from pythia.event_stream.base import Backend as Base
[docs]class Backend(Base):
"""Simple backend to post messages using :meth:`Redis.xadd`."""
_client: Redis | None = None
@property
def client(self) -> Redis:
"""Redis producer lazy-loader.
Returns:
Initialized producer, guaranteed to be both ping-connected.
"""
if not self._client:
self.connect()
return self._client # type: ignore
[docs] def connect(self) -> None:
"""Instantiate a Redis client.
This method is in charge of creating the client and making sure
it is properly connected via ping.
"""
self._client = Redis.from_url(self.uri)
self._client.ping()
[docs] def post(self, data) -> None:
"""Make the redis client send data via :meth:`Redis.xadd`.
Args:
data: the data to append. Can be any python object.
"""
self.client.xadd(self.stream, fields={"data": json.dumps(data)})