175 lines
5.1 KiB
Python
175 lines
5.1 KiB
Python
from __future__ import annotations
|
|
|
|
import enum
|
|
import threading
|
|
from typing import Callable
|
|
|
|
from patlite_mqtt.device import PatliteDevice
|
|
|
|
|
|
class Mode(enum.Enum):
|
|
OFF = "OFF"
|
|
ON = "ON"
|
|
FAST_FLASH = "FAST_FLASH"
|
|
SLOW_FLASH = "SLOW_FLASH"
|
|
|
|
@classmethod
|
|
def parse(cls, value: str) -> "Mode":
|
|
try:
|
|
return cls(value.strip().upper())
|
|
except ValueError as exc:
|
|
raise ValueError(
|
|
f"unknown mode {value!r}; expected OFF, ON, FAST_FLASH, or SLOW_FLASH"
|
|
) from exc
|
|
|
|
|
|
class ColorChannel:
|
|
"""One MQTT-controlled color; flash modes share a global on/off gate."""
|
|
|
|
def __init__(
|
|
self,
|
|
name: str,
|
|
apply_lit: Callable[[bool, int, int, int], tuple[int, int, int]],
|
|
controller: PatliteController,
|
|
):
|
|
self.name = name
|
|
self._apply_lit = apply_lit
|
|
self._controller = controller
|
|
self._lock = threading.Lock()
|
|
self._mode = Mode.OFF
|
|
|
|
@property
|
|
def mode(self) -> Mode:
|
|
with self._lock:
|
|
return self._mode
|
|
|
|
def set_mode(self, mode: Mode):
|
|
with self._lock:
|
|
if mode == self._mode:
|
|
return
|
|
self._mode = mode
|
|
self._controller.sync_flash()
|
|
self._controller.refresh()
|
|
|
|
def contribute(self, ry: int, gb: int, w: int) -> tuple[int, int, int]:
|
|
with self._lock:
|
|
if self._mode == Mode.OFF:
|
|
return ry, gb, w
|
|
if self._mode == Mode.ON:
|
|
lit = True
|
|
else:
|
|
lit = self._controller.flash_lit
|
|
return self._apply_lit(lit, ry, gb, w)
|
|
|
|
def shutdown(self):
|
|
with self._lock:
|
|
self._mode = Mode.OFF
|
|
|
|
|
|
class PatliteController:
|
|
def __init__(
|
|
self,
|
|
device: PatliteDevice,
|
|
*,
|
|
fast_delay: float = 0.25,
|
|
slow_delay: float = 0.5,
|
|
):
|
|
self._device = device
|
|
self._fast_delay = fast_delay
|
|
self._slow_delay = slow_delay
|
|
self._lock = threading.Lock()
|
|
self._flash_lock = threading.Lock()
|
|
self._flash_lit = False
|
|
self._flash_stop = threading.Event()
|
|
self._flash_thread: threading.Thread | None = None
|
|
self.channels = {
|
|
"red": ColorChannel(
|
|
"red",
|
|
lambda lit, ry, gb, w: (ry | 0x10, gb, w) if lit else (ry, gb, w),
|
|
self,
|
|
),
|
|
"yellow": ColorChannel(
|
|
"yellow",
|
|
lambda lit, ry, gb, w: (ry | 0x01, gb, w) if lit else (ry, gb, w),
|
|
self,
|
|
),
|
|
"green": ColorChannel(
|
|
"green",
|
|
lambda lit, ry, gb, w: (ry, gb | 0x10, w) if lit else (ry, gb, w),
|
|
self,
|
|
),
|
|
"blue": ColorChannel(
|
|
"blue",
|
|
lambda lit, ry, gb, w: (ry, gb | 0x01, w) if lit else (ry, gb, w),
|
|
self,
|
|
),
|
|
"white": ColorChannel(
|
|
"white",
|
|
lambda lit, ry, gb, w: (ry, gb, 0x01) if lit else (ry, gb, w),
|
|
self,
|
|
),
|
|
}
|
|
|
|
@property
|
|
def flash_lit(self) -> bool:
|
|
with self._flash_lock:
|
|
return self._flash_lit
|
|
|
|
def _any_flashers(self) -> bool:
|
|
return any(
|
|
ch.mode in (Mode.FAST_FLASH, Mode.SLOW_FLASH)
|
|
for ch in self.channels.values()
|
|
)
|
|
|
|
def _flash_delay(self) -> float:
|
|
if any(ch.mode == Mode.FAST_FLASH for ch in self.channels.values()):
|
|
return self._fast_delay
|
|
return self._slow_delay
|
|
|
|
def sync_flash(self):
|
|
with self._flash_lock:
|
|
if not self._any_flashers():
|
|
self._stop_flash_locked()
|
|
self._flash_lit = False
|
|
return
|
|
if self._flash_thread is None:
|
|
self._flash_lit = False
|
|
self._flash_stop.clear()
|
|
self._flash_thread = threading.Thread(
|
|
target=self._flash_loop,
|
|
name="patlite-flash",
|
|
daemon=True,
|
|
)
|
|
self._flash_thread.start()
|
|
|
|
def _flash_loop(self):
|
|
while not self._flash_stop.wait(self._flash_delay()):
|
|
with self._flash_lock:
|
|
if not self._any_flashers():
|
|
return
|
|
self._flash_lit = not self._flash_lit
|
|
self.refresh()
|
|
|
|
def _stop_flash_locked(self):
|
|
self._flash_stop.set()
|
|
thread = self._flash_thread
|
|
self._flash_thread = None
|
|
if thread is not None and thread is not threading.current_thread():
|
|
thread.join(timeout=2.0)
|
|
self._flash_stop.clear()
|
|
|
|
def refresh(self):
|
|
with self._lock:
|
|
ry, gb, w = 0, 0, 0
|
|
for channel in self.channels.values():
|
|
ry, gb, w = channel.contribute(ry, gb, w)
|
|
self._device.send(ry, gb, w)
|
|
|
|
def shutdown(self):
|
|
for channel in self.channels.values():
|
|
channel.shutdown()
|
|
with self._flash_lock:
|
|
self._stop_flash_locked()
|
|
self._flash_lit = False
|
|
self.refresh()
|