# Langchain-Chatchat/server/knowledge_base/kb_service/es_kb_service.py
from typing import List
import os
import shutil
from langchain.embeddings.base import Embeddings
from langchain.schema import Document
from langchain.vectorstores.elasticsearch import ElasticsearchStore
from configs import KB_ROOT_PATH, EMBEDDING_MODEL, EMBEDDING_DEVICE, CACHED_VS_NUM
from server.knowledge_base.kb_service.base import KBService, SupportedVSType
from server.knowledge_base.utils import KnowledgeFile
from server.utils import load_local_embeddings
from elasticsearch import Elasticsearch,BadRequestError
from configs import logger,appLogger
from configs import kbs_config
from server.knowledge_base.model.kb_document_model import DocumentWithVSId
class ESKBService(KBService):
    """Knowledge-base service backed by Elasticsearch.

    Two clients are maintained:

    * ``es_client_python`` -- the raw ``elasticsearch-py`` client, used for
      keyword (match) queries, per-id get/delete and index management.
    * ``db_init``          -- the LangChain ``ElasticsearchStore`` wrapper,
      used for vector similarity search and bulk document writes.

    Documents are stored with their text in the ``context`` field and their
    embedding in the ``dense_vector`` field.
    """

    def do_init(self):
        """Read connection settings from ``kbs_config``, connect both clients
        and make sure the index exists.

        Raises:
            ConnectionError: if either client fails to connect.
        """
        self.kb_path = self.get_kb_path(self.kb_name)
        # The index is named after the last path component of the KB directory.
        self.index_name = os.path.split(self.kb_path)[-1]
        es_conf = kbs_config[self.vs_type()]
        self.IP = es_conf['host']
        self.PORT = es_conf['port']
        self.user = es_conf.get("user", '')
        self.password = es_conf.get("password", '')
        # Dimensionality of the dense vector; must match the embedding model.
        self.dims_length = es_conf.get("dims_length", None)
        self.embeddings_model = load_local_embeddings(self.embed_model, EMBEDDING_DEVICE)
        try:
            # Raw elasticsearch-py client (connection only, no index work yet).
            if self.user != "" and self.password != "":
                self.es_client_python = Elasticsearch(
                    f"http://{self.IP}:{self.PORT}",
                    basic_auth=(self.user, self.password))
            else:
                appLogger.warning("ES未配置用户名和密码")
                self.es_client_python = Elasticsearch(f"http://{self.IP}:{self.PORT}")
        except ConnectionError:
            appLogger.error("连接到 Elasticsearch 失败!")
            # Re-raise the original exception (the old code raised the bare
            # class, discarding message and traceback).
            raise
        except Exception as e:
            appLogger.error(f"Error 发生 : {e}")
            raise

        try:
            # First attempt: create the index through the raw client.
            mappings = {
                "properties": {
                    "dense_vector": {
                        "type": "dense_vector",
                        "dims": self.dims_length,
                        "index": True
                    }
                }
            }
            self.es_client_python.indices.create(index=self.index_name, mappings=mappings)
        except BadRequestError as e:
            # Most likely the index already exists; fall through to the
            # ElasticsearchStore-based creation below.
            appLogger.error("创建索引失败,重新")
            appLogger.error(e)

        try:
            # LangChain ES connection / index creation.  Build the kwargs once
            # instead of duplicating the whole constructor call per auth mode.
            store_kwargs = dict(
                es_url=f"http://{self.IP}:{self.PORT}",
                index_name=self.index_name,
                query_field="context",
                vector_query_field="dense_vector",
                embedding=self.embeddings_model,
            )
            if self.user != "" and self.password != "":
                store_kwargs["es_user"] = self.user
                store_kwargs["es_password"] = self.password
            else:
                appLogger.warning("ES未配置用户名和密码")
            self.db_init = ElasticsearchStore(**store_kwargs)
        except ConnectionError:
            print("### 初始化 Elasticsearch 失败!")
            appLogger.error("### 初始化 Elasticsearch 失败!")
            raise
        except Exception as e:
            appLogger.error(f"Error 发生 : {e}")
            raise

        try:
            # Second attempt through the LangChain wrapper (no-op when the
            # index already exists).
            self.db_init._create_index_if_not_exists(
                index_name=self.index_name,
                dims_length=self.dims_length
            )
        except Exception as e:
            # Best effort: the raw-client creation above may already have
            # succeeded, so only log here.
            appLogger.error("创建索引失败...")
            appLogger.error(e)

    @staticmethod
    def get_kb_path(knowledge_base_name: str):
        """Return the on-disk directory of the given knowledge base."""
        return os.path.join(KB_ROOT_PATH, knowledge_base_name)

    @staticmethod
    def get_vs_path(knowledge_base_name: str):
        """Return the ``vector_store`` sub-directory of the knowledge base."""
        return os.path.join(ESKBService.get_kb_path(knowledge_base_name), "vector_store")

    def do_create_kb(self):
        """Create the local ``vector_store`` directory if it does not exist."""
        if os.path.exists(self.doc_path):
            if not os.path.exists(os.path.join(self.kb_path, "vector_store")):
                os.makedirs(os.path.join(self.kb_path, "vector_store"))
            else:
                logger.warning("directory `vector_store` already exists.")

    def vs_type(self) -> str:
        """Identify this service as the Elasticsearch vector-store type."""
        return SupportedVSType.ES

    def _load_es(self, docs, embed_model):
        """Embed *docs* and bulk-write them into the ES index.

        Best effort: connection errors are logged, not raised, matching the
        original behaviour relied on by ``do_add_doc``.
        """
        try:
            common_kwargs = dict(
                documents=docs,
                embedding=embed_model,
                es_url=f"http://{self.IP}:{self.PORT}",
                index_name=self.index_name,
                distance_strategy="COSINE",
                query_field="context",
                vector_query_field="dense_vector",
                verify_certs=False,
            )
            if self.user != "" and self.password != "":
                self.db = ElasticsearchStore.from_documents(
                    es_user=self.user,
                    es_password=self.password,
                    **common_kwargs)
            else:
                self.db = ElasticsearchStore.from_documents(**common_kwargs)
        except ConnectionError as ce:
            print(ce)
            print("连接到 Elasticsearch 失败!")
            appLogger.error("连接到 Elasticsearch 失败!")
        except Exception as e:
            appLogger.error(f"Error 发生 : {e}")
            print(e)

    def do_search(self, query: str, top_k: int, score_threshold: float):
        """Vector similarity search via the LangChain store.

        NOTE(review): ``score_threshold`` is accepted for interface
        compatibility with the base class but is not applied here — the raw
        (doc, score) pairs for the top_k hits are returned unfiltered.
        """
        print(f"do_search,top_k:{top_k},score_threshold:{score_threshold}")
        docs = self.db_init.similarity_search_with_score(query=query,
                                                         k=top_k)
        return docs

    def searchbyContent(self, query: str, top_k: int = 2):
        """Keyword (match) search on the ``context`` field.

        Returns a list of ``DocumentWithVSId`` whose page_content holds the
        highlighted fragments. Returns an empty list when the index does not
        exist (the old code implicitly returned ``None``, which broke callers
        that iterate the result).
        """
        if not self.es_client_python.indices.exists(index=self.index_name):
            return []
        appLogger.info(f"******ESKBService searchByContent {self.index_name},query:{query}")
        tem_query = {
            "query": {"match": {
                "context": "*" + query + "*"
            }},
            "highlight": {"fields": {
                "context": {}
            }}
        }
        search_results = self.es_client_python.search(index=self.index_name, body=tem_query, size=top_k)
        docs_and_scores = []
        for hit in search_results["hits"]["hits"]:
            # Join all highlighted fragments into a single snippet; hits
            # without a highlight section yield an empty page_content.
            highlighted_contexts = ""
            if 'highlight' in hit:
                highlighted_contexts = " ".join(hit['highlight']['context'])
            docs_and_scores.append(DocumentWithVSId(
                page_content=highlighted_contexts,
                metadata=hit["_source"]["metadata"],
                id=hit["_id"],
            ))
        return docs_and_scores

    def searchbyContentInternal(self, query: str, top_k: int = 2):
        """Keyword (match) search returning full documents.

        Returns ``(Document, score)`` pairs with a fixed placeholder score of
        1.3, or an empty list when the index does not exist (the old code
        implicitly returned ``None``).
        """
        if not self.es_client_python.indices.exists(index=self.index_name):
            return []
        appLogger.info(f"******ESKBService searchbyContentInternal {self.index_name},query:{query}")
        tem_query = {
            "query": {"match": {
                "context": "*" + query + "*"
            }}
        }
        search_results = self.es_client_python.search(index=self.index_name, body=tem_query, size=top_k)
        docs_and_scores = [
            (
                Document(
                    page_content=hit["_source"]["context"],
                    metadata=hit["_source"]["metadata"],
                ),
                # Fixed placeholder score: keyword hits carry no vector
                # similarity, so every pair gets the same constant.
                1.3,
            )
            for hit in search_results["hits"]["hits"]
        ]
        return docs_and_scores

    def get_doc_by_ids(self, ids: List[str]) -> List[Document]:
        """Fetch documents by ES id; ids that fail to resolve are logged and
        skipped, so the result may be shorter than *ids*."""
        result_list = []
        for doc_id in ids:
            try:
                result = self.es_client_python.get(index=self.index_name,
                                                   id=doc_id)
                result_list.append(Document(
                    page_content=result["_source"]["context"],
                    metadata=result["_source"]["metadata"],
                ))
            except Exception as e:
                appLogger.error(f"ES Docs Get Error! {e}")
        return result_list

    def del_doc_by_ids(self, ids: List[str]) -> bool:
        """Delete documents by ES id (best effort; failures are logged)."""
        appLogger.info(f"es_kb_service del_doc_by_ids")
        for doc_id in ids:
            try:
                # refresh=True makes the deletion visible to the next search.
                self.es_client_python.delete(index=self.index_name,
                                             id=doc_id,
                                             refresh=True)
            except Exception as e:
                appLogger.error(f"ES Docs Delete Error! {e}")

    def do_delete_doc(self, kb_file, **kwargs):
        """Delete every chunk whose ``metadata.source`` matches *kb_file*.

        NOTE(review): the lookup is capped at size=200, so files split into
        more than 200 chunks would leave stale chunks behind — confirm the
        chunk count stays below that bound.
        """
        base_file_name = os.path.basename(kb_file.filepath)
        if self.es_client_python.indices.exists(index=self.index_name):
            # The file name is indexed as a keyword, so use a term query.
            query = {
                "query": {
                    "term": {
                        "metadata.source.keyword": base_file_name
                    }
                }
            }
            print(f"***do_delete_doc: kb_file.filepath:{kb_file.filepath}, base_file_name:{base_file_name}")
            # ES returns only 10 hits by default; raise the cap explicitly.
            search_results = self.es_client_python.search(index=self.index_name, body=query, size=200)
            delete_list = [hit["_id"] for hit in search_results['hits']['hits']]
            if len(delete_list) == 0:
                return None
            for doc_id in delete_list:
                try:
                    self.es_client_python.delete(index=self.index_name,
                                                 id=doc_id,
                                                 refresh=True)
                except Exception as e:
                    appLogger.error(f"ES Docs Delete Error! {e}")

    def do_add_doc(self, docs: List[Document], **kwargs):
        '''向知识库添加文件

        Writes *docs* through ``_load_es`` and then reads back the ids of the
        chunks that share the first doc's ``metadata.source``.

        Returns:
            list of ``{"id": str, "metadata": dict}`` for the recalled chunks.

        Raises:
            ValueError: if nothing can be recalled after the write.
        '''
        print(f"server.knowledge_base.kb_service.es_kb_service.do_add_doc 输入的docs参数长度为:{len(docs)}")
        print("*" * 100)
        self._load_es(docs=docs, embed_model=self.embeddings_model)
        print("写入数据成功.")
        print("*" * 100)
        if self.es_client_python.indices.exists(index=self.index_name):
            # All chunks of one file share the same metadata.source.
            file_path = docs[0].metadata.get("source")
            query = {
                "query": {
                    "term": {
                        "metadata.source.keyword": file_path
                    }
                }
            }
            search_results = self.es_client_python.search(index=self.index_name, body=query, size=200)
            if len(search_results["hits"]["hits"]) == 0:
                raise ValueError("召回元素个数为0")
            info_docs = [{"id": hit["_id"], "metadata": hit["_source"]["metadata"]}
                         for hit in search_results["hits"]["hits"]]
            return info_docs

    def do_clear_vs(self):
        """从知识库删除全部向量"""
        # Fix: the old code checked/deleted ``self.kb_name`` while every other
        # method operates on ``self.index_name`` (derived from the KB path) —
        # use the same index name consistently.
        if self.es_client_python.indices.exists(index=self.index_name):
            self.es_client_python.indices.delete(index=self.index_name)

    def do_drop_kb(self):
        """删除知识库 (removes the on-disk KB directory)."""
        if os.path.exists(self.kb_path):
            shutil.rmtree(self.kb_path)
if __name__ == '__main__':
    # Manual smoke test: requires a running Elasticsearch instance and the
    # project's knowledge-base config.
    esKBService = ESKBService("test")
    # esKBService.clear_vs()
    # esKBService.create_kb()
    esKBService.add_doc(KnowledgeFile(filename="README.md", knowledge_base_name="test"))
    print(esKBService.search_docs("如何启动api服务"))