"""Local document question answering built on LangChain, FAISS and a local LLM.

The module loads files into ``Document`` chunks, stores their embeddings in a
FAISS index on disk, and answers queries by retrieving related chunks and
feeding them to the configured LLM through a prompt template.
"""
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.document_loaders import UnstructuredFileLoader
# NOTE: this star import supplies the configuration constants used below and,
# as far as this file shows, also the names ``os``, ``torch`` and ``logger``.
from configs.model_config import *
import datetime
from textsplitter import ChineseTextSplitter
from typing import List, Optional, Tuple, Union
from langchain.docstore.document import Document
import numpy as np
from utils import torch_gc
from tqdm import tqdm
from pypinyin import lazy_pinyin

DEVICE_ = EMBEDDING_DEVICE
DEVICE_ID = "0" if torch.cuda.is_available() else None
DEVICE = f"{DEVICE_}:{DEVICE_ID}" if DEVICE_ID else DEVICE_


def load_file(filepath, sentence_size=SENTENCE_SIZE):
    """Load one file and return it as a list of documents.

    * ``.md`` files are loaded in ``elements`` mode and not split further.
    * ``.pdf`` files are loaded with the ``fast`` strategy and split with
      ``ChineseTextSplitter(pdf=True)``.
    * Every other file is loaded in ``elements`` mode and split with
      ``ChineseTextSplitter(pdf=False)``.

    Args:
        filepath: Path of the file to load; the extension check is
            case-insensitive.
        sentence_size: Forwarded to ``ChineseTextSplitter``.

    Returns:
        The documents produced by the loader (and splitter, if any).
    """
    lowered = filepath.lower()
    if lowered.endswith(".md"):
        loader = UnstructuredFileLoader(filepath, mode="elements")
        docs = loader.load()
    elif lowered.endswith(".pdf"):
        loader = UnstructuredFileLoader(filepath, strategy="fast")
        textsplitter = ChineseTextSplitter(pdf=True, sentence_size=sentence_size)
        docs = loader.load_and_split(textsplitter)
    else:
        loader = UnstructuredFileLoader(filepath, mode="elements")
        textsplitter = ChineseTextSplitter(pdf=False, sentence_size=sentence_size)
        docs = loader.load_and_split(text_splitter=textsplitter)
    return docs


def generate_prompt(related_docs: List[Document],
                    query: str,
                    prompt_template=PROMPT_TEMPLATE) -> str:
    """Fill the prompt template with the query and the retrieved context.

    Args:
        related_docs: Retrieved documents; their ``page_content`` values are
            joined with newlines to form the context.
        query: The user question, substituted for ``{question}``.
        prompt_template: Template containing the ``{question}`` and
            ``{context}`` placeholders.

    Returns:
        The template with both placeholders replaced.
    """
    context = "\n".join(doc.page_content for doc in related_docs)
    return prompt_template.replace("{question}", query).replace("{context}", context)


def seperate_list(ls: List[int]) -> List[List[int]]:
    """Split a list of integers into runs of consecutive values.

    A new run starts whenever an element is not exactly one greater than its
    predecessor, e.g. ``[1, 2, 3, 7, 8, 10]`` -> ``[[1, 2, 3], [7, 8], [10]]``.

    Args:
        ls: Integers to group (the caller in this file passes a sorted list).

    Returns:
        The runs in input order; an empty list for empty input.
    """
    if not ls:
        # Previously ``ls[0]`` raised IndexError on empty input.
        return []
    lists = []
    run = [ls[0]]
    for prev, cur in zip(ls, ls[1:]):
        if prev + 1 == cur:
            run.append(cur)
        else:
            lists.append(run)
            run = [cur]
    lists.append(run)
    return lists


def similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4
) -> List[Document]:
    """Search the FAISS index and optionally merge hits with neighbouring chunks.

    This function is assigned onto ``FAISS`` by ``LocalDocQA`` and therefore
    runs with ``self`` being a FAISS vector store that additionally carries
    the attributes ``score_threshold``, ``chunk_conent`` and ``chunk_size``.

    Args:
        embedding: Query embedding.
        k: Number of nearest neighbours requested from the index.

    Returns:
        A list of ``Document`` objects (not ``(Document, score)`` tuples).
        Each document gets ``metadata["score"]`` set to the integer-truncated
        score. With ``chunk_conent`` disabled the raw hits are returned;
        otherwise each hit is expanded with adjacent chunks from the same
        source until ``chunk_size`` characters would be exceeded, and runs of
        consecutive ids are concatenated into a single document.

    Raises:
        ValueError: If the docstore does not return a ``Document`` for an id.
    """
    scores, indices = self.index.search(np.array([embedding], dtype=np.float32), k)
    docs = []
    id_set = set()
    store_len = len(self.index_to_docstore_id)
    for rank, hit in enumerate(indices[0]):
        # -1 marks "not enough docs were returned"; a positive threshold
        # additionally drops hits whose score is above it.
        if hit == -1 or 0 < self.score_threshold < scores[0][rank]:
            continue
        _id = self.index_to_docstore_id[hit]
        doc = self.docstore.search(_id)
        if not self.chunk_conent:
            if not isinstance(doc, Document):
                raise ValueError(f"Could not find document for id {_id}, got {doc}")
            doc.metadata["score"] = int(scores[0][rank])
            docs.append(doc)
            continue
        id_set.add(hit)
        docs_len = len(doc.page_content)
        # Walk outwards from the hit, alternating right/left neighbours.
        for offset in range(1, max(hit, store_len - hit)):
            break_flag = False
            for neighbor in [hit + offset, hit - offset]:
                if 0 <= neighbor < store_len:
                    _id0 = self.index_to_docstore_id[neighbor]
                    doc0 = self.docstore.search(_id0)
                    if docs_len + len(doc0.page_content) > self.chunk_size:
                        break_flag = True
                        break
                    elif doc0.metadata["source"] == doc.metadata["source"]:
                        docs_len += len(doc0.page_content)
                        id_set.add(neighbor)
            if break_flag:
                break
    if not self.chunk_conent:
        return docs
    if len(id_set) == 0 and self.score_threshold > 0:
        return []
    id_lists = seperate_list(sorted(id_set))
    hit_list = indices[0].tolist()  # loop-invariant, computed once
    for id_seq in id_lists:
        for doc_id in id_seq:
            if doc_id == id_seq[0]:
                _id = self.index_to_docstore_id[doc_id]
                doc = self.docstore.search(_id)
            else:
                _id0 = self.index_to_docstore_id[doc_id]
                doc0 = self.docstore.search(_id0)
                # Appends in place to the object returned by the docstore.
                doc.page_content += doc0.page_content
        if not isinstance(doc, Document):
            raise ValueError(f"Could not find document for id {_id}, got {doc}")
        # The merged document takes the smallest score among the direct hits
        # contained in this run.
        # NOTE(review): min() raises ValueError if a run contains no direct
        # hit (possible when a same-source neighbour is added across a gap).
        hit_positions = [hit_list.index(doc_id) for doc_id in id_seq if doc_id in indices[0]]
        doc_score = min(scores[0][pos] for pos in hit_positions)
        doc.metadata["score"] = int(doc_score)
        docs.append(doc)
    torch_gc()
    return docs


