diff --git a/Cargo.lock b/Cargo.lock index bf7ac78..c7e3ef5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -846,6 +846,7 @@ dependencies = [ "eyre", "sidebus-common", "tokio", + "tokio-stream", "tokio-vsock", "tracing", "tracing-subscriber", diff --git a/sidebus-broker/Cargo.toml b/sidebus-broker/Cargo.toml index d847c1c..9fbf0ee 100644 --- a/sidebus-broker/Cargo.toml +++ b/sidebus-broker/Cargo.toml @@ -13,3 +13,4 @@ tokio-vsock = "0.7.1" tracing = "0.1.41" tracing-subscriber = "0.3.19" zbus = { workspace = true } +tokio-stream = "0.1.17" diff --git a/sidebus-broker/src/bus.rs b/sidebus-broker/src/bus.rs index 4740789..8ecd7af 100644 --- a/sidebus-broker/src/bus.rs +++ b/sidebus-broker/src/bus.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use tokio_stream::StreamExt as _; use tracing::trace; pub struct HostedBus { @@ -64,8 +65,79 @@ impl HostedBus { .map_err(|err| eyre::eyre!(Box::new(err))) } + pub async fn name_owner_changes(&mut self) -> eyre::Result { + let conn = self.connect_channel(false).await?; + NameOwnerStream::new(conn).await + } + fn next_id(&mut self) -> usize { self.next_id += 1; self.next_id } } + +pub struct NameOwnerStream { + _conn: zbus::Connection, + stream: zbus::MessageStream, +} + +impl NameOwnerStream { + pub async fn new(conn: zbus::Connection) -> eyre::Result { + // we're subscribing and waiting for the server to confirm the subscription *here* + // so there should be no missing events if an owner we're waiting for is spawned + // "too soon", even if we start polling `next()` after it's connected + // (to test: `tokio::time::sleep(Duration::from_secs(1)).await` before `wait_for_acquisition()` :)) + let stream = zbus::MessageStream::for_match_rule( + zbus::MatchRule::builder() + .msg_type(zbus::message::Type::Signal) + .path(busd::fdo::DBus::PATH)? + .interface(busd::fdo::DBus::INTERFACE)? + .member("NameOwnerChanged")? + .build() + .into_owned(), + &conn, + None, + ) + .await?; + Ok(NameOwnerStream { + _conn: conn, + stream, + }) + } + + pub async fn next(&mut self) -> eyre::Result { + match self.stream.next().await { + None => Err(eyre::eyre!("stream ended")), + Some(Err(err)) => Err(err.into()), + Some(Ok(msg)) => Ok({ + let body = msg.body(); + let (name, old_owner, new_owner): ( + zbus::names::BusName, + zbus::zvariant::Optional, + zbus::zvariant::Optional, + ) = body.deserialize()?; + busd::name_registry::NameOwnerChanged { + name: name.into_owned().into(), + old_owner: Option::from(old_owner).map(|x: zbus::names::UniqueName| x.into()), + new_owner: Option::from(new_owner).map(|x: zbus::names::UniqueName| x.into()), + } + }), + } + } + + pub async fn wait_for_acquisition( + &mut self, + name: zbus::names::OwnedWellKnownName, + ) -> eyre::Result<()> { + while let Ok(change) = self.next().await { + if let zbus::names::BusName::WellKnown(ref well_known) = *change.name { + if *well_known == name { + return Ok(()); + } + } + } + Err(eyre::eyre!( + "non-Ok response when waiting for name acquisition!" + )) + } +}