CallbackManager
는 LangChain 라이브러리에서 작업 실행 상태를 관리하고, 실행 중 발생하는 이벤트를 콜백 핸들러에 전달하는 중추적인 역할을 수행합니다. 이 도구는 LangChain 워크플로우를 모니터링하고 디버깅하거나 사용자 정의 동작(예: 로그 기록, 메트릭 수집)을 추가하는 데 사용됩니다.
CallbackManager의 주요 역할
1. 작업 실행 상태 관리
CallbackManager
는 LangChain 작업의 시작, 진행, 완료, 오류 발생 상태를 추적합니다.- 이를 통해 실행 상태를 기록하거나 실행 흐름을 제어할 수 있습니다.
2. 콜백 핸들러 관리
- 여러 콜백 핸들러를 등록하여, 특정 이벤트가 발생할 때 핸들러가 실행되도록 관리합니다.
- 콜백 핸들러는 사용자 정의 동작(예: 로그 기록, 스트리밍 출력, 결과 저장)을 수행합니다.
3. 실시간 데이터 스트리밍 지원
- 스트리밍 데이터 출력 중간에 핸들러를 삽입하여 데이터 가공, 로그 출력, 실시간 결과 반환을 지원합니다.
4. 확장성 제공
- 사용자 정의 콜백 핸들러를 추가하거나, 기존 핸들러를 확장하여 LangChain의 기본 동작을 조정할 수 있습니다.
CallbackManager의 실시간 출력 원리
1. 이벤트 기반 프로그래밍
CallbackManager
는 작업의 특정 이벤트(예: 시작, 중간 처리, 종료, 오류 발생 등)를 트리거하여 콜백 핸들러를 호출합니다.- 핸들러는 이벤트가 발생할 때마다 실행되며, 이를 통해 실시간으로 데이터를 처리하거나 출력할 수 있습니다.
2. 제너레이터와 yield
를 활용한 스트리밍
- 제너레이터 패턴을 활용하여 데이터를 처리하면서 동시에 반환(
yield
)합니다. CallbackManager
는 데이터 스트림을 가로채고, 각 단계에서 이벤트를 발생시켜 핸들러가 이를 실시간으로 처리하도록 합니다.
실시간 출력 동작 흐름
핵심 원리
입력 스트림 처리:
- 입력 데이터를 처리하는 동안, 각 데이터 조각(chunk)을 반환할 때마다
CallbackManager
는 이벤트를 트리거합니다.
- 입력 데이터를 처리하는 동안, 각 데이터 조각(chunk)을 반환할 때마다
콜백 핸들러 호출:
- 이벤트 발생 시,
CallbackManager
에 등록된 핸들러가 호출됩니다. - 예: 스트리밍 데이터를 실시간으로 출력하거나 가공.
- 이벤트 발생 시,
데이터 반환:
- 데이터는 처리와 동시에 반환(
yield
)되므로, 실시간 스트리밍이 가능합니다.
- 데이터는 처리와 동시에 반환(
CallbackManager의 주요 메서드
아래는 CallbackManager의 주요 메서드에 대해 각각의 사용 예를 상세히 설명하고 코드 예제를 포함한 형태로 작성한 내용입니다.
1. add_handler
설명
CallbackManager
에 사용자 정의 콜백 핸들러를 추가합니다.- 추가된 핸들러는 작업 실행 중 이벤트가 발생할 때 호출됩니다.
예제
from langchain_core.callbacks.manager import CallbackManager
class CustomHandler:
def on_chain_start(self, serialized_data, inputs, **kwargs):
print(f"Chain started with inputs: {inputs}")
def on_chain_end(self, outputs, **kwargs):
print(f"Chain ended with outputs: {outputs}")
# CallbackManager 생성 및 핸들러 추가
callback_manager = CallbackManager()
callback_manager.add_handler(CustomHandler())
# 작업 시작/종료 이벤트 호출
callback_manager.on_chain_start({"name": "example_chain"}, {"input_key": "value"})
callback_manager.on_chain_end({"output_key": "result"})
출력
Chain started with inputs: {'input_key': 'value'}
Chain ended with outputs: {'output_key': 'result'}
2. on_chain_start
설명
- 체인의 실행이 시작될 때 호출됩니다.
- 작업 시작 시 초기화 작업, 실행 상태 로깅 등을 수행합니다.
예제
from langchain_core.callbacks.manager import CallbackManager
# 체인 시작 이벤트 호출
callback_manager = CallbackManager()
callback_manager.on_chain_start(
serialized_data={"name": "example_chain"},
inputs={"user_input": "Hello, world!"}
)
출력
(콜백 핸들러에 따라 처리)
3. on_chain_end
설명
- 체인의 실행이 정상적으로 종료되었을 때 호출됩니다.
- 체인의 결과를 저장하거나 로그를 남깁니다.
예제
from langchain_core.callbacks.manager import CallbackManager
# 체인 종료 이벤트 호출
callback_manager = CallbackManager()
callback_manager.on_chain_end(
outputs={"response": "Hello, user!"}
)
출력
(콜백 핸들러에 따라 처리)
4. on_chain_error
설명
- 체인의 실행 중 예외가 발생했을 때 호출됩니다.
- 오류를 로깅하거나 복구 작업을 수행합니다.
예제
from langchain_core.callbacks.manager import CallbackManager
# 오류가 발생했을 때의 처리
try:
raise ValueError("An example error")
except ValueError as e:
callback_manager = CallbackManager()
callback_manager.on_chain_error(e)
출력
(콜백 핸들러에 따라 오류 로그가 기록됨)
5. on_tool_start
, on_tool_end
, on_tool_error
설명
- LangChain 도구의 실행 상태를 추적합니다.
- 도구가 시작, 종료, 오류 상태일 때 호출됩니다.
예제
from langchain_core.callbacks.manager import CallbackManager
# 도구 실행 이벤트 호출
callback_manager = CallbackManager()
# 도구 실행 시작
callback_manager.on_tool_start("example_tool", {"input_key": "value"})
# 도구 실행 종료
callback_manager.on_tool_end({"output_key": "result"})
# 도구 실행 중 오류 발생
try:
raise RuntimeError("Tool execution error")
except RuntimeError as e:
callback_manager.on_tool_error(e)
출력
(콜백 핸들러에 따라 처리)
6. on_llm_start
, on_llm_new_token
, on_llm_end
, on_llm_error
설명
- 언어 모델(LLM) 호출과 관련된 모든 이벤트를 관리합니다.
- 특히, 실시간 토큰 생성(
on_llm_new_token
)은 LLM의 실시간 스트리밍을 처리하는 데 사용됩니다.
예제: 실시간 토큰 출력
from langchain_core.callbacks.manager import CallbackManager
class CustomLLMHandler:
def on_llm_new_token(self, token, **kwargs):
print(f"New token generated: {token}")
callback_manager = CallbackManager()
callback_manager.add_handler(CustomLLMHandler())
# LLM 호출 중 이벤트 발생
callback_manager.on_llm_start({"name": "example_llm"}, ["What is AI?"])
callback_manager.on_llm_new_token("Artificial")
callback_manager.on_llm_new_token("Intelligence")
callback_manager.on_llm_end({"output": "Artificial Intelligence is..."})
출력
New token generated: Artificial
New token generated: Intelligence
예제: 오류 처리
try:
raise RuntimeError("LLM generation error")
except RuntimeError as e:
callback_manager.on_llm_error(e)
요약
메서드 이름 | 사용 목적 | 주요 동작 |
---|---|---|
add_handler |
사용자 정의 핸들러 추가 | 작업 실행 중 이벤트 처리 |
on_chain_start |
체인 시작 시 호출 | 작업 초기화, 실행 상태 로깅 |
on_chain_end |
체인 종료 시 호출 | 작업 종료 로그 기록, 결과 저장 |
on_chain_error |
체인 실행 중 오류 발생 시 호출 | 오류 로그 기록, 복구 작업 수행 |
on_tool_start |
도구 실행 시작 시 호출 | 도구 초기화 |
on_tool_end |
도구 실행 종료 시 호출 | 도구 실행 결과 처리 |
on_tool_error |
도구 실행 중 오류 발생 시 호출 | 오류 처리, 로그 기록 |
on_llm_start |
언어 모델(LLM) 호출 시작 시 호출 | LLM 호출 초기화 |
on_llm_new_token |
LLM 호출 중 새 토큰이 생성될 때 호출 | 실시간 스트리밍 데이터 출력 |
on_llm_end |
LLM 호출이 정상적으로 종료될 때 호출 | 결과 저장 및 로그 기록 |
on_llm_error |
LLM 호출 중 오류 발생 시 호출 | 오류 처리, 로그 기록 |
CallbackManager의 작동 예제
1. 작업 실행 상태 추적
from langchain_core.callbacks.manager import CallbackManager
# 콜백 핸들러 추가
callback_manager = CallbackManager()
callback_manager.add_handler(CustomHandler())
# 작업 체인 시작
callback_manager.on_chain_start({"name": "example_chain"}, {"input_key": "value"})
# 작업 체인 종료
callback_manager.on_chain_end({"output_key": "result"})
2. 실시간 데이터 스트리밍
class StreamingHandler:
def on_llm_new_token(self, token, **kwargs):
print(f"New token generated: {token}")
# CallbackManager에 핸들러 추가
callback_manager.add_handler(StreamingHandler())
# 실시간 스트리밍 중 콜백 발생
callback_manager.on_llm_new_token("Hello")
callback_manager.on_llm_new_token("world")
출력:
New token generated: Hello
New token generated: world
3. 오류 발생 처리
try:
# 작업 실행 중 예외 발생
raise ValueError("An error occurred")
except ValueError as e:
callback_manager.on_chain_error(e)
- 사용 목적:
- 오류 발생 시 추가적인 처리를 수행하거나 로그를 기록.
CallbackManager의 역할 정리
역할 | 설명 |
---|---|
작업 실행 추적 | LangChain 워크플로우에서 체인, 도구, LLM 실행 상태를 추적. |
콜백 핸들러 관리 | 작업 중 발생하는 이벤트를 콜백 핸들러에 전달하여 로그 기록, 데이터 처리 등 수행. |
실시간 데이터 처리 | 실시간으로 생성되는 데이터(예: 스트리밍 토큰)를 중간에 가공하거나 사용자에게 전달. |
오류 관리 | 작업 실행 중 발생한 오류를 처리하고, 로그를 기록하거나 복구 동작을 수행. |
확장성 제공 | 사용자 정의 콜백 핸들러를 추가하여 기본 동작을 확장하거나 새로운 동작을 추가. |
결론
CallbackManager
는 LangChain 실행의 모든 단계를 추적하고 제어하는 핵심 도구입니다.- 이를 활용해 실시간 로그 출력, 중간 데이터 처리, 오류 처리, LLM의 스트리밍 출력 제어 등을 수행할 수 있습니다.
- 사용자 정의 핸들러를 통해 LangChain의 기능을 확장하고, 실행 상태를 정밀하게 모니터링하거나 조정할 수 있습니다.
'langchain 공부' 카테고리의 다른 글
(이해를 돕는) Langchain 컴포넌트 코드 분석 총 모음 (0) | 2024.11.27 |
---|---|
RunnablePassthrough 코드분석 (0) | 2024.11.25 |
PromptTemplate 내부코드 분석 (0) | 2024.11.24 |
BM25 알고리즘 선택 기준 및 상세 사례 (3) | 2024.11.21 |
BM25Retriever2 : BM25원리 코드로 해석하기 (2) | 2024.11.20 |