uvms/pkgs/uvms/uvms.py
2026-03-15 02:16:16 +02:00

739 lines
22 KiB
Python

# NOTE: This would have been bash,
# and this was execlineb previously,
# but it was just easier to reason in terms of context managers
# and try-except-finally branches for the cleanup bit,
# than in terms of traps or such.
# Treat this as bash.
# Treat this as throwaway shitcode.
import os
import subprocess
import socket
import json
import re
from argparse import ArgumentParser
from contextlib import contextmanager, closing, ExitStack
from urllib.parse import urlparse, parse_qs
# Command-line interface. String-valued options may contain $HOME, $VM and
# $PREFIX; preprocess_args() expands them after parsing.
parser = ArgumentParser("supervise-vm")
parser.add_argument(
    "--vm",
    default=None,
    help="VM name (the value of $VM); defaults to the first --run command",
)
parser.add_argument(
    "--prefix",
    default="$HOME/uvms/$VM",
    help="per-VM state directory for sockets etc. (default: %(default)s)",
)
parser.add_argument(
    "--vm-config",
    default="@BASE_CONFIG@",  # noqa: E501
    help="Cloud Hypervisor VM config (JSON) passed to `ch-remote create`",
)
parser.add_argument(
    "--persist-home",
    action="store_true",
    help="share $PREFIX/home into the guest (virtiofs tag 'home')",
)
parser.add_argument(
    "--run",
    action="append",
    help="command to start in the guest once it is ready (repeatable); "
    "also the apps to build when no positional APP is given",
)
parser.add_argument(
    "--mem",
    default=None,
    help="guest memory size, e.g. 4096M or 8G",
)
parser.add_argument(
    "app",
    nargs="*",
    default=(),
    help="package to nix-build for the guest: a <nixpkgs> attribute name, "
    "or a URL such as file:///path/to/dir?NAME=VALUE#ATTR (the path is the "
    "Nix file, the fragment the attribute, query parameters become "
    "--arg NAME VALUE)",
)
# Helper executables. The @...@ placeholders are presumably substituted at
# build time (e.g. by a Nix substitution step) -- confirm against the package.
TOOLS_DIR = "@TOOLS@"  # noqa: E501
SOCKETBINDER = f"{TOOLS_DIR}/s6-ipcserver-socketbinder"  # noqa: E501
CH = f"{TOOLS_DIR}/cloud-hypervisor"
CHR = f"{TOOLS_DIR}/ch-remote"
TAPS = "@TAPS@"  # noqa: E501
VIRTIOFSD = "@VIRTIOFSD@"  # noqa: E501
BWRAP = "@BWRAP@"  # noqa: E501
def _read_store_paths(path):
    """Return the lines of ``path`` with trailing whitespace stripped.

    Blank lines are skipped: each entry ends up as a ``--ro-bind P P`` in
    Processes.bwrap(), and an empty P would be an invalid bind.
    """
    with open(path, mode="r", encoding="utf-8") as f:
        return [p for p in (ln.rstrip() for ln in f) if p]


# Store paths bind-mounted (read-only) into the bwrap sandbox; the tools'
# closure also includes this script's own directory.
TOOLS_CLOSURE = [
    *_read_store_paths("@TOOLS_CLOSURE@"),  # noqa: E501
    os.path.dirname(__file__),
]
BASE_SYSTEM = "@SYSTEM@"  # noqa: E501
BASE_SYSTEM_CLOSURE = _read_store_paths("@SYSTEM_CLOSURE@")  # noqa: E501
# Search path for children: the bundled tools first, then the caller's PATH.
# Empty components are dropped -- an empty PATH entry means "the current
# directory", and an unset/empty $PATH used to produce "TOOLS_DIR:".
PASSTHRU_PATH = ":".join(
    [TOOLS_DIR, *(d for d in os.environ.get("PATH", "").split(":") if d)]
)

# Variables forwarded from the caller's environment to every child.
_PASSTHRU_PREFIXES = ("RUST_", "WAYLAND", "XDG_", "DBUS_", "NIX_")
_PASSTHRU_NAMES = ("TAPS_SOCK",)

