CTRL-C exit with Python multiprocessing

This commit is contained in:
glide-the 2023-09-04 22:46:46 +08:00
parent df23349584
commit aa4a5ad224
1 changed files with 52 additions and 15 deletions

View File

@ -1,9 +1,9 @@
from multiprocessing import Process, Queue
import multiprocessing as mp
import subprocess
import asyncio
import sys
import multiprocessing as mp
import os
import subprocess
import sys
from multiprocessing import Process, Queue
from pprint import pprint
# 设置numexpr最大线程数默认为CPU核心数
@ -501,12 +501,22 @@ def dump_server_info(after_start=False, args=None):
async def start_main_server():
import time
import signal
def handler(signalname):
"""
Python 3.9 has `signal.strsignal(signalnum)` so this closure would not be needed.
Also, 3.8 includes `signal.valid_signals()` that can be used to create a mapping for the same purpose.
"""
def f(signal_received, frame):
raise KeyboardInterrupt(f"{signalname} received")
return f
# This will be inherited by the child process if it is forked (not spawned)
signal.signal(signal.SIGINT, handler("SIGINT"))
signal.signal(signal.SIGTERM, handler("SIGTERM"))
mp.set_start_method("spawn")
# TODO 链式启动的队列,确实可以用于控制启动顺序,
# 但目前引入proxy_worker后启动的独立于框架的work processes无法确认当前的位置
# 导致注册器未启动时,无法注册。整个启动链因为异常被终止
# 使用await asyncio.sleep(3)可以让后续代码等待一段时间,但不是最优解
queue = Queue()
args, parser = parse_args()
@ -556,6 +566,7 @@ async def start_main_server():
daemon=True,
)
process.start()
# 使用await asyncio.sleep(3)可以让后续代码等待一段时间
await asyncio.sleep(3)
processes["controller"] = process
@ -631,15 +642,41 @@ async def start_main_server():
for process in processes.pop("online-api", []):
process.join()
for name, process in processes.items():
process.join()
if isinstance(p, list):
for work_process in p:
work_process.join()
else:
process.join()
except:
if model_worker_process := processes.pop("model_worker", None):
model_worker_process.terminate()
for process in processes.pop("online-api", []):
process.terminate()
for name, process in processes.items():
process.terminate()
# if model_worker_process := processes.pop("model_worker", None):
# model_worker_process.terminate()
# for process in processes.pop("online-api", []):
# process.terminate()
# for name, process in processes.items():
# process.terminate()
logger.warning("Caught KeyboardInterrupt! Setting stop event...")
finally:
# Send SIGINT if process doesn't exit quickly enough, and kill it as last resort
# .is_alive() also implicitly joins the process (good practice in linux)
# while alive_procs := [p for p in processes.values() if p.is_alive()]:
for p in processes.values():
logger.info("Process status: %s", p)
for p in processes.values():
logger.warning("Sending SIGKILL to %s", p)
# Queues and other inter-process communication primitives can break when
# process is killed, but we don't care here
if isinstance(p, list):
for process in p:
process.kill()
else:
p.kill()
sleep(.01)
for p in processes.values():
logger.info("Process status: %s", p)
if __name__ == "__main__":