Files
patlite-mqtt/patlite_mqtt/channels.py
2026-05-18 16:05:11 -04:00

175 lines
5.1 KiB
Python

from __future__ import annotations
import enum
import threading
from typing import Callable
from patlite_mqtt.device import PatliteDevice
class Mode(enum.Enum):
    """Lamp state that a color channel can be commanded into."""

    OFF = "OFF"
    ON = "ON"
    FAST_FLASH = "FAST_FLASH"
    SLOW_FLASH = "SLOW_FLASH"

    @classmethod
    def parse(cls, value: str) -> "Mode":
        """Convert text into a ``Mode``.

        Surrounding whitespace is stripped and the text is upper-cased
        before it is matched against the member values.

        Raises:
            ValueError: if the normalized text is not a member value; the
                enum's own lookup error is chained as the cause.
        """
        normalized = value.strip().upper()
        try:
            return cls(normalized)
        except ValueError as exc:
            # Report the caller's raw text (not the normalized form) so the
            # message shows exactly what was received.
            message = (
                f"unknown mode {value!r}; "
                "expected OFF, ON, FAST_FLASH, or SLOW_FLASH"
            )
            raise ValueError(message) from exc
class ColorChannel:
    """One MQTT-controlled color; flash modes share a global on/off gate.

    The channel stores only its current ``Mode``.  Whether a flashing
    channel is lit at a given moment is read from the controller's
    ``flash_lit`` property, so all flashing channels share one phase.

    Locking: ``_lock`` protects ``_mode`` and nothing else.  It is never
    held while calling into the controller, because the controller reads
    ``mode`` on every channel (which takes this lock) while holding its own
    flash lock; nesting the two in the opposite order here could deadlock.
    """

    def __init__(
        self,
        name: str,
        apply_lit: Callable[[bool, int, int, int], tuple[int, int, int]],
        controller: PatliteController,
    ) -> None:
        """Create a channel that starts in ``Mode.OFF``.

        Args:
            name: Label for this channel.
            apply_lit: Called as ``apply_lit(lit, ry, gb, w)``; returns the
                updated ``(ry, gb, w)`` values for this channel.
            controller: Object providing ``flash_lit``, ``sync_flash()`` and
                ``refresh()``.
        """
        self.name = name
        self._apply_lit = apply_lit
        self._controller = controller
        self._lock = threading.Lock()
        self._mode = Mode.OFF

    @property
    def mode(self) -> Mode:
        """Return the current mode (read under the channel lock)."""
        with self._lock:
            return self._mode

    def set_mode(self, mode: Mode) -> None:
        """Switch to ``mode`` and have the controller re-sync and refresh.

        Does nothing when ``mode`` equals the current mode.
        """
        with self._lock:
            if mode == self._mode:
                return
            self._mode = mode
        # The controller calls run with the lock released: sync_flash()
        # reads every channel's ``mode`` and refresh() calls contribute(),
        # and both take this (non-reentrant) lock.
        self._controller.sync_flash()
        self._controller.refresh()

    def contribute(self, ry: int, gb: int, w: int) -> tuple[int, int, int]:
        """Fold this channel into the ``(ry, gb, w)`` values.

        Returns the inputs unchanged when the channel is OFF; otherwise
        returns ``apply_lit(lit, ry, gb, w)`` where ``lit`` is ``True`` for
        ON and the controller's current ``flash_lit`` for either flash mode.
        """
        # Snapshot the mode, then drop the lock before asking the controller
        # for the flash phase (see the class docstring on lock ordering).
        with self._lock:
            mode = self._mode
        if mode == Mode.OFF:
            return ry, gb, w
        lit = True if mode == Mode.ON else self._controller.flash_lit
        return self._apply_lit(lit, ry, gb, w)

    def shutdown(self) -> None:
        """Force the channel to ``Mode.OFF`` without notifying the controller."""
        with self._lock:
            self._mode = Mode.OFF
