use std::time::Duration; use futures::stream; use influxdb2::models::DataPoint; use influxdb2::Client as InfluxClient; use tokio::time::MissedTickBehavior; use tracing::{debug, info, warn}; use crate::alarm::SharedAlarms; use crate::config::{InfluxConfig, RenogyConfig}; use crate::gpio::{Fan, SharedFan}; use crate::renogy::{self, Client as RenogyClient, ControllerData, ControllerInfo}; const RECONNECT_INTERVAL_S: f64 = 15.0; pub async fn run( config: RenogyConfig, fan_on_temp_c: f64, fan_off_temp_c: f64, fan: SharedFan, influx: Option, alarms: SharedAlarms, ) -> anyhow::Result<()> { let Some(serial_path) = config.serial_path.clone() else { info!("Renogy disabled (set TOWERD_RENOGY_SERIAL to a /dev/serial/by-id path to enable)"); std::future::pending::<()>().await; unreachable!() }; let mut reconnect = tokio::time::interval(Duration::from_secs_f64(RECONNECT_INTERVAL_S)); reconnect.set_missed_tick_behavior(MissedTickBehavior::Skip); info!( path = %serial_path, slave = config.slave_address, baud = config.baud_rate, interval_s = config.poll_interval_s, "Renogy task started" ); // Reconnect loop loop { let Some(port) = renogy::resolve(&serial_path) else { alarms.set_fault("renogy", true).await; reconnect.tick().await; continue; }; match connect_and_poll( &port, &config, fan_on_temp_c, fan_off_temp_c, &fan, influx.as_ref(), &alarms, ) .await { Ok(()) => return Ok(()), Err(e) => { alarms.set_fault("renogy", true).await; warn!(error = %e, port = %port, "Renogy session ended, retrying"); reconnect.tick().await; } } } } async fn connect_and_poll( port: &str, config: &RenogyConfig, fan_on_temp_c: f64, fan_off_temp_c: f64, fan: &SharedFan, influx: Option<&InfluxConfig>, alarms: &SharedAlarms, ) -> anyhow::Result<()> { let port_owned = port.to_string(); let open_config = config.clone(); let poll_interval_s = config.poll_interval_s; let mut client = tokio::task::spawn_blocking(move || RenogyClient::open(&port_owned, &open_config)).await??; // Verify connection client.verify().await?; let info = client.read_info().await.ok(); if let Some(ref info) = info { info!( port, model = %info.model, software = %info.software_version, hardware = %info.hardware_version, amps = info.amp_rating_a, "Renogy controller connected" ); } else { info!(port, "Renogy controller connected (info registers unavailable?)"); } // Setup influx client let influx_client = influx.map(|cfg| InfluxClient::new(&cfg.url, &cfg.org, &cfg.token)); // Setup poll interval let mut interval = tokio::time::interval(Duration::from_secs_f64(poll_interval_s)); interval.set_missed_tick_behavior(MissedTickBehavior::Skip); // Poll loop loop { // Wait for poll interval interval.tick().await; // Read data let data = client .read_data() .await .map_err(|e| e.context("failed to read Renogy data registers"))?; // Clear fault alarms.set_fault("renogy", false).await; { let mut fan = fan.lock().await; update_fan( &mut fan, fan_on_temp_c, fan_off_temp_c, data.controller_temperature_c, ); debug!( battery_v = data.battery_voltage, battery_soc = data.battery_soc, controller_temp_c = data.controller_temperature_c, solar_w = data.solar_watts, fan = if fan.on() { "on" } else { "off" }, "Renogy poll ok" ); } // Publish data if let (Some(client), Some(influx)) = (&influx_client, influx) { if let Err(e) = publish(client, influx, &data, info.as_ref()).await { warn!(error = %e, "Failed to publish Renogy metrics"); alarms.set_fault("renogy_influx", true).await; } else { alarms.set_fault("renogy_influx", false).await; } } } } fn update_fan(fan: &mut Fan, on_temp_c: f64, off_temp_c: f64, controller_temp_c: u8) { let temp_c = f64::from(controller_temp_c); if !fan.on() && temp_c > on_temp_c { fan.set_on(true); info!( controller_temp_c = temp_c, threshold = on_temp_c, "Fan on" ); } else if fan.on() && temp_c <= off_temp_c { fan.set_on(false); info!( controller_temp_c = temp_c, threshold = off_temp_c, "Fan off" ); } } // Publish data to InfluxDB async fn publish( client: &InfluxClient, influx: &InfluxConfig, data: &ControllerData, info: Option<&ControllerInfo>, ) -> anyhow::Result<()> { let mut point = DataPoint::builder("renogy").tag("host", &influx.host_tag); if let Some(info) = info { if !info.model.is_empty() { point = point.tag("model", &info.model); } } point = point .field("battery_soc", i64::from(data.battery_soc)) .field("battery_voltage", data.battery_voltage) .field("battery_charging_amps", data.battery_charging_amps) .field("battery_charging_watts", data.battery_charging_watts) .field("battery_temperature_c", i64::from(data.battery_temperature_c)) .field("controller_temperature_c", i64::from(data.controller_temperature_c)) .field("load_voltage", data.load_voltage) .field("load_amps", data.load_amps) .field("load_watts", i64::from(data.load_watts)) .field("solar_voltage", data.solar_voltage) .field("solar_amps", data.solar_amps) .field("solar_watts", i64::from(data.solar_watts)) .field("min_battery_voltage_today", data.min_battery_voltage_today) .field("max_battery_voltage_today", data.max_battery_voltage_today) .field("max_charging_amps_today", data.max_charging_amps_today) .field("max_discharging_amps_today", data.max_discharging_amps_today) .field("max_charge_watts_today", i64::from(data.max_charge_watts_today)) .field("max_discharge_watts_today", i64::from(data.max_discharge_watts_today)) .field("charge_amphours_today", i64::from(data.charge_amphours_today)) .field("discharge_amphours_today", i64::from(data.discharge_amphours_today)) .field("charge_watthours_today", i64::from(data.charge_watthours_today)) .field("discharge_watthours_today", i64::from(data.discharge_watthours_today)) .field("controller_uptime_days", i64::from(data.controller_uptime_days)) .field("total_battery_overcharges", i64::from(data.total_battery_overcharges)) .field("total_battery_full_charges", i64::from(data.total_battery_full_charges)) .field("total_charge_amphours", i64::from(data.total_charge_amphours)) .field("total_discharge_amphours", i64::from(data.total_discharge_amphours)) .field("total_generation_kwh", i64::from(data.total_generation_kwh)) .field("total_consumption_kwh", i64::from(data.total_consumption_kwh)) .field("load_status", i64::from(data.load_status)) .field("load_brightness", i64::from(data.load_brightness)) .field("charging_state", i64::from(data.charging_state)) .field("fault_code", i64::from(data.fault_code)); if let Some(info) = info { point = point .field("voltage_rating_v", i64::from(info.voltage_rating_v)) .field("amp_rating_a", i64::from(info.amp_rating_a)) .field("wattage_rating_w", i64::from(info.wattage_rating_w)) .field("discharge_amp_rating_a", i64::from(info.discharge_amp_rating_a)) .field("controller_type", i64::from(info.controller_type)) .field("modbus_address", i64::from(info.modbus_address)) .field("software_version", info.software_version.clone()) .field("hardware_version", info.hardware_version.clone()) .field("serial_number", info.serial_number.clone()); } let point = point.build()?; client .write(&influx.bucket, stream::iter(vec![point])) .await .map_err(|e| anyhow::anyhow!("influxdb write failed: {e}"))?; Ok(()) }