Source code for pythia.pipelines.base

"""Pipeline.

A Gstreamer pipeline, used to process video/image input.
Contains:

    A source: video/image.
    A sink: display/file.
    At least one PythIA model.

Note:
    Although a one-shot uridecodebin usage seems to work, there seems to
        be an issue with quickly subsequent runs, producing segfaults.

"""

from __future__ import annotations

import abc
import re
from collections import defaultdict
from pathlib import Path
from textwrap import dedent as _
from typing import Collection
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from urllib.parse import parse_qs
from urllib.parse import urlparse
from urllib.parse import urlunparse

from pythia.exceptions import IncompatiblePipelineError
from pythia.exceptions import InvalidPipelineError
from pythia.models.base import Analytics
from pythia.models.base import InferenceEngine
from pythia.models.base import Tracker
from pythia.types import Con
from pythia.types import HasConnections
from pythia.types import SinkUri
from pythia.types import SourceUri
from pythia.utils.ext import get_arch
from pythia.utils.gst import GLib
from pythia.utils.gst import Gst
from pythia.utils.gst import gst_init
from pythia.utils.str2pythia import find_analytics
from pythia.utils.str2pythia import find_models
from pythia.utils.str2pythia import find_tracker

PSB = Union["PythiaTestSource", "PythiaSource", "PythiaMultiSource"]
PS = Union[
    "PythiaFakesink", "PythiaFilesink", "PythiaMultifileSink", "PythiaLiveSink"
]

UNABLE_TO_PLAY_PIPELINE = "Unable to play the pipeline."


