Source code for pythia.event_stream.base
"""Event stream interface and common definitions."""
from __future__ import annotations
import abc
import importlib
import re
from typing import Any
from typing import Dict
from typing import Tuple
from typing import Type
from urllib.parse import parse_qs
from urllib.parse import urlparse
from pythia.types import EventStreamUri
[docs]def _parse_netloc(netloc: str):
pat = (
"^"
r"(?:"
r"(?P<username>.*?"
r"(:(?P<password>.*?))?"
r")?"
r"@"
r")?"
r"(?P<host>.*?)"
r"(?:"
r":(?P<port>\d+)"
r")?"
r"$"
)
match = re.match(pat, netloc)
if not match:
raise ValueError(f"Invalid netloc '{netloc}' for uri")
return match.groupdict()
[docs]def parse_uri(uri: EventStreamUri) -> Tuple[dict[str, Any], Dict[Any, list]]:
"""Get information from the uri.
Args:
uri: uri to parse.
Returns:
A tuple containing (a) dictionary containing the following keys:
scheme, netloc, path, params, query, fragment - as contained in
the uri, and (b) query parameters form the uri.
"""
data = urlparse(uri)._asdict()
query = parse_qs(data["query"], strict_parsing=False)
return data, query
[docs]class Backend(abc.ABC):
"""Even stream backend.
This class has three pruposes:
* Interface for custom backends
* Skeleton for their internal workings.
* Factory to choose implementation.
"""
[docs] def __init__(self, uri: EventStreamUri) -> None:
"""Initialize a backend from its uri.
Args:
uri: connection string.
"""
self.raw_uri = uri
data, query = parse_uri(uri)
self.netloc = _parse_netloc(data["netloc"])
self.query = query
self.stream = stream = self.query["stream"][0]
self.uri = uri.replace(f"stream={stream}", "").rstrip("?")
self.connect()
[docs] @classmethod
def from_uri(cls: Type[Backend], uri: EventStreamUri) -> Backend:
"""Factory to select Backend from its schema.
Args:
uri: connection string.
Returns:
The instantiated backend for the requested schema.
"""
data, _ = parse_uri(uri)
scheme = data["scheme"]
module = importlib.import_module(f"{__package__}.{scheme}")
klass = module.Backend
return klass(uri=uri)
[docs] @abc.abstractmethod
def connect(self):
"""Ensure internal connection to the remote service is ok."""
[docs] @abc.abstractmethod
def post(self, data):
"""Send single packet of data.
Args:
data: the data to send.
This method gets called once for each element yielded from the
buffer probe.
Any kind of batching should be implemented here and coordinated
with the respective buffer probe.
"""