kipuka_dogtag/pool.rs
1//! Multi-CA connection pool with health-based routing.
2//!
3//! Manages [`DogtagClient`] instances for multiple Dogtag CA backends,
4//! integrating with kipuka's HA subsystem (`src/ha/`) for failover and
5//! load balancing.
6
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use tracing::{info, warn};
11
12use crate::client::DogtagClient;
13use crate::config::DogtagConfig;
14use crate::{DogtagError, DogtagResult};
15
/// Availability verdict the pool keeps for each CA backend.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackendHealth {
    /// The backend answers health checks.
    Healthy,
    /// The backend does not answer health checks, or answers with errors.
    Unhealthy,
    /// No verdict has been reached for the backend yet.
    Unknown,
}
26
27/// A single CA backend entry in the pool.
28struct PoolEntry {
29 /// The HTTP client for this CA instance.
30 client: Arc<DogtagClient>,
31 /// Current health state.
32 health: BackendHealth,
33 /// Timestamp of the last successful health check.
34 last_healthy: Option<Instant>,
35 /// Number of consecutive health check failures.
36 consecutive_failures: u32,
37}
38
39/// Connection pool managing multiple Dogtag CA instances.
40///
41/// Routes enrollment and certificate operations to healthy CA backends.
42/// Integrates with kipuka's HA subsystem for consistent failover behavior
43/// across all CA backend types.
44///
45/// # Health Checking
46///
47/// The pool periodically probes each backend via `GET /ca/rest/info`.
48/// Backends that fail consecutive health checks are marked unhealthy
49/// and excluded from request routing until they recover.
50///
51/// # Thread Safety
52///
53/// `DogtagPool` is `Send + Sync` and designed to be shared via
54/// `Arc<DogtagPool>` across the async runtime.
55pub struct DogtagPool {
56 entries: parking_lot::RwLock<Vec<PoolEntry>>,
57 /// Circuit breaker threshold: mark unhealthy after this many failures.
58 failure_threshold: u32,
59 /// Cooldown before re-checking an unhealthy backend.
60 cooldown: Duration,
61}
62
63impl DogtagPool {
64 /// Create a pool from multiple Dogtag configurations.
65 ///
66 /// Each configuration represents a separate CA instance. The pool
67 /// creates a [`DogtagClient`] for each and begins tracking health.
68 pub fn new(
69 configs: &[DogtagConfig],
70 failure_threshold: u32,
71 cooldown_secs: u64,
72 ) -> DogtagResult<Self> {
73 if configs.is_empty() {
74 return Err(DogtagError::ConfigError(
75 "At least one CA backend is required".into(),
76 ));
77 }
78
79 let mut entries = Vec::with_capacity(configs.len());
80 for config in configs {
81 let client = Arc::new(DogtagClient::new(config)?);
82 info!(url = client.base_url(), "Added Dogtag CA backend to pool");
83 entries.push(PoolEntry {
84 client,
85 health: BackendHealth::Unknown,
86 last_healthy: None,
87 consecutive_failures: 0,
88 });
89 }
90
91 Ok(Self {
92 entries: parking_lot::RwLock::new(entries),
93 failure_threshold,
94 cooldown: Duration::from_secs(cooldown_secs),
95 })
96 }
97
98 /// Get a healthy client from the pool.
99 ///
100 /// Returns the first healthy backend. If no backend is healthy,
101 /// returns [`DogtagError::NoHealthyBackend`].
102 pub fn get_client(&self) -> DogtagResult<Arc<DogtagClient>> {
103 let entries = self.entries.read();
104 for entry in entries.iter() {
105 if entry.health != BackendHealth::Unhealthy {
106 return Ok(Arc::clone(&entry.client));
107 }
108 }
109 Err(DogtagError::NoHealthyBackend)
110 }
111
112 /// Run a single health check pass across all backends.
113 ///
114 /// Probes each backend via `GET /ca/rest/info` and updates health
115 /// state. Unhealthy backends in cooldown are skipped.
116 pub async fn health_check_all(&self) {
117 // Snapshot the client list to avoid holding the lock during I/O.
118 let clients: Vec<(usize, Arc<DogtagClient>, bool)> = {
119 let entries = self.entries.read();
120 entries
121 .iter()
122 .enumerate()
123 .filter_map(|(i, e)| {
124 // Skip unhealthy backends still in cooldown.
125 if e.health == BackendHealth::Unhealthy {
126 if let Some(last) = e.last_healthy {
127 if last.elapsed() < self.cooldown {
128 return None;
129 }
130 }
131 }
132 Some((
133 i,
134 Arc::clone(&e.client),
135 e.health == BackendHealth::Unhealthy,
136 ))
137 })
138 .collect()
139 };
140
141 for (index, client, was_unhealthy) in clients {
142 let healthy = client.health_check().await.unwrap_or(false);
143
144 let mut entries = self.entries.write();
145 if let Some(entry) = entries.get_mut(index) {
146 if healthy {
147 if was_unhealthy {
148 info!(url = client.base_url(), "Dogtag CA backend recovered");
149 }
150 entry.health = BackendHealth::Healthy;
151 entry.last_healthy = Some(Instant::now());
152 entry.consecutive_failures = 0;
153 } else {
154 entry.consecutive_failures += 1;
155 if entry.consecutive_failures >= self.failure_threshold {
156 if entry.health != BackendHealth::Unhealthy {
157 warn!(
158 url = client.base_url(),
159 failures = entry.consecutive_failures,
160 "Dogtag CA backend marked unhealthy"
161 );
162 }
163 entry.health = BackendHealth::Unhealthy;
164 }
165 }
166 }
167 }
168 }
169
170 /// Return the number of backends currently considered healthy.
171 pub fn healthy_count(&self) -> usize {
172 self.entries
173 .read()
174 .iter()
175 .filter(|e| e.health == BackendHealth::Healthy)
176 .count()
177 }
178
179 /// Return the total number of backends in the pool.
180 pub fn total_count(&self) -> usize {
181 self.entries.read().len()
182 }
183}