train Walk and tackle the memory leak issue.

This commit is contained in:
xxh
2026-04-08 05:59:16 -04:00
parent 28e7eb0692
commit 87e5c6d931
5 changed files with 282 additions and 323 deletions

View File

@@ -1,9 +1,14 @@
import subprocess
import os
import time
import threading
class Server():
WATCHDOG_ENABLED = True
WATCHDOG_INTERVAL_SEC = 30.0
WATCHDOG_RSS_MB_LIMIT = 2000.0
def __init__(self, first_server_p, first_monitor_p, n_servers, no_render=True, no_realtime=True) -> None:
try:
import psutil
@@ -14,6 +19,10 @@ class Server():
self.first_server_p = first_server_p
self.n_servers = n_servers
self.rcss_processes = []
self._server_specs = []
self._watchdog_stop = threading.Event()
self._watchdog_lock = threading.Lock()
self._watchdog_thread = None
first_monitor_p = first_monitor_p + 100
# makes it easier to kill test servers without affecting train servers
@@ -23,26 +32,79 @@ class Server():
for i in range(n_servers):
port = first_server_p + i
mport = first_monitor_p + i
self._server_specs.append((port, mport, cmd, render_arg, realtime_arg))
proc = self._spawn_server(port, mport, cmd, render_arg, realtime_arg)
self.rcss_processes.append(proc)
server_cmd = f"{cmd} -c {port} -m {mport} {render_arg} {realtime_arg}".strip()
if self.WATCHDOG_ENABLED:
self._watchdog_thread = threading.Thread(target=self._watchdog_loop, daemon=True)
self._watchdog_thread.start()
proc = subprocess.Popen(
server_cmd.split(),
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
start_new_session=True
def _spawn_server(self, port, mport, cmd, render_arg, realtime_arg):
server_cmd = f"{cmd} -c {port} -m {mport} {render_arg} {realtime_arg}".strip()
proc = subprocess.Popen(
server_cmd.split(),
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
start_new_session=True
)
# Avoid startup storm when launching many servers at once.
time.sleep(0.03)
rc = proc.poll()
if rc is not None:
raise RuntimeError(
f"rcssservermj exited early (code={rc}) on server port {port}, monitor port {mport}"
)
# Avoid startup storm when launching many servers at once.
time.sleep(0.03)
return proc
rc = proc.poll()
if rc is not None:
raise RuntimeError(
f"rcssservermj exited early (code={rc}) on server port {port}, monitor port {mport}"
)
@staticmethod
def _pid_rss_mb(pid):
try:
with open(f"/proc/{pid}/status", "r", encoding="utf-8") as f:
for line in f:
if line.startswith("VmRSS:"):
parts = line.split()
if len(parts) >= 2:
# VmRSS is kB
return float(parts[1]) / 1024.0
except (FileNotFoundError, ProcessLookupError, PermissionError, OSError):
return 0.0
return 0.0
self.rcss_processes.append(proc)
def _restart_server_at_index(self, idx, reason):
port, mport, cmd, render_arg, realtime_arg = self._server_specs[idx]
old_proc = self.rcss_processes[idx]
try:
old_proc.terminate()
old_proc.wait(timeout=1.0)
except Exception:
try:
old_proc.kill()
except Exception:
pass
new_proc = self._spawn_server(port, mport, cmd, render_arg, realtime_arg)
self.rcss_processes[idx] = new_proc
print(
f"[ServerWatchdog] Restarted server idx={idx} port={port} monitor={mport} reason={reason}"
)
def _watchdog_loop(self):
while not self._watchdog_stop.wait(self.WATCHDOG_INTERVAL_SEC):
with self._watchdog_lock:
for i, proc in enumerate(self.rcss_processes):
rc = proc.poll()
if rc is not None:
self._restart_server_at_index(i, f"exited:{rc}")
continue
rss_mb = self._pid_rss_mb(proc.pid)
if rss_mb > self.WATCHDOG_RSS_MB_LIMIT:
self._restart_server_at_index(i, f"rss_mb:{rss_mb:.1f}")
def check_running_servers(self, psutil, first_server_p, first_monitor_p, n_servers):
''' Check if any server is running on chosen ports '''
@@ -78,6 +140,9 @@ class Server():
return
def kill(self):
self._watchdog_stop.set()
if self._watchdog_thread is not None:
self._watchdog_thread.join(timeout=1.0)
for p in self.rcss_processes:
p.kill()
print(f"Killed {self.n_servers} rcssservermj processes starting at {self.first_server_p}")