PASSTHRU_ENV = {
    **{
        k: v
        for k, v in os.environ.items()
        if k.startswith(_PASSTHRU_PREFIXES) or k in _PASSTHRU_NAMES
    },
    # Fall back to the working directory when $HOME is unset.
    "HOME": os.environ.get("HOME", os.getcwd()),
    "PATH": PASSTHRU_PATH,
}
def preprocess_args(args_mut, home=None):
    """Fill in derived defaults and expand placeholders in parsed CLI args.

    If no positional apps were given, the ``--run`` commands double as the
    apps to build. If ``--vm`` is missing, the first ``--run`` command
    names the VM.

    Then every string-valued attribute has ``$HOME``, ``$VM`` and
    ``$PREFIX`` replaced, in that order. So ``--prefix`` may use the first
    two, and other options may use all three.

    Args:
        args_mut: the Namespace from ``parser``; mutated in place.
        home: value for ``$HOME``; defaults to ``PASSTHRU_ENV["HOME"]``.

    Returns:
        ``args_mut`` itself.

    Raises:
        ValueError: an option uses ``$VM``/``$PREFIX`` but that value is
            unknown (e.g. neither ``--vm`` nor ``--run`` was given).
    """
    if not args_mut.app and args_mut.run:
        args_mut.app = [*args_mut.run]
    if not args_mut.vm and args_mut.run:
        args_mut.vm = args_mut.run[0]
    if home is None:
        home = PASSTHRU_ENV["HOME"]

    # Only string options can hold placeholders; sorted for a stable order.
    keys = sorted(k for k, v in vars(args_mut).items() if isinstance(v, str))

    def expand(placeholder, value):
        for k in keys:
            v = getattr(args_mut, k)
            if placeholder not in v:
                continue
            if value is None:
                raise ValueError(
                    f"{k} uses {placeholder} but it has no value "
                    "(pass --vm or --run)"
                )
            setattr(args_mut, k, v.replace(placeholder, value))

    expand("$HOME", home)
    # Read the attributes only now, after the earlier passes, so an expanded
    # --vm / --prefix is what gets substituted (previously these came from
    # the global `args`, which only worked when called from __main__).
    expand("$VM", args_mut.vm)
    expand("$PREFIX", args_mut.prefix)
    return args_mut
def alive_after(proc, timeout):
    """Tell whether ``proc`` is still running after waiting ``timeout`` s.

    ``None`` and already-reaped processes (``returncode`` set) report False
    immediately. Otherwise this blocks in ``proc.wait(timeout)`` and returns
    True only if that wait timed out.
    """
    if proc is None or proc.returncode is not None:
        return False
    try:
        proc.wait(timeout)
    except subprocess.TimeoutExpired:
        still_running = True
    else:
        still_running = False
    return still_running
