From 9de0315810e9b4c60da9f23c6be1b7a082098b5c Mon Sep 17 00:00:00 2001 From: SomeoneSerge Date: Sun, 20 Apr 2025 08:59:50 +0000 Subject: [PATCH] Ingestable: move into subcommand --- src/main.rs | 140 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 91 insertions(+), 49 deletions(-) diff --git a/src/main.rs b/src/main.rs index 482aae7..2fa977a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ use std::path::{absolute, PathBuf}; use anyhow::anyhow; use anyhow::Context; use clap::Parser; +use clap::Subcommand; use futures::{stream, StreamExt, TryStreamExt}; use rusqlite::{params, OptionalExtension}; use snix_castore::{blobservice, directoryservice, import::fs::ingest_path}; @@ -86,11 +87,16 @@ fn default_db_path() -> PathBuf { data_path().join("sidx.db") } +#[derive(Subcommand)] +enum Command { + Ingest { + #[clap(value_parser = parse_url_or_path, num_args = 1)] + inputs: Vec, + }, +} + #[derive(Parser)] struct Cli { - #[clap(value_parser = parse_url_or_path, num_args = 1)] - inputs: Vec, - #[clap(short, long, action)] refetch: bool, @@ -102,47 +108,30 @@ struct Cli { #[clap(short, long, value_parser, default_value_os_t = default_castore_path())] castore_path: PathBuf, + + #[command(subcommand)] + command: Option, } -#[tokio::main] -async fn main() { - let args = Cli::parse(); - - args.db_path.parent().and_then(|p| { - let _ = std::fs::create_dir_all(p); - Some(()) - }); - - let con = - rusqlite::Connection::open(&args.db_path).expect("Failed to construct Database object"); - con.execute_batch(include_str!("q/init.sql")) - .expect("Failed to execute init.sql"); - let castore_path = absolute(args.castore_path).expect("Failed to canonicalize castore_path"); - let blob_service = blobservice::from_addr(&std::format!( - "objectstore+file://{}", - castore_path - .join("blob") - .to_str() - .expect("Path::to_str unexpectedly broken") - )) - .await - .expect("Couldn't initialize .castore/blob"); - let dir_service = directoryservice::from_addr(&std::format!( - "objectstore+file://{}", - castore_path - .join("directory") - .to_str() - .expect("Path::to_str unexpectedly broken") - )) - .await - .expect("Couldn't initialize .castore/directory"); - - let client = reqwest::Client::new(); - let samples = stream::iter(args.inputs.iter().map(|uri| { - let client = &client; +async fn ingest( + inputs: &Vec, + refetch: bool, + max_parallel: usize, + http_client: reqwest::Client, + blob_service: BS, + dir_service: DS, + con: rusqlite::Connection, +) -> Vec, anyhow::Error>> +where + BS: blobservice::BlobService, + DS: directoryservice::DirectoryService, +{ + let samples = stream::iter(inputs.iter().map(|uri| { + let client = &http_client; let blob_service = &blob_service; let dir_service = &dir_service; let con = &con; + let mut find_sample = con .prepare(include_str!("q/latest-download.sql")) .expect("Failed to prepare latest-download.sql"); @@ -162,7 +151,7 @@ async fn main() { .query_row(params![uri_s], |r| <(u32, String, u32)>::try_from(r)) .optional()?; if let Some((sample_id, blake3, epoch)) = latest_download { - if !args.refetch { + if !refetch { return Ok::, anyhow::Error>(Some(Ingested { sample_id, uri: uri_s, @@ -227,19 +216,72 @@ async fn main() { })) } })) - .buffer_unordered(args.max_parallel) + .buffer_unordered(max_parallel) .collect::, _>>>() .await; - for s in samples { - match s { - Err(e) => { - println!("Failed to fetch: {}", e); - } - Ok(None) => {} - Ok(Some(ingested)) => { - println!("{:?}", ingested) + samples +} + +#[tokio::main] +async fn main() { + let args = Cli::parse(); + + args.db_path.parent().and_then(|p| { + let _ = std::fs::create_dir_all(p); + Some(()) + }); + + let con = + rusqlite::Connection::open(&args.db_path).expect("Failed to construct Database object"); + con.execute_batch(include_str!("q/init.sql")) + .expect("Failed to execute init.sql"); + let castore_path = absolute(args.castore_path).expect("Failed to canonicalize castore_path"); + let blob_service = blobservice::from_addr(&std::format!( + "objectstore+file://{}", + castore_path + .join("blob") + .to_str() + .expect("Path::to_str unexpectedly broken") + )) + .await + .expect("Couldn't initialize .castore/blob"); + let dir_service = directoryservice::from_addr(&std::format!( + "objectstore+file://{}", + castore_path + .join("directory") + .to_str() + .expect("Path::to_str unexpectedly broken") + )) + .await + .expect("Couldn't initialize .castore/directory"); + + let client = reqwest::Client::new(); + + match args.command { + Some(Command::Ingest { inputs }) => { + let samples = ingest( + &inputs, + args.refetch, + args.max_parallel, + client, + blob_service, + dir_service, + con, + ) + .await; + for s in samples { + match s { + Err(e) => { + println!("Failed to fetch: {}", e); + } + Ok(None) => {} + Ok(Some(ingested)) => { + println!("{:?}", ingested) + } + } } } + None => {} } }