Coverage for src/pythia/applications/base.py: 74%

261 statements  

« prev     ^ index     » next       coverage.py v6.4.4, created at 2022-10-07 19:27 +0000

1"""Application. 

2 

3An application is composed of (at least) a pipeline. It implements two logics: 

4 

5 State management: Start/Stop/Play/Pause of the pipeline. 

6 Bus Call 

7 

8""" 

9from __future__ import annotations 

10 

11import inspect 

12import os 

13from collections import defaultdict 

14from logging import getLogger 

15from pathlib import Path 

16from threading import Thread 

17from typing import Any 

18from typing import Callable 

19from typing import Dict 

20from typing import Optional 

21from typing import Protocol 

22from typing import Type 

23from typing import TypeVar 

24from typing import Union 

25 

26import pyds 

27 

28from pythia.event_stream.base import Backend 

29from pythia.pipelines.base import BasePipeline 

30from pythia.pipelines.base import StringPipeline 

31from pythia.types import EventStreamUri 

32from pythia.types import Loop 

33from pythia.types import Probes 

34from pythia.types import SupportedCb 

35from pythia.utils.ds import info2batchmeta 

36from pythia.utils.gst import get_element 

37from pythia.utils.gst import get_static_pad 

38from pythia.utils.gst import GLib 

39from pythia.utils.gst import Gst 

40from pythia.utils.gst import gst_init 

41from pythia.utils.gst import PadDirection 

42from pythia.utils.message_handlers import on_message_eos 

43from pythia.utils.message_handlers import on_message_error 

44 

45logger = getLogger(__name__) 

46 

47GST_DEBUG_DUMP_DOT_DIR = os.environ.get("GST_DEBUG_DUMP_DOT_DIR", None) 

48 

49 

class RunLoop(Protocol):
    """Structural interface implemented by every loop wrapper.

    A wrapper receives a loop at construction time and exposes three
    operations: :meth:`start`, :meth:`join` and :meth:`quit`.

    """

    def __init__(self, loop: Optional[Loop], *args, **kwargs) -> None:
        """Build a wrapper around `loop`.

        Args:
            loop: The loop to wrap.
            args: Optional positional args for the implementation's
                constructor.
            kwargs: Optional keyword args for the implementation's
                constructor.

        """

    def join(self) -> None:
        """Run the loop, blocking the caller."""

    def start(self) -> None:
        """Make sure the loop is running / activated."""

    def quit(self) -> None:  # noqa: A003
        """Stop the running loop."""

71 

72 

class ForegroundLoop(RunLoop):
    """Simple loop wrapper which runs a blocking loop in the caller."""

    def __init__(self, loop: Optional[Loop] = None):
        """Initialize a foreground glib loop.

        Args:
            loop: the loop to run. If not set, a
                :class:`GLib.MainLoop` is created lazily on first
                access of :attr:`loop`.

        """
        self._loop = loop

    @property
    def loop(self) -> Loop:
        """Lazy property for the loop.

        Returns:
            initialized or pre-existing loop.

        """
        # Compare against None explicitly: a caller-supplied loop must
        # never be replaced just because it evaluates as falsy.
        if self._loop is None:
            self._loop = GLib.MainLoop()
        return self._loop

    def start(self) -> None:
        """Ensure the lazy property is accessed at least once."""
        self.loop  # noqa: W0104

    def join(self) -> None:
        """Run the main loop in the foreground (blocks the caller)."""
        self.loop.run()

    def quit(self) -> None:  # noqa: A003
        """Quit the main loop running in the foreground."""
        self.loop.quit()

109 

110 

class BackgroundThreadLoop(Thread, RunLoop):
    """Simple thread to run a loop without blocking."""

    def __init__(self, *args, loop: Optional[Loop] = None, **kwargs):
        """Initialize a background thread to run the loop in.

        Args:
            args: Forwarded to thread constructor.
            loop: the loop to run. If not set, uses
                :class:`GLib.MainLoop`
            kwargs: Forwarded to thread constructor. Unless the caller
                says otherwise, the thread is created as a daemon.

        """
        kwargs.setdefault("daemon", True)
        super().__init__(*args, **kwargs)
        # Explicit None check so a caller-supplied loop is always kept,
        # whatever its truthiness.
        self._loop = loop if loop is not None else GLib.MainLoop()

    def run(self) -> None:
        """Thread body: run the wrapped loop until it is quit."""
        self._loop.run()

    def quit(self) -> None:  # noqa: A003
        """Quit the running loop."""
        self._loop.quit()

135 

136 

137BA = TypeVar("BA", bound="BaseApplication") 

138 

