Langchain-Chatchat/webui_pages/dialogue/dialogue.py

383 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
from webui_pages.utils import *
from streamlit_chatbox import *
from datetime import datetime
import os
from configs import (TEMPERATURE, HISTORY_LEN, PROMPT_TEMPLATES,
DEFAULT_KNOWLEDGE_BASE, DEFAULT_SEARCH_ENGINE, SUPPORT_AGENT_MODEL)
from server.knowledge_base.utils import LOADER_DICT
from typing import List, Dict
chat_box = ChatBox(
assistant_avatar=os.path.join(
"img",
"chatchat_icon_blue_square_v2.png"
)
)
def get_messages_history(history_len: int, content_in_expander: bool = False) -> List[Dict]:
'''
返回消息历史。
content_in_expander控制是否返回expander元素中的内容一般导出的时候可以选上传入LLM的history不需要
'''
def filter(msg):
content = [x for x in msg["elements"] if x._output_method in ["markdown", "text"]]
if not content_in_expander:
content = [x for x in content if not x._in_expander]
content = [x.content for x in content]
return {
"role": msg["role"],
"content": "\n\n".join(content),
}
return chat_box.filter_history(history_len=history_len, filter=filter)
@st.cache_data
def upload_temp_docs(files, _api: ApiRequest) -> str:
'''
将文件上传到临时目录,用于文件对话
返回临时向量库ID
'''
return _api.upload_temp_docs(files).get("data", {}).get("id")
def dialogue_page(api: ApiRequest, is_lite: bool = False):
st.session_state.setdefault("file_chat_id", None)
default_model = api.get_default_llm_model()[0]
if not chat_box.chat_inited:
st.toast(
f"欢迎使用 [`Langchain-Chatchat`](https://github.com/chatchat-space/Langchain-Chatchat) ! \n\n"
f"当前运行的模型`{default_model}`, 您可以开始提问了."
)
chat_box.init_session()
with st.sidebar:
# TODO: 对话模型与会话绑定
def on_mode_change():
mode = st.session_state.dialogue_mode
text = f"已切换到 {mode} 模式。"
if mode == "知识库问答":
cur_kb = st.session_state.get("selected_kb")
if cur_kb:
text = f"{text} 当前知识库: `{cur_kb}`。"
st.toast(text)
dialogue_modes = ["LLM 对话",
"知识库问答",
"文件对话",
"搜索引擎问答",
"自定义Agent问答",
]
dialogue_mode = st.selectbox("请选择对话模式:",
dialogue_modes,
index=3,
on_change=on_mode_change,
key="dialogue_mode",
)
def on_llm_change():
if llm_model:
config = api.get_model_config(llm_model)
if not config.get("online_api"): # 只有本地model_worker可以切换模型
st.session_state["prev_llm_model"] = llm_model
st.session_state["cur_llm_model"] = st.session_state.llm_model
def llm_model_format_func(x):
if x in running_models:
return f"{x} (Running)"
return x
running_models = list(api.list_running_models())
available_models = []
config_models = api.list_config_models()
for k, v in config_models.get("local", {}).items(): # 列出配置了有效本地路径的模型
if (v.get("model_path_exists")
and k not in running_models):
available_models.append(k)
for k, v in config_models.get("online", {}).items(): # 列出ONLINE_MODELS中直接访问的模型
if not v.get("provider") and k not in running_models:
available_models.append(k)
llm_models = running_models + available_models
cur_llm_model = st.session_state.get("cur_llm_model", default_model)
if cur_llm_model in llm_models:
index = llm_models.index(cur_llm_model)
else:
index = 0
llm_model = st.selectbox("选择LLM模型",
llm_models,
index,
format_func=llm_model_format_func,
on_change=on_llm_change,
key="llm_model",
)
if (st.session_state.get("prev_llm_model") != llm_model
and not is_lite
and not llm_model in config_models.get("online", {})
and not llm_model in config_models.get("langchain", {})
and llm_model not in running_models):
with st.spinner(f"正在加载模型: {llm_model},请勿进行操作或刷新页面"):
prev_model = st.session_state.get("prev_llm_model")
r = api.change_llm_model(prev_model, llm_model)
if msg := check_error_msg(r):
st.error(msg)
elif msg := check_success_msg(r):
st.success(msg)
st.session_state["prev_llm_model"] = llm_model
index_prompt = {
"LLM 对话": "llm_chat",
"自定义Agent问答": "agent_chat",
"搜索引擎问答": "search_engine_chat",
"知识库问答": "knowledge_base_chat",
"文件对话": "knowledge_base_chat",
}
prompt_templates_kb_list = list(PROMPT_TEMPLATES[index_prompt[dialogue_mode]].keys())
prompt_template_name = prompt_templates_kb_list[0]
if "prompt_template_select" not in st.session_state:
st.session_state.prompt_template_select = prompt_templates_kb_list[0]
def prompt_change():
text = f"已切换为 {prompt_template_name} 模板。"
st.toast(text)
prompt_template_select = st.selectbox(
"请选择Prompt模板",
prompt_templates_kb_list,
index=0,
on_change=prompt_change,
key="prompt_template_select",
)
prompt_template_name = st.session_state.prompt_template_select
temperature = st.slider("Temperature", 0.0, 1.0, TEMPERATURE, 0.05)
history_len = st.number_input("历史对话轮数:", 0, 20, HISTORY_LEN)
def on_kb_change():
st.toast(f"已加载知识库: {st.session_state.selected_kb}")
if dialogue_mode == "知识库问答":
with st.expander("知识库配置", True):
kb_list = api.list_knowledge_bases()
index = 0
if DEFAULT_KNOWLEDGE_BASE in kb_list:
index = kb_list.index(DEFAULT_KNOWLEDGE_BASE)
selected_kb = st.selectbox(
"请选择知识库:",
kb_list,
index=index,
on_change=on_kb_change,
key="selected_kb",
)
kb_top_k = st.number_input("匹配知识条数:", 1, 20, VECTOR_SEARCH_TOP_K)
## Bge 模型会超过1
score_threshold = st.slider("知识匹配分数阈值:", 0.0, 2.0, float(SCORE_THRESHOLD), 0.01)
elif dialogue_mode == "文件对话":
with st.expander("文件对话配置", True):
files = st.file_uploader("上传知识文件:",
[i for ls in LOADER_DICT.values() for i in ls],
accept_multiple_files=True,
)
kb_top_k = st.number_input("匹配知识条数:", 1, 20, VECTOR_SEARCH_TOP_K)
## Bge 模型会超过1
score_threshold = st.slider("知识匹配分数阈值:", 0.0, 2.0, float(SCORE_THRESHOLD), 0.01)
if st.button("开始上传", disabled=len(files)==0):
st.session_state["file_chat_id"] = upload_temp_docs(files, api)
elif dialogue_mode == "搜索引擎问答":
search_engine_list = api.list_search_engines()
if DEFAULT_SEARCH_ENGINE in search_engine_list:
index = search_engine_list.index(DEFAULT_SEARCH_ENGINE)
else:
index = search_engine_list.index("duckduckgo") if "duckduckgo" in search_engine_list else 0
with st.expander("搜索引擎配置", True):
search_engine = st.selectbox(
label="请选择搜索引擎",
options=search_engine_list,
index=index,
)
se_top_k = st.number_input("匹配搜索结果条数:", 1, 20, SEARCH_ENGINE_TOP_K)
# Display chat messages from history on app rerun
chat_box.output_messages()
chat_input_placeholder = "请输入对话内容换行请使用Shift+Enter "
def on_feedback(
feedback,
chat_history_id: str = "",
history_index: int = -1,
):
reason = feedback["text"]
score_int = chat_box.set_feedback(feedback=feedback, history_index=history_index)
api.chat_feedback(chat_history_id=chat_history_id,
score=score_int,
reason=reason)
st.session_state["need_rerun"] = True
feedback_kwargs = {
"feedback_type": "thumbs",
"optional_text_label": "欢迎反馈您打分的理由",
}
if prompt := st.chat_input(chat_input_placeholder, key="prompt"):
history = get_messages_history(history_len)
chat_box.user_say(prompt)
if dialogue_mode == "LLM 对话":
chat_box.ai_say("正在思考...")
text = ""
chat_history_id = ""
r = api.chat_chat(prompt,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature)
for t in r:
if error_msg := check_error_msg(t): # check whether error occured
st.error(error_msg)
break
text += t.get("text", "")
chat_box.update_msg(text)
chat_history_id = t.get("chat_history_id", "")
metadata = {
"chat_history_id": chat_history_id,
}
chat_box.update_msg(text, streaming=False, metadata=metadata) # 更新最终的字符串,去除光标
chat_box.show_feedback(**feedback_kwargs,
key=chat_history_id,
on_submit=on_feedback,
kwargs={"chat_history_id": chat_history_id, "history_index": len(chat_box.history) - 1})
elif dialogue_mode == "自定义Agent问答":
if not any(agent in llm_model for agent in SUPPORT_AGENT_MODEL):
chat_box.ai_say([
f"正在思考... \n\n <span style='color:red'>该模型并没有进行Agent对齐请更换支持Agent的模型获得更好的体验</span>\n\n\n",
Markdown("...", in_expander=True, title="思考过程", state="complete"),
])
else:
chat_box.ai_say([
f"正在思考...",
Markdown("...", in_expander=True, title="思考过程", state="complete"),
])
text = ""
ans = ""
for d in api.agent_chat(prompt,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature,
):
try:
d = json.loads(d)
except:
pass
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
if chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=1)
if chunk := d.get("final_answer"):
ans += chunk
chat_box.update_msg(ans, element_index=0)
if chunk := d.get("tools"):
text += "\n\n".join(d.get("tools", []))
chat_box.update_msg(text, element_index=1)
chat_box.update_msg(ans, element_index=0, streaming=False)
chat_box.update_msg(text, element_index=1, streaming=False)
elif dialogue_mode == "知识库问答":
chat_box.ai_say([
f"正在查询知识库 `{selected_kb}` ...",
Markdown("...", in_expander=True, title="知识库匹配结果", state="complete"),
])
text = ""
for d in api.knowledge_base_chat(prompt,
knowledge_base_name=selected_kb,
top_k=kb_top_k,
score_threshold=score_threshold,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature):
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
elif chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=0)
chat_box.update_msg(text, element_index=0, streaming=False)
chat_box.update_msg("\n\n".join(d.get("docs", [])), element_index=1, streaming=False)
elif dialogue_mode == "文件对话":
if st.session_state["file_chat_id"] is None:
st.error("请先上传文件再进行对话")
st.stop()
chat_box.ai_say([
f"正在查询文件 `{st.session_state['file_chat_id']}` ...",
Markdown("...", in_expander=True, title="文件匹配结果", state="complete"),
])
text = ""
for d in api.file_chat(prompt,
knowledge_id=st.session_state["file_chat_id"],
top_k=kb_top_k,
score_threshold=score_threshold,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature):
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
elif chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=0)
chat_box.update_msg(text, element_index=0, streaming=False)
chat_box.update_msg("\n\n".join(d.get("docs", [])), element_index=1, streaming=False)
elif dialogue_mode == "搜索引擎问答":
chat_box.ai_say([
f"正在执行 `{search_engine}` 搜索...",
Markdown("...", in_expander=True, title="网络搜索结果", state="complete"),
])
text = ""
for d in api.search_engine_chat(prompt,
search_engine_name=search_engine,
top_k=se_top_k,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature,
split_result=se_top_k > 1):
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
elif chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=0)
chat_box.update_msg(text, element_index=0, streaming=False)
chat_box.update_msg("\n\n".join(d.get("docs", [])), element_index=1, streaming=False)
if st.session_state.get("need_rerun"):
st.session_state["need_rerun"] = False
st.rerun()
now = datetime.now()
with st.sidebar:
cols = st.columns(2)
export_btn = cols[0]
if cols[1].button(
"清空对话",
use_container_width=True,
):
chat_box.reset_history()
st.rerun()
export_btn.download_button(
"导出记录",
"".join(chat_box.export2md()),
file_name=f"{now:%Y-%m-%d %H.%M}_对话记录.md",
mime="text/markdown",
use_container_width=True,
)