Add influxdb

This commit is contained in:
KenwoodFox
2026-06-15 10:51:54 -04:00
parent f49ff99b6b
commit 35e56d25e7
10 changed files with 330 additions and 208 deletions

View File

@@ -7,7 +7,9 @@ description = "Tower daemon for KW1FOX-1"
[dependencies]
anyhow = "1"
clap = { version = "4", features = ["derive"] }
ctrlc = "3"
futures = "0.3"
influxdb2 = { version = "0.5", default-features = false, features = ["rustls"] }
rppal = "0.22"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "sync", "time"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

59
src/alarm.rs Normal file
View File

@@ -0,0 +1,59 @@
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::{watch, Mutex};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
Ok,
Alarm,
}
// Shared reference to alarm board
pub type SharedAlarms = Arc<AlarmBoard>;
pub struct AlarmBoard {
faults: Mutex<HashSet<&'static str>>,
status_tx: watch::Sender<Status>,
}
impl AlarmBoard {
// Create a new alarm board
pub fn new() -> (SharedAlarms, watch::Receiver<Status>) {
let (status_tx, status_rx) = watch::channel(Status::Ok);
let board = Arc::new(Self {
faults: Mutex::new(HashSet::new()),
status_tx,
});
(board, status_rx)
}
// Set a fault (source, string, active)
pub async fn set_fault(&self, source: &'static str, active: bool) {
let mut faults = self.faults.lock().await;
if active {
faults.insert(source);
} else {
faults.remove(source);
}
let status = if faults.is_empty() {
Status::Ok
} else {
Status::Alarm
};
let _ = self.status_tx.send_if_modified(|current| {
if *current != status {
*current = status;
true
} else {
false
}
});
}
pub fn status(&self) -> Status {
*self.status_tx.borrow()
}
}

View File

@@ -5,7 +5,18 @@ pub struct Config {
pub fan_on_temp_c: f64,
pub fan_off_temp_c: f64,
pub poll_interval_s: f64,
pub error_temp_c: f64,
pub thermal_alarm_temp_c: f64,
pub influx: Option<InfluxConfig>,
}
#[derive(Debug, Clone)]
pub struct InfluxConfig {
pub url: String,
pub org: String,
pub bucket: String,
pub token: String,
pub host_tag: String,
pub metrics_interval_s: f64,
}
impl Default for Config {
@@ -16,7 +27,30 @@ impl Default for Config {
fan_on_temp_c: 40.0,
fan_off_temp_c: 35.0,
poll_interval_s: 2.0,
error_temp_c: 80.0,
thermal_alarm_temp_c: 80.0,
influx: InfluxConfig::from_env(),
}
}
}
impl InfluxConfig {
pub fn from_env() -> Option<Self> {
let token = std::env::var("TOWERD_INFLUX_TOKEN").ok()?;
let org = std::env::var("TOWERD_INFLUX_ORG").ok()?;
let bucket = std::env::var("TOWERD_INFLUX_BUCKET").ok()?;
Some(Self {
url: std::env::var("TOWERD_INFLUX_URL")
.unwrap_or_else(|_| "http://influx.kitsunehosting.net:8086".into()),
org,
bucket,
token,
host_tag: std::env::var("TOWERD_INFLUX_HOST")
.unwrap_or_else(|_| "kw1fox-1".into()),
metrics_interval_s: std::env::var("TOWERD_INFLUX_INTERVAL_S")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(30.0),
})
}
}

View File