139OnBusMessage = Callable[[BA, Gst.Bus, Gst.Message], None] 

140 

141 

142BoundGstPadProbeCallback = Callable[ 

143 [BA, Gst.Pad, Gst.PadProbeInfo], Gst.PadProbeReturn 

144] 

145"""Gstreamer PadProbe must implement this protocol. 

146 

147Using upstream 'Gst.PadProbeCallback' raises NotImplementedError. 

148 

149""" 

150BoundBatchMetaCb = Callable[[BA, pyds.NvDsBatchMeta], Gst.PadProbeReturn] 

151BoundFullPadCb = Callable[ 

152 [BA, Gst.Pad, Gst.PadProbeInfo, pyds.NvDsBatchMeta], Gst.PadProbeReturn 

153] 

154 

155BoundSupportedCb = Union[ 

156 BoundGstPadProbeCallback, 

157 BoundBatchMetaCb, 

158 BoundFullPadCb, 

159] 

160 

161CB = TypeVar("CB", SupportedCb, BoundSupportedCb) 

162 

163 

class BaseApplication:
    """Base pythia application to reduce boilerplate.

    You can define pipeline message handlers and they will be called
    when pertinent. For example, by defining a method called
    'on_message_error', pythia will connect said method as a signal
    handler for the 'message::error' detailed signal. Only values from
    `Gst.MessageType` are allowed, see
    http://lazka.github.io/pgi-docs/index.html#Gst-1.0/flags.html#Gst.MessageType

    """

    # Wrapper class instantiated around the loop when the app is called.
    loop_cls: Type[RunLoop] = BackgroundThreadLoop

    def __init__(self, pipeline: BasePipeline) -> None:
        """Construct an application from a pipeline.

        Args:
            pipeline: an instantiated `Pipeline`.

        """
        self.pipeline = pipeline
        self.loop: Optional[RunLoop] = None
        self._bus: Optional[Gst.Bus] = None
        # element name -> pad direction -> list of registered probes.
        self._registered_probes: Probes = defaultdict(
            lambda: defaultdict(list)
        )
        self._message_handlers = self._build_message_handlers()
        self.watch_ids: list[int] = []

    def _build_message_handlers(self) -> Dict[str, OnBusMessage]:
        """Collect the `on_message*` attributes as bus signal handlers.

        An attribute named `on_message_<detail>` is keyed as
        `message::<detail>` (underscores in the detail become dashes);
        any other attribute starting with `on_message` is keyed as the
        generic `message` signal.

        Returns:
            Mapping from signal name to the attribute to connect.

        """
        prefix = "on_message"
        handlers = {}
        for name in dir(self):
            if not name.startswith(prefix):
                continue
            if name.startswith(prefix + "_"):
                # Slice the prefix off instead of splitting on it, so a
                # detail which itself contains "on_message" is kept
                # whole.
                detail = name[len(prefix) :].lstrip("_").replace("_", "-")
                key = f"message::{detail}"
            else:
                key = "message"
            handlers[key] = getattr(self, name)
        return handlers

    @classmethod
    def from_pipeline_string(
        cls: Type[BA], pipeline: str, extractors: Probes | None = None
    ) -> BA:
        """Factory from pipeline string.

        Args:
            pipeline: the string to use to generate the pipeline.
            extractors: buffer probes to inject to the pipeline.

        Returns:
            The instantiated app.

        """
        app = cls(StringPipeline(pipeline))
        app.inject_probes(extractors or {})
        return app

    @classmethod
    def from_pipeline_file(
        cls: Type[BA],
        pipeline_file: str | Path,
        *args,
        params: Dict[str, Any] | None = None,
        **kwargs,
    ) -> BA:
        """Factory from pipeline file.

        Args:
            pipeline_file: path of the file whose (utf-8) contents are
                used to generate the pipeline.
            params: Parameters to format the pipeline contents with
                (applied through `str.format_map`).
            args: Forwarded to :meth:`from_pipeline_string` factory.
            kwargs: Forwarded to :meth:`from_pipeline_string` factory.

        Returns:
            The instantiated app.

        """
        pipeline = Path(pipeline_file).read_text(encoding="utf-8")
        pipeline_string = pipeline.format_map(params or {})
        return cls.from_pipeline_string(pipeline_string, *args, **kwargs)

    @property
    def bus(self) -> Gst.Bus:
        """Get pipeline bus - lazy property.

        Returns:
            The pipeline's bus.

        Raises:
            RuntimeError: Unable to get bus from pipeline.

        """
        if self._bus is None:
            self._bus = self.pipeline.pipeline.get_bus()
        if self._bus is None:
            raise RuntimeError("Unable to get Pipeline bus")
        return self._bus

    def _before_pipeline_start(self, loop) -> RunLoop:
        """Call hook - run before pipeline start.

        Connects the pipeline's declared element signals, attaches the
        bus handlers, wraps and starts the loop, and finally calls the
        user hook :meth:`before_pipeline_start`.

        Args:
            loop: the loop to run.

        Returns:
            The started loop wrapper (an instance of `loop_cls`).

        """
        for element_name, connection in self.pipeline.CONNECTIONS.items():
            element = get_element(self.pipeline.pipeline, element_name)
            for signal, callback in connection.items():
                element.connect(signal, callback)

        self.connect_bus()
        self.loop = self.loop_cls(loop=loop)
        self.loop.start()
        self.before_pipeline_start()
        return self.loop

    def before_pipeline_start(self) -> None:
        """Custom call hook - run before pipeline start."""

    def after_pipeline_start(self) -> None:
        """Call hook - run after pipeline start."""

    def before_loop_join(self) -> None:
        """Call hook - run before calling `loop.join`."""

    def before_loop_quit(self) -> None:
        """Call hook - run before calling `loop.quit`."""

    def connect_bus(self):
        """Attach bus signal watch depending on the message type."""
        if not self._message_handlers:
            return

        self.bus.add_signal_watch()
        # Keep the handler ids so `disconnect_bus` can undo them.
        self.watch_ids = [
            self.bus.connect(signal, handler)
            for signal, handler in self._message_handlers.items()
        ]

    def disconnect_bus(self):
        """Detach bus signal watch."""
        if self.bus.have_pending():
            # Toggle flushing to drop the messages still queued.
            self.bus.set_flushing(True)
            self.bus.set_flushing(False)
        self.bus.remove_signal_watch()
        for watch_id in self.watch_ids:
            self.bus.disconnect(watch_id)
        self.watch_ids = []

    def __call__(
        self, *, loop: Optional[Loop] = None, foreground: bool = True
    ) -> None:
        """Execute the application and run its loop.

        Args:
            loop: if not set, one is provided.
            foreground: whether to block when running.
                NOTE(review): this flag is not read anywhere in this
                method - the loop is always joined. Confirm intent.

        Raises:
            RuntimeError: loop is already running.
            RuntimeError: Unable to start pipeline.

        """
        if self.loop is not None:
            raise RuntimeError("Loop already running")

        gst_init()

        loop_ = self._before_pipeline_start(loop)
        try:
            self.pipeline.start()
        except RuntimeError:
            try:
                self.disconnect_bus()
                loop_.quit()
            finally:
                # Forget the loop, otherwise every later call would fail
                # with "Loop already running" although nothing runs.
                self.loop = None
            raise
        self.after_pipeline_start()

        self.before_loop_join()
        try:
            loop_.join()
        finally:
            self.stop()

    def stop(
        self,
    ) -> None:
        """Stop application execution.

        The pipeline is stopped first; if a loop is set, it is quit
        and forgotten as well. Calling this without a loop set only
        stops the pipeline.

        Raises:
            RuntimeError: unable to stop loop.

        """
        try:
            self.pipeline.stop()
        finally:
            # Runs even when stopping the pipeline failed, so the loop
            # is never left behind running.
            if self.loop is not None:
                try:
                    self.before_loop_quit()
                    self.loop.quit()
                except Exception as exc:  # pylint: disable=W0703
                    raise RuntimeError(
                        f"Unable to stop application: ({exc})"
                    ) from exc
                finally:
                    self.loop = None

    def probe(
        self,
        element_name: str,
        pad_direction: PadDirection,
        *probe_args,
        pad_probe_type: Gst.PadProbeType = Gst.PadProbeType.BUFFER,
        backend_uri: Optional[EventStreamUri] = None,
    ):
        """Register function as a probe callback.

        Args:
            element_name: Name of the `Gst.Element` to attach the probe
                to.
            pad_direction: Direction of the `Gst.Pad` to attach the
                probe to.
            probe_args: Optional additional args - forwarded to
                `pad.add_probe` after the callback.
            pad_probe_type: Buffer probe type.
            backend_uri: If set, used to send messages from the probe to
                this remote backend. Only used (and mandatory in that
                case) when the callable to decorate is a generator.

        Returns:
            A decorator. It attaches the callback to the pad, registers
            it as an application buffer probe, and returns the callback
            that was actually attached (which may be a wrapper around
            the decorated one).

        This method has two usages: to decorate proper buffer probes,
        where the developer is in charge of handling the data, and to
        decorate a generator which yields incoming data. In the latter a
        'backend_uri' is required and used to connect and send any and
        all generated data into the remote service.

        """

        def decorator(user_probe: CB) -> CB:
            pythia_probe, backend = _build_probe(user_probe, backend_uri)

            element = get_element(self.pipeline.pipeline, element_name)
            pad = get_static_pad(element, pad_direction)
            pad.add_probe(
                pad_probe_type,  # type: ignore[arg-type]
                pythia_probe,  # type: ignore[arg-type]
                *probe_args,
            )
            self._registered_probes[element_name][pad_direction].append(
                {"probe": pythia_probe, "backend": backend}
            )
            return pythia_probe

        return decorator

    def inject_probes(self, extractors: Probes):
        """Register several probes.

        Args:
            extractors: mapping containing a collection of probes
                (callbacks) assigned to their respective element's
                source (or sink) pad.

        """
        for element, padprobes in extractors.items():
            for pad_direction, probes in padprobes.items():
                for probe in probes:
                    self.probe(element, pad_direction=pad_direction)(probe)

    def backend(
        self, element_name: str, pad_direction: PadDirection, idx: int = 0
    ) -> Backend:
        """Retrieve a backend for a registered generator probe.

        Args:
            element_name: Gstreamer element name the probe is registered
                to.
            pad_direction: The element's pad direction where the buffer
                probe is attached.
            idx: position in the registry array for the specified pad.
                As the most usual case a pad will have a single buffer
                probe, if this value is not set it defaults to `0`,
                meaning the single element in the list is retrieved.

        Returns:
            The backend associated with the generator probe. The backend
            is in charge of posting the messages from the generator
            probe to an external service.

        """
        return self._registered_probes[element_name][pad_direction][idx][
            "backend"
        ]