class Processes:
    """Launch and track the helper processes of a single VM.

    Children run with ``cwd=prefix`` and the environment from ``make_env()``
    unless a call overrides them. Every child started through ``popen()``
    is appended to ``self.processes``; this includes the children of
    ``bwrap()``, ``run_ch()``, ``start_gpu()`` and ``start_virtiofsd()``,
    which all build on it. The entry point uses that list to reap anything
    that escaped its context manager.
    """

    def __init__(self, prefix, vm, check=True, **defaults):
        # Per-VM state directory: default cwd and $PREFIX for children.
        self.prefix = prefix
        # VM name, exported to children as $VM.
        self.vm = vm
        # Default `check=` for exec()/execline().
        self.check = check
        # Extra keyword arguments forwarded to every subprocess.run().
        self.defaults = defaults
        # Every Popen that popen() started successfully.
        self.processes = []

    def make_env(self):
        """Return the child environment: PASSTHRU_ENV plus $PREFIX and $VM."""
        return {
            **PASSTHRU_ENV,
            "PATH": PASSTHRU_PATH,
            "PREFIX": self.prefix,
            "VM": self.vm,
        }

    def exec(self, *args, **kwargs):
        """Run ``args`` to completion with subprocess.run(); return the result.

        ``cwd``, ``check`` and ``env`` default to ``self.prefix``,
        ``self.check`` and ``make_env()``. ``self.defaults`` is always
        forwarded as well.
        """
        kwargs.setdefault("cwd", self.prefix)
        kwargs.setdefault("check", self.check)
        if "env" not in kwargs:
            # Built lazily: callers that pass env= never need PASSTHRU_ENV.
            kwargs["env"] = self.make_env()
        return subprocess.run([*args], **self.defaults, **kwargs)

    def execline(self, *args, **kwargs):
        """Run an execline script, one line per positional arg, via execlineb.

        Goes through exec(), so its cwd/check/env defaults apply.
        """
        # This previously called the *builtin* exec(), which accepts no
        # keyword arguments and therefore always raised TypeError.
        return self.exec(
            "execlineb",
            "-c",
            "\n".join(args),
            **{"executable": TOOLS_DIR + "/execlineb", **kwargs},
        )

    @contextmanager
    def popen(self, *args, **kwargs):
        """Run ``args`` with subprocess.Popen for the duration of the block.

        stdin/stdout/stderr default to DEVNULL and ``text`` to True.

        Raises:
            RuntimeError: the child was no longer running 0.125 s after
                start.

        On exit, however the block ends, the child is terminated and reaped.
        If stdout or stderr is a PIPE, whatever the child wrote is printed.
        """
        kwargs.setdefault("pass_fds", ())
        if "env" not in kwargs:
            kwargs["env"] = self.make_env()
        kwargs.setdefault("cwd", self.prefix)
        kwargs.setdefault("text", True)
        kwargs.setdefault("stdin", subprocess.DEVNULL)
        kwargs.setdefault("stdout", subprocess.DEVNULL)
        kwargs.setdefault("stderr", subprocess.DEVNULL)
        piped = subprocess.PIPE in (kwargs["stderr"], kwargs["stdout"])
        proc = None
        try:
            proc = subprocess.Popen(args, **kwargs)
            if not alive_after(proc, 0.125):
                raise RuntimeError("Failed to start", args)
            print(f"Started {args}")
            self.processes.append(proc)
            yield proc
            print(f"Releasing {args}")
        finally:
            # proc is None when Popen() itself raised: nothing to reap.
            if proc is not None:
                self._reap(proc, piped)

    @staticmethod
    def _reap(proc, piped):
        """Terminate ``proc`` until it is gone; print its output if piped."""
        reported = False
        while alive_after(proc, 0.125):
            try:
                proc.terminate()
                if piped:
                    # communicate() drains the pipes while waiting; a plain
                    # wait() can deadlock on a child blocked on a full pipe.
                    print(proc.communicate())
                    reported = True
                else:
                    proc.wait()
            except Exception as e:
                print(f"Cleanup failing: {e}")
        if piped and not reported:
            # The child exited by itself: show what it left in the pipes.
            # (Previously a still-running child made this raise
            # TimeoutExpired, which skipped termination entirely.)
            try:
                print(proc.communicate(timeout=0.125))
            except Exception as e:
                print(f"Cleanup failing: {e}")

    @contextmanager
    def bwrap(
        self,
        *bwrap_args,
        die_with_parent=True,
        # Based on the args from
        # `host/rootfs/image/usr/bin/run-vmm`
        unshare_all=True,
        uid=1000,
        gid=100,
        unshare_user=True,
        unshare_ipc=None,
        unshare_pid=None,
        unshare_net=None,
        unshare_uts=None,
        unshare_cgroup_try=True,
        bind=(),
        dev_bind=(),
        dev_bind_implicit=("/dev/kvm", "/dev/vfio"),
        dev="/dev",
        proc="/proc",
        ro_bind_implicit=(
            "/etc",
            "/sys",
            "/proc/sys",
            "/dev/null",
            "/proc/kallsyms",
            *sorted(set([*TOOLS_CLOSURE, *BASE_SYSTEM_CLOSURE])),
        ),
        ro_bind=(),
        remount_ro=("/proc/fs", "/proc/irq"),
        tmpfs_implicit=(
            "/dev/shm",
            "/tmp",
            "/var/tmp",
            "/proc/fs",
            "/proc/irq",
        ),
        tmpfs=(),
        pass_fds=(2,),
        **popen_kwargs,
    ):
        """Run the command line ``bwrap_args`` inside a bubblewrap sandbox.

        The sandbox flags built from the keyword arguments reach bwrap
        NUL-separated through ``--args FD``, not on argv. This is presumably
        because ``ro_bind_implicit`` lists every path of both closures.

        ``bind``/``ro_bind``/``dev_bind`` entries are either a path (bound
        onto itself) or a ``(src, dest)`` tuple. ``pass_fds`` plus the args
        fd are passed to the child. Other keyword arguments go to
        ``popen()``.

        Yields the Popen of bwrap itself.
        """
        flags = []

        def add(*items):
            # str(): uid/gid may be ints.
            flags.extend(str(item) for item in items)

        def add_binds(option, paths):
            for p in paths:
                assert isinstance(p, (str, tuple)), p
                src, dest = (p, p) if isinstance(p, str) else p
                add(option, src, dest)

        if unshare_all:
            add("--unshare-all")
        if unshare_user:
            add("--unshare-user")
        if uid is not None:
            assert unshare_user
            add("--uid", uid)
        if gid is not None:
            assert unshare_user
            add("--gid", gid)
        if unshare_ipc:
            add("--unshare-ipc")
        if unshare_pid:
            add("--unshare-pid")
        if unshare_net:
            add("--unshare-net")
        elif unshare_net is False:
            # --unshare-all includes the network namespace; opt back out.
            add("--share-net")
        if unshare_uts:
            add("--unshare-uts")
        if unshare_cgroup_try:
            add("--unshare-cgroup-try")
        if die_with_parent:
            add("--die-with-parent")
        if dev:
            add("--dev", dev)
        if proc:
            add("--proc", proc)
        add_binds("--bind", bind)
        add_binds("--ro-bind", (*ro_bind, *ro_bind_implicit))
        add_binds("--dev-bind", (*dev_bind, *dev_bind_implicit))
        for p in (*tmpfs, *tmpfs_implicit):
            add("--tmpfs", p)
        # Hunch: order might matter...
        for p in remount_ro:
            add("--remount-ro", p)
        payload = "".join(flag + "\0" for flag in flags).encode()

        local, remote = socket.socketpair()
        remote.set_inheritable(True)
        child = None
        try:
            with ExitStack() as proc_es:
                with closing(local), closing(remote):
                    child = proc_es.enter_context(
                        self.popen(
                            "bwrap",
                            "--args",
                            str(remote.fileno()),
                            *bwrap_args,
                            **popen_kwargs,
                            executable=BWRAP,
                            pass_fds=(*pass_fds, remote.fileno()),
                        )
                    )
                    # bwrap reads --args until EOF. Writing only once it is
                    # running (and reading) means a flag list larger than
                    # the socket buffer cannot block us. The half-close then
                    # delivers the EOF that our still-open end used to
                    # withhold.
                    local.sendall(payload)
                    local.shutdown(socket.SHUT_WR)
                yield child
        finally:
            # child stays None if popen() raised; the old code hit an
            # unbound local here and masked the real error.
            if child is not None:
                assert child.returncode is not None, child

    @contextmanager
    def run_ch(self):
        """Start cloud-hypervisor in bwrap, API socket at $PREFIX/vmm.sock.

        s6-ipcserver-socketbinder binds the socket and hands it to CH as
        fd 0, hence ``--api-socket fd=0``. Socket/lock files that did not
        exist beforehand are removed on exit, as are pre-existing ones.

        Yields the bwrap Popen.

        Raises:
            RuntimeError: CH exits within a second, or the API socket does
                not appear.
        """
        vmm_sock = self.prefix + "/vmm.sock"
        # s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
        # s.set_inheritable(True)
        # s.setblocking(True)
        # s.bind(self.prefix + "/vmm.sock")
        args = [
            SOCKETBINDER,
            "-B",
            vmm_sock,
            # "@STRACE@", # noqa: E501
            # "-Z",
            # "-ff",
            CH,
            # "-v",
            "--api-socket",
            "fd=0",
            # f"fd={s.fileno()}"
        ]
        cleanup_paths = [
            vmm_sock,
            self.prefix + "/vmm.sock.lock",
            self.prefix + "/vsock.sock",
        ]
        new_paths = [p for p in cleanup_paths if not os.path.exists(p)]
        old_paths = [p for p in cleanup_paths if p not in new_paths]
        with ExitStack() as cleanup:
            cleanup.enter_context(removing(*new_paths))
            proc = cleanup.enter_context(
                self.bwrap(
                    *args,
                    bind=[self.prefix],
                    # Probably just need the path to vmlinux
                    # ro_bind=["/nix/store"], # I give up
                    unshare_net=False,
                    shell=False,
                    stdout=None,
                    stderr=None,
                    # pass_fds=(s.fileno(),)
                )
            )
            # s.close()
            cleanup.enter_context(removing(*old_paths))
            # Explicit raise rather than assert, so it survives `python -O`.
            if not alive_after(proc, 1.0):
                raise RuntimeError("CH exited early")
            if not os.path.exists(vmm_sock):
                raise RuntimeError(
                    f"{self.prefix}/vmm.sock should exist by now",
                )
            yield proc

    @contextmanager
    def start_gpu(
        self,
    ):
        """Run ``crosvm device gpu`` serving on $PREFIX/gpu.sock.

        The device talks to the host compositor at
        ``$XDG_RUNTIME_DIR/$WAYLAND_DISPLAY``, taken from PASSTHRU_ENV; a
        KeyError is raised if either is missing. Yields ``(proc, sock_path)``.
        The socket and its ``.lock`` file are removed on exit.
        """
        sock_path = self.prefix + "/gpu.sock"
        wayland_sock = (
            f'{PASSTHRU_ENV["XDG_RUNTIME_DIR"]}/{PASSTHRU_ENV["WAYLAND_DISPLAY"]}'  # noqa: E501
        )
        args = [
            "@CROSVM@",  # noqa: E501
            "--no-syslog",
            "--log-level",
            "debug",
            "device",
            "gpu",
            "--socket-path",
            sock_path,
            "--wayland-sock",
            wayland_sock,
            "--params",
            '{ "context-types": "cross-domain:virgl2" }',
        ]
        with self.popen(
            *args,
            stderr=None,
            stdout=None,
        ) as proc, removing(sock_path, sock_path + ".lock"):
            yield proc, sock_path

    @contextmanager
    def start_virtiofsd(
        self,
        root_dir,
        tag,
        ro=True,
        subdirs=None,
        extra_flags=("--posix-acl", "--xattr"),
    ):
        """Export ``root_dir`` to the guest through virtiofsd as ``tag``.

        virtiofsd runs under nested ``unshare`` user namespaces, mapped to
        uid 1000 / gid 100. It serves $PREFIX/virtiofsd-<tag>.sock and is
        read-only unless ``ro`` is false.

        Yields ``(proc, sock_path)``. The socket and ``.pid`` files are
        removed on exit.

        NOTE(review): ``subdirs`` and ``extra_flags`` are accepted but
        currently unused. ``subdirs`` only mattered for the commented-out
        bwrap variant, and ``extra_flags`` is never added to argv. Confirm
        before relying on either.
        """
        assert os.path.exists(root_dir)
        sock_path = self.prefix + f"/virtiofsd-{tag}.sock"
        # s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # NOTE: Nope. Virtiofsd actually expects a blocking socket
        # s.setblocking(True)
        # s.set_inheritable(True)
        args = [
            # If using bwrap():
            # "--argv0", "virtiofsd",
            # "--uid", "1000",
            # "--gid", "100",
            # "--",
            "unshare",
            "-rUm",
            "unshare",
            "--map-user",
            "1000",
            "--map-group",
            "100",
            VIRTIOFSD,
            "--shared-dir",
            root_dir,
            "--tag",
            tag,
            # "--fd",
            # str(s.fileno()),
            "--socket-path",
            sock_path,
            # If relying on bwrap():
            # "--sandbox",
            # "none",
        ]
        if ro:
            args.append("--readonly")
        kwargs = {
            # If bwrap():
            # "bind": [],
            # ("ro_bind" if ro else "bind"):
            # [*subdirs]
            # if subdirs is not None
            # else [root_dir],
            # "pass_fds": (2, s.fileno()),
            "stdout": subprocess.PIPE,
            "stderr": subprocess.PIPE,
        }
        # removing() is entered first so it runs last: after the process
        # has been reaped, the socket and pid file are deleted.
        with removing(sock_path, sock_path + ".pid"), self.popen(
            *args, **kwargs
        ) as p:
            yield p, sock_path
