Coverage for src/pythia/event_stream/kafka.py: 80%
37 statements
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
1"""kafka-backed event stream storage."""
2from __future__ import annotations
4import json
5from typing import List
7from kafka import KafkaProducer
8from kafka.admin import KafkaAdminClient
9from kafka.admin import NewTopic
11from pythia.event_stream.base import Backend as Base
class Backend(Base):
    """Simple backend to post messages using :class:`KafkaProducer`.

    The connection details (``self.netloc``) and the topic name
    (``self.stream``) are read from attributes that are not defined in this
    class; presumably they are provided by the base class -- verify there.
    """

    # Lazily created producer; stays ``None`` until ``connect`` succeeds.
    _client: KafkaProducer | None = None

    @property
    def host(self) -> str:
        """Extract the 'host' from the netloc in the uri.

        Returns:
            The host contained in the uri's netloc.

        """
        return self.netloc["host"]

    @property
    def port(self) -> int:
        """Extract the 'port' from the netloc in the uri.

        Returns:
            The port contained in the uri's netloc.

        """
        return int(self.netloc["port"])

    @property
    def client(self) -> KafkaProducer:
        """Kafka producer lazy-loader.

        Returns:
            Initialized producer, guaranteed to be both connected and
            have the stream created as a topic.

        """
        if self._client is None:
            self.connect()
        return self._client  # type: ignore

    @property
    def bootstrap_servers(self) -> List[str]:
        """Singular list containing 'host:port'.

        Returns:
            Single-element list containing a connection of the form
            'host:port'

        """
        return [f"{self.host}:{self.port}"]

    def connect(self) -> None:
        """Instantiate a topic connected kafkaproducer.

        This method is in charge of creating the producer, making sure
        it is properly connected, and ensure the topic which is used to
        post messages actually exists, creating it otherwise.

        The producer is only stored on the instance once every check has
        passed, so a failed attempt never leaves a broken producer cached
        and the next access to ``client`` retries from scratch.

        Raises:
            ConnectionError: kafka producer is not 'bootstrap_connected'

        """
        servers = self.bootstrap_servers
        producer = KafkaProducer(bootstrap_servers=servers)
        try:
            print("Checking kafka connection...")
            if not producer.bootstrap_connected():
                raise ConnectionError(
                    f"kafka producer could not connect to {servers}"
                )
            print("kafka connection OK")
            self._ensure_topic()
        except Exception:
            # Release the producer's resources before propagating the
            # failure; do not wait for a graceful shutdown.
            producer.close(timeout=0)
            raise
        self._client = producer

    def _ensure_topic(self) -> None:
        """Create the topic named ``self.stream`` when it does not exist."""
        print("Checking kafka topic...")
        admin = KafkaAdminClient(
            bootstrap_servers=self.bootstrap_servers, api_version=(0, 9)
        )
        try:
            if self.stream not in admin.list_topics():
                admin.create_topics(
                    new_topics=[
                        NewTopic(
                            name=self.stream,
                            num_partitions=1,
                            replication_factor=1,
                        )
                    ],
                    validate_only=False,
                )
                print("kafka topic created")
        finally:
            # The admin client is only needed for this check; always close
            # its connection, even when listing/creating topics fails.
            admin.close()
        print("kafka topic OK")

    def post(self, data) -> None:
        """Make the kafka producer send serialized data.

        Args:
            data: the data to append. Must be serializable by
                :func:`json.dumps`; it is sent as the encoded JSON text.

        """
        payload = json.dumps(data).encode()
        self.client.send(self.stream, payload)