471 

472 

473def _get_from_positional_arg_name(signature, name) -> Optional[int]: 

474 try: 

475 return signature.args.index(name) 

476 except ValueError: 

477 return None 

478 

479 

480def _get_from_annotations(signature, name) -> Optional[int]: 

481 try: 

482 return [ 482 ↛ exitline 482 didn't return from function '_get_from_annotations', because the return on line 482 wasn't executed

483 signature.annotations.get(aa, None) for aa in signature.args 

484 ].index(name) 

485 except ValueError: 

486 return None 

487 

488 

def _get_probe_batch_meta_idx(signature: inspect.FullArgSpec) -> Optional[int]:
    """Locate the batch-meta argument of a probe.

    The argument is searched first by name ('batch_meta'), then by its
    'pyds.NvDsBatchMeta' annotation; the first hit wins.

    Args:
        signature: arg spec of the probe.

    Returns:
        Index of the argument, or `None` when neither lookup matches.

    """
    lookups = (
        (_get_from_positional_arg_name, "batch_meta"),
        (_get_from_annotations, "pyds.NvDsBatchMeta"),
    )
    # Lazy: later lookups only run while the earlier ones return None.
    found = (lookup(signature, target) for lookup, target in lookups)
    return next((idx for idx in found if idx is not None), None)

