"""Tiny command agent: run JSON-described commands for vsock clients.

When run as a script, the agent wraps file descriptor 3 as an ``AF_VSOCK``
stream socket, accepts connections on it, and answers each JSON request of
the form ``{"run": {"argv": [...], ...}}`` with a JSON status object.  As
soon as any process it spawned has terminated, the agent exits with that
process's return code.
"""

import json
import os
import select
import socket
import subprocess
import sys

# Seconds to wait for a freshly spawned process before it is reported to the
# client as "long_running".
_LONG_RUNNING_AFTER = 0.125

# Only peers with this vsock context ID are accepted.
# NOTE(review): 2 is VMADDR_CID_HOST in the Linux vsock ABI -- confirm that
# "host only" is the intended policy.
_ALLOWED_CID = 2


class Processes:
    """Bookkeeping for spawned processes and the sockets being polled.

    Attributes:
        processes: every ``subprocess.Popen`` started through :meth:`popen`.
        sources: sockets the main loop hands to ``select`` (the listening
            socket, client connections and per-process liveness sockets).
        liveness_fds: maps the fd of a liveness socket to its process.
        client_fds: fds of accepted client connections.
    """

    def __init__(self):
        self.processes = []
        self.sources = []
        self.liveness_fds = dict()
        self.client_fds = set()

    def popen(self, *args, **kwargs):
        """Start a process and register a liveness socket for it.

        One end of a socket pair is passed to the child through ``pass_fds``;
        the other end is added to ``sources``.  Once every holder of the
        child's end has closed it (normally when the process exits), our end
        becomes readable and yields EOF.

        Args:
            *args, **kwargs: forwarded to ``subprocess.Popen``.  A
                caller-supplied ``pass_fds`` is extended, not replaced.

        Returns:
            The started ``subprocess.Popen`` object.

        Raises:
            Whatever ``subprocess.Popen`` raises; both socket ends are closed
            in that case so nothing leaks.
        """
        parent_end, child_end = socket.socketpair()
        try:
            # pop() rather than get(): leaving the key in kwargs would pass
            # ``pass_fds`` to Popen twice and raise TypeError.
            pass_fds = [*kwargs.pop("pass_fds", ()), child_end.fileno()]
            proc = subprocess.Popen(*args, **kwargs, pass_fds=pass_fds)
        except BaseException:
            parent_end.close()
            raise
        finally:
            # The child owns its own copy; ours must be closed or EOF would
            # never be observed on parent_end.
            child_end.close()

        self.processes.append(proc)
        self.sources.append(parent_end)
        assert parent_end.fileno() not in self.liveness_fds
        self.liveness_fds[parent_end.fileno()] = proc
        return proc

    def handle_run(self, run: dict) -> "tuple[dict, subprocess.Popen | None]":
        """Spawn the command described by *run* and report what happened.

        Args:
            run: request payload.  Keys read here:
                ``argv`` (command line), ``text`` (optional, default False,
                forwarded to Popen), ``EXTRA_PATH`` (optional list of
                directories appended to ``PATH``) and ``cwd`` (optional
                working directory, default ``/home/user``).

        Returns:
            ``(res, proc)`` where ``res`` is a JSON-serialisable dict with
            ``status``, ``pid``, ``long_running`` and, when known,
            ``returncode`` / ``exception``; ``proc`` is the Popen object or
            ``None`` when spawning failed.
        """
        res = {}
        search_path = [
            *os.environ.get("PATH", "").split(":"),
            *run.get("EXTRA_PATH", []),
        ]
        env = {**os.environ, "PATH": ":".join(search_path)}

        proc = None
        try:
            proc = self.popen(
                # Use the payload we were given, not a module-level ``req``.
                run["argv"],
                text=run.get("text", False),
                env=env,
                cwd=run.get("cwd", "/home/user"),
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
            )
            res["status"] = "exec succeeded"
        except Exception as e:
            # Best effort by design: the failure is reported to the client
            # instead of taking the agent down.
            print(e)
            res["status"] = "exec failed"
            res["exception"] = repr(e)

        res["pid"] = getattr(proc, "pid", None)
        try:
            if proc is not None:
                proc.wait(_LONG_RUNNING_AFTER)
            res["long_running"] = False
            res["returncode"] = getattr(proc, "returncode", None)
        except subprocess.TimeoutExpired:
            res["long_running"] = True
        return res, proc

    def accept_vsock(self, s):
        """Accept one connection from listening socket *s* and track it.

        Args:
            s: listening socket whose ``accept()`` yields
                ``(connection, (cid, port))``.

        Returns:
            ``(connection, (cid, port))``.

        Raises:
            AssertionError: the peer's CID is not the allowed one; the
                connection is closed before raising.
        """
        con, (cid, port) = s.accept()
        if cid != _ALLOWED_CID:
            # Explicit check so it is not stripped under ``python -O``.
            con.close()
            raise AssertionError(cid)
        self.sources.append(con)
        self.client_fds.add(con.fileno())
        return con, (cid, port)

    def forget_client(self, con):
        """Stop polling client connection *con* and close it."""
        self.client_fds.discard(con.fileno())
        if con in self.sources:
            self.sources.remove(con)
        con.close()


