From eefb0008655d11fb0f78a661aa9bce633215d791 Mon Sep 17 00:00:00 2001 From: Val Packett Date: Fri, 8 Aug 2025 02:51:30 -0300 Subject: [PATCH] Implement FileChooser portal proxy with virtiofsd mount --- flake.nix | 18 +- sidebus-broker/src/main.rs | 154 +++++++++-- sidebus-broker/src/portal/documents.rs | 85 ++++++ sidebus-broker/src/portal/file_chooser.rs | 301 ++++++++++++++++++++++ sidebus-broker/src/portal/mod.rs | 3 + sidebus-broker/src/portal/request.rs | 183 +++++++++++++ 6 files changed, 718 insertions(+), 26 deletions(-) create mode 100644 sidebus-broker/src/portal/documents.rs create mode 100644 sidebus-broker/src/portal/file_chooser.rs create mode 100644 sidebus-broker/src/portal/mod.rs create mode 100644 sidebus-broker/src/portal/request.rs diff --git a/flake.nix b/flake.nix index 7b0e010..1aeb30c 100644 --- a/flake.nix +++ b/flake.nix @@ -15,7 +15,13 @@ pkgs = import nixpkgs { inherit system overlays; }; - + + buildEnvVars = { + BIN_XDG_PERMISSION_STORE = "${pkgs.xdg-desktop-portal}/libexec/xdg-permission-store"; + BIN_XDG_DOCUMENT_PORTAL = "${pkgs.xdg-desktop-portal}/libexec/xdg-document-portal"; + BIN_VIRTIOFSD = "${pkgs.virtiofsd}/bin/virtiofsd"; + }; + rustToolchain = pkgs.pkgsBuildHost.rust-bin.fromRustupToolchainFile ./rust-toolchain.toml; rustPlatform = pkgs.makeRustPlatform { cargo = rustToolchain; @@ -32,11 +38,13 @@ "busd-0.4.0" = "sha256-UzTclEJ8lRMmiuLJJi+gsm7vkx+MLfnDdi5s9OVT1HE="; }; buildAndTestSubdir = crate; + env = buildEnvVars; }; in { devShells.default = pkgs.mkShell { buildInputs = [ rustToolchain ]; + env = buildEnvVars; }; packages.sidebus-agent = rustPackage "sidebus-agent"; @@ -58,6 +66,14 @@ }; documentation = [ "https://git.clan.lol/valpackett/sidebus" ]; }; + systemd.mounts = [ + { + type = "virtiofs"; + what = "vm-doc-portal"; + where = "/run/vm-doc-portal"; + wantedBy = [ "multi-user.target" ]; + } + ]; }; } ); diff --git a/sidebus-broker/src/main.rs b/sidebus-broker/src/main.rs index bda6285..7a2247d 100644 --- a/sidebus-broker/src/main.rs +++ b/sidebus-broker/src/main.rs @@ -1,10 +1,13 @@ mod bus; +mod portal; mod vsock; use bus::SharedHostedBus; use clap::Parser; -use std::sync::Arc; -use tokio::sync::Mutex; +use std::{path::PathBuf, sync::Arc}; +use tokio::{net::UnixListener, process::Command, sync::Mutex}; +use tracing::error; +use zbus::names::WellKnownName; // https://github.com/rust-lang/rfcs/issues/2407#issuecomment-385291238 macro_rules! enclose { @@ -18,39 +21,140 @@ macro_rules! enclose { #[derive(Parser)] #[command(version, about, long_about = None)] -struct BrokerCli {} +struct BrokerCli { + /// Create unix socket listeners for all internal busses in the provided directory + #[clap(long)] + debug_access: Option, + + /// Private XDG_RUNTIME_DIR for the VM ('./doc' under it will be used as the document portal FS mountpoint, './fs.sock' will be the FS) + #[clap(long)] + runtime_dir: PathBuf, + + /// Absolute path where the './doc' under the runtime-dir would be mounted over virtiofs in the guest + #[clap(long, default_value = "/run/vm-doc-portal")] + guest_mountpoint: PathBuf, + + /// Vsock port number to listen on + #[clap(long)] + vsock_port: u32, +} + +async fn new_hosted_bus() -> eyre::Result<( + Arc>, + zbus::OwnedGuid, + bus::NameOwnerStream, +)> { + let mut bus = bus::HostedBus::new().await?; + let guid = bus.server_guid().to_owned().into(); + let owner_stream = bus.name_owner_changes().await?; + + Ok((Arc::new(Mutex::new(bus)), guid, owner_stream)) +} #[tokio::main] async fn main() -> eyre::Result<()> { tracing_subscriber::fmt::init(); - let _cli = BrokerCli::parse(); + let cli = BrokerCli::parse(); - let vm_bus = bus::HostedBus::new().await?; - let vm_bus_guid: zbus::OwnedGuid = vm_bus.server_guid().to_owned().into(); - let vm_bus = Arc::new(Mutex::new(vm_bus)); + let (vm_bus, vm_bus_guid, _) = new_hosted_bus().await?; + let (priv_bus, _, mut priv_lst) = new_hosted_bus().await?; - // Direct access for the host (just trying things out) - tokio::spawn( - vm_bus - .clone() - .run_unix_listener(tokio::net::UnixListener::bind("vmbus.sock")?), - ); + if let Some(dir_path) = cli.debug_access { + if !dir_path.is_dir() { + error!(path = %dir_path.display(), "--debug-access path is not an existing directory"); + std::process::exit(1); + } + let vm_dbg_listener = UnixListener::bind(dir_path.join("vm.sock"))?; + let _vm_dbg_task = tokio::spawn(vm_bus.clone().run_unix_listener(vm_dbg_listener)); + let priv_dbg_listener = UnixListener::bind(dir_path.join("priv.sock"))?; + let _priv_dbg_task = tokio::spawn(priv_bus.clone().run_unix_listener(priv_dbg_listener)); + // TODO: unlink sockets on exit + } + + std::fs::create_dir_all(&cli.runtime_dir)?; + + let _xps = priv_bus + .clone() + .spawn_external_client( + Command::new(env!("BIN_XDG_PERMISSION_STORE")) + .env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str()) + .kill_on_drop(true), + ) + .await?; + + let impl_permission_store = + WellKnownName::from_static_str("org.freedesktop.impl.portal.PermissionStore")?.into(); + priv_lst.wait_for_acquisition(impl_permission_store).await?; + + let _xdp = priv_bus + .clone() + .spawn_external_client( + Command::new(env!("BIN_XDG_DOCUMENT_PORTAL")) + .env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str()) + .kill_on_drop(true), + ) + .await?; + + let portal_documents = + WellKnownName::from_static_str("org.freedesktop.portal.Documents")?.into(); + priv_lst.wait_for_acquisition(portal_documents).await?; + + let _vfs = Command::new(env!("BIN_VIRTIOFSD")) + .args(&[ + "--shared-dir", + cli.runtime_dir.join("doc").to_str().unwrap(), + "--socket-path", + cli.runtime_dir.join("fs.sock").to_str().unwrap(), + "--uid-map", + ":1000:1001:1:", + "--gid-map", + ":100:100:1:", + "--log-level", + "debug", + ]) + .env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str()) + .kill_on_drop(true) + .spawn(); + // TODO: die when it exits + + let vm_bus_conn = vm_bus.lock().await.connect_channel(false).await?; + let priv_bus_conn = priv_bus.lock().await.connect_channel(false).await?; + let host_session_conn = zbus::connection::Builder::session()?.build().await?; + let file_chooser_imp = portal::file_chooser::FileChooser::new( + &host_session_conn, + &priv_bus_conn, + cli.guest_mountpoint, + ) + .await?; + vm_bus_conn + .request_name("org.freedesktop.portal.Desktop") + .await?; + let true = vm_bus_conn + .object_server() + .at("/org/freedesktop/portal/desktop", file_chooser_imp) + .await? + else { + unreachable!("our own fresh bus can't have interfaces already provided"); + }; // TODO: modprobe vhost_vsock first! // NOTE: Every individual D-Bus client inside of the VM is a new client here! - vsock::ListenerBuilder::new(vsock::VsockAddr::new(vsock::VMADDR_CID_HOST, 4269)) - .with_label("VM Bus") - .listen(move |client| { - enclose! { (vm_bus, vm_bus_guid) async move { - // TODO: Not necessary to go through the channel, add vsock support to the Peer too - let client_conn = client.build((&vm_bus_guid).into()).await?; - let vmbus_conn = vm_bus.lock().await.connect_channel(true).await?; - sidebus_common::raw::splice_conns(client_conn, vmbus_conn).await; - Ok(()) - } } - }) - .await?; + vsock::ListenerBuilder::new(vsock::VsockAddr::new( + vsock::VMADDR_CID_HOST, + cli.vsock_port, + )) + .with_label("VM Bus") + .listen(move |client| { + enclose! { (vm_bus, vm_bus_guid) async move { + // TODO: Not necessary to go through the channel, add vsock support to the Peer too + let client_conn = client.build((&vm_bus_guid).into()).await?; + let vmbus_conn = vm_bus.lock().await.connect_channel(true).await?; + sidebus_common::raw::splice_conns(client_conn, vmbus_conn).await; + Ok(()) + } } + }) + .await?; Ok(()) } diff --git a/sidebus-broker/src/portal/documents.rs b/sidebus-broker/src/portal/documents.rs new file mode 100644 index 0000000..9fca732 --- /dev/null +++ b/sidebus-broker/src/portal/documents.rs @@ -0,0 +1,85 @@ +use std::collections::HashMap; + +use zbus::{proxy, zvariant}; + +#[proxy( + interface = "org.freedesktop.portal.Documents", + default_service = "org.freedesktop.portal.Documents", + default_path = "/org/freedesktop/portal/documents" +)] +pub trait Documents { + /// Add method + fn add( + &self, + o_path_fd: zvariant::Fd<'_>, + reuse_existing: bool, + persistent: bool, + ) -> zbus::Result; + + /// AddFull method + fn add_full( + &self, + o_path_fds: &[zvariant::Fd<'_>], + flags: u32, + app_id: &str, + permissions: &[&str], + ) -> zbus::Result<(Vec, HashMap)>; + + /// AddNamed method + fn add_named( + &self, + o_path_parent_fd: zvariant::Fd<'_>, + filename: &[u8], + reuse_existing: bool, + persistent: bool, + ) -> zbus::Result; + + /// AddNamedFull method + #[allow(clippy::too_many_arguments)] + fn add_named_full( + &self, + o_path_fd: zvariant::Fd<'_>, + filename: &[u8], + flags: u32, + app_id: &str, + permissions: &[&str], + ) -> zbus::Result<(String, HashMap)>; + + /// Delete method + fn delete(&self, doc_id: &str) -> zbus::Result<()>; + + /// GetHostPaths method + fn get_host_paths(&self, doc_ids: &[&str]) -> zbus::Result>>; + + /// GetMountPoint method + fn get_mount_point(&self) -> zbus::Result>; + + /// GrantPermissions method + fn grant_permissions( + &self, + doc_id: &str, + app_id: &str, + permissions: &[&str], + ) -> zbus::Result<()>; + + /// Info method + fn info(&self, doc_id: &str) -> zbus::Result<(Vec, HashMap>)>; + + /// List method + fn list(&self, app_id: &str) -> zbus::Result>>; + + /// Lookup method + fn lookup(&self, filename: &[u8]) -> zbus::Result; + + /// RevokePermissions method + fn revoke_permissions( + &self, + doc_id: &str, + app_id: &str, + permissions: &[&str], + ) -> zbus::Result<()>; + + /// version property + #[zbus(property, name = "version")] + fn version(&self) -> zbus::Result; +} diff --git a/sidebus-broker/src/portal/file_chooser.rs b/sidebus-broker/src/portal/file_chooser.rs new file mode 100644 index 0000000..b3a0ff9 --- /dev/null +++ b/sidebus-broker/src/portal/file_chooser.rs @@ -0,0 +1,301 @@ +use std::collections::HashMap; +use std::os::{fd::AsFd as _, unix::ffi::OsStrExt as _}; +use std::path::PathBuf; + +use tracing::{debug, error, warn}; +use zbus::{Connection, ObjectServer, fdo::Result, zvariant}; + +use super::documents::DocumentsProxy; +use super::request::{RESPONSE_SUCCESS, ReqHandler, ResultTransformer}; + +pub struct FileChooser { + host: FileChooserProxy<'static>, + docs: DocumentsProxy<'static>, + guest_root: PathBuf, +} + +impl FileChooser { + pub async fn new( + host_session_conn: &Connection, + priv_conn: &Connection, + guest_root: PathBuf, + ) -> Result { + let host = FileChooserProxy::builder(host_session_conn).build().await?; + let docs = DocumentsProxy::builder(priv_conn).build().await?; + Ok(FileChooser { + host, + docs, + guest_root, + }) + } +} + +#[zbus::interface( + name = "org.freedesktop.portal.FileChooser", + proxy( + default_service = "org.freedesktop.portal.Desktop", + default_path = "/org/freedesktop/portal/desktop" + ) +)] +impl FileChooser { + async fn open_file( + &self, + #[zbus(header)] hdr: zbus::message::Header<'_>, + #[zbus(object_server)] server: &ObjectServer, + #[zbus(connection)] conn: &zbus::Connection, + parent_window: &str, + title: &str, + options: HashMap<&str, zvariant::Value<'_>>, + ) -> Result { + ReqHandler::prepare(&self.host, hdr, server, conn, &options) + .with_transform(FileTransformer { + docs: self.docs.clone(), + guest_root: self.guest_root.clone(), + for_save: false, + directory: options.get_as("directory")?.unwrap_or(false), + }) + .perform(async || self.host.open_file(parent_window, title, options).await) + .await + } + + async fn save_file( + &self, + #[zbus(header)] hdr: zbus::message::Header<'_>, + #[zbus(object_server)] server: &ObjectServer, + #[zbus(connection)] conn: &zbus::Connection, + parent_window: &str, + title: &str, + options: HashMap<&str, zvariant::Value<'_>>, + ) -> Result { + ReqHandler::prepare(&self.host, hdr, server, conn, &options) + .with_transform(FileTransformer { + docs: self.docs.clone(), + guest_root: self.guest_root.clone(), + for_save: true, + directory: false, + }) + .perform(async || self.host.save_file(parent_window, title, options).await) + .await + } + + async fn save_files( + &self, + #[zbus(header)] hdr: zbus::message::Header<'_>, + #[zbus(object_server)] server: &ObjectServer, + #[zbus(connection)] conn: &zbus::Connection, + parent_window: &str, + title: &str, + options: HashMap<&str, zvariant::Value<'_>>, + ) -> Result { + ReqHandler::prepare(&self.host, hdr, server, conn, &options) + .with_transform(FileTransformer { + docs: self.docs.clone(), + guest_root: self.guest_root.clone(), + for_save: true, + directory: false, + }) + .perform(async || self.host.save_files(parent_window, title, options).await) + .await + } + + /// version property + #[zbus(property, name = "version")] + fn version(&self) -> Result { + Ok(5) + } +} + +struct FileTransformer { + docs: DocumentsProxy<'static>, + guest_root: PathBuf, + for_save: bool, + directory: bool, +} + +// ref: send_response_in_thread_func +// https://github.com/flatpak/xdg-desktop-portal/blob/d037b5c3f91b68ca208a9a41b6e18e6a3a659e05/src/file-chooser.c#L70C1-L70C29 + +impl ResultTransformer for FileTransformer { + async fn apply<'a>( + self, + response: u32, + mut results: HashMap<&'a str, zvariant::Value<'a>>, + ) -> Result<(u32, HashMap<&'a str, zvariant::Value<'a>>)> { + if response != RESPONSE_SUCCESS { + debug!(?response, ?results, "non-success, not transforming"); + return Ok((response, results)); + } + + let guest_uris = results + .get_required_as::("uris")? + .into_iter() + .flat_map(uri_to_path) + .async_map(|u| self.add_path_as_doc(u)) + .await + .flatten() + .collect::>(); + + results.insert("uris", guest_uris.into()); + Ok((response, results)) + } +} + +const REUSE_EXISTING: u32 = 1 << 0; +const PERSISTENT: u32 = 1 << 1; +const AS_NEEDED_BY_APP: u32 = 1 << 2; +const DIRECTORY: u32 = 1 << 3; + +// ref: xdp_register_document +// https://github.com/flatpak/xdg-desktop-portal/blob/10e712e06aa8eb9cd0e59c73c5be62ba53e981a4/src/xdp-documents.c#L71 + +impl FileTransformer { + async fn add_path_as_doc(&self, path: PathBuf) -> Option { + use rustix::fs::{Mode, OFlags}; + + let o_path_fd = match rustix::fs::open( + if self.for_save { path.parent()? } else { &path }, + OFlags::CLOEXEC | OFlags::PATH, + Mode::empty(), + ) { + Ok(fd) => fd, + Err(err) => { + warn!(%err, ?path, "could not open path descriptor"); + return None; + } + }; + + let flags = REUSE_EXISTING + | PERSISTENT + | AS_NEEDED_BY_APP + | if self.directory { DIRECTORY } else { 0 }; + + // XXX: portal impl can return writable=false but host frontend does not pass that back.. + // https://github.com/flatpak/xdg-desktop-portal/discussions/1763 + let permissions = &["read", "write", "grant-permissions"][..]; + + let filename = path.file_name()?; + debug!( + ?path, + ?filename, + ?o_path_fd, + ?flags, + ?permissions, + "adding path to doc portal" + ); + let app_id = ""; // host + let doc_id = match if self.for_save { + let filename_c = std::ffi::CString::new(filename.as_bytes()).ok()?; + self.docs + .add_named_full( + o_path_fd.as_fd().into(), + filename_c.as_bytes_with_nul(), + flags, + app_id, + permissions, + ) + .await + .map(|(doc_id, m)| (Some(doc_id), m)) + } else { + self.docs + .add_full(&[o_path_fd.as_fd().into()], flags, app_id, permissions) + .await + .map(|(mut doc_ids, m)| (doc_ids.pop(), m)) + } { + Ok((Some(v), _)) => v, + Ok((None, _)) => { + warn!(?filename, "adding doc to portal gave no ids"); + return None; + } + Err(err) => { + warn!(?err, ?filename, "could not add doc to portal"); + return None; + } + }; + let path = self.guest_root.join(doc_id).join(filename); + match url::Url::from_file_path(&path) { + Ok(url) => Some(url.to_string()), + Err(err) => { + warn!(?err, ?path, "could not make url from returned path"); + None + } + } + } +} + +fn uri_to_path(v: &zvariant::Value<'_>) -> Option { + let url_str = match v.downcast_ref::() { + Ok(sv) => sv, + Err(err) => { + warn!(%err, ?v, "option 'uris' contains non-string?"); + return None; + } + }; + let url = match url::Url::parse(url_str.as_str()) { + Ok(u) => u, + Err(err) => { + warn!(%err, %url_str, "option 'uris' contains non-parseable uri"); + return None; + } + }; + if url.scheme() != "file" { + warn!(%url, "skipping non-file uri"); + return None; + } + Some(PathBuf::from(url.path())) +} + +trait MapExt<'a> { + fn get_as(&'a self, key: &'a str) -> Result> + where + T: TryFrom<&'a zvariant::Value<'a>>, + >>::Error: std::fmt::Display; + + fn get_required_as(&'a self, key: &'a str) -> Result + where + T: TryFrom<&'a zvariant::Value<'a>>, + >>::Error: std::fmt::Display, + { + self.get_as(key).and_then(|o| { + o.ok_or_else(|| { + error!(%key, "options get_as, missing"); + zbus::fdo::Error::Failed(format!("option '{key}' missing")) + }) + }) + } +} + +impl<'a> MapExt<'a> for HashMap<&'a str, zvariant::Value<'a>> { + fn get_as(&'a self, key: &str) -> Result> + where + T: TryFrom<&'a zvariant::Value<'a>>, + >>::Error: std::fmt::Display, + { + self.get(key) + .map(|v| { + // inlined downcast_ref + if let zvariant::Value::Value(v) = v { + ::try_from(v) + } else { + ::try_from(v) + } + }) + .transpose() + .map_err(|err| { + error!(%err, %key, "options get_as"); + zbus::fdo::Error::Failed(format!("option '{key}' type mismatch")) + }) + } +} + +trait IterAsyncExt: Iterator { + async fn async_map(self, f: F) -> impl Iterator + where + Self: Sized, + F: FnMut(Self::Item) -> FU, + FU: Future, + { + futures::future::join_all(self.map(f)).await.into_iter() + } +} + +impl IterAsyncExt for T {} diff --git a/sidebus-broker/src/portal/mod.rs b/sidebus-broker/src/portal/mod.rs new file mode 100644 index 0000000..69aa6be --- /dev/null +++ b/sidebus-broker/src/portal/mod.rs @@ -0,0 +1,3 @@ +pub mod documents; +pub mod file_chooser; +pub mod request; diff --git a/sidebus-broker/src/portal/request.rs b/sidebus-broker/src/portal/request.rs new file mode 100644 index 0000000..31ff978 --- /dev/null +++ b/sidebus-broker/src/portal/request.rs @@ -0,0 +1,183 @@ +use std::{collections::HashMap, ops::Deref as _}; + +use tokio_stream::StreamExt as _; +use tracing::{Instrument, debug, debug_span, error, trace, warn}; +use zbus::{ + Connection, ObjectServer, + fdo::Result, + names::OwnedUniqueName, + object_server::SignalEmitter, + proxy::ProxyImpl, + zvariant::{self, OwnedObjectPath}, +}; + +pub const RESPONSE_SUCCESS: u32 = 0; +// pub const RESPONSE_CANCELLED: u32 = 1; +pub const RESPONSE_OTHER: u32 = 2; + +/// A handler for the org.freedesktop.portal.Request interface, proxying to another +/// instance of the same interface. +struct Request { + host: RequestProxy<'static>, +} + +#[zbus::interface( + name = "org.freedesktop.portal.Request", + proxy(default_service = "org.freedesktop.portal.Desktop") +)] +impl Request { + #[zbus(signal)] + async fn response( + signal_emitter: &SignalEmitter<'_>, + response: u32, + results: HashMap<&str, zvariant::Value<'_>>, + ) -> zbus::Result<()>; + + async fn close(&self) -> Result<()> { + self.host.close().await + } +} + +pub trait ResultTransformer { + fn apply<'a>( + self, + response: u32, + results: HashMap<&'a str, zvariant::Value<'a>>, + ) -> impl std::future::Future>)>> + + std::marker::Send; +} + +impl ResultTransformer for () { + async fn apply<'a>( + self, + response: u32, + results: HashMap<&'a str, zvariant::Value<'a>>, + ) -> Result<(u32, HashMap<&'a str, zvariant::Value<'a>>)> { + Ok((response, results)) + } +} + +pub struct ReqHandler { + token: String, + sender: Option, + conn: Connection, + server: ObjectServer, + host_conn: Connection, + transformer: T, +} + +impl ReqHandler<()> { + pub fn prepare<'a>( + host: &impl ProxyImpl<'a>, + hdr: zbus::message::Header<'_>, + server: &ObjectServer, + conn: &Connection, + options: &HashMap<&str, zvariant::Value<'_>>, + ) -> Self { + ReqHandler { + token: get_token(options), + sender: hdr.sender().map(|s| s.to_owned().into()), + conn: conn.to_owned(), + server: server.to_owned(), + host_conn: host.inner().connection().to_owned(), + transformer: (), + } + } +} + +impl ReqHandler { + pub fn with_transform(self, transformer: T1) -> ReqHandler { + ReqHandler { + transformer, + token: self.token, + sender: self.sender, + conn: self.conn, + server: self.server, + host_conn: self.host_conn, + } + } +} + +impl ReqHandler { + pub async fn perform( + self, + call: impl AsyncFnOnce() -> Result, + ) -> Result { + let sender = self.sender.ok_or_else(|| zbus::Error::MissingField)?; + let sender_str = sender.trim_start_matches(':').replace('.', "_"); + let token = self.token; + let path = zvariant::ObjectPath::try_from(format!( + "/org/freedesktop/portal/desktop/request/{sender_str}/{token}" + )) + .map_err(zbus::Error::from)?; + + let host_path = call().await?; + let imp = Request { + host: RequestProxy::builder(&self.host_conn) + .path(host_path)? + .build() + .await?, + }; + let stream = imp.host.receive_response().await?; + if !self.server.at(&path, imp).await? { + return Err(zbus::fdo::Error::Failed( + "Duplicate request object path".to_owned(), + )); + } + + let path_1: OwnedObjectPath = path.clone().into(); + let sender = sender.to_owned().into(); + tokio::spawn( + forward_response(stream, self.conn.clone(), path_1, sender, self.transformer) + .instrument(debug_span!("response proxy", ?path)), + ); + Ok(path.into()) + } +} + +fn get_token(options: &HashMap<&str, zvariant::Value<'_>>) -> String { + match options.get("handle_token") { + Some(zvariant::Value::Str(str)) => { + trace!("extracted token from handle_token option"); + return String::from(str.deref()); + } + Some(value) => warn!(?value, "handle_token option provided but not a string"), + None => trace!("handle_token not provided"), + }; + use rand::distr::{Alphanumeric, SampleString}; + Alphanumeric.sample_string(&mut rand::rng(), 16) +} + +async fn forward_response( + mut stream: ResponseStream, + conn: Connection, + path: zvariant::OwnedObjectPath, + sender: zbus::names::OwnedUniqueName, + transform: impl ResultTransformer, +) -> Result<()> { + let signal_emitter = SignalEmitter::new(&conn, path)? + .set_destination(zbus::names::BusName::Unique(sender.into())) + .into_owned(); + let Some(resp) = stream.next().await else { + debug!("response stream gone"); + return Ok(()); + }; + debug!(?resp, "got resp"); + let (response, results) = match resp.0.deserialize() { + Ok((response, results)) => match transform.apply(response, results).await { + Ok(res) => res, + Err(err) => { + error!(%err, "transform error"); + (RESPONSE_OTHER, HashMap::new()) + } + }, + Err(err) => { + error!(%err, "signal body type mismatch"); + (RESPONSE_OTHER, HashMap::new()) + } + }; + if let Err(err) = Request::response(&signal_emitter, response, results).await { + error!(%err, "signal forwarding failed"); + } + Ok(()) +}