Coverage for src/pythia/event_stream/kafka.py: 80%

37 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2022-10-07 19:27 +0000

1"""kafka-backed event stream storage.""" 

2from __future__ import annotations 

3 

4import json 

5from typing import List 

6 

7from kafka import KafkaProducer 

8from kafka.admin import KafkaAdminClient 

9from kafka.admin import NewTopic 

10 

11from pythia.event_stream.base import Backend as Base 

12 

13 

14class Backend(Base): 

15 """Simple backend to post messages using :class:`KafkaProducer`.""" 

16 

17 _client: KafkaProducer | None = None 

18 

19 @property 

20 def host(self) -> str: 

21 """Extract the 'host' from the netloc in the uri.

22 

23 Returns: 

24 The host contained in the uri's netloc. 

25 

26 """ 

27 return self.netloc["host"] 

28 

29 @property 

30 def port(self) -> int: 

31 """Extract the 'port' from the netloc in the uri.

32 

33 Returns: 

34 The port contained in the uri's netloc. 

35 

36 """ 

37 return int(self.netloc["port"]) 

38 

39 @property 

40 def client(self) -> KafkaProducer: 

41 """Kafka producer lazy-loader. 

42 

43 Returns: 

44 Initialized producer, guaranteed to be both connected and 

45 have the stream created as a topic. 

46 

47 """ 

48 if not self._client: 

49 self.connect() 

50 return self._client # type: ignore 

51 

52 @property 

53 def bootstrap_servers(self) -> List[str]: 

54 """Singular list containing 'host:port'. 

55 

56 Returns: 

57 Single-element list containing a connection of the form 

58 'host:port' 

59 

60 """ 

61 return [f"{self.host}:{self.port}"] 

62 

63 def connect(self) -> None: 

64 """Instantiate a topic-connected KafkaProducer.

65 

66 This method is in charge of creating the producer, making sure 

67 it is properly connected, and ensuring the topic which is used to

68 post messages actually exists, creating it otherwise. 

69 

70 Raises: 

71 ConnectionError: kafka producer is not 'bootstrap_connected' 

72 

73 """ 

74 self._client = KafkaProducer(bootstrap_servers=self.bootstrap_servers) 

75 print("Checking kafka connection...") 

76 if not self._client.bootstrap_connected():   [76 ↛ 77: line 76 didn't jump to line 77, because the condition on line 76 was never true]

77 raise ConnectionError 

78 print("kafka connection OK") 

79 

80 print("Checking kafka topic...") 

81 admin = KafkaAdminClient( 

82 bootstrap_servers=self.bootstrap_servers, api_version=(0, 9) 

83 ) 

84 if self.stream not in admin.list_topics():   [84 ↛ 96: line 84 didn't jump to line 96, because the condition on line 84 was never false]

85 admin.create_topics( 

86 new_topics=[ 

87 NewTopic( 

88 name=self.stream, 

89 num_partitions=1, 

90 replication_factor=1, 

91 ) 

92 ], 

93 validate_only=False, 

94 ) 

95 print("kafka topic created") 

96 print("kafka topic OK") 

97 

98 def post(self, data) -> None: 

99 """Make the kafka producer send serialized data. 

100 

101 Args: 

102 data: the data to append. Can be any python object. 

103 

104 """ 

105 

106 self.client.send(self.stream, json.dumps(data).encode())