from fastapi import Body, Request
from fastapi.responses import StreamingResponse
from configs import (LLM_MODEL, PROMPT_TEMPLATE,
                     VECTOR_SEARCH_TOP_K, SCORE_THRESHOLD,
                     TEMPERATURE)
from server.chat.utils import wrap_done, get_ChatOpenAI
from server.utils import BaseResponse
from langchain import LLMChain
from langchain.callbacks import AsyncIteratorCallbackHandler
from typing import AsyncIterable, List, Optional
import asyncio
from langchain.prompts.chat import ChatPromptTemplate
from server.chat.utils import History
from server.knowledge_base.kb_service.base import KBService, KBServiceFactory
import json
import os
from urllib.parse import urlencode
from server.knowledge_base.kb_doc_api import search_docs


async def knowledge_base_chat(query: str = Body(..., description="用户输入", examples=["你好"]),
                              knowledge_base_name: str = Body(..., description="知识库名称", examples=["samples"]),
                              top_k: int = Body(VECTOR_SEARCH_TOP_K, description="匹配向量数"),
                              score_threshold: float = Body(SCORE_THRESHOLD,
                                                            description="知识库匹配相关度阈值,取值范围在0-1之间,SCORE越小,相关度越高,取到1相当于不筛选,建议设置在0.5左右",
                                                            ge=0, le=1),
                              history: List[History] = Body([],
                                                            description="历史对话",
                                                            examples=[[
                                                                {"role": "user",
                                                                 "content": "我们来玩成语接龙,我先来,生龙活虎"},
                                                                {"role": "assistant",
                                                                 "content": "虎头虎脑"}]]
                                                            ),
                              stream: bool = Body(False, description="流式输出"),
                              model_name: str = Body(LLM_MODEL, description="LLM 模型名称。"),
                              temperature: float = Body(TEMPERATURE, description="LLM 采样温度", gt=0.0, le=1.0),
                              local_doc_url: bool = Body(False, description="知识文件返回本地路径(true)或URL(false)"),
                              request: Request = None,
                              ):
    kb = KBServiceFactory.get_service_by_name(knowledge_base_name)
    if kb is None:
        return BaseResponse(code=404, msg=f"未找到知识库 {knowledge_base_name}")

    history = [History.from_data(h) for h in history]

    async def knowledge_base_chat_iterator(query: str,
                                           kb: KBService,
                                           top_k: int,
                                           history: Optional[List[History]],
                                           model_name: str = LLM_MODEL,
                                           ) -> AsyncIterable[str]:
        callback = AsyncIteratorCallbackHandler()
        model = get_ChatOpenAI(
            model_name=model_name,
            temperature=temperature,
            callbacks=[callback],
        )

        # Retrieve the top_k most relevant chunks and join them into the prompt context.
        docs = search_docs(query, knowledge_base_name, top_k, score_threshold)
        context = "\n".join([doc.page_content for doc in docs])

        input_msg = History(role="user", content=PROMPT_TEMPLATE).to_msg_template(False)
        # The latest user question is carried by PROMPT_TEMPLATE itself, so drop it from history.
        if len(history) >= 1:
            history.pop()
        chat_prompt = ChatPromptTemplate.from_messages(
            [i.to_msg_template() for i in history] + [input_msg])

        chain = LLMChain(prompt=chat_prompt, llm=model)

        # Begin a task that runs in the background.
        task = asyncio.create_task(wrap_done(
            chain.acall({"context": context, "question": query}),
            callback.done),
        )

        # Build the source-document references returned alongside the answer.
        source_documents = []
        for inum, doc in enumerate(docs):
            filename = os.path.split(doc.metadata["source"])[-1]
            if local_doc_url:
                url = "file://" + doc.metadata["source"]
            else:
                parameters = urlencode({"knowledge_base_name": knowledge_base_name, "file_name": filename})
                url = f"{request.base_url}knowledge_base/download_doc?" + parameters
            text = f"""出处 [{inum + 1}] [{filename}]({url}) \n\n{doc.page_content}\n\n"""
            source_documents.append(text)

        if stream:
            async for token in callback.aiter():
                # Use server-sent-events to stream the response
                yield json.dumps({"answer": token}, ensure_ascii=False)
            yield json.dumps({"docs": source_documents}, ensure_ascii=False)
        else:
            answer = ""
            async for token in callback.aiter():
                answer += token
            yield json.dumps({"answer": answer,
                              "docs": source_documents},
                             ensure_ascii=False)

        await task

    return StreamingResponse(knowledge_base_chat_iterator(query, kb, top_k, history, model_name),
                             media_type="text/event-stream")
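# --- Usage sketch ---
# A minimal, hedged example of how this handler could be mounted and exercised
# during local development. The route path, host, and port below are
# illustrative assumptions and are not defined in this file; in the real
# project the route is registered by the API server module.
if __name__ == "__main__":
    import uvicorn
    from fastapi import FastAPI

    app = FastAPI()
    # FastAPI turns the Body(...) defaults above into the POST request schema.
    app.post("/chat/knowledge_base_chat")(knowledge_base_chat)

    # Example client call (hypothetical URL/port), e.g. with httpx:
    #   with httpx.stream("POST", "http://127.0.0.1:7861/chat/knowledge_base_chat",
    #                     json={"query": "你好", "knowledge_base_name": "samples",
    #                           "stream": True}) as r:
    #       for chunk in r.iter_text():   # each yielded piece is a JSON fragment
    #           print(chunk)
    uvicorn.run(app, host="127.0.0.1", port=7861)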