use std::{ collections::HashMap, os::fd::{AsFd, AsRawFd}, os::unix::ffi::OsStrExt, path::PathBuf, }; use tokio::sync::broadcast; use tokio_stream::StreamExt; use tracing::{debug, error}; use zbus::{ Connection, fdo::Result, names::OwnedUniqueName, object_server::SignalEmitter, zvariant, }; use super::{documents::DocumentsProxy, file_chooser::FileTransformer}; #[derive(Clone)] pub struct FileTransfer { host: FileTransferProxy<'static>, file_transformer: FileTransformer, tx: broadcast::Sender, path_prefix_to_host: Vec<(PathBuf, PathBuf)>, } #[derive(Clone, Debug)] enum ForwarderCommand { Add(String, OwnedUniqueName), // Remove(String), } #[zbus::interface( name = "org.freedesktop.portal.FileTransfer", proxy( default_service = "org.freedesktop.portal.Documents", default_path = "/org/freedesktop/portal/documents" ) )] impl FileTransfer { async fn add_files( &self, key: &str, fds: Vec>, options: HashMap<&str, zvariant::Value<'_>>, ) -> Result<()> { let mut host_paths = Vec::with_capacity(fds.len()); for fd in fds.iter() { let link = rustix::fs::readlink( format!("/proc/self/fd/{}", fd.as_fd().as_raw_fd()), Vec::new(), ) .map_err(|e| zbus::fdo::Error::Failed(e.to_string()))?; let guest_path = std::path::PathBuf::from(std::ffi::OsStr::from_bytes( &link.to_string_lossy().as_bytes(), )); let (prefix, host_prefix) = self .path_prefix_to_host .iter() .find(|(prefix, _)| guest_path.starts_with(prefix)) .ok_or_else(|| { zbus::fdo::Error::Failed("Could not find host mapping for path".to_owned()) })?; let guest_suffix = guest_path .strip_prefix(prefix) .map_err(|e| zbus::fdo::Error::Failed(e.to_string()))?; let host_path = if guest_suffix.as_os_str().is_empty() { // Edge case: a bind-mounted file exposed at the same path would get an extra '/' after its path host_prefix.to_path_buf() } else { host_prefix.join(guest_suffix) }; debug!( ?guest_path, ?prefix, ?guest_suffix, ?host_prefix, ?host_path, "mapped path" ); let path_fd = rustix::fs::open( host_path, rustix::fs::OFlags::PATH, rustix::fs::Mode::empty(), ) .map_err(|e| zbus::fdo::Error::Failed(e.to_string()))?; host_paths.push(path_fd.into()); // OwnedFd variant of zbus's Fd enum, so still owned by the Vec } self.host.add_files(key, host_paths, options).await } async fn retrieve_files( &self, key: &str, options: HashMap<&str, zvariant::Value<'_>>, ) -> Result> { let host_paths = self.host.retrieve_files(key, options).await?; let mut result = Vec::with_capacity(host_paths.len()); for host_path in host_paths { if let Some(guest_path) = self .file_transformer .add_path_as_doc(PathBuf::from(&host_path)) .await { result.push(guest_path.to_string_lossy().into_owned()); } else { debug!(%host_path, "could not add path as doc to retrieve file"); } } Ok(result) } async fn start_transfer( &self, #[zbus(header)] hdr: zbus::message::Header<'_>, options: HashMap<&str, zvariant::Value<'_>>, ) -> Result { let sender = hdr .sender() .ok_or_else(|| zbus::Error::MissingField)? .to_owned(); let key = self.host.start_transfer(options).await?; debug!(%key, %sender, "started transfer"); if let Err(err) = self .tx .send(ForwarderCommand::Add(key.clone(), sender.into())) { error!(?err, "file_transfer internal channel error"); return Err(zbus::fdo::Error::IOError("channel error".to_owned())); } Ok(key) } async fn stop_transfer(&self, key: &str) -> Result<()> { debug!(%key, "stopping transfer"); self.host.stop_transfer(key).await } #[zbus(signal)] async fn transfer_closed(signal_emitter: &SignalEmitter<'_>, key: &str) -> zbus::Result<()>; #[zbus(property, name = "version")] fn version(&self) -> Result { Ok(1) } } impl FileTransfer { pub async fn new( host_session_conn: &Connection, priv_conn: &Connection, guest_root: PathBuf, path_prefix_to_host: Vec<(PathBuf, PathBuf)>, ) -> Result { let host = FileTransferProxy::builder(host_session_conn) .build() .await?; let docs = DocumentsProxy::builder(priv_conn).build().await?; let file_transformer = FileTransformer { docs, guest_root, for_save: false, persistent: false, directory: false, }; let (tx, _) = broadcast::channel(8); Ok(FileTransfer { host, file_transformer, tx, path_prefix_to_host, }) } pub async fn forward_transfer_closed( &self, mut signal_emitter: SignalEmitter<'static>, ) -> Result<()> { let mut stream = self.host.receive_transfer_closed().await?; let mut cmds = self.tx.subscribe(); let mut receivers = HashMap::new(); loop { tokio::select! { Ok(cmd) = cmds.recv() => match cmd { ForwarderCommand::Add(key, receiver) => { receivers.insert(key, receiver); }, // ForwarderCommand::Remove(key) => { receivers.remove(&key); }, }, Some(signal) = stream.next() => { debug!(?signal, "transfer closed"); if let Ok((key,)) = signal.0.deserialize::<(&str,)>() { if let Some(bus_name) = receivers.remove(key) { signal_emitter = signal_emitter.set_destination(zbus::names::BusName::Unique(bus_name.clone().into())); if let Err(err) = FileTransfer::transfer_closed(&signal_emitter, key).await { error!(?err, %key, "could not forward signal"); } } else { error!(%key, "got a signal for unknown key"); } } else { error!("could not deserialize transfer closed signal"); }; } } } } }