1pub mod schema;
13
14use crate::config::DbConfig;
15use crate::error::KipukaError;
16
17pub type Db = sqlx::AnyPool;
19
/// Database backend family that a connection URL refers to.
///
/// The value is derived from the URL by [`DbKind::from_url`] and is passed
/// around so callers can branch on backend-specific behaviour.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum DbKind {
    /// SQLite; also what `from_url` returns for any unrecognised URL.
    Sqlite,
    /// PostgreSQL (URLs beginning with `postgres`).
    Postgres,
    /// MariaDB or MySQL (URLs beginning with `mariadb` or `mysql`).
    MariaDb,
}
31
32impl DbKind {
33 pub fn from_url(url: &str) -> Self {
35 if url.starts_with("postgres") {
36 DbKind::Postgres
37 } else if url.starts_with("mariadb") || url.starts_with("mysql") {
38 DbKind::MariaDb
39 } else {
40 DbKind::Sqlite
41 }
42 }
43}
44
/// Whether the pool created by [`init_pool`] targets PostgreSQL.
///
/// Written by `init_pool`; because this is a `OnceLock`, only the first write
/// takes effect. While unset, [`pg_sql`] behaves as if the backend were not
/// PostgreSQL.
static IS_POSTGRES: std::sync::OnceLock<bool> = std::sync::OnceLock::new();

/// Converts `?` placeholders to PostgreSQL-style numbered placeholders.
///
/// When [`IS_POSTGRES`] is unset or `false`, `s` is returned untouched.
/// Otherwise every `?` character in `s` is replaced, left to right, by `$1`,
/// `$2`, … and the rewritten text is returned.
///
/// Rewritten strings are leaked on purpose so they can be handed out as
/// `&'static str`, and are memoised so each distinct input is rewritten (and
/// leaked) only once. The memo key is the input's address *and* length:
/// keying on the address alone would confuse a query with a `'static` prefix
/// slice of itself, since both start at the same address.
///
/// Limitation: the scan is purely character based, so a `?` inside a string
/// literal, quoted identifier or SQL comment is rewritten as well.
// NOTE(review): `dead_code` is presumably allowed because not every build
// configuration calls this helper — confirm before removing the attribute.
#[allow(dead_code)]
pub(crate) fn pg_sql(s: &'static str) -> &'static str {
    if !IS_POSTGRES.get().copied().unwrap_or(false) {
        return s;
    }

    static CACHE: std::sync::OnceLock<
        std::sync::Mutex<std::collections::HashMap<(usize, usize), &'static str>>,
    > = std::sync::OnceLock::new();
    let cache = CACHE.get_or_init(|| std::sync::Mutex::new(std::collections::HashMap::new()));

    // (address, length) uniquely identifies a `'static` string slice.
    let key = (s.as_ptr() as usize, s.len());

    // Entries are inserted only after they are fully built, so the map is
    // still consistent even if another thread panicked while holding the lock.
    let mut guard = cache
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);

    // Look up and insert under a single lock acquisition so that concurrent
    // first calls for the same query cannot both leak a rewritten copy.
    let rewritten = *guard.entry(key).or_insert_with(|| {
        let mut result = String::with_capacity(s.len() + 16);
        let mut param_num = 0u32;
        for ch in s.chars() {
            if ch == '?' {
                param_num += 1;
                result.push('$');
                result.push_str(&param_num.to_string());
            } else {
                result.push(ch);
            }
        }
        let leaked: &'static str = Box::leak(result.into_boxed_str());
        leaked
    });
    rewritten
}
89
90pub async fn init_pool(config: &DbConfig) -> Result<(Db, DbKind), KipukaError> {
92 let url = config.resolve_url().map_err(KipukaError::Config)?;
93
94 let kind = DbKind::from_url(&url);
95 let _ = IS_POSTGRES.set(kind == DbKind::Postgres);
96
97 let pool_opts = sqlx::any::AnyPoolOptions::new()
98 .acquire_timeout(std::time::Duration::from_secs(config.connect_timeout_secs))
99 .max_lifetime(std::time::Duration::from_secs(config.max_lifetime_secs));
100
101 let pool_opts = if let Some(max) = config.max_connections {
102 pool_opts.max_connections(max)
103 } else {
104 match kind {
105 DbKind::Sqlite => pool_opts.max_connections(1),
106 _ => pool_opts.max_connections(10),
107 }
108 };
109
110 let pool_opts = if let Some(min) = config.min_connections {
111 pool_opts.min_connections(min)
112 } else {
113 pool_opts
114 };
115
116 let pool = pool_opts
117 .connect(&url)
118 .await
119 .map_err(|e| KipukaError::Db(format!("failed to connect to database: {e}")))?;
120
121 if kind == DbKind::Sqlite && config.sqlite_wal {
123 sqlx::query("PRAGMA journal_mode=WAL")
124 .execute(&pool)
125 .await
126 .map_err(|e| KipukaError::Db(format!("failed to enable WAL mode: {e}")))?;
127 }
128
129 Ok((pool, kind))
130}
131
132pub async fn init_ro_pool(config: &DbConfig, kind: DbKind) -> Result<Db, KipukaError> {
139 let url = config.resolve_url().map_err(KipukaError::Config)?;
140
141 if kind != DbKind::Sqlite || url.contains(":memory:") {
143 let pool = sqlx::any::AnyPoolOptions::new()
145 .max_connections(1)
146 .connect(&url)
147 .await
148 .map_err(|e| KipukaError::Db(format!("failed to connect RO pool: {e}")))?;
149 return Ok(pool);
150 }
151
152 let ro_url = if url.contains('?') {
154 format!("{url}&mode=ro")
155 } else {
156 format!("{url}?mode=ro")
157 };
158
159 let pool = sqlx::any::AnyPoolOptions::new()
160 .max_connections(4)
161 .connect(&ro_url)
162 .await
163 .map_err(|e| KipukaError::Db(format!("failed to connect RO pool: {e}")))?;
164
165 Ok(pool)
166}
167
168pub async fn begin_write(
173 pool: &Db,
174 kind: DbKind,
175) -> Result<sqlx::Transaction<'_, sqlx::Any>, KipukaError> {
176 if kind == DbKind::Sqlite {
177 sqlx::query("BEGIN IMMEDIATE")
179 .execute(pool)
180 .await
181 .map_err(|e| KipukaError::Db(format!("BEGIN IMMEDIATE failed: {e}")))?;
182 }
183 let tx = pool
184 .begin()
185 .await
186 .map_err(|e| KipukaError::Db(format!("begin transaction failed: {e}")))?;
187 Ok(tx)
188}
189
190pub async fn run_migrations(pool: &Db, kind: DbKind) -> Result<(), KipukaError> {
192 crate::db::schema::run_migrations(pool, kind).await
193}