Coverage for src/pythia/utils/ds.py: 59%

49 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2022-10-07 19:27 +0000

1"""Deepstream interface, utilities and customization.""" 

2from __future__ import annotations 

3 

4from typing import Any 

5from typing import Callable 

6from typing import ClassVar 

7from typing import Protocol 

8from typing import Type 

9 

10import pyds 

11 

12from pythia.types import PydsClass 

13from pythia.utils.gst import Gst 

14 

15 

def buf2batchmeta(gst_buffer: Gst.Buffer) -> pyds.NvDsBatchMeta:
    """Extract the deepstream batch metadata attached to a buffer.

    Args:
        gst_buffer: gstreamer buffer, e.g. the one received in a pad
            buffer probe.

    Returns:
        The result of :func:`pyds.gst_buffer_get_nvds_batch_meta` for
        this buffer.

    See Also:
        :func:`pyds.gst_buffer_get_nvds_batch_meta`

    """
    # The pyds lookup is given the buffer's hash, not the Python object.
    buffer_handle = hash(gst_buffer)
    return pyds.gst_buffer_get_nvds_batch_meta(buffer_handle)

30 

31 

def info2batchmeta(info: Gst.PadProbeInfo) -> pyds.NvDsBatchMeta | None:
    """Extract the deepstream batch metadata from pad probe info.

    Args:
        info: gstreamer probe info, as received in a pad buffer probe.

    Returns:
        The batch metadata of the buffer carried by ``info``, or
        ``None`` (after printing a notice) when ``info.get_buffer()``
        returns a falsy value.

    See Also:
        :func:`pyds.gst_buffer_get_nvds_batch_meta`

    """
    buffer = info.get_buffer()
    if buffer:
        return buf2batchmeta(buffer)

    # No usable buffer: report it and signal the absence with ``None``.
    print("Unable to get GstBuffer ")
    return None

50 

51 

def _is_analytics_meta(user_meta: pyds.NvDsUserMeta) -> bool:
    """Tell whether ``user_meta`` has the object-analytics meta type."""
    found = user_meta.base_meta.meta_type
    wanted = pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSOBJ.USER_META")
    return found == wanted

56 

57 

def _is_frameanalytics_meta(user_meta: pyds.NvDsUserMeta) -> bool:
    """Tell whether ``user_meta`` has the frame-analytics meta type."""
    found = user_meta.base_meta.meta_type
    wanted = pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSFRAME.USER_META")
    return found == wanted

62 

63 

def _is_segmentation_meta(user_meta: pyds.NvDsUserMeta) -> bool:
    """Tell whether ``user_meta`` has the segmentation meta type."""
    found = user_meta.base_meta.meta_type
    return found == pyds.NVDSINFER_SEGMENTATION_META

66 

67 

def inject_external_classification(
    batch_meta: pyds.NvDsBatchMeta,
    obj_meta: pyds.NvDsObjectMeta,
    **data: Any,
) -> None:
    """Inject classification metadata into an object.

    A classifier meta and a label info are acquired from the pools of
    ``batch_meta``. Every keyword argument is set as an attribute on
    the label info, which is then added to the classifier meta, which
    in turn is added to ``obj_meta``.

    If "label" is present, its value is also appended (separated by a
    space) to the object's current display text.

    Args:
        batch_meta: deepstream batch metadata.
        obj_meta: deepstream object metadata to inject classification
            into.
        data: parameters for :class:`pyds.NvDsLabelInfo`, passed as
            ``name=value`` keyword arguments.

    """
    classifier_meta = pyds.nvds_acquire_classifier_meta_from_pool(batch_meta)
    label_info = pyds.nvds_acquire_label_info_meta_from_pool(batch_meta)

    for name, value in data.items():
        setattr(label_info, name, value)

    pyds.nvds_add_label_info_meta_to_classifier(classifier_meta, label_info)
    pyds.nvds_add_classifier_meta_to_object(obj_meta, classifier_meta)

    if "label" in data:
        label = data["label"]
        # Read and write through the same attribute path so the text
        # being extended is the one that gets replaced.
        original = pyds.get_string(obj_meta.text_params.display_text)
        obj_meta.text_params.display_text = f"{original} {label}"

99 

100 

# Predicate over a user meta entry: takes a ``pyds.NvDsUserMeta`` and
# returns a bool.
UserMetaCondition = Callable[[pyds.NvDsUserMeta], bool]

102 

103 

class SupportedUserMeta(Protocol):  # noqa: R0903
    """Minimal interface required to parse a custom user meta."""

    condition: ClassVar[UserMetaCondition]
    """Predicate used to filter user meta by type."""

    klass: ClassVar[Type[PydsClass]]
    """Class to cast to once the predicate passes."""

112 

113 

class FrameAnalytics(SupportedUserMeta):  # noqa: R0903
    """Frame-level analytics produced by `nvdsanalytics`."""

    # Matches user meta carrying the frame-analytics meta type.
    condition = _is_frameanalytics_meta
    # Class to cast matching user meta to.
    klass = pyds.NvDsAnalyticsFrameMeta

119 

120 

class ObjectAnalytics(SupportedUserMeta):  # noqa: R0903
    """Object-level analytics produced by `nvdsanalytics`."""

    # Matches user meta carrying the object-analytics meta type.
    condition = _is_analytics_meta
    # Class to cast matching user meta to.
    klass = pyds.NvDsAnalyticsObjInfo

126 

127 

class SemanticMasks(SupportedUserMeta):  # noqa: R0903
    """Object-level semantic segmentation masks produced by `nvinfer`."""

    # Matches user meta carrying the segmentation meta type.
    condition = _is_segmentation_meta
    # Class to cast matching user meta to.
    klass = pyds.NvDsInferSegmentationMeta