class LocalDocQA:
    """Knowledge-base question answering over a FAISS store and a local LLM."""

    llm: object = None
    embeddings: object = None
    top_k: int = VECTOR_SEARCH_TOP_K
    chunk_size: int = CHUNK_SIZE
    chunk_conent: bool = True
    score_threshold: int = VECTOR_SEARCH_SCORE_THRESHOLD

    def init_cfg(self,
                 embedding_model: str = EMBEDDING_MODEL,
                 embedding_device=EMBEDDING_DEVICE,
                 llm_history_len: int = LLM_HISTORY_LEN,
                 llm_model: str = LLM_MODEL,
                 llm_device=LLM_DEVICE,
                 top_k=VECTOR_SEARCH_TOP_K,
                 use_ptuning_v2: bool = USE_PTUNING_V2,
                 use_lora: bool = USE_LORA,
                 ):
        """Create and load the LLM and the embedding model.

        Args:
            embedding_model: Key into ``embedding_model_dict``.
            embedding_device: Device passed to ``HuggingFaceEmbeddings``.
            llm_history_len: Stored on the LLM as ``history_len``.
            llm_model: Key into ``llm_model_dict``; names starting with
                ``'moss'`` select ``MOSS``, everything else ``ChatGLM``.
            llm_device: Device passed to ``load_model``.
            top_k: Number of chunks retrieved per query.
            use_ptuning_v2: Forwarded to ``load_model``.
            use_lora: Forwarded to ``load_model``.
        """
        # Model wrappers are imported lazily so only the selected one is loaded.
        if llm_model.startswith('moss'):
            from models.moss_llm import MOSS
            self.llm = MOSS()
        else:
            from models.chatglm_llm import ChatGLM
            self.llm = ChatGLM()
        self.llm.load_model(model_name_or_path=llm_model_dict[llm_model],
                            llm_device=llm_device,
                            use_ptuning_v2=use_ptuning_v2,
                            use_lora=use_lora)
        self.llm.history_len = llm_history_len

        self.embeddings = HuggingFaceEmbeddings(
            model_name=embedding_model_dict[embedding_model],
            model_kwargs={'device': embedding_device})
        self.top_k = top_k

    def init_knowledge_vector_store(self,
                                    filepath: Union[str, List[str]],
                                    vs_path: Optional[Union[str, os.PathLike]] = None,
                                    sentence_size=SENTENCE_SIZE):
        """Load files and add them to a FAISS store, creating it if needed.

        Args:
            filepath: A single file path, a directory path (its direct
                entries are loaded), or a list of file paths.
            vs_path: Directory of the vector store. If it is an existing
                directory the documents are added to it; otherwise a new
                store is built. When omitted, a path under ``VS_ROOT_PATH``
                is derived from the last processed file name and a timestamp.
            sentence_size: Forwarded to ``load_file``.

        Returns:
            ``(vs_path, loaded_files)`` on success and ``(None, loaded_files)``
            when no document could be loaded.
            NOTE(review): a bare ``None`` (not a tuple) is returned when a
            string path does not exist or a single file fails to load; kept
            for backward compatibility — confirm callers handle it.
        """
        loaded_files = []
        failed_files = []
        # Initialised up front so every path below reaches the final check
        # with ``docs`` bound.
        docs = []
        if isinstance(filepath, str):
            if not os.path.exists(filepath):
                print("路径不存在")
                return None
            elif os.path.isfile(filepath):
                file = os.path.split(filepath)[-1]
                try:
                    docs = load_file(filepath, sentence_size)
                    logger.info(f"{file} 已成功加载")
                    loaded_files.append(filepath)
                except Exception as e:
                    logger.error(e)
                    logger.info(f"{file} 未能成功加载")
                    return None
            elif os.path.isdir(filepath):
                for file in tqdm(os.listdir(filepath), desc="加载文件"):
                    fullfilepath = os.path.join(filepath, file)
                    try:
                        docs += load_file(fullfilepath, sentence_size)
                        loaded_files.append(fullfilepath)
                    except Exception as e:
                        logger.error(e)
                        failed_files.append(file)

                if failed_files:
                    logger.info("以下文件未能成功加载:")
                    for file in failed_files:
                        # Logger.info does not accept print()'s ``end`` keyword.
                        logger.info(file)
        else:
            for file in filepath:
                try:
                    # Honour the caller's sentence_size like the other branches.
                    docs += load_file(file, sentence_size)
                    logger.info(f"{file} 已成功加载")
                    loaded_files.append(file)
                except Exception as e:
                    logger.error(e)
                    logger.info(f"{file} 未能成功加载")

        if not docs:
            logger.info("文件均未成功加载,请检查依赖包或替换为其他文件再次上传。")
            return None, loaded_files

        logger.info("文件加载完毕,正在生成向量库")
        if vs_path and os.path.isdir(vs_path):
            vector_store = FAISS.load_local(vs_path, self.embeddings)
            vector_store.add_documents(docs)
            torch_gc()
        else:
            if not vs_path:
                # ``file`` is whatever name the loading loops above bound last.
                stem = "".join(lazy_pinyin(os.path.splitext(file)[0]))
                stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
                vs_path = os.path.join(VS_ROOT_PATH, f"{stem}_FAISS_{stamp}")
            vector_store = FAISS.from_documents(docs, self.embeddings)  # docs is a list of Document
            torch_gc()

        vector_store.save_local(vs_path)
        return vs_path, loaded_files

    def one_knowledge_add(self, vs_path, one_title, one_conent, one_content_segmentation, sentence_size):
        """Add a single piece of text to the vector store at ``vs_path``.

        Args:
            vs_path: Directory of the vector store; created from the new
                document if it is not an existing directory.
            one_title: Stored as the document's ``source`` metadata.
            one_conent: Text content; a trailing newline is appended.
            one_content_segmentation: When falsy, the text is split with
                ``ChineseTextSplitter`` before being added.
            sentence_size: Forwarded to ``ChineseTextSplitter``.

        Returns:
            ``(vs_path, [one_title])`` on success, ``(None, [one_title])`` if
            an argument is empty or any exception occurs (it is logged).
        """
        try:
            if not vs_path or not one_title or not one_conent:
                logger.info("知识库添加错误,请确认知识库名字、标题、内容是否正确!")
                return None, [one_title]
            docs = [Document(page_content=one_conent + "\n", metadata={"source": one_title})]
            if not one_content_segmentation:
                text_splitter = ChineseTextSplitter(pdf=False, sentence_size=sentence_size)
                docs = text_splitter.split_documents(docs)
            if os.path.isdir(vs_path):
                vector_store = FAISS.load_local(vs_path, self.embeddings)
                vector_store.add_documents(docs)
            else:
                vector_store = FAISS.from_documents(docs, self.embeddings)  # docs is a list of Document
            torch_gc()
            vector_store.save_local(vs_path)
            return vs_path, [one_title]
        except Exception as e:
            logger.error(e)
            return None, [one_title]

    def get_knowledge_based_answer(self,
                                   query,
                                   vs_path,
                                   chat_history=None,
                                   streaming: bool = STREAMING):
        """Answer ``query`` using chunks retrieved from the store at ``vs_path``.

        Args:
            query: The user question.
            vs_path: Directory of the FAISS store to load.
            chat_history: History list passed to the LLM; a fresh empty list
                is used when omitted.
            streaming: Forwarded to the LLM call.

        Yields:
            ``(response, history)`` for every item produced by the LLM, where
            ``response`` has the keys ``"query"``, ``"result"`` and
            ``"source_documents"``.
        """
        if chat_history is None:
            # Avoid a mutable default shared between calls.
            chat_history = []
        vector_store = FAISS.load_local(vs_path, self.embeddings)
        # Replace the search routine on the FAISS class with the
        # context-merging version defined in this module.
        FAISS.similarity_search_with_score_by_vector = similarity_search_with_score_by_vector
        vector_store.chunk_size = self.chunk_size
        vector_store.chunk_conent = self.chunk_conent
        vector_store.score_threshold = self.score_threshold
        related_docs_with_score = vector_store.similarity_search_with_score(query, k=self.top_k)
        torch_gc()
        prompt = generate_prompt(related_docs_with_score, query)

        for result, history in self.llm._call(prompt=prompt,
                                              history=chat_history,
                                              streaming=streaming):
            torch_gc()
            # Store the original query in the last history entry in place of
            # whatever the LLM call recorded there.
            history[-1][0] = query
            response = {"query": query,
                        "result": result,
                        "source_documents": related_docs_with_score}
            yield response, history
            torch_gc()

    def get_knowledge_based_conent_test(self, query, vs_path, chunk_conent,
                                        score_threshold=VECTOR_SEARCH_SCORE_THRESHOLD,
                                        vector_search_top_k=VECTOR_SEARCH_TOP_K, chunk_size=CHUNK_SIZE):
        """Retrieve matching knowledge-base content without calling the LLM.

        Args:
            query: Query content.
            vs_path: Knowledge-base (vector store) path.
            chunk_conent: Whether to enable context association.
            score_threshold: Score threshold for search matches.
            vector_search_top_k: Number of knowledge-base entries to retrieve.
            chunk_size: Length limit of the context joined onto a single
                matched segment.

        Returns:
            ``(response, prompt)`` where ``response`` has the keys ``"query"``
            and ``"source_documents"`` and ``prompt`` is the newline-joined
            content of the retrieved documents (``""`` when nothing matched).
        """
        vector_store = FAISS.load_local(vs_path, self.embeddings)
        FAISS.similarity_search_with_score_by_vector = similarity_search_with_score_by_vector
        vector_store.chunk_conent = chunk_conent
        vector_store.score_threshold = score_threshold
        vector_store.chunk_size = chunk_size
        related_docs_with_score = vector_store.similarity_search_with_score(query, k=vector_search_top_k)
        if not related_docs_with_score:
            response = {"query": query,
                        "source_documents": []}
            return response, ""
        torch_gc()
        prompt = "\n".join(doc.page_content for doc in related_docs_with_score)
        response = {"query": query,
                    "source_documents": related_docs_with_score}
        return response, prompt


if __name__ == "__main__":
    local_doc_qa = LocalDocQA()
    local_doc_qa.init_cfg()
    query = "本项目使用的embedding模型是什么,消耗多少显存"
    vs_path = "/Users/liuqian/Downloads/glm-dev/vector_store/aaa"
    last_print_len = 0
    resp = None
    for resp, history in local_doc_qa.get_knowledge_based_answer(query=query,
                                                                 vs_path=vs_path,
                                                                 chat_history=[],
                                                                 streaming=True):
        # Print only the newly generated tail of the answer. print() is used
        # because Logger.info does not accept ``end``/``flush``.
        print(resp["result"][last_print_len:], end="", flush=True)
        last_print_len = len(resp["result"])
    if resp is not None:
        source_text = [f"""出处 [{inum + 1}] {os.path.split(doc.metadata['source'])[-1]}:\n\n{doc.page_content}\n\n"""
                       # Relevance output (doc.metadata['score']) is disabled here.
                       for inum, doc in enumerate(resp["source_documents"])]
        logger.info("\n\n" + "\n\n".join(source_text))