Ingestable: move into subcommand

This commit is contained in:
Else, Someone 2025-04-20 08:59:50 +00:00
parent 3a31c0bdf3
commit 9de0315810

View file

@ -3,6 +3,7 @@ use std::path::{absolute, PathBuf};
use anyhow::anyhow; use anyhow::anyhow;
use anyhow::Context; use anyhow::Context;
use clap::Parser; use clap::Parser;
use clap::Subcommand;
use futures::{stream, StreamExt, TryStreamExt}; use futures::{stream, StreamExt, TryStreamExt};
use rusqlite::{params, OptionalExtension}; use rusqlite::{params, OptionalExtension};
use snix_castore::{blobservice, directoryservice, import::fs::ingest_path}; use snix_castore::{blobservice, directoryservice, import::fs::ingest_path};
@ -86,11 +87,16 @@ fn default_db_path() -> PathBuf {
data_path().join("sidx.db") data_path().join("sidx.db")
} }
#[derive(Parser)] #[derive(Subcommand)]
struct Cli { enum Command {
Ingest {
#[clap(value_parser = parse_url_or_path, num_args = 1)] #[clap(value_parser = parse_url_or_path, num_args = 1)]
inputs: Vec<Ingestable>, inputs: Vec<Ingestable>,
},
}
#[derive(Parser)]
struct Cli {
#[clap(short, long, action)] #[clap(short, long, action)]
refetch: bool, refetch: bool,
@ -102,47 +108,30 @@ struct Cli {
#[clap(short, long, value_parser, default_value_os_t = default_castore_path())] #[clap(short, long, value_parser, default_value_os_t = default_castore_path())]
castore_path: PathBuf, castore_path: PathBuf,
#[command(subcommand)]
command: Option<Command>,
} }
#[tokio::main] async fn ingest<BS, DS>(
async fn main() { inputs: &Vec<Ingestable>,
let args = Cli::parse(); refetch: bool,
max_parallel: usize,
args.db_path.parent().and_then(|p| { http_client: reqwest::Client,
let _ = std::fs::create_dir_all(p); blob_service: BS,
Some(()) dir_service: DS,
}); con: rusqlite::Connection,
) -> Vec<Result<Option<Ingested>, anyhow::Error>>
let con = where
rusqlite::Connection::open(&args.db_path).expect("Failed to construct Database object"); BS: blobservice::BlobService,
con.execute_batch(include_str!("q/init.sql")) DS: directoryservice::DirectoryService,
.expect("Failed to execute init.sql"); {
let castore_path = absolute(args.castore_path).expect("Failed to canonicalize castore_path"); let samples = stream::iter(inputs.iter().map(|uri| {
let blob_service = blobservice::from_addr(&std::format!( let client = &http_client;
"objectstore+file://{}",
castore_path
.join("blob")
.to_str()
.expect("Path::to_str unexpectedly broken")
))
.await
.expect("Couldn't initialize .castore/blob");
let dir_service = directoryservice::from_addr(&std::format!(
"objectstore+file://{}",
castore_path
.join("directory")
.to_str()
.expect("Path::to_str unexpectedly broken")
))
.await
.expect("Couldn't initialize .castore/directory");
let client = reqwest::Client::new();
let samples = stream::iter(args.inputs.iter().map(|uri| {
let client = &client;
let blob_service = &blob_service; let blob_service = &blob_service;
let dir_service = &dir_service; let dir_service = &dir_service;
let con = &con; let con = &con;
let mut find_sample = con let mut find_sample = con
.prepare(include_str!("q/latest-download.sql")) .prepare(include_str!("q/latest-download.sql"))
.expect("Failed to prepare latest-download.sql"); .expect("Failed to prepare latest-download.sql");
@ -162,7 +151,7 @@ async fn main() {
.query_row(params![uri_s], |r| <(u32, String, u32)>::try_from(r)) .query_row(params![uri_s], |r| <(u32, String, u32)>::try_from(r))
.optional()?; .optional()?;
if let Some((sample_id, blake3, epoch)) = latest_download { if let Some((sample_id, blake3, epoch)) = latest_download {
if !args.refetch { if !refetch {
return Ok::<Option<Ingested>, anyhow::Error>(Some(Ingested { return Ok::<Option<Ingested>, anyhow::Error>(Some(Ingested {
sample_id, sample_id,
uri: uri_s, uri: uri_s,
@ -227,10 +216,60 @@ async fn main() {
})) }))
} }
})) }))
.buffer_unordered(args.max_parallel) .buffer_unordered(max_parallel)
.collect::<Vec<Result<Option<Ingested>, _>>>() .collect::<Vec<Result<Option<Ingested>, _>>>()
.await; .await;
samples
}
#[tokio::main]
async fn main() {
let args = Cli::parse();
args.db_path.parent().and_then(|p| {
let _ = std::fs::create_dir_all(p);
Some(())
});
let con =
rusqlite::Connection::open(&args.db_path).expect("Failed to construct Database object");
con.execute_batch(include_str!("q/init.sql"))
.expect("Failed to execute init.sql");
let castore_path = absolute(args.castore_path).expect("Failed to canonicalize castore_path");
let blob_service = blobservice::from_addr(&std::format!(
"objectstore+file://{}",
castore_path
.join("blob")
.to_str()
.expect("Path::to_str unexpectedly broken")
))
.await
.expect("Couldn't initialize .castore/blob");
let dir_service = directoryservice::from_addr(&std::format!(
"objectstore+file://{}",
castore_path
.join("directory")
.to_str()
.expect("Path::to_str unexpectedly broken")
))
.await
.expect("Couldn't initialize .castore/directory");
let client = reqwest::Client::new();
match args.command {
Some(Command::Ingest { inputs }) => {
let samples = ingest(
&inputs,
args.refetch,
args.max_parallel,
client,
blob_service,
dir_service,
con,
)
.await;
for s in samples { for s in samples {
match s { match s {
Err(e) => { Err(e) => {
@ -243,3 +282,6 @@ async fn main() {
} }
} }
} }
None => {}
}
}