Langchain-Chatchat/webui_pages/dialogue/dialogue.py

459 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
from webui_pages.utils import *
from streamlit_chatbox import *
from streamlit_modal import Modal
from datetime import datetime
import os
import re
import time
from configs import (TEMPERATURE, HISTORY_LEN, PROMPT_TEMPLATES, LLM_MODELS,
DEFAULT_KNOWLEDGE_BASE, DEFAULT_SEARCH_ENGINE, SUPPORT_AGENT_MODEL)
from server.knowledge_base.utils import LOADER_DICT
import uuid
from typing import List, Dict
chat_box = ChatBox(
assistant_avatar=os.path.join(
"img",
"siji.jpg"#"chatchat_icon_blue_square_v2.png"
)
)
def get_messages_history(history_len: int, content_in_expander: bool = False) -> List[Dict]:
'''
返回消息历史。
content_in_expander控制是否返回expander元素中的内容一般导出的时候可以选上传入LLM的history不需要
'''
def filter(msg):
content = [x for x in msg["elements"] if x._output_method in ["markdown", "text"]]
if not content_in_expander:
content = [x for x in content if not x._in_expander]
content = [x.content for x in content]
return {
"role": msg["role"],
"content": "\n\n".join(content),
}
return chat_box.filter_history(history_len=history_len, filter=filter)
@st.cache_data
def upload_temp_docs(files, _api: ApiRequest) -> str:
'''
将文件上传到临时目录,用于文件对话
返回临时向量库ID
'''
return _api.upload_temp_docs(files).get("data", {}).get("id")
def parse_command(text: str, modal: Modal) -> bool:
'''
检查用户是否输入了自定义命令,当前支持:
/new {session_name}。如果未提供名称默认为“会话X”
/del {session_name}。如果未提供名称,在会话数量>1的情况下删除当前会话。
/clear {session_name}。如果未提供名称,默认清除当前会话
/help。查看命令帮助
返回值输入的是命令返回True否则返回False
'''
if m := re.match(r"/([^\s]+)\s*(.*)", text):
cmd, name = m.groups()
name = name.strip()
conv_names = chat_box.get_chat_names()
if cmd == "help":
modal.open()
elif cmd == "new":
if not name:
i = 1
while True:
name = f"会话{i}"
if name not in conv_names:
break
i += 1
if name in st.session_state["conversation_ids"]:
st.error(f"该会话名称 “{name}” 已存在")
time.sleep(1)
else:
st.session_state["conversation_ids"][name] = uuid.uuid4().hex
st.session_state["cur_conv_name"] = name
elif cmd == "del":
name = name or st.session_state.get("cur_conv_name")
if len(conv_names) == 1:
st.error("这是最后一个会话,无法删除")
time.sleep(1)
elif not name or name not in st.session_state["conversation_ids"]:
st.error(f"无效的会话名称:“{name}")
time.sleep(1)
else:
st.session_state["conversation_ids"].pop(name, None)
chat_box.del_chat_name(name)
st.session_state["cur_conv_name"] = ""
elif cmd == "clear":
chat_box.reset_history(name=name or None)
return True
return False
def dialogue_page(api: ApiRequest, is_lite: bool = False):
st.session_state.setdefault("conversation_ids", {})
st.session_state["conversation_ids"].setdefault(chat_box.cur_chat_name, uuid.uuid4().hex)
st.session_state.setdefault("file_chat_id", None)
default_model = api.get_default_llm_model()[0]
if not chat_box.chat_inited:
st.toast(
f"欢迎使用 [`Langchain-Chatchat`](https://github.com/chatchat-space/Langchain-Chatchat) ! \n\n"
f"当前运行的模型`{default_model}`, 您可以开始提问了."
)
chat_box.init_session()
# 弹出自定义命令帮助信息
modal = Modal("自定义命令", key="cmd_help", max_width="500")
if modal.is_open():
with modal.container():
cmds = [x for x in parse_command.__doc__.split("\n") if x.strip().startswith("/")]
st.write("\n\n".join(cmds))
with st.sidebar:
# 多会话
conv_names = list(st.session_state["conversation_ids"].keys())
index = 0
if st.session_state.get("cur_conv_name") in conv_names:
index = conv_names.index(st.session_state.get("cur_conv_name"))
conversation_name = st.selectbox("当前会话:", conv_names, index=index)
chat_box.use_chat_name(conversation_name)
conversation_id = st.session_state["conversation_ids"][conversation_name]
def on_mode_change():
mode = st.session_state.dialogue_mode
text = f"已切换到 {mode} 模式。"
if mode == "知识库问答":
cur_kb = st.session_state.get("selected_kb")
if cur_kb:
text = f"{text} 当前知识库: `{cur_kb}`。"
st.toast(text)
dialogue_modes = ["LLM 对话",
"知识库问答",
"文件对话",
"搜索引擎问答",
"自定义Agent问答",
]
dialogue_mode = st.selectbox("请选择对话模式:",
dialogue_modes,
index=1,
on_change=on_mode_change,
key="dialogue_mode",
)
def on_llm_change():
if llm_model:
config = api.get_model_config(llm_model)
if not config.get("online_api"): # 只有本地model_worker可以切换模型
st.session_state["prev_llm_model"] = llm_model
st.session_state["cur_llm_model"] = st.session_state.llm_model
def llm_model_format_func(x):
if x in running_models:
return f"{x} (Running)"
return x
running_models = list(api.list_running_models())
available_models = []
config_models = api.list_config_models()
if not is_lite:
for k, v in config_models.get("local", {}).items():
if (v.get("model_path_exists")
and k not in running_models):
available_models.append(k)
for k, v in config_models.get("online", {}).items():
if not v.get("provider") and k not in running_models and k in LLM_MODELS:
available_models.append(k)
llm_models = running_models + available_models
cur_llm_model = st.session_state.get("cur_llm_model", default_model)
if cur_llm_model in llm_models:
index = llm_models.index(cur_llm_model)
else:
index = 0
llm_model = st.selectbox("选择LLM模型",
llm_models,
index,
format_func=llm_model_format_func,
on_change=on_llm_change,
key="llm_model",
)
if (st.session_state.get("prev_llm_model") != llm_model
and not is_lite
and not llm_model in config_models.get("online", {})
and not llm_model in config_models.get("langchain", {})
and llm_model not in running_models):
with st.spinner(f"正在加载模型: {llm_model},请勿进行操作或刷新页面"):
prev_model = st.session_state.get("prev_llm_model")
r = api.change_llm_model(prev_model, llm_model)
if msg := check_error_msg(r):
st.error(msg)
elif msg := check_success_msg(r):
st.success(msg)
st.session_state["prev_llm_model"] = llm_model
index_prompt = {
"LLM 对话": "llm_chat",
"自定义Agent问答": "agent_chat",
"搜索引擎问答": "search_engine_chat",
"知识库问答": "knowledge_base_chat",
"文件对话": "knowledge_base_chat",
}
prompt_templates_kb_list = list(PROMPT_TEMPLATES[index_prompt[dialogue_mode]].keys())
prompt_template_name = prompt_templates_kb_list[0]
if "prompt_template_select" not in st.session_state:
st.session_state.prompt_template_select = prompt_templates_kb_list[0]
def prompt_change():
text = f"已切换为 {prompt_template_name} 模板。"
st.toast(text)
prompt_template_select = st.selectbox(
"请选择Prompt模板",
prompt_templates_kb_list,
index=0,
on_change=prompt_change,
key="prompt_template_select",
)
prompt_template_name = st.session_state.prompt_template_select
temperature = st.slider("Temperature", 0.0, 2.0, TEMPERATURE, 0.05)
history_len = st.number_input("历史对话轮数:", 0, 20, HISTORY_LEN)
def on_kb_change():
st.toast(f"已加载知识库: {st.session_state.selected_kb}")
if dialogue_mode == "知识库问答":
with st.expander("知识库配置", True):
kb_list = api.list_knowledge_bases()
index = 0
if DEFAULT_KNOWLEDGE_BASE in kb_list:
index = kb_list.index(DEFAULT_KNOWLEDGE_BASE)
selected_kb = st.selectbox(
"请选择知识库:",
kb_list,
index=index,
on_change=on_kb_change,
key="selected_kb",
)
kb_top_k = st.number_input("匹配知识条数:", 1, 20, VECTOR_SEARCH_TOP_K)
## Bge 模型会超过1
score_threshold = st.slider("知识匹配分数阈值:", 0.0, 2.0, float(SCORE_THRESHOLD), 0.01)
elif dialogue_mode == "文件对话":
with st.expander("文件对话配置", True):
files = st.file_uploader("上传知识文件:",
[i for ls in LOADER_DICT.values() for i in ls],
accept_multiple_files=True,
)
kb_top_k = st.number_input("匹配知识条数:", 1, 20, VECTOR_SEARCH_TOP_K)
## Bge 模型会超过1
score_threshold = st.slider("知识匹配分数阈值:", 0.0, 2.0, float(SCORE_THRESHOLD), 0.01)
if st.button("开始上传", disabled=len(files) == 0):
st.session_state["file_chat_id"] = upload_temp_docs(files, api)
elif dialogue_mode == "搜索引擎问答":
search_engine_list = api.list_search_engines()
if DEFAULT_SEARCH_ENGINE in search_engine_list:
index = search_engine_list.index(DEFAULT_SEARCH_ENGINE)
else:
index = search_engine_list.index("duckduckgo") if "duckduckgo" in search_engine_list else 0
with st.expander("搜索引擎配置", True):
search_engine = st.selectbox(
label="请选择搜索引擎",
options=search_engine_list,
index=index,
)
se_top_k = st.number_input("匹配搜索结果条数:", 1, 20, SEARCH_ENGINE_TOP_K)
# Display chat messages from history on app rerun
chat_box.output_messages()
chat_input_placeholder = "请输入对话内容换行请使用Shift+Enter。输入/help查看自定义命令 "
def on_feedback(
feedback,
message_id: str = "",
history_index: int = -1,
):
reason = feedback["text"]
score_int = chat_box.set_feedback(feedback=feedback, history_index=history_index)
api.chat_feedback(message_id=message_id,
score=score_int,
reason=reason)
st.session_state["need_rerun"] = True
feedback_kwargs = {
"feedback_type": "thumbs",
"optional_text_label": "欢迎反馈您打分的理由",
}
if prompt := st.chat_input(chat_input_placeholder, key="prompt"):
if parse_command(text=prompt, modal=modal): # 用户输入自定义命令
st.rerun()
else:
history = get_messages_history(history_len)
chat_box.user_say(prompt)
if dialogue_mode == "LLM 对话":
chat_box.ai_say("正在思考...")
text = ""
message_id = ""
r = api.chat_chat(prompt,
history=history,
conversation_id=conversation_id,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature)
for t in r:
if error_msg := check_error_msg(t): # check whether error occured
st.error(error_msg)
break
text += t.get("text", "")
chat_box.update_msg(text)
message_id = t.get("message_id", "")
metadata = {
"message_id": message_id,
}
chat_box.update_msg(text, streaming=False, metadata=metadata) # 更新最终的字符串,去除光标
chat_box.show_feedback(**feedback_kwargs,
key=message_id,
on_submit=on_feedback,
kwargs={"message_id": message_id, "history_index": len(chat_box.history) - 1})
elif dialogue_mode == "自定义Agent问答":
if not any(agent in llm_model for agent in SUPPORT_AGENT_MODEL):
chat_box.ai_say([
f"正在思考... \n\n <span style='color:red'>该模型并没有进行Agent对齐请更换支持Agent的模型获得更好的体验</span>\n\n\n",
Markdown("...", in_expander=True, title="思考过程", state="complete"),
])
else:
chat_box.ai_say([
f"正在思考...",
Markdown("...", in_expander=True, title="思考过程", state="complete"),
])
text = ""
ans = ""
for d in api.agent_chat(prompt,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature,
):
try:
print(f"1111自定义Agent问答, d:{d}")
d = json.loads(d)
print(f"22222自定义Agent问答, d:{d}")
except:
pass
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
if chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=1)
if chunk := d.get("final_answer"):
ans += chunk
chat_box.update_msg(ans, element_index=0)
if chunk := d.get("tools"):
text += "\n\n".join(d.get("tools", []))
chat_box.update_msg(text, element_index=1)
chat_box.update_msg(ans, element_index=0, streaming=False)
chat_box.update_msg(text, element_index=1, streaming=False)
elif dialogue_mode == "知识库问答":
chat_box.ai_say([
f"正在查询知识库 `{selected_kb}` ...",
Markdown("...", in_expander=True, title="知识库匹配结果", state="complete"),
])
text = ""
for d in api.knowledge_base_chat(prompt,
knowledge_base_name=selected_kb,
top_k=kb_top_k,
score_threshold=score_threshold,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature):
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
elif chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=0)
chat_box.update_msg(text, element_index=0, streaming=False)
chat_box.update_msg("\n\n".join(d.get("docs", [])), element_index=1, streaming=False)
elif dialogue_mode == "文件对话":
if st.session_state["file_chat_id"] is None:
st.error("请先上传文件再进行对话")
st.stop()
chat_box.ai_say([
f"正在查询文件 `{st.session_state['file_chat_id']}` ...",
Markdown("...", in_expander=True, title="文件匹配结果", state="complete"),
])
text = ""
for d in api.file_chat(prompt,
knowledge_id=st.session_state["file_chat_id"],
top_k=kb_top_k,
score_threshold=score_threshold,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature):
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
elif chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=0)
chat_box.update_msg(text, element_index=0, streaming=False)
chat_box.update_msg("\n\n".join(d.get("docs", [])), element_index=1, streaming=False)
elif dialogue_mode == "搜索引擎问答":
chat_box.ai_say([
f"正在执行 `{search_engine}` 搜索...",
Markdown("...", in_expander=True, title="网络搜索结果", state="complete"),
])
text = ""
for d in api.search_engine_chat(prompt,
search_engine_name=search_engine,
top_k=se_top_k,
history=history,
model=llm_model,
prompt_name=prompt_template_name,
temperature=temperature,
split_result=se_top_k > 1):
if error_msg := check_error_msg(d): # check whether error occured
st.error(error_msg)
elif chunk := d.get("answer"):
text += chunk
chat_box.update_msg(text, element_index=0)
chat_box.update_msg(text, element_index=0, streaming=False)
chat_box.update_msg("\n\n".join(d.get("docs", [])), element_index=1, streaming=False)
if st.session_state.get("need_rerun"):
st.session_state["need_rerun"] = False
st.rerun()
now = datetime.now()
with st.sidebar:
cols = st.columns(2)
export_btn = cols[0]
if cols[1].button(
"清空对话",
use_container_width=True,
):
chat_box.reset_history()
st.rerun()
export_btn.download_button(
"导出记录",
"".join(chat_box.export2md()),
file_name=f"{now:%Y-%m-%d %H.%M}_对话记录.md",
mime="text/markdown",
use_container_width=True,
)