langchain 공부

Faiss 모듈 내 코드 분석1

필만이 2024. 11. 11. 18:10

배경

  • faiss 모듈 내 코드를 분석해서, 여러 용도로 응용하고자함.

코드

from __future__ import annotations  # 미래 버전의 타입 힌트를 사용할 수 있게 함

import logging
import operator
import os
import pickle
import uuid
import warnings
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Sized,
    Tuple,
    Union,
)

import numpy as np
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.runnables.config import run_in_executor
from langchain_core.vectorstores import VectorStore

# 문서 저장소와 관련된 클래스 및 유틸리티 함수들 불러오기
from langchain_community.docstore.base import AddableMixin, Docstore
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.vectorstores.utils import (
    DistanceStrategy,  # 거리 계산 방식 클래스
    maximal_marginal_relevance,  # 최대 한계 관련성 함수
)

# 로깅 객체 설정
logger = logging.getLogger(__name__)  # 현재 모듈의 로깅 이름을 기준으로 로거 생성


def dependable_faiss_import(no_avx2: Optional[bool] = None) -> Any:
    """
    faiss 라이브러리를 가져옵니다. faiss를 사용할 수 없는 경우 오류를 발생시킵니다.
    FAISS_NO_AVX2 환경 변수가 설정된 경우, AVX2 최적화 없이 faiss를 로드합니다.

    Args:
        no_avx2 (Optional[bool]): AVX2 최적화 없이 faiss를 로드하려면 True로 설정.
            이는 벡터 저장소의 호환성을 높여 다른 장치에서도 사용할 수 있게 합니다.

    Returns:
        faiss: 가져온 faiss 모듈 객체.

    Raises:
        ImportError: faiss 모듈을 가져올 수 없는 경우 예외가 발생합니다.
    """
    # no_avx2가 명시되지 않았고, 환경 변수 'FAISS_NO_AVX2'가 설정되어 있다면,
    # 환경 변수를 사용하여 no_avx2를 설정
    if no_avx2 is None and "FAISS_NO_AVX2" in os.environ:
        no_avx2 = bool(os.getenv("FAISS_NO_AVX2"))

    try:
        # no_avx2가 True일 경우 AVX2 최적화가 없는 faiss 모듈을 가져옴
        if no_avx2:
            from faiss import swigfaiss as faiss
        else:
            import faiss
    except ImportError:
        # faiss를 설치하지 않았을 경우 오류 메시지와 함께 ImportError를 발생
        raise ImportError(
            "faiss 파이썬 패키지를 가져올 수 없습니다. "
            "CUDA가 지원되는 GPU를 사용하는 경우 `pip install faiss-gpu`를 설치하거나, "
            "파이썬 버전에 맞게 `pip install faiss-cpu`를 설치하세요."
        )
    return faiss


def _len_check_if_sized(x: Any, y: Any, x_name: str, y_name: str) -> None:
    """
    두 개의 입력 객체 x와 y가 Sized일 경우, 길이가 동일한지 검사합니다.
    길이가 다르면 오류를 발생시킵니다.

    Args:
        x (Any): 비교할 첫 번째 객체.
        y (Any): 비교할 두 번째 객체.
        x_name (str): x 객체의 이름 (오류 메시지에 표시될 이름).
        y_name (str): y 객체의 이름 (오류 메시지에 표시될 이름).

    Raises:
        ValueError: x와 y가 Sized 타입이면서 길이가 다를 경우 예외 발생.
    """
    # x와 y가 Sized 타입인 경우 (길이를 측정할 수 있는 객체)
    if isinstance(x, Sized) and isinstance(y, Sized) and len(x) != len(y):
        # 길이가 다를 경우 ValueError 발생
        raise ValueError(
            f"{x_name}과 {y_name}의 길이가 같아야 하지만, "
            f"len({x_name})={len(x)} 및 len({y_name})={len(y)} 입니다."
        )
    return

