use std::{ path::{absolute, PathBuf}, pin::Pin, }; use anyhow::anyhow; use clap::Parser; use futures::{stream, StreamExt, TryStreamExt}; use rusqlite::{params, OptionalExtension}; use snix_castore::{blobservice, directoryservice}; use url::Url; #[derive(Clone)] enum Ingestable { Url(Url), Path(PathBuf), } #[derive(Debug)] enum IngestedWhen { Now, Before, } #[derive(Debug)] #[allow(dead_code)] struct Ingested { sample_id: u32, uri: String, blake3: String, epoch: u32, when: IngestedWhen, } impl std::fmt::Display for Ingestable { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Ingestable::Url(url) => write!(f, "{}", url), Ingestable::Path(path_buf) => match path_buf.to_str() { Some(s) => write!(f, "{}", s), None => { panic!("PathBuf::to_str failed") } }, } } } fn parse_url_or_path(s: &str) -> Result { if s.is_empty() { Err(anyhow!("Empty path (url)")) } else if s.starts_with("./") || s.starts_with("/") { Ok(Ingestable::Path(PathBuf::from(s))) } else { let url = Url::parse(s)?; if url.scheme() == "file" { match url.to_file_path() { Ok(s) => Ok(Ingestable::Path(s)), Err(()) => Err(anyhow!( "parse_url_or_path: couldn't convert Url ({}) to Path", url )), } } else { Ok(Ingestable::Url(url)) } } } fn data_path() -> PathBuf { let xdg_data_dir = std::env::var("XDG_DATA_DIR") .and_then(|s| Ok(PathBuf::from(s))) .or_else(|_| -> Result { match std::env::home_dir() { Some(p) => Ok(p.join(".local/share")), None => Err(anyhow!("...")), // FIXME } }); match xdg_data_dir { Ok(p) => p.join("sidx"), Err(_) => PathBuf::from(".sidx"), } } fn default_castore_path() -> PathBuf { data_path().join("castore") } fn default_db_path() -> PathBuf { data_path().join("sidx.db") } #[derive(Parser)] struct Cli { #[clap(value_parser = parse_url_or_path, num_args = 1)] inputs: Vec, #[clap(short, long, action)] refetch: bool, #[clap(short, long, value_parser, default_value_t = 5)] max_parallel: usize, #[clap(short, long, value_parser, default_value_os_t = default_db_path())] db_path: PathBuf, #[clap(short, long, value_parser, default_value_os_t = default_castore_path())] castore_path: PathBuf, } #[tokio::main] async fn main() { let args = Cli::parse(); args.db_path.parent().and_then(|p| { let _ = std::fs::create_dir_all(p); Some(()) }); let con = rusqlite::Connection::open(&args.db_path).expect("Failed to construct Database object"); con.execute_batch(include_str!("q/init.sql")) .expect("Failed to execute init.sql"); let castore_path = absolute(args.castore_path).expect("Failed to canonicalize castore_path"); let blob_service = blobservice::from_addr(&std::format!( "objectstore+file://{}", castore_path .join("blob") .to_str() .expect("Path::to_str unexpectedly broken") )) .await .expect("Couldn't initialize .castore/blob"); let dir_service = directoryservice::from_addr(&std::format!( "objectstore+file://{}", castore_path .join("directory") .to_str() .expect("Path::to_str unexpectedly broken") )) .await .expect("Couldn't initialize .castore/directory"); let client = reqwest::Client::new(); let samples = stream::iter(args.inputs.iter().map(|uri| { let client = &client; let blob_service = &blob_service; let _dir_service = &dir_service; let con = &con; let mut find_sample = con .prepare(include_str!("q/latest-download.sql")) .expect("Failed to prepare latest-download.sql"); let mut add_sample = con .prepare(include_str!("q/add-sample.sql")) .expect("Failed to prepare add-sample.sql"); let mut add_blob = con .prepare(include_str!("q/upsert-blob.sql")) .expect("Failed to prepare upsert-blob.sql"); let mut add_uri = con .prepare(include_str!("q/upsert-uri.sql")) .expect("Failed to prepare upsert-uri.sql"); async move { let uri_s = uri.to_string(); let latest_download = find_sample .query_row(params![uri_s], |r| <(u32, String, u32)>::try_from(r)) .optional()?; if let Some((sample_id, blake3, epoch)) = latest_download { if !args.refetch { return Ok::, anyhow::Error>(Some(Ingested { sample_id, uri: uri_s, blake3, epoch, when: IngestedWhen::Before, })); } } let mut r: Pin> = { match uri { Ingestable::Path(path) => match tokio::fs::File::open(path).await { Ok(f) => Box::pin(f), Err(e) => { return Err(anyhow!("Failed to read {:?}: {}", path, e)); } }, Ingestable::Url(url) => { let res = match client.get(url.clone()).send().await { Ok(res) => res.error_for_status()?, Err(e) => { return Err(anyhow!("Failed to GET {}: {}", url, e)); } }; let r = tokio_util::io::StreamReader::new( res.bytes_stream().map_err(std::io::Error::other), ); Box::pin(r) } } }; let mut w = blob_service.open_write().await; let n_bytes = match tokio::io::copy(&mut r, &mut w).await { Ok(n) => n, Err(e) => { return Err(anyhow!( "tokio::io::copy failed for uri={} with {}", uri_s, e )); } }; let digest = w.close().await?; let digest64 = format!("{}", digest); add_blob.execute(params![digest64, n_bytes,])?; add_uri.execute(params![uri_s])?; let (sample_id, epoch) = add_sample .query_row(params![uri_s, digest64], |row| <(u32, u32)>::try_from(row))?; Ok(Some(Ingested { sample_id, uri: uri_s, blake3: digest64, epoch, when: IngestedWhen::Now, })) } })) .buffer_unordered(args.max_parallel) .collect::, _>>>() .await; for s in samples { match s { Err(e) => { println!("Failed to fetch ...: {}", e); } Ok(None) => {} Ok(Some(ingested)) => { println!("{:?}", ingested) } } } }