Source code for pythia.event_stream.kafka
"""kafka-backed event stream storage."""
from __future__ import annotations
import json
from typing import List
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient
from kafka.admin import NewTopic
from pythia.event_stream.base import Backend as Base
class Backend(Base):
    """Simple backend to post messages using :class:`KafkaProducer`.

    The producer is created lazily on first access to :attr:`client` and is
    only cached once :meth:`connect` has verified the connection and the
    presence of the topic named by ``self.stream``.
    """

    # Cached producer; stays ``None`` until :meth:`connect` succeeds.
    _client: KafkaProducer | None = None

    @property
    def host(self) -> str:
        """Extract the 'host' from the netloc in the uri.

        Returns:
            The value stored under the ``"host"`` key of ``self.netloc``.
        """
        return self.netloc["host"]

    @property
    def port(self) -> int:
        """Extract the 'port' from the netloc in the uri.

        Returns:
            The value stored under the ``"port"`` key of ``self.netloc``,
            converted to ``int``.
        """
        return int(self.netloc["port"])

    @property
    def client(self) -> KafkaProducer:
        """Kafka producer lazy-loader.

        Calls :meth:`connect` on first use. Because :meth:`connect` only
        caches the producer after all of its checks passed, a failed attempt
        is retried on the next access instead of handing out a producer that
        was never verified.

        Returns:
            Initialized producer, guaranteed to be both connected and
            have the stream created as a topic.
        """
        if self._client is None:
            self.connect()
        return self._client  # type: ignore

    @property
    def bootstrap_servers(self) -> List[str]:
        """Singular list containing 'host:port'.

        Returns:
            Single-element list containing a connection of the form
            'host:port'.
        """
        return [f"{self.host}:{self.port}"]

    def connect(self) -> None:
        """Instantiate a topic-connected Kafka producer.

        This method is in charge of creating the producer, making sure
        it is properly connected, and ensuring the topic which is used to
        post messages actually exists, creating it otherwise. The producer
        is stored on the instance only after every check succeeded; if any
        step fails the producer is closed and the error is re-raised.

        Raises:
            ConnectionError: kafka producer is not 'bootstrap_connected'
        """
        producer = KafkaProducer(bootstrap_servers=self.bootstrap_servers)
        try:
            print("Checking kafka connection...")
            if not producer.bootstrap_connected():
                raise ConnectionError(
                    "kafka producer is not bootstrap-connected to "
                    f"{self.bootstrap_servers}"
                )
            print("kafka connection OK")
            self._ensure_topic()
        except Exception:
            # Do not leak (or cache) a producer that failed verification.
            producer.close()
            raise
        self._client = producer

    def _ensure_topic(self) -> None:
        """Create the topic named ``self.stream`` if it is not listed yet.

        A short-lived admin client is opened for the check and is always
        closed afterwards, whether or not the check or creation succeeds.
        """
        print("Checking kafka topic...")
        admin = KafkaAdminClient(
            bootstrap_servers=self.bootstrap_servers, api_version=(0, 9)
        )
        try:
            if self.stream not in admin.list_topics():
                # NOTE(review): single partition / replication factor 1 are
                # hard-coded here, as in the original implementation.
                admin.create_topics(
                    new_topics=[
                        NewTopic(
                            name=self.stream,
                            num_partitions=1,
                            replication_factor=1,
                        )
                    ],
                    validate_only=False,
                )
                print("kafka topic created")
        finally:
            admin.close()
        print("kafka topic OK")

    def post(self, data) -> None:
        """Make the kafka producer send serialized data.

        The payload is serialized with :func:`json.dumps` and encoded to
        bytes before being sent to the topic named by ``self.stream``.

        Args:
            data: the data to append. Must be serializable by
                :func:`json.dumps`.
        """
        payload = json.dumps(data).encode()
        self.client.send(self.stream, payload)