Coverage for src/pythia/applications/base.py: 74%
261 statements
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
1"""Application.
An application is composed of (at least) a pipeline. It implements two
pieces of logic:

* State management: Start/Stop/Play/Pause of the pipeline.
* Bus call: dispatching pipeline bus messages to their handlers.
8"""
9from __future__ import annotations
11import inspect
12import os
13from collections import defaultdict
14from logging import getLogger
15from pathlib import Path
16from threading import Thread
17from typing import Any
18from typing import Callable
19from typing import Dict
20from typing import Optional
21from typing import Protocol
22from typing import Type
23from typing import TypeVar
24from typing import Union
26import pyds
28from pythia.event_stream.base import Backend
29from pythia.pipelines.base import BasePipeline
30from pythia.pipelines.base import StringPipeline
31from pythia.types import EventStreamUri
32from pythia.types import Loop
33from pythia.types import Probes
34from pythia.types import SupportedCb
35from pythia.utils.ds import info2batchmeta
36from pythia.utils.gst import get_element
37from pythia.utils.gst import get_static_pad
38from pythia.utils.gst import GLib
39from pythia.utils.gst import Gst
40from pythia.utils.gst import gst_init
41from pythia.utils.gst import PadDirection
42from pythia.utils.message_handlers import on_message_eos
43from pythia.utils.message_handlers import on_message_error
# Module-level logger, named after this module.
logger = getLogger(__name__)

# Value of the ``GST_DEBUG_DUMP_DOT_DIR`` environment variable, or ``None``
# when it is unset.
# NOTE(review): not referenced anywhere in the visible code - presumably kept
# for dumping pipeline graphs; confirm before removing.
GST_DEBUG_DUMP_DOT_DIR = os.environ.get("GST_DEBUG_DUMP_DOT_DIR", None)
class RunLoop(Protocol):
    """Structural interface every loop wrapper must satisfy.

    A loop wrapper owns (or creates) a loop object and exposes the three
    lifecycle operations the application relies on: ``start``, ``join``
    and ``quit``.
    """

    def __init__(self, loop: Optional[Loop], *args, **kwargs) -> None:
        """Build the wrapper around an optional pre-existing loop.

        Args:
            loop: The loop to wrap.
            args: Extra positional arguments for the implementation.
            kwargs: Extra keyword arguments for the implementation.

        """
        ...

    def join(self) -> None:
        """Run and block the loop."""
        ...

    def start(self) -> None:
        """Ensure the loop is running / activated."""
        ...

    def quit(self) -> None:  # noqa: A003
        """Stop the running loop."""
        ...
class ForegroundLoop(RunLoop):
    """Loop wrapper that runs its loop in the calling thread (blocking)."""

    def __init__(self, loop: Optional[Loop] = None):
        """Store the loop to run, if any.

        Args:
            loop: the loop to run. When omitted, a :class:`GLib.MainLoop`
                is created the first time :attr:`loop` is accessed.

        """
        self._loop = loop

    @property
    def loop(self) -> Loop:
        """Return the wrapped loop, creating it on first access.

        Returns:
            The pre-existing loop, or a freshly created
            :class:`GLib.MainLoop` when none was available.

        """
        if self._loop:
            return self._loop
        self._loop = GLib.MainLoop()
        return self._loop

    def start(self) -> None:
        """Touch the lazy property so that the loop object exists."""
        _ = self.loop

    def join(self) -> None:
        """Run the loop in the foreground, blocking the caller."""
        current = self.loop
        current.run()

    def quit(self) -> None:  # noqa: A003
        """Ask the loop running in the foreground to quit."""
        current = self.loop
        current.quit()
