From 4fd76692dbb57aba69e54017f2cae3908f9f7da3 Mon Sep 17 00:00:00 2001 From: Val Packett Date: Fri, 3 Oct 2025 05:13:38 -0300 Subject: [PATCH] Add option to expose a (non-fd-capable) unix socket instead of vsock libkrun does not use vsock on the host, so we need to provide this to work with muvm. --- sidebus-broker/src/bus.rs | 26 +++++++--- sidebus-broker/src/main.rs | 99 +++++++++++++++++++++++++++++--------- 2 files changed, 93 insertions(+), 32 deletions(-) diff --git a/sidebus-broker/src/bus.rs b/sidebus-broker/src/bus.rs index 4607379..b2fd94f 100644 --- a/sidebus-broker/src/bus.rs +++ b/sidebus-broker/src/bus.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; +use std::{hash::DefaultHasher, sync::Arc}; use tokio_stream::StreamExt as _; -use tracing::{debug, trace}; +use tracing::{debug, error, trace}; pub struct HostedBus { peers: Arc, @@ -57,10 +57,14 @@ impl HostedBus { .map_err(|err| eyre::eyre!(Box::new(err))) // https://github.com/eyre-rs/eyre/issues/31 XXX: busd should not use anyhow! } - pub async fn connect_unix(&mut self, socket: tokio::net::UnixStream) -> eyre::Result<()> { + pub async fn connect_unix( + &mut self, + socket: tokio::net::UnixStream, + auth: zbus::AuthMechanism, + ) -> eyre::Result<()> { let id = self.next_id(); self.peers - .add(&self.guid, id, socket.into(), zbus::AuthMechanism::External) + .add(&self.guid, id, socket.into(), auth) .await .map_err(|err| eyre::eyre!(Box::new(err))) } @@ -77,7 +81,7 @@ impl HostedBus { } pub trait SharedHostedBus { - async fn run_unix_listener(self, listener: tokio::net::UnixListener); + async fn run_unix_listener(self, listener: tokio::net::UnixListener, auth: zbus::AuthMechanism); async fn spawn_external_client( self, command: &mut tokio::process::Command, @@ -85,9 +89,15 @@ pub trait SharedHostedBus { } impl SharedHostedBus for Arc> { - async fn run_unix_listener(self, listener: tokio::net::UnixListener) { + async fn run_unix_listener( + self, + listener: tokio::net::UnixListener, + auth: zbus::AuthMechanism, + ) { while let Ok((socket, _remote_addr)) = listener.accept().await { - self.lock().await.connect_unix(socket).await.unwrap() + if let Err(e) = self.lock().await.connect_unix(socket, auth).await { + error!("unix connection: {:?}", e); + } } } @@ -100,7 +110,7 @@ impl SharedHostedBus for Arc> { let abstract_path = format!("/run/sidebus-broker/{}", zbus::Guid::generate()); let listener = tokio::net::UnixListener::bind(format!("\0{abstract_path}"))?; debug!(%abstract_path, "opened listener for external client"); - tokio::spawn(self.run_unix_listener(listener)); + tokio::spawn(self.run_unix_listener(listener, zbus::AuthMechanism::External)); Ok(command .env( "DBUS_SESSION_BUS_ADDRESS", diff --git a/sidebus-broker/src/main.rs b/sidebus-broker/src/main.rs index 7a2247d..5b41111 100644 --- a/sidebus-broker/src/main.rs +++ b/sidebus-broker/src/main.rs @@ -4,9 +4,10 @@ mod vsock; use bus::SharedHostedBus; use clap::Parser; +use futures::TryFutureExt; use std::{path::PathBuf, sync::Arc}; use tokio::{net::UnixListener, process::Command, sync::Mutex}; -use tracing::error; +use tracing::{Instrument, debug, error, info_span}; use zbus::names::WellKnownName; // https://github.com/rust-lang/rfcs/issues/2407#issuecomment-385291238 @@ -22,7 +23,7 @@ macro_rules! enclose { #[derive(Parser)] #[command(version, about, long_about = None)] struct BrokerCli { - /// Create unix socket listeners for all internal busses in the provided directory + /// Create unix socket listeners for internal busses in the provided directory #[clap(long)] debug_access: Option, @@ -34,9 +35,13 @@ struct BrokerCli { #[clap(long, default_value = "/run/vm-doc-portal")] guest_mountpoint: PathBuf, - /// Vsock port number to listen on + /// Vsock port number to listen on for the VM bus #[clap(long)] - vsock_port: u32, + vsock_port: Option, + + /// Unix socket path to listen on for the VM bus + #[clap(long)] + unix_path: Option, } async fn new_hosted_bus() -> eyre::Result<( @@ -60,15 +65,25 @@ async fn main() -> eyre::Result<()> { let (vm_bus, vm_bus_guid, _) = new_hosted_bus().await?; let (priv_bus, _, mut priv_lst) = new_hosted_bus().await?; + let mut server_tasks = tokio::task::JoinSet::new(); + if let Some(dir_path) = cli.debug_access { if !dir_path.is_dir() { error!(path = %dir_path.display(), "--debug-access path is not an existing directory"); std::process::exit(1); } - let vm_dbg_listener = UnixListener::bind(dir_path.join("vm.sock"))?; - let _vm_dbg_task = tokio::spawn(vm_bus.clone().run_unix_listener(vm_dbg_listener)); let priv_dbg_listener = UnixListener::bind(dir_path.join("priv.sock"))?; - let _priv_dbg_task = tokio::spawn(priv_bus.clone().run_unix_listener(priv_dbg_listener)); + server_tasks.spawn( + priv_bus + .clone() + .run_unix_listener(priv_dbg_listener, zbus::AuthMechanism::External), + ); + let vm_dbg_listener = UnixListener::bind(dir_path.join("vm.sock"))?; + server_tasks.spawn( + vm_bus + .clone() + .run_unix_listener(vm_dbg_listener, zbus::AuthMechanism::External), + ); // TODO: unlink sockets on exit } @@ -138,23 +153,59 @@ async fn main() -> eyre::Result<()> { unreachable!("our own fresh bus can't have interfaces already provided"); }; - // TODO: modprobe vhost_vsock first! - // NOTE: Every individual D-Bus client inside of the VM is a new client here! - vsock::ListenerBuilder::new(vsock::VsockAddr::new( - vsock::VMADDR_CID_HOST, - cli.vsock_port, - )) - .with_label("VM Bus") - .listen(move |client| { - enclose! { (vm_bus, vm_bus_guid) async move { - // TODO: Not necessary to go through the channel, add vsock support to the Peer too - let client_conn = client.build((&vm_bus_guid).into()).await?; - let vmbus_conn = vm_bus.lock().await.connect_channel(true).await?; - sidebus_common::raw::splice_conns(client_conn, vmbus_conn).await; - Ok(()) - } } - }) - .await?; + // NOTE: Every individual D-Bus client inside of the VM is a new client on the VM bus listeners! + if let Some(path) = cli.unix_path { + // XXX: going through the channel just to strip fds + let vm_unix_listener = UnixListener::bind(path)?; + while let Ok((socket, remote_addr)) = vm_unix_listener.accept().await { + let f = enclose! { (vm_bus, vm_bus_guid) async move { + let client_conn = zbus::connection::Builder::unix_stream(socket) + .server(&vm_bus_guid) + .unwrap() + .p2p() + .auth_mechanism(zbus::AuthMechanism::Anonymous) + .build() + .await?; + let vmbus_conn = vm_bus.lock().await.connect_channel(true).await?; + sidebus_common::raw::splice_conns(client_conn, vmbus_conn).await; + Ok::<(), eyre::Report>(()) + } }; + tokio::spawn( + async { + match f.await { + Ok(()) => debug!("done with client"), + Err(err) => error!(%err, "error dealing with client"), + } + } + .instrument(info_span!("serve", ?remote_addr)), + ); + } + } + + if let Some(port) = cli.vsock_port { + // TODO: modprobe vhost_vsock first! + server_tasks.spawn( + vsock::ListenerBuilder::new(vsock::VsockAddr::new(vsock::VMADDR_CID_HOST, port)) + .with_label("VM Bus") + .listen(move |client| { + enclose! { (vm_bus, vm_bus_guid) async move { + // TODO: Not necessary to go through the channel, add vsock support to the Peer too? + let client_conn = client.build((&vm_bus_guid).into()).await?; + let vmbus_conn = vm_bus.lock().await.connect_channel(true).await?; + sidebus_common::raw::splice_conns(client_conn, vmbus_conn).await; + Ok(()) + } } + }) + .map_ok_or_else( + |e| { + error!("vsock listener: {:?}", e); + }, + |()| (), + ), + ); + } + + let _ = server_tasks.join_all().await; Ok(()) }