Source code for src.data_acquisition.event_manager.composite_event_manager
from logging import Logger
from typing import Generic, Optional, Sequence, TypeVar
from .event_manager import EventManager
T = TypeVar("T")
[docs]
class CompositeEventManager(EventManager[T], Generic[T]):
def __init__(
self,
*,
event_managers: Sequence[EventManager[T]],
logger: Optional[Logger] = None,
) -> None:
super().__init__(logger=logger)
self._event_managers = event_managers
def _start(self) -> None:
for event_manager in self._event_managers:
event_manager.register_callback(self._trigger_callbacks)
event_manager.start()
def _stop(self) -> None:
for event_manager in self._event_managers:
event_manager.stop()
def clone(self) -> "CompositeEventManager[T]":
submanager_clones = [
event_manager.clone() for event_manager in self._event_managers
]
for event_manager in submanager_clones:
event_manager._clone_registered_callbacks_from(self)
return CompositeEventManager(event_managers=submanager_clones)