Basic vsock proxying implementation

Currently just connecting to the host's session bus
This commit is contained in:
Val Packett 2025-07-04 21:52:14 -03:00
parent 11a682e19f
commit 14ce212e81
12 changed files with 1807 additions and 34 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
.direnv/
target/
result

1538
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,10 @@
[workspace]
resolver = "2"
members = [
"sidebus-broker"
"sidebus-agent",
"sidebus-broker",
"sidebus-common"
]
[workspace.dependencies]
sidebus-common = { path = "sidebus-common" }

View file

@ -8,9 +8,8 @@
};
};
outputs = { self, nixpkgs, flake-utils, rust-overlay }:
flake-utils.lib.eachDefaultSystem
(system:
outputs = {self, nixpkgs, flake-utils, rust-overlay}:
flake-utils.lib.eachDefaultSystem (system:
let
overlays = [ (import rust-overlay) ];
pkgs = import nixpkgs {
@ -26,11 +25,9 @@
let cargoToml = builtins.fromTOML (builtins.readFile ./${crate}/Cargo.toml);
in rustPlatform.buildRustPackage {
inherit (cargoToml.package) name version;
src = ./${crate};
src = ./.;
cargoLock.lockFile = ./Cargo.lock;
# buildFeatures = features;
# buildInputs = runtimeDeps;
# nativeBuildInputs = buildDeps;
buildAndTestSubdir = crate;
};
in
{
@ -38,6 +35,7 @@
buildInputs = [ rustToolchain ];
};
packages.sidebus-agent = rustPackage "sidebus-agent";
packages.sidebus-broker = rustPackage "sidebus-broker";
}
);

15
sidebus-agent/Cargo.toml Normal file
View file

@ -0,0 +1,15 @@
[package]
name = "sidebus-agent"
version = "0.1.0"
edition = "2024"
[dependencies]
sidebus-common = { workspace = true }
clap = { version = "4.5.40", features = ["derive"] }
eyre = "0.6.12"
tokio = { version = "1.46.0", features = ["full"] }
tokio-stream = "0.1.17"
tokio-vsock = "0.7.1"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
zbus = { version = "5.7.1", default-features = false, features = ["tokio", "tokio-vsock", "bus-impl", "p2p"] }

46
sidebus-agent/src/main.rs Normal file
View file

@ -0,0 +1,46 @@
use clap::Parser;
use tokio::net::UnixListener;
use tracing::info;
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct AgentCli {
listen_path: String,
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::init();
let cli = AgentCli::parse();
let unix_addr = cli.listen_path;
let unix_listener = UnixListener::bind(unix_addr.clone())?;
info!(%unix_addr, "listening for unix clients");
while let Ok((unix_client, client_addr)) = unix_listener.accept().await {
info!(?client_addr, "new unix client");
tokio::spawn(async move {
let vsock_addr = zbus::Address::from(zbus::address::Transport::Vsock(
zbus::address::transport::Vsock::new(2, 4269),
));
let vsock_conn = zbus::connection::Builder::address(vsock_addr)
.unwrap()
.p2p()
.auth_mechanism(zbus::AuthMechanism::Anonymous)
.build()
.await
.unwrap();
info!(guid = %vsock_conn.server_guid(), "connected to vsock bus");
let client_conn = zbus::connection::Builder::unix_stream(unix_client)
.server(vsock_conn.server_guid())
.unwrap()
.p2p()
.auth_mechanism(zbus::AuthMechanism::External)
.build()
.await
.unwrap();
sidebus_common::raw::splice_conns(client_conn, vsock_conn).await;
});
}
Ok(())
}

View file

@ -4,3 +4,12 @@ version = "0.1.0"
edition = "2024"
[dependencies]
sidebus-common = { workspace = true }
busd = "0.4.0"
clap = { version = "4.5.40", features = ["derive"] }
eyre = "0.6.12"
tokio = { version = "1.46.0", features = ["full"] }
tokio-vsock = "0.7.1"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
zbus = { version = "5.7.1", default-features = false, features = ["tokio", "tokio-vsock", "bus-impl", "p2p"] }

View file

@ -1,3 +1,33 @@
fn main() {
println!("Hello, world!");
mod vsock;
use clap::Parser;
use tracing::info;
#[derive(Parser)]
#[command(version, about, long_about = None)]
struct BrokerCli {}
#[tokio::main]
async fn main() -> eyre::Result<()> {
tracing_subscriber::fmt::init();
let _cli = BrokerCli::parse();
vsock::ListenerBuilder::new(vsock::VsockAddr::new(vsock::VMADDR_CID_HOST, 4269))
.with_label("VM Bus")
.listen(async |client| {
let session_bus = zbus::connection::Builder::session()
.unwrap()
.p2p() /* i.e. "raw connection, don't send Hello" */
.build()
.await
.unwrap();
info!(guid = %session_bus.server_guid(), "connected to session bus");
let client_conn = client.build(session_bus.server_guid().into()).await?;
sidebus_common::raw::splice_conns(client_conn, session_bus).await;
Ok(())
})
.await?;
Ok(())
}

View file

@ -0,0 +1,71 @@
pub use tokio_vsock::{
VMADDR_CID_ANY, VMADDR_CID_HOST, VMADDR_CID_HYPERVISOR, VMADDR_CID_LOCAL, VsockAddr,
};
use tracing::{Instrument, debug, error, info, info_span};
pub struct ConnectionBuilder {
socket: tokio_vsock::VsockStream,
remote_addr: VsockAddr,
}
impl ConnectionBuilder {
pub fn remote_addr(&self) -> &VsockAddr {
&self.remote_addr
}
pub async fn build<'a>(self, guid: zbus::Guid<'a>) -> eyre::Result<zbus::Connection> {
zbus::connection::Builder::vsock_stream(self.socket)
.server(guid)
.unwrap()
.p2p()
.auth_mechanism(zbus::AuthMechanism::Anonymous)
.build()
.await
.map_err(|e| e.into())
}
}
pub struct ListenerBuilder {
addr: VsockAddr,
label: Option<&'static str>,
}
impl ListenerBuilder {
pub fn new(addr: VsockAddr) -> Self {
ListenerBuilder { addr, label: None }
}
pub fn with_label(self, label: &'static str) -> Self {
ListenerBuilder {
label: Some(label),
..self
}
}
#[tracing::instrument(skip(self, on_accept), fields(addr = %self.addr, label = self.label))]
pub async fn listen<F>(self, on_accept: impl Fn(ConnectionBuilder) -> F) -> eyre::Result<()>
where
F: Future<Output = eyre::Result<()>> + Send + 'static,
{
let vsock_listener = tokio_vsock::VsockListener::bind(self.addr)?;
info!("listening");
while let Ok((socket, remote_addr)) = vsock_listener.accept().await {
// TODO: add remote_addr filtering facility
info!(%remote_addr, "accepted client");
let f = on_accept(ConnectionBuilder {
socket,
remote_addr,
});
tokio::spawn(
async {
match f.await {
Ok(()) => debug!("done with client"),
Err(err) => error!(%err, "error dealing with client"),
}
}
.instrument(info_span!("serve", %remote_addr)),
);
}
Ok(())
}
}

10
sidebus-common/Cargo.toml Normal file
View file

@ -0,0 +1,10 @@
[package]
name = "sidebus-common"
version = "0.1.0"
edition = "2024"
[dependencies]
tracing = "0.1.41"
tokio = { version = "1.46.0", features = ["macros"] }
tokio-stream = "0.1.17"
zbus = { version = "5.7.1", default-features = false, features = ["tokio", "tokio-vsock", "bus-impl", "p2p"] }

View file

@ -0,0 +1 @@
pub mod raw;

49
sidebus-common/src/raw.rs Normal file
View file

@ -0,0 +1,49 @@
//! Utilities for dealing with raw message streams
use tokio_stream::StreamExt as _;
use tracing::{debug, error, info};
pub async fn handle_msgs<F, E>(
dir: &'static str,
mut from: zbus::MessageStream,
mut cb: impl FnMut(zbus::Message) -> F,
) -> ()
where
F: Future<Output = Result<(), E>>,
E: std::fmt::Debug,
{
loop {
match from.next().await {
Some(Ok(msg)) => {
debug!(%dir, ?msg, "handling msg");
match cb(msg).await {
Ok(()) => debug!(%dir, "success"),
Err(err) => error!(%dir, ?err, "error handling msg"),
}
}
Some(Err(err)) => {
error!(%dir, %err, "error receiving msg");
}
Option::None => {
error!(%dir, "stream ended");
return ();
}
}
}
}
pub async fn splice_conns(client_conn: zbus::Connection, target_conn: zbus::Connection) {
let target_bus_1 = target_conn.clone();
tokio::select! {
_ = handle_msgs("FROM", client_conn.clone().into(), async |msg| {
target_bus_1.send(&msg).await
}) => {
info!("from_client exited");
}
_ = handle_msgs("TO", target_conn.into(), async |msg| {
client_conn.send(&msg).await
}) => {
info!("to_client exited");
}
}
}