Add hosted bus module based on busd
Currently depends on WIP changes to busd to allow in-process clients
This commit is contained in:
parent
14ce212e81
commit
627237bdda
7 changed files with 152 additions and 167 deletions
71
sidebus-broker/src/bus.rs
Normal file
71
sidebus-broker/src/bus.rs
Normal file
|
|
@ -0,0 +1,71 @@
|
|||
use std::sync::Arc;
|
||||
use tracing::trace;
|
||||
|
||||
pub struct HostedBus {
|
||||
peers: Arc<busd::peers::Peers>,
|
||||
guid: zbus::OwnedGuid,
|
||||
next_id: usize,
|
||||
_self_conn: zbus::Connection,
|
||||
}
|
||||
|
||||
impl HostedBus {
|
||||
pub async fn new() -> eyre::Result<Self> {
|
||||
let guid: zbus::OwnedGuid = zbus::Guid::generate().into();
|
||||
|
||||
let peers = busd::peers::Peers::new();
|
||||
|
||||
let dbus = busd::fdo::DBus::new(peers.clone(), guid.clone());
|
||||
let monitoring = busd::fdo::Monitoring::new(peers.clone());
|
||||
|
||||
// Create a peer for ourselves.
|
||||
trace!("Creating self-dial connection.");
|
||||
let (client_socket, peer_socket) = zbus::connection::socket::Channel::pair();
|
||||
let service_conn =
|
||||
zbus::connection::Builder::authenticated_socket(client_socket, guid.clone())?
|
||||
.p2p()
|
||||
.unique_name(busd::fdo::BUS_NAME)?
|
||||
.name(busd::fdo::BUS_NAME)?
|
||||
.serve_at(busd::fdo::DBus::PATH, dbus)?
|
||||
.serve_at(busd::fdo::Monitoring::PATH, monitoring)?
|
||||
.build()
|
||||
.await?;
|
||||
let peer_conn = zbus::connection::Builder::authenticated_socket(peer_socket, guid.clone())?
|
||||
.p2p()
|
||||
.build()
|
||||
.await?;
|
||||
|
||||
peers.add_us(peer_conn).await;
|
||||
trace!("Self-dial connection created.");
|
||||
Ok(HostedBus {
|
||||
peers,
|
||||
guid,
|
||||
next_id: 0,
|
||||
_self_conn: service_conn,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn server_guid(&self) -> &zbus::Guid<'_> {
|
||||
self.guid.inner()
|
||||
}
|
||||
|
||||
pub async fn connect_channel(&mut self) -> eyre::Result<zbus::Connection> {
|
||||
let id = self.next_id();
|
||||
self.peers
|
||||
.add_channel(&self.guid, id)
|
||||
.await
|
||||
.map_err(|err| eyre::eyre!(Box::new(err))) // https://github.com/eyre-rs/eyre/issues/31 XXX: busd should not use anyhow!
|
||||
}
|
||||
|
||||
pub async fn connect_unix(&mut self, socket: tokio::net::UnixStream) -> eyre::Result<()> {
|
||||
let id = self.next_id();
|
||||
self.peers
|
||||
.add(&self.guid, id, socket.into(), zbus::AuthMechanism::External)
|
||||
.await
|
||||
.map_err(|err| eyre::eyre!(Box::new(err)))
|
||||
}
|
||||
|
||||
fn next_id(&mut self) -> usize {
|
||||
self.next_id += 1;
|
||||
self.next_id
|
||||
}
|
||||
}
|
||||
|
|
@ -1,7 +1,19 @@
|
|||
mod bus;
|
||||
mod vsock;
|
||||
|
||||
use clap::Parser;
|
||||
use tracing::info;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
// https://github.com/rust-lang/rfcs/issues/2407#issuecomment-385291238
|
||||
macro_rules! enclose {
|
||||
( ($( $x:ident ),*) $y:expr ) => {
|
||||
{
|
||||
$(let $x = $x.clone();)*
|
||||
$y
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Parser)]
|
||||
#[command(version, about, long_about = None)]
|
||||
|
|
@ -13,19 +25,29 @@ async fn main() -> eyre::Result<()> {
|
|||
|
||||
let _cli = BrokerCli::parse();
|
||||
|
||||
let vm_bus = bus::HostedBus::new().await?;
|
||||
let vm_bus_guid: zbus::OwnedGuid = vm_bus.server_guid().to_owned().into();
|
||||
let vm_bus = Arc::new(Mutex::new(vm_bus));
|
||||
|
||||
// Direct access for the host (just trying things out)
|
||||
let unix_listener = tokio::net::UnixListener::bind("vmbus.sock")?;
|
||||
tokio::spawn(enclose! { (vm_bus) async move {
|
||||
while let Ok((socket, _remote_addr)) = unix_listener.accept().await {
|
||||
vm_bus.lock().await.connect_unix(socket).await.unwrap()
|
||||
}
|
||||
} });
|
||||
|
||||
// NOTE: Every individual D-Bus client inside of the VM is a new client here!
|
||||
vsock::ListenerBuilder::new(vsock::VsockAddr::new(vsock::VMADDR_CID_HOST, 4269))
|
||||
.with_label("VM Bus")
|
||||
.listen(async |client| {
|
||||
let session_bus = zbus::connection::Builder::session()
|
||||
.unwrap()
|
||||
.p2p() /* i.e. "raw connection, don't send Hello" */
|
||||
.build()
|
||||
.await
|
||||
.unwrap();
|
||||
info!(guid = %session_bus.server_guid(), "connected to session bus");
|
||||
let client_conn = client.build(session_bus.server_guid().into()).await?;
|
||||
sidebus_common::raw::splice_conns(client_conn, session_bus).await;
|
||||
Ok(())
|
||||
.listen(move |client| {
|
||||
enclose! { (vm_bus, vm_bus_guid) async move {
|
||||
// TODO: Not necessary to go through the channel, add vsock support to the Peer too
|
||||
let client_conn = client.build((&vm_bus_guid).into()).await?;
|
||||
let vmbus_conn = vm_bus.lock().await.connect_channel().await?;
|
||||
sidebus_common::raw::splice_conns(client_conn, vmbus_conn).await;
|
||||
Ok(())
|
||||
} }
|
||||
})
|
||||
.await?;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue