Langchain-Chatchat/server/db/repository/knowledge_file_repository.py

113 lines
4.1 KiB
Python

from server.db.models.knowledge_base_model import KnowledgeBaseModel
from server.db.models.knowledge_file_model import KnowledgeFileModel
from server.db.session import with_session
from server.knowledge_base.utils import KnowledgeFile
@with_session
def count_files_from_db(session, kb_name: str) -> int:
return session.query(KnowledgeFileModel).filter_by(kb_name=kb_name).count()
@with_session
def list_files_from_db(session, kb_name):
files = session.query(KnowledgeFileModel).filter_by(kb_name=kb_name).all()
docs = [f.file_name for f in files]
return docs
@with_session
def add_file_to_db(session,
kb_file: KnowledgeFile,
docs_count: int = 0,
custom_docs: bool = False,):
kb = session.query(KnowledgeBaseModel).filter_by(kb_name=kb_file.kb_name).first()
if kb:
# 如果已经存在该文件,则更新文件信息与版本号
existing_file: KnowledgeFileModel = (session.query(KnowledgeFileModel)
.filter_by(file_name=kb_file.filename,
kb_name=kb_file.kb_name)
.first())
mtime = kb_file.get_mtime()
size = kb_file.get_size()
if existing_file:
existing_file.file_mtime = mtime
existing_file.file_size = size
existing_file.docs_count = docs_count
existing_file.custom_docs = custom_docs
existing_file.file_version += 1
# 否则,添加新文件
else:
new_file = KnowledgeFileModel(
file_name=kb_file.filename,
file_ext=kb_file.ext,
kb_name=kb_file.kb_name,
document_loader_name=kb_file.document_loader_name,
text_splitter_name=kb_file.text_splitter_name or "SpacyTextSplitter",
file_mtime=mtime,
file_size=size,
docs_count = docs_count,
custom_docs=custom_docs,
)
kb.file_count += 1
session.add(new_file)
return True
@with_session
def delete_file_from_db(session, kb_file: KnowledgeFile):
existing_file = session.query(KnowledgeFileModel).filter_by(file_name=kb_file.filename,
kb_name=kb_file.kb_name).first()
if existing_file:
session.delete(existing_file)
session.commit()
kb = session.query(KnowledgeBaseModel).filter_by(kb_name=kb_file.kb_name).first()
if kb:
kb.file_count -= 1
session.commit()
return True
@with_session
def delete_files_from_db(session, knowledge_base_name: str):
session.query(KnowledgeFileModel).filter_by(kb_name=knowledge_base_name).delete()
kb = session.query(KnowledgeBaseModel).filter_by(kb_name=knowledge_base_name).first()
if kb:
kb.file_count = 0
session.commit()
return True
@with_session
def file_exists_in_db(session, kb_file: KnowledgeFile):
existing_file = session.query(KnowledgeFileModel).filter_by(file_name=kb_file.filename,
kb_name=kb_file.kb_name).first()
return True if existing_file else False
@with_session
def get_file_detail(session, kb_name: str, filename: str) -> dict:
file: KnowledgeFileModel = (session.query(KnowledgeFileModel)
.filter_by(file_name=filename,
kb_name=kb_name).first())
if file:
return {
"kb_name": file.kb_name,
"file_name": file.file_name,
"file_ext": file.file_ext,
"file_version": file.file_version,
"document_loader": file.document_loader_name,
"text_splitter": file.text_splitter_name,
"create_time": file.create_time,
"file_mtime": file.file_mtime,
"file_size": file.file_size,
"custom_docs": file.custom_docs,
"docs_count": file.docs_count,
}
else:
return {}