From aa4a5ad224fe90302a895aaca0362ddf8471a4b4 Mon Sep 17 00:00:00 2001 From: glide-the <2533736852@qq.com> Date: Mon, 4 Sep 2023 22:46:46 +0800 Subject: [PATCH] CTRL-C exit with Python multiprocessing --- startup.py | 67 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 52 insertions(+), 15 deletions(-) diff --git a/startup.py b/startup.py index ecb722c..9766480 100644 --- a/startup.py +++ b/startup.py @@ -1,9 +1,9 @@ -from multiprocessing import Process, Queue -import multiprocessing as mp -import subprocess import asyncio -import sys +import multiprocessing as mp import os +import subprocess +import sys +from multiprocessing import Process, Queue from pprint import pprint # 设置numexpr最大线程数,默认为CPU核心数 @@ -501,12 +501,22 @@ def dump_server_info(after_start=False, args=None): async def start_main_server(): import time + import signal + + def handler(signalname): + """ + Python 3.9 has `signal.strsignal(signalnum)` so this closure would not be needed. + Also, 3.8 includes `signal.valid_signals()` that can be used to create a mapping for the same purpose. + """ + def f(signal_received, frame): + raise KeyboardInterrupt(f"{signalname} received") + return f + + # This will be inherited by the child process if it is forked (not spawned) + signal.signal(signal.SIGINT, handler("SIGINT")) + signal.signal(signal.SIGTERM, handler("SIGTERM")) mp.set_start_method("spawn") - # TODO 链式启动的队列,确实可以用于控制启动顺序, - # 但目前引入proxy_worker后,启动的独立于框架的work processes无法确认当前的位置, - # 导致注册器未启动时,无法注册。整个启动链因为异常被终止 - # 使用await asyncio.sleep(3)可以让后续代码等待一段时间,但不是最优解 queue = Queue() args, parser = parse_args() @@ -556,6 +566,7 @@ async def start_main_server(): daemon=True, ) process.start() + # 使用await asyncio.sleep(3)可以让后续代码等待一段时间 await asyncio.sleep(3) processes["controller"] = process @@ -631,15 +642,41 @@ async def start_main_server(): for process in processes.pop("online-api", []): process.join() for name, process in processes.items(): - process.join() + if isinstance(p, list): + for work_process in p: + work_process.join() + else: + process.join() except: - if model_worker_process := processes.pop("model_worker", None): - model_worker_process.terminate() - for process in processes.pop("online-api", []): - process.terminate() - for name, process in processes.items(): - process.terminate() + # if model_worker_process := processes.pop("model_worker", None): + # model_worker_process.terminate() + # for process in processes.pop("online-api", []): + # process.terminate() + # for name, process in processes.items(): + # process.terminate() + logger.warning("Caught KeyboardInterrupt! Setting stop event...") + finally: + # Send SIGINT if process doesn't exit quickly enough, and kill it as last resort + # .is_alive() also implicitly joins the process (good practice in linux) + # while alive_procs := [p for p in processes.values() if p.is_alive()]: + for p in processes.values(): + logger.info("Process status: %s", p) + for p in processes.values(): + logger.warning("Sending SIGKILL to %s", p) + # Queues and other inter-process communication primitives can break when + # process is killed, but we don't care here + + if isinstance(p, list): + for process in p: + process.kill() + + else: + p.kill() + + sleep(.01) + for p in processes.values(): + logger.info("Process status: %s", p) if __name__ == "__main__":