diff --git a/Cargo.toml b/Cargo.toml index 4c135fa..609fb40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,9 @@ description = "Tower daemon for KW1FOX-1" [dependencies] anyhow = "1" clap = { version = "4", features = ["derive"] } -ctrlc = "3" +futures = "0.3" +influxdb2 = { version = "0.5", default-features = false, features = ["rustls"] } rppal = "0.22" +tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/src/alarm.rs b/src/alarm.rs new file mode 100644 index 0000000..d85bd30 --- /dev/null +++ b/src/alarm.rs @@ -0,0 +1,59 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use tokio::sync::{watch, Mutex}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Status { + Ok, + Alarm, +} + +// Shared reference to alarm board +pub type SharedAlarms = Arc; + +pub struct AlarmBoard { + faults: Mutex>, + status_tx: watch::Sender, +} + +impl AlarmBoard { + // Create a new alarm board + pub fn new() -> (SharedAlarms, watch::Receiver) { + let (status_tx, status_rx) = watch::channel(Status::Ok); + let board = Arc::new(Self { + faults: Mutex::new(HashSet::new()), + status_tx, + }); + (board, status_rx) + } + + // Set a fault (source, string, active) + pub async fn set_fault(&self, source: &'static str, active: bool) { + let mut faults = self.faults.lock().await; + if active { + faults.insert(source); + } else { + faults.remove(source); + } + + let status = if faults.is_empty() { + Status::Ok + } else { + Status::Alarm + }; + + let _ = self.status_tx.send_if_modified(|current| { + if *current != status { + *current = status; + true + } else { + false + } + }); + } + + pub fn status(&self) -> Status { + *self.status_tx.borrow() + } +} diff --git a/src/config.rs b/src/config.rs index 7869b53..5a8e865 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,7 +5,18 @@ pub struct Config { pub fan_on_temp_c: f64, pub fan_off_temp_c: f64, pub poll_interval_s: f64, - pub error_temp_c: f64, + pub thermal_alarm_temp_c: f64, + pub influx: Option, +} + +#[derive(Debug, Clone)] +pub struct InfluxConfig { + pub url: String, + pub org: String, + pub bucket: String, + pub token: String, + pub host_tag: String, + pub metrics_interval_s: f64, } impl Default for Config { @@ -16,7 +27,30 @@ impl Default for Config { fan_on_temp_c: 40.0, fan_off_temp_c: 35.0, poll_interval_s: 2.0, - error_temp_c: 80.0, + thermal_alarm_temp_c: 80.0, + influx: InfluxConfig::from_env(), } } } + +impl InfluxConfig { + pub fn from_env() -> Option { + let token = std::env::var("TOWERD_INFLUX_TOKEN").ok()?; + let org = std::env::var("TOWERD_INFLUX_ORG").ok()?; + let bucket = std::env::var("TOWERD_INFLUX_BUCKET").ok()?; + + Some(Self { + url: std::env::var("TOWERD_INFLUX_URL") + .unwrap_or_else(|_| "http://influx.kitsunehosting.net:8086".into()), + org, + bucket, + token, + host_tag: std::env::var("TOWERD_INFLUX_HOST") + .unwrap_or_else(|_| "kw1fox-1".into()), + metrics_interval_s: std::env::var("TOWERD_INFLUX_INTERVAL_S") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(30.0), + }) + } +} diff --git a/src/daemon.rs b/src/daemon.rs index 09dca37..3de7740 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1,99 +1,47 @@ -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::thread; -use std::time::Duration; -use tracing::{debug, info}; +use tracing::info; +use crate::alarm::AlarmBoard; use crate::config::Config; -use crate::gpio::{Status, TowerGpio}; -use crate::thermal; +use crate::gpio::Fan; +use crate::tasks; -pub struct TowerDaemon { - config: Config, - gpio: TowerGpio, - running: Arc, -} +pub async fn run(config: Config) -> anyhow::Result<()> { + let (alarms, status_rx) = AlarmBoard::new(); + let fan = Arc::new(tokio::sync::Mutex::new(Fan::new(config.fan_pin)?)); -impl TowerDaemon { - /// Creates a new TowerDaemon instance - // fn new is like the constructor and builds this object - pub fn new(config: Config) -> anyhow::Result { - let gpio = TowerGpio::new(config.status_pin, config.fan_pin)?; - let running = Arc::new(AtomicBool::new(true)); + info!( + status_pin = config.status_pin, + fan_pin = config.fan_pin, + "Tower daemon started" + ); - Ok(Self { - config, - gpio, - running, - }) + if config.influx.is_none() { + info!("InfluxDB metrics disabled (set TOWERD_INFLUX_TOKEN, TOWERD_INFLUX_ORG, and TOWERD_INFLUX_BUCKET to enable)"); } - // Runs the daemon - pub fn run(mut self) -> anyhow::Result<()> { - let running = Arc::clone(&self.running); - ctrlc::set_handler(move || { + tokio::select! { + _ = tokio::signal::ctrl_c() => { info!("Received shutdown signal"); - running.store(false, Ordering::Relaxed); - })?; - - info!( - status_pin = self.config.status_pin, - fan_pin = self.config.fan_pin, - "Tower daemon started" - ); - - // Do tick (main loop) - while self.running.load(Ordering::Relaxed) { - self.tick()?; - thread::sleep(Duration::from_secs_f64(self.config.poll_interval_s)); } - - self.gpio.stop(); - info!("Tower daemon stopped"); - Ok(()) - } - - // Single daemon tick - fn tick(&mut self) -> anyhow::Result<()> { - let temp_c = thermal::read_cpu_temp_c()?; - self.update_fan(temp_c); - self.update_status(temp_c); - - debug!( - temp_c, - fan = if self.gpio.fan_on() { "on" } else { "off" }, - status = ?self.gpio.status(), - ); - - Ok(()) - } - - // Update the fan based on temperature - fn update_fan(&mut self, temp_c: f64) { - if !self.gpio.fan_on() && temp_c >= self.config.fan_on_temp_c { - self.gpio.set_fan(true); - info!( - temp_c, - threshold = self.config.fan_on_temp_c, - "Fan on" - ); - } else if self.gpio.fan_on() && temp_c <= self.config.fan_off_temp_c { - self.gpio.set_fan(false); - info!( - temp_c, - threshold = self.config.fan_off_temp_c, - "Fan off" - ); + result = tasks::status::run(config.status_pin, status_rx) => { + result?; + } + result = tasks::thermal::run(config.clone(), fan.clone(), alarms.clone()) => { + result?; + } + result = async { + match config.influx { + Some(influx) => tasks::metrics::run(influx, fan.clone(), alarms.clone()).await, + None => std::future::pending().await, + } + } => { + result?; } } - // Update the status - fn update_status(&self, temp_c: f64) { - if temp_c >= self.config.error_temp_c { - self.gpio.set_status(Status::Error); - } else { - self.gpio.set_status(Status::Ok); - } - } + fan.lock().await.stop(); + info!("Tower daemon stopped"); + Ok(()) } diff --git a/src/gpio.rs b/src/gpio.rs index f46c320..b1f79d8 100644 --- a/src/gpio.rs +++ b/src/gpio.rs @@ -1,144 +1,42 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread::{self, JoinHandle}; -use std::time::Duration; +use std::sync::Arc; use anyhow::Context; use rppal::gpio::{Gpio, OutputPin}; +use tokio::sync::Mutex; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum Status { - Ok, - Error, -} +pub type SharedFan = Arc>; -const SLOW_ON: Duration = Duration::from_secs(1); -const SLOW_OFF: Duration = Duration::from_secs(1); -const FAST_ON: Duration = Duration::from_millis(150); -const FAST_OFF: Duration = Duration::from_millis(150); - -pub struct TowerGpio { +pub struct Fan { _gpio: Gpio, - fan: OutputPin, - status: Arc>, - blink_shutdown: Arc, - blink_handle: Option>, + pin: OutputPin, } -impl TowerGpio { - /// Sets up the GPIO pins and spawns the blink thread - pub fn new(status_pin: u8, fan_pin: u8) -> anyhow::Result { +impl Fan { + pub fn new(fan_pin: u8) -> anyhow::Result { let gpio = Gpio::new().context("failed to initialize GPIO")?; - // Setup the fan - let mut fan = gpio + let mut pin = gpio .get(fan_pin) .context("failed to open fan pin")? .into_output(); - fan.set_low(); + pin.set_low(); - // Setup the status LED - let status_led = gpio - .get(status_pin) - .context("failed to open status pin")? - .into_output(); - - let status = Arc::new(Mutex::new(Status::Ok)); - let blink_shutdown = Arc::new(AtomicBool::new(false)); // A signal to use to stop the blink thread - - let blink_status = Arc::clone(&status); // Shared reference for the current status - let blink_stop = Arc::clone(&blink_shutdown); // Shared reference to the shutdown signal - let blink_handle = thread::spawn(move || { - blink_loop(status_led, blink_status, blink_stop); // Spawn the blink thread - }); - - Ok(Self { // Return the struct with the GPIO pins and the blink thread handle - _gpio: gpio, - fan, - status, - blink_shutdown, - blink_handle: Some(blink_handle), - }) + Ok(Self { _gpio: gpio, pin }) } - // Get the current status - pub fn status(&self) -> Status { - *self.status.lock().expect("status lock poisoned") + pub fn on(&self) -> bool { + self.pin.is_set_high() } - // Check if the fan is on - pub fn fan_on(&self) -> bool { - self.fan.is_set_high() - } - - // Set the current status - pub fn set_status(&self, status: Status) { - let mut current = self.status.lock().expect("status lock poisoned"); - if *current == status { - return; - } - *current = status; - } - - // Set the fan on or off - pub fn set_fan(&mut self, on: bool) { + pub fn set_on(&mut self, on: bool) { if on { - self.fan.set_high(); + self.pin.set_high(); } else { - self.fan.set_low(); + self.pin.set_low(); } } - // Stop the GPIO pins and join the blink thread pub fn stop(&mut self) { - self.blink_shutdown.store(true, Ordering::Relaxed); - if let Some(handle) = self.blink_handle.take() { - let _ = handle.join(); - } - self.fan.set_low(); + self.pin.set_low(); } } - -// For when the tower GPIO is dropped -impl Drop for TowerGpio { - fn drop(&mut self) { - self.stop(); - } -} - -// The blink loop for the status LED -fn blink_loop(mut led: OutputPin, status: Arc>, shutdown: Arc) { - while !shutdown.load(Ordering::Relaxed) { - let current = *status.lock().expect("status lock poisoned"); - let (on_time, off_time) = match current { - Status::Ok => (SLOW_ON, SLOW_OFF), - Status::Error => (FAST_ON, FAST_OFF), - }; - - led.set_high(); - if sleep_or_shutdown(on_time, &shutdown) { - break; - } - - led.set_low(); - if sleep_or_shutdown(off_time, &shutdown) { - break; - } - } - - led.set_low(); -} - -fn sleep_or_shutdown(duration: Duration, shutdown: &AtomicBool) -> bool { - let step = Duration::from_millis(50); - let mut remaining = duration; - while remaining > Duration::ZERO { - if shutdown.load(Ordering::Relaxed) { - return true; - } - let slice = remaining.min(step); - thread::sleep(slice); - remaining = remaining.saturating_sub(slice); - } - shutdown.load(Ordering::Relaxed) -} diff --git a/src/main.rs b/src/main.rs index 03fcc40..eabbc8b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,8 @@ +mod alarm; mod config; mod daemon; mod gpio; +mod tasks; mod thermal; use clap::Parser; @@ -8,7 +10,6 @@ use tracing::Level; use tracing_subscriber::EnvFilter; use crate::config::Config; -use crate::daemon::TowerDaemon; #[derive(Parser)] #[command(name = "towerd", about = "Tower Daemon")] @@ -17,7 +18,8 @@ struct Args { verbose: bool, } -fn main() -> anyhow::Result<()> { +#[tokio::main] +async fn main() -> anyhow::Result<()> { let args = Args::parse(); let default_level = if args.verbose { @@ -34,5 +36,5 @@ fn main() -> anyhow::Result<()> { .with_target(false) .init(); - TowerDaemon::new(Config::default())?.run() + daemon::run(Config::default()).await } diff --git a/src/tasks/metrics.rs b/src/tasks/metrics.rs new file mode 100644 index 0000000..7e3751b --- /dev/null +++ b/src/tasks/metrics.rs @@ -0,0 +1,74 @@ +use std::time::Duration; + +use futures::stream; +use influxdb2::models::DataPoint; +use influxdb2::Client; +use tokio::time::MissedTickBehavior; +use tracing::{info, warn}; + +use crate::alarm::{SharedAlarms, Status}; +use crate::config::InfluxConfig; +use crate::gpio::SharedFan; +use crate::thermal; + +pub async fn run(config: InfluxConfig, fan: SharedFan, alarms: SharedAlarms) -> anyhow::Result<()> { + // Get client + let client = Client::new(&config.url, &config.org, &config.token); + + // Get interval + let mut interval = + tokio::time::interval(Duration::from_secs_f64(config.metrics_interval_s)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + // Debug logging + info!( + url = %config.url, + org = %config.org, + bucket = %config.bucket, + host = %config.host_tag, + interval_s = config.metrics_interval_s, + "Metrics task started" + ); + + loop { // Main loop + interval.tick().await; + + // Publish the metrics + match publish(&client, &config, &fan, &alarms).await { + Ok(()) => alarms.set_fault("metrics", false).await, + Err(e) => { + // Set the fault if the publish fails + warn!(error = %e, "Failed to publish metrics"); + alarms.set_fault("metrics", true).await; + } + } + } +} + +// Publish the metrics +async fn publish( + client: &Client, + config: &InfluxConfig, + fan: &SharedFan, + alarms: &SharedAlarms, +) -> anyhow::Result<()> { + let temp_c = tokio::task::spawn_blocking(thermal::read_cpu_temp_c) + .await??; + + let fan_on = fan.lock().await.on(); + let alarm = matches!(alarms.status(), Status::Alarm); + + let point = DataPoint::builder("tower") + .tag("host", &config.host_tag) + .field("cpu_temp_c", temp_c) + .field("fan_on", fan_on) + .field("alarm", alarm) + .build()?; + + client + .write(&config.bucket, stream::iter(vec![point])) + .await + .map_err(|e| anyhow::anyhow!("influxdb write failed: {e}"))?; + + Ok(()) +} diff --git a/src/tasks/mod.rs b/src/tasks/mod.rs new file mode 100644 index 0000000..d76f779 --- /dev/null +++ b/src/tasks/mod.rs @@ -0,0 +1,3 @@ +pub mod metrics; +pub mod status; +pub mod thermal; diff --git a/src/tasks/status.rs b/src/tasks/status.rs new file mode 100644 index 0000000..bc2170c --- /dev/null +++ b/src/tasks/status.rs @@ -0,0 +1,45 @@ +use std::time::Duration; + +use anyhow::Context; +use rppal::gpio::{Gpio, OutputPin}; +use tokio::sync::watch; + +use crate::alarm::Status; + +const SLOW_ON: Duration = Duration::from_secs(1); +const SLOW_OFF: Duration = Duration::from_secs(1); +const FAST_ON: Duration = Duration::from_millis(150); +const FAST_OFF: Duration = Duration::from_millis(150); + +struct StatusLed { + pin: OutputPin, +} + +impl Drop for StatusLed { + fn drop(&mut self) { + self.pin.set_low(); + } +} + +pub async fn run(status_pin: u8, status_rx: watch::Receiver) -> anyhow::Result<()> { + let gpio = Gpio::new().context("failed to initialize GPIO")?; + let led = gpio + .get(status_pin) + .context("failed to open status pin")? + .into_output(); + let mut led = StatusLed { pin: led }; + + loop { + // Get the on and off times based on the status + let (on_time, off_time) = match *status_rx.borrow() { + Status::Ok => (SLOW_ON, SLOW_OFF), + Status::Alarm => (FAST_ON, FAST_OFF), + }; + + led.pin.set_high(); + tokio::time::sleep(on_time).await; + + led.pin.set_low(); + tokio::time::sleep(off_time).await; + } +} diff --git a/src/tasks/thermal.rs b/src/tasks/thermal.rs new file mode 100644 index 0000000..3672d3d --- /dev/null +++ b/src/tasks/thermal.rs @@ -0,0 +1,57 @@ +use std::time::Duration; + +use tokio::time::MissedTickBehavior; +use tracing::{debug, info}; + +use crate::alarm::SharedAlarms; +use crate::config::Config; +use crate::gpio::{Fan, SharedFan}; +use crate::thermal; + +pub async fn run(config: Config, fan: SharedFan, alarms: SharedAlarms) -> anyhow::Result<()> { + // Setup the interval for the thermal task + let mut interval = + tokio::time::interval(Duration::from_secs_f64(config.poll_interval_s)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + + loop { // Main loop + interval.tick().await; + + // Read the temperature + let temp_c = tokio::task::spawn_blocking(thermal::read_cpu_temp_c) + .await??; + + // Check if the temperature is over the alarm threshold + let over_temp = temp_c >= config.thermal_alarm_temp_c; + alarms.set_fault("thermal", over_temp).await; + + // Update the fan + let mut fan = fan.lock().await; + update_fan(&mut fan, &config, temp_c); + + // Debug logging + debug!( + temp_c, + fan = if fan.on() { "on" } else { "off" }, + alarm = ?alarms.status(), + ); + } +} + +fn update_fan(fan: &mut Fan, config: &Config, temp_c: f64) { + if !fan.on() && temp_c >= config.fan_on_temp_c { + fan.set_on(true); + info!( + temp_c, + threshold = config.fan_on_temp_c, + "Fan on" + ); + } else if fan.on() && temp_c <= config.fan_off_temp_c { + fan.set_on(false); + info!( + temp_c, + threshold = config.fan_off_temp_c, + "Fan off" + ); + } +}