pythia.applications.base#

Application.

An application is composed of (at least) a pipeline. It implements two pieces of logic:

  • State management: Start/Stop/Play/Pause of the pipeline.

  • Bus call: handling of the messages posted on the pipeline’s bus.

class pythia.applications.base.Application(pipeline: BasePipeline)[source]#

Bases: BaseApplication

Typical pythia application.

Construct an application from a pipeline.

Parameters:

pipeline – an instantiated Pipeline.

on_message_eos(bus: Bus, message: Message)#

Stop application on EOS event.

Parameters:
  • self – A stoppable instance.

  • bus – The application’s pipeline’s bus.

  • message – The gstreamer EOS message.

on_message_error(bus: Bus, message: Message) Tuple[str, str]#

Stop application on error.

Parameters:
  • self – A stoppable instance.

  • bus – The application’s pipeline’s bus.

  • message – The gstreamer error message.

Returns:

Error and debug string.

class pythia.applications.base.BackgroundThreadLoop(*args, loop: Optional[Loop] = None, **kwargs)[source]#

Bases: Thread, RunLoop

Simple thread to run a loop without blocking.

Initialize a background thread to run the loop in.

Parameters:
  • args – Forwarded to thread constructor.

  • loop – the loop to run. If not set, uses GLib.MainLoop

  • kwargs – Forwarded to thread constructor.

__init__(*args, loop: Optional[Loop] = None, **kwargs)[source]#

Initialize a background thread to run the loop in.

Parameters:
  • args – Forwarded to thread constructor.

  • loop – the loop to run. If not set, uses GLib.MainLoop

  • kwargs – Forwarded to thread constructor.

_abc_impl = <_abc_data object>#
_is_protocol = False#
quit() None[source]#

Quit the running loop.

run() None[source]#

Run the loop, blocking until it quits.
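The idea of running a blocking loop in a thread can be illustrated with a minimal, standalone sketch. It does not import pythia or GLib: `FakeLoop` is a hypothetical stand-in for `GLib.MainLoop`, and `BackgroundLoop` is an illustrative name, not the library's implementation.

```python
import threading


class FakeLoop:
    """Hypothetical stand-in for GLib.MainLoop: run() blocks until quit()."""

    def __init__(self) -> None:
        self._stop = threading.Event()

    def run(self) -> None:
        self._stop.wait()

    def quit(self) -> None:
        self._stop.set()


class BackgroundLoop(threading.Thread):
    """Illustrative thread that runs a loop without blocking the caller."""

    def __init__(self, *args, loop=None, **kwargs) -> None:
        # args and kwargs are forwarded to the thread constructor.
        super().__init__(*args, **kwargs)
        # Fall back to a default loop when none is given.
        self._wrapped_loop = loop if loop is not None else FakeLoop()

    def run(self) -> None:
        # Executed in the new thread: blocks there, not in the caller.
        self._wrapped_loop.run()

    def quit(self) -> None:
        self._wrapped_loop.quit()


runner = BackgroundLoop(daemon=True)
runner.start()                    # returns immediately
started_alive = runner.is_alive()
runner.quit()                     # asks the loop to stop
runner.join(timeout=5)            # wait for the thread to finish
stopped = not runner.is_alive()
```

The caller keeps control after `start()`, and `quit()` followed by `join()` shuts the thread down cleanly.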

class pythia.applications.base.BaseApplication(pipeline: BasePipeline)[source]#

Bases: object

Base pythia application to reduce boilerplate.

You can define pipeline message handlers, and they will be called when pertinent. For example, if you define a method called ‘on_message_error’, pythia will connect it as a signal handler for the ‘message::error’ detailed signal. Only values from Gst.MessageType are allowed, see http://lazka.github.io/pgi-docs/index.html#Gst-1.0/flags.html#Gst.MessageType
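The naming convention described above can be sketched in plain Python. This is only an illustration of how method names might be mapped to detailed signal names; `build_message_handlers` and `DemoApp` are hypothetical, and the real library may discover and validate handlers differently.

```python
from typing import Callable, Dict

PREFIX = "on_message_"


def build_message_handlers(obj: object) -> Dict[str, Callable]:
    """Map 'message::<type>' signal names to 'on_message_<type>' methods."""
    handlers: Dict[str, Callable] = {}
    for name in dir(obj):
        if not name.startswith(PREFIX):
            continue
        attr = getattr(obj, name)
        if callable(attr):
            # 'on_message_eos' -> 'message::eos'
            handlers[f"message::{name[len(PREFIX):]}"] = attr
    return handlers


class DemoApp:
    """Hypothetical application with two message handlers."""

    def on_message_eos(self, bus, message):
        return "eos"

    def on_message_error(self, bus, message):
        return "error"


app = DemoApp()
handlers = build_message_handlers(app)
```

Each entry in `handlers` could then be connected to the bus under its signal name.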

Construct an application from a pipeline.

Parameters:

pipeline – an instantiated Pipeline.

__init__(pipeline: BasePipeline) None[source]#

Construct an application from a pipeline.

Parameters:

pipeline – an instantiated Pipeline.

_before_pipeline_start(loop) RunLoop[source]#

Call hook - run before pipeline start.

Parameters:

loop – the loop to run.

Returns:

A thread running the background loop.

_build_message_handlers() Dict[str, Callable[[BA, Bus, Message], None]][source]#
after_pipeline_start() None[source]#

Call hook - run after pipeline start.

backend(element_name: str, pad_direction: Literal['sink', 'src'], idx: int = 0) Backend[source]#

Retrieve a backend for a registered generator probe.

Parameters:
  • element_name – Gstreamer element name the probe is registered to.

  • pad_direction – The element’s pad direction where the buffer probe is attached.

  • idx – position in the registry array for the specified pad. In the most common case a pad has a single buffer probe, so this value defaults to 0, meaning the single element in the list is retrieved.

Returns:

The backend associated with the generator probe. The backend is in charge of posting the messages from the generator probe to an external service.

before_loop_join() None[source]#

Call hook - run before calling loop.join.

before_loop_quit() None[source]#

Call hook - run before calling loop.quit.

before_pipeline_start() None[source]#

Custom call hook - run before pipeline start.

property bus: Bus#

Get pipeline bus - lazy property.

Returns:

The pipeline’s bus.

Raises:

RuntimeError – Unable to get bus from pipeline.

connect_bus()[source]#

Attach bus signal watch depending on the message type.

disconnect_bus()[source]#

Detach bus signal watch.

classmethod from_pipeline_file(pipeline_file: str | Path, *args, params: Dict[str, Any] | None = None, **kwargs) BA[source]#

Factory from pipeline file.

Parameters:
  • pipeline_file – path to the file used to generate the pipeline.

  • params – Parameters to format the pipeline.

  • args – Forwarded to from_pipeline_file() factory.

  • kwargs – Forwarded to from_pipeline_file() factory.

Returns:

The instantiated app.

classmethod from_pipeline_string(pipeline: str, extractors: Probes | None = None) BA[source]#

Factory from pipeline string.

Parameters:
  • pipeline – the string to use to generate the pipeline.

  • extractors – buffer probes to inject to the pipeline.

Returns:

The instantiated app.

inject_probes(extractors: Dict[str, Dict[Literal['sink', 'src'], list]])[source]#

Register several probes.

Parameters:

extractors – mapping containing a collection of probes (callbacks) assigned to their respective element’s source (or sink) pad.
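The shape of the `extractors` mapping follows from the signature: element name, then pad direction, then a list of callbacks. The sketch below shows that shape and one plausible way to merge it into a registry. The element names `pgie` and `tracker`, the callbacks, and `register_probes` are hypothetical and not part of pythia.

```python
from typing import Callable, Dict, List

Registry = Dict[str, Dict[str, List[Callable]]]


def register_probes(registry: Registry, extractors: Registry) -> None:
    """Merge an extractors mapping into a registry, keeping insertion order."""
    for element_name, pads in extractors.items():
        for pad_direction, probes in pads.items():
            if pad_direction not in ("sink", "src"):
                raise ValueError(f"invalid pad direction: {pad_direction!r}")
            slot = registry.setdefault(element_name, {}).setdefault(pad_direction, [])
            slot.extend(probes)


def count_objects(pad, info):
    return "count"


def log_buffer(pad, info):
    return "log"


extractors: Registry = {
    "pgie": {"src": [count_objects, log_buffer]},
    "tracker": {"sink": [log_buffer]},
}

registry: Registry = {}
register_probes(registry, extractors)

# A positional index selects one probe among those attached to a pad.
first_probe = registry["pgie"]["src"][0]
```

With a layout like this, a default index of 0 naturally selects the only probe when a pad has a single one.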

loop_cls#

alias of BackgroundThreadLoop

probe(element_name: str, pad_direction: ~typing.Literal['sink', 'src'], *probe_args, pad_probe_type: ~gi.repository.Gst.PadProbeType = <flags GST_PAD_PROBE_TYPE_BUFFER of type Gst.PadProbeType>, backend_uri: ~typing.Optional[str] = None)[source]#

Register function as a probe callback.

Parameters:
  • element_name – Name of the Gst.Element to attach the probe to.

  • pad_direction – Direction of the Gst.Pad to attach the probe to.

  • probe_args – Optional additional args - forwarded to the decorated callback as varargs.

  • pad_probe_type – Buffer probe type.

  • backend_uri – If set, used to send messages from the probe to this remote backend. Only used (and mandatory in that case) when the callable to decorate is a generator.

Returns:

The decorated callback, registered as an application buffer probe.

This method has two usages: to decorate proper buffer probes, where the developer is in charge of handling the data, and to decorate a generator which yields incoming data. In the latter case, a ‘backend_uri’ is required and is used to connect to the remote service and send it all generated data.
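The two usages can be told apart with `inspect.isgeneratorfunction`. The following standalone sketch mimics the decorator's registration step only; `MiniApp`, the element name `pgie`, and the URI `memory://demo` are hypothetical, and the sketch neither attaches real pad probes nor connects to a backend.

```python
import inspect
from typing import Callable, Dict, List, Optional, Tuple

ProbeEntry = Tuple[Callable, Optional[str]]


class MiniApp:
    """Hypothetical stand-in that only records registered probes."""

    def __init__(self) -> None:
        self.probes: Dict[Tuple[str, str], List[ProbeEntry]] = {}

    def probe(self, element_name: str, pad_direction: str,
              backend_uri: Optional[str] = None):
        def decorator(func: Callable) -> Callable:
            # Generators yield data that must be sent somewhere,
            # so they need a backend URI; plain callbacks do not.
            if inspect.isgeneratorfunction(func) and backend_uri is None:
                raise ValueError("generator probes require a backend_uri")
            key = (element_name, pad_direction)
            self.probes.setdefault(key, []).append((func, backend_uri))
            return func

        return decorator


app = MiniApp()


@app.probe("pgie", "src")
def plain_probe(pad, info):
    # The developer handles the data directly.
    return "ok"


@app.probe("pgie", "src", backend_uri="memory://demo")
def generator_probe(batch_meta):
    # Yielded items would be forwarded to the backend.
    yield {"frame": 0}
```

Returning `func` unchanged keeps the decorated callable usable on its own, for example in unit tests.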

stop() None[source]#

Stop application execution.

A loop must be running, and it is stopped as well.

Raises:
  • RuntimeError – no loop set.

  • RuntimeError – unable to stop loop.

pythia.applications.base.BoundGstPadProbeCallback#

Gstreamer PadProbe must implement this protocol.

Using upstream ‘Gst.PadProbeCallback’ raises NotImplementedError.

alias of Callable[[BA, Pad, PadProbeInfo], PadProbeReturn]

class pythia.applications.base.ForegroundLoop(loop: Optional[Loop] = None)[source]#

Bases: RunLoop

Simple Loop wrapper implementation to run a blocking loop.

Initialize a foreground glib loop.

Parameters:

loop – the loop to run. If not set, uses GLib.MainLoop

__init__(loop: Optional[Loop] = None)[source]#

Initialize a foreground glib loop.

Parameters:

loop – the loop to run. If not set, uses GLib.MainLoop

_abc_impl = <_abc_data object>#
_is_protocol = False#
join() None[source]#

Run the main loop in the foreground.

property loop: Loop#

Lazy property for the loop.

Returns:

initialized or pre-existing loop.

quit() None[source]#

Quit the main loop running in the foreground.

start() None[source]#

Ensure the lazy property is accessed at least once.
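The lazy `loop` property and the role of `start()` can be sketched as follows. This is an assumption about the pattern, not the library's code: `LazyLoopHolder` and its `factory` argument are hypothetical, with `object` standing in for `GLib.MainLoop`.

```python
from typing import Any, Callable, Optional


class LazyLoopHolder:
    """Illustrative holder that creates its loop on first access."""

    def __init__(self, loop: Optional[Any] = None,
                 factory: Callable[[], Any] = object) -> None:
        self._loop = loop
        self._factory = factory

    @property
    def loop(self) -> Any:
        # Create the loop only when it is first needed, then reuse it.
        if self._loop is None:
            self._loop = self._factory()
        return self._loop

    def start(self) -> None:
        # Accessing the property is enough to make sure the loop exists.
        _ = self.loop


holder = LazyLoopHolder()
created_before_start = holder._loop is not None
holder.start()
created_after_start = holder._loop is not None
```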

class pythia.applications.base.RunLoop(*args, **kwargs)[source]#

Bases: Protocol

Loop wrapper interface.

_abc_impl = <_abc_data object>#
_is_protocol = True#
join() None[source]#

Run and block the loop.

quit() None[source]#

Stop the running loop.

start() None[source]#

Ensure the loop is running / activated.
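Because the interface is a Protocol, any object with `start`, `join` and `quit` methods satisfies it without inheriting from it. A minimal sketch of that structural typing, using the hypothetical names `RunLoopLike` and `RecordingLoop` (the `runtime_checkable` decorator is added here only to allow the `isinstance` check and is not claimed to be used by pythia):

```python
from typing import List, Protocol, runtime_checkable


@runtime_checkable
class RunLoopLike(Protocol):
    """Illustrative loop wrapper interface."""

    def start(self) -> None: ...

    def join(self) -> None: ...

    def quit(self) -> None: ...


class RecordingLoop:
    """Satisfies the protocol structurally, with no inheritance."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def start(self) -> None:
        self.calls.append("start")

    def join(self) -> None:
        self.calls.append("join")

    def quit(self) -> None:
        self.calls.append("quit")


def run_once(loop: RunLoopLike) -> None:
    """Drive any conforming loop through its lifecycle."""
    loop.start()
    loop.join()
    loop.quit()


loop = RecordingLoop()
run_once(loop)
conforms = isinstance(loop, RunLoopLike)
```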

pythia.applications.base._build_probe(probe, backend_uri: Optional[str] = None)[source]#
pythia.applications.base._get_from_annotations(signature, name) Optional[int][source]#
pythia.applications.base._get_from_positional_arg_name(signature, name) Optional[int][source]#
pythia.applications.base._get_probe_batch_meta_idx(signature: FullArgSpec) Optional[int][source]#
pythia.applications.base._get_probe_info_idx(signature: FullArgSpec) Optional[int][source]#
pythia.applications.base._get_probe_pad_idx(signature: FullArgSpec) Optional[int][source]#