Skip to main content

kipuka/ha/
health.rs

1//! Periodic health probing with state machine transitions.
2//!
3//! Implements RHELBU-3536 R4: health state machine
4//! `Healthy -> Degraded -> Unavailable -> Recovering -> Healthy`
5//! with configurable probe intervals and alert generation via audit log.
6
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use serde::{Deserialize, Serialize};
11use tracing::{debug, info, warn};
12
13use super::pool::{CaId, CaPool};
14
15/// Health state of a single CA backend (RHELBU-3536 R4).
16///
17/// Transitions follow the state machine:
18/// ```text
19/// Healthy -> Degraded -> Unavailable -> Recovering -> Healthy
20///                ^                          |
21///                +---- (probe failure) ------+
22/// ```
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub enum HealthState {
25    /// CA is responding normally within latency thresholds.
26    Healthy,
27    /// CA is responding but with elevated latency or intermittent errors.
28    Degraded,
29    /// CA is not responding; circuit breaker is open.
30    Unavailable,
31    /// CA was unavailable and is being re-probed after cooldown.
32    Recovering,
33}
34
35impl HealthState {
36    /// Whether this state allows routing requests to the CA.
37    pub fn is_available(&self) -> bool {
38        matches!(self, Self::Healthy | Self::Degraded | Self::Recovering)
39    }
40}
41
/// Tunables for the health checker.
///
/// The `Default` implementation supplies the stock values.
#[derive(Debug, Clone)]
pub struct HealthConfig {
    /// How long to wait between two probe rounds (default: 30 seconds).
    pub probe_interval: Duration,
    /// Time budget for one probe request.
    pub probe_timeout: Duration,
    /// How many successes in a row a `Recovering` CA needs before it is
    /// marked `Healthy` again.
    pub recovery_threshold: u32,
    /// Latency, in milliseconds, above which a responding CA counts as
    /// degraded.
    pub degraded_latency_ms: u64,
}
55
56impl Default for HealthConfig {
57    fn default() -> Self {
58        Self {
59            probe_interval: Duration::from_secs(30),
60            probe_timeout: Duration::from_secs(5),
61            recovery_threshold: 2,
62            degraded_latency_ms: 2000,
63        }
64    }
65}
66
/// Probe bookkeeping kept for one CA from one probe round to the next.
#[derive(Debug, Clone)]
pub struct ProbeMetrics {
    /// When the most recent probe finished; `None` until the first probe.
    pub last_check: Option<Instant>,
    /// Length of the current run of failed probes.
    pub consecutive_failures: u32,
    /// Length of the current run of successful probes (consulted while the
    /// CA is recovering).
    pub consecutive_successes: u32,
    /// Latency measured for the most recent probe; `None` until the first
    /// probe.
    pub last_latency: Option<Duration>,
}
79
80impl ProbeMetrics {
81    fn new() -> Self {
82        Self {
83            last_check: None,
84            consecutive_failures: 0,
85            consecutive_successes: 0,
86            last_latency: None,
87        }
88    }
89}
90
91/// Runs periodic health probes against each CA backend.
92///
93/// The checker is cloneable (behind `Arc`) and designed to run in a
94/// background tokio task managed by [`super::HaManager`].
95#[derive(Clone)]
96pub struct HealthChecker {
97    pool: Arc<CaPool>,
98    config: HealthConfig,
99    /// Per-CA probe metrics, keyed by CaId.
100    metrics: Arc<parking_lot::RwLock<std::collections::HashMap<CaId, ProbeMetrics>>>,
101}
102
103impl HealthChecker {
104    /// Create a new health checker for the given pool.
105    pub fn new(pool: Arc<CaPool>, config: HealthConfig) -> Self {
106        let mut metrics = std::collections::HashMap::new();
107        for conn in pool.connections() {
108            metrics.insert(conn.id.clone(), ProbeMetrics::new());
109        }
110
111        Self {
112            pool,
113            config,
114            metrics: Arc::new(parking_lot::RwLock::new(metrics)),
115        }
116    }
117
118    /// Configured probe interval.
119    pub fn interval(&self) -> Duration {
120        self.config.probe_interval
121    }
122
123    /// Execute one round of probes against all registered CAs.
124    pub async fn run_probes(&self) {
125        debug!("starting health probe round");
126
127        for conn in self.pool.connections() {
128            let start = Instant::now();
129            let result = self.probe_ca(&conn.id).await;
130            let elapsed = start.elapsed();
131
132            let mut metrics = self.metrics.write();
133            let m = metrics
134                .entry(conn.id.clone())
135                .or_insert_with(ProbeMetrics::new);
136            m.last_check = Some(Instant::now());
137            m.last_latency = Some(elapsed);
138
139            match result {
140                Ok(()) => self.handle_probe_success(&conn.id, elapsed, m),
141                Err(e) => self.handle_probe_failure(&conn.id, e, m),
142            }
143        }
144    }
145
146    /// Probe a single CA backend.
147    ///
148    /// In a full implementation this would issue an HTTP request to the
149    /// CA's health endpoint or attempt a lightweight certificate status
150    /// check. For now it checks whether the pool considers the CA should
151    /// be re-probed after a circuit-breaker cooldown.
152    async fn probe_ca(&self, id: &CaId) -> Result<(), String> {
153        // If the CA is unavailable, check whether cooldown has elapsed.
154        if self.pool.should_reprobe(id) {
155            debug!(ca = %id, "cooldown elapsed, attempting re-probe");
156            // TODO: issue actual health-check request to the CA endpoint.
157            // For now, simulate success for circuit-breaker demonstration.
158            return Ok(());
159        }
160
161        // TODO: implement actual health probe (HTTP GET to CA status endpoint,
162        // or lightweight certificate issuance test).
163        // Placeholder: always succeed for healthy/degraded CAs.
164        Ok(())
165    }
166
167    /// Handle a successful probe, applying state transitions.
168    fn handle_probe_success(&self, id: &CaId, latency: Duration, metrics: &mut ProbeMetrics) {
169        metrics.consecutive_failures = 0;
170        metrics.consecutive_successes += 1;
171
172        let current_snapshot = self.pool.status_snapshot();
173        let current_health = current_snapshot
174            .get(id)
175            .map(|s| s.health.clone())
176            .unwrap_or(HealthState::Healthy);
177
178        let latency_ms = latency.as_millis() as u64;
179
180        let new_state = match current_health {
181            HealthState::Unavailable => {
182                info!(ca = %id, "CA responding again, entering recovery");
183                metrics.consecutive_successes = 1;
184                HealthState::Recovering
185            }
186            HealthState::Recovering => {
187                if metrics.consecutive_successes >= self.config.recovery_threshold {
188                    info!(ca = %id, "CA recovery confirmed, marking healthy");
189                    HealthState::Healthy
190                } else {
191                    debug!(
192                        ca = %id,
193                        successes = metrics.consecutive_successes,
194                        needed = self.config.recovery_threshold,
195                        "CA still recovering"
196                    );
197                    HealthState::Recovering
198                }
199            }
200            HealthState::Degraded | HealthState::Healthy => {
201                if latency_ms > self.config.degraded_latency_ms {
202                    debug!(ca = %id, latency_ms, "CA responding slowly, marking degraded");
203                    HealthState::Degraded
204                } else {
205                    HealthState::Healthy
206                }
207            }
208        };
209
210        self.pool.set_health(id, new_state);
211        self.pool.record_success(id, latency);
212    }
213
214    /// Handle a failed probe, applying state transitions.
215    fn handle_probe_failure(&self, id: &CaId, error: String, metrics: &mut ProbeMetrics) {
216        metrics.consecutive_failures += 1;
217        metrics.consecutive_successes = 0;
218
219        warn!(
220            ca = %id,
221            failures = metrics.consecutive_failures,
222            error = %error,
223            "health probe failed"
224        );
225
226        self.pool.record_failure(id);
227    }
228
229    /// Snapshot of probe metrics for monitoring.
230    pub fn metrics_snapshot(&self) -> std::collections::HashMap<CaId, ProbeMetrics> {
231        self.metrics.read().clone()
232    }
233}