from __future__ import annotations import enum import threading from typing import Callable from patlite_mqtt.device import PatliteDevice class Mode(enum.Enum): OFF = "OFF" ON = "ON" FAST_FLASH = "FAST_FLASH" SLOW_FLASH = "SLOW_FLASH" @classmethod def parse(cls, value: str) -> "Mode": try: return cls(value.strip().upper()) except ValueError as exc: raise ValueError( f"unknown mode {value!r}; expected OFF, ON, FAST_FLASH, or SLOW_FLASH" ) from exc class ColorChannel: """One MQTT-controlled color; flash modes share a global on/off gate.""" def __init__( self, name: str, apply_lit: Callable[[bool, int, int, int], tuple[int, int, int]], controller: PatliteController, ): self.name = name self._apply_lit = apply_lit self._controller = controller self._lock = threading.Lock() self._mode = Mode.OFF @property def mode(self) -> Mode: with self._lock: return self._mode def set_mode(self, mode: Mode): with self._lock: if mode == self._mode: return self._mode = mode self._controller.sync_flash() self._controller.refresh() def contribute(self, ry: int, gb: int, w: int) -> tuple[int, int, int]: with self._lock: if self._mode == Mode.OFF: return ry, gb, w if self._mode == Mode.ON: lit = True else: lit = self._controller.flash_lit return self._apply_lit(lit, ry, gb, w) def shutdown(self): with self._lock: self._mode = Mode.OFF class PatliteController: def __init__( self, device: PatliteDevice, *, fast_delay: float = 0.25, slow_delay: float = 0.5, ): self._device = device self._fast_delay = fast_delay self._slow_delay = slow_delay self._lock = threading.Lock() self._flash_lock = threading.Lock() self._flash_lit = False self._flash_stop = threading.Event() self._flash_thread: threading.Thread | None = None self.channels = { "red": ColorChannel( "red", lambda lit, ry, gb, w: (ry | 0x10, gb, w) if lit else (ry, gb, w), self, ), "yellow": ColorChannel( "yellow", lambda lit, ry, gb, w: (ry | 0x01, gb, w) if lit else (ry, gb, w), self, ), "green": ColorChannel( "green", lambda lit, ry, gb, w: (ry, gb | 0x10, w) if lit else (ry, gb, w), self, ), "blue": ColorChannel( "blue", lambda lit, ry, gb, w: (ry, gb | 0x01, w) if lit else (ry, gb, w), self, ), "white": ColorChannel( "white", lambda lit, ry, gb, w: (ry, gb, 0x01) if lit else (ry, gb, w), self, ), } @property def flash_lit(self) -> bool: with self._flash_lock: return self._flash_lit def _any_flashers(self) -> bool: return any( ch.mode in (Mode.FAST_FLASH, Mode.SLOW_FLASH) for ch in self.channels.values() ) def _flash_delay(self) -> float: if any(ch.mode == Mode.FAST_FLASH for ch in self.channels.values()): return self._fast_delay return self._slow_delay def sync_flash(self): with self._flash_lock: if not self._any_flashers(): self._stop_flash_locked() self._flash_lit = False return if self._flash_thread is None: self._flash_lit = False self._flash_stop.clear() self._flash_thread = threading.Thread( target=self._flash_loop, name="patlite-flash", daemon=True, ) self._flash_thread.start() def _flash_loop(self): while not self._flash_stop.wait(self._flash_delay()): with self._flash_lock: if not self._any_flashers(): return self._flash_lit = not self._flash_lit self.refresh() def _stop_flash_locked(self): self._flash_stop.set() thread = self._flash_thread self._flash_thread = None if thread is not None and thread is not threading.current_thread(): thread.join(timeout=2.0) self._flash_stop.clear() def refresh(self): with self._lock: ry, gb, w = 0, 0, 0 for channel in self.channels.values(): ry, gb, w = channel.contribute(ry, gb, w) self._device.send(ry, gb, w) def shutdown(self): for channel in self.channels.values(): channel.shutdown() with self._flash_lock: self._stop_flash_locked() self._flash_lit = False self.refresh()