sidx: sqlite helpers reuse transactions

This commit is contained in:
Else, Someone 2025-05-12 22:11:35 +00:00
parent c8b8b56456
commit 5f598ece03

View file

@ -186,7 +186,7 @@ async fn open_context(
} }
let con = rusqlite::Connection::open(&db_path).expect("Failed to construct Database object"); let con = rusqlite::Connection::open(&db_path).expect("Failed to construct Database object");
con.pragma_update(None, "jorunal_mode", "wal").unwrap(); con.pragma_update(None, "journal_mode", "wal").unwrap();
con.pragma_update(None, "synchronous", "normal").unwrap(); con.pragma_update(None, "synchronous", "normal").unwrap();
con.pragma_update(None, "temp_store", "memory").unwrap(); con.pragma_update(None, "temp_store", "memory").unwrap();
con.pragma_update(None, "foreign_keys", "on").unwrap(); con.pragma_update(None, "foreign_keys", "on").unwrap();
@ -238,6 +238,86 @@ where
} }
} }
trait ConnectionLike {
#[allow(dead_code)]
fn execute<P: rusqlite::Params>(&self, sql: &str, params: P) -> rusqlite::Result<usize>;
fn prepare_cached(&self, sql: &str) -> rusqlite::Result<rusqlite::CachedStatement<'_>>;
}
impl ConnectionLike for rusqlite::Connection {
fn execute<P: rusqlite::Params>(&self, sql: &str, params: P) -> rusqlite::Result<usize> {
<rusqlite::Connection>::execute(self, sql, params)
}
fn prepare_cached(&self, sql: &str) -> rusqlite::Result<rusqlite::CachedStatement<'_>> {
<rusqlite::Connection>::prepare_cached(self, sql)
}
}
impl<'a> ConnectionLike for rusqlite::Transaction<'a> {
fn execute<P: rusqlite::Params>(&self, sql: &str, params: P) -> rusqlite::Result<usize> {
<rusqlite::Connection>::execute(self, sql, params)
}
fn prepare_cached(&self, sql: &str) -> rusqlite::Result<rusqlite::CachedStatement<'_>> {
<rusqlite::Connection>::prepare_cached(self, sql)
}
}
impl<'a> ConnectionLike for rusqlite::Savepoint<'a> {
fn execute<P: rusqlite::Params>(&self, sql: &str, params: P) -> rusqlite::Result<usize> {
<rusqlite::Connection>::execute(self, sql, params)
}
fn prepare_cached(&self, sql: &str) -> rusqlite::Result<rusqlite::CachedStatement<'_>> {
<rusqlite::Connection>::prepare_cached(self, sql)
}
}
fn db_add_sample(
tx: &mut rusqlite::Transaction<'_>,
uri: &str,
hash: &Option<String>,
http_code: &Option<u16>,
content_type: &Option<String>,
) -> Result<(u32, u32), Error> {
let mut sp = tx.savepoint().context("db_add_sample")?;
sp.set_drop_behavior(rusqlite::DropBehavior::Commit);
{
if let Some(h) = hash {
db_add_blob(&mut sp, &h.clone(), None)?;
};
let mut add_sample = sp
.prepare_cached(include_str!("q/add-sample.sql"))
.context("Failed to prepare add-sample.sql")?;
return Ok(add_sample.query_row(
named_params! {
":uri": uri,
":hash": hash,
":http_code": http_code,
":content_type": content_type
},
|row| <(u32, u32)>::try_from(row),
)?);
}
}
fn db_add_blob<Con: ConnectionLike>(
con: &mut Con,
hash: &str,
n_bytes: Option<u64>,
) -> Result<usize, Error> {
let mut add_blob = con
.prepare_cached(include_str!("q/upsert-blob.sql"))
.context("Failed to prepare upsert-blob.sql")?;
Ok(add_blob.execute(params![hash, n_bytes,])?)
}
fn db_add_uri<Con: ConnectionLike>(con: &mut Con, uri: &str) -> Result<usize, Error> {
let mut add_uri = con
.prepare_cached(include_str!("q/upsert-uri.sql"))
.context("Failed to prepare upsert-uri.sql")?;
Ok(add_uri.execute(params![uri])?)
}
impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS> { impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS> {
async fn latest_sample(&self, uri: &str) -> Result<Option<Sampled>, Error> { async fn latest_sample(&self, uri: &str) -> Result<Option<Sampled>, Error> {
let lock = self.con.lock().unwrap(); let lock = self.con.lock().unwrap();
@ -265,42 +345,6 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
None => Ok(None), None => Ok(None),
}) })
} }
async fn db_add_sample(
&self,
uri: &str,
hash: &Option<String>,
http_code: &Option<u16>,
content_type: &Option<String>,
) -> Result<(u32, u32), Error> {
let lock = self.con.lock().expect("Locking mutex for db_add_sample");
let mut add_sample = lock
.prepare_cached(include_str!("q/add-sample.sql"))
.context("Failed to prepare add-sample.sql")?;
Ok(add_sample.query_row(
named_params! {
":uri": uri,
":hash": hash,
":http_code": http_code,
":content_type": content_type
},
|row| <(u32, u32)>::try_from(row),
)?)
}
async fn db_add_blob(&self, hash: &str, n_bytes: u64) -> Result<usize, Error> {
let lock = self.con.lock().expect("db_add_blob: couldn't lock mutex?");
let mut add_blob = lock
.prepare_cached(include_str!("q/upsert-blob.sql"))
.context("Failed to prepare upsert-blob.sql")?;
Ok(add_blob.execute(params![hash, n_bytes,])?)
}
async fn db_add_uri(&self, uri: &str) -> Result<usize, Error> {
let lock = self.con.lock().unwrap();
let mut add_uri = lock
.prepare_cached(include_str!("q/upsert-uri.sql"))
.context("Failed to prepare upsert-uri.sql")?;
Ok(add_uri.execute(params![uri])?)
}
async fn record_ingested_node( async fn record_ingested_node(
&self, &self,
uri: &str, uri: &str,
@ -308,25 +352,28 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
http_code: Option<u16>, http_code: Option<u16>,
content_type: Option<String>, content_type: Option<String>,
) -> Result<Sampled, Error> { ) -> Result<Sampled, Error> {
let digest64 = if let Some(SizedBlob { hash, n_bytes }) = blob { let mut lock = self.con.lock().unwrap();
let digest64 = format!("{}", hash); let mut tx = lock.transaction()?;
self.db_add_blob(&digest64, n_bytes.clone()).await?; {
Some(digest64) let digest64 = if let Some(SizedBlob { hash, n_bytes }) = blob {
} else { let digest64 = format!("{}", hash);
None db_add_blob(&mut tx, &digest64, Some(n_bytes.clone()))?;
}; Some(digest64)
self.db_add_uri(&uri).await?; } else {
let (sample_id, epoch) = self None
.db_add_sample(&uri, &digest64, &http_code, &content_type) };
.await?; db_add_uri(&mut tx, &uri)?;
Ok(Sampled { let (sample_id, epoch) =
sample_id, db_add_sample(&mut tx, &uri, &digest64, &http_code, &content_type)?;
uri: uri.to_string(), Ok(Sampled {
blob: blob.clone(), sample_id,
http_status: http_code, uri: uri.to_string(),
epoch, blob: blob.clone(),
when: SampledWhen::Now, http_status: http_code,
}) epoch,
when: SampledWhen::Now,
})
}
} }
async fn download(&self, uri: &Url) -> Result<Sampled, Error> { async fn download(&self, uri: &Url) -> Result<Sampled, Error> {
let _permit = self.http_semaphore.acquire().await.unwrap(); let _permit = self.http_semaphore.acquire().await.unwrap();