@contextmanager
def defer(f):
    """Invoke ``f()`` (no arguments) when the with-block exits, by any path."""
    with ExitStack() as stack:
        stack.callback(f)
        yield
@contextmanager
def removing(*paths):
    """Delete each of ``paths`` when the with-block exits, by any path.

    Paths that do not exist (any more) are skipped. Other errors, e.g.
    EISDIR or EACCES, propagate and stop the remaining deletions.
    """
    try:
        yield
    finally:
        for p in paths:
            # EAFP instead of exists()+remove(): something else may delete
            # the file in between. Also, exists() is False for a dangling
            # symlink, which would otherwise be left behind.
            try:
                os.remove(p)
            except FileNotFoundError:
                pass
@contextmanager
def connect_ch_vsock(
    vsock_sock_path,
    port: int,
    type=socket.SOCK_STREAM,
    blocking=True,
) -> socket.socket:
    """Open a host-to-guest connection through the VMM's vsock Unix socket.

    Connects to ``vsock_sock_path`` and sends ``CONNECT <port>`` plus a
    newline. Yields the connected socket, which is closed on exit. Any reply
    the VMM sends to CONNECT is left for the caller to read.
    """
    dirname = os.path.dirname(vsock_sock_path)
    if dirname:  # makedirs("") fails for a bare filename
        os.makedirs(dirname, exist_ok=True)
    s = socket.socket(socket.AF_UNIX, type, 0)
    # closing() wraps connect() too, so a failed connect doesn't leak s.
    with closing(s):
        s.setblocking(blocking)
        s.connect(vsock_sock_path)
        s.sendall(b"CONNECT %d\n" % port)
        yield s
