langchain 공부

contextual compression Retriever 내부 코드 분석

필만이 2024. 11. 15. 21:25

배경

  • contextual compression Retriever의 원리가 궁금해 내부 코드를 살펴본다.

코드


from typing import Any, List

from langchain_core.callbacks import (
    AsyncCallbackManagerForRetrieverRun,  # 비동기 콜백 관리자를 사용
    CallbackManagerForRetrieverRun,  # 동기 콜백 관리자를 사용
)
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever, RetrieverLike
from pydantic import ConfigDict

from langchain.retrievers.document_compressors.base import (
    BaseDocumentCompressor,  # 문서 압축기를 위한 기본 클래스
)


class ContextualCompressionRetriever(BaseRetriever):
    """
    BaseRetriever를 래핑하여 검색된 문서를 압축하는 Retriever.

    이 클래스는 기본 Retriever(`base_retriever`)를 사용하여 쿼리에 대해
    관련 문서를 검색한 다음, 검색된 문서들을 `base_compressor`를 통해 압축하여 반환합니다.
    """

    base_compressor: BaseDocumentCompressor
    """검색된 문서를 압축하는 데 사용하는 압축기."""

    base_retriever: RetrieverLike
    """관련 문서를 검색하는 데 사용하는 기본 Retriever."""

    model_config = ConfigDict(
        arbitrary_types_allowed=True,  # pydantic 모델에서 임의 객체 타입을 필드로 허용
    )

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun,  # 콜백을 관리하는 동기 콜백 관리자
        **kwargs: Any,
    ) -> List[Document]:
        """
        쿼리에 대한 관련 문서를 검색합니다.

        Args:
            query: 관련 문서를 검색하기 위한 문자열 쿼리.
            run_manager: 실행 중 콜백 관리를 위한 동기 콜백 관리자.
            kwargs: 추가적인 매개변수.

        Returns:
            검색된 문서를 압축한 결과 리스트.
        """
        # base_retriever를 호출하여 쿼리에 대한 관련 문서를 검색
        docs = self.base_retriever.invoke(

            # run_manager가 관리하는 실행 과정 중에 발생하는 이벤트(예: 문서 검색 시작, 문서 압축 완료 등)를 추적하거나 로깅

            # 하위 콜백 관리자는 호출된 base_retriever나 base_compressor에서 사용할 수 있도록 전달
            query, config={"callbacks": run_manager.get_child()}, **kwargs
        )
        if docs:  # 검색된 문서가 존재할 경우
            # 검색된 문서를 압축기에 전달하여 압축된 문서 리스트를 생성


            compressed_docs = self.base_compressor.compress_documents(
                docs, query, callbacks=run_manager.get_child()
            )
            return list(compressed_docs)  # 압축된 문서를 리스트로 반환
        else:  # 검색된 문서가 없는 경우 빈 리스트 반환
            return []

    async def _aget_relevant_documents(
        self,
        query: str,
        *,
        run_manager: AsyncCallbackManagerForRetrieverRun,  # 콜백을 관리하는 비동기 콜백 관리자
        **kwargs: Any,
    ) -> List[Document]:
        """
        쿼리에 대한 관련 문서를 비동기로 검색합니다.

        Args:
            query: 관련 문서를 검색하기 위한 문자열 쿼리.
            run_manager: 실행 중 콜백 관리를 위한 비동기 콜백 관리자.
            kwargs: 추가적인 매개변수.

        Returns:
            검색된 문서를 압축한 결과 리스트 (비동기 방식).
        """
        # base_retriever를 호출하여 쿼리에 대한 관련 문서를 비동기로 검색
        docs = await self.base_retriever.ainvoke(
            query, config={"callbacks": run_manager.get_child()}, **kwargs
        )
        if docs:  # 검색된 문서가 존재할 경우
            # 검색된 문서를 압축기에 전달하여 압축된 문서 리스트를 비동기로 생성

            # base_compressor의 compress_documents 메서드를 호출하는 부분입니다.



            compressed_docs = await self.base_compressor.acompress_documents(

            # run_manager.get_child() 새로운 하위 콜백을 생성함으로써, 압축 작업과 검색 작업의 콜백 컨텍스트를 분리

            # 이렇게 하면 상위 콜백에 영향을 주지 않고 압축 작업의 이벤트만 독립적으로 관리                     docs, query, callbacks=run_manager.get_child()
            )
            return list(compressed_docs)  # 압축된 문서를 리스트로 반환
        else:  # 검색된 문서가 없는 경우 빈 리스트 반환
            return []