ch-runner: move more logic out of elb
This commit is contained in:
parent
0617d97ebf
commit
97f2ba4c66
1 changed files with 131 additions and 171 deletions
|
|
@ -272,92 +272,85 @@ in
|
|||
# NOTE: Used to be an even uglier bash script, but, for now, execline makes for easier comparisons against spectrum
|
||||
uvms.cloud-hypervisor.runner =
|
||||
let
|
||||
addProcess = getExe addProcess';
|
||||
addProcess' = pkgs.writers.writePython3Bin "add-process" { } ''
|
||||
superviseVm = getExe superviseVm';
|
||||
superviseVm' = pkgs.writers.writePython3Bin "supervise-vm" { } ''
|
||||
import os
|
||||
import select
|
||||
import socket
|
||||
import subprocess
|
||||
import sys
|
||||
from argparse import ArgumentParser
|
||||
from contextlib import contextmanager, ExitStack
|
||||
from threading import Thread, Semaphore
|
||||
|
||||
|
||||
parser = ArgumentParser()
|
||||
parser.add_argument("events_path")
|
||||
parser.add_argument("--then", action="append")
|
||||
parser = ArgumentParser("supervise-vm")
|
||||
parser.add_argument("--vm")
|
||||
parser.add_argument("--prefix", default="$HOME/uvms/$VM")
|
||||
parser.add_argument("--sock", default="$PREFIX/supervisor.sock")
|
||||
parser.add_argument("--vm-config")
|
||||
|
||||
MSG_SIZE = 16
|
||||
SHMEM = {}
|
||||
ELB_DIR = "${lib.getBin pkgs.execline}/bin" # noqa: E501
|
||||
S6_DIR = "${lib.getBin pkgs.s6}/bin" # noqa: E501
|
||||
CH_DIR = "${lib.getBin package}/bin" # noqa: E501
|
||||
SOCKETBINDER_PATH = S6_DIR + "/s6-ipcserver-socketbinder" # noqa: E501
|
||||
CH_PATH = CH_DIR + "/cloud-hypervisor"
|
||||
CHR_PATH = CH_DIR + "/ch-remote"
|
||||
TAPS_PATH = "${lib.getExe uvmsPkgs.taps}" # noqa: E501
|
||||
|
||||
PASSTHRU_PATH = ":".join([ELB_DIR, S6_DIR, CH_DIR])
|
||||
PASSTHRU_ENV = {
|
||||
**{
|
||||
k: v
|
||||
for k, v in os.environ.items()
|
||||
if k.startswith("RUST")
|
||||
or k.startswith("WAYLAND")
|
||||
or k in [
|
||||
"TAPS_SOCK",
|
||||
]
|
||||
},
|
||||
"HOME": os.environ.get("HOME", os.getcwd()),
|
||||
"PATH": PASSTHRU_PATH,
|
||||
}
|
||||
|
||||
|
||||
def send(sock, msg):
|
||||
assert len(msg) <= MSG_SIZE, len(msg)
|
||||
return sock.send(msg.ljust(MSG_SIZE))
|
||||
def configure_execline(prefix, vm, check=True, **defaults):
|
||||
def execline(*args, check=check, **kwargs):
|
||||
return subprocess.run(
|
||||
["execlineb", "-c", "\n".join(args)],
|
||||
**defaults,
|
||||
executable=ELB_DIR + "/execlineb",
|
||||
env={
|
||||
**PASSTHRU_ENV,
|
||||
"PATH": PASSTHRU_PATH,
|
||||
"PREFIX": prefix,
|
||||
"VM": vm,
|
||||
},
|
||||
check=check,
|
||||
cwd=prefix,
|
||||
**kwargs)
|
||||
return execline
|
||||
|
||||
|
||||
def recv(sock):
|
||||
msg = sock.recv(MSG_SIZE)
|
||||
# assert len(msg) <= MSG_SIZE, len(msg)
|
||||
assert len(msg) <= MSG_SIZE, len(msg)
|
||||
return (msg.split() + [b""])[0]
|
||||
|
||||
|
||||
def serve_impl(events_path, listener):
|
||||
SHMEM["server"] = True
|
||||
|
||||
cons = []
|
||||
state = "up"
|
||||
while state == "up" or cons != []:
|
||||
if state == "up":
|
||||
rs, ws, es = select.select([listener, *cons], [], [])
|
||||
else:
|
||||
rs, ws, es = select.select(cons, cons, [])
|
||||
events = []
|
||||
for r in rs:
|
||||
if r is listener:
|
||||
r, _ = r.accept()
|
||||
cons.append(r)
|
||||
else:
|
||||
events.append(recv(r))
|
||||
if any(e == b"killall" for e in events):
|
||||
state = "down"
|
||||
if state == "down":
|
||||
for w in ws:
|
||||
with s_lock:
|
||||
send(w, b"die")
|
||||
w.close()
|
||||
cons.remove(w)
|
||||
for w in es:
|
||||
w.close()
|
||||
cons.remove(w)
|
||||
|
||||
|
||||
def serve(events_path):
|
||||
base_dir = os.path.dirname(events_path)
|
||||
if base_dir:
|
||||
os.makedirs(base_dir, exist_ok=True)
|
||||
listener = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
||||
listener.setblocking(False)
|
||||
|
||||
try:
|
||||
listener.bind(events_path)
|
||||
listener.listen()
|
||||
return serve_impl(events_path, listener)
|
||||
except OSError as e:
|
||||
EADDRINUSE = 98
|
||||
if e.errno != EADDRINUSE:
|
||||
raise
|
||||
finally:
|
||||
listener.close()
|
||||
os.remove(events_path)
|
||||
|
||||
|
||||
def register(events_path):
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
|
||||
sock.connect(events_path)
|
||||
return sock
|
||||
def preprocess_args(args_mut):
|
||||
keys = [
|
||||
k
|
||||
for k, v
|
||||
in args_mut._get_kwargs()
|
||||
if isinstance(v, str)]
|
||||
for k in keys:
|
||||
v = getattr(args_mut, k)
|
||||
if "$HOME" in v:
|
||||
setattr(
|
||||
args_mut,
|
||||
k,
|
||||
v.replace("$HOME", PASSTHRU_ENV["HOME"]))
|
||||
for k in keys:
|
||||
v = getattr(args_mut, k)
|
||||
if "$VM" in v:
|
||||
setattr(args_mut, k, v.replace("$VM", args.vm))
|
||||
for k in keys:
|
||||
v = getattr(args_mut, k)
|
||||
if "$PREFIX" in v:
|
||||
setattr(args_mut, k, v.replace("$PREFIX", args.prefix))
|
||||
return args_mut
|
||||
|
||||
|
||||
@contextmanager
|
||||
|
|
@ -368,110 +361,77 @@ in
|
|||
f()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def run_ch(vm_prefix):
|
||||
args = [
|
||||
SOCKETBINDER_PATH,
|
||||
"-B",
|
||||
vm_prefix + "/vmm.sock",
|
||||
CH_PATH,
|
||||
"--api-socket",
|
||||
"fd=0",
|
||||
]
|
||||
p = subprocess.Popen(
|
||||
args,
|
||||
shell=False)
|
||||
try:
|
||||
p.wait(1.0)
|
||||
needs_cleanup = False
|
||||
except subprocess.TimeoutExpired:
|
||||
needs_cleanup = True
|
||||
if not os.path.exists(vm_prefix + "/vmm.sock"):
|
||||
raise RuntimeError(f"{vm_prefix}/vmm.sock should exist by now")
|
||||
if p.returncode is not None:
|
||||
raise RuntimeError("CH exited early")
|
||||
try:
|
||||
yield p
|
||||
finally:
|
||||
try:
|
||||
p.poll()
|
||||
except: # noqa: E722
|
||||
pass
|
||||
if p.returncode is None:
|
||||
p.terminate() # CH handles SIG{INT,TERM}?
|
||||
p.wait()
|
||||
unlink_paths = [
|
||||
vm_prefix + "/vmm.sock",
|
||||
vm_prefix + "/vmm.sock.lock",
|
||||
vm_prefix + "/vsock.sock",
|
||||
] if needs_cleanup else []
|
||||
for p in unlink_paths:
|
||||
if os.path.exists(p):
|
||||
os.remove(p)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
args, args_next = parser.parse_known_args()
|
||||
preprocess_args(args)
|
||||
|
||||
os.makedirs(args.prefix, exist_ok=True)
|
||||
execline = configure_execline(
|
||||
prefix=args.prefix,
|
||||
vm=args.vm)
|
||||
|
||||
ch_remote = [
|
||||
"ch-remote",
|
||||
"--api-socket",
|
||||
args.prefix + "/vmm.sock",
|
||||
]
|
||||
|
||||
with ExitStack() as cleanup:
|
||||
if args_next:
|
||||
p = subprocess.Popen(
|
||||
args_next,
|
||||
shell=False)
|
||||
then_cmds = reversed(getattr(args, "then") or [])
|
||||
if not args_next:
|
||||
then_cmds = []
|
||||
try:
|
||||
p.wait(0.5)
|
||||
then_cmds = []
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
for f in then_cmds:
|
||||
def run_f():
|
||||
subprocess.run(f)
|
||||
cleanup.enter_context(defer(run_f))
|
||||
|
||||
maybe_server = Thread(
|
||||
target=serve,
|
||||
args=(args.events_path,),
|
||||
daemon=True)
|
||||
maybe_server.start()
|
||||
maybe_server.join(0.5)
|
||||
|
||||
assert (
|
||||
("server" in SHMEM) == bool(maybe_server.is_alive)
|
||||
), (SHMEM, maybe_server)
|
||||
|
||||
if args_next:
|
||||
s = register(args.events_path)
|
||||
s_lock = Semaphore()
|
||||
|
||||
if args_next:
|
||||
def watch_p(p, s):
|
||||
p.wait()
|
||||
with s_lock:
|
||||
try:
|
||||
send(s, b"killall")
|
||||
except BrokenPipeError:
|
||||
pass
|
||||
|
||||
def watch_s(p, s):
|
||||
while True:
|
||||
if recv(s) == b"die":
|
||||
p.terminate()
|
||||
break
|
||||
|
||||
s_watcher = Thread(
|
||||
target=watch_s,
|
||||
args=(p, s),
|
||||
daemon=True)
|
||||
s_watcher.start()
|
||||
watch_p(p, s)
|
||||
s_watcher.join()
|
||||
s.close()
|
||||
|
||||
if SHMEM.get("server", False):
|
||||
maybe_server.join()
|
||||
|
||||
exit_code = 0
|
||||
if args_next:
|
||||
exit_code |= p.returncode
|
||||
sys.exit(exit_code)
|
||||
ch = cleanup.enter_context(run_ch(args.prefix))
|
||||
execline(*ch_remote, "create", args.vm_config)
|
||||
execline(
|
||||
TAPS_PATH, "pass",
|
||||
*ch_remote, "add-net",
|
||||
"id=wan,fd=3,mac=00:00:00:00:00:01")
|
||||
execline(*ch_remote, "boot")
|
||||
execline(*ch_remote, "info")
|
||||
ch.wait()
|
||||
'';
|
||||
ch = getExe package;
|
||||
chr = getExe' package "ch-remote";
|
||||
in
|
||||
writeElb "run-${hostName}" ''
|
||||
importas -i HOME HOME
|
||||
importas -SsD "${chr} --api-socket=${vmmSock}" CHR
|
||||
importas -SsD "${uvmPrefix}" PREFIX
|
||||
define EVENTS ''${PREFIX}/events.sock
|
||||
define -s ADD_PROC "${addProcess} ''${EVENTS}"
|
||||
|
||||
cd $PREFIX
|
||||
background {
|
||||
$ADD_PROC --then ${getExe (
|
||||
writeElb "rm-vmmsock" ''
|
||||
importas -i HOME HOME
|
||||
rm -f ${vmmSock}
|
||||
rm -f ${uvmPrefix}/vsock.sock
|
||||
''
|
||||
)} ${getExe (
|
||||
writeElb "ch" ''
|
||||
importas -Si 1
|
||||
importas -Si 2
|
||||
s6-ipcserver-socketbinder -B $1
|
||||
exec -a "uuvm/''${2} cloud-hypervisor" ${ch} --api-socket fd=0
|
||||
''
|
||||
)} ${vmmSock} ${hostName}
|
||||
}
|
||||
foreground { sleep 0.1 }
|
||||
ifelse -n { test -S ${vmmSock} } { echo "Apparently ${vmmSock} does not exist" }
|
||||
foreground { echo "Loading the configuration" }
|
||||
if { $CHR create ${chSettingsFile} }
|
||||
foreground { echo "Adding TAP" }
|
||||
if { ${lib.getExe uvmsPkgs.taps} pass $CHR add-net "id=wan,fd=3,mac=00:00:00:00:00:01" }
|
||||
foreground { echo "Booting" }
|
||||
if { $CHR boot }
|
||||
if { $CHR info }
|
||||
${superviseVm} --vm-config=${chSettingsFile} --vm=${hostName}
|
||||
'';
|
||||
}
|
||||
(lib.mkIf cfg.enable {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue