Coverage for src/pythia/event_stream/base.py: 96%

41 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2022-10-07 19:27 +0000

1"""Event stream interface and common definitions.""" 

2from __future__ import annotations 

3 

4import abc 

5import importlib 

6import re 

7from typing import Any 

8from typing import Dict 

9from typing import Tuple 

10from typing import Type 

11from urllib.parse import parse_qs 

12from urllib.parse import urlparse 

13 

14from pythia.types import EventStreamUri 

15 

16 

17def _parse_netloc(netloc: str): 

18 pat = ( 

19 "^" 

20 r"(?:" 

21 r"(?P<username>.*?" 

22 r"(:(?P<password>.*?))?" 

23 r")?" 

24 r"@" 

25 r")?" 

26 r"(?P<host>.*?)" 

27 r"(?:" 

28 r":(?P<port>\d+)" 

29 r")?" 

30 r"$" 

31 ) 

32 match = re.match(pat, netloc) 

33 if not match: 33 ↛ 34line 33 didn't jump to line 34, because the condition on line 33 was never true

34 raise ValueError(f"Invalid netloc '{netloc}' for uri") 

35 return match.groupdict() 

36 

37 

38def parse_uri(uri: EventStreamUri) -> Tuple[dict[str, Any], Dict[Any, list]]: 

39 """Get information from the uri. 

40 

41 Args: 

42 uri: uri to parse. 

43 

44 Returns: 

45 A tuple containing (a) dictionary containing the following keys: 

46 scheme, netloc, path, params, query, fragment - as contained in 

47 the uri, and (b) query parameters form the uri. 

48 

49 """ 

50 data = urlparse(uri)._asdict() 

51 query = parse_qs(data["query"], strict_parsing=False) 

52 return data, query 

53 

54 

55class Backend(abc.ABC): 

56 """Even stream backend. 

57 

58 This class has three pruposes: 

59 * Interface for custom backends 

60 * Skeleton for their internal workings. 

61 * Factory to choose implementation. 

62 

63 """ 

64 

65 def __init__(self, uri: EventStreamUri) -> None: 

66 """Initialize a backend from its uri. 

67 

68 Args: 

69 uri: connection string. 

70 

71 """ 

72 self.raw_uri = uri 

73 data, query = parse_uri(uri) 

74 self.netloc = _parse_netloc(data["netloc"]) 

75 self.query = query 

76 self.stream = stream = self.query["stream"][0] 

77 self.uri = uri.replace(f"stream={stream}", "").rstrip("?") 

78 self.connect() 

79 

80 @classmethod 

81 def from_uri(cls: Type[Backend], uri: EventStreamUri) -> Backend: 

82 """Factory to select Backend from its schema. 

83 

84 Args: 

85 uri: connection string. 

86 

87 Returns: 

88 The instantiated backend for the requested schema. 

89 

90 """ 

91 data, _ = parse_uri(uri) 

92 scheme = data["scheme"] 

93 module = importlib.import_module(f"{__package__}.{scheme}") 

94 klass = module.Backend 

95 

96 return klass(uri=uri) 

97 

98 @abc.abstractmethod 

99 def connect(self): 

100 """Ensure internal connection to the remote service is ok.""" 

101 

102 @abc.abstractmethod 

103 def post(self, data): 

104 """Send single packet of data. 

105 

106 Args: 

107 data: the data to send. 

108 

109 This method gets called once for each element yielded from the 

110 buffer probe. 

111 

112 Any kind of batching should be implemented here and coordinated 

113 with the respective buffer probe. 

114 

115 """