sql: name tables as Types, sample success optional

...and do not load blobs over 1M into memory
Else, Someone 2025-04-29 01:50:55 +00:00
parent a8a722b002
commit 1ac96a4356
7 changed files with 269 additions and 124 deletions

@@ -28,25 +28,32 @@ enum Ingestable {
     }
 }
 #[derive(Debug, Clone)]
-enum IngestedWhen {
+enum SampledWhen {
     Now,
     Before,
 }

+#[derive(Debug, Clone)]
+struct SizedBlob {
+    hash: B3Digest,
+    n_bytes: u64,
+}
+
 #[derive(Debug, Clone)]
 #[allow(dead_code)]
-struct Ingested {
+struct Sampled {
     sample_id: u32,
     uri: String,
-    blake3: B3Digest,
+    blob: Option<SizedBlob>,
+    http_status: Option<u16>,
     epoch: u32,
-    when: IngestedWhen,
+    when: SampledWhen,
 }

 #[derive(Clone)]
 enum FetchListingMessage {
-    Ingested(Url, Ingested),
-    Recurse(Url, usize),
+    Sampled(Url, Sampled),
+    Recurse(Url, usize, Sender<FetchListingMessage>),
 }

 impl std::fmt::Display for Ingestable {
@@ -114,9 +121,15 @@ enum Command {
     FetchListing {
         #[clap(value_parser, long, default_value_t = 5)]
         max_depth: usize,
+        #[clap(value_parser, long, default_value_t = 1024 * 1024)]
+        html_max_bytes: u64,
         #[clap(value_parser, num_args = 1)]
         inputs: Vec<Url>,
     },
+    ParseUrl {
+        #[clap(value_parser, num_args = 1)]
+        url: Vec<Url>,
+    },
 }

 #[derive(Parser)]
@@ -124,7 +137,7 @@ struct Cli {
     #[clap(short, long, action)]
     refetch: bool,

-    #[clap(short, long, value_parser, default_value_t = 4)]
+    #[clap(short, long, value_parser, default_value_t = 2)]
     max_parallel: usize,

     #[clap(short, long, value_parser, default_value_os_t = default_db_path())]
@@ -145,6 +158,7 @@ where
     refetch: bool,
     max_parallel: usize,
     http: reqwest::Client,
+    http_semaphore: Arc<Semaphore>,
     con: Arc<Mutex<rusqlite::Connection>>,
     blob_service: BS,
     dir_service: DS,
@@ -187,6 +201,7 @@ async fn open_context(
         refetch,
         max_parallel,
         http: reqwest::Client::new(),
+        http_semaphore: Arc::new(Semaphore::new(max_parallel)),
         con: Arc::new(Mutex::new(con)),
         blob_service,
         dir_service,
@@ -194,92 +209,133 @@ async fn open_context(
 }

 impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS> {
-    async fn db_latest_download(&self, uri: &str) -> Result<Option<Ingested>, Error> {
+    async fn db_latest_download(&self, uri: &str) -> Result<Option<Sampled>, Error> {
         let lock = self.con.lock().unwrap();
         let mut find_sample = lock
             .prepare_cached(include_str!("q/latest-download.sql"))
             .expect("Failed to prepare latest-download.sql");
         find_sample
-            .query_row(params![uri], |r| <(u32, String, u32)>::try_from(r))
+            .query_row(params![uri], |r| {
+                <(u32, String, u64, Option<u16>, u32)>::try_from(r)
+            })
             .optional()
             .context("db_latest_download.sql")
-            .and_then(|maybe_triple| match maybe_triple {
-                Some((sample_id, blake3, epoch)) => Ok(Some(Ingested {
+            .and_then(|maybe_tuple| match maybe_tuple {
+                Some((sample_id, hash, n_bytes, http_code, epoch)) => Ok(Some(Sampled {
                     sample_id,
                     uri: uri.to_string(),
-                    blake3: B3Digest::from_str(&blake3)?,
+                    blob: Some(SizedBlob {
+                        hash: B3Digest::from_str(&hash)?,
+                        n_bytes,
+                    }),
+                    http_status: http_code,
                     epoch,
-                    when: IngestedWhen::Before,
+                    when: SampledWhen::Before,
                 })),
                 None => Ok(None),
             })
     }
-    async fn db_add_sample(&self, uri: &str, blake3: &str) -> Result<(u32, u32), rusqlite::Error> {
-        let lock = self.con.lock().unwrap();
+    async fn db_add_sample(
+        &self,
+        uri: &str,
+        hash: &Option<String>,
+        http_code: Option<u16>,
+    ) -> Result<(u32, u32), Error> {
+        let lock = self.con.lock().expect("Couldn't lock mutex");
         let mut add_sample = lock
             .prepare_cached(include_str!("q/add-sample.sql"))
-            .expect("Failed to prepare add-sample.sql");
-        add_sample.query_row(params![uri, blake3], |row| <(u32, u32)>::try_from(row))
+            .context("Failed to prepare add-sample.sql")?;
+        Ok(add_sample.query_row(params![uri, hash, http_code], |row| {
+            <(u32, u32)>::try_from(row)
+        })?)
     }
-    async fn db_add_blob(&self, blake3: &str, n_bytes: u64) -> Result<usize, rusqlite::Error> {
-        let lock = self.con.lock().unwrap();
+    async fn db_add_blob(&self, hash: &str, n_bytes: u64) -> Result<usize, Error> {
+        let lock = self.con.lock().expect("db_add_blob: couldn't lock mutex?");
         let mut add_blob = lock
             .prepare_cached(include_str!("q/upsert-blob.sql"))
-            .expect("Failed to prepare upsert-blob.sql");
-        add_blob.execute(params![blake3, n_bytes,])
+            .context("Failed to prepare upsert-blob.sql")?;
+        Ok(add_blob.execute(params![hash, n_bytes,])?)
     }
-    async fn db_add_uri(&self, uri: &str) -> Result<usize, rusqlite::Error> {
+    async fn db_add_uri(&self, uri: &str) -> Result<usize, Error> {
         let lock = self.con.lock().unwrap();
         let mut add_uri = lock
             .prepare_cached(include_str!("q/upsert-uri.sql"))
-            .expect("Failed to prepare upsert-uri.sql");
-        add_uri.execute(params![uri])
+            .context("Failed to prepare upsert-uri.sql")?;
+        Ok(add_uri.execute(params![uri])?)
     }
     async fn record_ingested_node(
         &self,
         uri: &str,
-        blake3: &snix_castore::B3Digest,
-        n_bytes: u64,
-    ) -> Result<Ingested, Error> {
-        let digest64 = format!("{}", blake3);
-        self.db_add_blob(&digest64, n_bytes).await?;
+        blob: &Option<SizedBlob>,
+        http_code: Option<u16>,
+    ) -> Result<Sampled, Error> {
+        let digest64 = if let Some(SizedBlob { hash, n_bytes }) = blob {
+            let digest64 = format!("{}", hash);
+            self.db_add_blob(&digest64, n_bytes.clone()).await?;
+            Some(digest64)
+        } else {
+            None
+        };
         self.db_add_uri(&uri).await?;
-        let (sample_id, epoch) = self.db_add_sample(&uri, &digest64).await?;
-        Ok(Ingested {
+        let (sample_id, epoch) = self
+            .db_add_sample(&uri, &digest64, http_code.clone())
+            .await?;
+        Ok(Sampled {
             sample_id,
             uri: uri.to_string(),
-            blake3: blake3.clone(),
+            blob: blob.clone(),
+            http_status: http_code,
             epoch,
-            when: IngestedWhen::Now,
+            when: SampledWhen::Now,
         })
     }
-    async fn download_no_cache(&self, uri: &Url) -> Result<Ingested, Error> {
+    async fn download_no_cache(&self, uri: &Url) -> Result<Sampled, Error> {
+        let _permit = self.http_semaphore.acquire().await.unwrap();
+        eprintln!("Downloading {:?}", uri.to_string());
         let uri_s = uri.to_string();
         let res = self
             .http
             .get(uri.clone())
             .send()
             .await
-            .context(format!("Request::send failed early for {:?}", uri))?
-            .error_for_status()?;
-        let mut r =
-            tokio_util::io::StreamReader::new(res.bytes_stream().map_err(std::io::Error::other));
-        let mut w = self.blob_service.open_write().await;
-        let n_bytes = match tokio::io::copy(&mut r, &mut w).await {
-            Ok(n) => n,
-            Err(e) => {
-                return Err(anyhow!(
-                    "tokio::io::copy failed for uri={} with {}",
-                    uri_s,
-                    e
-                ));
-            }
-        };
-        let digest = w.close().await?;
-        self.record_ingested_node(&uri_s, &digest, n_bytes).await
+            .context(format!("Request::send failed early for {:?}", uri))?;
+
+        let status = res.status();
+        let status_code = status.as_u16();
+
+        if status.is_success() {
+            let mut r = tokio_util::io::StreamReader::new(
+                res.bytes_stream().map_err(std::io::Error::other),
+            );
+            let mut w = self.blob_service.open_write().await;
+            let n_bytes = match tokio::io::copy(&mut r, &mut w).await {
+                Ok(n) => n,
+                Err(e) => {
+                    return Err(anyhow!(
+                        "tokio::io::copy failed for uri={} with {}",
+                        uri_s,
+                        e
+                    ));
+                }
+            };
+            let digest = w.close().await?;
+            self.record_ingested_node(
+                &uri_s,
+                &Some(SizedBlob {
+                    hash: digest,
+                    n_bytes,
+                }),
+                Some(status_code),
+            )
+            .await
+        } else {
+            self.record_ingested_node(&uri_s, &None, Some(status_code))
+                .await
+        }
     }
-    async fn download(&self, uri: &Url) -> Result<Ingested, Error> {
+    async fn download(&self, uri: &Url) -> Result<Sampled, Error> {
         if self.refetch {
             self.download_no_cache(&uri).await
         } else {
@@ -289,7 +345,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
             }
         }
     }
-    async fn ingest(&self, inputs: &Vec<Ingestable>) -> Vec<Result<Option<Ingested>, Error>> {
+    async fn ingest(&self, inputs: &Vec<Ingestable>) -> Vec<Result<Option<Sampled>, Error>> {
         let samples = stream::iter(inputs.iter().map(|uri| {
             let blob_service = &self.blob_service;
             let dir_service = &self.dir_service;
@@ -306,7 +362,14 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
                     .await?
                 {
                     snix_castore::Node::Directory { digest, size } => self
-                        .record_ingested_node(&uri_s, &digest, size)
+                        .record_ingested_node(
+                            &uri_s,
+                            &Some(SizedBlob {
+                                hash: digest,
+                                n_bytes: size,
+                            }),
+                            None,
+                        )
                         .await
                         .map(Some),
@@ -315,7 +378,14 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
                         size,
                         executable: _,
                     } => self
-                        .record_ingested_node(&uri_s, &digest, size)
+                        .record_ingested_node(
+                            &uri_s,
+                            &Some(SizedBlob {
+                                hash: digest,
+                                n_bytes: size,
+                            }),
+                            None,
+                        )
                         .await
                         .map(Some),
                     snix_castore::Node::Symlink { target: _ } => {
@@ -328,7 +398,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
                 }
             }))
             .buffer_unordered(self.max_parallel)
-            .collect::<Vec<Result<Option<Ingested>, _>>>()
+            .collect::<Vec<Result<Option<Sampled>, _>>>()
             .await;

         samples
@@ -349,39 +419,70 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
         self: Arc<Self>,
         url: Url,
         max_depth: usize,
+        html_max_bytes: u64,
         tx: Sender<FetchListingMessage>,
     ) -> Result<(), Error> {
-        eprintln!("Downloading {:?}", url.to_string());
-        let root = self.download(&url).await?;
-        tx.send(FetchListingMessage::Ingested(url.clone(), root.clone()))
+        let maybe_root = self.download(&url).await;
+        if let Err(ref e) = maybe_root {
+            eprintln!("Couldn't download {}: {:?}", url, e);
+        };
+        let root = maybe_root?;
+        tx.send(FetchListingMessage::Sampled(url.clone(), root.clone()))
             .await
             .context("Stopped accepting tasks before processing an Ingested notification")?;
         if max_depth <= 0 {
             return Ok(());
         }
-        /* TODO: no need to load blobs to memory unless you know they're text/html */
-        match self.blob_service.open_read(&root.blake3).await? {
-            Some(mut reader) => {
-                let content = {
-                    let mut br = BufReader::new(&mut *reader);
-                    let mut content = String::new();
-                    br.read_to_string(&mut content).await?;
-                    content
-                };
-                let hrefs = Self::extract_hrefs(&content).unwrap_or(vec![]);
-                /* max_depth > 0 here */
-                for href in hrefs {
-                    let next_url = url.join(&href).context("Constructing next_url")?;
-                    tx.send(FetchListingMessage::Recurse(
-                        next_url.clone(),
-                        max_depth - 1,
-                    ))
-                    .await
-                    .context("Stopped accepting tasks before finishing all hrefs")?;
-                }
-                Ok(())
-            }
-            None => Err(anyhow!("Couldn't read the ingested blob")),
-        }
+        match root.blob {
+            None => Err(anyhow!(
+                "Couldn't download {}. Status code: {:?}",
+                url,
+                root.http_status
+            )),
+            Some(SizedBlob { hash, n_bytes }) => {
+                if n_bytes > html_max_bytes {
+                    return Ok(());
+                }
+                match self.blob_service.open_read(&hash).await? {
+                    Some(mut reader) => {
+                        let content = {
+                            let mut br = BufReader::new(&mut *reader);
+                            let mut content = String::new();
+                            br.read_to_string(&mut content).await?;
+                            content
+                        };
+                        let hrefs = Self::extract_hrefs(&content).unwrap_or(vec![]);
+                        /* max_depth > 0 here */
+                        for href in hrefs.clone() {
+                            let next_url = url.join(&href).context("Constructing next_url")?;
+                            tx.send(FetchListingMessage::Recurse(
+                                next_url.clone(),
+                                max_depth - 1,
+                                tx.clone(),
+                            ))
+                            .await
+                            .context("Stopped accepting tasks before finishing all hrefs")?;
+                        }
+                        {
+                            let lock = self.con.lock().expect("Couldn't acquire Mutex?");
+                            for href in hrefs {
+                                let mut stmt =
+                                    lock.prepare_cached(include_str!("q/add-str.sql"))?;
+                                stmt.execute(params!["href"])?;
+                                let next_url = url.join(&href).context("Constructing next_url")?;
+                                let mut stmt =
+                                    lock.prepare_cached(include_str!("q/add-uri-ref.sql"))?;
+                                let digest64 = hash.to_string();
+                                stmt.execute(params![digest64, next_url.to_string(), href])?;
+                            }
+                        };
+                        Ok(())
+                    }
+                    None => Err(anyhow!("Couldn't read the ingested blob")),
+                }
+            }
+        }
     }
@@ -389,7 +490,8 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
         self: Arc<Self>,
         url: Url,
         max_depth: usize,
-    ) -> ReceiverStream<Ingested> {
+        html_max_bytes: u64,
+    ) -> ReceiverStream<Sampled> {
         let mq_size = 10;

         /* TODO: move task queue to e.g. sqlite */
@@ -397,33 +499,42 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
         let (out_tx, out_rx) = channel(mq_size);

-        let semaphore = Arc::new(Semaphore::new(self.max_parallel));
         tokio::spawn({
             async move {
                 let mut seen: HashSet<String> = HashSet::new();
-                tx.send(FetchListingMessage::Recurse(url, max_depth))
-                    .await
-                    .expect("fetch_from_listing failed populating the queue");
+                {
+                    let tx_moved = tx;
+                    tx_moved
+                        .send(FetchListingMessage::Recurse(
+                            url,
+                            max_depth,
+                            tx_moved.clone(),
+                        ))
+                        .await
+                        .expect("fetch_from_listing failed populating the queue");
+                };
                 while let Some(m) = rx.recv().await {
                     match m {
-                        FetchListingMessage::Ingested(_url, ingested) => {
+                        FetchListingMessage::Sampled(_url, ingested) => {
                             out_tx
                                 .send(ingested)
                                 .await
                                 .expect("ReceiverStream failed to accept an Ingestable");
                         }
-                        FetchListingMessage::Recurse(url, max_depth) => {
+                        FetchListingMessage::Recurse(url, max_depth, tx) => {
                             if max_depth > 0 && !seen.contains(&url.to_string()) {
                                 seen.insert(url.to_string());
                                 tokio::spawn({
                                     let s = self.clone();
                                     let url = url.clone();
-                                    let tx = tx.clone();
-                                    let semaphore = semaphore.clone();
                                     async move {
-                                        let _permit = semaphore.acquire();
-                                        s.fetch_from_listing_impl(url, max_depth, tx).await
+                                        s.fetch_from_listing_impl(
+                                            url,
+                                            max_depth,
+                                            html_max_bytes,
+                                            tx,
+                                        )
+                                        .await
                                     }
                                 });
                             }
@@ -468,11 +579,17 @@ async fn main() {
                 }
             }
         }
-        Some(Command::FetchListing { max_depth, inputs }) => {
-            let ingested: Vec<Ingested> = stream::iter(inputs)
+        Some(Command::FetchListing {
+            max_depth,
+            html_max_bytes,
+            inputs,
+        }) => {
+            let ingested: Vec<Sampled> = stream::iter(inputs)
                 .then(async |i| {
                     let i = i.clone();
-                    ctx.clone().fetch_from_listing(i, max_depth).await
+                    ctx.clone()
+                        .fetch_from_listing(i, max_depth, html_max_bytes)
+                        .await
                 })
                 .flatten_unordered(args.max_parallel)
                 .collect()
@@ -481,6 +598,11 @@ async fn main() {
                 eprintln!("{:?}", i);
             }
         }
+        Some(Command::ParseUrl { url: urls }) => {
+            for url in urls {
+                println!("{:?}", url);
+            }
+        }
         None => {}
     }
 }

src/q/add-sample.sql

@@ -1,21 +1,22 @@
-INSERT INTO sidx_uri_sample(uri_id, blake3_id)
+INSERT INTO SidxUriSample(uri, hash, http_code)
 VALUES(
     (
         SELECT
             id
         FROM
-            sidx_uri
+            Str
         WHERE
-            uri = ?
+            str = ?
         LIMIT 1
     ),
     (
         SELECT
             id
         FROM
-            sidx_blake3
+            Hash
         WHERE
-            blake3 = ?
-    )
+            hash = ?
+    ),
+    ?
 )
 RETURNING id, epoch;
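For orientation, record_ingested_node in the Rust diff above runs upsert-blob.sql and upsert-uri.sql before this statement, so both subqueries can resolve. A minimal sketch of the sequence for one successful fetch, with placeholder values standing in for the bound parameters (the URI, digest, size and status code below are illustrative, not from this commit):

    -- placeholders only; the real values are bound from Rust via params![...]
    INSERT INTO Hash(hash, n_bytes) VALUES ('<blake3-digest>', 4096) ON CONFLICT DO NOTHING;
    INSERT INTO Str(str) VALUES ('https://example.com/dir/') ON CONFLICT DO NOTHING;
    INSERT INTO SidxUriSample(uri, hash, http_code)
    VALUES(
        (SELECT id FROM Str WHERE str = 'https://example.com/dir/' LIMIT 1),
        (SELECT id FROM Hash WHERE hash = '<blake3-digest>'),
        200
    )
    RETURNING id, epoch;

On a failed fetch the hash parameter is bound to NULL, the Hash subquery then yields NULL, and only the status code is recorded, which the nullable hash column permits.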

src/q/add-str.sql (new file)

@@ -0,0 +1,5 @@
+INSERT INTO Str(str)
+VALUES
+    (?)
+ON CONFLICT DO NOTHING;

src/q/latest-download.sql

@@ -1,16 +1,19 @@
 SELECT
     s.id AS sample_id,
-    b.blake3,
+    h.hash,
+    h.n_bytes,
+    s.http_code,
     s.epoch
 FROM
-    sidx_uri_sample AS s,
-    sidx_uri AS u,
-    sidx_blake3 AS b
+    SidxUriSample AS s,
+    Str AS u,
+    Hash AS h
 ON
-    s.uri_id = u.id
-    AND s.blake3_id = b.id
+    s.uri = u.id
+    AND s.hash = h.id
 WHERE
-    u.uri = ?
+    u.str = ?
+    AND s.hash IS NOT NULL
 ORDER BY
     s.epoch DESC
 LIMIT 1;
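The query above only returns successful samples (the AND s.hash IS NOT NULL filter). This commit does not add a counterpart for failures; a minimal sketch of what one could look like against the same schema, for example to inspect the most recent status code recorded for a URI:

    -- sketch only, not part of this commit
    SELECT
        s.id AS sample_id,
        s.http_code,
        s.epoch
    FROM
        SidxUriSample AS s
        JOIN Str AS u ON s.uri = u.id
    WHERE
        u.str = ?
        AND s.hash IS NULL
    ORDER BY
        s.epoch DESC
    LIMIT 1;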

@@ -1,22 +1,36 @@
-CREATE TABLE IF NOT EXISTS sidx_uri(
+CREATE TABLE IF NOT EXISTS Hash(
     id INTEGER,
-    uri TEXT UNIQUE,
+    hash TEXT UNIQUE, /* snix-castore node */
+    n_bytes INTEGER,
     PRIMARY KEY(id)
-);
-CREATE TABLE IF NOT EXISTS sidx_blake3(
+); /* Essentially random strings */
+CREATE TABLE IF NOT EXISTS Str(
     id INTEGER,
-    blake3 TEXT UNIQUE, /* snix-castore node */
-    n_bytes INTEGER NOT NULL,
+    str TEXT UNIQUE,
     PRIMARY KEY(id)
-);
-CREATE TABLE IF NOT EXISTS sidx_uri_sample(
+); /* "Naturally occurring" strings */
+CREATE INDEX IF NOT EXISTS StrIdx ON Str(str);
+CREATE TABLE IF NOT EXISTS SidxUriSample(
     id INTEGER,
-    uri_id INTEGER NOT NULL,
-    blake3_id INTEGER,
+    uri INTEGER NOT NULL,
+    hash INTEGER,
     epoch INTEGER NOT NULL DEFAULT (unixepoch()),
+    http_code INTEGER DEFAULT NULL,
     PRIMARY KEY(id),
-    FOREIGN KEY(uri_id) REFERENCES sidx_uri(id),
-    FOREIGN KEY(blake3_id) REFERENCES sidx_blake3(id)
+    FOREIGN KEY(uri) REFERENCES Str(id),
+    FOREIGN KEY(hash) REFERENCES Hash(id)
 );
-CREATE INDEX IF NOT EXISTS sidx_uri_blake3_idx
-ON sidx_uri_sample(uri_id, blake3_id, epoch);
+CREATE INDEX IF NOT EXISTS SidxUriHashIdx
+ON SidxUriSample(uri, epoch);
+CREATE TABLE IF NOT EXISTS UriReference(
+    content INTEGER,
+    target INTEGER,
+    why INTEGER,
+    PRIMARY KEY (content, target, why),
+    FOREIGN KEY(content) REFERENCES Hash(id),
+    FOREIGN KEY(target) REFERENCES Str(id), /* E.g. Uri or Path */
+    FOREIGN KEY(why) REFERENCES Str(id) /* E.g. "href" */
+);
+CREATE INDEX IF NOT EXISTS UriReferenceIdx
+ON UriReference(target, content);
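The Rust code above also prepares q/add-uri-ref.sql, which is not among the seven files changed in this commit. Judging from the UriReference table and the three parameters bound in Rust (the containing page's content hash, the resolved URL, and the raw href text), a plausible sketch might look like the following; the actual file may well differ:

    -- hypothetical contents of q/add-uri-ref.sql
    INSERT INTO UriReference(content, target, why)
    VALUES(
        (SELECT id FROM Hash WHERE hash = ?),
        (SELECT id FROM Str WHERE str = ?),
        (SELECT id FROM Str WHERE str = ?)
    )
    ON CONFLICT DO NOTHING;

ON CONFLICT DO NOTHING relies on the (content, target, why) primary key to deduplicate references seen on repeated crawls.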

src/q/upsert-blob.sql

@@ -1,4 +1,4 @@
-INSERT INTO sidx_blake3(blake3, n_bytes)
+INSERT INTO Hash(hash, n_bytes)
 VALUES
     (?, ?)
 ON CONFLICT DO NOTHING;

src/q/upsert-uri.sql

@@ -1,4 +1,4 @@
-INSERT INTO sidx_uri(uri)
+INSERT INTO Str(str)
 VALUES
     (?)
 ON CONFLICT DO NOTHING;