Compare commits

..

No commits in common. "3a31c0bdf3b850e5fd2104cf9b471953d6186269" and "b35e7f30f7c9986764cb9908852fdb343c86c9fa" have entirely different histories.

8 changed files with 48 additions and 148 deletions

4
.gitignore vendored
View file

@ -1,4 +0,0 @@
.direnv
result
result-*
target

View file

@ -1,80 +0,0 @@
# Generated by npins. Do not modify; will be overwritten regularly
let
data = builtins.fromJSON (builtins.readFile ./sources.json);
version = data.version;
mkSource =
spec:
assert spec ? type;
let
path =
if spec.type == "Git" then
mkGitSource spec
else if spec.type == "GitRelease" then
mkGitSource spec
else if spec.type == "PyPi" then
mkPyPiSource spec
else if spec.type == "Channel" then
mkChannelSource spec
else
builtins.throw "Unknown source type ${spec.type}";
in
spec // { outPath = path; };
mkGitSource =
{
repository,
revision,
url ? null,
hash,
branch ? null,
...
}:
assert repository ? type;
# At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository
# In the latter case, there we will always be an url to the tarball
if url != null then
(builtins.fetchTarball {
inherit url;
sha256 = hash; # FIXME: check nix version & use SRI hashes
})
else
assert repository.type == "Git";
let
urlToName =
url: rev:
let
matched = builtins.match "^.*/([^/]*)(\\.git)?$" repository.url;
short = builtins.substring 0 7 rev;
appendShort = if (builtins.match "[a-f0-9]*" rev) != null then "-${short}" else "";
in
"${if matched == null then "source" else builtins.head matched}${appendShort}";
name = urlToName repository.url revision;
in
builtins.fetchGit {
url = repository.url;
rev = revision;
inherit name;
# hash = hash;
};
mkPyPiSource =
{ url, hash, ... }:
builtins.fetchurl {
inherit url;
sha256 = hash;
};
mkChannelSource =
{ url, hash, ... }:
builtins.fetchTarball {
inherit url;
sha256 = hash;
};
in
if version == 3 then
builtins.mapAttrs (_: mkSource) data.pins
else
throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"

View file

@ -1,11 +0,0 @@
{
"pins": {
"nixpkgs": {
"type": "Channel",
"name": "nixpkgs-unstable",
"url": "https://releases.nixos.org/nixpkgs/nixpkgs-25.05pre782598.18dd725c2960/nixexprs.tar.xz",
"hash": "1p7kgyph7xkj57p19nbxpycmbchc6d9gwdznsmxhymrzyzi3if21"
}
},
"version": 3
}

View file

