From c880412300541c48f4d786fded05cd3889a9e02f Mon Sep 17 00:00:00 2001
From: liunux4odoo
Date: Sat, 29 Jul 2023 23:01:24 +0800
Subject: [PATCH] use multiprocessing to run fastchat server

---
 server/llm_api.py | 219 ++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 190 insertions(+), 29 deletions(-)

diff --git a/server/llm_api.py b/server/llm_api.py
index e364049..a67076e 100644
--- a/server/llm_api.py
+++ b/server/llm_api.py
@@ -1,17 +1,163 @@
-import subprocess
+from multiprocessing import Process, Queue
 import sys
 import os
 
 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
-from configs.model_config import llm_model_dict, LLM_MODEL, LOG_PATH, logger
-
-
-def execute_command(command):
-    process = subprocess.Popen(command, shell=True)
-    return process.pid
+from configs.model_config import llm_model_dict, LLM_MODEL, LLM_DEVICE, LOG_PATH, logger
+import asyncio
 
 host_ip = "0.0.0.0"
-port = 8887
+controller_port = 20001
+model_worker_port = 20002
+openai_api_port = 8888
+base_url = "http://127.0.0.1:{}"
+queue = Queue()
+
+
+def set_httpx_timeout(timeout=60.0):
+    import httpx
+    httpx._config.DEFAULT_TIMEOUT_CONFIG.connect = timeout
+    httpx._config.DEFAULT_TIMEOUT_CONFIG.read = timeout
+    httpx._config.DEFAULT_TIMEOUT_CONFIG.write = timeout
+
+
+def create_controller_app(
+    dispatch_method="shortest_queue",
+):
+    from fastchat.serve.controller import app, Controller
+    from loguru import logger
+    logger.add(os.path.join(LOG_PATH, "controller.log"), level="INFO")
+
+    controller = Controller(dispatch_method)
+    sys.modules["fastchat.serve.controller"].controller = controller
+    sys.modules["fastchat.serve.controller"].logger = logger
+    logger.info(f"controller dispatch method: {dispatch_method}")
+    return app
+
+
+def create_model_worker_app(
+    model_path=llm_model_dict[LLM_MODEL].get("local_model_path"),
+    model_names=[LLM_MODEL],
+    device=LLM_DEVICE,
+    load_8bit=False,
+    gptq_ckpt=None,
+    gptq_wbits=16,
+    gpus=None,
+    num_gpus=1,
+    max_gpu_memory=None,
+    cpu_offloading=None,
+    worker_address=base_url.format(model_worker_port),
+    controller_address=base_url.format(controller_port),
+    limit_model_concurrency=5,
+    stream_interval=2,
+    no_register=False,
+):
+    from fastchat.serve.model_worker import app, GptqConfig, ModelWorker, worker_id
+    from fastchat.serve import model_worker
+    from loguru import logger
+    logger.add(os.path.join(LOG_PATH, "model_worker.log"), level="INFO")
+
+    if gpus and num_gpus is None:
+        num_gpus = len(gpus.split(','))
+    gptq_config = GptqConfig(
+        ckpt=gptq_ckpt or model_path,
+        wbits=gptq_wbits,
+        groupsize=-1,
+        act_order=None,
+    )
+    worker = ModelWorker(
+        controller_address,
+        worker_address,
+        worker_id,
+        no_register,
+        model_path,
+        model_names,
+        device,
+        num_gpus,
+        max_gpu_memory,
+        load_8bit,
+        cpu_offloading,
+        gptq_config,
+    )
+    sys.modules["fastchat.serve.model_worker"].worker = worker
+    sys.modules["fastchat.serve.model_worker"].gptq_config = gptq_config
+    sys.modules["fastchat.serve.model_worker"].logger = logger
+    return app
+
+
+def create_openai_api_app(
+    host=host_ip,
+    port=openai_api_port,
+    controller_address=base_url.format(controller_port),
+    api_keys=[],
+):
+    from fastchat.serve.openai_api_server import app, CORSMiddleware, app_settings
+    from loguru import logger
+    logger.add(os.path.join(LOG_PATH, "openai_api.log"), level="INFO")
+
+    app.add_middleware(
+        CORSMiddleware,
+        allow_credentials=True,
+        allow_origins=["*"],
+        allow_methods=["*"],
+        allow_headers=["*"],
+    )
+
+    app_settings.controller_address = controller_address
+    app_settings.api_keys = api_keys
+    sys.modules["fastchat.serve.openai_api_server"].logger = logger
+
+    return app
+
+
+def run_controller(q):
+    import uvicorn
+    app = create_controller_app()
+
+    @app.on_event("startup")
+    async def on_startup():
+        set_httpx_timeout()
+        q.put(1)
+
+    uvicorn.run(app, host=host_ip, port=controller_port)
+
+
+def run_model_worker(q):
+    import uvicorn
+    app = create_model_worker_app()
+
+    @app.on_event("startup")
+    async def on_startup():
+        set_httpx_timeout()
+        while True:
+            no = q.get()
+            if no != 1:
+                q.put(no)
+            else:
+                break
+        q.put(2)
+
+    uvicorn.run(app, host=host_ip, port=model_worker_port)
+
+
+def run_openai_api(q):
+    import uvicorn
+    app = create_openai_api_app()
+
+    @app.on_event("startup")
+    async def on_startup():
+        set_httpx_timeout()
+        while True:
+            no = q.get()
+            if no != 2:
+                q.put(no)
+            else:
+                break
+        q.put(3)
+
+    uvicorn.run(app, host=host_ip, port=openai_api_port)
+
 
 # 1. Simplify llm_model_dict;
 # 2. Keep the logs of different tasks separate;
@@ -24,31 +170,46 @@ port = 8887
 # 6. Pin the fastchat version in requirements.
 
 
-print(llm_model_dict[LLM_MODEL])
-model_path = llm_model_dict[LLM_MODEL]["local_model_path"]
-if not model_path:
-    logger.error("local_model_path must not be empty")
-else:
-    # start the tasks
-    command1 = f'nohup python -m fastchat.serve.controller >> {LOG_PATH}/controller_log.txt 2>&1 &'
-    process1 = execute_command(command1)
-    logger.info(f"Executed {command1}")
-    logger.info(f"Process 1 started with PID: {process1}")
+if __name__ == "__main__":
+    logger.info(llm_model_dict[LLM_MODEL])
+    model_path = llm_model_dict[LLM_MODEL]["local_model_path"]
+    # model_path = "d:\\chatglm\\models\\chatglm-6b"  # debug override; keep commented out so the configured path is used
 
-    command2 = f'nohup python -m fastchat.serve.model_worker --model-path "{model_path}" --device cuda >> {LOG_PATH}/worker_log.txt 2>&1 &'
-    process2 = execute_command(command2)
-    logger.info(f"Executed {command2}")
-    logger.info(f"Process 2 started with PID: {process2}")
 
-    command3 = f'nohup python -m fastchat.serve.openai_api_server --host "{host_ip}" --port {port} >> {LOG_PATH}/api_log.txt 2>&1 &'
-    process3 = execute_command(command3)
-    logger.info(f"Executed {command3}")
-    logger.info(f"Process 3 started with PID: {process3}")
-
-    # TODO: model_worker.log and controller.log are not stored under LOG_PATH --> (hzg0601) model_worker.py and controller.py choose their own log paths,
-    # TODO(hzg0601): --> and those paths are hardcoded; changing them requires modifying fastchat's code
     logger.info(f"To view the llm_api logs, see {LOG_PATH}")
 
+    if not model_path:
+        logger.error("local_model_path must not be empty")
+    else:
+        controller_process = Process(
+            target=run_controller,
+            name=f"controller({os.getpid()})",
+            args=(queue,),
+            daemon=True,
+        )
+        controller_process.start()
+
+        model_worker_process = Process(
+            target=run_model_worker,
+            name=f"model_worker({os.getpid()})",
+            args=(queue,),
+            daemon=True,
+        )
+        model_worker_process.start()
+
+        openai_api_process = Process(
+            target=run_openai_api,
+            name=f"openai_api({os.getpid()})",
+            args=(queue,),
+            daemon=True,
+        )
+        openai_api_process.start()
+
+        controller_process.join()
+        model_worker_process.join()
+        openai_api_process.join()
+
 
 # Example API call once the services are up:
 # import openai
 # openai.api_key = "EMPTY" # Not support yet
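
Note on set_httpx_timeout(): it mutates httpx._config.DEFAULT_TIMEOUT_CONFIG,
a private attribute, because fastchat creates its httpx clients internally and
exposes no timeout parameter. Where you control the client yourself, the
public API is less fragile. A minimal sketch (assuming a recent httpx):

    import httpx

    # Public-API equivalent of the global monkey-patch: a 60-second timeout
    # for connect, read, write and pool, scoped to one client instead of all.
    client = httpx.Client(timeout=httpx.Timeout(60.0))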
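
The run_* functions serialize startup through the shared Queue: the controller
puts token 1 once its app has started, the model worker blocks until it sees 1
and then puts 2, and the OpenAI API server waits for 2; any token a process is
not waiting for goes back on the queue for its real consumer. The same pattern
in isolation (the stage() helper and process names are illustrative, not part
of the patch):

    from multiprocessing import Process, Queue
    import time


    def stage(q, wait_for, announce, name):
        # Block until the predecessor's token arrives; return any other
        # token to the queue so the process expecting it can pick it up.
        if wait_for is not None:
            while True:
                token = q.get()
                if token != wait_for:
                    q.put(token)
                    time.sleep(0.01)  # avoid a tight busy-wait
                else:
                    break
        print(f"{name} is up")
        q.put(announce)  # wake the next stage


    if __name__ == "__main__":
        q = Queue()
        procs = [
            Process(target=stage, args=(q, None, 1, "controller")),
            Process(target=stage, args=(q, 1, 2, "model_worker")),
            Process(target=stage, args=(q, 2, 3, "openai_api")),
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()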
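
Once all three processes are up, the commented-out client stub at the bottom
of the patch can be completed along these lines; a sketch assuming the pre-1.0
openai package, with "chatglm-6b" as a placeholder that must match one of the
worker's model_names (the patch passes api_keys=[], so any key is accepted):

    import openai

    openai.api_key = "EMPTY"  # no key check when api_keys=[]
    openai.api_base = "http://127.0.0.1:8888/v1"  # openai_api_port

    resp = openai.ChatCompletion.create(
        model="chatglm-6b",  # placeholder; must match model_names
        messages=[{"role": "user", "content": "Hello"}],
    )
    print(resp.choices[0].message.content)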