36 KiB
ClassDef ThreadSafeObject
ThreadSafeObject: ThreadSafeObject 类的功能是提供一个线程安全的对象封装,用于在多线程环境中安全地访问和修改对象。
属性:
_obj: 存储实际对象的属性,可以是任何类型。_key: 对象的键,用于标识对象,可以是字符串或元组。_pool: 对象所属的缓存池,类型为CachePool,默认为 None。_lock: 一个重入锁(threading.RLock),用于确保线程安全。_loaded: 一个事件(threading.Event),用于控制对象加载状态。
代码描述:
ThreadSafeObject 类通过封装对象、键、所属缓存池以及同步机制(重入锁和事件),提供了一个线程安全的方式来访问和修改对象。通过 acquire 方法,可以安全地获取对象的引用,进行操作,并在操作完成后自动释放锁。此外,start_loading 和 finish_loading 方法允许控制对象的加载状态,而 wait_for_loading 方法可以阻塞当前线程,直到对象加载完成。
在项目中,ThreadSafeObject 类与 CachePool 类紧密相关,后者负责管理缓存中的 ThreadSafeObject 实例。例如,CachePool 的 get 方法在返回缓存对象之前会调用 wait_for_loading 方法,确保对象已经加载完成。set 和 pop 方法则用于向缓存中添加或移除 ThreadSafeObject 实例。此外,acquire 方法提供了一种机制,允许在执行操作前安全地获取对象的引用。
ThreadSafeObject 类还被用作其他特定类型对象的基类,例如 ThreadSafeFaiss 类,后者在 ThreadSafeObject 的基础上增加了与 FAISS 相关的功能,如文档计数和保存到磁盘等。
注意:
- 在多线程环境中操作共享资源时,使用 ThreadSafeObject 可以避免数据竞争和其他并发问题。
- 使用
acquire方法时,应确保使用with语句或手动释放锁,以避免死锁。 - 修改
_obj属性时,应通过obj属性的 setter 方法,以确保线程安全。
输出示例:
假设有一个 ThreadSafeObject 实例,其 _key 为 "example_key",_obj 为某个自定义对象。调用 __repr__ 方法可能会返回如下字符串:
"<ThreadSafeObject: key: example_key, obj: <自定义对象的表示>>"
FunctionDef init(self, key, obj, pool)
init: 该函数用于初始化ThreadSafeObject对象。
参数:
key: 可以是字符串或元组,用于标识对象。obj: 初始化时分配给ThreadSafeObject的对象,默认为None。pool: 一个CachePool实例,用于存储缓存对象,默认为None。
代码描述:
这个__init__方法是ThreadSafeObject类的构造函数,负责初始化一个线程安全的对象。它接收三个参数:key、obj和pool。key是一个标识符,可以是字符串或元组,用于唯一标识这个对象。obj参数是任意类型,代表需要被线程安全访问的数据对象,默认值为None。pool参数是一个CachePool类型的实例,它是一个可选参数,默认值为None,用于指定这个对象所属的缓存池。
在对象初始化过程中,首先将传入的obj、key和pool参数分别赋值给内部变量_obj、_key和_pool。接着,使用threading.RLock()创建一个可重入锁(Reentrant Lock),并将其赋值给_lock属性,这样可以确保对象的线程安全访问。最后,创建一个threading.Event()实例赋值给_loaded属性,这个事件对象用于控制对象加载状态的同步。
注意:
- 在多线程环境下操作同一个
ThreadSafeObject实例时,应确保正确使用_lock来避免数据竞争。 key参数是必须的,因为它用于唯一标识一个ThreadSafeObject实例。- 如果提供了
pool参数,那么这个ThreadSafeObject实例将与指定的缓存池相关联,这在管理多个缓存对象时非常有用。
FunctionDef repr(self)
repr: 此函数的功能是返回对象的官方字符串表示。
参数: 此函数没有参数。
代码描述: __repr__ 方法是一个特殊的方法,用于定义对象的“官方”字符串表示。在这个实现中,首先通过 type(self).__name__ 获取对象的类名,然后结合对象的 key 属性和 _obj 属性构造并返回一个格式化的字符串。这个字符串以 <类名: key: 键值, obj: 对象值> 的格式展示,其中 键值 是通过调用对象的 key 方法获取的,而 对象值 直接访问的是对象的 _obj 属性。这种表示方式不仅提供了对象的基本信息,还包括了对象的关键数据,使得调试和日志记录更为方便。
从项目的结构来看,__repr__ 方法与 key 方法有直接的调用关系。key 方法用于获取对象的键值,这是对象在缓存或其他数据结构中的唯一标识符。在 __repr__ 方法中,通过调用 key 方法可以获取到这个键值,并将其包含在对象的字符串表示中。这样做有助于在日志记录或调试时快速识别对象。
注意: 在使用 __repr__ 方法时,需要确保对象的 _key 和 _obj 属性已经被正确初始化,否则可能会导致错误。此外,考虑到 __repr__ 方法的输出可能会被用于日志记录,应确保包含的信息既有用又不过于冗长。
输出示例: 假设对象的类名为 ThreadSafeObject,_key 属性的值为 "example_key",_obj 属性的值为 "example_object",那么调用 __repr__ 方法将返回:
<ThreadSafeObject: key: example_key, obj: example_object>
FunctionDef key(self)
key: 此函数的功能是获取对象的键值。
参数: 此函数没有参数。
代码描述: key 函数是一个简单但关键的部分,用于访问和返回对象的 _key 属性。这个属性通常代表了对象在缓存或其他数据结构中的唯一标识符。在多线程环境下,访问和管理这些键值是确保数据一致性和线程安全的重要机制。
从项目中的调用情况来看,key 函数被多个地方调用,显示其在对象表示(__repr__ 方法)、资源获取(acquire 方法)、以及特定操作如保存和清空缓存(save 和 clear 方法)中的重要性。例如,在 __repr__ 方法中,通过调用 key 函数来获取对象的键值,以构建并返回对象的字符串表示,这有助于调试和日志记录。在 acquire 方法中,key 函数的返回值用于日志记录,帮助追踪哪个线程正在操作哪个资源。在 save 和 clear 方法中,key 同样用于日志记录,提供操作的上下文信息。
注意: 使用此函数时,需要确保 _key 属性已经被正确初始化,否则可能会引发错误。此外,考虑到线程安全,访问 _key 属性时应当小心处理同步问题,尽管在此函数的实现中看似简单,但在多线程环境下使用时应当保持警惕。
输出示例: 假设 _key 属性的值为 "example_key",那么调用 key 函数将返回:
"example_key"
FunctionDef acquire(self, owner, msg)
acquire: 此函数的功能是安全地获取并操作对象资源。
参数:
owner: 字符串类型,默认为空字符串,表示资源的拥有者。如果未提供,则默认使用当前线程的ID。msg: 字符串类型,默认为空字符串,用于附加额外的日志信息。
代码描述:
acquire 函数是一个上下文管理器,用于在多线程环境中安全地获取和释放资源。它首先尝试获取一个锁,以确保在操作对象资源时的线程安全。如果提供了owner参数,则使用该参数值作为资源的拥有者;如果未提供,则默认使用当前线程的ID作为拥有者。此外,如果存在_pool属性且不为None,则会调用_pool._cache.move_to_end(self.key)方法,将当前对象的键值移动到缓存的末尾,这通常用于维护缓存的LRU(最近最少使用)策略。
在成功获取锁并进行资源操作的前后,如果log_verbose为真,则会记录日志信息,包括操作的开始和结束,以及提供的msg信息。这有助于追踪资源的使用情况和调试。
在finally块中,无论资源操作是否成功,都会释放之前获取的锁,确保资源的安全释放,避免死锁的发生。
注意:
- 使用
acquire函数时,需要确保_lock属性已经被正确初始化为一个锁对象,否则在尝试获取锁时会引发异常。 - 在多线程环境下操作共享资源时,正确使用锁是非常重要的,以避免数据竞争和不一致的问题。
acquire函数设计为一个上下文管理器,推荐使用with语句进行调用,以确保资源的正确获取和释放。
FunctionDef start_loading(self)
start_loading: 此函数的功能是清除加载状态。
参数: 此函数没有参数。
代码描述: start_loading 函数是 ThreadSafeObject 类的一个方法,用于重置对象的加载状态。在这个方法中,通过调用 self._loaded.clear() 实现了这一功能。_loaded 是一个标志,用于指示对象的数据是否已经被加载。调用 clear 方法将这个标志重置,意味着对象的加载状态被清除,对象被视为未加载状态。这通常是为了在数据需要重新加载时准备对象,确保数据的最新状态可以被重新加载和使用。
注意: 使用 start_loading 方法时,需要确保任何依赖于对象加载状态的操作都能正确处理对象的未加载状态。此外,考虑到这是一个线程安全的对象,start_loading 方法的调用应该在适当的同步机制下进行,以避免并发访问导致的问题。
FunctionDef finish_loading(self)
finish_loading: 此函数的功能是标记对象加载完成。
参数: 此函数没有参数。
代码描述: finish_loading 函数是ThreadSafeObject类的一个方法,用于在对象的加载或初始化过程完成后,通过设置一个内部的线程安全标志(例如,使用threading.Event的set方法),来通知其他可能在等待此对象加载完成的线程。在本项目中,finish_loading方法被多个地方调用,主要用于标记嵌入模型(Embeddings)或向量存储(Vector Store)加载完成的状态。
在load_embeddings方法中,finish_loading被调用来标记一个嵌入模型对象加载完成。这个过程包括选择合适的嵌入模型,加载模型,并将模型对象赋值给ThreadSafeObject实例的obj属性,最后通过调用finish_loading方法来标记加载过程完成。
在load_vector_store方法中,无论是在KBFaissPool还是MemoFaissPool类中,finish_loading同样被用于标记向量存储加载完成。加载向量存储的过程可能包括从磁盘加载现有的向量存储,或者创建一个新的空向量存储,然后将这个向量存储对象赋值给ThreadSafeFaiss实例的obj属性,并通过finish_loading方法来标记加载完成。
注意: 使用finish_loading方法时,需要确保在对象的加载或初始化逻辑正确完成后调用此方法。此外,调用此方法前,通常会有线程锁的操作,以确保线程安全。在调用finish_loading之后,其他线程可以通过检查相应的线程安全标志来确定对象是否已经加载完成,从而进行后续操作。
FunctionDef wait_for_loading(self)
wait_for_loading: 此函数的功能是等待直到对象加载完成。
参数: 此函数没有参数。
代码描述: wait_for_loading 函数是 ThreadSafeObject 类的一个方法,用于在多线程环境中确保对象安全地完成加载。该方法通过调用 _loaded 属性(一个线程安全的事件对象)的 wait 方法来实现。当 _loaded 事件被设置时,表示对象已经完成加载,此时 wait 方法将停止阻塞,允许执行后续代码。如果 _loaded 事件尚未被设置,调用此方法的线程将被阻塞,直到事件被设置。这种机制确保了在对象未完全加载前,任何依赖于对象状态的操作都将被暂停,从而避免了潜在的数据不一致或竞态条件问题。
注意: 使用 wait_for_loading 方法时,需要确保 _loaded 事件在对象加载完成后被正确设置,否则调用此方法的线程可能会无限期地阻塞。此外,考虑到多线程编程的复杂性,开发者应当仔细管理线程间的同步和通信,以避免死锁或资源竞争等问题。
FunctionDef obj(self)
obj: 此函数的功能是获取ThreadSafeObject对象中的_obj属性。
参数: 此函数没有参数。
代码描述: obj函数是一个简单的访问器(accessor),用于返回ThreadSafeObject实例中的_obj属性。在多线程环境下,ThreadSafeObject对象提供了线程安全的访问方式,确保在并发访问时,对象的状态保持一致性。在项目中,obj函数被用于多个场景,主要是在加载嵌入向量(embeddings)和向量存储(vector store)时,获取已经加载或创建的对象实例。
例如,在load_embeddings方法中,首先检查是否已经加载了指定模型和设备的嵌入向量,如果没有,则创建一个新的ThreadSafeObject实例,并通过obj函数设置嵌入向量对象。在load_vector_store方法中,同样的逻辑被应用于加载或创建向量存储实例。这种使用方式确保了在并发环境下,对嵌入向量和向量存储的访问是线程安全的。
注意: 在使用obj函数时,需要确保ThreadSafeObject实例的_obj属性已经被正确初始化。否则,返回的将是None或者是初始状态的对象,这可能会导致后续操作中出现错误。
输出示例: 假设ThreadSafeObject实例的_obj属性被设置为了一个嵌入向量对象,那么调用obj函数将返回这个嵌入向量对象。例如:
embeddings = thread_safe_object_instance.obj()
这里,embeddings将是之前存储在ThreadSafeObject实例中的嵌入向量对象。
FunctionDef obj(self, val)
obj: obj函数用于设置ThreadSafeObject实例的内部对象。
参数:
val: 任意类型,表示要设置的值。
代码描述:
obj函数是ThreadSafeObject类的一个成员方法,其主要功能是将传入的参数val赋值给实例的_obj属性。这个方法是线程安全对象操作的基础,允许在多线程环境中安全地修改对象的状态。
在项目中,obj函数被用于不同的上下文中,主要涉及到加载和设置嵌入式对象或向量存储。例如,在EmbeddingsPool的load_embeddings方法中,obj函数用于将加载的嵌入式模型对象赋值给ThreadSafeObject实例。这样做可以确保在并发访问时,嵌入式模型对象的加载和访问是线程安全的。
同样,在KBFaissPool和MemoFaissPool的load_vector_store方法中,obj函数被用于设置加载或创建的向量存储对象。这些方法首先检查缓存中是否已存在所需的向量存储,如果不存在,则创建一个新的ThreadSafeFaiss实例,并通过obj函数将向量存储对象赋值给它。这确保了向量存储的加载和初始化过程是线程安全的。
注意:
- 使用obj函数时,需要确保传入的
val参数是正确的类型,因为函数内部不进行类型检查。 - 在多线程环境中使用obj函数修改对象状态时,应注意同步和并发控制,以避免数据竞争和不一致性问题。虽然obj函数本身的操作是简单的赋值,但它在项目中的应用场景通常涉及到线程安全的上下文,因此正确的使用方式对于保持程序的稳定性和正确性至关重要。
ClassDef CachePool
CachePool: CachePool 类的功能是提供一个线程安全的缓存池,用于存储和管理缓存对象。
属性:
_cache_num: 缓存池中允许存储的最大缓存对象数量。如果设置为-1,则不限制数量。_cache: 一个有序字典,用于存储缓存对象。atomic: 一个线程锁,确保缓存操作的线程安全。
代码描述:
CachePool 类提供了一个线程安全的缓存池实现,允许用户存储、获取和管理缓存对象。它使用了一个有序字典 _cache 来存储缓存对象,其中键是缓存对象的标识符,值是缓存对象本身。通过 atomic 线程锁确保了对缓存操作的线程安全性。
__init__方法初始化缓存池,可以指定缓存池中允许的最大缓存对象数量。keys方法返回当前缓存中所有键的列表。_check_count方法检查当前缓存的数量,如果超过了设定的最大值,则移除最早添加的缓存对象。get方法根据键获取缓存对象。如果对象存在,则等待对象加载完成后返回。set方法将一个对象添加到缓存中,并根据需要移除最早的缓存对象以保持在最大数量限制内。pop方法可以移除并返回指定键的缓存对象。如果没有指定键,则移除并返回最早添加的缓存对象。acquire方法尝试获取指定键的缓存对象,并对其进行加锁处理,确保在并发环境下的数据一致性。load_kb_embeddings方法用于加载知识库嵌入向量,它根据知识库名称、嵌入设备和默认嵌入模型加载嵌入向量。
CachePool 类在项目中被其他对象如 EmbeddingsPool 和 _FaissPool 调用,用于管理嵌入向量和向量存储的缓存。这些调用情况表明 CachePool 类在知识库嵌入向量和向量存储管理中起到了核心作用,为上层提供了缓存管理和线程安全的支持。
注意:
- 在多线程环境下操作缓存时,应确保正确使用
atomic锁来避免数据竞争。 - 当设置
_cache_num限制缓存数量时,需要注意缓存淘汰策略可能会影响到缓存对象的可用性。
输出示例:
由于 CachePool 主要提供缓存管理功能,其输出依赖于存储在缓存中的对象类型。例如,如果缓存中存储的是嵌入向量对象,那么 get 方法可能返回一个嵌入向量对象的实例。
FunctionDef init(self, cache_num)
init: 此函数的功能是初始化CachePool对象。
参数:
cache_num: 整型,表示缓存中允许的最大元素数量,默认值为-1,表示不限制数量。self._cache: 使用OrderedDict初始化,用于存储缓存数据,保持插入顺序。self.atomic: 使用threading.RLock初始化,提供一个基于线程的锁,用于控制对缓存数据的并发访问,确保线程安全。
代码描述:
此函数是CachePool类的构造函数,用于初始化一个缓存池对象。它接受一个参数cache_num,该参数指定了缓存中可以存储的最大元素数量。如果cache_num的值为-1,则表示缓存大小不受限制。函数内部首先将cache_num参数的值赋给对象的_cache_num属性,用于后续控制缓存大小。接着,使用OrderedDict初始化_cache属性,OrderedDict是一种特殊的字典,它可以记住元素被插入的顺序,这对于某些缓存淘汰策略(如最近最少使用LRU)是非常有用的。最后,通过threading.RLock创建一个可重入锁atomic,赋给对象的atomic属性。这个锁用于同步对缓存的访问,确保在多线程环境下对缓存的操作是线程安全的。
注意:
- 在多线程环境下操作缓存时,应确保正确使用
self.atomic锁,以避免数据竞争和不一致的问题。 cache_num的默认值为-1,意味着如果不特别指定,缓存大小不会受到限制。在实际应用中,根据需要合理设置此参数,以避免因缓存过大而消耗过多内存资源。OrderedDict虽然可以保持元素的插入顺序,但在处理大量数据时可能会比普通字典有更高的性能开销,因此在设计缓存策略时应考虑到这一点。
FunctionDef keys(self)
keys: 此函数的作用是获取缓存中所有键的列表。
参数: 此函数没有参数。
代码描述: keys 函数是 CachePool 类的一个方法,它的主要作用是从缓存中检索所有的键,并将这些键作为字符串列表返回。在这个函数中,self._cache.keys() 调用会获取 _cache 字典中所有的键,然后通过 list() 函数将这些键转换成列表形式。这是一个非常基础但重要的功能,因为它允许其他部分的代码了解缓存中目前存储了哪些数据的键。
在项目中,这个函数被 file_chat 函数调用,以检查传入的 knowledge_id 是否存在于 memo_faiss_pool 缓存中。如果 knowledge_id 不在缓存的键中,file_chat 函数将返回一个错误响应,指出需要的临时知识库未找到。这表明 keys 函数在项目中用于验证和检索操作,确保在进行进一步处理之前,所需的数据已经被正确地存储在缓存中。
注意: 使用此函数时,需要确保 _cache 已经被正确初始化并且包含了所需的数据。此外,返回的键列表仅代表了函数被调用时刻缓存中的状态,缓存内容的后续更新不会反映在已返回的列表中。
输出示例: 假设缓存中存储了三个键,分别为 "key1", "key2", "key3",那么调用 keys 函数将返回以下列表:
["key1", "key2", "key3"]
FunctionDef _check_count(self)
_check_count: 此函数的功能是检查缓存中的项目数量,并确保其不超过设定的最大值。
参数: 此函数没有参数。
代码描述: _check_count 函数是 CachePool 类的一个私有方法,用于维护缓存池中的项目数量不超过一个预设的最大值。这个函数首先检查成员变量 _cache_num 是否为整数且大于0。如果是,函数进入一个循环,循环的条件是缓存池 _cache 的长度大于 _cache_num。在循环内部,使用 _cache.popitem(last=False) 从缓存中移除最早添加的项目,直到缓存的大小不超过 _cache_num。这种机制确保了缓存池不会无限增长,从而有效管理内存使用。
在项目中,_check_count 被 set 方法调用。set 方法用于向缓存中添加一个新的项目,并在添加后立即调用 _check_count 来确保缓存池的大小不会超过预设的限制。这表明 _check_count 在缓存管理策略中起到了关键的作用,它通过限制缓存大小来防止内存溢出,确保了缓存系统的健壮性和稳定性。
注意: _check_count 是一个私有方法,意味着它仅在 CachePool 类内部使用,不应该被类外部直接调用。这种设计封装了缓存管理的细节,使得 CachePool 类的使用更加安全和方便。在使用 CachePool 类时,开发者无需直接管理缓存大小,而是通过设置 _cache_num 并使用 set 方法来间接控制。
FunctionDef get(self, key)
get: 此函数的功能是从缓存池中获取与给定键关联的线程安全对象。
参数:
key: 字符串类型,用于从缓存中检索对象的键。
代码描述:
get 函数首先尝试从缓存池 _cache 中使用给定的键 key 来获取一个对象。如果找到了对应的对象,该函数会调用该对象的 wait_for_loading 方法。这个方法的作用是阻塞当前线程,直到对象的加载状态被设置为完成。这确保了在返回对象之前,对象已经处于可用状态。如果缓存中没有找到对应的键,函数将不会返回任何值。
在项目中,get 函数被多个地方调用,包括但不限于加载嵌入向量、保存和卸载向量存储、以及在工作线程中操作向量存储。这些调用场景表明,get 函数是处理缓存对象的关键组件,特别是在需要确保对象加载完成后再进行操作的情况下。
注意:
- 使用
get函数时,应确保提供的键key在缓存中确实存在,否则函数将返回None。 - 在多线程环境下,
get函数通过wait_for_loading方法确保了线程安全,避免了在对象加载完成前的竞态条件。
输出示例:
假设缓存中存在一个键为 "example_key" 的 ThreadSafeObject 实例,且该实例已完成加载。调用 get("example_key") 将返回该 ThreadSafeObject 实例。如果 "example_key" 不存在于缓存中,函数将返回 None。
FunctionDef set(self, key, obj)
set: 此函数的功能是将一个线程安全的对象添加到缓存池中,并返回该对象。
参数:
key: 字符串类型,用于标识缓存中的对象。obj:ThreadSafeObject类型,表示要添加到缓存中的线程安全对象。
代码描述: set 函数首先将传入的线程安全对象 obj 与其对应的键 key 存储在缓存池的内部字典 _cache 中。这一操作确保了对象可以通过键值快速检索。随后,函数调用 _check_count 方法来检查缓存池中的对象数量是否超过了预设的最大值,如果超过了,将自动移除最早添加的对象以保持缓存池的大小在限制之内。最后,函数返回刚刚添加的线程安全对象 obj。
在项目中,set 函数被用于多个场景,包括但不限于加载嵌入向量(load_embeddings)、加载向量存储(load_vector_store)等。这些场景中,set 函数负责将新创建或加载的资源(如嵌入向量、向量存储)以线程安全的方式添加到缓存池中,确保后续可以高效、安全地访问这些资源。
注意:
- 使用
set函数时,需要确保传入的键key在缓存池中唯一,以避免覆盖已有的缓存项。 - 由于
set函数会检查缓存池的大小并在必要时移除最早的缓存项,开发者应当合理设置缓存池的最大容量_cache_num,以平衡内存使用和性能需求。 - 在多线程环境下,
set函数保证了添加缓存项的线程安全性,但在使用缓存项时,仍需注意线程安全的访问和操作。
输出示例: 假设调用 set 函数添加了一个键为 "example_key",对象为某个 ThreadSafeObject 实例的缓存项,函数将返回这个 ThreadSafeObject 实例。
FunctionDef pop(self, key)
pop: 该函数用于从缓存池中移除并返回指定键的对象或最早添加的对象。
参数:
key: 字符串类型,指定要移除对象的键。默认为 None,表示移除并返回最早添加的对象。
代码描述:
pop 函数是 CachePool 类的一个方法,用于根据给定的键从缓存中移除并返回相应的对象。如果调用时没有提供键(即 key=None),则函数会移除并返回缓存中最早被添加的对象。这是通过调用 _cache 字典的 popitem 方法实现的,其中 last=False 参数确保返回最早添加的项。如果提供了键,则通过 _cache 字典的 pop 方法尝试移除并返回与该键关联的对象。如果键不存在,则返回 None。
在项目中,pop 方法被多个场景调用,例如在 upload_temp_docs 函数中,用于移除之前的临时文档;在 unload_vector_store 方法中,用于释放向量库;在 do_clear_vs 方法中,用于清除特定的向量存储;以及在 drop_kb_summary 方法中,用于删除知识库摘要。这些调用场景表明 pop 方法在管理缓存资源、维护缓存状态和释放不再需要的资源方面起着关键作用。
注意:
- 在使用
pop方法时,应确保键的正确性和存在性,特别是在期望移除特定对象时。如果键不存在,方法将返回None,而不是抛出异常。 - 当不需要指定键移除对象时,应注意
pop方法将移除并返回最早添加的对象,这可能会影响缓存的使用逻辑。
输出示例:
假设缓存中存在键为 "example_key" 的 ThreadSafeObject 对象,调用 pop("example_key") 将返回该对象,并从缓存中移除。如果缓存为空或键不存在,调用 pop("nonexistent_key") 将返回 None。
FunctionDef acquire(self, key, owner, msg)
acquire: 此函数的功能是从缓存池中安全地获取与给定键关联的对象。
参数:
key: 字符串或元组类型,用于从缓存中检索对象的键。owner: 字符串类型,默认为空字符串,表示请求对象的所有者。msg: 字符串类型,默认为空字符串,用于附加消息或说明。
代码描述:
acquire 函数首先通过调用 get 方法尝试从缓存池中获取与给定键 key 关联的对象。如果缓存中不存在该键对应的对象,函数将抛出 RuntimeError 异常,提示请求的资源不存在。如果成功获取到对象,并且该对象是 ThreadSafeObject 类型的实例,则会调用该对象的 acquire 方法来安全地获取对象的引用,并将其返回。这一过程涉及到线程安全的处理,确保在多线程环境下对对象的访问是安全的。如果获取到的对象不是 ThreadSafeObject 类型的实例,则直接返回该对象。
在项目中,acquire 函数被用于安全地获取缓存中的对象,以进行后续操作。例如,在 knowledge_base_chat_iterator 函数中,通过调用 acquire 方法来获取知识库中的向量存储对象,以执行相似度搜索等操作。
注意:
- 在使用
acquire函数时,应确保提供的键key在缓存中确实存在,否则会抛出异常。 - 当获取到的对象是
ThreadSafeObject类型的实例时,应通过with语句或其他方式确保在操作完成后释放锁,以避免潜在的死锁问题。 acquire方法的使用场景主要集中在需要线程安全访问和操作缓存对象的情况。
输出示例:
由于 acquire 函数的返回值取决于缓存中对象的类型,因此可能有不同的返回形式。如果缓存中的对象是 ThreadSafeObject 类型的实例,则返回值将是该对象的引用;如果是其他类型的对象,则直接返回该对象。例如,如果缓存中存在键为 "example_key" 的 ThreadSafeObject 实例,调用 acquire("example_key") 将返回该实例的引用。如果 "example_key" 对应的是非 ThreadSafeObject 类型的对象,则直接返回该对象。
FunctionDef load_kb_embeddings(self, kb_name, embed_device, default_embed_model)
load_kb_embeddings: 此函数的功能是加载指定知识库名称的嵌入向量。
参数:
kb_name: 字符串类型,指定要加载嵌入向量的知识库名称。embed_device: 字符串类型,默认值由embedding_device函数确定,用于指定计算设备。default_embed_model: 字符串类型,默认值为EMBEDDING_MODEL,用于指定默认的嵌入模型。
代码描述:
此函数首先从knowledge_base_repository中调用get_kb_detail函数,根据kb_name获取知识库的详细信息,包括嵌入模型名称。如果知识库详情中包含嵌入模型名称,则使用该名称;否则,使用传入的default_embed_model作为嵌入模型名称。接着,函数检查该嵌入模型是否在在线模型列表中,该列表由list_online_embed_models函数提供。如果嵌入模型存在于在线模型列表中,则通过EmbeddingsFunAdapter类创建一个嵌入向量适配器实例并返回。如果嵌入模型不在在线模型列表中,则调用embeddings_pool的load_embeddings方法,根据模型名称和设备类型加载嵌入向量,并返回加载的嵌入向量实例。
注意:
- 在调用此函数时,需要确保传入的
kb_name是存在的知识库名称,且知识库中有对应的嵌入模型信息。 embed_device参数应根据实际计算环境选择合适的设备类型,如"cuda"、"mps"或"cpu"。- 此函数依赖于
EmbeddingsFunAdapter类和embeddings_pool的load_embeddings方法,因此在使用前应确保相关依赖正确配置。
输出示例:
假设知识库名称为"技术文档库",且该知识库使用的嵌入模型在在线模型列表中,则可能返回一个EmbeddingsFunAdapter实例。如果嵌入模型不在在线模型列表中,则可能返回由load_embeddings方法加载的嵌入向量实例,具体类型取决于加载的嵌入模型。
ClassDef EmbeddingsPool
EmbeddingsPool: EmbeddingsPool 类的功能是管理和加载不同模型的嵌入向量。
属性:
- 无特定公开属性,继承自 CachePool 类的属性。
代码描述:
EmbeddingsPool 类继承自 CachePool 类,专门用于加载和管理嵌入向量。它提供了一个 load_embeddings 方法,该方法负责根据指定的模型和设备加载嵌入向量。方法首先尝试从缓存中获取嵌入向量,如果缓存中不存在,则根据模型类型创建相应的嵌入向量对象,并将其添加到缓存中。
在加载嵌入向量时,根据模型的不同,EmbeddingsPool 类会调用不同的嵌入向量类。例如,对于 OpenAI 的 "text-embedding-ada-002" 模型,它会使用 OpenAIEmbeddings 类;对于包含 "bge-" 的模型,它会根据模型语言选择 HuggingFaceBgeEmbeddings 类,并设置相应的查询指令;对于其他模型,它默认使用 HuggingFaceEmbeddings 类。
此外,load_embeddings 方法在加载嵌入向量之前会通过 atomic 锁确保线程安全,避免在并发环境下的数据竞争问题。加载完成后,嵌入向量对象会被存储在缓存中,以便后续快速获取。
注意:
- 使用
load_embeddings方法时,需要确保传入的模型和设备参数正确,否则可能无法加载正确的嵌入向量。 - 在多线程环境下使用此类时,其内部的线程安全机制可以保护嵌入向量的加载过程,但调用者仍需注意线程安全的其他方面。
- 由于嵌入向量可能占用大量内存,应合理管理缓存大小,避免内存溢出。
输出示例:
调用 load_embeddings 方法可能返回的嵌入向量对象示例:
embeddings = embeddings_pool.load_embeddings(model="text-embedding-ada-002", device="cuda")
此代码行将返回一个针对 "text-embedding-ada-002" 模型的 OpenAIEmbeddings 对象,该对象已配置好与 OpenAI API 交互所需的所有参数,并准备好在指定的设备上进行嵌入向量的计算。
FunctionDef load_embeddings(self, model, device)
load_embeddings: 此函数的功能是加载并返回指定模型和设备上的嵌入向量对象。
参数:
model: 字符串类型,指定要加载的嵌入模型名称。如果未提供,则使用默认的嵌入模型。device: 字符串类型,指定计算设备。如果未提供,则通过embedding_device函数自动检测并选择合适的设备。
代码描述:
函数首先尝试获取一个线程安全的锁,以确保在多线程环境下的操作安全。然后,根据提供的模型名称和设备类型构造一个键值对key,用于检索或存储嵌入向量对象。如果缓存中不存在对应的嵌入向量对象,函数将创建一个新的ThreadSafeObject实例,并根据模型名称加载相应的嵌入向量。
如果模型名称为"text-embedding-ada-002",则加载OpenAI提供的嵌入模型。如果模型名称包含"bge-",则根据模型名称中的语言标识(如"zh"或"en")加载对应语言的HuggingFace Bge嵌入模型,并设置相应的查询指令。对于其他模型名称,将加载HuggingFace提供的嵌入模型。
加载完成后,将嵌入向量对象赋值给ThreadSafeObject实例的obj属性,并标记加载完成。如果缓存中已存在对应的嵌入向量对象,则直接返回该对象。
注意:
- 在使用此函数时,应确保提供的模型名称和计算设备类型正确,以便正确加载嵌入向量。
- 函数内部使用了多线程安全机制,包括锁和
ThreadSafeObject,以确保在并发环境下的操作安全。 - 加载嵌入向量可能需要一定的时间,特别是首次加载时,因此在设计应用逻辑时应考虑到可能的延迟。
输出示例:
调用load_embeddings(model="text-embedding-ada-002", device="cuda")可能会返回一个OpenAIEmbeddings的实例,该实例已经加载了指定的OpenAI嵌入模型,并准备好在CUDA设备上进行嵌入向量的计算。
在项目中,load_embeddings函数被load_kb_embeddings和load_local_embeddings等函数调用,用于加载知识库嵌入向量或本地嵌入向量,以支持不同的应用场景,如文本相似度计算、文本检索等。