Ingestable: use ingest_path for local paths

This way we don't error out when a path is a directory. That said, we're
still only recording the root node in sidx.db (e.g. manifests/ but not
manifests/*.json). We should change that next.

Also renamed "blob_id" to "blake3_id", because datasette special-cases
columns named ${column}_id as foreign keys into a table that has a
${column} column.
This commit is contained in:
Else, Someone 2025-04-20 00:12:51 +00:00
parent 65326b2dcb
commit 56a0b346cd
5 changed files with 53 additions and 48 deletions

View file

@ -1,16 +1,14 @@
use std::{ use std::path::{absolute, PathBuf};
path::{absolute, PathBuf},
pin::Pin,
};
use anyhow::anyhow; use anyhow::anyhow;
use anyhow::Context;
use clap::Parser; use clap::Parser;
use futures::{stream, StreamExt, TryStreamExt}; use futures::{stream, StreamExt, TryStreamExt};
use rusqlite::{params, OptionalExtension}; use rusqlite::{params, OptionalExtension};
use snix_castore::{blobservice, directoryservice}; use snix_castore::{blobservice, directoryservice, import::fs::ingest_path};
use url::Url; use url::Url;
#[derive(Clone)] #[derive(Clone, Debug)]
enum Ingestable { enum Ingestable {
Url(Url), Url(Url),
Path(PathBuf), Path(PathBuf),
@ -143,7 +141,7 @@ async fn main() {
let samples = stream::iter(args.inputs.iter().map(|uri| { let samples = stream::iter(args.inputs.iter().map(|uri| {
let client = &client; let client = &client;
let blob_service = &blob_service; let blob_service = &blob_service;
let _dir_service = &dir_service; let dir_service = &dir_service;
let con = &con; let con = &con;
let mut find_sample = con let mut find_sample = con
.prepare(include_str!("q/latest-download.sql")) .prepare(include_str!("q/latest-download.sql"))
@ -174,40 +172,47 @@ async fn main() {
})); }));
} }
} }
let mut r: Pin<Box<dyn tokio::io::AsyncRead>> = { let (digest, n_bytes) = match uri {
match uri { Ingestable::Path(path) => {
Ingestable::Path(path) => match tokio::fs::File::open(path).await { match ingest_path::<_, _, _, &[u8]>(&blob_service, &dir_service, path, None)
Ok(f) => Box::pin(f), .await?
Err(e) => { {
return Err(anyhow!("Failed to read {:?}: {}", path, e)); snix_castore::Node::Directory { digest, size } => (digest, size),
snix_castore::Node::File {
digest,
size,
executable: _,
} => (digest, size),
snix_castore::Node::Symlink { target: _ } => {
return Err(anyhow!("TODO: Figure out what to do with symlink roots"))
} }
},
Ingestable::Url(url) => {
let res = match client.get(url.clone()).send().await {
Ok(res) => res.error_for_status()?,
Err(e) => {
return Err(anyhow!("Failed to GET {}: {}", url, e));
}
};
let r = tokio_util::io::StreamReader::new(
res.bytes_stream().map_err(std::io::Error::other),
);
Box::pin(r)
} }
} }
}; Ingestable::Url(url) => {
let mut w = blob_service.open_write().await; let res = client
let n_bytes = match tokio::io::copy(&mut r, &mut w).await { .get(url.clone())
Ok(n) => n, .send()
Err(e) => { .await
return Err(anyhow!( .context(format!("Request.send failed early for {:?}", uri))?
"tokio::io::copy failed for uri={} with {}", .error_for_status()?;
uri_s, let mut r = tokio_util::io::StreamReader::new(
e res.bytes_stream().map_err(std::io::Error::other),
)); );
let mut w = blob_service.open_write().await;
let n_bytes = match tokio::io::copy(&mut r, &mut w).await {
Ok(n) => n,
Err(e) => {
return Err(anyhow!(
"tokio::io::copy failed for uri={} with {}",
uri_s,
e
));
}
};
let digest = w.close().await?;
(digest, n_bytes)
} }
}; };
let digest = w.close().await?;
let digest64 = format!("{}", digest); let digest64 = format!("{}", digest);
add_blob.execute(params![digest64, n_bytes,])?; add_blob.execute(params![digest64, n_bytes,])?;
add_uri.execute(params![uri_s])?; add_uri.execute(params![uri_s])?;
@ -229,7 +234,7 @@ async fn main() {
for s in samples { for s in samples {
match s { match s {
Err(e) => { Err(e) => {
println!("Failed to fetch ...: {}", e); println!("Failed to fetch: {}", e);
} }
Ok(None) => {} Ok(None) => {}
Ok(Some(ingested)) => { Ok(Some(ingested)) => {

View file

@ -1,4 +1,4 @@
INSERT INTO sidx_uri_sample(uri_id, blob_id) INSERT INTO sidx_uri_sample(uri_id, blake3_id)
VALUES( VALUES(
( (
SELECT SELECT
@ -13,7 +13,7 @@ VALUES(
SELECT SELECT
id id
FROM FROM
sidx_blob sidx_blake3
WHERE WHERE
blake3 = ? blake3 = ?
) )

View file

@ -3,20 +3,20 @@ CREATE TABLE IF NOT EXISTS sidx_uri(
uri TEXT UNIQUE, uri TEXT UNIQUE,
PRIMARY KEY(id) PRIMARY KEY(id)
); );
CREATE TABLE IF NOT EXISTS sidx_blob( CREATE TABLE IF NOT EXISTS sidx_blake3(
id INTEGER, id INTEGER,
blake3 TEXT UNIQUE, blake3 TEXT UNIQUE, /* snix-castore node */
n_bytes INTEGER NOT NULL, n_bytes INTEGER NOT NULL,
PRIMARY KEY(id) PRIMARY KEY(id)
); );
CREATE TABLE IF NOT EXISTS sidx_uri_sample( CREATE TABLE IF NOT EXISTS sidx_uri_sample(
id INTEGER, id INTEGER,
uri_id INTEGER NOT NULL, uri_id INTEGER NOT NULL,
blob_id INTEGER, blake3_id INTEGER,
epoch INTEGER NOT NULL DEFAULT (unixepoch()), epoch INTEGER NOT NULL DEFAULT (unixepoch()),
PRIMARY KEY(id), PRIMARY KEY(id),
FOREIGN KEY(uri_id) REFERENCES sidx_uri(id), FOREIGN KEY(uri_id) REFERENCES sidx_uri(id),
FOREIGN KEY(blob_id) REFERENCES sidx_blob(id) FOREIGN KEY(blake3_id) REFERENCES sidx_blake3(id)
); );
CREATE INDEX IF NOT EXISTS sidx_uri_blob_idx CREATE INDEX IF NOT EXISTS sidx_uri_blake3_idx
ON sidx_uri_sample(uri_id, blob_id, epoch); ON sidx_uri_sample(uri_id, blake3_id, epoch);

View file

@ -5,10 +5,10 @@ SELECT
FROM FROM
sidx_uri_sample AS s, sidx_uri_sample AS s,
sidx_uri AS u, sidx_uri AS u,
sidx_blob AS b sidx_blake3 AS b
ON ON
s.uri_id = u.id s.uri_id = u.id
AND s.blob_id = b.id AND s.blake3_id = b.id
WHERE WHERE
u.uri = ? u.uri = ?
ORDER BY ORDER BY

View file

@ -1,4 +1,4 @@
INSERT INTO sidx_blob(blake3, n_bytes) INSERT INTO sidx_blake3(blake3, n_bytes)
VALUES VALUES
(?, ?) (?, ?)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;