Langchain-Chatchat/webui_st.py

539 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import streamlit as st
# from st_btn_select import st_btn_select
import tempfile
###### 从webui借用的代码 #####
###### 做了少量修改 #####
import os
import shutil
from chains.local_doc_qa import LocalDocQA
from configs.model_config import *
import nltk
from models.base import (BaseAnswer,
AnswerResult,)
import models.shared as shared
from models.loader.args import parser
from models.loader import LoaderCheckPoint
nltk.data.path = [NLTK_DATA_PATH] + nltk.data.path
def get_vs_list():
lst_default = ["新建知识库"]
if not os.path.exists(KB_ROOT_PATH):
return lst_default
lst = os.listdir(KB_ROOT_PATH)
if not lst:
return lst_default
lst.sort()
return lst_default + lst
embedding_model_dict_list = list(embedding_model_dict.keys())
llm_model_dict_list = list(llm_model_dict.keys())
# flag_csv_logger = gr.CSVLogger()
def get_answer(query, vs_path, history, mode, score_threshold=VECTOR_SEARCH_SCORE_THRESHOLD,
vector_search_top_k=VECTOR_SEARCH_TOP_K, chunk_conent: bool = True,
chunk_size=CHUNK_SIZE, streaming: bool = STREAMING,):
if mode == "Bing搜索问答":
for resp, history in local_doc_qa.get_search_result_based_answer(
query=query, chat_history=history, streaming=streaming):
source = "\n\n"
source += "".join(
[f"""<details> <summary>出处 [{i + 1}] <a href="{doc.metadata["source"]}" target="_blank">{doc.metadata["source"]}</a> </summary>\n"""
f"""{doc.page_content}\n"""
f"""</details>"""
for i, doc in
enumerate(resp["source_documents"])])
history[-1][-1] += source
yield history, ""
elif mode == "知识库问答" and vs_path is not None and os.path.exists(vs_path):
for resp, history in local_doc_qa.get_knowledge_based_answer(
query=query, vs_path=vs_path, chat_history=history, streaming=streaming):
source = "\n\n"
source += "".join(
[f"""<details> <summary>出处 [{i + 1}] {os.path.split(doc.metadata["source"])[-1]}</summary>\n"""
f"""{doc.page_content}\n"""
f"""</details>"""
for i, doc in
enumerate(resp["source_documents"])])
history[-1][-1] += source
yield history, ""
elif mode == "知识库测试":
if os.path.exists(vs_path):
resp, prompt = local_doc_qa.get_knowledge_based_conent_test(query=query, vs_path=vs_path,
score_threshold=score_threshold,
vector_search_top_k=vector_search_top_k,
chunk_conent=chunk_conent,
chunk_size=chunk_size)
if not resp["source_documents"]:
yield history + [[query,
"根据您的设定,没有匹配到任何内容,请确认您设置的知识相关度 Score 阈值是否过小或其他参数是否正确。"]], ""
else:
source = "\n".join(
[
f"""<details open> <summary>【知识相关度 Score】{doc.metadata["score"]} - 【出处{i + 1}】: {os.path.split(doc.metadata["source"])[-1]} </summary>\n"""
f"""{doc.page_content}\n"""
f"""</details>"""
for i, doc in
enumerate(resp["source_documents"])])
history.append([query, "以下内容为知识库中满足设置条件的匹配结果:\n\n" + source])
yield history, ""
else:
yield history + [[query,
"请选择知识库后进行测试,当前未选择知识库。"]], ""
else:
for answer_result in local_doc_qa.llm.generatorAnswer(prompt=query, history=history,
streaming=streaming):
resp = answer_result.llm_output["answer"]
history = answer_result.history
history[-1][-1] = resp + (
"\n\n当前知识库为空,如需基于知识库进行问答,请先加载知识库后,再进行提问。" if mode == "知识库问答" else "")
yield history, ""
logger.info(f"flagging: username={FLAG_USER_NAME},query={query},vs_path={vs_path},mode={mode},history={history}")
# flag_csv_logger.flag([query, vs_path, history, mode], username=FLAG_USER_NAME)
def init_model(llm_model: str = 'chat-glm-6b', embedding_model: str = 'text2vec'):
local_doc_qa = LocalDocQA()
# 初始化消息
args = parser.parse_args()
args_dict = vars(args)
args_dict.update(model=llm_model)
shared.loaderCheckPoint = LoaderCheckPoint(args_dict)
llm_model_ins = shared.loaderLLM()
llm_model_ins.set_history_len(LLM_HISTORY_LEN)
try:
local_doc_qa.init_cfg(llm_model=llm_model_ins,
embedding_model=embedding_model)
generator = local_doc_qa.llm.generatorAnswer("你好")
for answer_result in generator:
print(answer_result.llm_output)
reply = """模型已成功加载,可以开始对话,或从右侧选择模式后开始对话"""
logger.info(reply)
except Exception as e:
logger.error(e)
reply = """模型未成功加载,请到页面左上角"模型配置"选项卡中重新选择后点击"加载模型"按钮"""
if str(e) == "Unknown platform: darwin":
logger.info("该报错可能因为您使用的是 macOS 操作系统,需先下载模型至本地后执行 Web UI具体方法请参考项目 README 中本地部署方法及常见问题:"
" https://github.com/imClumsyPanda/langchain-ChatGLM")
else:
logger.info(reply)
return local_doc_qa
# 暂未使用到,先保留
# def reinit_model(llm_model, embedding_model, llm_history_len, no_remote_model, use_ptuning_v2, use_lora, top_k, history):
# try:
# llm_model_ins = shared.loaderLLM(llm_model, no_remote_model, use_ptuning_v2)
# llm_model_ins.history_len = llm_history_len
# local_doc_qa.init_cfg(llm_model=llm_model_ins,
# embedding_model=embedding_model,
# top_k=top_k)
# model_status = """模型已成功重新加载,可以开始对话,或从右侧选择模式后开始对话"""
# logger.info(model_status)
# except Exception as e:
# logger.error(e)
# model_status = """模型未成功重新加载,请到页面左上角"模型配置"选项卡中重新选择后点击"加载模型"按钮"""
# logger.info(model_status)
# return history + [[None, model_status]]
def get_vector_store(vs_id, files, sentence_size, history, one_conent, one_content_segmentation):
vs_path = os.path.join(KB_ROOT_PATH, vs_id, "vector_store")
filelist = []
if not os.path.exists(os.path.join(KB_ROOT_PATH, vs_id, "content")):
os.makedirs(os.path.join(KB_ROOT_PATH, vs_id, "content"))
if local_doc_qa.llm and local_doc_qa.embeddings:
if isinstance(files, list):
for file in files:
filename = os.path.split(file.name)[-1]
shutil.move(file.name, os.path.join(
KB_ROOT_PATH, vs_id, "content", filename))
filelist.append(os.path.join(
KB_ROOT_PATH, vs_id, "content", filename))
vs_path, loaded_files = local_doc_qa.init_knowledge_vector_store(
filelist, vs_path, sentence_size)
else:
vs_path, loaded_files = local_doc_qa.one_knowledge_add(vs_path, files, one_conent, one_content_segmentation,
sentence_size)
if len(loaded_files):
file_status = f"已添加 {''.join([os.path.split(i)[-1] for i in loaded_files if i])} 内容至知识库,并已加载知识库,请开始提问"
else:
file_status = "文件未成功加载,请重新上传文件"
else:
file_status = "模型未完成加载,请先在加载模型后再导入文件"
vs_path = None
logger.info(file_status)
return vs_path, None, history + [[None, file_status]]
knowledge_base_test_mode_info = ("【注意】\n\n"
"1. 您已进入知识库测试模式,您输入的任何对话内容都将用于进行知识库查询,"
"并仅输出知识库匹配出的内容及相似度分值和及输入的文本源路径,查询的内容并不会进入模型查询。\n\n"
"2. 知识相关度 Score 经测试,建议设置为 500 或更低,具体设置情况请结合实际使用调整。"
"""3. 使用"添加单条数据"添加文本至知识库时内容如未分段则内容越多越会稀释各查询内容与之关联的score阈值。\n\n"""
"4. 单条内容长度建议设置在100-150左右。\n\n"
"5. 本界面用于知识入库及知识匹配相关参数设定,但当前版本中,"
"本界面中修改的参数并不会直接修改对话界面中参数,仍需前往`configs/model_config.py`修改后生效。"
"相关参数将在后续版本中支持本界面直接修改。")
webui_title = """
# 🎉langchain-ChatGLM WebUI🎉
👍 [https://github.com/imClumsyPanda/langchain-ChatGLM](https://github.com/imClumsyPanda/langchain-ChatGLM)
"""
###### #####
###### todo #####
# 1. streamlit运行方式与一般web服务器不同使用模块是无法实现单例模式的所以shared和local_doc_qa都需要进行全局化处理。
# 目前已经实现了local_doc_qa的全局化后面要考虑shared。
# 2. 当前local_doc_qa是一个全局变量一方面任何一个session对其做出修改都会影响所有session的对话;另一方面如何处理所有session的请求竞争也是问题。
# 这个暂时无法避免,在配置普通的机器上暂时也无需考虑。
# 3. 目前只包含了get_answer对应的参数以后可以添加其他参数如temperature。
###### #####
###### 配置项 #####
class ST_CONFIG:
user_bg_color = '#77ff77'
user_icon = 'https://tse2-mm.cn.bing.net/th/id/OIP-C.LTTKrxNWDr_k74wz6jKqBgHaHa?w=203&h=203&c=7&r=0&o=5&pid=1.7'
robot_bg_color = '#ccccee'
robot_icon = 'https://ts1.cn.mm.bing.net/th/id/R-C.5302e2cc6f5c7c4933ebb3394e0c41bc?rik=z4u%2b7efba5Mgxw&riu=http%3a%2f%2fcomic-cons.xyz%2fwp-content%2fuploads%2fStar-Wars-avatar-icon-C3PO.png&ehk=kBBvCvpJMHPVpdfpw1GaH%2brbOaIoHjY5Ua9PKcIs%2bAc%3d&risl=&pid=ImgRaw&r=0'
default_mode = '知识库问答'
defalut_kb = ''
###### #####
class MsgType:
'''
目前仅支持文本类型的输入输出,为以后多模态模型预留图像、视频、音频支持。
'''
TEXT = 1
IMAGE = 2
VIDEO = 3
AUDIO = 4
class TempFile:
'''
为保持与get_vector_store的兼容性需要将streamlit上传文件转化为其可以接受的方式
'''
def __init__(self, path):
self.name = path
def init_session():
st.session_state.setdefault('history', [])
# def get_query_params():
# '''
# 可以用url参数传递配置参数llm_model, embedding_model, kb, mode。
# 该参数将覆盖model_config中的配置。处于安全考虑目前只支持kb和mode
# 方便将固定的配置分享给特定的人。
# '''
# params = st.experimental_get_query_params()
# return {k: v[0] for k, v in params.items() if v}
def robot_say(msg, kb=''):
st.session_state['history'].append(
{'is_user': False, 'type': MsgType.TEXT, 'content': msg, 'kb': kb})
def user_say(msg):
st.session_state['history'].append(
{'is_user': True, 'type': MsgType.TEXT, 'content': msg})
def format_md(msg, is_user=False, bg_color='', margin='10%'):
'''
将文本消息格式化为markdown文本
'''
if is_user:
bg_color = bg_color or ST_CONFIG.user_bg_color
text = f'''
<div style="background:{bg_color};
margin-left:{margin};
word-break:break-all;
float:right;
padding:2%;
border-radius:2%;">
{msg}
</div>
'''
else:
bg_color = bg_color or ST_CONFIG.robot_bg_color
text = f'''
<div style="background:{bg_color};
margin-right:{margin};
word-break:break-all;
padding:2%;
border-radius:2%;">
{msg}
</div>
'''
return text
def message(msg,
is_user=False,
msg_type=MsgType.TEXT,
icon='',
bg_color='',
margin='10%',
kb='',
):
'''
渲染单条消息。目前仅支持文本
'''
cols = st.columns([1, 10, 1])
empty = cols[1].empty()
if is_user:
icon = icon or ST_CONFIG.user_icon
bg_color = bg_color or ST_CONFIG.user_bg_color
cols[2].image(icon, width=40)
if msg_type == MsgType.TEXT:
text = format_md(msg, is_user, bg_color, margin)
empty.markdown(text, unsafe_allow_html=True)
else:
raise RuntimeError('only support text message now.')
else:
icon = icon or ST_CONFIG.robot_icon
bg_color = bg_color or ST_CONFIG.robot_bg_color
cols[0].image(icon, width=40)
if kb:
cols[0].write(f'({kb})')
if msg_type == MsgType.TEXT:
text = format_md(msg, is_user, bg_color, margin)
empty.markdown(text, unsafe_allow_html=True)
else:
raise RuntimeError('only support text message now.')
return empty
def output_messages(
user_bg_color='',
robot_bg_color='',
user_icon='',
robot_icon='',
):
with chat_box.container():
last_response = None
for msg in st.session_state['history']:
bg_color = user_bg_color if msg['is_user'] else robot_bg_color
icon = user_icon if msg['is_user'] else robot_icon
empty = message(msg['content'],
is_user=msg['is_user'],
icon=icon,
msg_type=msg['type'],
bg_color=bg_color,
kb=msg.get('kb', '')
)
if not msg['is_user']:
last_response = empty
return last_response
@st.cache_resource(show_spinner=False, max_entries=1)
def load_model(llm_model: str, embedding_model: str):
'''
对应init_model利用streamlit cache避免模型重复加载
'''
local_doc_qa = init_model(llm_model, embedding_model)
robot_say('模型已成功加载,可以开始对话,或从左侧选择模式后开始对话。\n请尽量不要刷新页面,以免模型出错或重复加载。')
return local_doc_qa
# @st.cache_data
def answer(query, vs_path='', history=[], mode='', score_threshold=0,
vector_search_top_k=5, chunk_conent=True, chunk_size=100, qa=None
):
'''
对应get_answer--利用streamlit cache缓存相同问题的答案--
'''
return get_answer(query, vs_path, history, mode, score_threshold,
vector_search_top_k, chunk_conent, chunk_size)
def load_vector_store(
vs_id,
files,
sentence_size=100,
history=[],
one_conent=None,
one_content_segmentation=None,
):
return get_vector_store(
local_doc_qa,
vs_id,
files,
sentence_size,
history,
one_conent,
one_content_segmentation,
)
# main ui
st.set_page_config(webui_title, layout='wide')
init_session()
# params = get_query_params()
# llm_model = params.get('llm_model', LLM_MODEL)
# embedding_model = params.get('embedding_model', EMBEDDING_MODEL)
with st.spinner(f'正在加载模型({LLM_MODEL} + {EMBEDDING_MODEL}),请耐心等候...'):
local_doc_qa = load_model(LLM_MODEL, EMBEDDING_MODEL)
def use_kb_mode(m):
return m in ['知识库问答', '知识库测试']
# sidebar
modes = ['LLM 对话', '知识库问答', 'Bing搜索问答', '知识库测试']
with st.sidebar:
def on_mode_change():
m = st.session_state.mode
robot_say(f'已切换到"{m}"模式')
if m == '知识库测试':
robot_say(knowledge_base_test_mode_info)
index = 0
try:
index = modes.index(ST_CONFIG.default_mode)
except:
pass
mode = st.selectbox('对话模式', modes, index,
on_change=on_mode_change, key='mode')
with st.expander('模型配置', '知识' not in mode):
with st.form('model_config'):
index = 0
try:
index = llm_model_dict_list.index(LLM_MODEL)
except:
pass
llm_model = st.selectbox('LLM模型', llm_model_dict_list, index)
no_remote_model = st.checkbox('加载本地模型', False)
use_ptuning_v2 = st.checkbox('使用p-tuning-v2微调过的模型', False)
use_lora = st.checkbox('使用lora微调的权重', False)
try:
index = embedding_model_dict_list.index(EMBEDDING_MODEL)
except:
pass
embedding_model = st.selectbox(
'Embedding模型', embedding_model_dict_list, index)
btn_load_model = st.form_submit_button('重新加载模型')
if btn_load_model:
local_doc_qa = load_model(llm_model, embedding_model)
if mode in ['知识库问答', '知识库测试']:
vs_list = get_vs_list()
vs_list.remove('新建知识库')
def on_new_kb():
name = st.session_state.kb_name
if name in vs_list:
st.error(f'名为“{name}”的知识库已存在。')
else:
vs_list.append(name)
st.session_state.vs_path = name
def on_vs_change():
robot_say(f'已加载知识库: {st.session_state.vs_path}')
with st.expander('知识库配置', True):
cols = st.columns([12, 10])
kb_name = cols[0].text_input(
'新知识库名称', placeholder='新知识库名称', label_visibility='collapsed')
cols[1].button('新建知识库', on_click=on_new_kb)
vs_path = st.selectbox(
'选择知识库', vs_list, on_change=on_vs_change, key='vs_path')
st.text('')
score_threshold = st.slider(
'知识相关度阈值', 0, 1000, VECTOR_SEARCH_SCORE_THRESHOLD)
top_k = st.slider('向量匹配数量', 1, 20, VECTOR_SEARCH_TOP_K)
history_len = st.slider(
'LLM对话轮数', 1, 50, LLM_HISTORY_LEN) # 也许要跟知识库分开设置
local_doc_qa.llm.set_history_len(history_len)
chunk_conent = st.checkbox('启用上下文关联', False)
st.text('')
# chunk_conent = st.checkbox('分割文本', True) # 知识库文本分割入库
chunk_size = st.slider('上下文关联长度', 1, 1000, CHUNK_SIZE)
sentence_size = st.slider('文本入库分句长度限制', 1, 1000, SENTENCE_SIZE)
files = st.file_uploader('上传知识文件',
['docx', 'txt', 'md', 'csv', 'xlsx', 'pdf'],
accept_multiple_files=True)
if st.button('添加文件到知识库'):
temp_dir = tempfile.mkdtemp()
file_list = []
for f in files:
file = os.path.join(temp_dir, f.name)
with open(file, 'wb') as fp:
fp.write(f.getvalue())
file_list.append(TempFile(file))
_, _, history = load_vector_store(
vs_path, file_list, sentence_size, [], None, None)
st.session_state.files = []
# main body
chat_box = st.empty()
with st.form('my_form', clear_on_submit=True):
cols = st.columns([8, 1])
question = cols[0].text_input(
'temp', key='input_question', label_visibility='collapsed')
def on_send():
q = st.session_state.input_question
if q:
user_say(q)
if mode == 'LLM 对话':
robot_say('正在思考...')
last_response = output_messages()
for history, _ in answer(q,
history=[],
mode=mode):
last_response.markdown(
format_md(history[-1][-1], False),
unsafe_allow_html=True
)
elif use_kb_mode(mode):
robot_say('正在思考...', vs_path)
last_response = output_messages()
for history, _ in answer(q,
vs_path=os.path.join(
KB_ROOT_PATH, vs_path, "vector_store"),
history=[],
mode=mode,
score_threshold=score_threshold,
vector_search_top_k=top_k,
chunk_conent=chunk_conent,
chunk_size=chunk_size):
last_response.markdown(
format_md(history[-1][-1], False, 'ligreen'),
unsafe_allow_html=True
)
else:
robot_say('正在思考...')
last_response = output_messages()
st.session_state['history'][-1]['content'] = history[-1][-1]
submit = cols[1].form_submit_button('发送', on_click=on_send)
output_messages()
# st.write(st.session_state['history'])