Coverage for src/pythia/types.py: 91%
37 statements
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
« prev ^ index » next coverage.py v6.4.4, created at 2022-10-07 19:27 +0000
1"""Types for pythia."""
3from __future__ import annotations
5from typing import Any
6from typing import Callable
7from typing import Dict
8from typing import Protocol
9from typing import TYPE_CHECKING
10from typing import TypedDict
11from typing import TypeVar
12from typing import Union
14import pyds
16from pythia.utils.gst import Gst
17from pythia.utils.gst import PadDirection
# Imported for static type checkers only: ``TYPE_CHECKING`` is False at
# runtime, so this import never executes when the module is loaded.
if TYPE_CHECKING:
    from pythia.event_stream import base as base_stream
# Plain ``str -> Any`` dictionary.
Serializable = Dict[str, Any]
# Event stream uri, represented as a plain string.
EventStreamUri = str

SourceUri = str
"""Should start with `scheme://`.

* multifilesink uses `multifile://` for the uri.
* v4l2src uses `v4l2://` for the uri.
* filesrc uses `file://` for the uri.

See Also:
    https://gstreamer.freedesktop.org/documentation/gstreamer/gsturihandler.html?gi-language=c#GstURIHandler
"""

SinkUri = str
"""String identifying a sink.

Examples:

    /path/to/video.mp4
    /path/to/frames/%04d.jpg
    livesink -> nveglglessink
    fakesink -> fakesink
    appsink -> TODO
"""
class Loop(Protocol):
    """A loop interface, where code is executed.

    Any object exposing ``run`` and ``quit`` methods satisfies this
    protocol structurally.

    Example:
        >>> from gi.repository import GLib
        >>> loop = GLib.MainLoop()

    """

    def run(self) -> Any:  # noqa: A003, C0116
        """Run the loop (e.g. ``GLib.MainLoop.run``)."""

    def quit(self) -> Any:  # noqa: A003, C0116
        """Quit the loop (e.g. ``GLib.MainLoop.quit``)."""
# Type variable restricted to the members of ``PydsClass``; the bound is a
# string forward reference because ``PydsClass`` is defined just below.
PC = TypeVar("PC", bound="PydsClass")


PydsClass = Union[
    pyds.NvDsAnalyticsFrameMeta,
    pyds.NvDsAnalyticsObjInfo,
    pyds.NvDsInferSegmentationMeta,
    pyds.NvDsUserMeta,
    pyds.NvDsLabelInfo,
    pyds.NvDsFrameMeta,
    pyds.NvDsObjectMeta,
    pyds.NvDsClassifierMeta,
]
"""Common pyds metadata class API."""
class RegisteredProbe(TypedDict):
    """Simple storage for generator-induced buffer probes."""

    # NOTE: both annotations name objects that are not available when this
    # class body runs (``SupportedCb`` is assigned later in the module and
    # ``base_stream`` is only imported under ``TYPE_CHECKING``); this relies
    # on ``from __future__ import annotations`` keeping annotations lazy.

    # The probe callback, in one of the ``SupportedCb`` signatures.
    probe: SupportedCb
    # A ``pythia.event_stream.base.Backend``.
    backend: base_stream.Backend
# Two-level mapping: string key -> pad direction -> list of entries.
# NOTE(review): the outer key is presumably an element name and the list
# presumably holds registered probes -- confirm against the callers.
Probes = Dict[str, Dict[PadDirection, list]]

GstPadProbeCallback = Callable[[Gst.Pad, Gst.PadProbeInfo], Gst.PadProbeReturn]
"""Gstreamer PadProbe must implement this protocol.

Using upstream 'Gst.PadProbeCallback' raises NotImplementedError.

"""
# Callback receiving only the batch metadata.
BatchMetaCb = Callable[[pyds.NvDsBatchMeta], Gst.PadProbeReturn]
# Callback receiving the pad, the probe info and the batch metadata.
FullPadCb = Callable[
    [Gst.Pad, Gst.PadProbeInfo, pyds.NvDsBatchMeta], Gst.PadProbeReturn
]
# Any of the three callback signatures above.
SupportedCb = Union[
    GstPadProbeCallback,
    BatchMetaCb,
    FullPadCb,
]


# Constrained type variable: resolves to exactly one of the three callback
# signatures.
PT = TypeVar(
    "PT",
    GstPadProbeCallback,
    BatchMetaCb,
    FullPadCb,
)
Con = Dict[str, Dict[str, Callable]]
"""Mapping of element-name to gst elements' signal callbacks.

The outer key is the element name; the inner mapping goes from signal name
to the callback connected to that signal.

Example:
    >>> def cb(*a, **kw): ...
    >>> con = {"pgie": {"pad-added": cb}}
"""
class HasConnections(Protocol):
    """Protocol to indicate a class has connections.

    A class satisfies this protocol by exposing a ``CONNECTIONS`` attribute.

    See Also:
        :obj:`Con`

    """

    # Element-name -> signal-name -> callback mapping (see ``Con``).
    CONNECTIONS: Con