use std::{hash::DefaultHasher, sync::Arc}; use tokio_stream::StreamExt as _; use tracing::{debug, error, trace}; pub struct HostedBus { peers: Arc, guid: zbus::OwnedGuid, next_id: usize, _self_conn: zbus::Connection, } impl HostedBus { pub async fn new() -> eyre::Result { let guid: zbus::OwnedGuid = zbus::Guid::generate().into(); let peers = busd::peers::Peers::new(); let dbus = busd::fdo::DBus::new(peers.clone(), guid.clone()); let monitoring = busd::fdo::Monitoring::new(peers.clone()); // Create a peer for ourselves. trace!("Creating self-dial connection."); let (client_socket, peer_socket) = zbus::connection::socket::Channel::pair(); let service_conn = zbus::connection::Builder::authenticated_socket(client_socket, guid.clone())? .p2p() .unique_name(busd::fdo::BUS_NAME)? .name(busd::fdo::BUS_NAME)? .serve_at(busd::fdo::DBus::PATH, dbus)? .serve_at(busd::fdo::Monitoring::PATH, monitoring)? .build() .await?; let peer_conn = zbus::connection::Builder::authenticated_socket(peer_socket, guid.clone())? .p2p() .build() .await?; peers.add_us(peer_conn).await; trace!("Self-dial connection created."); Ok(HostedBus { peers, guid, next_id: 0, _self_conn: service_conn, }) } pub fn server_guid(&self) -> &zbus::Guid<'_> { self.guid.inner() } pub async fn connect_channel(&mut self, skip_hello: bool) -> eyre::Result { let id = self.next_id(); self.peers .add_channel(&self.guid, id, skip_hello) .await .map_err(|err| eyre::eyre!(Box::new(err))) // https://github.com/eyre-rs/eyre/issues/31 XXX: busd should not use anyhow! } pub async fn connect_unix( &mut self, socket: tokio::net::UnixStream, auth: zbus::AuthMechanism, ) -> eyre::Result<()> { let id = self.next_id(); self.peers .add(&self.guid, id, socket.into(), auth) .await .map_err(|err| eyre::eyre!(Box::new(err))) } pub async fn name_owner_changes(&mut self) -> eyre::Result { let conn = self.connect_channel(false).await?; NameOwnerStream::new(conn).await } fn next_id(&mut self) -> usize { self.next_id += 1; self.next_id } } pub trait SharedHostedBus { async fn run_unix_listener(self, listener: tokio::net::UnixListener, auth: zbus::AuthMechanism); async fn spawn_external_client( self, command: &mut tokio::process::Command, ) -> eyre::Result; } impl SharedHostedBus for Arc> { async fn run_unix_listener( self, listener: tokio::net::UnixListener, auth: zbus::AuthMechanism, ) { while let Ok((socket, _remote_addr)) = listener.accept().await { if let Err(e) = self.lock().await.connect_unix(socket, auth).await { error!("unix connection: {:?}", e); } } } async fn spawn_external_client( self, command: &mut tokio::process::Command, ) -> eyre::Result { // NOTE: abstract sockets belong to the *network* namespace // Possibly better to only accept once and let go of the listener / use socketpair & /dev/fd? (Uncommon to reconnect?) let abstract_path = format!("/run/sidebus-broker/{}", zbus::Guid::generate()); let listener = tokio::net::UnixListener::bind(format!("\0{abstract_path}"))?; debug!(%abstract_path, "opened listener for external client"); tokio::spawn(self.run_unix_listener(listener, zbus::AuthMechanism::External)); Ok(command .env( "DBUS_SESSION_BUS_ADDRESS", format!("unix:abstract={abstract_path}"), ) .spawn()?) } } pub struct NameOwnerStream { _conn: zbus::Connection, stream: zbus::MessageStream, } impl NameOwnerStream { pub async fn new(conn: zbus::Connection) -> eyre::Result { // we're subscribing and waiting for the server to confirm the subscription *here* // so there should be no missing events if an owner we're waiting for is spawned // "too soon", even if we start polling `next()` after it's connected // (to test: `tokio::time::sleep(Duration::from_secs(1)).await` before `wait_for_acquisition()` :)) let stream = zbus::MessageStream::for_match_rule( zbus::MatchRule::builder() .msg_type(zbus::message::Type::Signal) .path(busd::fdo::DBus::PATH)? .interface(busd::fdo::DBus::INTERFACE)? .member("NameOwnerChanged")? .build() .into_owned(), &conn, None, ) .await?; Ok(NameOwnerStream { _conn: conn, stream, }) } pub async fn next(&mut self) -> eyre::Result { match self.stream.next().await { None => Err(eyre::eyre!("stream ended")), Some(Err(err)) => Err(err.into()), Some(Ok(msg)) => Ok({ let body = msg.body(); let (name, old_owner, new_owner): ( zbus::names::BusName, zbus::zvariant::Optional, zbus::zvariant::Optional, ) = body.deserialize()?; busd::name_registry::NameOwnerChanged { name: name.into_owned().into(), old_owner: Option::from(old_owner).map(|x: zbus::names::UniqueName| x.into()), new_owner: Option::from(new_owner).map(|x: zbus::names::UniqueName| x.into()), } }), } } pub async fn wait_for_acquisition( &mut self, name: zbus::names::OwnedWellKnownName, ) -> eyre::Result<()> { while let Ok(change) = self.next().await { if let zbus::names::BusName::WellKnown(ref well_known) = *change.name { if *well_known == name { return Ok(()); } } } Err(eyre::eyre!( "non-Ok response when waiting for name acquisition!" )) } }