diff --git a/.gitignore b/.gitignore index 10953ca..37aebdf 100644 --- a/.gitignore +++ b/.gitignore @@ -170,3 +170,7 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ +.pytest_cache +.DS_Store + + diff --git a/configs/kb_config.py.example b/configs/kb_config.py.example index 530433b..42e08fa 100644 --- a/configs/kb_config.py.example +++ b/configs/kb_config.py.example @@ -3,7 +3,7 @@ import os # 默认使用的知识库 DEFAULT_KNOWLEDGE_BASE = "samples" -# 默认向量库类型。可选:faiss, milvus(离线) & zilliz(在线), pg. +# 默认向量库/全文检索引擎类型。可选:faiss, milvus(离线) & zilliz(在线), pgvector,全文检索引擎es DEFAULT_VS_TYPE = "faiss" # 缓存向量库数量(针对FAISS) @@ -89,7 +89,15 @@ kbs_config = { }, "pg": { "connection_uri": "postgresql://postgres:postgres@127.0.0.1:5432/langchain_chatchat", - } + }, + + "es": { + "host": "127.0.0.1", + "port": "9200", + "index_name": "test_index", + "user": "", + "password": "" + } } # TextSplitter配置项,如果你不明白其中的含义,就不要修改。 diff --git a/configs/model_config.py.example b/configs/model_config.py.example index e6d78b9..9988451 100644 --- a/configs/model_config.py.example +++ b/configs/model_config.py.example @@ -18,7 +18,7 @@ EMBEDDING_MODEL_OUTPUT_PATH = "output" # 要运行的 LLM 名称,可以包括本地模型和在线模型。 # 第一个将作为 API 和 WEBUI 的默认模型 -LLM_MODELS = ["chatglm2-6b-int4", "zhipu-api", "openai-api] +LLM_MODELS = ["chatglm2-6b", "zhipu-api", "openai-api"] # AgentLM模型的名称 (可以不指定,指定之后就锁定进入Agent之后的Chain的模型,不指定就是LLM_MODELS[0]) Agent_MODEL = None @@ -113,6 +113,7 @@ ONLINE_LLM_MODEL = { "api_key": "", "provider": "AzureWorker", }, + } # 在以下字典中修改属性值,以指定本地embedding模型存储位置。支持3种设置方法: @@ -198,6 +199,8 @@ MODEL_PATH = { "Qwen-14B": "Qwen/Qwen-14B", "Qwen-7B-Chat": "Qwen/Qwen-7B-Chat", "Qwen-14B-Chat": "Qwen/Qwen-14B-Chat", + "Qwen-14B-Chat-Int8": "Qwen/Qwen-14B-Chat-Int8", # 确保已经安装了auto-gptq optimum flash-atten + "Qwen-14B-Chat-Int4": "Qwen/Qwen-14B-Chat-Int4", # 确保已经安装了auto-gptq 
optimum flash-atten }, } diff --git a/docs/ES部署指南.md b/docs/ES部署指南.md new file mode 100644 index 0000000..f461582 --- /dev/null +++ b/docs/ES部署指南.md @@ -0,0 +1,29 @@ + +# 实现基于ES的数据插入、检索、删除、更新 +```shell +author: 唐国梁Tommy +e-mail: flytang186@qq.com + +如果遇到任何问题,可以与我联系,我这边部署后服务是没有问题的。 +``` + +## 第1步:ES docker部署 +```shell +docker network create elastic +docker run -id --name elasticsearch --net elastic -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "xpack.security.enabled=false" -e "xpack.security.http.ssl.enabled=false" -t docker.elastic.co/elasticsearch/elasticsearch:8.8.2 +``` + +### 第2步:Kibana docker部署 +**注意:Kibana版本与ES保持一致** +```shell +docker pull docker.elastic.co/kibana/kibana:{version} +docker run --name kibana --net elastic -p 5601:5601 docker.elastic.co/kibana/kibana:{version} +``` + +### 第3步:核心代码 +```shell +1. 核心代码路径 +server/knowledge_base/kb_service/es_kb_service.py + +2. 需要在 configs/kb_config.py 中 配置 ES参数(IP, PORT)等; +``` \ No newline at end of file diff --git a/server/db/repository/knowledge_file_repository.py b/server/db/repository/knowledge_file_repository.py index 08417a4..3d77cc0 100644 --- a/server/db/repository/knowledge_file_repository.py +++ b/server/db/repository/knowledge_file_repository.py @@ -51,6 +51,10 @@ def add_docs_to_db(session, 将某知识库某文件对应的所有Document信息添加到数据库。 doc_infos形式:[{"id": str, "metadata": dict}, ...] ''' + #! 
这里会出现doc_infos为None的情况,需要进一步排查 + if doc_infos is None: + print("输入的server.db.repository.knowledge_file_repository.add_docs_to_db的doc_infos参数为None") + return False for d in doc_infos: obj = FileDocModel( kb_name=kb_name, diff --git a/server/knowledge_base/kb_cache/base.py b/server/knowledge_base/kb_cache/base.py index b65e4ff..99f854f 100644 --- a/server/knowledge_base/kb_cache/base.py +++ b/server/knowledge_base/kb_cache/base.py @@ -119,7 +119,7 @@ class EmbeddingsPool(CachePool): def load_embeddings(self, model: str = None, device: str = None) -> Embeddings: self.atomic.acquire() model = model or EMBEDDING_MODEL - device = device or embedding_device() + device = embedding_device() # NOTE(review): caller-supplied device argument is now ignored -- confirm this override is intended key = (model, device) if not self.get(key): item = ThreadSafeObject(key, pool=self) diff --git a/server/knowledge_base/kb_service/base.py b/server/knowledge_base/kb_service/base.py index d823740..e221e43 100644 --- a/server/knowledge_base/kb_service/base.py +++ b/server/knowledge_base/kb_service/base.py @@ -46,6 +46,7 @@ class SupportedVSType: DEFAULT = 'default' ZILLIZ = 'zilliz' PG = 'pg' + ES = 'es' class KBService(ABC): @@ -274,6 +275,12 @@ class KBServiceFactory: from server.knowledge_base.kb_service.zilliz_kb_service import ZillizKBService return ZillizKBService(kb_name, embed_model=embed_model) elif SupportedVSType.DEFAULT == vector_store_type: + return MilvusKBService(kb_name, + embed_model=embed_model) # other milvus parameters are set in model_config.kbs_config + elif SupportedVSType.ES == vector_store_type: + from server.knowledge_base.kb_service.es_kb_service import ESKBService + return ESKBService(kb_name, embed_model=embed_model) + elif SupportedVSType.DEFAULT == vector_store_type: # kb_exists of default kbservice is False, to make validation easier. NOTE(review): this branch is unreachable -- DEFAULT is already matched by the identical condition above. 
from server.knowledge_base.kb_service.default_kb_service import DefaultKBService return DefaultKBService(kb_name) diff --git a/server/knowledge_base/kb_service/es_kb_service.py b/server/knowledge_base/kb_service/es_kb_service.py new file mode 100644 index 0000000..200de5f --- /dev/null +++ b/server/knowledge_base/kb_service/es_kb_service.py @@ -0,0 +1,205 @@ +#!/usr/bin/env python3 +""" +File_Name: es_kb_service.py +Author: TangGuoLiang +Email: 896165277@qq.com +Created: 2023-09-05 +""" +from typing import List +import os +import shutil +from langchain.embeddings.base import Embeddings +from langchain.schema import Document +from langchain.vectorstores.elasticsearch import ElasticsearchStore +from configs import KB_ROOT_PATH, EMBEDDING_MODEL, EMBEDDING_DEVICE, CACHED_VS_NUM +from server.knowledge_base.kb_service.base import KBService, SupportedVSType +from server.utils import load_local_embeddings +from elasticsearch import Elasticsearch, ConnectionError +from configs import logger +from configs import kbs_config + +class ESKBService(KBService): + + def do_init(self): + self.kb_path = self.get_kb_path(self.kb_name) + self.index_name = self.kb_path.split("/")[-1] + self.IP = kbs_config[self.vs_type()]['host'] + self.PORT = kbs_config[self.vs_type()]['port'] + self.user = kbs_config[self.vs_type()].get("user",'') + self.password = kbs_config[self.vs_type()].get("password",'') + self.embeddings_model = load_local_embeddings(self.embed_model, EMBEDDING_DEVICE) + try: + # ES python客户端连接(仅连接) + if self.user != "" and self.password != "": + self.es_client_python = Elasticsearch(f"http://{self.IP}:{self.PORT}", + basic_auth=(self.user,self.password)) + else: + logger.warning("ES未配置用户名和密码") + self.es_client_python = Elasticsearch(f"http://{self.IP}:{self.PORT}") + self.es_client_python.indices.create(index=self.index_name) + except ConnectionError: + logger.error("连接到 Elasticsearch 失败!") + except Exception as e: + logger.error(f"Error 发生 : {e}") + + try: + # langchain ES 连接、创建索引 + if 
self.user != "" and self.password != "": + self.db_init = ElasticsearchStore( + es_url=f"http://{self.IP}:{self.PORT}", + index_name=self.index_name, + query_field="context", + vector_query_field="dense_vector", + embedding=self.embeddings_model, + es_user=self.user, + es_password=self.password + ) + else: + logger.warning("ES未配置用户名和密码") + self.db_init = ElasticsearchStore( + es_url=f"http://{self.IP}:{self.PORT}", + index_name=self.index_name, + query_field="context", + vector_query_field="dense_vector", + embedding=self.embeddings_model, + ) + except ConnectionError: + print("### 连接到 Elasticsearch 失败!") + logger.error("### 连接到 Elasticsearch 失败!") + except Exception as e: + logger.error(f"Error 发生 : {e}") + + @staticmethod + def get_kb_path(knowledge_base_name: str): + return os.path.join(KB_ROOT_PATH, knowledge_base_name) + + @staticmethod + def get_vs_path(knowledge_base_name: str): + return os.path.join(ESKBService.get_kb_path(knowledge_base_name), "vector_store") + + def do_create_kb(self): + if os.path.exists(self.doc_path): + if not os.path.exists(os.path.join(self.kb_path, "vector_store")): + os.makedirs(os.path.join(self.kb_path, "vector_store")) + else: + logger.warning("directory `vector_store` already exists.") + + def vs_type(self) -> str: + return SupportedVSType.ES + + def _load_es(self, docs, embed_model): + # 将docs写入到ES中 + try: + # 连接 + 同时写入文档 + if self.user != "" and self.password != "": + self.db = ElasticsearchStore.from_documents( + documents=docs, + embedding=embed_model, + es_url= f"http://{self.IP}:{self.PORT}", + index_name=self.index_name, + distance_strategy="COSINE", + query_field="context", + vector_query_field="dense_vector", + verify_certs=False, + es_user=self.user, + es_password=self.password + ) + else: + self.db = ElasticsearchStore.from_documents( + documents=docs, + embedding=embed_model, + es_url= f"http://{self.IP}:{self.PORT}", + index_name=self.index_name, + distance_strategy="COSINE", + query_field="context", + 
vector_query_field="dense_vector", + verify_certs=False) + except ConnectionError as ce: + print(ce) + print("连接到 Elasticsearch 失败!") + logger.error("连接到 Elasticsearch 失败!") + except Exception as e: + logger.error(f"Error 发生 : {e}") + print(e) + + + + def do_search(self, query:str, top_k: int, score_threshold: float): + # 文本相似性检索 + docs = self.db_init.similarity_search_with_score(query=query, + k=top_k) + return docs + + + def do_delete_doc(self, kb_file, **kwargs): + if self.es_client_python.indices.exists(index=self.index_name): + # 从向量数据库中删除索引(文档名称是Keyword) + query = { + "query": { + "term": { + "metadata.source.keyword": kb_file.filepath + } + } + } + # 注意设置size,默认返回10个。 + search_results = self.es_client_python.search(body=query, size=50) + delete_list = [hit["_id"] for hit in search_results['hits']['hits']] + if len(delete_list) == 0: + return None + else: + for doc_id in delete_list: + try: + self.es_client_python.delete(index=self.index_name, + id=doc_id, + refresh=True) + except Exception as e: + logger.error("ES Docs Delete Error!") + + # self.db_init.delete(ids=delete_list) + #self.es_client_python.indices.refresh(index=self.index_name) + + + def do_add_doc(self, docs: List[Document], **kwargs): + '''向知识库添加文件''' + print(f"server.knowledge_base.kb_service.es_kb_service.do_add_doc 输入的docs参数长度为:{len(docs)}") + print("*"*100) + self._load_es(docs=docs, embed_model=self.embeddings_model) + # 获取 id 和 source , 格式:[{"id": str, "metadata": dict}, ...] 
+ print("写入数据成功.") + print("*"*100) + + if self.es_client_python.indices.exists(index=self.index_name): + file_path = docs[0].metadata.get("source") + query = { + "query": { + "term": { + "metadata.source.keyword": file_path + } + } + } + search_results = self.es_client_python.search(body=query, size=50) + if len(search_results["hits"]["hits"]) == 0: + raise ValueError("召回元素个数为0") + info_docs = [{"id":hit["_id"], "metadata": hit["_source"]["metadata"]} for hit in search_results["hits"]["hits"]] + return info_docs + + + def do_clear_vs(self): + """从知识库删除全部向量""" + if self.es_client_python.indices.exists(index=self.kb_name): + self.es_client_python.indices.delete(index=self.kb_name) + + + def do_drop_kb(self): + """删除知识库""" + # self.kb_file: 知识库路径 + if os.path.exists(self.kb_path): + shutil.rmtree(self.kb_path) + + + + + + 