From 1ac96a43567fd887eb943f74126e47a3548abf74 Mon Sep 17 00:00:00 2001
From: SomeoneSerge
Date: Tue, 29 Apr 2025 01:50:55 +0000
Subject: [PATCH] sql: name tables as Types, sample success optional

...and do not load blobs over 1M into memory
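
Failed fetches are now recorded too: a sample whose hash is NULL and whose
http_code holds the status. For illustration (URL and status hypothetical),
a 404 ends up in the database roughly as:

    INSERT INTO SidxUriSample(uri, hash, http_code)
    VALUES(
        (SELECT id FROM Str WHERE str = 'https://example.com/missing' LIMIT 1),
        NULL,
        404
    );

Because latest-download.sql now filters on "s.hash IS NOT NULL", failed
samples never count as cached downloads, so such a URI is retried on the
next run instead of being served from the index.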
---
 src/main.rs               | 312 ++++++++++++++++++++++++++------------
 src/q/add-sample.sql      |  13 +-
 src/q/add-str.sql         |   5 +
 src/q/latest-download.sql |  17 ++-
 src/q/sidx-init.sql       |  42 +++--
 src/q/upsert-blob.sql     |   2 +-
 src/q/upsert-uri.sql      |   2 +-
 7 files changed, 269 insertions(+), 124 deletions(-)
 create mode 100644 src/q/add-str.sql

diff --git a/src/main.rs b/src/main.rs
index cd0325c..43f7ee2 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -28,25 +28,32 @@ enum Ingestable {
 }
 
 #[derive(Debug, Clone)]
-enum IngestedWhen {
+enum SampledWhen {
     Now,
     Before,
 }
 
+#[derive(Debug, Clone)]
+struct SizedBlob {
+    hash: B3Digest,
+    n_bytes: u64,
+}
+
 #[derive(Debug, Clone)]
 #[allow(dead_code)]
-struct Ingested {
+struct Sampled {
     sample_id: u32,
     uri: String,
-    blake3: B3Digest,
+    blob: Option<SizedBlob>,
+    http_status: Option<u16>,
     epoch: u32,
-    when: IngestedWhen,
+    when: SampledWhen,
 }
 
 #[derive(Clone)]
 enum FetchListingMessage {
-    Ingested(Url, Ingested),
-    Recurse(Url, usize),
+    Sampled(Url, Sampled),
+    Recurse(Url, usize, Sender<FetchListingMessage>),
 }
 
 impl std::fmt::Display for Ingestable {
@@ -114,9 +121,15 @@ enum Command {
     FetchListing {
         #[clap(value_parser, long, default_value_t = 5)]
         max_depth: usize,
+        #[clap(value_parser, long, default_value_t = 1024 * 1024)]
+        html_max_bytes: u64,
         #[clap(value_parser, num_args = 1)]
         inputs: Vec<Url>,
     },
+    ParseUrl {
+        #[clap(value_parser, num_args = 1)]
+        url: Vec<Url>,
+    },
 }
 
 #[derive(Parser)]
@@ -124,7 +137,7 @@ struct Cli {
     #[clap(short, long, action)]
     refetch: bool,
 
-    #[clap(short, long, value_parser, default_value_t = 4)]
+    #[clap(short, long, value_parser, default_value_t = 2)]
     max_parallel: usize,
 
     #[clap(short, long, value_parser, default_value_os_t = default_db_path())]
@@ -145,6 +158,7 @@ where
     refetch: bool,
     max_parallel: usize,
     http: reqwest::Client,
+    http_semaphore: Arc<Semaphore>,
     con: Arc<Mutex<rusqlite::Connection>>,
     blob_service: BS,
     dir_service: DS,
@@ -187,6 +201,7 @@ async fn open_context(
         refetch,
         max_parallel,
         http: reqwest::Client::new(),
+        http_semaphore: Arc::new(Semaphore::new(max_parallel)),
         con: Arc::new(Mutex::new(con)),
         blob_service,
         dir_service,
@@ -194,92 +209,133 @@ impl SidxContext
-    async fn db_latest_download(&self, uri: &str) -> Result<Option<Ingested>, Error> {
+    async fn db_latest_download(&self, uri: &str) -> Result<Option<Sampled>, Error> {
         let lock = self.con.lock().unwrap();
         let mut find_sample = lock
             .prepare_cached(include_str!("q/latest-download.sql"))
             .expect("Failed to prepare latest-download.sql");
         find_sample
-            .query_row(params![uri], |r| <(u32, String, u32)>::try_from(r))
+            .query_row(params![uri], |r| {
+                <(u32, String, u64, Option<u16>, u32)>::try_from(r)
+            })
             .optional()
             .context("db_latest_download.sql")
-            .and_then(|maybe_triple| match maybe_triple {
-                Some((sample_id, blake3, epoch)) => Ok(Some(Ingested {
+            .and_then(|maybe_tuple| match maybe_tuple {
+                Some((sample_id, hash, n_bytes, http_code, epoch)) => Ok(Some(Sampled {
                     sample_id,
                     uri: uri.to_string(),
-                    blake3: B3Digest::from_str(&blake3)?,
+                    blob: Some(SizedBlob {
+                        hash: B3Digest::from_str(&hash)?,
+                        n_bytes,
+                    }),
+                    http_status: http_code,
                     epoch,
-                    when: IngestedWhen::Before,
+                    when: SampledWhen::Before,
                 })),
                 None => Ok(None),
             })
     }
 
-    async fn db_add_sample(&self, uri: &str, blake3: &str) -> Result<(u32, u32), rusqlite::Error> {
-        let lock = self.con.lock().unwrap();
+    async fn db_add_sample(
+        &self,
+        uri: &str,
+        hash: &Option<String>,
+        http_code: Option<u16>,
+    ) -> Result<(u32, u32), Error> {
+        let lock = self.con.lock().expect("Couldn't lock mutex");
         let mut add_sample = lock
             .prepare_cached(include_str!("q/add-sample.sql"))
-            .expect("Failed to prepare add-sample.sql");
-        add_sample.query_row(params![uri, blake3], |row| <(u32, u32)>::try_from(row))
+            .context("Failed to prepare add-sample.sql")?;
+        Ok(add_sample.query_row(params![uri, hash, http_code], |row| {
+            <(u32, u32)>::try_from(row)
+        })?)
     }
 
-    async fn db_add_blob(&self, blake3: &str, n_bytes: u64) -> Result<usize, rusqlite::Error> {
-        let lock = self.con.lock().unwrap();
+    async fn db_add_blob(&self, hash: &str, n_bytes: u64) -> Result<usize, Error> {
+        let lock = self.con.lock().expect("db_add_blob: couldn't lock mutex?");
         let mut add_blob = lock
             .prepare_cached(include_str!("q/upsert-blob.sql"))
-            .expect("Failed to prepare upsert-blob.sql");
-        add_blob.execute(params![blake3, n_bytes,])
+            .context("Failed to prepare upsert-blob.sql")?;
+        Ok(add_blob.execute(params![hash, n_bytes,])?)
     }
 
-    async fn db_add_uri(&self, uri: &str) -> Result<usize, rusqlite::Error> {
+    async fn db_add_uri(&self, uri: &str) -> Result<usize, Error> {
         let lock = self.con.lock().unwrap();
         let mut add_uri = lock
             .prepare_cached(include_str!("q/upsert-uri.sql"))
-            .expect("Failed to prepare upsert-uri.sql");
+            .context("Failed to prepare upsert-uri.sql")?;
 
-        add_uri.execute(params![uri])
+        Ok(add_uri.execute(params![uri])?)
     }
 
     async fn record_ingested_node(
         &self,
         uri: &str,
-        blake3: &snix_castore::B3Digest,
-        n_bytes: u64,
-    ) -> Result<Ingested, Error> {
-        let digest64 = format!("{}", blake3);
-        self.db_add_blob(&digest64, n_bytes).await?;
+        blob: &Option<SizedBlob>,
+        http_code: Option<u16>,
+    ) -> Result<Sampled, Error> {
+        let digest64 = if let Some(SizedBlob { hash, n_bytes }) = blob {
+            let digest64 = format!("{}", hash);
+            self.db_add_blob(&digest64, n_bytes.clone()).await?;
+            Some(digest64)
+        } else {
+            None
+        };
         self.db_add_uri(&uri).await?;
-        let (sample_id, epoch) = self.db_add_sample(&uri, &digest64).await?;
-        Ok(Ingested {
+        let (sample_id, epoch) = self
+            .db_add_sample(&uri, &digest64, http_code.clone())
+            .await?;
+        Ok(Sampled {
             sample_id,
             uri: uri.to_string(),
-            blake3: blake3.clone(),
+            blob: blob.clone(),
+            http_status: http_code,
             epoch,
-            when: IngestedWhen::Now,
+            when: SampledWhen::Now,
         })
     }
 
-    async fn download_no_cache(&self, uri: &Url) -> Result<Ingested, Error> {
+    async fn download_no_cache(&self, uri: &Url) -> Result<Sampled, Error> {
+        let _permit = self.http_semaphore.acquire().await.unwrap();
+        eprintln!("Downloading {:?}", uri.to_string());
         let uri_s = uri.to_string();
         let res = self
             .http
             .get(uri.clone())
             .send()
             .await
-            .context(format!("Request::send failed early for {:?}", uri))?
-            .error_for_status()?;
-        let mut r =
-            tokio_util::io::StreamReader::new(res.bytes_stream().map_err(std::io::Error::other));
-        let mut w = self.blob_service.open_write().await;
-        let n_bytes = match tokio::io::copy(&mut r, &mut w).await {
-            Ok(n) => n,
-            Err(e) => {
-                return Err(anyhow!(
-                    "tokio::io::copy failed for uri={} with {}",
-                    uri_s,
-                    e
-                ));
-            }
-        };
-        let digest = w.close().await?;
-        self.record_ingested_node(&uri_s, &digest, n_bytes).await
+            .context(format!("Request::send failed early for {:?}", uri))?;
+
+        let status = res.status();
+        let status_code = status.as_u16();
+
+        if status.is_success() {
+            let mut r = tokio_util::io::StreamReader::new(
+                res.bytes_stream().map_err(std::io::Error::other),
+            );
+            let mut w = self.blob_service.open_write().await;
+            let n_bytes = match tokio::io::copy(&mut r, &mut w).await {
+                Ok(n) => n,
+                Err(e) => {
+                    return Err(anyhow!(
+                        "tokio::io::copy failed for uri={} with {}",
+                        uri_s,
+                        e
+                    ));
+                }
+            };
+            let digest = w.close().await?;
+
+            self.record_ingested_node(
+                &uri_s,
+                &Some(SizedBlob {
+                    hash: digest,
+                    n_bytes,
+                }),
+                Some(status_code),
+            )
+            .await
+        } else {
+            self.record_ingested_node(&uri_s, &None, Some(status_code))
+                .await
+        }
     }
 
-    async fn download(&self, uri: &Url) -> Result<Ingested, Error> {
+    async fn download(&self, uri: &Url) -> Result<Sampled, Error> {
         if self.refetch {
             self.download_no_cache(&uri).await
         } else {
@@ -289,7 +345,7 @@ impl SidxContext
             }
         }
     }
-    async fn ingest(&self, inputs: &Vec<Ingestable>) -> Vec<Result<Option<Ingested>, Error>> {
+    async fn ingest(&self, inputs: &Vec<Ingestable>) -> Vec<Result<Option<Sampled>, Error>> {
         let samples = stream::iter(inputs.iter().map(|uri| {
             let blob_service = &self.blob_service;
             let dir_service = &self.dir_service;
@@ -306,7 +362,14 @@ impl SidxContext
                     .await?
                 {
                     snix_castore::Node::Directory { digest, size } => self
-                        .record_ingested_node(&uri_s, &digest, size)
+                        .record_ingested_node(
+                            &uri_s,
+                            &Some(SizedBlob {
+                                hash: digest,
+                                n_bytes: size,
+                            }),
+                            None,
+                        )
                         .await
                         .map(Some),
 
@@ -315,7 +378,14 @@ impl SidxContext
                     snix_castore::Node::File {
                         digest,
                         size,
                         executable: _,
                     } => self
-                        .record_ingested_node(&uri_s, &digest, size)
+                        .record_ingested_node(
+                            &uri_s,
+                            &Some(SizedBlob {
+                                hash: digest,
+                                n_bytes: size,
+                            }),
+                            None,
+                        )
                         .await
                         .map(Some),
                     snix_castore::Node::Symlink { target: _ } => {
@@ -328,7 +398,7 @@ impl SidxContext
             }
         }))
         .buffer_unordered(self.max_parallel)
-        .collect::<Vec<Result<Option<Ingested>, _>>>()
+        .collect::<Vec<Result<Option<Sampled>, _>>>()
         .await;
 
         samples
@@ -349,39 +419,70 @@ impl SidxContext
         self: Arc<Self>,
         url: Url,
         max_depth: usize,
+        html_max_bytes: u64,
         tx: Sender<FetchListingMessage>,
     ) -> Result<(), Error> {
-        eprintln!("Downloading {:?}", url.to_string());
-        let root = self.download(&url).await?;
-        tx.send(FetchListingMessage::Ingested(url.clone(), root.clone()))
+        let maybe_root = self.download(&url).await;
+        if let Err(ref e) = maybe_root {
+            eprintln!("Couldn't download {}: {:?}", url, e);
+        };
+        let root = maybe_root?;
+        tx.send(FetchListingMessage::Sampled(url.clone(), root.clone()))
             .await
            .context("Stopped accepting tasks before processing an Ingested notification")?;
         if max_depth <= 0 {
             return Ok(());
         }
-        /* TODO: no need to load blobs to memory unless you know they're text/html */
-        match self.blob_service.open_read(&root.blake3).await? {
-            Some(mut reader) => {
-                let content = {
-                    let mut br = BufReader::new(&mut *reader);
-                    let mut content = String::new();
-                    br.read_to_string(&mut content).await?;
-                    content
-                };
-                let hrefs = Self::extract_hrefs(&content).unwrap_or(vec![]);
-                /* max_depth > 0 here */
-                for href in hrefs {
-                    let next_url = url.join(&href).context("Constructing next_url")?;
-                    tx.send(FetchListingMessage::Recurse(
-                        next_url.clone(),
-                        max_depth - 1,
-                    ))
-                    .await
-                    .context("Stopped accepting tasks before finishing all hrefs")?;
+
+        match root.blob {
+            None => Err(anyhow!(
+                "Couldn't download {}. Status code: {:?}",
+                url,
+                root.http_status
+            )),
+            Some(SizedBlob { hash, n_bytes }) => {
+                if n_bytes > html_max_bytes {
+                    return Ok(());
+                }
+                match self.blob_service.open_read(&hash).await? {
+                    Some(mut reader) => {
+                        let content = {
+                            let mut br = BufReader::new(&mut *reader);
+                            let mut content = String::new();
+                            br.read_to_string(&mut content).await?;
+                            content
+                        };
+                        let hrefs = Self::extract_hrefs(&content).unwrap_or(vec![]);
+                        /* max_depth > 0 here */
+                        for href in hrefs.clone() {
+                            let next_url = url.join(&href).context("Constructing next_url")?;
+                            tx.send(FetchListingMessage::Recurse(
+                                next_url.clone(),
+                                max_depth - 1,
+                                tx.clone(),
+                            ))
+                            .await
+                            .context("Stopped accepting tasks before finishing all hrefs")?;
+                        }
+                        {
+                            let lock = self.con.lock().expect("Couldn't acquire Mutex?");
+                            for href in hrefs {
+                                let mut stmt =
+                                    lock.prepare_cached(include_str!("q/add-str.sql"))?;
+                                stmt.execute(params!["href"])?;
+
+                                let next_url =
+                                    url.join(&href).context("Constructing next_url")?;
+                                let mut stmt =
+                                    lock.prepare_cached(include_str!("q/add-uri-ref.sql"))?;
+                                let digest64 = hash.to_string();
+                                stmt.execute(params![digest64, next_url.to_string(), href])?;
+                            }
+                        };
+                        Ok(())
+                    }
+                    None => Err(anyhow!("Couldn't read the ingested blob")),
                 }
-                Ok(())
             }
-            None => Err(anyhow!("Couldn't read the ingested blob")),
         }
     }
 
@@ -389,7 +490,8 @@ impl SidxContext
         self: Arc<Self>,
         url: Url,
         max_depth: usize,
-    ) -> ReceiverStream<Ingested> {
+        html_max_bytes: u64,
+    ) -> ReceiverStream<Sampled> {
         let mq_size = 10; /* TODO: move task queue to e.g.
                              sqlite */
 
@@ -397,33 +499,42 @@ impl SidxContext
         let (out_tx, out_rx) = channel(mq_size);
 
-        let semaphore = Arc::new(Semaphore::new(self.max_parallel));
-
         tokio::spawn({
             async move {
                 let mut seen: HashSet<String> = HashSet::new();
-                tx.send(FetchListingMessage::Recurse(url, max_depth))
-                    .await
-                    .expect("fetch_from_listing failed populating the queue");
+                {
+                    let tx_moved = tx;
+                    tx_moved
+                        .send(FetchListingMessage::Recurse(
+                            url,
+                            max_depth,
+                            tx_moved.clone(),
+                        ))
+                        .await
+                        .expect("fetch_from_listing failed populating the queue");
+                };
                 while let Some(m) = rx.recv().await {
                     match m {
-                        FetchListingMessage::Ingested(_url, ingested) => {
+                        FetchListingMessage::Sampled(_url, ingested) => {
                             out_tx
                                 .send(ingested)
                                 .await
                                 .expect("ReceiverStream failed to accept an Ingestable");
                         }
-                        FetchListingMessage::Recurse(url, max_depth) => {
+                        FetchListingMessage::Recurse(url, max_depth, tx) => {
                             if max_depth > 0 && !seen.contains(&url.to_string()) {
                                 seen.insert(url.to_string());
                                 tokio::spawn({
                                     let s = self.clone();
                                     let url = url.clone();
-                                    let tx = tx.clone();
-                                    let semaphore = semaphore.clone();
                                     async move {
-                                        let _permit = semaphore.acquire();
-                                        s.fetch_from_listing_impl(url, max_depth, tx).await
+                                        s.fetch_from_listing_impl(
+                                            url,
+                                            max_depth,
+                                            html_max_bytes,
+                                            tx,
+                                        )
+                                        .await
                                     }
                                 });
                             }
@@ -468,11 +579,17 @@ async fn main() {
                 }
             }
         }
-        Some(Command::FetchListing { max_depth, inputs }) => {
-            let ingested: Vec<Ingested> = stream::iter(inputs)
+        Some(Command::FetchListing {
+            max_depth,
+            html_max_bytes,
+            inputs,
+        }) => {
+            let ingested: Vec<Sampled> = stream::iter(inputs)
                 .then(async |i| {
                     let i = i.clone();
-                    ctx.clone().fetch_from_listing(i, max_depth).await
+                    ctx.clone()
+                        .fetch_from_listing(i, max_depth, html_max_bytes)
+                        .await
                 })
                 .flatten_unordered(args.max_parallel)
                 .collect()
                 .await;
             for i in ingested {
                 eprintln!("{:?}", i);
             }
         }
+        Some(Command::ParseUrl { url: urls }) => {
+            for url in urls {
+                println!("{:?}", url);
+            }
+        }
         None => {}
     }
 }
diff --git a/src/q/add-sample.sql b/src/q/add-sample.sql
index f26b3ad..2817b13 100644
--- a/src/q/add-sample.sql
+++ b/src/q/add-sample.sql
@@ -1,21 +1,22 @@
-INSERT INTO sidx_uri_sample(uri_id, blake3_id)
+INSERT INTO SidxUriSample(uri, hash, http_code)
 VALUES(
     (
         SELECT
             id
         FROM
-            sidx_uri
+            Str
         WHERE
-            uri = ?
+            str = ?
         LIMIT 1
     ),
     (
         SELECT
             id
         FROM
-            sidx_blake3
+            Hash
         WHERE
-            blake3 = ?
-    )
+            hash = ?
+    ),
+    ?
 )
 RETURNING
     id,
     epoch;
diff --git a/src/q/add-str.sql b/src/q/add-str.sql
new file mode 100644
index 0000000..feae104
--- /dev/null
+++ b/src/q/add-str.sql
@@ -0,0 +1,5 @@
+INSERT INTO Str(str)
+VALUES
+(?)
+ON CONFLICT DO NOTHING;
+
diff --git a/src/q/latest-download.sql b/src/q/latest-download.sql
index a0e6938..0161664 100644
--- a/src/q/latest-download.sql
+++ b/src/q/latest-download.sql
@@ -1,16 +1,19 @@
 SELECT
     s.id AS sample_id,
-    b.blake3,
+    h.hash,
+    h.n_bytes,
+    s.http_code,
     s.epoch
 FROM
-    sidx_uri_sample AS s,
-    sidx_uri AS u,
-    sidx_blake3 AS b
+    SidxUriSample AS s,
+    Str AS u,
+    Hash AS h
 ON
-    s.uri_id = u.id
-    AND s.blake3_id = b.id
+    s.uri = u.id
+    AND s.hash = h.id
 WHERE
-    u.uri = ?
+    u.str = ?
+    AND s.hash IS NOT NULL
 ORDER BY
     s.epoch DESC
 LIMIT 1;
diff --git a/src/q/sidx-init.sql b/src/q/sidx-init.sql
index 94bd769..7a0a9a1 100644
--- a/src/q/sidx-init.sql
+++ b/src/q/sidx-init.sql
@@ -1,22 +1,36 @@
-CREATE TABLE IF NOT EXISTS sidx_uri(
+CREATE TABLE IF NOT EXISTS Hash(
     id INTEGER,
-    uri TEXT UNIQUE,
+    hash TEXT UNIQUE, /* snix-castore node */
+    n_bytes INTEGER,
     PRIMARY KEY(id)
-    );
-CREATE TABLE IF NOT EXISTS sidx_blake3(
+); /* Essentially random strings */
+CREATE TABLE IF NOT EXISTS Str(
     id INTEGER,
-    blake3 TEXT UNIQUE, /* snix-castore node */
-    n_bytes INTEGER NOT NULL,
+    str TEXT UNIQUE,
     PRIMARY KEY(id)
-    );
-CREATE TABLE IF NOT EXISTS sidx_uri_sample(
+); /* "Naturally occurring" strings */
+CREATE INDEX IF NOT EXISTS StrIdx ON Str(str);
+CREATE TABLE IF NOT EXISTS SidxUriSample(
     id INTEGER,
-    uri_id INTEGER NOT NULL,
-    blake3_id INTEGER,
+    uri INTEGER NOT NULL,
+    hash INTEGER,
     epoch INTEGER NOT NULL DEFAULT (unixepoch()),
+    http_code INTEGER DEFAULT NULL,
     PRIMARY KEY(id),
-    FOREIGN KEY(uri_id) REFERENCES sidx_uri(id),
-    FOREIGN KEY(blake3_id) REFERENCES sidx_blake3(id)
+    FOREIGN KEY(uri) REFERENCES Str(id),
+    FOREIGN KEY(hash) REFERENCES Hash(id)
 );
-CREATE INDEX IF NOT EXISTS sidx_uri_blake3_idx
-ON sidx_uri_sample(uri_id, blake3_id, epoch);
+CREATE INDEX IF NOT EXISTS SidxUriHashIdx
+ON SidxUriSample(uri, epoch);
+
+CREATE TABLE IF NOT EXISTS UriReference(
+    content INTEGER,
+    target INTEGER,
+    why INTEGER,
+    PRIMARY KEY (content, target, why),
+    FOREIGN KEY(content) REFERENCES Hash(id),
+    FOREIGN KEY(target) REFERENCES Str(id), /* E.g. Uri or Path */
+    FOREIGN KEY(why) REFERENCES Str(id) /* E.g. "href" */
+);
+CREATE INDEX IF NOT EXISTS UriReferenceIdx
+ON UriReference(target, content);
diff --git a/src/q/upsert-blob.sql b/src/q/upsert-blob.sql
index 66fb7ec..3589e33 100644
--- a/src/q/upsert-blob.sql
+++ b/src/q/upsert-blob.sql
@@ -1,4 +1,4 @@
-INSERT INTO sidx_blake3(blake3, n_bytes)
+INSERT INTO Hash(hash, n_bytes)
 VALUES
 (?, ?)
 ON CONFLICT DO NOTHING;
diff --git a/src/q/upsert-uri.sql b/src/q/upsert-uri.sql
index 555ede8..8702c5a 100644
--- a/src/q/upsert-uri.sql
+++ b/src/q/upsert-uri.sql
@@ -1,4 +1,4 @@
-INSERT INTO sidx_uri(uri)
+INSERT INTO Str(str)
 VALUES
 (?)
 ON CONFLICT DO NOTHING;
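
-- 
Note: main.rs above reads q/add-uri-ref.sql via include_str!, but this patch
does not create that file. A sketch of what it could contain, assuming the
UriReference schema above and the caller's parameter order (digest64,
next_url, href), with subselects mirroring add-sample.sql:

INSERT INTO UriReference(content, target, why)
VALUES(
    (SELECT id FROM Hash WHERE hash = ? LIMIT 1),
    (SELECT id FROM Str WHERE str = ? LIMIT 1),
    (SELECT id FROM Str WHERE str = ? LIMIT 1)
)
ON CONFLICT DO NOTHING;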