class BackgroundThreadLoop(Thread, RunLoop):
    """Thread that runs a loop so the caller is not blocked by it.

    ``start`` and ``join`` are inherited from :class:`threading.Thread`.
    """

    def __init__(self, *args, loop: Optional[Loop] = None, **kwargs):
        """Prepare the thread and the loop it will run.

        Args:
            args: Forwarded to the thread constructor.
            loop: the loop to run. When omitted (or falsy), a new
                :class:`GLib.MainLoop` is created.
            kwargs: Forwarded to the thread constructor. ``daemon``
                defaults to ``True`` unless given explicitly.

        """
        if "daemon" not in kwargs:
            kwargs["daemon"] = True
        super().__init__(*args, **kwargs)
        self._loop = loop if loop else GLib.MainLoop()

    def run(self) -> None:
        """Thread body: run the loop until it is asked to quit."""
        wrapped = self._loop
        wrapped.run()

    def quit(self) -> None:  # noqa: A003
        """Ask the running loop to quit."""
        wrapped = self._loop
        wrapped.quit()
# Type variable standing for the application instance passed as first
# argument to the callbacks below; bounded by ``BaseApplication``.
BA = TypeVar("BA", bound="BaseApplication")

# Bus message handler signature: ``(app, bus, message) -> None``.
OnBusMessage = Callable[[BA, Gst.Bus, Gst.Message], None]


BoundGstPadProbeCallback = Callable[
    [BA, Gst.Pad, Gst.PadProbeInfo], Gst.PadProbeReturn
]
"""Gstreamer PadProbe must implement this protocol.

Using upstream 'Gst.PadProbeCallback' raises NotImplementedError.

"""
# Probe signature receiving the application and the batch metadata only.
BoundBatchMetaCb = Callable[[BA, pyds.NvDsBatchMeta], Gst.PadProbeReturn]
# Probe signature receiving the application, pad, probe info and batch
# metadata.
BoundFullPadCb = Callable[
    [BA, Gst.Pad, Gst.PadProbeInfo, pyds.NvDsBatchMeta], Gst.PadProbeReturn
]

# Any of the probe signatures above that take the application first.
BoundSupportedCb = Union[
    BoundGstPadProbeCallback,
    BoundBatchMetaCb,
    BoundFullPadCb,
]

