cuda: support cudnn manifests
Also changes the schemas a bit (still don't care about migrations), switches to named_params!, fixes up perf issues when ingesting manifests and/or hrefs into sqlite, etc. Adds sample queries such as "CudaArtifact conflicts" to the datasette config, which illustrate some of the issues associated with choosing an evalModules schema on the cudaPackages side.
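For readers skimming the diff below: a minimal standalone sketch (hypothetical table and values, not code from this commit) of the rusqlite idioms the commit leans on — named_params!, prepare_cached, and the per-connection pragma tuning:

    use rusqlite::{Connection, named_params};

    fn main() -> rusqlite::Result<()> {
        let con = Connection::open_in_memory()?;
        // The pragma tuning open_context() applies: WAL plus relaxed syncing
        // keeps bulk ingestion from fsync-ing on every write.
        con.pragma_update(None, "journal_mode", "wal")?; // in-memory DBs stay "memory"
        con.pragma_update(None, "synchronous", "normal")?;
        con.execute_batch("CREATE TABLE sample (uri TEXT, http_code INTEGER);")?;
        // prepare_cached compiles the statement once and reuses it on later calls;
        // named parameters bind by name, so adding a column (content_type, in this
        // commit) cannot silently shift positional indices the way params![...] can.
        let mut add_sample = con
            .prepare_cached("INSERT INTO sample (uri, http_code) VALUES (:uri, :http_code)")?;
        add_sample.execute(named_params! { ":uri": "https://example.com", ":http_code": 200u16 })?;
        Ok(())
    }

Wrapping many such executes in a single transaction, as ProcessCudaManifests does below, is the other half of the ingestion speedup.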
parent 26538edf4f
commit c8b8b56456
15 changed files with 770 additions and 73 deletions
src/main.rs (509 changed lines)
@@ -1,23 +1,28 @@
-use std::collections::HashSet;
-use std::path::{absolute, PathBuf};
+use std::collections::{HashMap, HashSet};
+use std::marker::PhantomData;
+use std::path::{PathBuf, absolute};
+use std::str::FromStr;
 use std::sync::Arc;
+use std::{fmt, io};
 
 use anyhow::Context;
-use anyhow::{anyhow, Error};
+use anyhow::{Error, anyhow};
 use clap::Parser;
 use clap::Subcommand;
-use futures::{stream, StreamExt, TryStreamExt};
-use rusqlite::{params, OptionalExtension};
+use futures::{StreamExt, TryStreamExt, stream};
+use rusqlite::fallible_iterator::FallibleIterator as _;
+use rusqlite::{OptionalExtension, named_params, params};
 use scraper::{Html, Selector};
+use serde::de::{self, Visitor};
 use serde::{Deserialize, Serialize};
+use snix_castore::B3Digest;
 use snix_castore::blobservice::BlobService;
 use snix_castore::directoryservice::DirectoryService;
-use snix_castore::B3Digest;
 use snix_castore::{blobservice, directoryservice, import::fs::ingest_path};
 use std::sync::Mutex;
 use tokio::io::{AsyncReadExt, BufReader};
-use tokio::sync::mpsc::{channel, Sender};
 use tokio::sync::Semaphore;
+use tokio::sync::mpsc::{Sender, channel};
 use tokio_stream::wrappers::ReceiverStream;
 use url::Url;
 
@@ -130,6 +135,12 @@ enum Command {
         #[clap(value_parser, num_args = 1)]
         url: Vec<Url>,
     },
+    DemoCudaManifest,
+    FormatCudaManifest,
+    ProcessCudaManifests {
+        #[clap(short, long, action)]
+        include_finished: bool,
+    },
 }
 
 #[derive(Parser)]
@@ -175,6 +186,10 @@ async fn open_context(
     }
 
     let con = rusqlite::Connection::open(&db_path).expect("Failed to construct Database object");
+    con.pragma_update(None, "journal_mode", "wal").unwrap();
+    con.pragma_update(None, "synchronous", "normal").unwrap();
+    con.pragma_update(None, "temp_store", "memory").unwrap();
+    con.pragma_update(None, "foreign_keys", "on").unwrap();
     con.execute_batch(include_str!("q/sidx-init.sql"))
         .expect("Failed to execute sidx-init.sql");
     let castore_path = absolute(castore_path).expect("Failed to canonicalize castore_path");
@@ -190,12 +205,12 @@ async fn open_context(
     let dir_service = directoryservice::from_addr(&std::format!(
         "objectstore+file://{}",
         castore_path
-            .join("directory")
+            .join("directories")
             .to_str()
             .expect("Path::to_str unexpectedly broken")
     ))
     .await
-    .expect("Couldn't initialize .castore/directory");
+    .expect("Couldn't initialize .castore/directories");
 
     SidxContext::<Arc<dyn BlobService>, Arc<dyn DirectoryService>> {
         refetch,
@@ -208,8 +223,23 @@
     }
 }
 
+impl<BS, DS> Drop for SidxContext<BS, DS>
+where
+    BS: BlobService + Clone,
+    DS: DirectoryService + Clone,
+{
+    fn drop(&mut self) {
+        let con = self
+            .con
+            .lock()
+            .expect("Acquiring mutex for sqlite to run #pragma optimize before exit");
+        con.pragma_update(None, "analysis_limit", 500).unwrap();
+        con.pragma_query(None, "optimize", |_| Ok(())).unwrap();
+    }
+}
+
 impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS> {
-    async fn db_latest_download(&self, uri: &str) -> Result<Option<Sampled>, Error> {
+    async fn latest_sample(&self, uri: &str) -> Result<Option<Sampled>, Error> {
         let lock = self.con.lock().unwrap();
         let mut find_sample = lock
             .prepare_cached(include_str!("q/latest-download.sql"))
@@ -239,15 +269,22 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
         &self,
         uri: &str,
         hash: &Option<String>,
-        http_code: Option<u16>,
+        http_code: &Option<u16>,
+        content_type: &Option<String>,
     ) -> Result<(u32, u32), Error> {
-        let lock = self.con.lock().expect("Couldn't lock mutex");
+        let lock = self.con.lock().expect("Locking mutex for db_add_sample");
         let mut add_sample = lock
             .prepare_cached(include_str!("q/add-sample.sql"))
             .context("Failed to prepare add-sample.sql")?;
-        Ok(add_sample.query_row(params![uri, hash, http_code], |row| {
-            <(u32, u32)>::try_from(row)
-        })?)
+        Ok(add_sample.query_row(
+            named_params! {
+                ":uri": uri,
+                ":hash": hash,
+                ":http_code": http_code,
+                ":content_type": content_type
+            },
+            |row| <(u32, u32)>::try_from(row),
+        )?)
     }
     async fn db_add_blob(&self, hash: &str, n_bytes: u64) -> Result<usize, Error> {
         let lock = self.con.lock().expect("db_add_blob: couldn't lock mutex?");
@@ -269,6 +306,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
         uri: &str,
         blob: &Option<SizedBlob>,
         http_code: Option<u16>,
+        content_type: Option<String>,
     ) -> Result<Sampled, Error> {
         let digest64 = if let Some(SizedBlob { hash, n_bytes }) = blob {
             let digest64 = format!("{}", hash);
@@ -279,7 +317,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
         };
         self.db_add_uri(&uri).await?;
         let (sample_id, epoch) = self
-            .db_add_sample(&uri, &digest64, http_code.clone())
+            .db_add_sample(&uri, &digest64, &http_code, &content_type)
            .await?;
         Ok(Sampled {
             sample_id,
@@ -290,7 +328,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
             when: SampledWhen::Now,
         })
     }
-    async fn download_no_cache(&self, uri: &Url) -> Result<Sampled, Error> {
+    async fn download(&self, uri: &Url) -> Result<Sampled, Error> {
         let _permit = self.http_semaphore.acquire().await.unwrap();
         eprintln!("Downloading {:?}", uri.to_string());
         let uri_s = uri.to_string();
@@ -303,6 +341,11 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
 
         let status = res.status();
         let status_code = status.as_u16();
+        let content_type = res
+            .headers()
+            .get(reqwest::header::CONTENT_TYPE)
+            .and_then(|x| x.to_str().ok())
+            .map(|x| x.to_string());
 
         if status.is_success() {
             let mut r = tokio_util::io::StreamReader::new(
@@ -328,20 +371,91 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
                     n_bytes,
                 }),
                 Some(status_code),
+                content_type,
             )
             .await
         } else {
-            self.record_ingested_node(&uri_s, &None, Some(status_code))
+            self.record_ingested_node(&uri_s, &None, Some(status_code), content_type)
                 .await
         }
     }
-    async fn download(&self, uri: &Url) -> Result<Sampled, Error> {
-        if self.refetch {
-            self.download_no_cache(&uri).await
+    async fn ensure_blob(&self, hash: &B3Digest) -> Result<(), Error> {
+        if self
+            .blob_service
+            .has(hash)
+            .await
+            .context("ensure_has() accessing blob_service")?
+        {
+            Ok(())
         } else {
-            match self.db_latest_download(&uri.to_string()).await? {
-                Some(ingested) => Ok(ingested),
-                None => self.download_no_cache(&uri).await,
+            let b64 = hash.to_string();
+            let uris = {
+                let con = self.con.lock().unwrap();
+                let mut find_uris = con
+                    .prepare_cached(include_str!("q/uris-of-hash.sql"))
+                    .context("Preparing statement: q/uris-of-hash.sql")
+                    .unwrap();
+                find_uris
+                    .query(named_params! {":hash": b64, ":limit": 100})?
+                    .map(|b| b.get(0))
+                    .collect::<Vec<String>>()?
+            };
+            if uris.is_empty() {
+                return Err(anyhow!("No uris recorded for {}", b64));
+            };
+            for uri in uris {
+                let url = match Url::parse(&uri) {
+                    Ok(url) => url,
+                    Err(_) => continue,
+                };
+                match self
+                    .download(&url)
+                    .await
+                    .context("Redownloading missing blob for ensure_hash")
+                {
+                    Ok(Sampled {
+                        sample_id: _,
+                        uri: _,
+                        blob,
+                        http_status: _,
+                        epoch: _,
+                        when: _,
+                    }) => {
+                        if blob.map_or(false, |sb| sb.hash == *hash) {
+                            return Ok(());
+                        } else {
+                            continue;
+                        }
+                    }
+                    Err(_) => {
+                        continue;
+                    }
+                }
+            }
+            Err(anyhow!(
+                "All uris for {} are out of date (result in errors or different hashes)",
+                b64
+            ))
+        }
+    }
+    async fn ensure_sampled_uri(&self, uri: &Url) -> Result<Sampled, Error> {
+        /* TODO: flatten */
+        if self.refetch {
+            self.download(&uri).await
+        } else {
+            /* TODO: Add negative TTL */
+            match self.latest_sample(&uri.to_string()).await? {
+                Some(ingested) => match ingested.blob.clone() {
+                    Some(SizedBlob { hash, n_bytes: _ }) => {
+                        if self.blob_service.has(&hash).await? {
+                            Ok(ingested)
+                        } else {
+                            self.download(&uri).await
+                        }
+                    }
+                    None => self.download(&uri).await,
+                },
+                None => self.download(&uri).await,
             }
         }
     }
@@ -352,7 +466,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
 
         async move {
             let uri_s = uri.to_string();
-            let latest_download = self.db_latest_download(&uri_s).await?;
+            let latest_download = self.latest_sample(&uri_s).await?;
             if latest_download.is_some() {
                 return Ok(latest_download);
             }
@@ -369,6 +483,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
                         n_bytes: size,
                     }),
                     None,
+                    None,
                 )
                 .await
                 .map(Some),
@@ -385,6 +500,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
                         n_bytes: size,
                     }),
                     None,
+                    None,
                 )
                 .await
                 .map(Some),
@@ -393,7 +509,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
                         }
                     }
                 }
-                Ingestable::Url(url) => self.download(url).await.map(Some),
+                Ingestable::Url(url) => self.ensure_sampled_uri(url).await.map(Some),
             }
         }
     }))
@@ -422,7 +538,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
         html_max_bytes: u64,
         tx: Sender<FetchListingMessage>,
     ) -> Result<(), Error> {
-        let maybe_root = self.download(&url).await;
+        let maybe_root = self.ensure_sampled_uri(&url).await;
         if let Err(ref e) = maybe_root {
             eprintln!("Couldn't download {}: {:?}", url, e);
         };
@@ -475,7 +591,7 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
                         let mut stmt =
                             lock.prepare_cached(include_str!("q/add-uri-ref.sql"))?;
                         let digest64 = hash.to_string();
-                        stmt.execute(params![digest64, next_url.to_string(), href])?;
+                        stmt.execute(named_params! {":source": digest64, ":target": next_url.to_string(), ":why": "href"})?;
                     }
                 };
                 Ok(())
@@ -547,6 +663,136 @@ impl<BS: BlobService + Clone, DS: DirectoryService + Clone> SidxContext<BS, DS>
     }
 }
 
+fn string_or_int<'de, T, D>(deserializer: D) -> Result<T, D::Error>
+where
+    T: Deserialize<'de> + TryFrom<u64> + FromStr<Err = std::num::ParseIntError>,
+    D: serde::Deserializer<'de>,
+{
+    struct StringOrInt<T>(PhantomData<fn() -> T>);
+
+    impl<'de, T> Visitor<'de> for StringOrInt<T>
+    where
+        T: Deserialize<'de> + TryFrom<u64> + FromStr<Err = std::num::ParseIntError>,
+    {
+        type Value = T;
+
+        fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
+            formatter.write_str("string or int")
+        }
+
+        fn visit_u64<E>(self, value: u64) -> Result<T, E>
+        where
+            E: de::Error,
+        {
+            T::try_from(value).map_err(|_e| de::Error::custom("ignored error"))
+        }
+
+        fn visit_str<E>(self, value: &str) -> Result<T, E>
+        where
+            E: de::Error,
+        {
+            FromStr::from_str(value).map_err(de::Error::custom)
+        }
+    }
+
+    deserializer.deserialize_any(StringOrInt(PhantomData))
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+struct CudaArtifact {
+    relative_path: String,
+    sha256: String,
+    md5: Option<String>,
+
+    // The manifests export size as a string instead of a number
+    #[serde(deserialize_with = "string_or_int")]
+    size: i64,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(untagged)]
+enum CudaArtifactsByTag {
+    Single(CudaArtifact),
+    Many {
+        #[serde(flatten)]
+        by_tag: HashMap<String, CudaArtifact>,
+    },
+}
+impl IntoIterator for CudaArtifactsByTag {
+    type Item = (Option<String>, CudaArtifact);
+    type IntoIter = std::vec::IntoIter<Self::Item>;
+
+    fn into_iter(self) -> std::vec::IntoIter<Self::Item> {
+        match self {
+            CudaArtifactsByTag::Single(art) => vec![(None, art)].into_iter(),
+            CudaArtifactsByTag::Many { by_tag: by_compat } => by_compat
+                .iter()
+                .map(|(k, x)| (Some(k.clone()), x.clone()))
+                .collect::<Vec<Self::Item>>()
+                .into_iter(),
+        }
+    }
+}
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(untagged)]
+enum CudaArtifactsByPlatform {
+    Binary {
+        #[serde(flatten)]
+        by_platform: HashMap<String, CudaArtifactsByTag>,
+    },
+    Source {
+        source: CudaArtifact,
+    },
+}
+
+impl IntoIterator for CudaArtifactsByPlatform {
+    type Item = (String, Option<String>, CudaArtifact);
+
+    /* TODO: Figure out which is the trait that doesn't involve copying */
+    type IntoIter = std::vec::IntoIter<(String, Option<String>, CudaArtifact)>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        match self {
+            CudaArtifactsByPlatform::Binary { by_platform } => by_platform
+                .iter()
+                .flat_map(|(platform, by_tag)| {
+                    by_tag
+                        .clone()
+                        .into_iter()
+                        .map(|(tag, artifact)| (platform.clone(), tag.clone(), artifact))
+                })
+                .collect::<Vec<Self::Item>>()
+                .into_iter(),
+            CudaArtifactsByPlatform::Source { source } => {
+                (vec![("source".to_string(), None, source)]).into_iter()
+            }
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct CudaJsonPackage {
+    name: Option<String>,
+    license: String,
+    license_path: Option<String>,
+    version: String,
+
+    cuda_variant: Option<Vec<String>>,
+
+    #[serde(flatten)]
+    artifacts: CudaArtifactsByPlatform,
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+struct CudaJsonManifest {
+    release_date: Option<String>,
+    release_label: Option<String>,
+    release_product: Option<String>,
+
+    #[serde(flatten)]
+    by_pname: HashMap<String, CudaJsonPackage>,
+}
+
 #[tokio::main]
 async fn main() {
     let args = Cli::parse();
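Worth pausing on the string_or_int helper added above: the NVIDIA manifests encode size sometimes as a JSON number and sometimes as a quoted string, so a plain i64 field would reject half the inputs. A standalone sketch of the same accept-both behaviour (simplified to route through serde_json::Value instead of the Visitor used in the commit; the Artifact type here is hypothetical):

    use serde::{Deserialize, Deserializer, de::Error};

    #[derive(Deserialize, Debug)]
    struct Artifact {
        #[serde(deserialize_with = "string_or_int")]
        size: i64,
    }

    // Accept either a JSON integer or a string containing one.
    fn string_or_int<'de, D: Deserializer<'de>>(d: D) -> Result<i64, D::Error> {
        match serde_json::Value::deserialize(d)? {
            serde_json::Value::Number(n) => n.as_i64().ok_or_else(|| D::Error::custom("not an i64")),
            serde_json::Value::String(s) => s.parse().map_err(D::Error::custom),
            other => Err(D::Error::custom(format!("expected string or int, got {other}"))),
        }
    }

    fn main() {
        // Both spellings decode to the same struct.
        let a: Artifact = serde_json::from_str(r#"{"size": 5}"#).unwrap();
        let b: Artifact = serde_json::from_str(r#"{"size": "5"}"#).unwrap();
        assert_eq!(a.size, b.size);
    }

The untagged CudaArtifactsByTag / CudaArtifactsByPlatform enums above play the same trick at the structural level: serde tries each variant in order until one matches the JSON shape.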
@@ -603,6 +849,213 @@ async fn main() {
                 println!("{:?}", url);
             }
         }
+        Some(Command::FormatCudaManifest) => {
+            println!(
+                "{}",
+                serde_json::to_string(
+                    &serde_json::from_reader::<_, CudaJsonManifest>(io::stdin()).unwrap()
+                )
+                .unwrap()
+            );
+        }
+        Some(Command::DemoCudaManifest) => {
+            println!(
+                "{}",
+                serde_json::to_string(&CudaJsonManifest {
+                    release_date: Some("1984-01-01".to_string()),
+                    release_label: Some("8.9.x".to_string()),
+                    release_product: Some("cudnn".to_string()),
+                    by_pname: HashMap::from([
+                        (
+                            "cudnn".to_string(),
+                            CudaJsonPackage {
+                                name: Some("cuDNN Library".to_string()),
+                                license: "cudnn".to_string(),
+                                license_path: Some("bar/foo".to_string()),
+                                version: "8.9.7.6".to_string(),
+                                cuda_variant: Some(vec!["11".to_string(), "12".to_string()]),
+                                artifacts: CudaArtifactsByPlatform::Binary {
+                                    by_platform: HashMap::from([
+                                        ("x86_64-linux".to_string(),
+                                         CudaArtifactsByTag::Many {
+                                             by_tag:
+                                                 HashMap::from([
+                                                     ("cuda11"
+                                                         .to_string(),
+                                                      CudaArtifact{
+                                                          relative_path:
+                                                              "kek".to_string(),
+                                                          sha256: "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824".to_string(),
+                                                          md5: Some("5d41402abc4b2a76b9719d911017c592".to_string()),
+                                                          size: 5 })])})
+
+                                    ]),
+                                }
+                            }),
+                        (
+                            "cuda_samples".to_string(),
+                            CudaJsonPackage {
+                                name: Some("NVIDIA cuDNN samples".to_string()),
+                                license: "cudnn".to_string(),
+                                license_path: Some("foo/bar".to_string()),
+                                version: "8.9.7.6".to_string(),
+                                cuda_variant: None,
+                                artifacts: CudaArtifactsByPlatform::Source {
+                                    source: CudaArtifact {
+                                        relative_path: "/biba/boba/fifa".to_string(),
+                                        sha256: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_string(),
+                                        md5: Some("d41d8cd98f00b204e9800998ecf8427e".to_string()),
+                                        size: 0,
+                                    }
+                                }
+                            }
+                        ),])
+                })
+                .unwrap()
+            );
+        }
+        Some(Command::ProcessCudaManifests { include_finished }) => {
+            let manifests: Vec<(String, String, Option<u64>)> = {
+                let con = ctx.con.lock().unwrap();
+                con.execute_batch(include_str!("q/cuda-init.sql"))
+                    .context("q/cuda-init.sql")
+                    .unwrap();
+                let mut find_manifests = con
+                    .prepare_cached(include_str!("q/find-cuda-manifests.sql"))
+                    .context("q/find-cuda-manifests.sql")
+                    .unwrap();
+                find_manifests
+                    .query(named_params! {":include_finished": include_finished})
+                    .context("q/find-cuda-manifests.sql")
+                    .unwrap()
+                    .map(|row| <(String, String, Option<u64>)>::try_from(row))
+                    .collect()
+                    .expect("Casting result of q/find-cuda-manifests.sql")
+            };
+            for m in &manifests {
+                let b64 = m.1.clone();
+                let b3 = match B3Digest::from_str(&b64) {
+                    Ok(b3) => b3,
+                    Err(e) => {
+                        eprintln!("Invalid hash recorded for {:?}: {}", m, e);
+                        continue;
+                    }
+                };
+                if let Err(e) = ctx.ensure_blob(&b3).await {
+                    eprintln!("Couldn't provision the blob for {:?}: {}", m, e);
+                    continue;
+                };
+                let json = {
+                    let mut reader = match ctx.blob_service.open_read(&b3).await {
+                        Ok(Some(reader)) => reader,
+                        Ok(None) => {
+                            eprintln!("Blob doesn't exist after ensure_blob: {:?}", m);
+                            continue;
+                        }
+                        Err(e) => {
+                            eprintln!("Couldn't query the blob for {:?}: {}", m, e);
+                            continue;
+                        }
+                    };
+                    let mut json = String::new();
+                    match reader.read_to_string(&mut json).await {
+                        Ok(_) => (),
+                        Err(e) => {
+                            eprintln!("Couldn't read blob {:?}: {:?}", m, e);
+                            continue;
+                        }
+                    };
+                    json
+                };
+                let parsed: CudaJsonManifest = match serde_json::from_str(&json) {
+                    Ok(m) => m,
+                    Err(e) => {
+                        eprintln!("Couldn't parse JSON for {:?}: {:?}", m, e);
+                        continue;
+                    }
+                };
+                {
+                    let mut lock = ctx.con.lock().unwrap();
+                    let tx = lock.transaction().unwrap();
+                    {
+                        let mut add_str = tx
+                            .prepare_cached(include_str!("q/add-str.sql"))
+                            .context("q/add-str.sql")
+                            .unwrap();
+                        let mut add_hash = tx
+                            .prepare_cached(include_str!("q/upsert-blob.sql"))
+                            .context("q/upsert-blob.sql")
+                            .unwrap();
+                        let mut add_manifest = tx
+                            .prepare_cached(include_str!("q/add-cuda-manifest.sql"))
+                            .context("q/add-cuda-manifest.sql")
+                            .unwrap();
+                        let mut add_comp = tx
+                            .prepare_cached(include_str!("q/add-cuda-artifact.sql"))
+                            .context("q/add-cuda-artifact.sql")
+                            .unwrap();
+
+                        add_hash.execute(params![b64, None::<usize>]).unwrap();
+                        for s in vec![
+                            &parsed.release_date,
+                            &parsed.release_label,
+                            &parsed.release_product,
+                        ] {
+                            add_str.execute((s,)).unwrap();
+                        }
+                        add_manifest
+                            .execute(named_params! {
+                                ":hash": b64,
+                                ":release_date": parsed.release_date,
+                                ":release_label": parsed.release_label,
+                                ":release_product": parsed.release_product,
+                            })
+                            .context("Executing q/add-cuda-manifest.sql")
+                            .unwrap();
+
+                        for (pname, pkg) in parsed.by_pname {
+                            for (platform, maybe_tag, comp) in pkg.artifacts.into_iter() {
+                                let ps = named_params! {
+                                    ":manifest": b64,
+                                    ":name": pkg.name,
+                                    ":pname": pname,
+                                    ":license_name": pkg.license,
+                                    ":license_path": pkg.license_path,
+                                    ":version": pkg.version,
+                                    ":sha256": comp.sha256,
+                                    ":md5": comp.md5,
+                                    ":platform": platform,
+                                    ":relative_path": comp.relative_path,
+                                    ":n_bytes": comp.size,
+                                    ":compat_tag": maybe_tag
+                                };
+                                for h in &vec![Some(&comp.sha256), comp.md5.as_ref()] {
+                                    add_hash.execute(params![h, None::<usize>]).unwrap();
+                                }
+                                for s in &vec![
+                                    Some(&pname),
+                                    pkg.name.as_ref(),
+                                    Some(&pkg.license),
+                                    pkg.license_path.as_ref(),
+                                    Some(&pkg.version),
+                                    Some(&platform.to_string()),
+                                    Some(&comp.relative_path),
+                                    maybe_tag.as_ref(),
+                                ] {
+                                    add_str.execute(params![s]).unwrap();
+                                }
+                                add_comp
+                                    .execute(ps)
+                                    .context("Executing q/add-cuda-artifact.sql")
+                                    .unwrap();
+                            }
+                        }
+                    }
+                    tx.commit()
+                        .expect("Couldn't commit transaction adding manifest or its component");
+                }
+            }
+        }
         None => {}
     }
 }