500 

501 

def _get_probe_pad_idx(signature: inspect.FullArgSpec) -> Optional[int]:
    """Locate the pad argument of a probe.

    The argument is searched by name ('pad', then 'gst_pad') and
    finally by its 'Gst.Pad' annotation; the first hit wins.

    Args:
        signature: arg spec of the probe.

    Returns:
        Index of the argument, or `None` when no lookup matches.

    """
    lookups = (
        (_get_from_positional_arg_name, "pad"),
        (_get_from_positional_arg_name, "gst_pad"),
        (_get_from_annotations, "Gst.Pad"),
    )
    # Lazy: later lookups only run while the earlier ones return None.
    found = (lookup(signature, target) for lookup, target in lookups)
    return next((idx for idx in found if idx is not None), None)

514 

515 

def _get_probe_info_idx(signature: inspect.FullArgSpec) -> Optional[int]:
    """Locate the probe-info argument of a probe.

    The argument is searched by name ('info', then 'gst_info') and
    finally by its 'Gst.PadProbeInfo' annotation; the first hit wins.

    Args:
        signature: arg spec of the probe.

    Returns:
        Index of the argument, or `None` when no lookup matches.

    """
    lookups = (
        (_get_from_positional_arg_name, "info"),
        (_get_from_positional_arg_name, "gst_info"),
        (_get_from_annotations, "Gst.PadProbeInfo"),
    )
    # Lazy: later lookups only run while the earlier ones return None.
    found = (lookup(signature, target) for lookup, target in lookups)
    return next((idx for idx in found if idx is not None), None)

