pythia.pipelines.base#

Pipeline.

A Gstreamer pipeline, used to process video/image input. Contains:

  • A source: video/image.

  • A sink: display/file.

  • At least one PythIA model.

Note

Although one-shot uridecodebin usage seems to work, runs in quick succession appear to trigger segfaults.

class pythia.pipelines.base.BasePipeline(*args, **kwargs)[source]#

Bases: HasConnections, ABC

Common abstraction wrapper for pythia pipelines.

abstract property CONNECTIONS: Dict[str, Dict[str, Callable]]#
_abc_impl = <_abc_data object>#
_is_protocol = False#
_pipeline: Optional[Pipeline] = None#
analytics: Optional[Analytics]#
abstract gst() str[source]#

Render the pipeline as a string in gst-launch-like syntax.

models: Collection[InferenceEngine]#
parse_launch() Pipeline[source]#

Instantiate the internal Gst.Pipeline.

Returns:

The instantiated Gst.Pipeline.

Raises:
  • NotImplementedError – pipeline already instantiated.

  • InvalidPipelineError – Unable to parse pipeline because of a syntax error in the pipeline string.

  • GLib.Error – Unable to parse the pipeline for a reason unrelated to syntax.

property pipeline: Pipeline#

Gstreamer pipeline lazy property.

Returns:

The only GStreamer pipeline in this app, instantiated.
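
The interplay between parse_launch() and the lazy pipeline property can be sketched without GStreamer. This is an illustration only: LazyPipelineSketch and its build callable are hypothetical stand-ins rather than pythia names, and the real class hands the rendered string to GStreamer instead of calling a plain function.

```python
from typing import Callable, Optional


class LazyPipelineSketch:
    """Hypothetical stand-in showing the lazy instantiation pattern."""

    def __init__(self, build: Callable[[], object]) -> None:
        # `build` stands in for whatever parses the gst-launch-like string.
        self._build = build
        self._pipeline: Optional[object] = None

    def parse_launch(self) -> object:
        # Mirrors the documented contract: a second instantiation is an error.
        if self._pipeline is not None:
            raise NotImplementedError("pipeline already instantiated")
        self._pipeline = self._build()
        return self._pipeline

    @property
    def pipeline(self) -> object:
        # Instantiate on first access, then keep returning the same object.
        if self._pipeline is None:
            self.parse_launch()
        return self._pipeline


calls = []
sketch = LazyPipelineSketch(lambda: calls.append("built") or object())
first = sketch.pipeline   # triggers the one and only build
second = sketch.pipeline  # reuses the cached object
```

Under these assumptions, reading the property twice builds the object once, and calling parse_launch() afterwards raises NotImplementedError.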

send_eos() None[source]#

Send a gstreamer ‘end of stream’ signal.

start() StateChangeReturn[source]#

Start the pipeline by setting it to PLAYING state.

Returns:

The state change result enum.

Raises:

RuntimeError – Unable to play the pipeline.

stop() None[source]#

Set the pipeline to null state.

tracker: Optional[Tracker]#
validate() None[source]#

Check the internal compliance of the specified elements:

  • A Tracker requires at least one InferenceEngine.

  • Analytics requires at least one InferenceEngine, plus a Tracker if it uses Direction Detection or Line Crossing.

  • A SecondaryInferenceEngine requires at least one PrimaryInferenceEngine.
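
The compliance rules listed for validate() can be expressed as a small standalone check. This is a sketch under stated assumptions: PipelineSpec, its fields, and check_compliance are hypothetical names invented for illustration, and the function returns the violated rules as strings because this page does not say which exception validate() raises.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PipelineSpec:
    """Hypothetical summary of what a pipeline contains."""

    primary_engines: int = 0
    secondary_engines: int = 0
    has_tracker: bool = False
    has_analytics: bool = False
    # True when analytics uses Direction Detection or Line Crossing.
    analytics_needs_tracking: bool = False


def check_compliance(spec: PipelineSpec) -> List[str]:
    """Return the violated rules (an empty list means the spec is compliant)."""
    problems = []
    engines = spec.primary_engines + spec.secondary_engines
    if spec.has_tracker and engines == 0:
        problems.append("tracker requires at least one inference engine")
    if spec.has_analytics and engines == 0:
        problems.append("analytics requires at least one inference engine")
    if spec.has_analytics and spec.analytics_needs_tracking and not spec.has_tracker:
        problems.append("analytics with tracking features requires a tracker")
    if spec.secondary_engines > 0 and spec.primary_engines == 0:
        problems.append("secondary engine requires a primary engine")
    return problems


ok = check_compliance(PipelineSpec(primary_engines=1, has_tracker=True))
bad = check_compliance(PipelineSpec(has_tracker=True))
```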

Raises:
class pythia.pipelines.base.Pipeline(sources: SourceUri | list[SourceUri] | tuple[SourceUri], models: ModelType = None, sink: SinkUri = 'fakesink', analytics: Union[Path, Analytics] | None = None, tracker: Union[Path, Tracker] | None = None)[source]#

Bases: BasePipeline

Wrapper to ease pipeline creation from simple building blocks.

Initialize pipeline wrapper to incrementally build pipeline.

Parameters:
  • sources – Collection of uri sources to join in nvstreammux.

  • models – Collection of models to insert in the pipeline.

  • sink – Final element of the pipeline.

  • analytics – Optional nvdsanalytics.

  • tracker – Optional nvtracker.

Raises:

ValueError – invalid analytics or tracker object.

property CONNECTIONS: Dict[str, Dict[str, Callable]]#
__init__(sources: SourceUri | list[SourceUri] | tuple[SourceUri], models: ModelType = None, sink: SinkUri = 'fakesink', analytics: Union[Path, Analytics] | None = None, tracker: Union[Path, Tracker] | None = None) None[source]#

Initialize pipeline wrapper to incrementally build pipeline.

Parameters:
  • sources – Collection of uri sources to join in nvstreammux.

  • models – Collection of models to insert in the pipeline.

  • sink – Final element of the pipeline.

  • analytics – Optional nvdsanalytics.

  • tracker – Optional nvtracker.

Raises:

ValueError – invalid analytics or tracker object.

_abc_impl = <_abc_data object>#
_is_protocol = False#
analytics: Optional[Analytics]#
gst() str[source]#

Render the pipeline as a string in gst-launch-like syntax.

Returns:

The pipeline as it would be used when calling gst-launch.

property model_map: Dict[str, InferenceEngine]#

Lazy property mapping model names to inference engines.

Returns:

A dictionary whose keys are nvinfer names and whose values are their respective InferenceEngine wrappers.

models: Collection[InferenceEngine]#
tracker: Optional[Tracker]#
class pythia.pipelines.base.PythiaFakesink(uri: str)[source]#

Bases: PythiaSink

fakesink wrapper building block for a single sink.

Instantiate sink wrapper with one of the available uris.

Parameters:

uri – the uri to build a gst sink and finish the pipeline.

_abc_impl = <_abc_data object>#
_is_protocol = False#
gst() str[source]#

Simple fakesink.

Returns:

Rendered string

class pythia.pipelines.base.PythiaFilesink(uri: str)[source]#

Bases: PythiaSink

filesink wrapper building block for a single sink.

Uses encodebin to attempt to properly parse upstream buffers.

Instantiate sink wrapper with one of the available uris.

Parameters:

uri – the uri to build a gst sink and finish the pipeline.

_abc_impl = <_abc_data object>#
_is_protocol = False#
gst() str[source]#

Render from single encodebin up to filesink.

Returns:

Rendered string

class pythia.pipelines.base.PythiaLiveSink(uri: str, arch: str = '')[source]#

Bases: PythiaSink

nveglglessink wrapper.

Construct nveglglessink wrapper.

Parameters:
  • uri – uri for PythiaSink’s constructor.

  • arch – platform architecture, used to distinguish GPU devices from Jetson devices. If not set, it is computed automatically by get_arch(). On Jetson devices, an additional nvegltransform is injected.

__init__(uri: str, arch: str = '') None[source]#

Construct nveglglessink wrapper.

Parameters:
  • uri – uri for PythiaSink’s constructor.

  • arch – platform architecture, used to distinguish GPU devices from Jetson devices. If not set, it is computed automatically by get_arch(). On Jetson devices, an additional nvegltransform is injected.

_abc_impl = <_abc_data object>#
_is_protocol = False#
gst() str[source]#

Render from nvvideoconvert to nveglglessink.

Returns:

Rendered string
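
As a rough illustration of the rendering described above, the element chain can be assembled from a list and joined with the gst-launch link operator. The function name render_live_sink and its is_jetson flag are hypothetical, and the real method may set element properties or map arch values differently; only the element names and the extra nvegltransform on Jetson come from this page.

```python
def render_live_sink(is_jetson: bool) -> str:
    """Sketch of a gst-launch fragment from nvvideoconvert to nveglglessink."""
    elements = ["nvvideoconvert"]
    if is_jetson:
        # Jetson devices get one additional element before the sink.
        elements.append("nvegltransform")
    elements.append("nveglglessink")
    # " ! " links consecutive elements in gst-launch syntax.
    return " ! ".join(elements)


gpu_fragment = render_live_sink(is_jetson=False)
jetson_fragment = render_live_sink(is_jetson=True)
```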

class pythia.pipelines.base.PythiaMultiSource(*uris: str)[source]#

Bases: PythiaSourceBase

Uridecodebin wrapper building block for multiple sources.

Construct an instance from SourceUri values.

Parameters:

uris – Collection of SourceUri values.

_abc_impl = <_abc_data object>#
_is_protocol = False#
gst() str[source]#

Render from several uridecodebin up to nvmuxer.

Returns:

Rendered string

static pop_pythia_args_from_uris(uris: Tuple[str, ...]) Tuple[dict, List[str]][source]#

Extract muxer width and height.

Parameters:

uris – input uris to filter

Returns:

A tuple of two items: a dictionary containing the popped params, and a list containing the uris without their pythia query params.

Examples

>>> uris = [
...     "./frames/%04d.jpg?muxer_width=320&muxer_height=240",
...     "./annotations/%04d.jpg?muxer_width=1280&muxer_height=100",
... ]
>>> PythiaMultiSource.pop_pythia_args_from_uris(uris)
({'muxer_width': 1280, 'muxer_height': 240}, ['./frames/%04d.jpg', './annotations/%04d.jpg'])
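
A standard-library sketch that reproduces the output of the example above may help clarify what "popping" means here. The helper names pop_muxer_args and pop_muxer_args_many and the PYTHIA_KEYS tuple are hypothetical. Taking the per-key maximum across uris is an assumption: it matches the 1280 and 240 shown in the example, but this page does not state the merge rule.

```python
from typing import Dict, Iterable, List, Tuple
from urllib.parse import parse_qsl, urlencode

# Assumption: only these two query params are treated as pythia-specific.
PYTHIA_KEYS = ("muxer_width", "muxer_height")


def pop_muxer_args(uri: str) -> Tuple[Dict[str, int], str]:
    """Split one uri into its muxer params and the remaining uri."""
    base, _, query = uri.partition("?")
    popped: Dict[str, int] = {}
    kept = []
    for key, value in parse_qsl(query, keep_blank_values=True):
        if key in PYTHIA_KEYS:
            popped[key] = int(value)
        else:
            kept.append((key, value))
    # Re-attach any query params that are not pythia-specific.
    clean = base + ("?" + urlencode(kept) if kept else "")
    return popped, clean


def pop_muxer_args_many(uris: Iterable[str]) -> Tuple[Dict[str, int], List[str]]:
    """Pop muxer params from several uris, keeping the largest value per key."""
    merged: Dict[str, int] = {}
    cleaned: List[str] = []
    for uri in uris:
        popped, clean = pop_muxer_args(uri)
        cleaned.append(clean)
        for key, value in popped.items():
            merged[key] = max(value, merged.get(key, value))
    return merged, cleaned


merged, cleaned = pop_muxer_args_many([
    "./frames/%04d.jpg?muxer_width=320&muxer_height=240",
    "./annotations/%04d.jpg?muxer_width=1280&muxer_height=100",
])
```
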
class pythia.pipelines.base.PythiaMultifileSink(uri: str)[source]#

Bases: PythiaSink

multifilesink building block for a single multioutput sink.

Uses encodebin to attempt to properly parse upstream buffers.

Instantiate sink wrapper with one of the available uris.

Parameters:

uri – the uri to build a gst sink and finish the pipeline.

SUPPORTED_FORMATS#

Maps each supported output extension to the gst-launch fragment used to encode it:
  • .jpg – nvvideoconvert ! jpegenc quality=100 idct-method=float

  • .png – nvvideoconvert ! avenc_png

  • .webp – nvvideoconvert ! webpenc lossless=true quality=100 speed=6

_abc_impl = <_abc_data object>#
_is_protocol = False#
gst() str[source]#

Render from single encodebin up to multifilesink.

Returns:

Rendered string

class pythia.pipelines.base.PythiaSink(uri: str)[source]#

Bases: ABC, HasConnections

Class used to construct sink from uris.

Instantiate sink wrapper with one of the available uris.

Parameters:

uri – the uri to build a gst sink and finish the pipeline.

CONNECTIONS: Dict[str, Dict[str, Callable]] = {}#
VIDEO_EXTENSIONS = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.flv', '.wmv', '.mpg', '.mpeg', '.m4v']#
__init__(uri: str) None[source]#

Instantiate sink wrapper with one of the available uris.

Parameters:

uri – the uri to build a gst sink and finish the pipeline.

_abc_impl = <_abc_data object>#
_is_protocol = False#
classmethod from_uri(uri: str) Union[PythiaFakesink, PythiaFilesink, PythiaMultifileSink, PythiaLiveSink][source]#

Factory constructor from SinkUri.

Parameters:

uri – the uri to use. Must fulfill one of the following conditions:

  • Be one of (‘live’, ‘fakesink’). If set to ‘live’, the output will be the screen. If set to ‘fakesink’, use the fakesink Gst.Element.

  • If it is a string containing a %, the underlying element will be a multifilesink.

  • Otherwise, it must be a string pointing to a path with a valid and supported video extension.

Returns:

The instantiated PythiaSink, depending on its uri.

Raises:

ValueError – unsupported sink uri.
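
The selection rules above can be sketched as a plain function. This is a hedged illustration, not the library's implementation: classify_sink_uri is a hypothetical name, it returns a label instead of a PythiaSink instance, and the order of the checks is assumed from the order in which the conditions are listed. The extension list is copied from VIDEO_EXTENSIONS on this page.

```python
from pathlib import Path

# Copied from PythiaSink.VIDEO_EXTENSIONS as documented on this page.
VIDEO_EXTENSIONS = [
    ".mp4", ".avi", ".mov", ".mkv", ".webm",
    ".flv", ".wmv", ".mpg", ".mpeg", ".m4v",
]


def classify_sink_uri(uri: str) -> str:
    """Return a label for the kind of sink a uri would select."""
    if uri in ("live", "fakesink"):
        return uri
    if "%" in uri:
        # A printf-style pattern implies one output file per frame.
        return "multifilesink"
    if Path(uri).suffix in VIDEO_EXTENSIONS:
        return "filesink"
    raise ValueError(f"unsupported sink uri: {uri!r}")


labels = [classify_sink_uri(u) for u in ("live", "fakesink", "out/%04d.jpg", "out.mp4")]
```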

abstract gst() str[source]#

Render as string with gst-launch-like syntax.

class pythia.pipelines.base.PythiaSource(*uris: str)[source]#

Bases: PythiaSourceBase

Uridecodebin wrapper building block for a single source.

Construct an instance from SourceUri values.

Parameters:

uris – Collection of SourceUri values.

CONNECTIONS: Dict[str, Dict[str, Callable]] = {}#
_abc_impl = <_abc_data object>#
_is_protocol = False#
gst() str[source]#

Render from single uridecodebin up to nvmuxer.

Returns:

Rendered string

static pop_pythia_args_from_uris(uris: Tuple[str, ...]) Tuple[dict, List[str]][source]#

Extract muxer width and height.

Parameters:

uris – input uris to filter

Returns:

A tuple of two items: a dictionary containing the popped params, and a list containing the single uri without its pythia query params.

Examples

>>> uris = ["file://video.mp4?muxer_width=1280&muxer_height=720"]
>>> PythiaSource.pop_pythia_args_from_uris(uris)
({'muxer_width': 1280, 'muxer_height': 720}, ['file://video.mp4'])
class pythia.pipelines.base.PythiaSourceBase(*uris: str)[source]#

Bases: ABC, HasConnections

Base class wrapper for Gstreamer sources.

It defines a skeleton for quickly building sources; subclasses must implement their rendering logic in the gst method.

Construct an instance from SourceUri values.

Parameters:

uris – Collection of SourceUri values.

CONNECTIONS: Dict[str, Dict[str, Callable]] = {}#
__init__(*uris: str) None[source]#

Construct an instance from SourceUri values.

Parameters:

uris – Collection of SourceUri values.

_abc_impl = <_abc_data object>#
_is_protocol = False#
classmethod from_uris(*uris: str) Union[PythiaTestSource, PythiaSource, PythiaMultiSource][source]#

Factory to build a concrete source from a collection of uris.

Depending on the received uris, instantiates a concrete PythiaSourceBase.

Parameters:

uris – Collection of uris to build the source from.

Returns:

The instantiated source object.

Raises:

ValueError – No source uris received
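
This page does not spell out how the factory chooses between the three concrete sources, so the following is only a guess at the dispatch, based on the class descriptions (single source, multiple sources, videotestsrc) and on the "test://" uri used in the PythiaTestSource example. pick_source_kind is a hypothetical helper that returns a class name instead of an instance.

```python
def pick_source_kind(*uris: str) -> str:
    """Guess which concrete source class a collection of uris would map to."""
    if not uris:
        # The only behaviour confirmed by the documentation above.
        raise ValueError("no source uris received")
    if len(uris) > 1:
        # Assumption: several uris are joined by the multi-source wrapper.
        return "PythiaMultiSource"
    if uris[0].startswith("test:"):
        # Assumption: the "test:" scheme selects the videotestsrc wrapper.
        return "PythiaTestSource"
    return "PythiaSource"


single = pick_source_kind("file://video.mp4")
several = pick_source_kind("file://a.mp4", "file://b.mp4")
```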

abstract gst() str[source]#

Render as string with gst-launch-like syntax.

abstract static pop_pythia_args_from_uris(uris: Tuple[str, ...]) Tuple[dict, List[str]][source]#

Pop pythia-related query params from source uri.

Parameters:

uris – input uris to filter

class pythia.pipelines.base.PythiaTestSource(*uris: str)[source]#

Bases: PythiaSourceBase

videotestsrc wrapper building block.

Construct an instance from SourceUri values.

Parameters:

uris – Collection of SourceUri values.

_abc_impl = <_abc_data object>#
_is_protocol = False#
gst() str[source]#

Render from single videotestsrc up to nvmuxer.

Returns:

Rendered string.

static pop_pythia_args_from_uris(uris: Tuple[str, ...]) Tuple[dict, List[str]][source]#

Extract muxer width and height.

Parameters:

uris – input uris to filter

Returns:

A tuple of two items: a dictionary containing the popped params, and a list containing the single uri without its pythia query params.

Examples

>>> uris = ["test://?muxer_width=320&muxer_height=240"]
>>> PythiaTestSource.pop_pythia_args_from_uris(uris)
({'muxer_width': 320, 'muxer_height': 240}, ['test:'])
class pythia.pipelines.base.StringPipeline(pipeline_string: str)[source]#

Bases: BasePipeline

Pythia pipeline wrapper to construct from pipeline strings.

Initialize pipeline wrapper using a pipeline string.

Parameters:

pipeline_string – A gst-launch-like pipeline string.

Raises:
  • InvalidPipelineError – Unable to parse pipeline because of a syntax error in the pipeline string.

  • GLib.Error – Unable to parse the pipeline for a reason unrelated to syntax.

CONNECTIONS: Dict[str, Dict[str, Callable]] = {}#
__init__(pipeline_string: str) None[source]#

Initialize pipeline wrapper using a pipeline string.

Parameters:

pipeline_string – A gst-launch-like pipeline string.

Raises:
  • InvalidPipelineError – Unable to parse pipeline because of a syntax error in the pipeline string.

  • GLib.Error – Unable to parse the pipeline for a reason unrelated to syntax.

_abc_impl = <_abc_data object>#
_is_protocol = False#
gst() str[source]#

Render the pipeline as a string in gst-launch-like syntax.

pythia.pipelines.base.clean_single_uri(uri: str) Tuple[dict, str][source]#

Extract muxer width and height.

Parameters:

uri – input uri to parse.

Returns:

A tuple of two items: a dictionary containing the popped params, and the uri without its pythia query params.

Examples

>>> clean_single_uri("file://video.mp4?muxer_width=1280&muxer_height=720")
({'muxer_width': 1280, 'muxer_height': 720}, 'file://video.mp4')