[docs]class PythiaSourceBase(abc.ABC, HasConnections): """Base class wrapper for Gstreamer sources. The main goal is to define a skeleton for quickly building sources, and subclasses must implement their rendering logic in the `gst` method. """ CONNECTIONS: Con = {}
[docs] def __init__(self, *uris: SourceUri) -> None: """Construct an instance from `SourceUri` s. Args: uris: Collection of `SourceUri` s. """ self.pythia_params, self.uris = self.pop_pythia_args_from_uris(uris)
def __iter__(self) -> Iterator[SourceUri]: """Iterate over the configured uris. Yields: Own `SourceUri`s. """ yield from self.uris def __len__(self) -> int: """Get the number of sources. Returns: The number of configured uris. """ return len(self.uris)
[docs] @abc.abstractmethod def gst(self) -> str: """Render as string with `gst-launch`-like syntax."""
[docs] @classmethod def from_uris(cls, *uris: SourceUri) -> PSB: """Factory to build a concrete source from a collection of uris. Depending on the received uris, instantiates a concrete :class:`PythiaSourceBase`. Args: uris: Collection of uris to build the source from. Returns: The instantiated source object. Raises: ValueError: No source uris received """ num_uris = len(uris) if num_uris == 1: if uris[0].startswith("test"): return PythiaTestSource(*uris) return PythiaSource(*uris) if num_uris >= 1: return PythiaMultiSource(*uris) raise ValueError("No source uris")
[docs] @staticmethod @abc.abstractmethod def pop_pythia_args_from_uris( uris: Tuple[SourceUri, ...], ) -> Tuple[dict, List[SourceUri]]: """Pop pythia-related query params from source uri. Args: uris: input uris to filter """
[docs]def clean_single_uri(uri: SourceUri) -> Tuple[dict, SourceUri]: """Extract muxer width and height. Args: uri: input uris to parse. Returns: extracted: dictionary containing popped params. list containing the single uri wihtout its pythia query params. Examples: >>> clean_single_uri("file://video.mp4?muxer_width=1280&muxer_height=720") ({'muxer_width': 1280, 'muxer_height': 720}, ['file://video.mp4']) """ # noqa: C0301 parsed = urlparse(uri) data = parsed._asdict() query = parse_qs(data["query"], strict_parsing=False) extracted = { "muxer_width": int(query["muxer_width"][0]), "muxer_height": int(query["muxer_height"][0]), "num_buffers": int(query.get("num_buffers", ["-1"])[0]), } clean_query = parsed.query for name, value in extracted.items(): clean_query = clean_query.replace(f"{name}={value}", "") clean_query = re.sub(r"\&+", "&", clean_query).strip("&").strip("?") parts = [*parsed[:4], clean_query, *parsed[5:]] clean_uri = urlunparse(parts) return extracted, clean_uri
[docs]class PythiaSource(PythiaSourceBase): """Uridecodebin wrapper building block for a single source."""
[docs] @staticmethod def pop_pythia_args_from_uris( uris: Tuple[SourceUri, ...], ) -> Tuple[dict, List[SourceUri]]: """Extract muxer width and height. Args: uris: input uris to filter Returns: extracted: dictionary containing popped params. list containing the single uri wihtout its pythia query params. Examples: >>> uris = ["file://video.mp4?muxer_width=1280&muxer_height=720"] >>> PythiaSource.pop_pythia_args_from_uris(uris) ({'muxer_width': 1280, 'muxer_height': 720}, ['file://video.mp4']) """ extracted, clean_uri = clean_single_uri(uris[0]) return extracted, [clean_uri]
CONNECTIONS: Con = {}
[docs] def gst(self) -> str: """Render from single uridecodebin up to nvmuxer. Returns: Rendered string """ return _( f"""\ uridecodebin uri={self.uris[0]} ! queue ! nvvideoconvert ! video/x-raw(memory:NVMM) ! m.sink_0 nvstreammux name=m batch-size={len(self)} width={self.pythia_params['muxer_width']} height={self.pythia_params['muxer_height']} """ )
[docs]class PythiaMultiSource(PythiaSourceBase): """Uridecodebin wrapper building block for multiple sources."""
[docs] @staticmethod def pop_pythia_args_from_uris( uris: Tuple[SourceUri, ...], ) -> Tuple[dict, List[SourceUri]]: """Extract muxer width and height. Args: uris: input uris to filter Returns: extracted: dictionary containing popped params. list containing the single uri wihtout its pythia query params. Examples: >>> uris = [ ... "./frames/%04d.jpg?muxer_width=320&muxer_height=240", ... "./annotations/%04d.jpg?muxer_width=1280&muxer_height=100", ... ] >>> PythiaMultiSource.pop_pythia_args_from_uris(uris) ({'muxer_width': 1280, 'muxer_height': 240}, ['./frames/%04d.jpg', './annotations/%04d.jpg']) """ # noqa: C0301 extrema = { "muxer_width": 0, "muxer_height": 0, } uris_out = [] for uri in uris: extracted, clean_uri = clean_single_uri(uri) uris_out.append(clean_uri) for key in extrema: extrema[key] = max(extrema[key], extracted[key]) return extrema, uris_out
[docs] def gst(self) -> str: """Render from several uridecodebin up to nvmuxer. Returns: Rendered string """ suffix = _( f"""\ nvstreammux name=m batch-size={len(self.uris)} """ ) text = "\n".join( f"""\ uridecodebin uri={self.uris[idx]} ! queue ! nvvideoconvert ! video/x-raw(memory:NVMM) ! m.sink_{idx} nvstreammux name=m batch-size=1 """ for idx in range(len(self.uris)) ) return f"{text}\n{suffix}"
[docs]class PythiaTestSource(PythiaSourceBase): """videotestsrc wrapper building block."""
[docs] @staticmethod def pop_pythia_args_from_uris( uris: Tuple[SourceUri, ...], ) -> Tuple[dict, List[SourceUri]]: """Extract muxer width and height. Args: uris: input uris to filter Returns: extracted: dictionary containing popped params. list containing the single uri wihtout its pythia query params. Examples: >>> uris = ["test://?muxer_width=320&muxer_height=240"] >>> PythiaTestSource.pop_pythia_args_from_uris(uris) ({'muxer_width': 320, 'muxer_height': 240}, ['test:']) """ extracted, clean_uri = clean_single_uri(uris[0]) return extracted, [clean_uri]
[docs] def gst(self) -> str: """Render from single videotestsrc up to nvmuxer. Returns: Rendered string. """ return _( f""" videotestsrc num-buffers={self.pythia_params['num_buffers']} ! queue ! nvvideoconvert ! video/x-raw(memory:NVMM) ! m.sink_0 nvstreammux name=m batch-size={len(self)} nvbuf-memory-type=0 width={self.pythia_params['muxer_width']} height={self.pythia_params['muxer_height']} """ )
[docs]class PythiaSink(abc.ABC, HasConnections): """Class used to construct sink from uris.""" CONNECTIONS: Con = {} VIDEO_EXTENSIONS = [ ".mp4", ".avi", ".mov", ".mkv", ".webm", ".flv", ".wmv", ".mpg", ".mpeg", ".m4v", ]
[docs] def __init__(self, uri: SinkUri) -> None: """Instantiate sink wrapper with one of the available uris. Args: uri: the uri to build a gst sink and finish the pipeline. """ self.uri = uri
[docs] @classmethod def from_uri(cls, uri: SinkUri) -> PS: """Factory constructor from `SinkUri` . Args: uri: the uri to use. Must fulfill one of the following conditions: * be one of ('live', 'fakesink'). If set to 'live', the output will be the screen. If set to 'fakesink', use the fakesing `Gst.Element` . * If a string containing a `%` , the underlying element will be a `multifilesink` . * Otherwise, it mus be a string pointing to a path, and have a valid and supported video extension. Returns: The instantiated `PythiaSink` , depending on its uri. Raises: ValueError: unsupported sink uri. """ if uri == "live": return PythiaLiveSink(uri) if uri == "fakesink": return PythiaFakesink(uri) if "%" in Path(uri).stem: return PythiaMultifileSink(uri) if Path(uri).suffix in cls.VIDEO_EXTENSIONS: return PythiaFilesink(uri) raise ValueError(f"Unknown sink uri: {uri}")
[docs] @abc.abstractmethod def gst(self) -> str: """Render as string with `gst-launch`-like syntax."""
[docs]class PythiaFakesink(PythiaSink): """fakesink wrapper building block for a single sink."""
[docs] def gst(self) -> str: """Simple fakesink. Returns: Rendered string """ return "fakesink"
[docs]class PythiaFilesink(PythiaSink): """filesink wrapper building block for a single sink. Uses `encodebin` to attempt to properly parse upstream buffers. """
[docs] def gst(self) -> str: """Render from single encodebin up to filesink. Returns: Rendered string """ return _( f"""\ encodebin ! filesink location="{self.uri}" """ )
[docs]class PythiaMultifileSink(PythiaSink): """multifilesink building block for a single multioutput sink. Uses `encodebin` to attempt to properly parse upstream buffers. """ SUPPORTED_FORMATS = { ".jpg": """ nvvideoconvert ! jpegenc quality=100 idct-method=float """, ".png": """ nvvideoconvert ! avenc_png """, ".webp": """ nvvideoconvert ! webpenc lossless=true quality=100 speed=6 """, }
[docs] def gst(self) -> str: """Render from single encodebin up to multifilesink. Returns: Rendered string """ encode = self.SUPPORTED_FORMATS[Path(self.uri).suffix] return _( f"""\ {encode} ! multifilesink location="{self.uri}" """ )
[docs]class PythiaLiveSink(PythiaSink): """nveglglessink wrapper."""
[docs] def __init__(self, uri: SinkUri, arch: str = "") -> None: """Construct nveglglessink wrapper. Args: uri: uri for `PythiaSink`'s constructor. arch: platform architecture, to differentiate GPU and jetson devices. If not set, automatically computed by :func:`get_arch`. In jetson devices, injects an additional `nvegltransform`. See Also: https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_FAQ.html#why-is-a-gst-nvegltransform-plugin-required-on-a-jetson-platform-upstream-from-gst-nveglglessink """ super().__init__(uri) self.arch = arch or get_arch() self.transform = "! nvegltransform" if get_arch() == "aarch64" else ""
[docs] def gst(self) -> str: """Render from nvvideoconvert to nveglglessink. Returns: Rendered string """ return _( f"""\ nvvideoconvert {self.transform} ! nveglglessink """ )
[docs]class BasePipeline(HasConnections, abc.ABC): """Common abstraction wrapper for pythia pipelines.""" _pipeline: Optional[Gst.Pipeline] = None models: Collection[InferenceEngine] analytics: Optional[Analytics] tracker: Optional[Tracker]
[docs] @abc.abstractmethod def gst(self) -> str: """Render its string for to use in `gst-launch`-like syntax."""
@property @abc.abstractmethod def CONNECTIONS(self) -> Con: # type: ignore[override] # noqa: C0103,C0116 ...
[docs] def validate( self, ) -> None: """Checks for internal compliance of specified elements. > Tracker requires at least one InferenceEngine > Analytics requires at least one InferenceEngine, and Tracker if it has Direction Detection or Line Crossing. > SecondaryInference Engine requires at least a PrimaryInferenceEngine Raises: IncompatiblePipelineError: `Analytics` requires `Tracker` but none supplied. IncompatiblePipelineError: `Tracker` requires `Model` but none supplied. """ if self.analytics: if len(self.models) < 1: raise IncompatiblePipelineError( f"Analytics requires at least 1 InferenceEngine." f" Found {len(self.models)}." ) if self.analytics.requires_tracker() and (self.tracker is None): raise IncompatiblePipelineError( "Current Analytics spec requires at least Tracker, " "but none found." ) if self.tracker: if len(self.models) < 1: raise IncompatiblePipelineError( "Tracker requires at least 1 InferenceEngine." f" Found {len(self.models)}." )
@property def pipeline(self) -> Gst.Pipeline: """Gstreamer pipeline lazy property. Returns: The only Gstremaer pipeline on this app, instantiated. """ if not self._pipeline: self._pipeline = self.parse_launch() return self._pipeline
[docs] def parse_launch(self) -> Gst.Pipeline: """Instantiate the internal `Gst.Pipeline`. Returns: The instantiated :class:`Gst.Pipeline`. Raises: NotImplementedError: pipeline already instantiated. InvalidPipelineError: Unable to parse pipeline because of a syntax error in the pipeline string. GLib.Error: Syntax unrelated error - unable to parse pipeline. """ gst_init() if self._pipeline: raise NotImplementedError( "TODO: make a copy of the pipeline," " this one is already in use" ) try: return Gst.parse_launch(self.gst()) except GLib.Error as exc: if "syntax error" in str(exc): raise InvalidPipelineError from exc raise
[docs] def start(self) -> Gst.StateChangeReturn: """Start the pipeline by setting it to PLAYING state. Returns: The state change result enum. Raises: RuntimeError: Unable to play the pipeline. """ self.validate() result = self.pipeline.set_state(Gst.State.PLAYING) if result is Gst.StateChangeReturn.FAILURE: self.stop() raise RuntimeError(f"ERROR: {UNABLE_TO_PLAY_PIPELINE}") return result
[docs] def stop(self) -> None: """Set the pipeline to null state.""" self.pipeline.set_state(Gst.State.NULL)
[docs] def send_eos(self) -> None: """Send a gstreamer 'end of stream' signal.""" self.pipeline.send_event(Gst.Event.new_eos())
ModelType = Union[ Collection[Union[Path, InferenceEngine]], Path, InferenceEngine, None ]
[docs]class Pipeline(BasePipeline): """Wrapper to ease pipeline creation from simple building blocks."""
[docs] def __init__( # noqa: R0913 self, sources: SourceUri | list[SourceUri] | tuple[SourceUri], models: ModelType = None, sink: SinkUri = "fakesink", analytics: Union[Path, Analytics] | None = None, tracker: Union[Path, Tracker] | None = None, ) -> None: """Initialize pipeline wrapper to incrementally build pipeline. Args: sources: Collection of uri sources to join in `nvstreammux`. models: Collection of models to insert in the pipeline. sink: Final element of the pipeline. analytics: Optional `nvdsanalytics`. tracker: Optional `nvtracker`. Raises: ValueError: invalid analytics or tracker object. """ super().__init__() if isinstance(sources, SourceUri): sources = [sources] self.source = PythiaSourceBase.from_uris(*sources) if isinstance(models, (Path, InferenceEngine)): models = [models] self.models = ( [ model if isinstance(model, InferenceEngine) else InferenceEngine.from_folder(model) for model in models ] if models else [] ) self._model_map: dict[str, InferenceEngine] = {} if analytics is None: self.analytics = analytics elif isinstance(analytics, Analytics): self.analytics = analytics elif isinstance(analytics, Path): self.analytics = Analytics.from_file(analytics) else: raise ValueError(f"Unhandled {analytics=}") if tracker is None: self.tracker = tracker elif isinstance(tracker, Tracker): self.tracker = tracker elif isinstance(tracker, Path): self.tracker = Tracker.from_file(tracker) else: raise ValueError(f"Unhandled {tracker=}") self.sink = PythiaSink.from_uri(sink)
@property def CONNECTIONS(self) -> Con: # type: ignore[override] # noqa: C0103 cons: Con = defaultdict(dict) for connectable in (self.source, *self.models, self.sink): for element_name, connections in connectable.CONNECTIONS.items(): for signal, callback in connections.items(): cons[element_name][signal] = callback return cons @property def model_map(self) -> Dict[str, InferenceEngine]: """Lazyproperty mapping from model names to inference engines. Returns: A dictionary whose keys are nvinfer names and their values are their respective :class:`InferenceEngine` wrappers. """ if not self._model_map: self.gst() return self._model_map
[docs] def gst(self) -> str: """Render its string for to use in `gst-launch`-like syntax. Returns: The pipeline as it would be used when calling `gst-launch`. """ source = self.source.gst() models = "" for idx, model in enumerate(self.models): name = f"model_{idx}" self._model_map[name] = model models += model.gst( name=name, unique_id=idx + 1, ) sink = self.sink.gst() tracker = self.tracker.gst() if self.tracker else None analytics = self.analytics.gst() if self.analytics else None return _( f""" {source} {'! ' + models if models else ''} {'! ' + tracker if tracker else ''} {'! ' + analytics if analytics else ''} ! {sink} """ )
[docs]class StringPipeline(BasePipeline): """Pythia pipeline wrapper to construct from pipeline strings.""" CONNECTIONS: Con = {}
[docs] def __init__(self, pipeline_string: str) -> None: """Initialize pipeline wrapper using a pipeline string. Args: pipeline_string: A `gst-launch`-like pipeline string. Raises: InvalidPipelineError: Unable to parse pipeline because of a syntax error in the pipeline string. GLib.Error: Syntax unrelated error - unable to parse pipeline. """ super().__init__() self.pipeline_string = pipeline_string try: self.pipeline except GLib.Error as exc: if "gst_parse_error" not in str(exc): raise raise InvalidPipelineError( f"Unable to parse pipeline:\n```gst\n{pipeline_string}\n```" ) from exc self.models = find_models(self.pipeline) self.analytics = find_analytics(self.pipeline) self.tracker = find_tracker(self.pipeline)
[docs] def gst(self) -> str: return self.pipeline_string