528 

529 

def _build_probe(probe, backend_uri: Optional[EventStreamUri] = None):
    """Adapt a user callback into a `(pad, info)` pad-probe callback.

    The positions of the callback's batch-meta, pad and info arguments
    are detected from its arg spec (ignoring the leading bound argument
    of bound callables). Supported layouts are `(batch_meta)`,
    `(pad, info)` and `(pad, info, batch_meta)`.

    Args:
        probe: the user callback. A generator function is only
            supported with the `(batch_meta)` layout; everything it
            yields is posted to the backend.
        backend_uri: where a generator probe posts its data. When not
            set for a generator, falls back to the `PYTHIA_STREAM_URI`
            environment variable and then to 'log://?stream=stdout'
            (with a warning).

    Returns:
        A `(callback, backend)` tuple: the callable to attach as pad
        probe, and the backend built for generator probes (`None`
        otherwise).

    Raises:
        ValueError: 'backend_uri' was given for a non-generator probe,
            or the callback's arguments match no supported layout.
        NotImplementedError: generator probe with a `(pad, info)` or
            `(pad, info, batch_meta)` layout.

    """
    signature = inspect.getfullargspec(probe)
    is_iterator = inspect.isgeneratorfunction(probe)
    if backend_uri and not is_iterator:
        raise ValueError("'backend_uri' set. Probe must be a generator.")
    if is_iterator and not backend_uri:
        backend_uri = os.getenv("PYTHIA_STREAM_URI", None)
        if not backend_uri:
            backend_uri = "log://?stream=stdout"
            logger.warning(
                "'backend_uri' not set."
                " It is mandatory with generator probes."
                " Defaulting to '%s'",
                backend_uri,
            )

    is_bound = hasattr(probe, "__self__")

    batch_meta_idx = _get_probe_batch_meta_idx(signature)
    pad_idx = _get_probe_pad_idx(signature)
    info_idx = _get_probe_info_idx(signature)

    if is_bound:
        # The arg spec of a bound callable still lists the bound first
        # argument: shift the indexes to the positions seen by callers.
        if pad_idx is not None:
            pad_idx -= 1
        if info_idx is not None:
            info_idx -= 1
        if batch_meta_idx is not None:
            batch_meta_idx -= 1

    if batch_meta_idx == 0 and (pad_idx is None) and (info_idx is None):
        if is_iterator and (backend_uri is not None):
            backend = Backend.from_uri(backend_uri)

            def pythia_iter_probe_batch_meta(
                _: Gst.Pad, info: Gst.PadProbeInfo
            ) -> Gst.PadProbeReturn:
                batch_meta = info2batchmeta(info)
                if not batch_meta:
                    return Gst.PadProbeReturn.OK
                for data in probe(batch_meta):
                    backend.post(data)
                return Gst.PadProbeReturn.OK

            return pythia_iter_probe_batch_meta, backend

        def pythia_probe_batch_meta(
            _: Gst.Pad, info: Gst.PadProbeInfo
        ) -> Gst.PadProbeReturn:
            batch_meta = info2batchmeta(info)
            if not batch_meta:
                return Gst.PadProbeReturn.OK
            return probe(batch_meta)

        return pythia_probe_batch_meta, None

    if pad_idx == 0 and info_idx == 1 and batch_meta_idx == 2:
        if is_iterator:
            raise NotImplementedError

        def pythia_probe_full(
            pad: Gst.Pad, info: Gst.PadProbeInfo
        ) -> Gst.PadProbeReturn:
            batch_meta = info2batchmeta(info)
            if not batch_meta:
                return Gst.PadProbeReturn.OK
            return probe(pad, info, batch_meta)

        return pythia_probe_full, None

    if pad_idx == 0 and info_idx == 1 and batch_meta_idx is None:
        if is_iterator:
            raise NotImplementedError

        # Already has the native pad-probe layout: use it untouched.
        return probe, None

    supported = [
        ["batch_meta"],
        ["pad", "info"],
        ["pad", "info", "batch_meta"],
    ]
    # Not every callable has `__name__` (e.g. `functools.partial`
    # objects): fall back to its repr instead of failing on the lookup.
    probe_name = getattr(probe, "__name__", repr(probe))
    raise ValueError(
        f"Unsupported spec {signature.args} for '{probe_name}'."
        f" Must be one of `{supported}`"
    )

622 

623 

class Application(BaseApplication):
    """Typical pythia application.

    Binds the imported `on_message_eos` and `on_message_error`
    callables as class attributes of the same names.

    """

    on_message_eos = on_message_eos
    on_message_error = on_message_error