Add NameOwnerStream to support waiting for clients taking names on an owned bus

This commit is contained in:
Val Packett 2025-07-17 21:19:17 -03:00
parent e32d6746df
commit e4bace1793
3 changed files with 74 additions and 0 deletions

1
Cargo.lock generated
View file

@ -846,6 +846,7 @@ dependencies = [
"eyre", "eyre",
"sidebus-common", "sidebus-common",
"tokio", "tokio",
"tokio-stream",
"tokio-vsock", "tokio-vsock",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",

View file

@ -13,3 +13,4 @@ tokio-vsock = "0.7.1"
tracing = "0.1.41" tracing = "0.1.41"
tracing-subscriber = "0.3.19" tracing-subscriber = "0.3.19"
zbus = { workspace = true } zbus = { workspace = true }
tokio-stream = "0.1.17"

View file

@ -1,4 +1,5 @@
use std::sync::Arc; use std::sync::Arc;
use tokio_stream::StreamExt as _;
use tracing::trace; use tracing::trace;
pub struct HostedBus { pub struct HostedBus {
@ -64,8 +65,79 @@ impl HostedBus {
.map_err(|err| eyre::eyre!(Box::new(err))) .map_err(|err| eyre::eyre!(Box::new(err)))
} }
pub async fn name_owner_changes(&mut self) -> eyre::Result<NameOwnerStream> {
let conn = self.connect_channel(false).await?;
NameOwnerStream::new(conn).await
}
fn next_id(&mut self) -> usize { fn next_id(&mut self) -> usize {
self.next_id += 1; self.next_id += 1;
self.next_id self.next_id
} }
} }
pub struct NameOwnerStream {
_conn: zbus::Connection,
stream: zbus::MessageStream,
}
impl NameOwnerStream {
pub async fn new(conn: zbus::Connection) -> eyre::Result<Self> {
// we're subscribing and waiting for the server to confirm the subscription *here*
// so there should be no missing events if an owner we're waiting for is spawned
// "too soon", even if we start polling `next()` after it's connected
// (to test: `tokio::time::sleep(Duration::from_secs(1)).await` before `wait_for_acquisition()` :))
let stream = zbus::MessageStream::for_match_rule(
zbus::MatchRule::builder()
.msg_type(zbus::message::Type::Signal)
.path(busd::fdo::DBus::PATH)?
.interface(busd::fdo::DBus::INTERFACE)?
.member("NameOwnerChanged")?
.build()
.into_owned(),
&conn,
None,
)
.await?;
Ok(NameOwnerStream {
_conn: conn,
stream,
})
}
pub async fn next(&mut self) -> eyre::Result<busd::name_registry::NameOwnerChanged> {
match self.stream.next().await {
None => Err(eyre::eyre!("stream ended")),
Some(Err(err)) => Err(err.into()),
Some(Ok(msg)) => Ok({
let body = msg.body();
let (name, old_owner, new_owner): (
zbus::names::BusName,
zbus::zvariant::Optional<zbus::names::UniqueName>,
zbus::zvariant::Optional<zbus::names::UniqueName>,
) = body.deserialize()?;
busd::name_registry::NameOwnerChanged {
name: name.into_owned().into(),
old_owner: Option::from(old_owner).map(|x: zbus::names::UniqueName| x.into()),
new_owner: Option::from(new_owner).map(|x: zbus::names::UniqueName| x.into()),
}
}),
}
}
pub async fn wait_for_acquisition(
&mut self,
name: zbus::names::OwnedWellKnownName,
) -> eyre::Result<()> {
while let Ok(change) = self.next().await {
if let zbus::names::BusName::WellKnown(ref well_known) = *change.name {
if *well_known == name {
return Ok(());
}
}
}
Err(eyre::eyre!(
"non-Ok response when waiting for name acquisition!"
))
}
}