배경
- contextual compression Retriever의 원리가 궁금해 내부 코드를 살펴본다.
코드
from typing import Any, List
from langchain_core.callbacks import (
AsyncCallbackManagerForRetrieverRun, # 비동기 콜백 관리자를 사용
CallbackManagerForRetrieverRun, # 동기 콜백 관리자를 사용
)
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever, RetrieverLike
from pydantic import ConfigDict
from langchain.retrievers.document_compressors.base import (
BaseDocumentCompressor, # 문서 압축기를 위한 기본 클래스
)
class ContextualCompressionRetriever(BaseRetriever):
"""
BaseRetriever를 래핑하여 검색된 문서를 압축하는 Retriever.
이 클래스는 기본 Retriever(`base_retriever`)를 사용하여 쿼리에 대해
관련 문서를 검색한 다음, 검색된 문서들을 `base_compressor`를 통해 압축하여 반환합니다.
"""
base_compressor: BaseDocumentCompressor
"""검색된 문서를 압축하는 데 사용하는 압축기."""
base_retriever: RetrieverLike
"""관련 문서를 검색하는 데 사용하는 기본 Retriever."""
model_config = ConfigDict(
arbitrary_types_allowed=True, # pydantic 모델에서 임의 객체 타입을 필드로 허용
)
def _get_relevant_documents(
self,
query: str,
*,
run_manager: CallbackManagerForRetrieverRun, # 콜백을 관리하는 동기 콜백 관리자
**kwargs: Any,
) -> List[Document]:
"""
쿼리에 대한 관련 문서를 검색합니다.
Args:
query: 관련 문서를 검색하기 위한 문자열 쿼리.
run_manager: 실행 중 콜백 관리를 위한 동기 콜백 관리자.
kwargs: 추가적인 매개변수.
Returns:
검색된 문서를 압축한 결과 리스트.
"""
# base_retriever를 호출하여 쿼리에 대한 관련 문서를 검색
docs = self.base_retriever.invoke(
# run_manager가 관리하는 실행 과정 중에 발생하는 이벤트(예: 문서 검색 시작, 문서 압축 완료 등)를 추적하거나 로깅
# 하위 콜백 관리자는 호출된 base_retriever나 base_compressor에서 사용할 수 있도록 전달
query, config={"callbacks": run_manager.get_child()}, **kwargs
)
if docs: # 검색된 문서가 존재할 경우
# 검색된 문서를 압축기에 전달하여 압축된 문서 리스트를 생성
compressed_docs = self.base_compressor.compress_documents(
docs, query, callbacks=run_manager.get_child()
)
return list(compressed_docs) # 압축된 문서를 리스트로 반환
else: # 검색된 문서가 없는 경우 빈 리스트 반환
return []
async def _aget_relevant_documents(
self,
query: str,
*,
run_manager: AsyncCallbackManagerForRetrieverRun, # 콜백을 관리하는 비동기 콜백 관리자
**kwargs: Any,
) -> List[Document]:
"""
쿼리에 대한 관련 문서를 비동기로 검색합니다.
Args:
query: 관련 문서를 검색하기 위한 문자열 쿼리.
run_manager: 실행 중 콜백 관리를 위한 비동기 콜백 관리자.
kwargs: 추가적인 매개변수.
Returns:
검색된 문서를 압축한 결과 리스트 (비동기 방식).
"""
# base_retriever를 호출하여 쿼리에 대한 관련 문서를 비동기로 검색
docs = await self.base_retriever.ainvoke(
query, config={"callbacks": run_manager.get_child()}, **kwargs
)
if docs: # 검색된 문서가 존재할 경우
# 검색된 문서를 압축기에 전달하여 압축된 문서 리스트를 비동기로 생성
# base_compressor의 compress_documents 메서드를 호출하는 부분입니다.
compressed_docs = await self.base_compressor.acompress_documents(
# run_manager.get_child() 새로운 하위 콜백을 생성함으로써, 압축 작업과 검색 작업의 콜백 컨텍스트를 분리
# 이렇게 하면 상위 콜백에 영향을 주지 않고 압축 작업의 이벤트만 독립적으로 관리 docs, query, callbacks=run_manager.get_child()
)
return list(compressed_docs) # 압축된 문서를 리스트로 반환
else: # 검색된 문서가 없는 경우 빈 리스트 반환
return []