@@ -1,99 +1,47 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tracing::{debug, info};
use tracing::info;
use crate::alarm::AlarmBoard;
use crate::config::Config;
use crate::gpio::{Status, TowerGpio};
use crate::thermal;
use crate::gpio::Fan;
use crate::tasks;
pub struct TowerDaemon {
config: Config,
gpio: TowerGpio,
running: Arc<AtomicBool>,
}
pub async fn run(config: Config) -> anyhow::Result<()> {
let (alarms, status_rx) = AlarmBoard::new();
let fan = Arc::new(tokio::sync::Mutex::new(Fan::new(config.fan_pin)?));
impl TowerDaemon {
/// Creates a new TowerDaemon instance
// fn new is like the constructor and builds this object
pub fn new(config: Config) -> anyhow::Result<Self> {
let gpio = TowerGpio::new(config.status_pin, config.fan_pin)?;
let running = Arc::new(AtomicBool::new(true));
info!(
status_pin = config.status_pin,
fan_pin = config.fan_pin,
"Tower daemon started"
);
Ok(Self {
config,
gpio,
running,
})
if config.influx.is_none() {
info!("InfluxDB metrics disabled (set TOWERD_INFLUX_TOKEN, TOWERD_INFLUX_ORG, and TOWERD_INFLUX_BUCKET to enable)");
}
// Runs the daemon
pub fn run(mut self) -> anyhow::Result<()> {
let running = Arc::clone(&self.running);
ctrlc::set_handler(move || {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Received shutdown signal");
running.store(false, Ordering::Relaxed);
})?;
info!(
status_pin = self.config.status_pin,
fan_pin = self.config.fan_pin,
"Tower daemon started"
);
// Do tick (main loop)
while self.running.load(Ordering::Relaxed) {
self.tick()?;
thread::sleep(Duration::from_secs_f64(self.config.poll_interval_s));
}
self.gpio.stop();
info!("Tower daemon stopped");
Ok(())
}
// Single daemon tick
fn tick(&mut self) -> anyhow::Result<()> {
let temp_c = thermal::read_cpu_temp_c()?;
self.update_fan(temp_c);
self.update_status(temp_c);
debug!(
temp_c,
fan = if self.gpio.fan_on() { "on" } else { "off" },
status = ?self.gpio.status(),
);
Ok(())
}
// Update the fan based on temperature
fn update_fan(&mut self, temp_c: f64) {
if !self.gpio.fan_on() && temp_c >= self.config.fan_on_temp_c {
self.gpio.set_fan(true);
info!(
temp_c,
threshold = self.config.fan_on_temp_c,
"Fan on"
);
} else if self.gpio.fan_on() && temp_c <= self.config.fan_off_temp_c {
self.gpio.set_fan(false);
info!(
temp_c,
threshold = self.config.fan_off_temp_c,
"Fan off"
);
result = tasks::status::run(config.status_pin, status_rx) => {
result?;
}
result = tasks::thermal::run(config.clone(), fan.clone(), alarms.clone()) => {
result?;
}
result = async {
match config.influx {
Some(influx) => tasks::metrics::run(influx, fan.clone(), alarms.clone()).await,
None => std::future::pending().await,
}
} => {
result?;
}
}
// Update the status
fn update_status(&self, temp_c: f64) {
if temp_c >= self.config.error_temp_c {
self.gpio.set_status(Status::Error);
} else {
self.gpio.set_status(Status::Ok);
}
}
fan.lock().await.stop();
info!("Tower daemon stopped");
Ok(())
}

View File

@@ -1,144 +1,42 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use std::sync::Arc;
use anyhow::Context;
use rppal::gpio::{Gpio, OutputPin};
use tokio::sync::Mutex;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
Ok,
Error,
}
pub type SharedFan = Arc<Mutex<Fan>>;
const SLOW_ON: Duration = Duration::from_secs(1);
const SLOW_OFF: Duration = Duration::from_secs(1);
const FAST_ON: Duration = Duration::from_millis(150);
const FAST_OFF: Duration = Duration::from_millis(150);
pub struct TowerGpio {
pub struct Fan {
_gpio: Gpio,
fan: OutputPin,
status: Arc<Mutex<Status>>,
blink_shutdown: Arc<AtomicBool>,
blink_handle: Option<JoinHandle<()>>,
pin: OutputPin,
}
impl TowerGpio {
/// Sets up the GPIO pins and spawns the blink thread
pub fn new(status_pin: u8, fan_pin: u8) -> anyhow::Result<Self> {
impl Fan {
pub fn new(fan_pin: u8) -> anyhow::Result<Self> {
let gpio = Gpio::new().context("failed to initialize GPIO")?;
// Setup the fan
let mut fan = gpio
let mut pin = gpio
.get(fan_pin)
.context("failed to open fan pin")?
.into_output();
fan.set_low();
pin.set_low();
// Setup the status LED
let status_led = gpio
.get(status_pin)
.context("failed to open status pin")?
.into_output();
let status = Arc::new(Mutex::new(Status::Ok));
let blink_shutdown = Arc::new(AtomicBool::new(false)); // A signal to use to stop the blink thread
let blink_status = Arc::clone(&status); // Shared reference for the current status
let blink_stop = Arc::clone(&blink_shutdown); // Shared reference to the shutdown signal
let blink_handle = thread::spawn(move || {
blink_loop(status_led, blink_status, blink_stop); // Spawn the blink thread
});
Ok(Self { // Return the struct with the GPIO pins and the blink thread handle
_gpio: gpio,
fan,
status,
blink_shutdown,
blink_handle: Some(blink_handle),
})
Ok(Self { _gpio: gpio, pin })
}
// Get the current status
pub fn status(&self) -> Status {
*self.status.lock().expect("status lock poisoned")
pub fn on(&self) -> bool {
self.pin.is_set_high()
}
// Check if the fan is on
pub fn fan_on(&self) -> bool {
self.fan.is_set_high()
}
// Set the current status
pub fn set_status(&self, status: Status) {
let mut current = self.status.lock().expect("status lock poisoned");
if *current == status {
return;
}
*current = status;
}
// Set the fan on or off
pub fn set_fan(&mut self, on: bool) {
pub fn set_on(&mut self, on: bool) {
if on {
self.fan.set_high();
self.pin.set_high();
} else {
self.fan.set_low();
self.pin.set_low();
}
}
// Stop the GPIO pins and join the blink thread
pub fn stop(&mut self) {
self.blink_shutdown.store(true, Ordering::Relaxed);
if let Some(handle) = self.blink_handle.take() {
let _ = handle.join();
}
self.fan.set_low();
self.pin.set_low();
}
}
// For when the tower GPIO is dropped
impl Drop for TowerGpio {
fn drop(&mut self) {
self.stop();
}
}
// The blink loop for the status LED
fn blink_loop(mut led: OutputPin, status: Arc<Mutex<Status>>, shutdown: Arc<AtomicBool>) {
while !shutdown.load(Ordering::Relaxed) {
let current = *status.lock().expect("status lock poisoned");
let (on_time, off_time) = match current {
Status::Ok => (SLOW_ON, SLOW_OFF),
Status::Error => (FAST_ON, FAST_OFF),
};
led.set_high();
if sleep_or_shutdown(on_time, &shutdown) {
break;
}
led.set_low();
if sleep_or_shutdown(off_time, &shutdown) {
break;
}
}
led.set_low();
}
fn sleep_or_shutdown(duration: Duration, shutdown: &AtomicBool) -> bool {
let step = Duration::from_millis(50);
let mut remaining = duration;
while remaining > Duration::ZERO {
if shutdown.load(Ordering::Relaxed) {
return true;
}
let slice = remaining.min(step);
thread::sleep(slice);
remaining = remaining.saturating_sub(slice);
}
shutdown.load(Ordering::Relaxed)
}

View File

@@ -1,6 +1,8 @@
mod alarm;
mod config;
mod daemon;
mod gpio;
mod tasks;
mod thermal;
use clap::Parser;
@@ -8,7 +10,6 @@ use tracing::Level;
use tracing_subscriber::EnvFilter;
use crate::config::Config;
use crate::daemon::TowerDaemon;
#[derive(Parser)]
#[command(name = "towerd", about = "Tower Daemon")]
@@ -17,7 +18,8 @@ struct Args {
verbose: bool,
}
fn main() -> anyhow::Result<()> {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let default_level = if args.verbose {
@@ -34,5 +36,5 @@ fn main() -> anyhow::Result<()> {
.with_target(false)
.init();
TowerDaemon::new(Config::default())?.run()
daemon::run(Config::default()).await
}

74
src/tasks/metrics.rs Normal file
View File

@@ -0,0 +1,74 @@
use std::time::Duration;
use futures::stream;
use influxdb2::models::DataPoint;
use influxdb2::Client;
use tokio::time::MissedTickBehavior;
use tracing::{info, warn};
use crate::alarm::{SharedAlarms, Status};
use crate::config::InfluxConfig;
use crate::gpio::SharedFan;
use crate::thermal;
pub async fn run(config: InfluxConfig, fan: SharedFan, alarms: SharedAlarms) -> anyhow::Result<()> {
// Get client
let client = Client::new(&config.url, &config.org, &config.token);
// Get interval
let mut interval =
tokio::time::interval(Duration::from_secs_f64(config.metrics_interval_s));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
// Debug logging
info!(
url = %config.url,
org = %config.org,
bucket = %config.bucket,
host = %config.host_tag,
interval_s = config.metrics_interval_s,
"Metrics task started"
);
loop { // Main loop
interval.tick().await;
// Publish the metrics
match publish(&client, &config, &fan, &alarms).await {
Ok(()) => alarms.set_fault("metrics", false).await,
Err(e) => {
// Set the fault if the publish fails
warn!(error = %e, "Failed to publish metrics");
alarms.set_fault("metrics", true).await;
}
}
}
}
// Publish the metrics
async fn publish(
client: &Client,
config: &InfluxConfig,
fan: &SharedFan,
alarms: &SharedAlarms,
) -> anyhow::Result<()> {
let temp_c = tokio::task::spawn_blocking(thermal::read_cpu_temp_c)
.await??;
let fan_on = fan.lock().await.on();
let alarm = matches!(alarms.status(), Status::Alarm);
let point = DataPoint::builder("tower")
.tag("host", &config.host_tag)
.field("cpu_temp_c", temp_c)
.field("fan_on", fan_on)
.field("alarm", alarm)
.build()?;
client
.write(&config.bucket, stream::iter(vec![point]))
.await
.map_err(|e| anyhow::anyhow!("influxdb write failed: {e}"))?;
Ok(())
}

3
src/tasks/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod metrics;
pub mod status;
pub mod thermal;

45
src/tasks/status.rs Normal file
View File

@@ -0,0 +1,45 @@
use std::time::Duration;
use anyhow::Context;
use rppal::gpio::{Gpio, OutputPin};
use tokio::sync::watch;
use crate::alarm::Status;
const SLOW_ON: Duration = Duration::from_secs(1);
const SLOW_OFF: Duration = Duration::from_secs(1);
const FAST_ON: Duration = Duration::from_millis(150);
const FAST_OFF: Duration = Duration::from_millis(150);
struct StatusLed {
pin: OutputPin,
}
impl Drop for StatusLed {
fn drop(&mut self) {
self.pin.set_low();
}
}
pub async fn run(status_pin: u8, status_rx: watch::Receiver<Status>) -> anyhow::Result<()> {
let gpio = Gpio::new().context("failed to initialize GPIO")?;
let led = gpio
.get(status_pin)
.context("failed to open status pin")?
.into_output();
let mut led = StatusLed { pin: led };
loop {
// Get the on and off times based on the status
let (on_time, off_time) = match *status_rx.borrow() {
Status::Ok => (SLOW_ON, SLOW_OFF),
Status::Alarm => (FAST_ON, FAST_OFF),
};
led.pin.set_high();
tokio::time::sleep(on_time).await;
led.pin.set_low();
tokio::time::sleep(off_time).await;
}
}

57
src/tasks/thermal.rs Normal file
View File

@@ -0,0 +1,57 @@
use std::time::Duration;
use tokio::time::MissedTickBehavior;
use tracing::{debug, info};
use crate::alarm::SharedAlarms;
use crate::config::Config;
use crate::gpio::{Fan, SharedFan};
use crate::thermal;
pub async fn run(config: Config, fan: SharedFan, alarms: SharedAlarms) -> anyhow::Result<()> {
// Setup the interval for the thermal task
let mut interval =
tokio::time::interval(Duration::from_secs_f64(config.poll_interval_s));
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop { // Main loop
interval.tick().await;
// Read the temperature
let temp_c = tokio::task::spawn_blocking(thermal::read_cpu_temp_c)
.await??;
// Check if the temperature is over the alarm threshold
let over_temp = temp_c >= config.thermal_alarm_temp_c;
alarms.set_fault("thermal", over_temp).await;
// Update the fan
let mut fan = fan.lock().await;
update_fan(&mut fan, &config, temp_c);
// Debug logging
debug!(
temp_c,
fan = if fan.on() { "on" } else { "off" },
alarm = ?alarms.status(),
);
}
}
fn update_fan(fan: &mut Fan, config: &Config, temp_c: f64) {
if !fan.on() && temp_c >= config.fan_on_temp_c {
fan.set_on(true);
info!(
temp_c,
threshold = config.fan_on_temp_c,
"Fan on"
);
} else if fan.on() && temp_c <= config.fan_off_temp_c {
fan.set_on(false);
info!(
temp_c,
threshold = config.fan_off_temp_c,
"Fan off"
);
}
}