# Type variable for a user probe: either a ``SupportedCb`` or one of the
# application-first variants declared above.
CB = TypeVar("CB", SupportedCb, BoundSupportedCb)
class BaseApplication:
    """Base pythia application to reduce boilerplate.

    You can define pipeline message handlers and they will be called
    when pertinent. For example, by defining a method called
    'on_message_error', pythia will connect said method as a signal
    handler for the 'message::error' detailed signal. Only values from
    `Gst.MessageType` are allowed, see
    http://lazka.github.io/pgi-docs/index.html#Gst-1.0/flags.html#Gst.MessageType

    """

    # Loop wrapper class instantiated when the application is executed.
    loop_cls: Type[RunLoop] = BackgroundThreadLoop

    def __init__(self, pipeline: BasePipeline) -> None:
        """Construct an application from a pipeline.

        Args:
            pipeline: an instantiated `Pipeline`.

        """
        self.pipeline = pipeline
        self.loop: Optional[RunLoop] = None
        self._bus: Optional[Gst.Bus] = None
        # element name -> pad direction -> list of registered probe records
        self._registered_probes: Probes = defaultdict(
            lambda: defaultdict(list)
        )
        self._message_handlers = self._build_message_handlers()
        # Handler ids returned by `bus.connect`, needed to disconnect later.
        self.watch_ids: list[int] = []

    def _build_message_handlers(self) -> Dict[str, OnBusMessage]:
        """Collect the ``on_message*`` attributes as bus signal handlers.

        An attribute named ``on_message_<type>`` is mapped to the detailed
        signal ``message::<type>`` (underscores in ``<type>`` become
        dashes). Any other attribute whose name starts with ``on_message``
        is mapped to the generic ``message`` signal.

        Returns:
            Mapping from bus signal name to the handler found on `self`.

        """
        prefix = "on_message_"
        handlers = {}
        for name in dir(self):
            if not name.startswith("on_message"):
                continue
            if name.startswith(prefix):
                # Slice instead of splitting on the marker so that a name
                # containing "on_message" twice still yields its full
                # suffix.
                detail = name[len(prefix):].lstrip("_").replace("_", "-")
                key = f"message::{detail}"
            else:
                key = "message"
            handlers[key] = getattr(self, name)
        return handlers

    @classmethod
    def from_pipeline_string(
        cls: Type[BA], pipeline: str, extractors: Probes | None = None
    ) -> BA:
        """Factory from pipeline string.

        Args:
            pipeline: the string to use to generate the pipeline.
            extractors: buffer probes to inject to the pipeline.

        Returns:
            The instantiated app.

        """
        app = cls(StringPipeline(pipeline))
        app.inject_probes(extractors or {})
        return app

    @classmethod
    def from_pipeline_file(
        cls: Type[BA],
        pipeline_file: str | Path,
        *args,
        params: Dict[str, Any] | None = None,
        **kwargs,
    ) -> BA:
        """Factory from pipeline file.

        Args:
            pipeline_file: path to a UTF-8 text file containing the
                pipeline string (optionally with format placeholders).
            params: Parameters to format the pipeline.
            args: Forwarded to :meth:`from_pipeline_string` factory.
            kwargs: Forwarded to :meth:`from_pipeline_string` factory.

        Returns:
            The instantiated app.

        """
        pipeline = Path(pipeline_file).read_text(encoding="utf-8")
        pipeline_string = pipeline.format_map(params or {})
        return cls.from_pipeline_string(pipeline_string, *args, **kwargs)

    @property
    def bus(self) -> Gst.Bus:
        """Get pipeline bus - lazy property.

        Returns:
            The pipeline's bus.

        Raises:
            RuntimeError: Unable to get bus from pipeline.

        """
        if self._bus is None:
            self._bus = self.pipeline.pipeline.get_bus()
            if self._bus is None:
                raise RuntimeError("Unable to get Pipeline bus")
        return self._bus

    def _before_pipeline_start(self, loop) -> RunLoop:
        """Call hook - run before pipeline start.

        Connects the signal callbacks declared in the pipeline's
        ``CONNECTIONS``, attaches the bus handlers, creates and starts the
        loop wrapper and finally runs :meth:`before_pipeline_start`.

        Args:
            loop: the loop to run.

        Returns:
            The started loop wrapper (an instance of :attr:`loop_cls`).

        """
        for element_name, connection in self.pipeline.CONNECTIONS.items():
            element = get_element(self.pipeline.pipeline, element_name)
            for signal, callback in connection.items():
                element.connect(signal, callback)

        self.connect_bus()
        self.loop = self.loop_cls(loop=loop)
        self.loop.start()
        self.before_pipeline_start()
        return self.loop

    def before_pipeline_start(self) -> None:
        """Custom call hook - run before pipeline start."""

    def after_pipeline_start(self) -> None:
        """Call hook - run after pipeline start."""

    def before_loop_join(self) -> None:
        """Call hook - run before calling `loop.join`."""

    def before_loop_quit(self) -> None:
        """Call hook - run before calling `loop.quit`."""

    def connect_bus(self):
        """Attach bus signal watch depending on the message type."""
        if not self._message_handlers:
            return

        self.bus.add_signal_watch()
        self.watch_ids = [
            self.bus.connect(signal, handler)
            for signal, handler in self._message_handlers.items()
        ]

    def disconnect_bus(self):
        """Detach bus signal watch.

        Pending messages are flushed first. The signal watch is removed
        only when handlers were actually connected by :meth:`connect_bus`,
        so removals always pair with a previous addition.
        """
        if self.bus.have_pending():
            self.bus.set_flushing(True)
            self.bus.set_flushing(False)
        if not self.watch_ids:
            # `connect_bus` added no watch (or it was already removed).
            return
        self.bus.remove_signal_watch()
        for watch_id in self.watch_ids:
            self.bus.disconnect(watch_id)
        self.watch_ids = []

    def __call__(
        self, *, loop: Optional[Loop] = None, foreground: bool = True
    ) -> None:
        """Execute the application and run its loop.

        Args:
            loop: if not set, one is provided.
            foreground: whether to block when running.
                NOTE(review): this flag is not read by the current
                implementation - the call always joins the loop.

        Raises:
            RuntimeError: loop is already running.
            RuntimeError: Unable to start pipeline.

        """
        if self.loop:
            raise RuntimeError("Loop already running")

        gst_init()

        loop_ = self._before_pipeline_start(loop)
        try:
            self.pipeline.start()
        except RuntimeError:
            try:
                self.disconnect_bus()
                loop_.quit()
            finally:
                # Forget the aborted loop; otherwise every later call would
                # fail with "Loop already running".
                self.loop = None
            raise
        self.after_pipeline_start()

        self.before_loop_join()
        try:
            loop_.join()
        finally:
            self.stop()

    def stop(
        self,
    ) -> None:
        """Stop application execution.

        The pipeline is stopped first. If a loop is set, the
        :meth:`before_loop_quit` hook runs and the loop is quit; the loop
        reference is cleared afterwards in every case.

        Raises:
            RuntimeError: unable to stop the loop.

        """
        self.pipeline.stop()
        if self.loop is not None:
            try:
                self.before_loop_quit()
                self.loop.quit()
            except Exception as exc:  # pylint: disable=W0703
                raise RuntimeError(
                    f"Unable to stop application: ({exc})"
                ) from exc
            finally:
                self.loop = None

    def probe(
        self,
        element_name: str,
        pad_direction: PadDirection,
        *probe_args,
        pad_probe_type: Gst.PadProbeType = Gst.PadProbeType.BUFFER,
        backend_uri: Optional[EventStreamUri] = None,
    ):
        """Register function as a probe callback.

        Args:
            element_name: Name of the `Gst.Element` to attach the probe
                to.
            pad_direction: Direction of the `Gst.Pad` to attach the
                probe to.
            probe_args: Optional additional args - forwarded to the
                decorated callback as varargs.
            pad_probe_type: Buffer probe type.
            backend_uri: If set, used to send messages from the probe to
                this remote backend. Only used (and mandatory in that
                case) when the callable to decorate is a generator.

        Returns:
            decorated callback, registered as an application buffer
            probe.

        This method has two usages: to decorate proper buffer probes,
        where the developer is in charge of handling the data, and to
        decorate a generator which yields incoming data. In the latter a
        'backend_uri' is required and used to connect and send any and
        all generated data into the remote service.

        """

        def decorator(user_probe: CB) -> CB:
            pythia_probe, backend = _build_probe(user_probe, backend_uri)

            element = get_element(self.pipeline.pipeline, element_name)
            pad = get_static_pad(element, pad_direction)
            pad.add_probe(
                pad_probe_type,  # type: ignore[arg-type]
                pythia_probe,  # type: ignore[arg-type]
                *probe_args,
            )
            # Keep the probe and its backend so that `backend()` can
            # retrieve them later.
            self._registered_probes[element_name][pad_direction].append(
                {"probe": pythia_probe, "backend": backend}
            )
            return pythia_probe

        return decorator

    def inject_probes(self, extractors: Probes):
        """Register several probes.

        Args:
            extractors: mapping containing a collection of probes
                (callbacks) assigned to their respective element's
                source (or sink) pad.

        """
        for element, padprobes in extractors.items():
            for pad_direction, probes in padprobes.items():
                for probe in probes:
                    self.probe(element, pad_direction=pad_direction)(probe)

    def backend(
        self, element_name: str, pad_direction: PadDirection, idx: int = 0
    ) -> Backend:
        """Retrieve a backend for a registered generator probe.

        Args:
            element_name: Gstreamer element name the probe is registered
                to.
            pad_direction: The element's pad direction where the buffer
                probe is attached.
            idx: position in the registry array for the specified pad.
                As the most usual case a pad will have a single buffer
                probe, if this value is not set it defaults to `0`,
                meaning the single element in the list is retrieved.

        Returns:
            The backend associated with the generator probe. The backend
            is in charge of posting the messages from the generator
            probe to an external service.

        """
        return self._registered_probes[element_name][pad_direction][idx][
            "backend"
        ]
