Source code for src.data_acquisition.event_manager.fixed_timeout_event_manager

from logging import Logger
from typing import Optional

from ..events import TimeoutEvent
from ..gui import Gui
from .simple_event_manager import SimpleEventManager


class FixedTimeoutEventManager(SimpleEventManager[None]):
    """Event manager backed by a single ``TimeoutEvent`` with a fixed timeout.

    The manager owns one ``TimeoutEvent`` built from the supplied GUI and
    timeout. Whenever that event notifies its subscribers, the manager calls
    ``_trigger_callbacks(None)``; no payload is passed along (the manager is
    parameterized as ``SimpleEventManager[None]``).
    """

    def __init__(
        self,
        *,
        gui: Gui,
        timeout_millis: int,
        logger: Optional[Logger] = None,
    ) -> None:
        """Create the timeout event and register it with the base manager.

        Args:
            gui: GUI object handed to the underlying ``TimeoutEvent``.
            timeout_millis: Timeout passed to the ``TimeoutEvent``
                (presumably in milliseconds, as the name suggests).
            logger: Optional logger forwarded to both the ``TimeoutEvent``
                and the base class.
        """
        self._timeout_millis = timeout_millis
        # Remember the logger exactly as given so ``_clone`` can forward it.
        # A dedicated attribute name is used to avoid clashing with whatever
        # the base class stores internally (not visible from this module).
        self._clone_logger = logger
        self._timeout_event = TimeoutEvent(
            gui=gui, timeout_millis=timeout_millis, logger=logger
        )
        super().__init__(events=[self._timeout_event], logger=logger)
        self._gui = gui

    def _setup(self) -> None:
        """Subscribe to the timeout event so it triggers the callbacks."""
        self._timeout_event.subscribe(lambda: self._trigger_callbacks(None))

    def _clone(self) -> "FixedTimeoutEventManager":
        """Return a new manager built from the same constructor arguments.

        The clone receives the same GUI, timeout and logger as this
        instance; a fresh ``TimeoutEvent`` is created by its constructor.
        """
        return FixedTimeoutEventManager(
            gui=self._gui,
            timeout_millis=self._timeout_millis,
            # Forward the logger as well; otherwise the clone would silently
            # fall back to ``logger=None``.
            logger=self._clone_logger,
        )