def _exit_if_process_ended(ps, con):
    """Exit the agent if *con* is a liveness socket (its process has ended)."""
    proc = ps.liveness_fds.get(con.fileno())
    if proc is None:
        return
    leftover = con.recv(128)
    if leftover != b"":
        # Nothing is ever expected to be written to a liveness socket.
        raise AssertionError(leftover)
    proc.wait()
    assert proc.returncode is not None, proc
    print(f"{proc} has terminated, shutting down")
    sys.exit(proc.returncode)


def _serve_client(ps, con):
    """Read one request from client *con* and send the JSON response."""
    req = con.recv(8192)
    if req == b"":
        # An empty read means the peer closed the connection.  The socket
        # must be dropped from ``sources``; otherwise select() reports it
        # readable forever and this branch spins.
        print(f"Lost [{con.fileno()}]")
        ps.forget_client(con)
        return
    try:
        req = json.loads(req)
        print(f"Received {req=}")
    except ValueError as e:
        # ValueError covers both json.JSONDecodeError and the
        # UnicodeDecodeError raised for non-UTF-8 bytes.
        print(f"Couldn't interpret {req=}: {e}")
        return

    if isinstance(req, dict) and isinstance(req.get("run"), dict):
        res, _proc = ps.handle_run(req["run"])
    else:
        res = {"status": "unknown command"}

    _, writable, _ = select.select([], [con], [])
    assert writable, writable
    res = json.dumps(res).encode("utf8")
    print(f"Responding with {res=}")
    # sendall(): send() may transmit only part of the buffer.
    con.sendall(res)


def main():
    """Run the accept/serve loop on the vsock listener inherited as fd 3."""
    ps = Processes()
    serv = socket.fromfd(3, socket.AF_VSOCK, socket.SOCK_STREAM)
    ps.sources.append(serv)

    while True:
        readable, _, exceptional = select.select(ps.sources, [], ps.sources)

        # Liveness sockets first: a finished process ends the agent.
        for con in (*readable, *exceptional):
            _exit_if_process_ended(ps, con)

        for con in readable:
            if con is serv:
                con, _ = ps.accept_vsock(serv)
                print(f"Open [{con.fileno()}]")
                # Wait for select() to report data instead of blocking in
                # recv() on a connection that may not have sent anything yet.
                continue
            if con.fileno() in ps.liveness_fds:
                raise AssertionError("Must already be processed")
            if con.fileno() not in ps.client_fds:
                raise AssertionError(con.fileno())
            _serve_client(ps, con)


if __name__ == "__main__":
    main()