# An integer with an optional binary-unit suffix, e.g. "4096", "512M", "8g".
BYTES_PATTERN = re.compile(r"^([0-9]+)([KkMmGgTt]?)$")
BYTES_UNITS = {
    "k": 1024,
    "m": 1024**2,
    "g": 1024**3,
    "t": 1024**4,
}


def parse_bytes(s):
    """Convert a size such as ``"512M"`` or ``"8G"`` to a number of bytes.

    Suffixes (case-insensitive) are powers of 1024: K, M, G, T. A bare
    number is taken as bytes.

    Raises:
        ValueError: ``s`` is not of that form. This used to be an
            ``assert``, which ``python -O`` strips.
    """
    m = BYTES_PATTERN.match(s)
    if m is None:
        raise ValueError(f"not a byte size (expected e.g. 512M or 8G): {s!r}")
    size, unit = m.groups()
    return int(size) * BYTES_UNITS.get(unit.lower(), 1)
@contextmanager
def listen_ch_vsock(
    vsock_sock_path,
    port: int,
    type=socket.SOCK_STREAM,
    blocking=True,
) -> socket.socket:
    """Listen for guest-initiated vsock connections to host ``port``.

    Binds ``<vsock_sock_path>_<port>``: the per-port Unix socket that
    Cloud Hypervisor's hybrid vsock connects to for guest-to-host
    connections. Yields the listening socket.

    On exit the socket is closed (previously it leaked) and the path is
    removed. Closing a socket the caller already closed is harmless.
    """
    dirname = os.path.dirname(vsock_sock_path)
    if dirname:  # makedirs("") fails for a bare filename
        os.makedirs(dirname, exist_ok=True)
    listen_path = vsock_sock_path + "_%d" % port
    s = socket.socket(socket.AF_UNIX, type, 0)
    with closing(s):
        s.setblocking(blocking)
        s.bind(listen_path)
        # Registered right after bind(), so a failing listen() still cleans up.
        with removing(listen_path):
            s.listen()
            yield s
