Coverage for src/pythia/event_stream/base.py: 96%
41 statements
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
1"""Event stream interface and common definitions."""
2from __future__ import annotations
4import abc
5import importlib
6import re
7from typing import Any
8from typing import Dict
9from typing import Tuple
10from typing import Type
11from urllib.parse import parse_qs
12from urllib.parse import urlparse
14from pythia.types import EventStreamUri
def _parse_netloc(netloc: str) -> Dict[str, Any]:
    """Split the netloc component of a uri into its parts.

    Args:
        netloc: network location in the form
            ``[username[:password]@]host[:port]``.

    Returns:
        A dictionary with the keys ``username``, ``password``, ``host``
        and ``port``. Parts that are absent from `netloc` are ``None``;
        the port, when present, is returned as a string of digits.

    Raises:
        ValueError: if `netloc` does not match the expected form.

    """
    pat = (
        r"^"
        r"(?:"
        # The username group is closed before the password starts, so it
        # holds the user name alone instead of "username:password".
        r"(?P<username>.*?)"
        r"(?::(?P<password>.*?))?"
        r"@"
        r")?"
        r"(?P<host>.*?)"
        r"(?:"
        r":(?P<port>\d+)"
        r")?"
        r"$"
    )
    match = re.match(pat, netloc)
    if not match:
        raise ValueError(f"Invalid netloc '{netloc}' for uri")
    return match.groupdict()
def parse_uri(uri: EventStreamUri) -> Tuple[dict[str, Any], Dict[Any, list]]:
    """Break a uri into its components and its query parameters.

    Args:
        uri: uri to parse.

    Returns:
        A tuple containing (a) a dictionary with the keys scheme,
        netloc, path, params, query and fragment - as contained in the
        uri, and (b) the query parameters from the uri, each mapped to
        the list of its values.

    """
    components = urlparse(uri)._asdict()
    parameters = parse_qs(components["query"], strict_parsing=False)
    return components, parameters
class Backend(abc.ABC):
    """Event stream backend.

    This class has three purposes:
        * Interface for custom backends.
        * Skeleton for their internal workings.
        * Factory to choose implementation.

    """

    def __init__(self, uri: EventStreamUri) -> None:
        """Initialize a backend from its uri.

        The constructor stores the parsed pieces of the uri on the
        instance and then calls :meth:`connect`.

        Args:
            uri: connection string. Its query must contain a ``stream``
                parameter.

        Raises:
            KeyError: if the uri has no ``stream`` query parameter.
            ValueError: if the netloc of the uri cannot be parsed.

        """
        self.raw_uri = uri
        data, query = parse_uri(uri)
        self.netloc = _parse_netloc(data["netloc"])
        self.query = query
        # Only the first value is used when `stream` is given repeatedly.
        self.stream = self.query["stream"][0]
        self.uri = self._strip_stream_param(uri)
        self.connect()

    @staticmethod
    def _strip_stream_param(uri: str) -> str:
        """Return `uri` without its ``stream`` query parameter.

        The parameter is dropped by key rather than by replacing the
        text ``stream=<value>``, so that other parameters (including
        ones whose name merely ends in ``stream``) are left intact, no
        dangling ``?`` or ``&`` remains, and percent-encoded stream
        values are removed as well.

        Args:
            uri: connection string.

        Returns:
            The uri with every ``stream=...`` pair removed from its
            query. Any fragment is preserved.

        """
        # The fragment is split off first: a "?" after "#" is not a query.
        rest, hash_sep, fragment = uri.partition("#")
        base, _, query = rest.partition("?")
        kept = "&".join(
            pair
            for pair in query.split("&")
            if pair.partition("=")[0] != "stream"
        )
        if kept:
            base = f"{base}?{kept}"
        return f"{base}{hash_sep}{fragment}"

    @classmethod
    def from_uri(cls: Type[Backend], uri: EventStreamUri) -> Backend:
        """Factory to select Backend from its scheme.

        The scheme of the uri names a module of this package; the
        ``Backend`` class defined in that module is instantiated.

        Args:
            uri: connection string.

        Returns:
            The instantiated backend for the requested scheme.

        Raises:
            ImportError: if no module named after the scheme can be
                imported from this package.
            AttributeError: if that module does not define ``Backend``.

        """
        data, _ = parse_uri(uri)
        scheme = data["scheme"]
        module = importlib.import_module(f"{__package__}.{scheme}")
        klass = module.Backend

        return klass(uri=uri)

    @abc.abstractmethod
    def connect(self):
        """Ensure internal connection to the remote service is ok."""

    @abc.abstractmethod
    def post(self, data):
        """Send single packet of data.

        Args:
            data: the data to send.

        This method gets called once for each element yielded from the
        buffer probe.

        Any kind of batching should be implemented here and coordinated
        with the respective buffer probe.

        """