langchain 공부

ChatOllama 내부 코드 분석 2

필만이 2024. 11. 18. 21:26

배경

  • ChatOllama 내부 코드 분석1에 이어서 계속

코드

    # 동기식 채팅 스트림을 생성하는 메서드
    def _create_chat_stream(
        self,
        messages: List[BaseMessage],  # 모델에 보낼 메시지 리스트
        stop: Optional[List[str]] = None,  # 스트림을 종료할 기준이 되는 선택적 stop 토큰 리스트
        **kwargs: Any,  # 추가적으로 전달할 선택적 키워드 인자들
    ) -> Iterator[str]:  # 문자열(스트림 응답)을 반환하는 이터레이터
        # payload는 모델 호출을 구조화된 방식으로 처리할 수 있도록 준비
        payload = {
            "model": self.model,  # 사용할 LLM(모델) 식별자. 예: 'gpt-4' 또는 특정 모델 이름
            "messages": self._convert_messages_to_ollama_messages(messages),  # 전달된 메시지를 Ollama API에서 요구하는 형식으로 변환
        }
        # 실시간 데이터 스트림을 처리하기 위해 작성된 것으로, 주로 대화형 AI나 대규모 데이터를 단계적으로 처리해야 하는 경우 사용
        # 언어 모델과 통신하기 위해 필요한 데이터를 보내고, API로부터 스트림 응답을 받아 재가공하거나 추가 처리 없이 그대로 반환.
        # 사용자가 필요한 대로 **추가적인 조건이나 설정(**kwargs)**을 넣을 수 있는 유연성을 제공
        # self._create_stream은 서버(API)와 스트림 연결을 설정하는 메서드
        # yield from은 _create_stream 메서드가 반환하는 제너레이터(generator)의 값을 하나씩 호출자에게 전달
        yield from self._create_stream(
            payload=payload,  # API 요청의 본문 데이터. 여기에는 모델 정보와 메시지가 포함됨
            # 특정 문자열이나 패턴이 나타났을 때 스트림을 종료하는 기준입니다. 예를 들어, ["\n\n"]을 입력했다면 
            # 두 줄바꿈이 나타나면 응답을 종료하라는 의미
            stop=stop,  
            api_url=f"{self.base_url}/api/chat",  # API 호출 대상 URL. 기본 엔드포인트는 "/api/chat"
            **kwargs,  # 추가적으로 전달할 키워드 인자들 (예: 헤더, 타임아웃 설정 등)
        )

    # 비동기식 채팅 스트림을 생성하는 메서드
    async def _acreate_chat_stream(
        self,
        messages: List[BaseMessage],  # 모델에 보낼 메시지 리스트
        stop: Optional[List[str]] = None,  # 스트림 종료를 위한 선택적 stop 토큰 리스트
        **kwargs: Any,  # 추가 키워드 인자들
    ) -> AsyncIterator[str]:  # 비동기적으로 문자열(스트림 응답)을 반환하는 이터레이터
        # API에 전달할 페이로드 준비
        payload = {
            "model": self.model,  # 사용할 모델 식별자
            "messages": self._convert_messages_to_ollama_messages(messages),  # 메시지 변환
        }
        # 스트림 응답 데이터를 비동기적으로 처리
        async for stream_resp in self._acreate_stream(
            payload=payload,  # 요청 본문 데이터
            stop=stop,  # 스트림 종료 기준
            api_url=f"{self.base_url}/api/chat",  # API 엔드포인트 URL
            **kwargs,  # 추가 인자 전달
        ):
            yield stream_resp  # 스트림 데이터를 yield

    # 스트림 데이터를 집계하면서 동작하는 메서드
    def _chat_stream_with_aggregation(
        self,
        messages: List[BaseMessage],  # 모델에 보낼 메시지 리스트
        stop: Optional[List[str]] = None,  # 스트림 종료를 위한 선택적 stop 토큰 리스트
        run_manager: Optional[CallbackManagerForLLMRun] = None,  # 선택적 콜백 매니저
        verbose: bool = False,  # 로그 메시지를 출력할지 여부
        **kwargs: Any,  # 추가 키워드 인자들
    ) -> ChatGenerationChunk:  # 채팅 생성 결과(집계된 텍스트 조각)
        final_chunk: Optional[ChatGenerationChunk] = None  # 최종 결과를 저장할 변수
        # 동기식 스트림 데이터를 반복적으로 처리
        for stream_resp in self._create_chat_stream(messages, stop, **kwargs):
            if stream_resp:  # 스트림 응답이 비어 있지 않은 경우
                # 응답 데이터를 채팅 생성 결과 형식으로 변환
                chunk = _chat_stream_response_to_chat_generation_chunk(stream_resp)
                if final_chunk is None:  # 최종 결과가 초기화되지 않은 경우
                    final_chunk = chunk  # 첫 번째 응답을 결과로 설정
                else:  # 이미 결과가 있는 경우
                    final_chunk += chunk  # 결과에 새 응답 데이터를 추가
                if run_manager:  # 콜백 매니저가 제공된 경우
                    run_manager.on_llm_new_token(
                        chunk.text,  # 새로 생성된 텍스트
                        chunk=chunk,  # 현재 처리 중인 데이터 조각
                        verbose=verbose,  # 로그 출력 여부
                    )
        if final_chunk is None:  # 최종 결과가 없는 경우 예외 발생
            raise ValueError("Ollama 스트림에서 데이터를 수신하지 못했습니다.")

        return final_chunk  # 최종 집계된 결과 반환

    # 비동기 스트림 데이터 수신,이 스트림은 LLM API가 반환하는 데이터를 한 덩어리씩(chunk) 실시간으로 제공
    async def _achat_stream_with_aggregation(
        self,
        messages: List[BaseMessage],  # 모델에 보낼 메시지 리스트
        stop: Optional[List[str]] = None,  # 스트림 종료 기준이 되는 stop 토큰 리스트
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,  # 비동기 콜백 매니저
        verbose: bool = False,  # 로그 출력 여부
        **kwargs: Any,  # 추가적으로 전달할 선택적 키워드 인자들
    ) -> ChatGenerationChunk:  # 최종적으로 생성된 텍스트 조각 반환
        final_chunk: Optional[ChatGenerationChunk] = None  # 최종 결과를 저장할 변수
        # 비동기 스트림 데이터를 반복적으로 처리
        async for stream_resp in self._acreate_chat_stream(messages, stop, **kwargs):
            if stream_resp:  # 응답 데이터가 비어 있지 않은 경우
                # 응답 데이터를 채팅 생성 결과 형식으로 변환
                # 스트림 데이터를 ChatGenerationChunk 형식으로 변환
                chunk = _chat_stream_response_to_chat_generation_chunk(stream_resp)
                # final_chunk가 None이라면, 첫 번째 chunk를 그대로 final_chunk로 설정
                if final_chunk is None:
                    final_chunk = chunk
                # 이미 결과가 있는 경우 final_chunk에 새로운 chunk를 병합(+=)해 나감
                else:  
                    final_chunk += chunk 
                if run_manager:  # 콜백 매니저가 제공된 경우
                    # 새로 생성된 데이터가 있을 때 이를 호출하여 실시간으로 처리 상태를 갱신하거나 로그를 출력
                    await run_manager.on_llm_new_token(
                        chunk.text,  # 새로 생성된 텍스트
                        chunk=chunk,  # 현재 처리 중인 데이터 조각
                        verbose=verbose,  # 로그 출력 여부
                    )
        # 스트림 데이터가 하나도 수신되지 않은 경우, ValueError를 발생
        if final_chunk is None: 
            raise ValueError("Ollama 스트림에서 데이터를 수신하지 못했습니다.")

        return final_chunk  # 최종 집계된 결과 반환

    # 사용자가 LLM에게 질문을 보내면, 그에 대한 응답을 동기적으로 반환하는 역할
    # 메시지 리스트(messages)와 선택적인 stop 토큰 리스트를 인자로 받아 실행
    def _generate(
        self,
        messages: List[BaseMessage],  # 모델에 보낼 메시지 리스트
        stop: Optional[List[str]] = None,  # 생성 종료 기준이 되는 stop 토큰 리스트
        run_manager: Optional[CallbackManagerForLLMRun] = None,  # 동기 콜백 매니저
        **kwargs: Any,  # 추가적으로 전달할 선택적 키워드 인자들
    ) -> ChatResult:  # 최종적으로 생성된 채팅 결과 반환
        """
        Ollama의 generate 엔드포인트를 호출하는 메서드.

        Args:
            messages: 모델에 전달할 메시지 리스트.
            stop: 생성 시 사용할 선택적 stop 단어 리스트.

        Returns:
            모델에서 생성된 채팅 결과.

        Example:
            .. code-block:: python

                response = ollama([
                    HumanMessage(content="AI의 역사에 대해 알려주세요")
                ])
        """

        # 이 메서드는 스트림 데이터를 수집하고 하나의 최종 결과(final_chunk)로 집계
        final_chunk = self._chat_stream_with_aggregation(
            messages,
            stop=stop,
            # 동기 콜백 매니저로, 데이터 생성 중 이벤트를 트리거하거나 진행 상황을 추적
            run_manager=run_manager,
            # 디버깅이나 로그 출력 여부를 설정
            verbose=self.verbose,
            **kwargs,
        )
        # 최종 결과를 ChatGeneration 객체로 변환
        chat_generation = ChatGeneration(
            message=AIMessage(content=final_chunk.text),  # 생성된 텍스트
            generation_info=final_chunk.generation_info,  # 생성 과정 관련 정보
        )
        # 최종적으로 LLM에서 생성된 응답이 ChatResult 형태로 반환
        return ChatResult(generations=[chat_generation])

    # 비동기식으로 생성 요청을 처리하는 메서드
    async def _agenerate(
        self,
        messages: List[BaseMessage],  # 모델에 보낼 메시지 리스트
        stop: Optional[List[str]] = None,  # 생성 종료 기준이 되는 stop 토큰 리스트
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,  # 비동기 콜백 매니저
        **kwargs: Any,  # 추가적으로 전달할 선택적 키워드 인자들
    ) -> ChatResult:  # 최종적으로 생성된 채팅 결과 반환
        """
        Ollama의 generate 엔드포인트를 호출하는 비동기 메서드.

        Args:
            messages: 모델에 전달할 메시지 리스트.
            stop: 생성 시 사용할 선택적 stop 단어 리스트.

        Returns:
            모델에서 생성된 채팅 결과.

        Example:
            .. code-block:: python

                response = ollama([
                    HumanMessage(content="AI의 역사에 대해 알려주세요")
                ])
        """

        # 비동기 스트림 데이터를 집계하여 최종 결과 생성
        final_chunk = await self._achat_stream_with_aggregation(
            messages,
            stop=stop,
            run_manager=run_manager,
            verbose=self.verbose,  # 인스턴스에서 설정된 verbose 값 사용
            **kwargs,
        )
        # 최종 결과를 ChatGeneration 객체로 변환
        chat_generation = ChatGeneration(
            message=AIMessage(content=final_chunk.text),  # 생성된 텍스트
            generation_info=final_chunk.generation_info,  # 생성 과정 관련 정보
        )
        # ChatResult 객체로 반환
        return ChatResult(generations=[chat_generation])

    # 동기식 스트림 메서드
    def _stream(
        self,
        messages: List[BaseMessage],  # 모델에 전달할 메시지 리스트
        stop: Optional[List[str]] = None,  # 스트림 종료 기준이 되는 stop 토큰 리스트
        run_manager: Optional[CallbackManagerForLLMRun] = None,  # 동기 콜백 매니저
        **kwargs: Any,  # 추가적으로 전달할 선택적 키워드 인자들
    ) -> Iterator[ChatGenerationChunk]:  # ChatGenerationChunk(생성된 텍스트 조각) 이터레이터 반환
        try:
            # 스트림 응답 데이터를 반복적으로 처리, 메시지 리스트(messages)와 종료 조건(stop) 등을 이용
            #  API로부터 받은 스트림 데이터를 순차적으로 반환하는 제너레이터
            for stream_resp in self._create_chat_stream(messages, stop, **kwargs):
                if stream_resp:  # 응답 데이터가 비어 있지 않은 경우
                    # 응답 데이터를 ChatGenerationChunk 형식으로 변환
                    chunk = _chat_stream_response_to_chat_generation_chunk(stream_resp)
                    if run_manager:  # 콜백 매니저가 제공된 경우
                        run_manager.on_llm_new_token(
                            chunk.text,  # 새로 생성된 텍스트
                            chunk=chunk,  # 현재 데이터 조각
                            verbose=self.verbose,  # 로그 출력 여부
                        )
                    yield chunk  # 생성된 데이터 조각 반환
        # OllamaEndpointNotFoundError는 API의 엔드포인트가 존재하지 않을 때 발생
        except OllamaEndpointNotFoundError:
            # 이전 버전의 스트림 메서드를 호출
            yield from self._legacy_stream(messages, stop, **kwargs)

    # 비동기 스트림 메서드
    async def _astream(
        self,
        messages: List[BaseMessage],  # 모델에 전달할 메시지 리스트
        stop: Optional[List[str]] = None,  # 스트림 종료 기준이 되는 stop 토큰 리스트
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,  # 비동기 콜백 매니저
        **kwargs: Any,  # 추가적으로 전달할 선택적 키워드 인자들
    ) -> AsyncIterator[ChatGenerationChunk]:  # 비동기 ChatGenerationChunk 이터레이터 반환
        # 비동기 스트림 응답 데이터를 반복적으로 처리
        async for stream_resp in self._acreate_chat_stream(messages, stop, **kwargs):
            if stream_resp:  # 응답 데이터가 비어 있지 않은 경우
                # 응답 데이터를 ChatGenerationChunk 형식으로 변환
                chunk = _chat_stream_response_to_chat_generation_chunk(stream_resp)
                if run_manager:  # 콜백 매니저가 제공된 경우
                    await run_manager.on_llm_new_token(
                        chunk.text,  # 새로 생성된 텍스트
                        chunk=chunk,  # 현재 데이터 조각
                        verbose=self.verbose,  # 로그 출력 여부
                    )
                yield chunk  # 생성된 데이터 조각 반환

    # 이전 버전의 스트림 메서드 (더 이상 사용되지 않음)
    @deprecated("0.0.3", alternative="_stream")  # 사용 중단 경고 표시 및 대체 메서드 안내
    def _legacy_stream(
        self,
        messages: List[BaseMessage],  # 모델에 전달할 메시지 리스트
        stop: Optional[List[str]] = None,  # 스트림 종료 기준이 되는 stop 토큰 리스트
        run_manager: Optional[CallbackManagerForLLMRun] = None,  # 동기 콜백 매니저
        **kwargs: Any,  # 추가적으로 전달할 선택적 키워드 인자들
    ) -> Iterator[ChatGenerationChunk]:  # ChatGenerationChunk 이터레이터 반환
        # 메시지를 텍스트 형식으로 변환
        prompt = self._format_messages_as_text(messages)
        # 생성 스트림 데이터를 반복적으로 처리
        for stream_resp in self._create_generate_stream(prompt, stop, **kwargs):
            if stream_resp:  # 응답 데이터가 비어 있지 않은 경우
                # 응답 데이터를 ChatGenerationChunk 형식으로 변환
                chunk = _stream_response_to_chat_generation_chunk(stream_resp)
                if run_manager:  # 콜백 매니저가 제공된 경우
                    run_manager.on_llm_new_token(
                        chunk.text,  # 새로 생성된 텍스트
                        chunk=chunk,  # 현재 데이터 조각
                        verbose=self.verbose,  # 로그 출력 여부
                    )
                yield chunk  # 생성된 데이터 조각 반환