@ -1,14 +1,16 @@
use std::path::{absolute, PathBuf}; use std::{
path::{absolute, PathBuf},
pin::Pin,
};
use anyhow::anyhow; use anyhow::anyhow;
use anyhow::Context;
use clap::Parser; use clap::Parser;
use futures::{stream, StreamExt, TryStreamExt}; use futures::{stream, StreamExt, TryStreamExt};
use rusqlite::{params, OptionalExtension}; use rusqlite::{params, OptionalExtension};
use snix_castore::{blobservice, directoryservice, import::fs::ingest_path}; use snix_castore::{blobservice, directoryservice};
use url::Url; use url::Url;
#[derive(Clone, Debug)] #[derive(Clone)]
enum Ingestable { enum Ingestable {
Url(Url), Url(Url),
Path(PathBuf), Path(PathBuf),
@ -141,7 +143,7 @@ async fn main() {
let samples = stream::iter(args.inputs.iter().map(|uri| { let samples = stream::iter(args.inputs.iter().map(|uri| {
let client = &client; let client = &client;
let blob_service = &blob_service; let blob_service = &blob_service;
let dir_service = &dir_service; let _dir_service = &dir_service;
let con = &con; let con = &con;
let mut find_sample = con let mut find_sample = con
.prepare(include_str!("q/latest-download.sql")) .prepare(include_str!("q/latest-download.sql"))
@ -172,47 +174,40 @@ async fn main() {
})); }));
} }
} }
let (digest, n_bytes) = match uri { let mut r: Pin<Box<dyn tokio::io::AsyncRead>> = {
Ingestable::Path(path) => { match uri {
match ingest_path::<_, _, _, &[u8]>(&blob_service, &dir_service, path, None) Ingestable::Path(path) => match tokio::fs::File::open(path).await {
.await? Ok(f) => Box::pin(f),
{ Err(e) => {
snix_castore::Node::Directory { digest, size } => (digest, size), return Err(anyhow!("Failed to read {:?}: {}", path, e));
snix_castore::Node::File {
digest,
size,
executable: _,
} => (digest, size),
snix_castore::Node::Symlink { target: _ } => {
return Err(anyhow!("TODO: Figure out what to do with symlink roots"))
} }
},
Ingestable::Url(url) => {
let res = match client.get(url.clone()).send().await {
Ok(res) => res.error_for_status()?,
Err(e) => {
return Err(anyhow!("Failed to GET {}: {}", url, e));
}
};
let r = tokio_util::io::StreamReader::new(
res.bytes_stream().map_err(std::io::Error::other),
);
Box::pin(r)
} }
} }
Ingestable::Url(url) => { };
let res = client let mut w = blob_service.open_write().await;
.get(url.clone()) let n_bytes = match tokio::io::copy(&mut r, &mut w).await {
.send() Ok(n) => n,
.await Err(e) => {
.context(format!("Request.send failed early for {:?}", uri))? return Err(anyhow!(
.error_for_status()?; "tokio::io::copy failed for uri={} with {}",
let mut r = tokio_util::io::StreamReader::new( uri_s,
res.bytes_stream().map_err(std::io::Error::other), e
); ));
let mut w = blob_service.open_write().await;
let n_bytes = match tokio::io::copy(&mut r, &mut w).await {
Ok(n) => n,
Err(e) => {
return Err(anyhow!(
"tokio::io::copy failed for uri={} with {}",
uri_s,
e
));
}
};
let digest = w.close().await?;
(digest, n_bytes)
} }
}; };
let digest = w.close().await?;
let digest64 = format!("{}", digest); let digest64 = format!("{}", digest);
add_blob.execute(params![digest64, n_bytes,])?; add_blob.execute(params![digest64, n_bytes,])?;
add_uri.execute(params![uri_s])?; add_uri.execute(params![uri_s])?;
@ -234,7 +229,7 @@ async fn main() {
for s in samples { for s in samples {
match s { match s {
Err(e) => { Err(e) => {
println!("Failed to fetch: {}", e); println!("Failed to fetch ...: {}", e);
} }
Ok(None) => {} Ok(None) => {}
Ok(Some(ingested)) => { Ok(Some(ingested)) => {

View file

@ -1,4 +1,4 @@
INSERT INTO sidx_uri_sample(uri_id, blake3_id) INSERT INTO sidx_uri_sample(uri_id, blob_id)
VALUES( VALUES(
( (
SELECT SELECT
@ -13,7 +13,7 @@ VALUES(
SELECT SELECT
id id
FROM FROM
sidx_blake3 sidx_blob
WHERE WHERE
blake3 = ? blake3 = ?
) )

View file

@ -3,20 +3,20 @@ CREATE TABLE IF NOT EXISTS sidx_uri(
uri TEXT UNIQUE, uri TEXT UNIQUE,
PRIMARY KEY(id) PRIMARY KEY(id)
); );
CREATE TABLE IF NOT EXISTS sidx_blake3( CREATE TABLE IF NOT EXISTS sidx_blob(
id INTEGER, id INTEGER,
blake3 TEXT UNIQUE, /* snix-castore node */ blake3 TEXT UNIQUE,
n_bytes INTEGER NOT NULL, n_bytes INTEGER NOT NULL,
PRIMARY KEY(id) PRIMARY KEY(id)
); );
CREATE TABLE IF NOT EXISTS sidx_uri_sample( CREATE TABLE IF NOT EXISTS sidx_uri_sample(
id INTEGER, id INTEGER,
uri_id INTEGER NOT NULL, uri_id INTEGER NOT NULL,
blake3_id INTEGER, blob_id INTEGER,
epoch INTEGER NOT NULL DEFAULT (unixepoch()), epoch INTEGER NOT NULL DEFAULT (unixepoch()),
PRIMARY KEY(id), PRIMARY KEY(id),
FOREIGN KEY(uri_id) REFERENCES sidx_uri(id), FOREIGN KEY(uri_id) REFERENCES sidx_uri(id),
FOREIGN KEY(blake3_id) REFERENCES sidx_blake3(id) FOREIGN KEY(blob_id) REFERENCES sidx_blob(id)
); );
CREATE INDEX IF NOT EXISTS sidx_uri_blake3_idx CREATE INDEX IF NOT EXISTS sidx_uri_blob_idx
ON sidx_uri_sample(uri_id, blake3_id, epoch); ON sidx_uri_sample(uri_id, blob_id, epoch);

View file

@ -5,10 +5,10 @@ SELECT
FROM FROM
sidx_uri_sample AS s, sidx_uri_sample AS s,
sidx_uri AS u, sidx_uri AS u,
sidx_blake3 AS b sidx_blob AS b
ON ON
s.uri_id = u.id s.uri_id = u.id
AND s.blake3_id = b.id AND s.blob_id = b.id
WHERE WHERE
u.uri = ? u.uri = ?
ORDER BY ORDER BY

View file

@ -1,4 +1,4 @@
INSERT INTO sidx_blake3(blake3, n_bytes) INSERT INTO sidx_blob(blake3, n_bytes)
VALUES VALUES
(?, ?) (?, ?)
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;