diff --git a/src/main.rs b/src/main.rs index ec3a3e5..0036224 100644 --- a/src/main.rs +++ b/src/main.rs @@ -186,7 +186,7 @@ async fn open_context( } let con = rusqlite::Connection::open(&db_path).expect("Failed to construct Database object"); - con.pragma_update(None, "jorunal_mode", "wal").unwrap(); + con.pragma_update(None, "journal_mode", "wal").unwrap(); con.pragma_update(None, "synchronous", "normal").unwrap(); con.pragma_update(None, "temp_store", "memory").unwrap(); con.pragma_update(None, "foreign_keys", "on").unwrap(); @@ -238,6 +238,86 @@ where } } +trait ConnectionLike { + #[allow(dead_code)] + fn execute(&self, sql: &str, params: P) -> rusqlite::Result; + fn prepare_cached(&self, sql: &str) -> rusqlite::Result>; +} + +impl ConnectionLike for rusqlite::Connection { + fn execute(&self, sql: &str, params: P) -> rusqlite::Result { + ::execute(self, sql, params) + } + + fn prepare_cached(&self, sql: &str) -> rusqlite::Result> { + ::prepare_cached(self, sql) + } +} + +impl<'a> ConnectionLike for rusqlite::Transaction<'a> { + fn execute(&self, sql: &str, params: P) -> rusqlite::Result { + ::execute(self, sql, params) + } + + fn prepare_cached(&self, sql: &str) -> rusqlite::Result> { + ::prepare_cached(self, sql) + } +} + +impl<'a> ConnectionLike for rusqlite::Savepoint<'a> { + fn execute(&self, sql: &str, params: P) -> rusqlite::Result { + ::execute(self, sql, params) + } + + fn prepare_cached(&self, sql: &str) -> rusqlite::Result> { + ::prepare_cached(self, sql) + } +} + +fn db_add_sample( + tx: &mut rusqlite::Transaction<'_>, + uri: &str, + hash: &Option, + http_code: &Option, + content_type: &Option, +) -> Result<(u32, u32), Error> { + let mut sp = tx.savepoint().context("db_add_sample")?; + sp.set_drop_behavior(rusqlite::DropBehavior::Commit); + { + if let Some(h) = hash { + db_add_blob(&mut sp, &h.clone(), None)?; + }; + let mut add_sample = sp + .prepare_cached(include_str!("q/add-sample.sql")) + .context("Failed to prepare add-sample.sql")?; + return Ok(add_sample.query_row( + named_params! { + ":uri": uri, + ":hash": hash, + ":http_code": http_code, + ":content_type": content_type + }, + |row| <(u32, u32)>::try_from(row), + )?); + } +} +fn db_add_blob( + con: &mut Con, + hash: &str, + n_bytes: Option, +) -> Result { + let mut add_blob = con + .prepare_cached(include_str!("q/upsert-blob.sql")) + .context("Failed to prepare upsert-blob.sql")?; + Ok(add_blob.execute(params![hash, n_bytes,])?) +} +fn db_add_uri(con: &mut Con, uri: &str) -> Result { + let mut add_uri = con + .prepare_cached(include_str!("q/upsert-uri.sql")) + .context("Failed to prepare upsert-uri.sql")?; + Ok(add_uri.execute(params![uri])?) +} + impl SidxContext { async fn latest_sample(&self, uri: &str) -> Result, Error> { let lock = self.con.lock().unwrap(); @@ -265,42 +345,6 @@ impl SidxContext None => Ok(None), }) } - async fn db_add_sample( - &self, - uri: &str, - hash: &Option, - http_code: &Option, - content_type: &Option, - ) -> Result<(u32, u32), Error> { - let lock = self.con.lock().expect("Locking mutex for db_add_sample"); - let mut add_sample = lock - .prepare_cached(include_str!("q/add-sample.sql")) - .context("Failed to prepare add-sample.sql")?; - Ok(add_sample.query_row( - named_params! { - ":uri": uri, - ":hash": hash, - ":http_code": http_code, - ":content_type": content_type - }, - |row| <(u32, u32)>::try_from(row), - )?) - } - async fn db_add_blob(&self, hash: &str, n_bytes: u64) -> Result { - let lock = self.con.lock().expect("db_add_blob: couldn't lock mutex?"); - let mut add_blob = lock - .prepare_cached(include_str!("q/upsert-blob.sql")) - .context("Failed to prepare upsert-blob.sql")?; - Ok(add_blob.execute(params![hash, n_bytes,])?) - } - async fn db_add_uri(&self, uri: &str) -> Result { - let lock = self.con.lock().unwrap(); - let mut add_uri = lock - .prepare_cached(include_str!("q/upsert-uri.sql")) - .context("Failed to prepare upsert-uri.sql")?; - - Ok(add_uri.execute(params![uri])?) - } async fn record_ingested_node( &self, uri: &str, @@ -308,25 +352,28 @@ impl SidxContext http_code: Option, content_type: Option, ) -> Result { - let digest64 = if let Some(SizedBlob { hash, n_bytes }) = blob { - let digest64 = format!("{}", hash); - self.db_add_blob(&digest64, n_bytes.clone()).await?; - Some(digest64) - } else { - None - }; - self.db_add_uri(&uri).await?; - let (sample_id, epoch) = self - .db_add_sample(&uri, &digest64, &http_code, &content_type) - .await?; - Ok(Sampled { - sample_id, - uri: uri.to_string(), - blob: blob.clone(), - http_status: http_code, - epoch, - when: SampledWhen::Now, - }) + let mut lock = self.con.lock().unwrap(); + let mut tx = lock.transaction()?; + { + let digest64 = if let Some(SizedBlob { hash, n_bytes }) = blob { + let digest64 = format!("{}", hash); + db_add_blob(&mut tx, &digest64, Some(n_bytes.clone()))?; + Some(digest64) + } else { + None + }; + db_add_uri(&mut tx, &uri)?; + let (sample_id, epoch) = + db_add_sample(&mut tx, &uri, &digest64, &http_code, &content_type)?; + Ok(Sampled { + sample_id, + uri: uri.to_string(), + blob: blob.clone(), + http_status: http_code, + epoch, + when: SampledWhen::Now, + }) + } } async fn download(&self, uri: &Url) -> Result { let _permit = self.http_semaphore.acquire().await.unwrap();