배경
- ChatOllama 내부 코드 분석1에 이어서 계속
코드
# 동기식 채팅 스트림을 생성하는 메서드
def _create_chat_stream(
self,
messages: List[BaseMessage], # 모델에 보낼 메시지 리스트
stop: Optional[List[str]] = None, # 스트림을 종료할 기준이 되는 선택적 stop 토큰 리스트
**kwargs: Any, # 추가적으로 전달할 선택적 키워드 인자들
) -> Iterator[str]: # 문자열(스트림 응답)을 반환하는 이터레이터
# payload는 모델 호출을 구조화된 방식으로 처리할 수 있도록 준비
payload = {
"model": self.model, # 사용할 LLM(모델) 식별자. 예: 'gpt-4' 또는 특정 모델 이름
"messages": self._convert_messages_to_ollama_messages(messages), # 전달된 메시지를 Ollama API에서 요구하는 형식으로 변환
}
# 실시간 데이터 스트림을 처리하기 위해 작성된 것으로, 주로 대화형 AI나 대규모 데이터를 단계적으로 처리해야 하는 경우 사용
# 언어 모델과 통신하기 위해 필요한 데이터를 보내고, API로부터 스트림 응답을 받아 재가공하거나 추가 처리 없이 그대로 반환.
# 사용자가 필요한 대로 **추가적인 조건이나 설정(**kwargs)**을 넣을 수 있는 유연성을 제공
# self._create_stream은 서버(API)와 스트림 연결을 설정하는 메서드
# yield from은 _create_stream 메서드가 반환하는 제너레이터(generator)의 값을 하나씩 호출자에게 전달
yield from self._create_stream(
payload=payload, # API 요청의 본문 데이터. 여기에는 모델 정보와 메시지가 포함됨
# 특정 문자열이나 패턴이 나타났을 때 스트림을 종료하는 기준입니다. 예를 들어, ["\n\n"]을 입력했다면
# 두 줄바꿈이 나타나면 응답을 종료하라는 의미
stop=stop,
api_url=f"{self.base_url}/api/chat", # API 호출 대상 URL. 기본 엔드포인트는 "/api/chat"
**kwargs, # 추가적으로 전달할 키워드 인자들 (예: 헤더, 타임아웃 설정 등)
)
# 비동기식 채팅 스트림을 생성하는 메서드
async def _acreate_chat_stream(
self,
messages: List[BaseMessage], # 모델에 보낼 메시지 리스트
stop: Optional[List[str]] = None, # 스트림 종료를 위한 선택적 stop 토큰 리스트
**kwargs: Any, # 추가 키워드 인자들
) -> AsyncIterator[str]: # 비동기적으로 문자열(스트림 응답)을 반환하는 이터레이터
# API에 전달할 페이로드 준비
payload = {
"model": self.model, # 사용할 모델 식별자
"messages": self._convert_messages_to_ollama_messages(messages), # 메시지 변환
}
# 스트림 응답 데이터를 비동기적으로 처리
async for stream_resp in self._acreate_stream(
payload=payload, # 요청 본문 데이터
stop=stop, # 스트림 종료 기준
api_url=f"{self.base_url}/api/chat", # API 엔드포인트 URL
**kwargs, # 추가 인자 전달
):
yield stream_resp # 스트림 데이터를 yield
# 스트림 데이터를 집계하면서 동작하는 메서드
def _chat_stream_with_aggregation(
self,
messages: List[BaseMessage], # 모델에 보낼 메시지 리스트
stop: Optional[List[str]] = None, # 스트림 종료를 위한 선택적 stop 토큰 리스트
run_manager: Optional[CallbackManagerForLLMRun] = None, # 선택적 콜백 매니저
verbose: bool = False, # 로그 메시지를 출력할지 여부
**kwargs: Any, # 추가 키워드 인자들
) -> ChatGenerationChunk: # 채팅 생성 결과(집계된 텍스트 조각)
final_chunk: Optional[ChatGenerationChunk] = None # 최종 결과를 저장할 변수
# 동기식 스트림 데이터를 반복적으로 처리
for stream_resp in self._create_chat_stream(messages, stop, **kwargs):
if stream_resp: # 스트림 응답이 비어 있지 않은 경우
# 응답 데이터를 채팅 생성 결과 형식으로 변환
chunk = _chat_stream_response_to_chat_generation_chunk(stream_resp)
if final_chunk is None: # 최종 결과가 초기화되지 않은 경우
final_chunk = chunk # 첫 번째 응답을 결과로 설정
else: # 이미 결과가 있는 경우
final_chunk += chunk # 결과에 새 응답 데이터를 추가
if run_manager: # 콜백 매니저가 제공된 경우
run_manager.on_llm_new_token(
chunk.text, # 새로 생성된 텍스트
chunk=chunk, # 현재 처리 중인 데이터 조각
verbose=verbose, # 로그 출력 여부
)
if final_chunk is None: # 최종 결과가 없는 경우 예외 발생
raise ValueError("Ollama 스트림에서 데이터를 수신하지 못했습니다.")
return final_chunk # 최종 집계된 결과 반환
# 비동기 스트림 데이터 수신,이 스트림은 LLM API가 반환하는 데이터를 한 덩어리씩(chunk) 실시간으로 제공
async def _achat_stream_with_aggregation(
self,
messages: List[BaseMessage], # 모델에 보낼 메시지 리스트
stop: Optional[List[str]] = None, # 스트림 종료 기준이 되는 stop 토큰 리스트
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, # 비동기 콜백 매니저
verbose: bool = False, # 로그 출력 여부
**kwargs: Any, # 추가적으로 전달할 선택적 키워드 인자들
) -> ChatGenerationChunk: # 최종적으로 생성된 텍스트 조각 반환
final_chunk: Optional[ChatGenerationChunk] = None # 최종 결과를 저장할 변수
# 비동기 스트림 데이터를 반복적으로 처리
async for stream_resp in self._acreate_chat_stream(messages, stop, **kwargs):
if stream_resp: # 응답 데이터가 비어 있지 않은 경우
# 응답 데이터를 채팅 생성 결과 형식으로 변환
# 스트림 데이터를 ChatGenerationChunk 형식으로 변환
chunk = _chat_stream_response_to_chat_generation_chunk(stream_resp)
# final_chunk가 None이라면, 첫 번째 chunk를 그대로 final_chunk로 설정
if final_chunk is None:
final_chunk = chunk
# 이미 결과가 있는 경우 final_chunk에 새로운 chunk를 병합(+=)해 나감
else:
final_chunk += chunk
if run_manager: # 콜백 매니저가 제공된 경우
# 새로 생성된 데이터가 있을 때 이를 호출하여 실시간으로 처리 상태를 갱신하거나 로그를 출력
await run_manager.on_llm_new_token(
chunk.text, # 새로 생성된 텍스트
chunk=chunk, # 현재 처리 중인 데이터 조각
verbose=verbose, # 로그 출력 여부
)
# 스트림 데이터가 하나도 수신되지 않은 경우, ValueError를 발생
if final_chunk is None:
raise ValueError("Ollama 스트림에서 데이터를 수신하지 못했습니다.")
return final_chunk # 최종 집계된 결과 반환
# 사용자가 LLM에게 질문을 보내면, 그에 대한 응답을 동기적으로 반환하는 역할
# 메시지 리스트(messages)와 선택적인 stop 토큰 리스트를 인자로 받아 실행
def _generate(
self,
messages: List[BaseMessage], # 모델에 보낼 메시지 리스트
stop: Optional[List[str]] = None, # 생성 종료 기준이 되는 stop 토큰 리스트
run_manager: Optional[CallbackManagerForLLMRun] = None, # 동기 콜백 매니저
**kwargs: Any, # 추가적으로 전달할 선택적 키워드 인자들
) -> ChatResult: # 최종적으로 생성된 채팅 결과 반환
"""
Ollama의 generate 엔드포인트를 호출하는 메서드.
Args:
messages: 모델에 전달할 메시지 리스트.
stop: 생성 시 사용할 선택적 stop 단어 리스트.
Returns:
모델에서 생성된 채팅 결과.
Example:
.. code-block:: python
response = ollama([
HumanMessage(content="AI의 역사에 대해 알려주세요")
])
"""
# 이 메서드는 스트림 데이터를 수집하고 하나의 최종 결과(final_chunk)로 집계
final_chunk = self._chat_stream_with_aggregation(
messages,
stop=stop,
# 동기 콜백 매니저로, 데이터 생성 중 이벤트를 트리거하거나 진행 상황을 추적
run_manager=run_manager,
# 디버깅이나 로그 출력 여부를 설정
verbose=self.verbose,
**kwargs,
)
# 최종 결과를 ChatGeneration 객체로 변환
chat_generation = ChatGeneration(
message=AIMessage(content=final_chunk.text), # 생성된 텍스트
generation_info=final_chunk.generation_info, # 생성 과정 관련 정보
)
# 최종적으로 LLM에서 생성된 응답이 ChatResult 형태로 반환
return ChatResult(generations=[chat_generation])
# 비동기식으로 생성 요청을 처리하는 메서드
async def _agenerate(
self,
messages: List[BaseMessage], # 모델에 보낼 메시지 리스트
stop: Optional[List[str]] = None, # 생성 종료 기준이 되는 stop 토큰 리스트
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, # 비동기 콜백 매니저
**kwargs: Any, # 추가적으로 전달할 선택적 키워드 인자들
) -> ChatResult: # 최종적으로 생성된 채팅 결과 반환
"""
Ollama의 generate 엔드포인트를 호출하는 비동기 메서드.
Args:
messages: 모델에 전달할 메시지 리스트.
stop: 생성 시 사용할 선택적 stop 단어 리스트.
Returns:
모델에서 생성된 채팅 결과.
Example:
.. code-block:: python
response = ollama([
HumanMessage(content="AI의 역사에 대해 알려주세요")
])
"""
# 비동기 스트림 데이터를 집계하여 최종 결과 생성
final_chunk = await self._achat_stream_with_aggregation(
messages,
stop=stop,
run_manager=run_manager,
verbose=self.verbose, # 인스턴스에서 설정된 verbose 값 사용
**kwargs,
)
# 최종 결과를 ChatGeneration 객체로 변환
chat_generation = ChatGeneration(
message=AIMessage(content=final_chunk.text), # 생성된 텍스트
generation_info=final_chunk.generation_info, # 생성 과정 관련 정보
)
# ChatResult 객체로 반환
return ChatResult(generations=[chat_generation])
# 동기식 스트림 메서드
def _stream(
self,
messages: List[BaseMessage], # 모델에 전달할 메시지 리스트
stop: Optional[List[str]] = None, # 스트림 종료 기준이 되는 stop 토큰 리스트
run_manager: Optional[CallbackManagerForLLMRun] = None, # 동기 콜백 매니저
**kwargs: Any, # 추가적으로 전달할 선택적 키워드 인자들
) -> Iterator[ChatGenerationChunk]: # ChatGenerationChunk(생성된 텍스트 조각) 이터레이터 반환
try:
# 스트림 응답 데이터를 반복적으로 처리, 메시지 리스트(messages)와 종료 조건(stop) 등을 이용
# API로부터 받은 스트림 데이터를 순차적으로 반환하는 제너레이터
for stream_resp in self._create_chat_stream(messages, stop, **kwargs):
if stream_resp: # 응답 데이터가 비어 있지 않은 경우
# 응답 데이터를 ChatGenerationChunk 형식으로 변환
chunk = _chat_stream_response_to_chat_generation_chunk(stream_resp)
if run_manager: # 콜백 매니저가 제공된 경우
run_manager.on_llm_new_token(
chunk.text, # 새로 생성된 텍스트
chunk=chunk, # 현재 데이터 조각
verbose=self.verbose, # 로그 출력 여부
)
yield chunk # 생성된 데이터 조각 반환
# OllamaEndpointNotFoundError는 API의 엔드포인트가 존재하지 않을 때 발생
except OllamaEndpointNotFoundError:
# 이전 버전의 스트림 메서드를 호출
yield from self._legacy_stream(messages, stop, **kwargs)
# 비동기 스트림 메서드
async def _astream(
self,
messages: List[BaseMessage], # 모델에 전달할 메시지 리스트
stop: Optional[List[str]] = None, # 스트림 종료 기준이 되는 stop 토큰 리스트
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None, # 비동기 콜백 매니저
**kwargs: Any, # 추가적으로 전달할 선택적 키워드 인자들
) -> AsyncIterator[ChatGenerationChunk]: # 비동기 ChatGenerationChunk 이터레이터 반환
# 비동기 스트림 응답 데이터를 반복적으로 처리
async for stream_resp in self._acreate_chat_stream(messages, stop, **kwargs):
if stream_resp: # 응답 데이터가 비어 있지 않은 경우
# 응답 데이터를 ChatGenerationChunk 형식으로 변환
chunk = _chat_stream_response_to_chat_generation_chunk(stream_resp)
if run_manager: # 콜백 매니저가 제공된 경우
await run_manager.on_llm_new_token(
chunk.text, # 새로 생성된 텍스트
chunk=chunk, # 현재 데이터 조각
verbose=self.verbose, # 로그 출력 여부
)
yield chunk # 생성된 데이터 조각 반환
# 이전 버전의 스트림 메서드 (더 이상 사용되지 않음)
@deprecated("0.0.3", alternative="_stream") # 사용 중단 경고 표시 및 대체 메서드 안내
def _legacy_stream(
self,
messages: List[BaseMessage], # 모델에 전달할 메시지 리스트
stop: Optional[List[str]] = None, # 스트림 종료 기준이 되는 stop 토큰 리스트
run_manager: Optional[CallbackManagerForLLMRun] = None, # 동기 콜백 매니저
**kwargs: Any, # 추가적으로 전달할 선택적 키워드 인자들
) -> Iterator[ChatGenerationChunk]: # ChatGenerationChunk 이터레이터 반환
# 메시지를 텍스트 형식으로 변환
prompt = self._format_messages_as_text(messages)
# 생성 스트림 데이터를 반복적으로 처리
for stream_resp in self._create_generate_stream(prompt, stop, **kwargs):
if stream_resp: # 응답 데이터가 비어 있지 않은 경우
# 응답 데이터를 ChatGenerationChunk 형식으로 변환
chunk = _stream_response_to_chat_generation_chunk(stream_resp)
if run_manager: # 콜백 매니저가 제공된 경우
run_manager.on_llm_new_token(
chunk.text, # 새로 생성된 텍스트
chunk=chunk, # 현재 데이터 조각
verbose=self.verbose, # 로그 출력 여부
)
yield chunk # 생성된 데이터 조각 반환