Coverage for src/pythia/event_stream/memory.py: 88%
20 statements
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
1"""Memory-backed event stream storage."""
3from __future__ import annotations
5from collections import deque
6from typing import Callable
8from pythia.event_stream.base import Backend as Base
10STORAGE: dict[str, deque] = {}
13class Backend(Base):
14 """Simple event stream client to store incoming data into a deque.
16 By default, the deque does not have a limit, so the dev must pay
17 special attention to prevent the deque from growing indefinitely.
18 Alternatively, a bounded deque could be used, or a watcher to pop
19 old-enough elements in a background worker.
21 """
23 _deque: deque | None = None
24 _deque_constructor: Callable = deque
25 """Allows customizing the storage (e.g. to set maxlen)."""
27 @property
28 def deque(self) -> deque:
29 """Internal container lazy-loader.
31 Returns:
32 Initialized deque.
34 """
35 if self._deque is None: 35 ↛ 36 (line 35 didn't jump to line 36, because the condition on line 35 was never true)
36 self.connect()
37 return self._deque # type: ignore
39 def connect(self) -> None:
40 """Fetch stream-specific deque from global container."""
41 try:
42 self._deque = STORAGE[self.stream]
43 except KeyError:
44 self._deque = STORAGE[self.stream] = self._deque_constructor()
46 def post(self, data) -> None:
47 """Append an element into the deque.
49 Args:
50 data: the data to append. Can be any python object.
52 """
53 self.deque.append(data)