473def _get_from_positional_arg_name(signature, name) -> Optional[int]:
474 try:
475 return signature.args.index(name)
476 except ValueError:
477 return None
480def _get_from_annotations(signature, name) -> Optional[int]:
481 try:
482 return [ 482 ↛ exitline 482 didn't return from function '_get_from_annotations', because the return on line 482 wasn't executed
483 signature.annotations.get(aa, None) for aa in signature.args
484 ].index(name)
485 except ValueError:
486 return None
def _get_probe_batch_meta_idx(signature: inspect.FullArgSpec) -> Optional[int]:
    """Find which positional argument receives the batch metadata.

    The argument is looked up first by name (``batch_meta``) and then by
    its ``pyds.NvDsBatchMeta`` annotation; the first hit wins.

    Args:
        signature: argument spec of the probe.

    Returns:
        Index of the batch-meta argument, or ``None`` when not found.

    """
    lookups = (
        (_get_from_positional_arg_name, "batch_meta"),
        (_get_from_annotations, "pyds.NvDsBatchMeta"),
    )
    # Lazy generator: later strategies only run if earlier ones miss.
    attempts = (finder(signature, target) for finder, target in lookups)
    return next((found for found in attempts if found is not None), None)
def _get_probe_pad_idx(signature: inspect.FullArgSpec) -> Optional[int]:
    """Find which positional argument receives the pad.

    The argument is looked up by name (``pad``, then ``gst_pad``) and
    finally by its ``Gst.Pad`` annotation; the first hit wins.

    Args:
        signature: argument spec of the probe.

    Returns:
        Index of the pad argument, or ``None`` when not found.

    """
    lookups = (
        (_get_from_positional_arg_name, "pad"),
        (_get_from_positional_arg_name, "gst_pad"),
        (_get_from_annotations, "Gst.Pad"),
    )
    # Lazy generator: later strategies only run if earlier ones miss.
    attempts = (finder(signature, target) for finder, target in lookups)
    return next((found for found in attempts if found is not None), None)