class PatliteController:
    """Owns the color channels, the shared flash phase, and device output.

    Locking (each lock is a plain, non-reentrant ``threading.Lock``):

    * ``_lock`` serializes :meth:`refresh`, so the values handed to
      ``device.send`` are computed and sent one refresh at a time.
    * ``_flash_lock`` guards only ``_flash_lit``.  Nothing else is acquired
      while it is held, so it can safely be taken from inside a channel's
      ``contribute`` call.
    * ``_sync_lock`` serializes starting and stopping the flash thread.
      Channel modes are read while holding it, but the flash thread itself
      never takes it, so joining the thread under it cannot stall.
    """

    def __init__(
        self,
        device: PatliteDevice,
        *,
        fast_delay: float = 0.25,
        slow_delay: float = 0.5,
    ) -> None:
        """Create the controller and its red/yellow/green/blue/white channels.

        Args:
            device: Object whose ``send(ry, gb, w)`` receives each refresh.
            fast_delay: Seconds between flash toggles while any channel is
                in ``FAST_FLASH``.
            slow_delay: Seconds between flash toggles otherwise.
        """
        self._device = device
        self._fast_delay = fast_delay
        self._slow_delay = slow_delay
        self._lock = threading.Lock()
        self._flash_lock = threading.Lock()
        self._sync_lock = threading.Lock()
        self._flash_lit = False
        # Stop signal belonging to the current flash thread; replaced with a
        # fresh Event every time a thread is started or stopped.
        self._flash_stop = threading.Event()
        self._flash_thread: threading.Thread | None = None
        # Each setter ORs one bit into one of the three values when lit and
        # returns the values untouched otherwise.
        self.channels: dict[str, ColorChannel] = {
            "red": ColorChannel(
                "red",
                lambda lit, ry, gb, w: (ry | 0x10, gb, w) if lit else (ry, gb, w),
                self,
            ),
            "yellow": ColorChannel(
                "yellow",
                lambda lit, ry, gb, w: (ry | 0x01, gb, w) if lit else (ry, gb, w),
                self,
            ),
            "green": ColorChannel(
                "green",
                lambda lit, ry, gb, w: (ry, gb | 0x10, w) if lit else (ry, gb, w),
                self,
            ),
            "blue": ColorChannel(
                "blue",
                lambda lit, ry, gb, w: (ry, gb | 0x01, w) if lit else (ry, gb, w),
                self,
            ),
            "white": ColorChannel(
                "white",
                # OR like the other channels instead of overwriting ``w``.
                lambda lit, ry, gb, w: (ry, gb, w | 0x01) if lit else (ry, gb, w),
                self,
            ),
        }

    @property
    def flash_lit(self) -> bool:
        """Return the current phase of the shared flash gate."""
        with self._flash_lock:
            return self._flash_lit

    def _any_flashers(self) -> bool:
        """Return True if any channel is in FAST_FLASH or SLOW_FLASH."""
        return any(
            ch.mode in (Mode.FAST_FLASH, Mode.SLOW_FLASH)
            for ch in self.channels.values()
        )

    def _flash_delay(self) -> float:
        """Return the toggle interval: fast if any channel is FAST_FLASH."""
        if any(ch.mode == Mode.FAST_FLASH for ch in self.channels.values()):
            return self._fast_delay
        return self._slow_delay

    def sync_flash(self) -> None:
        """Start or stop the flash thread to match the channels' modes.

        With no flashing channel the thread is stopped and the gate reset to
        unlit.  With at least one flashing channel a thread is started unless
        a live one already exists.
        """
        with self._sync_lock:
            if not self._any_flashers():
                self._stop_flash_locked()
                return
            running = self._flash_thread
            # is_alive() rather than "is not None": a thread that died
            # (e.g. from an exception raised during refresh) must be replaced.
            if running is not None and running.is_alive():
                return
            with self._flash_lock:
                self._flash_lit = False
            stop = threading.Event()
            self._flash_stop = stop
            self._flash_thread = threading.Thread(
                target=self._flash_loop,
                args=(stop,),
                name="patlite-flash",
                daemon=True,
            )
            self._flash_thread.start()

    def _flash_loop(self, stop: threading.Event | None = None) -> None:
        """Toggle the flash gate and refresh until ``stop`` is set.

        Args:
            stop: Stop signal for this thread; defaults to the controller's
                current ``_flash_stop`` when omitted.
        """
        if stop is None:
            stop = self._flash_stop
        # The delay is recomputed before every wait, so switching between
        # fast and slow flashing takes effect on the next tick.
        while not stop.wait(self._flash_delay()):
            # Idle rather than exit when nothing is flashing: the thread's
            # lifetime is decided only by sync_flash()/shutdown(), which
            # avoids leaving a dead thread registered as the active one.
            if not self._any_flashers():
                continue
            with self._flash_lock:
                self._flash_lit = not self._flash_lit
            self.refresh()

    def _stop_flash_locked(self) -> None:
        """Stop the flash thread and reset the gate to unlit.

        The caller must hold ``_sync_lock``.  ``_flash_lock`` must NOT be
        held, because the flash thread needs it to finish its current tick.
        """
        self._flash_stop.set()
        thread, self._flash_thread = self._flash_thread, None
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2.0)
        # Swap in a fresh Event instead of clearing the old one, so a thread
        # that outlived the join timeout still sees its own stop request.
        self._flash_stop = threading.Event()
        with self._flash_lock:
            self._flash_lit = False

    def refresh(self) -> None:
        """Recompute ``(ry, gb, w)`` from all channels and send it."""
        with self._lock:
            ry, gb, w = 0, 0, 0
            for channel in self.channels.values():
                ry, gb, w = channel.contribute(ry, gb, w)
            self._device.send(ry, gb, w)

    def shutdown(self) -> None:
        """Turn every channel off, stop the flash thread, and refresh once."""
        for channel in self.channels.values():
            channel.shutdown()
        with self._sync_lock:
            self._stop_flash_locked()
        self.refresh()