def _build_apps(ps, app_specs):
    """nix-build each positional ``app`` spec; return the store paths in order.

    A spec without a URL scheme (e.g. ``firefox``) is an attribute of
    ``<nixpkgs>``. Otherwise it is parsed as a URL: the path is the Nix file
    (default ``./.``), the fragment the attribute, and every query
    parameter becomes ``--arg NAME VALUE``.
    """
    app_paths = []
    for spec in app_specs:
        url = urlparse(spec)
        if url.scheme == "":
            nix_file, attr = "<nixpkgs>", url.path
        else:
            assert url.fragment, url
            nix_file, attr = url.path or "./.", url.fragment
        extra_args = []
        # parse_qs maps each name to a *list* of values. The old code put
        # that list straight into argv, which subprocess rejects.
        for name, values in parse_qs(url.query).items():
            for value in values:
                extra_args += ["--arg", name, value]
        out_path = ps.exec(
            "nix-build",
            nix_file,
            "-A",
            attr,
            *extra_args,
            "--no-out-link",
            cwd=os.getcwd(),
            capture_output=True,
            text=True,
        ).stdout.strip()
        assert out_path.startswith("/nix/store/"), out_path
        app_paths.append(out_path)
    return app_paths


def _wait_until_ready(ready_sock, timeout=8.0):
    """Wait for a ``READY=1`` line on the guest's notification listener.

    Each accepted connection is read once (up to 1024 bytes), and its lines
    are printed.

    Returns True once READY=1 arrives. As a best effort it also returns
    True, after printing a warning, when nothing connects within
    ``timeout`` s. Returns False if the attempt budget runs out.
    ``ready_sock`` is closed on return.
    """
    with ready_sock:
        ready_sock.settimeout(timeout)
        for _ in range(1048576):
            try:
                con, _ = ready_sock.accept()
            except OSError:
                # socket.timeout is an OSError. Unlike the old bare except,
                # this lets Ctrl-C propagate instead of counting as "ready".
                print(
                    "WARNING: CH didn't try connecting to the readiness notification socket"  # noqa: E501
                )
                return True
            with con:
                msg = con.recv(1024)
            for ln in msg.split(b"\n"):
                ln = ln.strip()
                print(ln)
                # if ln.startswith(b"X_SYSTEMD_UNIT_ACTIVE=uvms-guest.service"): # noqa: E501
                if ln.startswith(b"READY=1"):
                    return True
    return False


