clan-sidebus/sidebus-broker/src/main.rs
2026-02-06 05:05:07 -03:00

302 lines
11 KiB
Rust

mod bus;
mod portal;
mod vsock;
use bus::SharedHostedBus;
use clap::Parser;
use futures::{TryFutureExt, stream::FuturesUnordered};
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::{net::UnixListener, process::Command, sync::Mutex};
use tokio_stream::StreamExt as _;
use tracing::{Instrument, debug, error, info_span};
use zbus::names::WellKnownName;
// https://github.com/rust-lang/rfcs/issues/2407#issuecomment-385291238
/// Evaluates an expression with fresh clones of the listed bindings in scope.
///
/// `enclose!((a, b) expr)` expands to a block that first shadows every listed
/// identifier with the result of calling `.clone()` on it and then yields
/// `expr`. When `expr` is a `move` closure or an `async move` block it therefore
/// captures the clones, and the caller keeps ownership of the originals.
macro_rules! enclose {
    ( ( $( $captured:ident ),* ) $body:expr ) => {{
        $( let $captured = $captured.clone(); )*
        $body
    }};
}
// Command-line options of the broker, parsed with clap's derive API.
//
// NOTE: the `///` comments on the fields below are picked up by clap as the
// `--help` text of each option, so editing them changes user-visible output.
// Maintainer notes are therefore written as plain `//` comments.
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct BrokerCli {
// Optional. When present, `main` requires it to be an existing directory and
// binds a `priv.sock` unix listener for the private bus inside it.
/// Create unix socket listeners for internal busses in the provided directory
#[clap(long)]
debug_access: Option<PathBuf>,
// Required. `main` creates this directory if needed and exports it as
// `XDG_RUNTIME_DIR` to every helper process it spawns.
/// Private XDG_RUNTIME_DIR for the VM ('./doc' under it will be used as the document portal FS mountpoint, './fs.sock' will be the FS)
#[clap(long)]
runtime_dir: PathBuf,
// Handed to the FileChooser and FileTransfer portal implementations.
/// Absolute path where the './doc' under the runtime-dir would be mounted over virtiofs in the guest
#[clap(long, default_value = "/run/vm-doc-portal")]
guest_mountpoint: PathBuf,
// Optional. The vsock listener is only started when a port is given.
/// Vsock port number to listen on for the VM bus
#[clap(long)]
vsock_port: Option<u32>,
// Optional. The unix-socket listener is only started when a path is given.
/// Unix socket path to listen on for the VM bus
#[clap(long)]
unix_path: Option<PathBuf>,
// Used as the guest side of the `--uid-map` argument passed to virtiofsd.
/// The user ID for the appvm user inside of the guest
#[clap(long, default_value = "1337")]
guest_uid: u32,
// Used as the guest side of the `--gid-map` argument passed to virtiofsd.
/// The group ID for the appvm group inside of the guest
#[clap(long, default_value = "1337")]
guest_gid: u32,
}
/// Creates a new [`bus::HostedBus`] and returns everything `main` needs from it.
///
/// The returned tuple holds, in order:
/// 1. the bus itself, wrapped in `Arc<Mutex<_>>` so it can be shared,
/// 2. the server GUID reported by the bus,
/// 3. the stream of name-owner changes obtained from the bus.
///
/// # Errors
/// Propagates any failure from constructing the bus or from subscribing to
/// its name-owner changes.
async fn new_hosted_bus() -> eyre::Result<(
    Arc<Mutex<bus::HostedBus>>,
    zbus::OwnedGuid,
    bus::NameOwnerStream,
)> {
    let mut hosted = bus::HostedBus::new().await?;
    // Both values are taken from the bus before it is moved behind the mutex.
    let server_guid: zbus::OwnedGuid = hosted.server_guid().to_owned().into();
    let name_changes = hosted.name_owner_changes().await?;
    let shared = Arc::new(Mutex::new(hosted));
    Ok((shared, server_guid, name_changes))
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::init();
let cli = BrokerCli::parse();
let (priv_bus, _, mut priv_lst) = new_hosted_bus().await?;
let mut server_tasks = tokio::task::JoinSet::new();
let mut child_procs = Vec::new();
if let Some(dir_path) = cli.debug_access {
if !dir_path.is_dir() {
error!(path = %dir_path.display(), "--debug-access path is not an existing directory");
std::process::exit(1);
}
let priv_dbg_listener = UnixListener::bind(dir_path.join("priv.sock"))?;
server_tasks.spawn(
priv_bus
.clone()
.run_unix_listener(priv_dbg_listener, zbus::AuthMechanism::External),
);
// TODO: unlink sockets on exit
}
std::fs::create_dir_all(&cli.runtime_dir)?;
child_procs.push(
priv_bus
.clone()
.spawn_external_client(
Command::new(env!("BIN_XDG_PERMISSION_STORE"))
.env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str())
.kill_on_drop(true),
)
.await?,
);
let impl_permission_store =
WellKnownName::from_static_str("org.freedesktop.impl.portal.PermissionStore")?.into();
priv_lst.wait_for_acquisition(impl_permission_store).await?;
child_procs.push(
priv_bus
.clone()
.spawn_external_client(
Command::new(env!("BIN_XDG_DOCUMENT_PORTAL"))
.env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str())
.kill_on_drop(true),
)
.await?,
);
let portal_documents =
WellKnownName::from_static_str("org.freedesktop.portal.Documents")?.into();
priv_lst.wait_for_acquisition(portal_documents).await?;
child_procs.push(
Command::new(env!("BIN_VIRTIOFSD"))
.args(&[
"--shared-dir",
cli.runtime_dir.join("doc").to_str().unwrap(),
"--socket-path",
cli.runtime_dir.join("fs.sock").to_str().unwrap(),
"--uid-map",
&format!(":{}:{}:1:", cli.guest_uid, unsafe { libc::getuid() }),
"--gid-map",
&format!(":{}:{}:1:", cli.guest_gid, unsafe { libc::getgid() }),
"--log-level",
"debug",
])
.env("XDG_RUNTIME_DIR", cli.runtime_dir.as_os_str())
.kill_on_drop(true)
.spawn()?,
);
let priv_bus_conn = priv_bus.lock().await.connect_channel(false).await?;
let host_session_conn = zbus::connection::Builder::session()?.build().await?;
let file_chooser_imp = portal::file_chooser::FileChooser::new(
&host_session_conn,
&priv_bus_conn,
cli.guest_mountpoint.clone(),
)
.await?;
let file_transfer_imp = portal::file_transfer::FileTransfer::new(
&host_session_conn,
&priv_bus_conn,
cli.guest_mountpoint,
)
.await?;
let notification_imp = portal::notification::Notification::new(&host_session_conn).await?;
let settings_imp = portal::settings::Settings::new(&host_session_conn).await?;
async fn on_vm_bus_connected(
vm_bus_conn: zbus::Connection,
file_chooser: portal::file_chooser::FileChooser,
file_transfer: portal::file_transfer::FileTransfer,
notification: portal::notification::Notification,
settings: portal::settings::Settings,
) -> Result<(), eyre::Report> {
if !vm_bus_conn
.object_server()
.at("/org/freedesktop/portal/desktop", file_chooser)
.await?
{
error!("org.freedesktop.portal.FileChooser already provided");
};
if !vm_bus_conn
.object_server()
.at("/org/freedesktop/portal/documents", file_transfer)
.await?
{
error!("org.freedesktop.portal.FileTransfer already provided");
};
let file_transfer_ref = vm_bus_conn
.object_server()
.interface::<_, portal::file_transfer::FileTransfer>(
"/org/freedesktop/portal/documents",
)
.await?;
tokio::spawn(async move {
let file_transfer = file_transfer_ref.get().await;
let emitter = file_transfer_ref.signal_emitter();
if let Err(err) = file_transfer.forward_transfer_closed(emitter.clone()).await {
error!(%err, "forwarding forward_transfer_closed changes ended");
}
});
if !vm_bus_conn
.object_server()
.at("/org/freedesktop/portal/desktop", notification)
.await?
{
error!("org.freedesktop.portal.Notification already provided");
};
let notification_ref = vm_bus_conn
.object_server()
.interface::<_, portal::notification::Notification>("/org/freedesktop/portal/desktop")
.await?;
tokio::spawn(async move {
let notification = notification_ref.get().await;
let emitter = notification_ref.signal_emitter();
if let Err(err) = notification.forward_actions(emitter.clone()).await {
error!(%err, "forwarding notification changes ended");
}
});
if !vm_bus_conn
.object_server()
.at("/org/freedesktop/portal/desktop", settings)
.await?
{
error!("org.freedesktop.portal.Settings already provided");
};
let settings_ref = vm_bus_conn
.object_server()
.interface::<_, portal::settings::Settings>("/org/freedesktop/portal/desktop")
.await?;
tokio::spawn(async move {
let settings = settings_ref.get().await;
let emitter = settings_ref.signal_emitter();
if let Err(err) = settings.forward_changes(emitter).await {
error!(%err, "forwarding settings changes ended");
}
});
// XXX: no method for "wait until the conn dies"?
Ok(std::future::pending::<()>().await)
}
if let Some(path) = cli.unix_path {
let vm_unix_listener = UnixListener::bind(path)?;
server_tasks.spawn(enclose!((file_chooser_imp, file_transfer_imp, notification_imp, settings_imp) async move {
while let Ok((socket, remote_addr)) = vm_unix_listener.accept().await {
let f = enclose!((file_chooser_imp, file_transfer_imp, notification_imp, settings_imp) async move {
let client_conn = zbus::connection::Builder::unix_stream(socket)
.auth_mechanism(zbus::AuthMechanism::Anonymous)
.name("org.freedesktop.portal.Desktop")?
.name("org.freedesktop.portal.Documents")?
.build()
.await?;
on_vm_bus_connected(client_conn, file_chooser_imp, file_transfer_imp, notification_imp, settings_imp).await
});
tokio::spawn(
async {
match f.await {
Ok(()) => debug!("done with server"),
Err(err) => error!(%err, "error dealing with server"),
}
}
.instrument(info_span!("serve", ?remote_addr)),
);
}
}));
}
if let Some(port) = cli.vsock_port {
// TODO: modprobe vhost_vsock first!
server_tasks.spawn(
vsock::ListenerBuilder::new(vsock::VsockAddr::new(vsock::VMADDR_CID_HOST, port))
.with_label("VM Bus")
.listen(move |client| {
enclose!((file_chooser_imp, file_transfer_imp, notification_imp, settings_imp) async move {
// TODO: Not necessary to go through the channel, add vsock support to the Peer too?
let client_conn = client.build().await?;
on_vm_bus_connected(client_conn, file_chooser_imp, file_transfer_imp, notification_imp, settings_imp).await
})
})
.map_ok_or_else(
|e| {
error!("vsock listener: {:?}", e);
},
|()| (),
),
);
}
let mut waiter = child_procs
.iter_mut()
.map(|child| child.wait())
.collect::<FuturesUnordered<_>>();
tokio::select! {
_ = server_tasks.join_all() => debug!("server tasks ended"),
res = waiter.next() => debug!(?res, "child process terminated"),
_ = tokio::signal::ctrl_c() => debug!("interrupt signal"),
};
drop(waiter);
for mut child in child_procs {
if let Err(e) = child.kill().await {
error!(?e, "could not kill process");
}
}
Ok(())
}