배경
- faiss 모듈 내 코드를 분석해서, 여러 용도로 응용하고자함.
코드
async def asimilarity_search_with_score_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
비동기적으로 주어진 임베딩 벡터와 가장 유사한 문서들을 반환합니다.
Args:
embedding: 유사한 문서를 찾을 임베딩 벡터.
k: 반환할 문서 수. 기본값은 4.
filter: 메타데이터에 따라 필터링하는 함수 또는 조건(딕셔너리). 기본값은 None.
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
Returns:
쿼리와 가장 유사한 문서와 각 문서의 L2 거리 (유사도 점수)의 튜플 리스트.
"""
return await run_in_executor(
None,
self.similarity_search_with_score_by_vector,
embedding,
k=k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
def similarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
주어진 쿼리와 가장 유사한 문서들을 반환합니다.
Args:
query: 유사한 문서를 찾을 텍스트 쿼리.
k: 반환할 문서 수. 기본값은 4.
filter: 메타데이터에 따라 필터링하는 함수 또는 조건(딕셔너리). 기본값은 None.
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
Returns:
쿼리와 가장 유사한 문서와 각 문서의 L2 거리 (유사도 점수)의 튜플 리스트.
"""
embedding = self._embed_query(query)
docs = self.similarity_search_with_score_by_vector(
embedding, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return docs
async def asimilarity_search_with_score(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
비동기적으로 주어진 쿼리와 가장 유사한 문서들을 반환합니다.
Args:
query: 유사한 문서를 찾을 텍스트 쿼리.
k: 반환할 문서 수. 기본값은 4.
filter: 메타데이터에 따라 필터링하는 함수 또는 조건(딕셔너리). 기본값은 None.
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
Returns:
쿼리와 가장 유사한 문서와 각 문서의 L2 거리 (유사도 점수)의 튜플 리스트.
"""
embedding = await self._aembed_query(query)
docs = await self.asimilarity_search_with_score_by_vector(
embedding, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return docs
def similarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Dict[str, Any]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""
주어진 임베딩 벡터와 가장 유사한 문서들을 반환합니다 (점수 없이).
Args:
embedding: 유사한 문서를 찾을 임베딩 벡터.
k: 반환할 문서 수. 기본값은 4.
filter: 메타데이터에 따라 필터링하는 조건(딕셔너리). 기본값은 None.
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
Returns:
쿼리와 가장 유사한 문서들의 리스트.
"""
docs_and_scores = self.similarity_search_with_score_by_vector(
embedding, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return [doc for doc, _ in docs_and_scores]
async def asimilarity_search_by_vector(
self,
embedding: List[float],
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""
비동기적으로 주어진 임베딩 벡터와 가장 유사한 문서들을 반환합니다.
Args:
embedding: 유사한 문서를 찾을 임베딩 벡터.
k: 반환할 문서 수. 기본값은 4.
filter: 메타데이터를 기준으로 필터링하는 함수 또는 조건 (딕셔너리).
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
Returns:
주어진 임베딩 벡터와 유사한 문서들의 리스트.
"""
docs_and_scores = await self.asimilarity_search_with_score_by_vector(
embedding,
k,
filter=filter,
fetch_k=fetch_k,
**kwargs,
)
return [doc for doc, _ in docs_and_scores]
def similarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""
주어진 쿼리와 가장 유사한 문서들을 반환합니다.
Args:
query: 유사한 문서를 찾을 텍스트 쿼리.
k: 반환할 문서 수. 기본값은 4.
filter: 메타데이터를 기준으로 필터링하는 함수 또는 조건 (딕셔너리).
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
Returns:
주어진 쿼리와 유사한 문서들의 리스트.
"""
docs_and_scores = self.similarity_search_with_score(
query, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return [doc for doc, _ in docs_and_scores]
async def asimilarity_search(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Document]:
"""
비동기적으로 주어진 쿼리와 가장 유사한 문서들을 반환합니다.
Args:
query: 유사한 문서를 찾을 텍스트 쿼리.
k: 반환할 문서 수. 기본값은 4.
filter: 메타데이터를 기준으로 필터링하는 함수 또는 조건 (딕셔너리).
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
Returns:
주어진 쿼리와 유사한 문서들의 리스트.
"""
docs_and_scores = await self.asimilarity_search_with_score(
query, k, filter=filter, fetch_k=fetch_k, **kwargs
)
return [doc for doc, _ in docs_and_scores]
def max_marginal_relevance_search_with_score_by_vector(
self,
embedding: List[float],
*,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
) -> List[Tuple[Document, float]]:
"""
최대한의 다양성과 유사성을 고려해 문서와 유사도 점수를 반환합니다.
Args:
embedding: 유사한 문서를 찾을 임베딩 벡터.
k: 반환할 문서 수. 기본값은 4.
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
lambda_mult: 다양성 조절 인자 (0: 최대 다양성, 1: 최소 다양성).
Returns:
다양성과 유사성을 최적화하여 선택한 문서와 유사도 점수의 리스트.
"""
scores, indices = self.index.search(
np.array([embedding], dtype=np.float32),
fetch_k if filter is None else fetch_k * 2,
)
if filter is not None:
filter_func = self._create_filter_func(filter)
filtered_indices = []
for i in indices[0]:
if i == -1:
continue
_id = self.index_to_docstore_id[i]
doc = self.docstore.search(_id)
if not isinstance(doc, Document):
raise ValueError(f"ID {_id}에 대한 문서를 찾을 수 없습니다.")
if filter_func(doc.metadata):
filtered_indices.append(i)
indices = np.array([filtered_indices])
embeddings = [self.index.reconstruct(int(i)) for i in indices[0] if i != -1]
mmr_selected = maximal_marginal_relevance(
np.array([embedding], dtype=np.float32),
embeddings,
k=k,
lambda_mult=lambda_mult,
)
docs_and_scores = []
for i in mmr_selected:
if indices[0][i] == -1:
continue
_id = self.index_to_docstore_id[indices[0][i]]
doc = self.docstore.search(_id)
if not isinstance(doc, Document):
raise ValueError(f"ID {_id}에 대한 문서를 찾을 수 없습니다.")
docs_and_scores.append((doc, scores[0][i]))
return docs_and_scores
async def amax_marginal_relevance_search_with_score_by_vector(
self,
embedding: List[float],
*,
k: int = 4,
fetch_k: int = 20,
lambda_mult: float = 0.5,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
) -> List[Tuple[Document, float]]:
"""
비동기적으로 다양성과 유사성을 고려해 문서와 유사도 점수를 반환합니다.
Args:
embedding: 유사한 문서를 찾을 임베딩 벡터.
k: 반환할 문서 수. 기본값은 4.
fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.
lambda_mult: 다양성 조절 인자 (0: 최대 다양성, 1: 최소 다양성).
Returns:
다양성과 유사성을 최적화하여 선택한 문서와 유사도 점수의 리스트.
"""
return await run_in_executor(
None,
self.max_marginal_relevance_search_with_score_by_vector,
embedding,
k=k,
fetch_k=fetch_k,
lambda_mult=lambda_mult,
filter=filter,
)
def delete(self, ids: Optional[List[str]] = None, **kwargs: Any) -> Optional[bool]:
"""
ID를 통해 문서를 삭제합니다.
Args:
ids: 삭제할 문서 ID 목록.
Returns:
삭제 성공 시 True, 실패 시 False, 구현되지 않은 경우 None.
"""
if ids is None:
raise ValueError("삭제할 ID가 지정되지 않았습니다.")
missing_ids = set(ids).difference(self.index_to_docstore_id.values())
if missing_ids:
raise ValueError(f"존재하지 않는 ID: {missing_ids}")
reversed_index = {id_: idx for idx, id_ in self.index_to_docstore_id.items()}
index_to_delete = {reversed_index[id_] for id_ in ids}
self.index.remove_ids(np.fromiter(index_to_delete, dtype=np.int64))
self.docstore.delete(ids)
remaining_ids = [
id_ for i, id_ in sorted(self.index_to_docstore_id.items()) if i not in index_to_delete
]
self.index_to_docstore_id = {i: id_ for i, id_ in enumerate(remaining_ids)}
return True
def merge_from(self, target: FAISS) -> None:
"""
현재 FAISS 객체에 다른 FAISS 객체를 병합합니다.
Args:
target: 현재 객체에 병합할 대상 FAISS 객체
Raises:
ValueError: 현재 docstore가 AddableMixin을 지원하지 않으면 예외 발생.
"""
if not isinstance(self.docstore, AddableMixin):
raise ValueError("해당 유형의 docstore와 병합할 수 없습니다.")
starting_len = len(self.index_to_docstore_id)
self.index.merge_from(target.index)
full_info = []
for i, target_id in target.index_to_docstore_id.items():
doc = target.docstore.search(target_id)
if not isinstance(doc, Document):
raise ValueError("반환된 객체가 Document가 아닙니다.")
full_info.append((starting_len + i, target_id, doc))
self.docstore.add({_id: doc for _, _id, doc in full_info})
index_to_id = {index: _id for index, _id, _ in full_info}
self.index_to_docstore_id.update(index_to_id)
@classmethod
def __from(
cls,
texts: Iterable[str],
embeddings: List[List[float]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
normalize_L2: bool = False,
distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
**kwargs: Any,
) -> FAISS:
"""
텍스트와 임베딩을 사용해 새로운 FAISS 인스턴스를 생성합니다.
Args:
texts: 추가할 텍스트 목록.
embeddings: 텍스트와 대응되는 벡터 임베딩 목록.
embedding: 텍스트를 벡터로 변환하는 Embeddings 객체.
metadatas: 각 텍스트에 연결된 선택적 메타데이터.
ids: 각 텍스트에 대한 선택적 고유 ID.
normalize_L2: L2 정규화 여부.
distance_strategy: 유사도 계산 방식.
Returns:
새로 생성된 FAISS 인스턴스.
"""
faiss = dependable_faiss_import()
index = (
faiss.IndexFlatIP(len(embeddings[0]))
if distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT
else faiss.IndexFlatL2(len(embeddings[0]))
)
docstore = kwargs.pop("docstore", InMemoryDocstore())
index_to_docstore_id = kwargs.pop("index_to_docstore_id", {})
vecstore = cls(
embedding,
index,
docstore,
index_to_docstore_id,
normalize_L2=normalize_L2,
distance_strategy=distance_strategy,
**kwargs,
)
vecstore.__add(texts, embeddings, metadatas=metadatas, ids=ids)
return vecstore
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""
텍스트에서 FAISS 인스턴스를 생성하는 사용자 친화적인 인터페이스.
1. 텍스트를 벡터화
2. 메모리 내 docstore 생성
3. FAISS 데이터베이스 초기화
Args:
texts: 추가할 텍스트 목록.
embedding: 텍스트를 벡터로 변환하는 Embeddings 객체.
metadatas: 각 텍스트에 연결된 선택적 메타데이터.
ids: 각 텍스트에 대한 선택적 고유 ID.
Returns:
새로 생성된 FAISS 인스턴스.
"""
embeddings = embedding.embed_documents(texts)
return cls.__from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
@classmethod
async def afrom_texts(
cls,
texts: list[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""
텍스트에서 비동기적으로 FAISS 인스턴스를 생성합니다.
Args:
texts: 추가할 텍스트 목록.
embedding: 텍스트를 벡터로 변환하는 Embeddings 객체.
metadatas: 각 텍스트에 연결된 선택적 메타데이터.
ids: 각 텍스트에 대한 선택적 고유 ID.
Returns:
새로 생성된 FAISS 인스턴스.
"""
embeddings = await embedding.aembed_documents(texts)
return cls.__from(
texts,
embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
@classmethod
def from_embeddings(
cls,
text_embeddings: Iterable[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""
임베딩이 포함된 텍스트에서 FAISS 인스턴스를 생성합니다.
Args:
text_embeddings: 텍스트와 해당 임베딩의 튜플 목록.
embedding: 텍스트를 벡터로 변환하는 Embeddings 객체.
metadatas: 각 텍스트에 연결된 선택적 메타데이터.
ids: 각 텍스트에 대한 선택적 고유 ID.
Returns:
새로 생성된 FAISS 인스턴스.
"""
texts, embeddings = zip(*text_embeddings)
return cls.__from(
list(texts),
list(embeddings),
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
@classmethod
async def afrom_embeddings(
cls,
text_embeddings: Iterable[Tuple[str, List[float]]],
embedding: Embeddings,
metadatas: Optional[Iterable[dict]] = None,
ids: Optional[List[str]] = None,
**kwargs: Any,
) -> FAISS:
"""
비동기적으로 임베딩이 포함된 텍스트에서 FAISS 인스턴스를 생성합니다.
"""
return cls.from_embeddings(
text_embeddings,
embedding,
metadatas=metadatas,
ids=ids,
**kwargs,
)
def save_local(self, folder_path: str, index_name: str = "index") -> None:
"""
로컬 디스크에 FAISS 인덱스, docstore 및 index_to_docstore_id를 저장합니다.
Args:
folder_path: 인덱스, docstore 및 ID를 저장할 폴더 경로.
index_name: 저장할 인덱스 파일 이름 (기본값 "index").
"""
path = Path(folder_path)
path.mkdir(exist_ok=True, parents=True)
faiss = dependable_faiss_import()
faiss.write_index(self.index, str(path / f"{index_name}.faiss"))
with open(path / f"{index_name}.pkl", "wb") as f:
pickle.dump((self.docstore, self.index_to_docstore_id), f)
@classmethod
def load_local(
cls,
folder_path: str,
embeddings: Embeddings,
index_name: str = "index",
*,
allow_dangerous_deserialization: bool = False,
**kwargs: Any,
) -> FAISS:
"""
로컬 디스크에서 FAISS 인덱스, docstore 및 index_to_docstore_id를 로드합니다.
Args:
folder_path: 인덱스, docstore 및 ID를 로드할 폴더 경로.
embeddings: 쿼리 생성을 위한 Embeddings 객체.
index_name: 로드할 인덱스 파일 이름.
allow_dangerous_deserialization: 피클 파일로 인해 발생할 수 있는
보안 위험을 허용할지 여부.
Returns:
로드된 FAISS 인스턴스.
"""
if not allow_dangerous_deserialization:
raise ValueError(
"데이터의 피클 파일 로드는 보안 위험이 있으므로, 신뢰할 수 있는 "
"파일만 사용해야 합니다."
)
path = Path(folder_path)
faiss = dependable_faiss_import()
index = faiss.read_index(str(path / f"{index_name}.faiss"))
with open(path / f"{index_name}.pkl", "rb") as f:
docstore, index_to_docstore_id = pickle.load(f)
return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
def serialize_to_bytes(self) -> bytes:
"""
FAISS 인덱스, docstore 및 index_to_docstore_id를 바이트로 직렬화합니다.
Returns:
직렬화된 바이트 객체.
"""
return pickle.dumps((self.index, self.docstore, self.index_to_docstore_id))
@classmethod
def deserialize_from_bytes(
cls,
serialized: bytes,
embeddings: Embeddings,
*,
allow_dangerous_deserialization: bool = False,
**kwargs: Any,
) -> FAISS:
"""
바이트 데이터를 사용하여 FAISS 인덱스, docstore 및 index_to_docstore_id를 역직렬화합니다.
Args:
serialized: 직렬화된 바이트 데이터.
embeddings: 쿼리 생성을 위한 Embeddings 객체.
allow_dangerous_deserialization: 피클 파일을 로드할 수 있도록 허용할지 여부.
보안 위험을 수반하므로 신뢰할 수 있는 소스에서 가져온 파일에만 사용해야 합니다.
Raises:
ValueError: allow_dangerous_deserialization가 False일 때 예외를 발생시킵니다.
Returns:
역직렬화된 FAISS 인스턴스.
"""
if not allow_dangerous_deserialization:
raise ValueError(
"역직렬화는 피클 파일 로드를 기반으로 합니다. "
"피클 파일은 악성 코드를 포함할 수 있어, 실행 시 시스템에서 임의의 코드가 실행될 수 있습니다. "
"역직렬화를 활성화하려면 allow_dangerous_deserialization을 True로 설정해야 합니다. "
"이 경우, 데이터의 출처를 신뢰할 수 있는지 반드시 확인하세요. "
"예를 들어, 직접 생성한 파일이며, 다른 사람이 수정하지 않은 것을 알고 있다면 안전하게 사용할 수 있습니다. "
"신뢰할 수 없는 출처(예: 인터넷의 무작위 사이트)에서 파일을 로드하는 경우에는 "
"절대로 이 값을 True로 설정하지 마세요."
)
index, docstore, index_to_docstore_id = pickle.loads(serialized)
return cls(embeddings, index, docstore, index_to_docstore_id, **kwargs)
def _select_relevance_score_fn(self) -> Callable[[float], float]:
"""
적절한 유사도 평가 함수를 선택합니다. 선택한 함수는 유사도 점수를 정규화하여
적합성을 평가하는 데 사용됩니다.
Returns:
유사도 점수를 입력받아 정규화된 점수를 반환하는 함수.
Raises:
ValueError: distance_strategy가 올바르지 않은 경우 예외 발생.
"""
if self.override_relevance_score_fn is not None:
return self.override_relevance_score_fn
if self.distance_strategy == DistanceStrategy.MAX_INNER_PRODUCT:
return self._max_inner_product_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.EUCLIDEAN_DISTANCE:
return self._euclidean_relevance_score_fn
elif self.distance_strategy == DistanceStrategy.COSINE:
return self._cosine_relevance_score_fn
else:
raise ValueError(
"알 수 없는 거리 전략입니다. cosine, max_inner_product 또는 euclidean 중 하나여야 합니다."
)
def _similarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
정규화된 유사도 점수(0에서 1 사이)를 기준으로 쿼리와 가장 유사한 문서를 반환합니다.
Args:
query: 유사한 문서를 찾을 쿼리 텍스트.
k: 반환할 문서 수 (기본값: 4).
filter: 메타데이터에 따른 필터 조건 (함수 또는 딕셔너리).
fetch_k: 필터링 전 검색할 문서 수 (기본값: 20).
Returns:
문서와 정규화된 유사도 점수의 튜플 리스트.
"""
relevance_score_fn = self._select_relevance_score_fn()
if relevance_score_fn is None:
raise ValueError("정규화 함수가 없으면 유사도 점수를 정규화할 수 없습니다.")
docs_and_scores = self.similarity_search_with_score(
query, k=k, filter=filter, fetch_k=fetch_k, **kwargs
)
docs_and_rel_scores = [
(doc, relevance_score_fn(score)) for doc, score in docs_and_scores
]
return docs_and_rel_scores
async def _asimilarity_search_with_relevance_scores(
self,
query: str,
k: int = 4,
filter: Optional[Union[Callable, Dict[str, Any]]] = None,
fetch_k: int = 20,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""
비동기적으로 정규화된 유사도 점수(0에서 1 사이)를 기준으로 쿼리와 가장 유사한 문서를 반환합니다.
Args:
query: 유사한 문서를 찾을 쿼리 텍스트.
k: 반환할 문서 수 (기본값: 4).
filter: 메타데이터에 따른 필터 조건 (함수 또는 딕셔너리).
fetch_k: 필터링 전 검색할 문서 수 (기본값: 20).
Returns:
문서와 정규화된 유사도 점수의 튜플 리스트.
"""
relevance_score_fn = self._select_relevance_score_fn()
if relevance_score_fn is None:
raise ValueError("정규화 함수가 없으면 유사도 점수를 정규화할 수 없습니다.")
docs_and_scores = await self.asimilarity_search_with_score(
query, k=k, filter=filter, fetch_k=fetch_k, **kwargs
)
docs_and_rel_scores = [
(doc, relevance_score_fn(score)) for doc, score in docs_and_scores
]
return docs_and_rel_scores
@staticmethod
def _create_filter_func(
filter: Optional[Union[Callable, Dict[str, Any]]],
) -> Callable[[Dict[str, Any]], bool]:
"""
제공된 필터를 기반으로 문서의 메타데이터를 검사하는 필터 함수를 생성합니다.
Args:
filter: 문서의 조건을 나타내는 함수 또는 딕셔너리.
Returns:
메타데이터 딕셔너리를 입력받아 조건을 충족하면 True를 반환하는 필터 함수.
Raises:
ValueError: filter가 dict나 callable이 아닌 경우 예외 발생.
"""
if callable(filter):
return filter
if not isinstance(filter, dict):
raise ValueError("filter는 dict 또는 callable이어야 합니다.")
def filter_func(metadata: Dict[str, Any]) -> bool:
return all(
metadata.get(key) in value if isinstance(value, list) else metadata.get(key) == value
for key, value in filter.items()
)
return filter_func