def _read_ch_vsock_handshake(guest, timeout=8.0):
    """Read the VMM's one-line reply to ``CONNECT <port>``.

    Cloud Hypervisor's hybrid vsock answers a successful CONNECT with
    ``OK <host port>`` and a newline, before any guest data. Returns the
    bytes read: b"" (or a partial line) on timeout or EOF. The socket's
    previous timeout is restored.
    """
    previous = guest.gettimeout()
    guest.settimeout(timeout)
    line = b""
    try:
        while not line.endswith(b"\n"):
            chunk = guest.recv(1)
            if not chunk:
                break
            line += chunk
    except OSError as e:
        print(f"WARNING: no vsock handshake from CH: {e!r}")
    finally:
        guest.settimeout(previous)
    return line


def _run_in_guest(guest, command, app_paths):
    """Ask the guest agent to exec ``command``; return its reply as a dict.

    The request carries each app's ``bin`` and ``share`` dirs as extra PATH
    and XDG_DATA_DIRS entries. Failures are printed rather than raised. The
    returned dict always has a "status" key ("fail" if the agent gave none).
    """
    request = {
        "run": {
            "argv": [command],
            "EXTRA_PATH": [f"{a}/bin" for a in app_paths],
            "EXTRA_XDG_DATA_DIRS": [f"{a}/share" for a in app_paths],
        }
    }
    res = {}
    try:
        guest.sendall(json.dumps(request).encode("utf8"))
        raw = guest.recv(8192)
        try:
            res = json.loads(raw)
        except json.JSONDecodeError as e:
            print(f"Couldn't interpret --run {command} response: {e} {raw}")
            res = {}
    except Exception as e:
        print(f"Couldn't --run {command}: {repr(e)}")
    if not isinstance(res, dict):
        res = {}
    res.setdefault("status", "fail")
    return res


