1use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use serde::{Deserialize, Serialize};
11use tracing::{debug, info, warn};
12
13use super::pool::{CaId, CaPool};
14
15#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24pub enum HealthState {
25 Healthy,
27 Degraded,
29 Unavailable,
31 Recovering,
33}
34
35impl HealthState {
36 pub fn is_available(&self) -> bool {
38 matches!(self, Self::Healthy | Self::Degraded | Self::Recovering)
39 }
40}
41
/// Tunables for the CA health checker.
#[derive(Clone, Debug)]
pub struct HealthConfig {
    /// Delay between probe rounds; handed out by the checker's `interval()` accessor.
    pub probe_interval: Duration,
    /// Upper bound intended for a single probe.
    ///
    /// NOTE(review): not read by any code visible in this module — confirm it is applied.
    pub probe_timeout: Duration,
    /// Number of consecutive successful probes a `Recovering` CA needs before it is
    /// promoted out of recovery.
    pub recovery_threshold: u32,
    /// Probe latency, in milliseconds, above which a responding CA is classified as
    /// `Degraded`.
    pub degraded_latency_ms: u64,
}
55
56impl Default for HealthConfig {
57 fn default() -> Self {
58 Self {
59 probe_interval: Duration::from_secs(30),
60 probe_timeout: Duration::from_secs(5),
61 recovery_threshold: 2,
62 degraded_latency_ms: 2000,
63 }
64 }
65}
66
/// Per-CA statistics gathered by the health checker's probe loop.
#[derive(Clone, Debug)]
pub struct ProbeMetrics {
    /// When the most recent probe of this CA completed; `None` until the first probe.
    pub last_check: Option<Instant>,
    /// Length of the current run of failed probes; any successful probe resets it to 0.
    pub consecutive_failures: u32,
    /// Length of the current run of successful probes; any failed probe resets it to 0.
    pub consecutive_successes: u32,
    /// Wall-clock duration of the most recent probe; `None` until the first probe.
    pub last_latency: Option<Duration>,
}
79
80impl ProbeMetrics {
81 fn new() -> Self {
82 Self {
83 last_check: None,
84 consecutive_failures: 0,
85 consecutive_successes: 0,
86 last_latency: None,
87 }
88 }
89}
90
91#[derive(Clone)]
96pub struct HealthChecker {
97 pool: Arc<CaPool>,
98 config: HealthConfig,
99 metrics: Arc<parking_lot::RwLock<std::collections::HashMap<CaId, ProbeMetrics>>>,
101}
102
103impl HealthChecker {
104 pub fn new(pool: Arc<CaPool>, config: HealthConfig) -> Self {
106 let mut metrics = std::collections::HashMap::new();
107 for conn in pool.connections() {
108 metrics.insert(conn.id.clone(), ProbeMetrics::new());
109 }
110
111 Self {
112 pool,
113 config,
114 metrics: Arc::new(parking_lot::RwLock::new(metrics)),
115 }
116 }
117
118 pub fn interval(&self) -> Duration {
120 self.config.probe_interval
121 }
122
123 pub async fn run_probes(&self) {
125 debug!("starting health probe round");
126
127 for conn in self.pool.connections() {
128 let start = Instant::now();
129 let result = self.probe_ca(&conn.id).await;
130 let elapsed = start.elapsed();
131
132 let mut metrics = self.metrics.write();
133 let m = metrics
134 .entry(conn.id.clone())
135 .or_insert_with(ProbeMetrics::new);
136 m.last_check = Some(Instant::now());
137 m.last_latency = Some(elapsed);
138
139 match result {
140 Ok(()) => self.handle_probe_success(&conn.id, elapsed, m),
141 Err(e) => self.handle_probe_failure(&conn.id, e, m),
142 }
143 }
144 }
145
146 async fn probe_ca(&self, id: &CaId) -> Result<(), String> {
153 if self.pool.should_reprobe(id) {
155 debug!(ca = %id, "cooldown elapsed, attempting re-probe");
156 return Ok(());
159 }
160
161 Ok(())
165 }
166
167 fn handle_probe_success(&self, id: &CaId, latency: Duration, metrics: &mut ProbeMetrics) {
169 metrics.consecutive_failures = 0;
170 metrics.consecutive_successes += 1;
171
172 let current_snapshot = self.pool.status_snapshot();
173 let current_health = current_snapshot
174 .get(id)
175 .map(|s| s.health.clone())
176 .unwrap_or(HealthState::Healthy);
177
178 let latency_ms = latency.as_millis() as u64;
179
180 let new_state = match current_health {
181 HealthState::Unavailable => {
182 info!(ca = %id, "CA responding again, entering recovery");
183 metrics.consecutive_successes = 1;
184 HealthState::Recovering
185 }
186 HealthState::Recovering => {
187 if metrics.consecutive_successes >= self.config.recovery_threshold {
188 info!(ca = %id, "CA recovery confirmed, marking healthy");
189 HealthState::Healthy
190 } else {
191 debug!(
192 ca = %id,
193 successes = metrics.consecutive_successes,
194 needed = self.config.recovery_threshold,
195 "CA still recovering"
196 );
197 HealthState::Recovering
198 }
199 }
200 HealthState::Degraded | HealthState::Healthy => {
201 if latency_ms > self.config.degraded_latency_ms {
202 debug!(ca = %id, latency_ms, "CA responding slowly, marking degraded");
203 HealthState::Degraded
204 } else {
205 HealthState::Healthy
206 }
207 }
208 };
209
210 self.pool.set_health(id, new_state);
211 self.pool.record_success(id, latency);
212 }
213
214 fn handle_probe_failure(&self, id: &CaId, error: String, metrics: &mut ProbeMetrics) {
216 metrics.consecutive_failures += 1;
217 metrics.consecutive_successes = 0;
218
219 warn!(
220 ca = %id,
221 failures = metrics.consecutive_failures,
222 error = %error,
223 "health probe failed"
224 );
225
226 self.pool.record_failure(id);
227 }
228
229 pub fn metrics_snapshot(&self) -> std::collections::HashMap<CaId, ProbeMetrics> {
231 self.metrics.read().clone()
232 }
233}