mod bus; mod portal; mod vsock; use bus::SharedHostedBus; use clap::Parser; use futures::{TryFutureExt, stream::FuturesUnordered}; use std::{path::PathBuf, sync::Arc}; use tokio::{net::UnixListener, process::Command, sync::Mutex}; use tokio_stream::StreamExt as _; use tracing::{Instrument, debug, error, info_span}; use zbus::names::WellKnownName; // https://github.com/rust-lang/rfcs/issues/2407#issuecomment-385291238 macro_rules! enclose { ( ($( $x:ident ),*) $y:expr ) => { { $(let $x = $x.clone();)* $y } }; } #[derive(Parser)] #[command(version, about, long_about = None)] struct BrokerCli { /// Create unix socket listeners for internal busses in the provided directory #[clap(long)] debug_access: Option, /// Private XDG_RUNTIME_DIR for the VM ('./doc' under it will be used as the document portal FS mountpoint, './fs.sock' will be the FS) #[clap(long)] runtime_dir: PathBuf, /// Absolute path where the './doc' under the runtime-dir would be mounted over virtiofs in the guest #[clap(long, default_value = "/run/vm-doc-portal")] guest_mountpoint: PathBuf, /// Vsock port number to listen on for the VM bus #[clap(long)] vsock_port: Option, /// Unix socket path to listen on for the VM bus #[clap(long)] unix_path: Option, } async fn new_hosted_bus() -> eyre::Result<( Arc>, zbus::OwnedGuid, bus::NameOwnerStream, )> { let mut bus = bus::HostedBus::new().await?; let guid = bus.server_guid().to_owned().into(); let owner_stream = bus.name_owner_changes().await?; Ok((Arc::new(Mutex::new(bus)), guid, owner_stream)) } #[tokio::main] async fn main() -> eyre::Result<()> { tracing_subscriber::fmt::init(); let cli = BrokerCli::parse(); let (vm_bus, vm_bus_guid, _) = new_hosted_bus().await?; let (priv_bus, _, mut priv_lst) = new_hosted_bus().await?; let mut server_tasks = tokio::task::JoinSet::new(); let mut child_procs = Vec::new(); if let Some(dir_path) = cli.debug_access { if !dir_path.is_dir() { error!(path = %dir_path.display(), "--debug-access path is not an existing directory"); std::process::exit(1); } let priv_dbg_listener = UnixListener::bind(dir_path.join("priv.sock"))?; server_tasks.spawn( priv_bus .clone() .run_unix_listener(priv_dbg_listener, zbus::AuthMechanism::External), ); let vm_dbg_listener = UnixListener::bind(dir_path.join("vm.sock"))?; server_tasks.spawn( vm_bus .clone() .run_unix_listener(vm_dbg_listener, zbus::AuthMechanism::External), ); // TODO: unlink sockets on exit } std::fs::create_dir_all(&cli.runtime_dir)?; child_procs.push( priv_bus .clone() .spawn_external_client( Command::new(env!("BIN_XDG_PERMISSION_STORE")) .env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str()) .kill_on_drop(true), ) .await?, ); let impl_permission_store = WellKnownName::from_static_str("org.freedesktop.impl.portal.PermissionStore")?.into(); priv_lst.wait_for_acquisition(impl_permission_store).await?; child_procs.push( priv_bus .clone() .spawn_external_client( Command::new(env!("BIN_XDG_DOCUMENT_PORTAL")) .env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str()) .kill_on_drop(true), ) .await?, ); let portal_documents = WellKnownName::from_static_str("org.freedesktop.portal.Documents")?.into(); priv_lst.wait_for_acquisition(portal_documents).await?; child_procs.push( Command::new(env!("BIN_VIRTIOFSD")) .args(&[ "--shared-dir", cli.runtime_dir.join("doc").to_str().unwrap(), "--socket-path", cli.runtime_dir.join("fs.sock").to_str().unwrap(), "--uid-map", ":1000:1001:1:", "--gid-map", ":100:100:1:", "--log-level", "debug", ]) .env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str()) .kill_on_drop(true) .spawn()?, ); let vm_bus_conn = vm_bus.lock().await.connect_channel(false).await?; let priv_bus_conn = priv_bus.lock().await.connect_channel(false).await?; let host_session_conn = zbus::connection::Builder::session()?.build().await?; let file_chooser_imp = portal::file_chooser::FileChooser::new( &host_session_conn, &priv_bus_conn, cli.guest_mountpoint, ) .await?; vm_bus_conn .request_name("org.freedesktop.portal.Desktop") .await?; let true = vm_bus_conn .object_server() .at("/org/freedesktop/portal/desktop", file_chooser_imp) .await? else { unreachable!("our own fresh bus can't have interfaces already provided"); }; // NOTE: Every individual D-Bus client inside of the VM is a new client on the VM bus listeners! if let Some(path) = cli.unix_path { // XXX: going through the channel just to strip fds let vm_unix_listener = UnixListener::bind(path)?; while let Ok((socket, remote_addr)) = vm_unix_listener.accept().await { let f = enclose! { (vm_bus, vm_bus_guid) async move { let client_conn = zbus::connection::Builder::unix_stream(socket) .server(&vm_bus_guid) .unwrap() .p2p() .auth_mechanism(zbus::AuthMechanism::Anonymous) .build() .await?; let vmbus_conn = vm_bus.lock().await.connect_channel(true).await?; sidebus_common::raw::splice_conns(client_conn, vmbus_conn).await; Ok::<(), eyre::Report>(()) } }; tokio::spawn( async { match f.await { Ok(()) => debug!("done with client"), Err(err) => error!(%err, "error dealing with client"), } } .instrument(info_span!("serve", ?remote_addr)), ); } } if let Some(port) = cli.vsock_port { // TODO: modprobe vhost_vsock first! server_tasks.spawn( vsock::ListenerBuilder::new(vsock::VsockAddr::new(vsock::VMADDR_CID_HOST, port)) .with_label("VM Bus") .listen(move |client| { enclose! { (vm_bus, vm_bus_guid) async move { // TODO: Not necessary to go through the channel, add vsock support to the Peer too? let client_conn = client.build((&vm_bus_guid).into()).await?; let vmbus_conn = vm_bus.lock().await.connect_channel(true).await?; sidebus_common::raw::splice_conns(client_conn, vmbus_conn).await; Ok(()) } } }) .map_ok_or_else( |e| { error!("vsock listener: {:?}", e); }, |()| (), ), ); } let mut waiter = child_procs .iter_mut() .map(|child| child.wait()) .collect::>(); debug!("starting.."); tokio::select! { _ = server_tasks.join_all() => debug!("server tasks ended"), res = waiter.next() => debug!(?res, "child process terminated"), _ = tokio::signal::ctrl_c() => debug!("interrupt signal"), }; drop(waiter); for mut child in child_procs { if let Err(e) = child.kill().await { error!(?e, "could not kill process"); } } Ok(()) }