def main(args, args_next, cleanup, ps):
    """Build the apps, boot the VM and start each ``--run`` command in it.

    Args:
        args: the parsed namespace, after preprocess_args().
        args_next: unrecognised command-line arguments (currently unused).
        cleanup: ExitStack that owns every daemon and socket started here;
            closing it tears the VM down.
        ps: the Processes helper for this VM.

    Blocks until Cloud Hypervisor exits; Ctrl-C while waiting returns.
    """
    send_dir = PASSTHRU_ENV["HOME"] + f"/send/{args.vm}"
    os.makedirs(send_dir, exist_ok=True)
    os.makedirs(args.prefix, exist_ok=True)
    ch_remote = [
        "ch-remote",
        "--api-socket",
        args.prefix + "/vmm.sock",
    ]
    with open(args.vm_config) as f:
        config = json.load(f)

    app_paths = _build_apps(ps, args.app)
    apps_closure = ps.exec(
        "nix-store",
        "-qR",
        *app_paths,
        capture_output=True,
        text=True,
    ).stdout.split()

    # The oem_strings entry below points the guest's vmm.notify_socket
    # credential at vsock port 8888, i.e. at this listener.
    ready_sock = cleanup.enter_context(
        listen_ch_vsock(ps.prefix + "/vsock.sock", 8888),
    )

    virtiofs_socks = []

    def share(root_dir, tag, **kwargs):
        # Start a virtiofsd (torn down by `cleanup`) and remember its socket
        # for `ch-remote add-fs`.
        _, sock_path = cleanup.enter_context(
            ps.start_virtiofsd(root_dir, tag=tag, **kwargs)
        )
        virtiofs_socks.append((tag, sock_path))

    share(send_dir, "send", ro=False)
    share("/nix/store", "apps", subdirs=apps_closure)
    share("/nix/store", "system", subdirs=BASE_SYSTEM_CLOSURE)
    if args.persist_home:
        os.makedirs(args.prefix + "/home", exist_ok=True)
        share(args.prefix + "/home", "home", ro=False)
        config["payload"]["cmdline"] += " uvms.persist-home=1"

    if args.mem is not None:
        mem_bytes = parse_bytes(args.mem)
        config["memory"]["size"] = mem_bytes
        config["memory"]["hotplug_size"] = mem_bytes
    platform = config.setdefault("platform", {})
    platform["oem_strings"] = [
        "io.systemd.credential:vmm.notify_socket=vsock-stream:2:8888",
        *platform.get("oem_strings", []),
    ]

    _, gpud_path = cleanup.enter_context(ps.start_gpu())
    ch = cleanup.enter_context(ps.run_ch())
    ps.exec(
        *ch_remote,
        "create",
        input=json.dumps(config),
        text=True,
    )
    ps.exec(
        TAPS,
        "pass",
        *ch_remote,
        "add-net",
        "id=wan,fd=3,mac=00:00:00:00:00:01",
    )
    # TODO: add-fs apps closure separately
    for tag, sock_path in virtiofs_socks:
        ps.exec(*ch_remote, "add-fs", f"tag={tag},socket={sock_path},id={tag}")
    ps.exec(*ch_remote, "add-gpu", f"socket={gpud_path}")
    ps.exec(*ch_remote, "boot")
    ps.exec(*ch_remote, "info")

    if not _wait_until_ready(ready_sock):
        raise RuntimeError("The guest never reported READY=1")

    cleanup.enter_context(removing(ps.prefix + "/vsock.sock"))
    with connect_ch_vsock(ps.prefix + "/vsock.sock", 24601) as guest:
        # Consume the VMM's "OK <port>" line once, up front. The old code
        # did a throwaway recv() before *every* request instead, which for a
        # second --run would eat that command's actual reply.
        handshake = _read_ch_vsock_handshake(guest)
        if not handshake.startswith(b"OK"):
            print(f"WARNING: unexpected vsock handshake: {handshake!r}")
        # --run is optional when apps are given positionally.
        for r in args.run or ():
            res = _run_in_guest(guest, r, app_paths)
            adverb = (
                "Successfully"
                if res["status"] == "exec succeeded"
                else "Failed to"  # noqa: E501
            )
            print(f"{adverb} --run {r}: {res}")
        try:
            ch.wait()
        except KeyboardInterrupt:
            pass
def _reap_leftovers(processes, grace=5.0):
    """Last-resort cleanup of children that are still unreaped.

    Each Processes.popen() context normally reaps its own child. Anything
    left here escaped that, e.g. because cleanup was interrupted. Sends
    SIGTERM to all of them, then waits up to ``grace`` s for each before
    escalating to SIGKILL. Ctrl-C while waiting also escalates. Errors are
    printed, not raised.
    """
    leftovers = [p for p in processes if p.returncode is None]
    for p in leftovers:
        try:
            print(f"Cleanup failed. Re-trying the killing of {p}")
            p.terminate()
        except Exception as e:
            print(f"Couldn't terminate {p}: {e!r}")
    for p in leftovers:
        try:
            try:
                p.wait(timeout=grace)
            except (subprocess.TimeoutExpired, KeyboardInterrupt):
                # Don't hang forever on a child that ignores SIGTERM.
                print(f"{p} is still running; killing it")
                p.kill()
                p.wait()
        except Exception as e:
            print(f"Couldn't reap {p}: {e!r}")


if __name__ == "__main__":
    args, args_next = parser.parse_known_args()
    preprocess_args(args)
    ps = Processes(
        prefix=args.prefix,
        vm=args.vm,
    )
    try:
        with ExitStack() as cleanup:
            main(args, args_next, cleanup, ps)
    finally:
        _reap_leftovers(ps.processes)