Source code for src.data_acquisition.gui.pygame_gui

from collections import deque
from functools import partial, wraps
from logging import Logger
from pathlib import Path
from typing import Any, Callable, Optional, TypeVar, cast
from unittest.mock import MagicMock

import pygame
from colour import Color

from ..types import ElementSize, EventCallback, Point
from .display_mode import DisplayMode, FullscreenDisplayMode, WindowedDisplayMode
from .errors import DrawError, GuiError
from .event_types import EventType
from .event_types.key_press_event_type import KEY_MAPPING, KeyPressEventType
from .event_types.timeout_event_type import TimeoutEventType
from .gui import Gui

T = TypeVar("T", bound=Callable[..., None])


[docs] class PygameGui(Gui): def __init__( self, display_mode: DisplayMode, window_title: str, logger: Optional[Logger] = None, ): super().__init__() self._display_mode = display_mode self._window_title = window_title self._task_queue: deque[Callable[..., None]] = deque() self._subscribed_events: dict[int, tuple[EventType, Callable[..., None]]] = {} self._is_running = False self._logger = logger if logger is not None else MagicMock() self._clock = pygame.time.Clock() def get_window_size(self) -> ElementSize: return ElementSize(self._width, self._height) def _add_task_to_queue(self, task: Callable[..., None]) -> None: self._task_queue.append(task) def on_init(self, callback: EventCallback) -> None: self._task_queue.append(callback) def _handle_events(self, event: pygame.event.Event) -> None: for event_id, (subscribed_event, callback) in self._subscribed_events.items(): if ( isinstance(subscribed_event, KeyPressEventType) and event.type == pygame.KEYDOWN ): if event.key == KEY_MAPPING[subscribed_event.key]: callback() elif event.type == event_id: callback() def start(self) -> None: self._log("Starting GUI") pygame.init() pygame.font.init() self._init_window() pygame.display.set_caption(self._window_title) self._is_running = True while self._is_running: self._run_tasks_from_queue() for event in pygame.event.get(): self._run_tasks_from_queue() if event.type == pygame.QUIT: self.stop() self._handle_events(event) self._clock.tick() self._log("Stopping GUI") pygame.quit() def _log(self, message: str) -> None: self._logger.debug(f"{self.__class__.__name__}: {message}") def _init_window(self) -> None: if isinstance(self._display_mode, FullscreenDisplayMode): self._window = pygame.display.set_mode( (0, 0), pygame.FULLSCREEN | pygame.NOFRAME ) elif isinstance(self._display_mode, WindowedDisplayMode): self._width, self._height = ( self._display_mode.width, self._display_mode.height, ) self._window = pygame.display.set_mode((self._width, self._height)) else: raise GuiError(f"Unknown display mode: {self._display_mode}") self._width, self._height = self._window.get_size() def _run_tasks_from_queue(self) -> None: while self._task_queue: self._task_queue.popleft()() pygame.display.update() def stop(self) -> None: self._is_running = False @staticmethod def submit_execution(func: T) -> T: @wraps(func) def wrapper(self: "PygameGui", *args: Any, **kwargs: Any) -> None: self._add_task_to_queue(partial(func, self, *args, **kwargs)) return cast(T, wrapper) @submit_execution def draw_uniform_background(self, color: Color) -> None: self._window.fill(color.get_hex_l()) @submit_execution def draw_rectangle( self, *, color: Color, top_left_point: Point, width: int, height: int ) -> None: rect = pygame.Rect(*top_left_point, width, height) if not rect.colliderect(pygame.Rect((0, 0, self._width, self._height))): raise DrawError("Rectangle must be inside the view!") pygame.draw.rect(self._window, color.get_hex_l(), rect) @submit_execution def draw_text(self, *, font_size: int, text: str, color: Color) -> None: font = pygame.font.SysFont(None, font_size) lines: list[str] = [] current_line = [] for word in text.split(" "): test_line = " ".join(current_line + [word]) test_width = font.size(test_line)[0] if test_width <= 0.9 * self._width: current_line.append(word) else: lines.append(" ".join(current_line)) current_line = [word] if current_line: lines.append(" ".join(current_line)) line_height = font.get_linesize() total_height = len(lines) * line_height y_pos = (self._height - total_height) // 2 for line in lines: text_surface = font.render(line, True, color.get_hex_l()) text_rect = text_surface.get_rect( center=(self._width // 2, y_pos + line_height // 2) ) self._window.blit(text_surface, text_rect) y_pos += line_height @submit_execution def draw_image(self, image_path: Path) -> None: raise NotImplementedError @submit_execution def play_sound(self, sound_path: Path) -> None: raise NotImplementedError def _subscribe_to_event( self, *, event_id: int, event: EventType, callback: EventCallback ) -> None: self._log(f"Subscribing to event {event_id} with callback {callback}") self._subscribed_events[event_id] = (event, callback) if isinstance(event, TimeoutEventType): self._add_task_to_queue( lambda: pygame.time.set_timer(event_id, event.timeout_millis) ) def subscribe_to_event_and_get_id( self, *, event: EventType, callback: EventCallback ) -> int: event_id = pygame.event.custom_type() self._add_task_to_queue( lambda: self._subscribe_to_event( event_id=event_id, event=event, callback=callback ) ) return event_id def _unsubscribe_from_event_by_id(self, event_id: int) -> None: self._log(f"Unsubscribing from event {event_id}") if event_id not in self._subscribed_events: raise GuiError(f"Event ID {event_id} is not subscribed.") del self._subscribed_events[event_id] def unsubscribe_from_event_by_id(self, event_id: int) -> None: self._add_task_to_queue(lambda: self._unsubscribe_from_event_by_id(event_id))