Coverage for src/pythia/event_stream/memory.py: 88%

20 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2022-10-07 19:27 +0000

1"""Memory-backed event stream storage.""" 

2 

3from __future__ import annotations 

4 

5from collections import deque 

6from typing import Callable 

7 

8from pythia.event_stream.base import Backend as Base 

9 

# Process-wide registry of deques, keyed by stream identifier.
# ``Backend.connect`` looks a stream's deque up here and registers a new
# one when the stream has no entry yet.
STORAGE: dict[str, deque] = {}

11 

12 

class Backend(Base):
    """In-memory event stream client that keeps incoming data in a deque.

    The deque is unbounded by default, so the developer must take care
    that it does not grow indefinitely. A bounded deque can be used
    instead, or a background worker can pop elements once they are old
    enough.

    """

    _deque: deque | None = None
    _deque_constructor: Callable = deque
    """Factory for the storage; override to customize it (eg to set maxlen)."""

    @property
    def deque(self) -> deque:
        """Lazily resolve the internal container.

        Returns:
            Initialized deque.

        """
        container = self._deque
        if container is None:
            # Not connected yet: bind to the stream's deque on first access.
            self.connect()
            container = self._deque
        return container  # type: ignore

    def connect(self) -> None:
        """Bind this client to the stream-specific deque in the global container."""
        try:
            container = STORAGE[self.stream]
        except KeyError:
            # No deque registered for this stream yet: create and register one.
            container = STORAGE[self.stream] = self._deque_constructor()
        self._deque = container

    def post(self, data) -> None:
        """Add an element to the deque via ``append``.

        Args:
            data: the data to append. Can be any python object.

        """
        container = self.deque
        container.append(data)