Skip to main content

kipuka/ha/
mod.rs

1//! High-Availability subsystem for multi-CA failover.
2//!
3//! Implements RHELBU-3536 requirements R1 through R6:
4//! - R1: Multiple CA backend support with independent health tracking
5//! - R2: Circuit-breaker pattern with configurable cooldown
6//! - R3: Pluggable failover strategies (active-passive, round-robin, weighted, latency)
7//! - R4: Health probes with state machine transitions
8//! - R5: Automatic failover on CA unavailability
9//! - R6: Graceful degradation when all CAs are unhealthy
10
11pub mod health;
12pub mod pool;
13pub mod strategy;
14
15pub use health::{HealthChecker, HealthConfig, HealthState};
16pub use pool::{CaConnection, CaId, CaPool, CaStatus};
17pub use strategy::{FailoverStrategy, FallbackBehavior, StrategySelector};
18
19use std::sync::Arc;
20use tokio::sync::watch;
21use tracing::{info, warn};
22
23/// Central coordinator for the HA subsystem.
24///
25/// Owns the [`CaPool`] and [`HealthChecker`], wiring health state updates
26/// into pool availability decisions. The pool uses the configured
27/// [`FailoverStrategy`] to select a CA for each enrollment request.
28pub struct HaManager {
29    pool: Arc<CaPool>,
30    health_checker: HealthChecker,
31    shutdown_tx: watch::Sender<bool>,
32    shutdown_rx: watch::Receiver<bool>,
33}
34
35impl HaManager {
36    /// Build a new HA manager from pool and health configuration.
37    pub fn new(pool: Arc<CaPool>, health_config: HealthConfig) -> Self {
38        let (shutdown_tx, shutdown_rx) = watch::channel(false);
39        let health_checker = HealthChecker::new(Arc::clone(&pool), health_config);
40        Self {
41            pool,
42            health_checker,
43            shutdown_tx,
44            shutdown_rx,
45        }
46    }
47
48    /// Start background health checking.
49    ///
50    /// Spawns a tokio task that periodically probes each CA backend and
51    /// updates the pool's availability map. The task runs until
52    /// [`HaManager::shutdown`] is called.
53    pub async fn start(&self) {
54        let checker = self.health_checker.clone();
55        let mut rx = self.shutdown_rx.clone();
56
57        info!("HA manager starting health check loop");
58
59        tokio::spawn(async move {
60            loop {
61                checker.run_probes().await;
62
63                tokio::select! {
64                    _ = tokio::time::sleep(checker.interval()) => {}
65                    _ = rx.changed() => {
66                        info!("HA manager received shutdown signal");
67                        break;
68                    }
69                }
70            }
71        });
72    }
73
74    /// Signal the health checker to stop.
75    pub fn shutdown(&self) {
76        let _ = self.shutdown_tx.send(true);
77        warn!("HA manager shutting down");
78    }
79
80    /// Reference to the managed CA pool.
81    pub fn pool(&self) -> &Arc<CaPool> {
82        &self.pool
83    }
84}