def _get_probe_info_idx(signature: inspect.FullArgSpec) -> Optional[int]:
    """Find which positional argument receives the pad probe info.

    The argument is looked up by name (``info``, then ``gst_info``) and
    finally by its ``Gst.PadProbeInfo`` annotation; the first hit wins.

    Args:
        signature: argument spec of the probe.

    Returns:
        Index of the info argument, or ``None`` when not found.

    """
    lookups = (
        (_get_from_positional_arg_name, "info"),
        (_get_from_positional_arg_name, "gst_info"),
        (_get_from_annotations, "Gst.PadProbeInfo"),
    )
    # Lazy generator: later strategies only run if earlier ones miss.
    attempts = (finder(signature, target) for finder, target in lookups)
    return next((found for found in attempts if found is not None), None)
def _build_probe(probe, backend_uri: Optional[EventStreamUri] = None):
    """Adapt a user callback to the ``(pad, info)`` pad-probe signature.

    The probe's argument spec is inspected to find out which of the
    supported shapes it has (for bound methods the leading ``self`` is not
    counted):

    * ``(batch_meta)`` - regular function or generator.
    * ``(pad, info)`` - returned untouched.
    * ``(pad, info, batch_meta)``.

    Generator probes get a backend built from `backend_uri`; everything
    they yield is posted to that backend.

    Args:
        probe: the user callback to adapt.
        backend_uri: where to post the data yielded by a generator probe.
            For generators without a uri, the ``PYTHIA_STREAM_URI``
            environment variable is used, and finally
            ``log://?stream=stdout`` (with a warning).

    Returns:
        A ``(callback, backend)`` tuple. ``backend`` is ``None`` unless the
        probe is a generator.

    Raises:
        ValueError: `backend_uri` was given for a non-generator probe, or
            the probe signature is not one of the supported ones.
        NotImplementedError: generator probe taking ``pad`` and ``info``.

    """
    signature = inspect.getfullargspec(probe)
    is_iterator = inspect.isgeneratorfunction(probe)
    if backend_uri and not is_iterator:
        raise ValueError("'backend_uri' set. Probe must be a generator.")
    if is_iterator and not backend_uri:
        backend_uri = os.getenv("PYTHIA_STREAM_URI", None)
        if not backend_uri:
            backend_uri = "log://?stream=stdout"
            logger.warning(
                "'backend_uri' not set."
                " It is mandatory with generator probes."
                " Defaulting to '%s'",
                backend_uri,
            )

    batch_meta_idx = _get_probe_batch_meta_idx(signature)
    pad_idx = _get_probe_pad_idx(signature)
    info_idx = _get_probe_info_idx(signature)

    if hasattr(probe, "__self__"):
        # The argument spec of a bound method still lists `self`, which the
        # caller never passes: shift every index found by one.
        if pad_idx is not None:
            pad_idx -= 1
        if info_idx is not None:
            info_idx -= 1
        if batch_meta_idx is not None:
            batch_meta_idx -= 1

    if batch_meta_idx == 0 and (pad_idx is None) and (info_idx is None):
        if is_iterator and (backend_uri is not None):
            backend = Backend.from_uri(backend_uri)

            def pythia_iter_probe_batch_meta(
                _: Gst.Pad, info: Gst.PadProbeInfo
            ) -> Gst.PadProbeReturn:
                batch_meta = info2batchmeta(info)
                if not batch_meta:
                    return Gst.PadProbeReturn.OK
                for data in probe(batch_meta):
                    backend.post(data)
                return Gst.PadProbeReturn.OK

            return pythia_iter_probe_batch_meta, backend

        def pythia_probe_batch_meta(
            _: Gst.Pad, info: Gst.PadProbeInfo
        ) -> Gst.PadProbeReturn:
            batch_meta = info2batchmeta(info)
            if not batch_meta:
                return Gst.PadProbeReturn.OK
            return probe(batch_meta)

        return pythia_probe_batch_meta, None

    if pad_idx == 0 and info_idx == 1 and batch_meta_idx == 2:
        if is_iterator:
            raise NotImplementedError(
                "Generator probes must take a single 'batch_meta' argument."
            )

        def pythia_probe_full(
            pad: Gst.Pad, info: Gst.PadProbeInfo
        ) -> Gst.PadProbeReturn:
            batch_meta = info2batchmeta(info)
            if not batch_meta:
                return Gst.PadProbeReturn.OK
            return probe(pad, info, batch_meta)

        return pythia_probe_full, None

    if pad_idx == 0 and info_idx == 1 and batch_meta_idx is None:
        if is_iterator:
            raise NotImplementedError(
                "Generator probes must take a single 'batch_meta' argument."
            )

        return probe, None

    supported = [
        ["batch_meta"],
        ["pad", "info"],
        ["pad", "info", "batch_meta"],
    ]
    # Built only on failure, and tolerant of callables that have no
    # `__name__` (e.g. `functools.partial` objects).
    probe_name = getattr(probe, "__name__", repr(probe))
    raise ValueError(
        f"Unsupported spec {signature.args} for '{probe_name}'."
        f" Must be one of `{supported}`"
    )
class Application(BaseApplication):
    """Ready-to-use pythia application.

    It is a :class:`BaseApplication` with the module-level
    ``on_message_error`` and ``on_message_eos`` functions attached as
    bus message handlers.
    """

    # Attached as class attributes so they become methods of the app.
    on_message_error = on_message_error
    on_message_eos = on_message_eos