Source code for pythia.event_stream.base

"""Event stream interface and common definitions."""
from __future__ import annotations

import abc
import importlib
import re
from typing import Any
from typing import Dict
from typing import Tuple
from typing import Type
from urllib.parse import parse_qs
from urllib.parse import urlparse

from pythia.types import EventStreamUri


[docs]def _parse_netloc(netloc: str): pat = ( "^" r"(?:" r"(?P<username>.*?" r"(:(?P<password>.*?))?" r")?" r"@" r")?" r"(?P<host>.*?)" r"(?:" r":(?P<port>\d+)" r")?" r"$" ) match = re.match(pat, netloc) if not match: raise ValueError(f"Invalid netloc '{netloc}' for uri") return match.groupdict()
def parse_uri(uri: EventStreamUri) -> Tuple[dict[str, Any], Dict[Any, list]]:
    """Split a uri into its components and its query parameters.

    Args:
        uri: uri to parse.

    Returns:
        A two-item tuple holding (a) a dictionary with the keys scheme,
        netloc, path, params, query and fragment, as contained in the uri,
        and (b) the query parameters from the uri, each name mapped to the
        list of its values.

    """
    components = urlparse(uri)._asdict()
    raw_query = components["query"]
    parameters = parse_qs(raw_query, strict_parsing=False)
    return components, parameters
class Backend(abc.ABC):
    """Event stream backend.

    This class has three purposes:

    * Interface for custom backends.
    * Skeleton for their internal workings.
    * Factory to choose implementation.

    """

    def __init__(self, uri: EventStreamUri) -> None:
        """Initialize a backend from its uri.

        Stores the raw uri, the parsed netloc, the query parameters, the
        stream name and the uri without its ``stream`` parameter, then calls
        `connect`.

        Args:
            uri: connection string. Its query must contain a non-empty
                ``stream`` parameter.

        Raises:
            KeyError: if the uri has no ``stream`` query parameter.
            ValueError: if the netloc of the uri cannot be parsed.

        """
        self.raw_uri = uri
        data, query = parse_uri(uri)
        self.netloc = _parse_netloc(data["netloc"])
        self.query = query
        # Only the first value is used when ``stream`` is given several times.
        self.stream = self.query["stream"][0]
        self.uri = self._without_stream_param(uri)
        self.connect()

    @staticmethod
    def _without_stream_param(uri: str) -> str:
        """Return `uri` with every ``stream`` query parameter removed.

        Pairs are dropped by key from the raw query string rather than by
        substring replacement, so other parameters whose text merely contains
        ``stream=...`` (e.g. ``upstream=x``) are left intact, percent-encoded
        stream values are removed as well, and no dangling ``?`` or ``&``
        separators are left behind.
        """
        base, hash_sep, fragment = uri.partition("#")
        head, _, query_string = base.partition("?")
        kept = [
            pair
            for pair in query_string.split("&")
            if pair and pair.partition("=")[0] != "stream"
        ]
        if kept:
            head = head + "?" + "&".join(kept)
        return head + hash_sep + fragment

    @classmethod
    def from_uri(cls: Type[Backend], uri: EventStreamUri) -> Backend:
        """Factory to select Backend from its schema.

        The scheme of the uri names a module inside this package; the
        ``Backend`` attribute of that module is instantiated with the uri.

        Args:
            uri: connection string.

        Returns:
            The instantiated backend for the requested schema.

        """
        data, _ = parse_uri(uri)
        scheme = data["scheme"]
        module = importlib.import_module(f"{__package__}.{scheme}")
        klass = module.Backend
        return klass(uri=uri)

    @abc.abstractmethod
    def connect(self):
        """Ensure internal connection to the remote service is ok."""

    @abc.abstractmethod
    def post(self, data):
        """Send single packet of data.

        Args:
            data: the data to send.

        This method gets called once for each element yielded from the
        buffer probe. Any kind of batching should be implemented here and
        coordinated with the respective buffer probe.

        """