Skip to main content

kipuka/ha/
pool.rs

//! CA connection pool with circuit-breaker and priority weighting.
//!
//! Manages connections to multiple CA backends with per-CA health tracking.
//! Implements RHELBU-3536 R1 (multi-CA) and R2 (circuit breaker).
5
6use std::collections::HashMap;
7use std::time::{Duration, Instant};
8
9use parking_lot::RwLock;
10use tracing::{debug, info, warn};
11
12use super::health::HealthState;
13use super::strategy::{FailoverStrategy, FallbackBehavior, StrategySelector};
14
15/// Opaque identifier for a CA backend.
16#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
17pub struct CaId(pub String);
18
19impl std::fmt::Display for CaId {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        f.write_str(&self.0)
22    }
23}
24
25impl std::borrow::Borrow<str> for CaId {
26    fn borrow(&self) -> &str {
27        &self.0
28    }
29}
30
31impl std::borrow::Borrow<String> for CaId {
32    fn borrow(&self) -> &String {
33        &self.0
34    }
35}
36
37impl From<String> for CaId {
38    fn from(s: String) -> Self {
39        Self(s)
40    }
41}
42
43impl From<&str> for CaId {
44    fn from(s: &str) -> Self {
45        Self(s.to_owned())
46    }
47}
48
49/// Runtime status of a single CA backend.
50#[derive(Debug, Clone)]
51pub struct CaStatus {
52    /// Current health state from the health checker.
53    pub health: HealthState,
54    /// Number of consecutive probe failures.
55    pub consecutive_failures: u32,
56    /// Timestamp of the last successful probe.
57    pub last_success: Option<Instant>,
58    /// Timestamp when the circuit breaker tripped (CA marked unavailable).
59    pub circuit_open_since: Option<Instant>,
60    /// Recent response latency (exponential moving average in milliseconds).
61    pub latency_ema_ms: f64,
62    /// Last observed response latency (for admin display).
63    pub last_latency: Option<Duration>,
64}
65
66impl CaStatus {
67    fn new() -> Self {
68        Self {
69            health: HealthState::Healthy,
70            consecutive_failures: 0,
71            last_success: None,
72            circuit_open_since: None,
73            latency_ema_ms: 0.0,
74            last_latency: None,
75        }
76    }
77}
78
79/// Registered CA backend with its configuration and connection info.
80#[derive(Debug, Clone)]
81pub struct CaConnection {
82    /// Unique identifier.
83    pub id: CaId,
84    /// Base URL or endpoint for this CA.
85    pub endpoint: String,
86    /// Static priority weight (higher = preferred).
87    pub weight: u32,
88    /// Priority order for active-passive (lower = higher priority).
89    pub priority: u32,
90}
91
/// Tuning knobs for the per-CA circuit breaker (RHELBU-3536 R2).
///
/// The `Default` impl trips after 3 consecutive failures and waits 60 s
/// before allowing a re-probe.
#[derive(Clone, Debug)]
pub struct CircuitBreakerConfig {
    /// Consecutive failures at which the circuit opens (CA marked unavailable).
    pub failure_threshold: u32,
    /// How long an open circuit must stay open before a re-probe is allowed.
    pub cooldown: Duration,
}
100
101impl Default for CircuitBreakerConfig {
102    fn default() -> Self {
103        Self {
104            failure_threshold: 3,
105            cooldown: Duration::from_secs(60),
106        }
107    }
108}
109
110/// Pool configuration.
111#[derive(Debug, Clone)]
112pub struct PoolConfig {
113    /// Failover strategy for CA selection.
114    pub strategy: FailoverStrategy,
115    /// Behavior when all CAs are unavailable.
116    pub fallback: FallbackBehavior,
117    /// Circuit-breaker settings.
118    pub circuit_breaker: CircuitBreakerConfig,
119}
120
121/// Thread-safe pool of CA backend connections.
122///
123/// Routes enrollment requests to healthy CAs based on the configured
124/// [`FailoverStrategy`]. The pool is updated by the [`super::health::HealthChecker`]
125/// and read by request handlers concurrently.
126pub struct CaPool {
127    /// Registered CA backends (insertion-ordered by priority).
128    connections: Vec<CaConnection>,
129    /// Per-CA runtime status, protected by `RwLock` for concurrent reads.
130    statuses: RwLock<HashMap<CaId, CaStatus>>,
131    /// Pool configuration.
132    config: PoolConfig,
133    /// Strategy selector for CA routing.
134    selector: StrategySelector,
135}
136
137impl CaPool {
138    /// Create a new pool with the given backends and configuration.
139    pub fn new(connections: Vec<CaConnection>, config: PoolConfig) -> Self {
140        let mut statuses = HashMap::new();
141        for conn in &connections {
142            statuses.insert(conn.id.clone(), CaStatus::new());
143        }
144        let selector = StrategySelector::new(config.strategy.clone());
145
146        Self {
147            connections,
148            statuses: RwLock::new(statuses),
149            config,
150            selector,
151        }
152    }
153
154    /// Select the best available CA for an enrollment request.
155    ///
156    /// Returns `None` when no healthy CA is available and the fallback
157    /// behavior is [`FallbackBehavior::Reject`].
158    pub fn select(&self) -> Option<CaConnection> {
159        let statuses = self.statuses.read();
160        let healthy: Vec<&CaConnection> = self
161            .connections
162            .iter()
163            .filter(|c| {
164                statuses
165                    .get(&c.id)
166                    .map(|s| s.health.is_available())
167                    .unwrap_or(false)
168            })
169            .collect();
170
171        if healthy.is_empty() {
172            warn!("no healthy CA backends available");
173            return match self.config.fallback {
174                FallbackBehavior::Reject => None,
175                FallbackBehavior::QueueAndRetry => {
176                    // In a full implementation this would enqueue the request.
177                    // For now, return None and let the caller decide.
178                    warn!("queue-and-retry fallback not yet implemented; rejecting");
179                    None
180                }
181            };
182        }
183
184        let snapshot: Vec<(&CaConnection, &CaStatus)> = healthy
185            .iter()
186            .filter_map(|c| statuses.get(&c.id).map(|s| (*c, s)))
187            .collect();
188
189        self.selector.select(&snapshot)
190    }
191
192    /// Record a successful request to a CA, updating latency EMA.
193    pub fn record_success(&self, id: &CaId, latency: Duration) {
194        let mut statuses = self.statuses.write();
195        if let Some(status) = statuses.get_mut(id) {
196            status.consecutive_failures = 0;
197            status.last_success = Some(Instant::now());
198            status.circuit_open_since = None;
199            status.last_latency = Some(latency);
200
201            let ms = latency.as_secs_f64() * 1000.0;
202            // Exponential moving average with alpha=0.3.
203            status.latency_ema_ms = status.latency_ema_ms * 0.7 + ms * 0.3;
204
205            if status.health != HealthState::Healthy {
206                info!(ca = %id, "CA recovered, marking healthy");
207                status.health = HealthState::Healthy;
208            }
209        }
210    }
211
212    /// Record a failed request, applying circuit-breaker logic (RHELBU-3536 R2).
213    pub fn record_failure(&self, id: &CaId) {
214        let mut statuses = self.statuses.write();
215        if let Some(status) = statuses.get_mut(id) {
216            status.consecutive_failures += 1;
217            debug!(
218                ca = %id,
219                failures = status.consecutive_failures,
220                "CA request failed"
221            );
222
223            if status.consecutive_failures >= self.config.circuit_breaker.failure_threshold {
224                if status.health != HealthState::Unavailable {
225                    warn!(
226                        ca = %id,
227                        failures = status.consecutive_failures,
228                        "circuit breaker tripped, marking CA unavailable"
229                    );
230                    status.health = HealthState::Unavailable;
231                    status.circuit_open_since = Some(Instant::now());
232                }
233            } else if status.health == HealthState::Healthy {
234                status.health = HealthState::Degraded;
235            }
236        }
237    }
238
239    /// Check whether a tripped circuit breaker should allow a re-probe.
240    pub fn should_reprobe(&self, id: &CaId) -> bool {
241        let statuses = self.statuses.read();
242        statuses.get(id).is_some_and(|s| {
243            s.circuit_open_since
244                .is_some_and(|opened| opened.elapsed() >= self.config.circuit_breaker.cooldown)
245        })
246    }
247
248    /// Update the health state for a CA (called by the health checker).
249    pub fn set_health(&self, id: &CaId, state: HealthState) {
250        let mut statuses = self.statuses.write();
251        if let Some(status) = statuses.get_mut(id) {
252            let prev = status.health.clone();
253            status.health = state.clone();
254            if prev != state {
255                info!(ca = %id, from = ?prev, to = ?state, "CA health state transition");
256            }
257        }
258    }
259
260    /// Snapshot of current statuses for monitoring.
261    pub fn status_snapshot(&self) -> HashMap<CaId, CaStatus> {
262        self.statuses.read().clone()
263    }
264
265    /// All registered CA connections.
266    pub fn connections(&self) -> &[CaConnection] {
267        &self.connections
268    }
269
270    /// Pool configuration.
271    pub fn config(&self) -> &PoolConfig {
272        &self.config
273    }
274}