Source code for pythia.event_stream.memory
"""Memory-backed event stream storage."""
from __future__ import annotations
from collections import deque
from typing import Callable
from pythia.event_stream.base import Backend as Base
STORAGE: dict[str, deque] = {}
[docs]class Backend(Base):
"""Simple event stream client to store incoming data into a deque.
By default, the deque does not have a limit so the dev mus pay
special attention to avoid the deque from growing indefinitely.
Alternatively, a bounded deque could be used, or a watcher to pop
old-enough elements in a background worker.
"""
_deque: deque | None = None
_deque_constructor: Callable = deque
"""Allow to customize the storage (eg to set maxlen)."""
@property
def deque(self) -> deque:
"""Internal container lazy-loader.
Returns:
Initialized deque.
"""
if self._deque is None:
self.connect()
return self._deque # type: ignore
[docs] def connect(self) -> None:
"""Fetch stream-specific deque from global container."""
try:
self._deque = STORAGE[self.stream]
except KeyError:
self._deque = STORAGE[self.stream] = self._deque_constructor()
[docs] def post(self, data) -> None:
"""Append an element into the deque.
Args:
data: the data to append. Can be any python object.
"""
self.deque.append(data)