class FAISS(VectorStore):
    """
    FAISS 벡터 스토어 클래스입니다.

    `Meta Faiss` 라이브러리를 기반으로 하는 벡터 스토어로, 문서의 임베딩을 저장하고 
    검색할 수 있습니다. FAISS 인덱스는 효율적으로 벡터 데이터를 검색하는 데 사용됩니다.

    사용하려면 `faiss` 파이썬 패키지가 설치되어 있어야 합니다.

    예제:
        .. code-block:: python

            from langchain_community.embeddings.openai import OpenAIEmbeddings
            from langchain_community.vectorstores import FAISS

            embeddings = OpenAIEmbeddings()
            texts = ["FAISS is an important library", "LangChain supports FAISS"]
            faiss = FAISS.from_texts(texts, embeddings)

    """

    def __init__(
        self,
        # Union을 사용해 두 타입을 허용 : 매개변수가 여러 타입의 값을 받을 수 있게 하여 더 유연하고 재사용 가능한 코드를 만들기 위해서
        # Callable[[str], List[float]]: 텍스트를 입력으로 받아 임베딩 벡터(리스트 형태의 실수)로 반환
        # Embeddings: 임베딩을 처리하는 객체
        embedding_function: Union[
            Callable[[str], List[float]],
            Embeddings,
        ],
        index: Any,
        docstore: Docstore,
        index_to_docstore_id: Dict[int, str],
        # Optional: relevance_score_fn이 지정되지 않거나 None 값이 전달될 수 있도록 허용
        # Callable은 함수 타입을 나타내며, 여기서는 하나의 float 값을 입력으로 받고 float 값을 반환하는 함수를 기대한다는 것을 나타냅
        relevance_score_fn: Optional[Callable[[float], float]] = None,
        normalize_L2: bool = False,
        distance_strategy: DistanceStrategy = DistanceStrategy.EUCLIDEAN_DISTANCE,
    ):
        """
        FAISS 인스턴스를 초기화합니다.

        Args:
            embedding_function: 텍스트를 벡터(임베딩)로 변환하는 함수 또는 Embeddings 객체.
            index: FAISS 인덱스 객체. 벡터들을 저장하고 검색할 수 있는 데이터 구조입니다.
            docstore: 문서를 저장하는 Docstore 객체. 메타데이터 및 원본 문서를 관리합니다.
            index_to_docstore_id: 인덱스에서 문서 저장소로의 매핑을 나타내는 딕셔너리입니다.
            relevance_score_fn: 선택적 함수. 검색 결과의 적합성을 평가하기 위해 유사도 점수를 변환합니다.
            normalize_L2: L2 정규화 여부를 나타내는 플래그입니다.
            distance_strategy: 거리 측정 전략 (유클리드 거리, 코사인 거리 등).

        Raises:
            경고: embedding_function이 Embeddings 객체가 아니면 경고 메시지를 출력합니다.
        """
        if not isinstance(embedding_function, Embeddings):
            logger.warning(
                "`embedding_function`은 Embeddings 객체여야 합니다. 함수 전달 지원은 곧 제거될 예정입니다."
            )
        self.embedding_function = embedding_function  # 임베딩을 수행하는 함수 또는 Embeddings 객체.
        self.index = index  # FAISS 인덱스 객체, 벡터 데이터의 저장과 검색을 수행.
        self.docstore = docstore  # 문서 저장소, 문서의 메타데이터 및 내용을 저장.
        self.index_to_docstore_id = index_to_docstore_id  # 인덱스의 ID와 문서 저장소의 ID 간의 매핑 정보.
        self.distance_strategy = distance_strategy  # 벡터 간 거리 계산 전략, 기본값은 유클리드 거리.
        self.override_relevance_score_fn = relevance_score_fn  # 검색 결과의 적합성을 평가하는 사용자 정의 함수.
        # 벡터 추가 시 L2 정규화를 수행할지 여부. 벡터의 크기가 검색 정확도에 영향을 미치지 않도록 하는 것이 중요합니다. 
        # 특히 코사인 유사도를 사용할 경우, L2 정규화를 적용하면 벡터 간의 각도 차이만 비교할 수 있어 검색 품질이 향상됨
        self._normalize_L2 = normalize_L2 

        # 거리 측정 전략이 유클리드가 아닐 때, L2 정규화가 설정되었을 경우 경고를 발생.
        if (
            self.distance_strategy != DistanceStrategy.EUCLIDEAN_DISTANCE
            and self._normalize_L2
        ):
            warnings.warn(
                "L2 정규화는 유클리드 거리 외의 거리 측정 방법에서는 사용할 수 없습니다: "
                f"metric type: {self.distance_strategy}"
            )

    @property
    def embeddings(self) -> Optional[Embeddings]:
        """
        Embeddings 객체를 반환하는 속성입니다.

        Returns:
            embedding_function이 Embeddings 객체일 경우 반환, 아니면 None 반환.
        """
        return (
            self.embedding_function
            if isinstance(self.embedding_function, Embeddings)
            else None
        )

    # 문자열 리스트(List[str])를 받아들이며, 각 문자열은 임베딩할 텍스트를 나타냅니다.
    # -> List[List[float]]: 함수가 반환하는 타입을 지정함
    # 외부 리스트는 여러 개의 벡터를 담고 있습니다. 내부 리스트는 하나의 벡터를 나타내며, 벡터의 각 요소는 float 타입의 숫자
    # 예> "Hello world"와 "This is a test" 텍스트를 벡터화한 결과라고 가정
    # [
    #     [0.1, 0.2, 0.3],  # 첫 번째 텍스트의 벡터
    #     [0.4, 0.5, 0.6]   # 두 번째 텍스트의 벡터
    # ]

    def _embed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        주어진 텍스트들을 벡터로 변환합니다.

        Args:
            texts: 임베딩할 문자열의 리스트.

        Returns:
            주어진 텍스트들의 벡터 리스트.
        """
        if isinstance(self.embedding_function, Embeddings):
            return self.embedding_function.embed_documents(texts)  # Embeddings 객체 사용 시 벡터화
        else:
            return [self.embedding_function(text) for text in texts]  # 함수로 전달 시 각 텍스트를 벡터화

    async def _aembed_documents(self, texts: List[str]) -> List[List[float]]:
        """
        주어진 텍스트들을 비동기적으로 벡터로 변환합니다.

        Args:
            texts: 임베딩할 문자열의 리스트.

        Returns:
            주어진 텍스트들의 벡터 리스트.

        Raises:
            Exception: embedding_function이 Embeddings 객체가 아니면 예외 발생.
        """
        if isinstance(self.embedding_function, Embeddings):
            return await self.embedding_function.aembed_documents(texts)  # 비동기 임베딩 수행
        else:
            raise Exception(
                "`embedding_function`은 Embeddings 객체여야 합니다. 함수 전달 지원은 곧 제거될 예정입니다."
            )

    def _embed_query(self, text: str) -> List[float]:
        """
        주어진 쿼리를 벡터로 변환합니다.

        Args:
            text: 임베딩할 쿼리 문자열.

        Returns:
            벡터로 변환된 쿼리.
        """
        if isinstance(self.embedding_function, Embeddings):
            return self.embedding_function.embed_query(text)
        else:
            return self.embedding_function(text)

    async def _aembed_query(self, text: str) -> List[float]:
        """
        주어진 쿼리를 비동기적으로 벡터로 변환합니다.

        Args:
            text: 임베딩할 쿼리 문자열.

        Returns:
            벡터로 변환된 쿼리.

        Raises:
            Exception: embedding_function이 Embeddings 객체가 아니면 예외 발생.
        """
        if isinstance(self.embedding_function, Embeddings):
            return await self.embedding_function.aembed_query(text)
        else:
            raise Exception(
                "`embedding_function`은 Embeddings 객체여야 합니다. 함수 전달 지원은 곧 제거될 예정입니다."
            )

    # 실시간으로 데이터를 추가하거나, 새로운 데이터를 반영하기 위해 데이터베이스를 업데이트해야 할 경우 임베딩 추가 기능이 필수적입니다.
    def __add(
        self,
        # Iterable 타입을 사용하면 리스트, 튜플, 제너레이터 등의 모든 반복 가능한 형태를 인자로 받을 수 있슴
        texts: Iterable[str],
        # Iterable[List[float]]는 List[List[float]], Tuple[List[float], ...], 또는 제너레이터 (List[float] for _ in range(n))와 같은 다양한 형태의 입력을 지원
        # List[List[float]] 대신 Iterable[List[float]]로 지정함으로써 메모리를 덜 사용하는 방식으로 데이터를 넘길 수 있슴
        embeddings: Iterable[List[float]],
        metadatas: Optional[Iterable[dict]] = None,
        ids: Optional[List[str]] = None,
    ) -> List[str]:
        """
        주어진 텍스트와 임베딩들을 벡터 인덱스 및 문서 저장소에 추가합니다.

        Args:
            texts: 추가할 문자열의 반복 가능한 객체.
            embeddings: 각 텍스트에 해당하는 벡터의 리스트.
            metadatas: 각 텍스트에 연결된 메타데이터 리스트 (옵션).
            ids: 각 텍스트에 대한 고유 ID 리스트 (옵션).

        Returns:
            추가된 텍스트의 ID 리스트.

        Raises:
            ValueError: docstore가 AddableMixin을 지원하지 않거나 ID가 중복될 경우 예외 발생.
        """
        faiss = dependable_faiss_import()

        # isinstance로 self.docstore가 AddableMixin을 상속받는지 확인
        # AddableMixin을 상속받으면 해당 객체가 add 메서드를 통해 항목 추가 기능을 제공할 수 있음을 보장. 
        # 여러 클래스에서 동일한 기능(항목 추가)을 구현해야 하는 경우, AddableMixin을 상속받아 간단히 해당 기능을 추가가능
        if not isinstance(self.docstore, AddableMixin):
            raise ValueError(
                "텍스트를 추가하려면 기본 docstore가 항목 추가를 지원해야 합니다. "
                f"{self.docstore}는 이 기능을 지원하지 않습니다."
            )

        # texts와 metadatas의 길이 일치 여부 확인, metadatas가 없을 경우 빈 딕셔너리 사용
        _len_check_if_sized(texts, metadatas, "texts", "metadatas")
        # metadatas가 None일 경우 빈 딕셔너리 {}를 텍스트마다 생성하도록 합
        _metadatas = metadatas or ({} for _ in texts)
        documents = [
            Document(page_content=t, metadata=m) for t, m in zip(texts, _metadatas)
        ]

        # documents, embeddings, ids의 길이 일치 여부 확인
        _len_check_if_sized(documents, embeddings, "documents", "embeddings")
        _len_check_if_sized(documents, ids, "documents", "ids")

        # ID에 중복이 있는지 확인
        if ids and len(ids) != len(set(ids)):
            raise ValueError("ID 목록에 중복된 ID가 있습니다.")

        # 인덱스에 벡터 추가
        vector = np.array(embeddings, dtype=np.float32)
        if self._normalize_L2:
            faiss.normalize_L2(vector)  # L2 정규화 옵션이 설정된 경우 벡터 정규화
        self.index.add(vector)

        # docstore와 인덱스에 문서 추가
        # ids 파라미터가 주어지지 않은 경우, 각 텍스트마다 고유한 ID가 필요하므로, 파이썬의 uuid 모듈을 사용하여 UUID를 생성합니다.
        # uuid.uuid4()는 임의의 고유 ID를 생성하는 함수로, UUID는 중복 가능성이 매우 낮은 무작위한 ID를 제공
        ids = ids or [str(uuid.uuid4()) for _ in texts] 
        # self.docstore는 텍스트 문서를 저장하는 문서 저장소 객체입니다. 이 라인은 새로 생성한 ID와 Document 객체들을 docstore에 추가하는 역할
        self.docstore.add({id_: doc for id_, doc in zip(ids, documents)})
        # starting_len은 현재 인덱스의 길이를 저장하는 변수입니다. 이는 새로 추가할 문서의 위치를 결정하기 위해 필요,
        # 새로 추가하는 문서들은 starting_len 이후의 인덱스 번호를 할당받게 됨
        starting_len = len(self.index_to_docstore_id)
        # index_to_id는 새로 추가할 문서들의 인덱스 번호와 ID를 매핑하는 딕셔너리
        index_to_id = {starting_len + j: id_ for j, id_ in enumerate(ids)}
        # self.index_to_docstore_id가 새로 추가된 문서까지 포함하여 업데이트
        self.index_to_docstore_id.update(index_to_id)
        return ids

    def similarity_search_with_score_by_vector(
        self,
        embedding: List[float],
        k: int = 4,
        # 메타데이터 필터링 조건으로, 메타데이터에 기반한 필터링을 수행할 수 있습니다. 함수나 조건 딕셔너리 형태로 제공됩니다. 필터링 조건을 만족하는 문서만 최종 결과에 포함
        # Callable(함수) 또는 Dict[str, Any](딕셔너리) 중 하나의 타입을 허용
        # Dict[str, Any]는 필터로 사용될 키-값 쌍을 정의합니다. 딕셔너리의 각 키는 특정 필드를 나타내고, 그 값은 해당 필드에 적용할 조건을 나타냄
        # None: 이 필터 파라미터가 기본적으로 None이라는 값으로 설정된다는 의미입니다. 따라서 사용자가 필터를 제공하지 않으면 필터가 적용되지 않습니다.
        filter: Optional[Union[Callable, Dict[str, Any]]] = None,
        # 필터 적용 전 검색할 최대 문서 수로, 필터링을 위해 더 많은 문서를 검색한 뒤 필터를 적용해 상위 k개의 문서를 선택합니다. 기본값은 20
        fetch_k: int = 20,
        # 선택적인 추가 매개변수로, score_threshold 등 추가 필터링 조건을 설정할 수 있습니다.
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """
        주어진 임베딩 벡터와 가장 유사한 문서들을 반환합니다.

        Args:
            embedding: 유사한 문서를 찾을 임베딩 벡터.
            k: 반환할 문서 수. 기본값은 4.
            filter: 메타데이터에 따라 필터링하는 함수 또는 조건(딕셔너리). 기본값은 None.
            fetch_k: 필터링 전 검색할 문서 수. 기본값은 20.

        Returns:
            쿼리와 가장 유사한 문서와 각 문서의 L2 거리 (유사도 점수)의 튜플 리스트.
        """
        # FAISS 라이브러리가 설치되어 있는지 확인하고, 없는 경우 ImportError를 발생
        faiss = dependable_faiss_import()  # FAISS 모듈 불러오기
        # 검색할 임베딩 벡터를 numpy 배열로 변환합니다. np.array([embedding], dtype=np.float32)로 벡터를 32비트 부동소수점 형식의 배열로 생성
        vector = np.array([embedding], dtype=np.float32)  # 쿼리 벡터를 numpy 배열로 변환
        # _normalize_L2 옵션이 True일 경우 faiss.normalize_L2(vector)를 호출하여 벡터를 L2 정규화합니다. L2 정규화는 벡터의 길이를 1로 만들어, 방향만 비교하도록 함
        if self._normalize_L2:
            faiss.normalize_L2(vector)
        # self.index.search 메서드를 사용해 벡터와 가장 유사한 상위 k개의 문서를 검색합니다. 필터가 없는 경우 상위 k개를 검색하고, 
        # 필터가 있으면 fetch_k 개수만큼 검색하여 이후 필터링 단계에서 더 많은 문서를 필터링
        # scores는 각 검색 결과의 유사도 점수이며, 점수가 낮을수록 유사도가 높습니다 (유클리드 거리 기준)
        scores, indices = self.index.search(vector, k if filter is None else fetch_k)  # 벡터와 가장 가까운 문서 검색
        docs = []

        # 필터가 있을 경우 필터 함수 생성
        if filter is not None:
            filter_func = self._create_filter_func(filter)
        # indices[0]는 검색된 문서의 인덱스 번호를 포함한 리스트입니다. 이를 순회하여 각 검색 결과를 처리
        for j, i in enumerate(indices[0]):
            # i == -1인 경우, 이는 FAISS가 충분한 결과를 반환하지 않았다는 것을 의미합니다. 이 경우 해당 인덱스를 건너뜁
            if i == -1:
                continue  # 검색 결과가 부족할 때 해당 인덱스 건너뜀
            _id = self.index_to_docstore_id[i]  # 인덱스 번호로 해당 문서의 ID 찾기
            doc = self.docstore.search(_id)  # ID로 문서 검색
            if not isinstance(doc, Document):
                raise ValueError(f"ID {_id}에 대한 문서를 찾을 수 없습니다. 현재 값: {doc}")
            # 필터가 설정되어 있으면 필터 조건을 만족하는지 확인
            if filter is not None:
                if filter_func(doc.metadata):  # 필터 조건을 만족하는 경우 문서 추가
                    # 필터 조건을 만족할 경우 (doc, scores[0][j]) 튜플을 docs 리스트에 추가합니다. 여기서 scores[0][j]는 해당 문서의 유사도 점수
                    docs.append((doc, scores[0][j]))  
            else:
                # 필터가 설정되지 않았다면, 조건 검사 없이 문서와 점수를 바로 추가
                docs.append((doc, scores[0][j]))  # 필터가 없으면 문서와 점수를 바로 추가

        # 선택적 점수 임계값 필터링
        score_threshold = kwargs.get("score_threshold")
        if score_threshold is not None:
            cmp = (
                # MAX_INNER_PRODUCT 또는 JACCARD일 경우 operator.ge (크거나 같음) 연산자를 사용하고, 
                # 그외(유클리드 거리, 코사인 거리 )는 operator.le (작거나 같음) 연산자를 사용
                # MAX_INNER_PRODUCT, JACCARD는 점수가 높을수록 유사도가 높은 방식이고, 다른 방식은 점수가 낮을수록 유사도가 높은 방식이기 때문
                operator.ge
                if self.distance_strategy in (DistanceStrategy.MAX_INNER_PRODUCT, DistanceStrategy.JACCARD)
                else operator.le
            )
            docs = [
                (doc, similarity) for doc, similarity in docs if cmp(similarity, score_threshold)
            ]
        return docs[:k]  # 상위 k개의 문서와 유사도 점수 반환