use std::path::{absolute, PathBuf};

use anyhow::anyhow;
use anyhow::Context;
use clap::Parser;
use clap::Subcommand;
use futures::{stream, StreamExt, TryStreamExt};
use rusqlite::{params, OptionalExtension};
use snix_castore::{blobservice, directoryservice, import::fs::ingest_path};
use url::Url;
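
/// One ingestion input: either a URL to download or a local filesystem path
/// to import.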
#[derive(Clone, Debug)]
enum Ingestable {
    Url(Url),
    Path(PathBuf),
}
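
/// Whether a sample was fetched during this run or already present from an
/// earlier one.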
#[derive(Debug)]
enum IngestedWhen {
    Now,
    Before,
}
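
/// One recorded sample: the URI it came from, the BLAKE3 digest of the stored
/// content, and when it was ingested.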
#[derive(Debug)]
#[allow(dead_code)]
struct Ingested {
    sample_id: u32,
    uri: String,
    blake3: String,
    epoch: u32,
    when: IngestedWhen,
}
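
// Render the input as the string that identifies it in the sample database;
// non-UTF-8 paths are currently rejected with a panic.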
impl std::fmt::Display for Ingestable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Ingestable::Url(url) => write!(f, "{}", url),
            Ingestable::Path(path_buf) => match path_buf.to_str() {
                Some(s) => write!(f, "{}", s),
                None => {
                    panic!("PathBuf::to_str failed")
                }
            },
        }
    }
}
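
/// clap value parser for positional inputs: `/`- and `./`-prefixed strings
/// become paths, `file://` URLs are converted to paths, anything else must
/// parse as a URL.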
fn parse_url_or_path(s: &str) -> Result<Ingestable, anyhow::Error> {
    if s.is_empty() {
        Err(anyhow!("Empty path (url)"))
    } else if s.starts_with("./") || s.starts_with("/") {
        Ok(Ingestable::Path(PathBuf::from(s)))
    } else {
        let url = Url::parse(s)?;
        if url.scheme() == "file" {
            match url.to_file_path() {
                Ok(s) => Ok(Ingestable::Path(s)),
                Err(()) => Err(anyhow!(
                    "parse_url_or_path: couldn't convert Url ({}) to Path",
                    url
                )),
            }
        } else {
            Ok(Ingestable::Url(url))
        }
    }
}
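
/// Application data directory: `$XDG_DATA_DIR/sidx` when the variable is set,
/// `~/.local/share/sidx` otherwise, falling back to a relative `.sidx`.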
fn data_path() -> PathBuf {
    let xdg_data_dir = std::env::var("XDG_DATA_DIR")
        .map(PathBuf::from)
        .or_else(|_| -> Result<PathBuf, anyhow::Error> {
            match std::env::home_dir() {
                Some(p) => Ok(p.join(".local/share")),
                None => Err(anyhow!("...")), // FIXME
            }
        });
    match xdg_data_dir {
        Ok(p) => p.join("sidx"),
        Err(_) => PathBuf::from(".sidx"),
    }
}
fn default_castore_path() -> PathBuf {
    data_path().join("castore")
}
fn default_db_path() -> PathBuf {
    data_path().join("sidx.db")
}
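
/// Subcommands. `ingest` downloads or imports the given inputs into the
/// castore and records them as samples.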
#[derive(Subcommand)]
enum Command {
    Ingest {
        #[clap(value_parser = parse_url_or_path, num_args = 1)]
        inputs: Vec<Ingestable>,
    },
}
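
/// Command-line interface: fetch behaviour, storage locations, and the
/// subcommand to run.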
#[derive(Parser)]
struct Cli {
    #[clap(short, long, action)]
    refetch: bool,

    #[clap(short, long, value_parser, default_value_t = 5)]
    max_parallel: usize,

    #[clap(short, long, value_parser, default_value_os_t = default_db_path())]
    db_path: PathBuf,

    #[clap(short, long, value_parser, default_value_os_t = default_castore_path())]
    castore_path: PathBuf,

    #[command(subcommand)]
    command: Option<Command>,
}
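
/// Ingest every input with at most `max_parallel` downloads in flight.
///
/// Inputs that already have a recorded download are returned as-is unless
/// `refetch` is set; new content is stored via the blob/directory services and
/// registered in the database. Results come back in completion order.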
async fn ingest<BS, DS>(
    inputs: &Vec<Ingestable>,
    refetch: bool,
    max_parallel: usize,
    http_client: reqwest::Client,
    blob_service: BS,
    dir_service: DS,
    con: rusqlite::Connection,
) -> Vec<Result<Option<Ingested>, anyhow::Error>>
where
    BS: blobservice::BlobService,
    DS: directoryservice::DirectoryService,
{
    let samples = stream::iter(inputs.iter().map(|uri| {
        let client = &http_client;
        let blob_service = &blob_service;
        let dir_service = &dir_service;
        let con = &con;

        let mut find_sample = con
            .prepare(include_str!("q/latest-download.sql"))
            .expect("Failed to prepare latest-download.sql");
        let mut add_sample = con
            .prepare(include_str!("q/add-sample.sql"))
            .expect("Failed to prepare add-sample.sql");
        let mut add_blob = con
            .prepare(include_str!("q/upsert-blob.sql"))
            .expect("Failed to prepare upsert-blob.sql");
        let mut add_uri = con
            .prepare(include_str!("q/upsert-uri.sql"))
            .expect("Failed to prepare upsert-uri.sql");
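
        // One async block per input: it moves in the statements prepared above
        // and borrows the shared HTTP client and castore services.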
        async move {
            let uri_s = uri.to_string();
            let latest_download = find_sample
                .query_row(params![uri_s], |r| <(u32, String, u32)>::try_from(r))
                .optional()?;
            if let Some((sample_id, blake3, epoch)) = latest_download {
                if !refetch {
                    return Ok::<Option<Ingested>, anyhow::Error>(Some(Ingested {
                        sample_id,
                        uri: uri_s,
                        blake3,
                        epoch,
                        when: IngestedWhen::Before,
                    }));
                }
            }
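            // Nothing cached (or --refetch was given): materialize the content
            // in the castore and note its digest and size.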
            let (digest, n_bytes) = match uri {
                Ingestable::Path(path) => {
                    match ingest_path::<_, _, _, &[u8]>(&blob_service, &dir_service, path, None)
                        .await?
                    {
                        snix_castore::Node::Directory { digest, size } => (digest, size),
                        snix_castore::Node::File {
                            digest,
                            size,
                            executable: _,
                        } => (digest, size),
                        snix_castore::Node::Symlink { target: _ } => {
                            return Err(anyhow!("TODO: Figure out what to do with symlink roots"))
                        }
                    }
                }
                Ingestable::Url(url) => {
                    let res = client
                        .get(url.clone())
                        .send()
                        .await
                        .context(format!("Request.send failed early for {:?}", uri))?
                        .error_for_status()?;
                    let mut r = tokio_util::io::StreamReader::new(
                        res.bytes_stream().map_err(std::io::Error::other),
                    );
                    let mut w = blob_service.open_write().await;
                    let n_bytes = match tokio::io::copy(&mut r, &mut w).await {
                        Ok(n) => n,
                        Err(e) => {
                            return Err(anyhow!(
                                "tokio::io::copy failed for uri={} with {}",
                                uri_s,
                                e
                            ));
                        }
                    };
                    let digest = w.close().await?;
                    (digest, n_bytes)
                }
            };
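            // Upsert the blob and URI rows, then insert a sample row linking
            // them; add-sample.sql reports the sample id and epoch back.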
            let digest64 = format!("{}", digest);
            add_blob.execute(params![digest64, n_bytes])?;
            add_uri.execute(params![uri_s])?;
            let (sample_id, epoch) = add_sample
                .query_row(params![uri_s, digest64], |row| <(u32, u32)>::try_from(row))?;
            Ok(Some(Ingested {
                sample_id,
                uri: uri_s,
                blake3: digest64,
                epoch,
                when: IngestedWhen::Now,
            }))
        }
    }))
    .buffer_unordered(max_parallel)
    .collect::<Vec<Result<Option<Ingested>, _>>>()
    .await;

    samples
}

#[tokio::main]
async fn main() {
    let args = Cli::parse();

    if let Some(p) = args.db_path.parent() {
        let _ = std::fs::create_dir_all(p);
    }

    let con =
        rusqlite::Connection::open(&args.db_path).expect("Failed to construct Database object");
    con.execute_batch(include_str!("q/init.sql"))
        .expect("Failed to execute init.sql");
    let castore_path = absolute(args.castore_path).expect("Failed to canonicalize castore_path");
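    // Blob and directory stores are file-backed object stores rooted under the
    // castore path.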
    let blob_service = blobservice::from_addr(&std::format!(
        "objectstore+file://{}",
        castore_path
            .join("blob")
            .to_str()
            .expect("Path::to_str unexpectedly broken")
    ))
    .await
    .expect("Couldn't initialize .castore/blob");
    let dir_service = directoryservice::from_addr(&std::format!(
        "objectstore+file://{}",
        castore_path
            .join("directory")
            .to_str()
            .expect("Path::to_str unexpectedly broken")
    ))
    .await
    .expect("Couldn't initialize .castore/directory");

    let client = reqwest::Client::new();

    match args.command {
        Some(Command::Ingest { inputs }) => {
            let samples = ingest(
                &inputs,
                args.refetch,
                args.max_parallel,
                client,
                blob_service,
                dir_service,
                con,
            )
            .await;
            for s in samples {
                match s {
                    Err(e) => {
                        eprintln!("Failed to fetch: {}", e);
                    }
                    Ok(None) => {}
                    Ok(Some(ingested)) => {
                        eprintln!("{:?}", ingested)
                    }
                }
            }
        }
        None => {}
    }
}