Langchain-Chatchat/webui_pages/dialogue/dialogue.py

319 lines
14 KiB
Python
Raw Normal View History

2023-08-01 14:47:38 +08:00
import streamlit as st
from webui_pages.utils import *
from streamlit_chatbox import *
from datetime import datetime
2023-08-13 22:25:01 +08:00
import os
from configs import (LLM_MODEL, TEMPERATURE, HISTORY_LEN, PROMPT_TEMPLATES,
DEFAULT_KNOWLEDGE_BASE, DEFAULT_SEARCH_ENGINE)
from typing import List, Dict
2023-08-13 22:25:01 +08:00
chat_box = ChatBox(
assistant_avatar=os.path.join(
"img",
"chatchat_icon_blue_square_v2.png"
)
)
2023-08-10 23:51:10 +08:00
def get_messages_history(history_len: int, content_in_expander: bool = False) -> List[Dict]:
'''
返回消息历史
content_in_expander控制是否返回expander元素中的内容一般导出的时候可以选上传入LLM的history不需要
'''
def filter(msg):
content = [x for x in msg["elements"] if x._output_method in ["markdown", "text"]]
if not content_in_expander:
content = [x for x in content if not x._in_expander]
content = [x.content for x in content]
return {
"role": msg["role"],
"content": "\n\n".join(content),
}
2023-08-10 23:51:10 +08:00
return chat_box.filter_history(history_len=history_len, filter=filter)
def get_default_llm_model(api: ApiRequest) -> (str, bool):
'''
从服务器上获取当前运行的LLM模型如果本机配置的LLM_MODEL属于本地模型且在其中则优先返回
返回类型为model_name, is_local_model
'''
running_models = api.list_running_models()
if not running_models:
return "", False
if LLM_MODEL in running_models:
return LLM_MODEL, True
local_models = [k for k, v in running_models.items() if not v.get("online_api")]
if local_models:
return local_models[0], True
return list(running_models)[0], False
2023-08-03 15:06:10 +08:00
def dialogue_page(api: ApiRequest):
if not chat_box.chat_inited:
default_model = get_default_llm_model(api)[0]
st.toast(
f"欢迎使用 [`Langchain-Chatchat`](https://github.com/chatchat-space/Langchain-Chatchat) ! \n\n"
f"当前运行的模型`{default_model}`, 您可以开始提问了."
)
chat_box.init_session()
with st.sidebar:
2023-08-12 14:08:21 +08:00
# TODO: 对话模型与会话绑定
def on_mode_change():
mode = st.session_state.dialogue_mode
text = f"已切换到 {mode} 模式。"
if mode == "知识库问答":
cur_kb = st.session_state.get("selected_kb")
if cur_kb:
text = f"{text} 当前知识库: `{cur_kb}`。"
st.toast(text)
dialogue_mode = st.selectbox("请选择对话模式:",
2023-08-13 22:25:01 +08:00
["LLM 对话",
"知识库问答",
"搜索引擎问答",
"自定义Agent问答",
2023-08-13 22:25:01 +08:00
],
index=3,
2023-08-13 22:25:01 +08:00
on_change=on_mode_change,
key="dialogue_mode",
)
def on_llm_change():
if llm_model:
config = api.get_model_config(llm_model)
if not config.get("online_api"): # 只有本地model_worker可以切换模型
st.session_state["prev_llm_model"] = llm_model
st.session_state["cur_llm_model"] = st.session_state.llm_model
def llm_model_format_func(x):
if x in running_models:
return f"{x} (Running)"
return x
running_models = list(api.list_running_models())
available_models = []
config_models = api.list_config_models()
worker_models = list(config_models.get("worker", {})) # 仅列出在FSCHAT_MODEL_WORKERS中配置的模型
for m in worker_models:
if m not in running_models and m != "default":
available_models.append(m)
for k, v in config_models.get("online", {}).items(): # 列出ONLINE_MODELS中直接访问的模型如GPT
if not v.get("provider") and k not in running_models:
print(k, v)
available_models.append(k)
llm_models = running_models + available_models
index = llm_models.index(st.session_state.get("cur_llm_model", get_default_llm_model(api)[0]))
llm_model = st.selectbox("选择LLM模型",
llm_models,
index,
format_func=llm_model_format_func,
on_change=on_llm_change,
key="llm_model",
)
if (llm_model
and st.session_state.get("prev_llm_model") != llm_model
and not api.get_model_config(llm_model).get("online_api")
and llm_model not in running_models):
with st.spinner(f"正在加载模型: {llm_model},请勿进行操作或刷新页面"):
prev_model = st.session_state.get("prev_llm_model")
r = api.change_llm_model(prev_model, llm_model)
if msg := check_error_msg(r):
st.error(msg)
elif msg := check_success_msg(r):
st.success(msg)
st.session_state["prev_llm_model"] = llm_model
index_prompt = {
"LLM 对话": "llm_chat",
"自定义Agent问答": "agent_chat",
"搜索引擎问答": "search_engine_chat",
"知识库问答": "knowledge_base_chat",
}
prompt_templates_kb_list = list(PROMPT_TEMPLATES[index_prompt[dialogue_mode]].keys())
prompt_template_name = prompt_templates_kb_list[0]
if "prompt_template_select" not in st.session_state:
st.session_state.prompt_template_select = prompt_templates_kb_list[0]
def prompt_change():
text = f"已切换为 {prompt_template_name} 模板。"
st.toast(text)
prompt_template_select = st.selectbox(
"请选择Prompt模板",
prompt_templates_kb_list,
index=0,
on_change=prompt_change,
key="prompt_template_select",
)
prompt_template_name = st.session_state.prompt_template_select
temperature = st.slider("Temperature", 0.0, 1.0, TEMPERATURE, 0.05)
history_len = st.number_input("历史对话轮数:", 0, 20, HISTORY_LEN)
def on_kb_change():
st.toast(f"已加载知识库: {st.session_state.selected_kb}")
2023-08-01 14:47:38 +08:00
if dialogue_mode == "知识库问答":
2023-08-03 15:06:10 +08:00
with st.expander("知识库配置", True):
kb_list = api.list_knowledge_bases()
index = 0
if DEFAULT_KNOWLEDGE_BASE in kb_list:
index = kb_list.index(DEFAULT_KNOWLEDGE_BASE)
2023-08-03 15:06:10 +08:00
selected_kb = st.selectbox(
"请选择知识库:",
kb_list,
index=index,
2023-08-03 15:06:10 +08:00
on_change=on_kb_change,
key="selected_kb",
)
kb_top_k = st.number_input("匹配知识条数:", 1, 20, VECTOR_SEARCH_TOP_K)
score_threshold = st.slider("知识匹配分数阈值:", 0.0, 1.0, float(SCORE_THRESHOLD), 0.01)
elif dialogue_mode == "搜索引擎问答":
search_engine_list = api.list_search_engines()
if DEFAULT_SEARCH_ENGINE in search_engine_list:
index = search_engine_list.index(DEFAULT_SEARCH_ENGINE)
else:
index = search_engine_list.index("duckduckgo") if "duckduckgo" in search_engine_list else 0
2023-08-12 02:30:50 +08:00
with st.expander("搜索引擎配置", True):
search_engine = st.selectbox(
label="请选择搜索引擎",
options=search_engine_list,
index=index,
)
se_top_k = st.number_input("匹配搜索结果条数:", 1, 20, SEARCH_ENGINE_TOP_K)
2023-08-01 14:47:38 +08:00
# Display chat messages from history on app rerun
2023-08-12 02:30:50 +08:00
chat_box.output_messages()
2023-09-05 09:53:58 +08:00
chat_input_placeholder = "请输入对话内容换行请使用Shift+Enter "
2023-08-13 17:53:59 +08:00
if prompt := st.chat_input(chat_input_placeholder, key="prompt"):
history = get_messages_history(history_len)
chat_box.user_say(prompt)
if dialogue_mode == "LLM 对话":
chat_box.ai_say("正在思考...")
text = ""
r = api.chat_chat(prompt,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature)
2023-08-13 17:53:59 +08:00
for t in r:
if error_msg := check_error_msg(t): # check whether error occured
st.error(error_msg)
break
2023-08-13 17:53:59 +08:00
text += t
chat_box.update_msg(text)
chat_box.update_msg(text, streaming=False) # 更新最终的字符串,去除光标
elif dialogue_mode == "自定义Agent问答":
chat_box.ai_say([
f"正在思考...",
Markdown("...", in_expander=True, title="思考过程", state="complete"),
])
text = ""
ans = ""
support_agent = ["gpt", "Qwen", "qwen-api", "baichuan-api"] # 目前支持agent的模型
if not any(agent in llm_model for agent in support_agent):
ans += "正在思考... \n\n <span style='color:red'>该模型并没有进行Agent对齐无法正常使用Agent功能</span>\n\n\n<span style='color:red'>请更换 GPT4或Qwen-14B等支持Agent的模型获得更好的体验 </span> \n\n\n"
chat_box.update_msg(ans, element_index=0, streaming=False)
for d in api.agent_chat(prompt,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature,
):
try:
d = json.loads(d)
except:
pass
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
if chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=1)
if chunk := d.get("final_answer"):
ans += chunk
chat_box.update_msg(ans, element_index=0)
if chunk := d.get("tools"):
text += "\n\n".join(d.get("tools", []))
chat_box.update_msg(text, element_index=1)
chat_box.update_msg(ans, element_index=0, streaming=False)
chat_box.update_msg(text, element_index=1, streaming=False)
2023-08-13 17:53:59 +08:00
elif dialogue_mode == "知识库问答":
chat_box.ai_say([
f"正在查询知识库 `{selected_kb}` ...",
Markdown("...", in_expander=True, title="知识库匹配结果", state="complete"),
2023-08-13 17:53:59 +08:00
])
text = ""
for d in api.knowledge_base_chat(prompt,
knowledge_base_name=selected_kb,
top_k=kb_top_k,
score_threshold=score_threshold,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature):
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
elif chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=0)
chat_box.update_msg(text, element_index=0, streaming=False)
chat_box.update_msg("\n\n".join(d.get("docs", [])), element_index=1, streaming=False)
2023-08-13 17:53:59 +08:00
elif dialogue_mode == "搜索引擎问答":
chat_box.ai_say([
f"正在执行 `{search_engine}` 搜索...",
Markdown("...", in_expander=True, title="网络搜索结果", state="complete"),
2023-08-13 17:53:59 +08:00
])
text = ""
for d in api.search_engine_chat(prompt,
search_engine_name=search_engine,
top_k=se_top_k,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature):
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
elif chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=0)
chat_box.update_msg(text, element_index=0, streaming=False)
chat_box.update_msg("\n\n".join(d.get("docs", [])), element_index=1, streaming=False)
now = datetime.now()
2023-08-12 02:30:50 +08:00
with st.sidebar:
2023-08-13 17:53:59 +08:00
cols = st.columns(2)
2023-08-12 02:30:50 +08:00
export_btn = cols[0]
if cols[1].button(
2023-08-12 14:08:21 +08:00
"清空对话",
2023-08-12 02:30:50 +08:00
use_container_width=True,
):
chat_box.reset_history()
st.experimental_rerun()
export_btn.download_button(
2023-08-12 14:08:21 +08:00
"导出记录",
2023-08-13 17:53:59 +08:00
"".join(chat_box.export2md()),
file_name=f"{now:%Y-%m-%d %H.%M}_对话记录.md",
mime="text/markdown",
2023-08-10 23:51:10 +08:00
use_container_width=True,
)