Source code for src.data_acquisition.events.event

from abc import ABC, abstractmethod
from logging import Logger
from typing import Optional
from unittest.mock import MagicMock

from ..types import EventCallback
from .errors import IncorrectMethodCallOrderError


[docs] class Event(ABC): def __init__(self, *, logger: Optional[Logger] = None) -> None: self._is_listening = False self._subscribers: list[EventCallback] = [] self._logger = logger if logger is not None else MagicMock()
[docs] def start_listening(self) -> None: self._check_is_not_listening() self._is_listening = True self._log("Starting to listen for events") self._start_listening()
def _log(self, message: str) -> None: self._logger.debug(f"{self.__class__.__name__}: {message}") def _check_is_not_listening(self) -> None: if self._is_listening: raise IncorrectMethodCallOrderError("Event is already listening.") @abstractmethod def _start_listening(self) -> None: pass
[docs] def stop_listening(self) -> None: self._check_is_listening() self._log("Stopping listening for events") self._stop_listening() self._is_listening = False
def _check_is_listening(self) -> None: if not self._is_listening: raise IncorrectMethodCallOrderError("Event has not started listening yet.") @abstractmethod def _stop_listening(self) -> None: pass
[docs] def subscribe(self, callback: EventCallback) -> None: self._log(f"Subscribing callback: {callback}") self._subscribers.append(callback)
[docs] def unsubscribe(self, callback: EventCallback) -> None: self._log(f"Unsubscribing callback: {callback}") self._subscribers.remove(callback)
def _trigger_callbacks(self) -> None: self._log(f"Triggering callbacks") for subscriber in self._subscribers: subscriber()
[docs] @abstractmethod def clone(self) -> "Event": """ Method used to create a clone of the event, so that it can be used as a prototype and reused by event managers. :return: A new instance of the same type as this event, with the same configuration. """ pass