Source code for src.data_acquisition.event_manager.simple_event_manager

from abc import ABC, abstractmethod
from logging import Logger
from typing import Generic, Optional, Sequence, TypeVar

from ..events import Event
from .event_manager import EventManager

T = TypeVar("T")


[docs] class SimpleEventManager(EventManager[T], Generic[T], ABC): def __init__( self, *, events: Sequence[Event], logger: Optional[Logger] = None ) -> None: super().__init__(logger=logger) self._events = events def _start(self) -> None: self._setup() for event in self._events: event.start_listening() @abstractmethod def _setup(self) -> None: pass def _stop(self) -> None: for event in self._events: event.stop_listening() def clone(self) -> "SimpleEventManager[T]": cloned_manager = self._clone() cloned_manager._clone_registered_callbacks_from(self) return cloned_manager @abstractmethod def _clone(self) -> "SimpleEventManager[T]": pass