배경
- 내부 원리를 알기위해 RunnablePassthrough 코드 분석하고자함
코드
"""Implementation of the RunnablePassthrough."""
from __future__ import annotations
import asyncio
import inspect
import threading
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Awaitable,
Callable,
Dict,
Iterator,
List,
Mapping,
Optional,
Type,
Union,
cast,
)
from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables.base import (
Other,
Runnable,
RunnableParallel,
RunnableSerializable,
)
from langchain_core.runnables.config import (
RunnableConfig,
acall_func_with_variable_args,
call_func_with_variable_args,
ensure_config,
get_executor_for_config,
patch_config,
)
from langchain_core.runnables.graph import Graph
from langchain_core.runnables.utils import (
AddableDict,
ConfigurableFieldSpec,
create_model,
)
from langchain_core.utils.aiter import atee, py_anext
from langchain_core.utils.iter import safetee
if TYPE_CHECKING:
from langchain_core.callbacks.manager import (
AsyncCallbackManagerForChainRun,
CallbackManagerForChainRun,
)
def identity(x: Other) -> Other:
"""Identity function.
Args:
x (Other): input.
Returns:
Other: output.
"""
return x
async def aidentity(x: Other) -> Other:
"""Async identity function.
Args:
x (Other): input.
Returns:
Other: output.
"""
return x
class RunnablePassthrough(RunnableSerializable[Other, Other]):
"""입력을 그대로 전달하거나 추가 키와 함께 반환하는 Runnable 클래스.
이 클래스는 거의 동일한 함수처럼 동작하지만, 입력값이 사전(dict)일 경우
출력에 추가 키를 포함하도록 구성할 수 있습니다.
아래 예제에서는 간단한 람다 함수를 사용한 간단한 체인 예제를 통해 이 Runnable이 어떻게 작동하는지 설명합니다.
예제들은 실행 및 실험이 쉽게 구성되어 있습니다.
Parameters:
func (Callable[[Other], None], optional): 입력값을 처리할 함수.
afunc (Callable[[Other], Awaitable[None]], optional): 비동기로 입력값을 처리할 함수.
input_type (Optional[Type[Other]], optional): 입력값의 타입을 명시하는 선택적 속성.
**kwargs (Any): 추가적인 키워드 인수들.
Examples:
.. code-block:: python
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnablePassthrough,
)
# 입력을 그대로 전달하는 origin과 입력값에 +1을 한 modified를 병렬로 실행
runnable = RunnableParallel(
origin=RunnablePassthrough(),
modified=lambda x: x+1
)
runnable.invoke(1) # {'origin': 1, 'modified': 2}
def fake_llm(prompt: str) -> str: # 예제용 가짜 LLM 함수
return "completion"
# 원본 텍스트와 역순으로 파싱된 텍스트를 동시에 반환하는 체인
chain = RunnableLambda(fake_llm) | {
'original': RunnablePassthrough(), # 원본 LLM 출력
'parsed': lambda text: text[::-1] # 파싱 로직
}
chain.invoke('hello') # {'original': 'completion', 'parsed': 'noitelpmoc'}
일부 경우에는 입력을 그대로 전달하면서 추가적인 키를 출력에 포함시키는 것이 유용할 수 있습니다.
이 경우 `assign` 메서드를 사용할 수 있습니다:
.. code-block:: python
from langchain_core.runnables import RunnablePassthrough
def fake_llm(prompt: str) -> str: # 예제용 가짜 LLM 함수
return "completion"
# 두 개의 LLM 출력을 포함하고 총 문자 수를 계산하여 반환하는 체인
runnable = {
'llm1': fake_llm,
'llm2': fake_llm,
} | RunnablePassthrough.assign(
total_chars=lambda inputs: len(inputs['llm1'] + inputs['llm2'])
)
runnable.invoke('hello')
# {'llm1': 'completion', 'llm2': 'completion', 'total_chars': 20}
"""
"""입력을 그대로 전달하거나 추가 키와 함께 반환하는 Runnable 클래스."""
input_type: Optional[Type[Other]] = None
"""입력값의 타입을 명시하는 선택적 속성.
- 입력 데이터의 타입을 지정하여 타입 안전성을 높이는 데 사용됩니다.
- 기본값은 `None`이며, 입력값의 타입이 명시되지 않을 수도 있습니다.
"""
func: Optional[
Union[Callable[[Other], None], Callable[[Other, RunnableConfig], None]]
] = None
"""동기 함수로, 입력값을 처리하는 데 사용됩니다.
- `Callable[[Other], None]`: 하나의 입력값을 처리하는 동기 함수.
- `Callable[[Other, RunnableConfig], None]`: 입력값과 추가 설정(`RunnableConfig`)을 처리하는 동기 함수.
"""
afunc: Optional[
Union[
Callable[[Other], Awaitable[None]],
Callable[[Other, RunnableConfig], Awaitable[None]],
]
] = None
"""비동기 함수로, 입력값을 비동기로 처리하는 데 사용됩니다.
- `Callable[[Other], Awaitable[None]]`: 하나의 입력값을 처리하는 비동기 함수.
- `Callable[[Other, RunnableConfig], Awaitable[None]]`: 입력값과 추가 설정(`RunnableConfig`)을 처리하는 비동기 함수.
"""
def __repr_args__(self) -> Any:
"""객체의 문자열 표현을 생성하는 메서드.
- 기본적으로 `repr(self)` 호출 시 무한 재귀 문제가 발생할 수 있어 이를 방지합니다.
- 관련 이슈: https://github.com/pydantic/pydantic/issues/7327
- 현재는 빈 리스트를 반환하도록 설정되어 있습니다.
"""
return []
# __init__ 메서드는 동기 함수와 비동기 함수 중 어떤 것을 사용할지를 결정합니다.
def __init__(
self,
func: Optional[
Union[
Union[Callable[[Other], None], Callable[[Other, RunnableConfig], None]],
Union[
Callable[[Other], Awaitable[None]],
Callable[[Other, RunnableConfig], Awaitable[None]],
],
]
] = None,
afunc: Optional[
Union[
Callable[[Other], Awaitable[None]],
Callable[[Other, RunnableConfig], Awaitable[None]],
]
] = None,
*,
input_type: Optional[Type[Other]] = None,
**kwargs: Any,
) -> None:
"""
초기화 메서드.
Args:
func (Optional[Union[Callable]]): 입력값을 처리할 동기 함수.
afunc (Optional[Union[Callable]]): 입력값을 처리할 비동기 함수.
input_type (Optional[Type[Other]]): 입력값의 타입.
kwargs (Any): 추가적인 키워드 인자.
동작 방식:
- `func`가 비동기 함수인지 확인하기 위해 `inspect.iscoroutinefunction`을 사용합니다.
- `func`가 비동기 함수인 경우, `afunc`에 할당하고 `func`는 `None`으로 설정합니다.
- `super().__init__`를 호출하여 부모 클래스의 초기화를 수행합니다.
- `type: ignore[call-arg]`는 타입 검사 도구에서 발생할 수 있는 경고를 무시합니다.
"""
# `func`가 비동기 함수인지 확인
if inspect.iscoroutinefunction(func):
afunc = func # `afunc`에 비동기 함수를 할당
func = None # `func`를 None으로 설정
# 부모 클래스의 초기화 메서드 호출
super().__init__(func=func, afunc=afunc, input_type=input_type, **kwargs) # type: ignore[call-arg]
@classmethod
def is_lc_serializable(cls) -> bool:
"""LangChain 객체가 직렬화 가능한지 확인합니다.
Returns:
bool: 항상 `True`를 반환하며, 이 클래스가 직렬화 가능한 객체임을 나타냅니다.
- LangChain에서 객체를 저장하거나 전송할 때 직렬화를 지원하도록 설계되었습니다.
"""
return True
@classmethod
def get_lc_namespace(cls) -> List[str]:
"""LangChain 객체의 네임스페이스를 반환합니다.
Returns:
List[str]: LangChain에서 이 클래스가 속한 네임스페이스를 나타내는 문자열 목록.
- 네임스페이스는 ["langchain", "schema", "runnable"]로 설정되어 있습니다.
- 네임스페이스는 객체를 분류하거나 참조 경로를 설정하는 데 사용됩니다.
"""
return ["langchain", "schema", "runnable"]
@property
def InputType(self) -> Any:
"""입력값의 타입을 반환합니다.
Returns:
Any: `input_type`이 설정되어 있으면 해당 타입을 반환하고, 설정되지 않은 경우 기본값 `Any`를 반환합니다.
- 타입 힌트를 통해 입력값의 유형을 명시적으로 관리할 수 있습니다.
- 유연한 입력값 처리를 위해 기본값은 `Any`로 설정되어 있습니다.
"""
return self.input_type or Any
@property
def OutputType(self) -> Any:
"""출력값의 타입을 반환합니다.
Returns:
Any: `input_type`이 설정되어 있으면 동일한 타입을 반환하며, 기본값은 `Any`입니다.
- 입력값과 출력값의 타입이 동일하다는 가정 하에 동작하도록 설계되었습니다.
- 출력값의 명시적 타입 지정을 통해 데이터 처리의 안정성을 높일 수 있습니다.
"""
return self.input_type or Any
@classmethod
def assign(
cls,
**kwargs: Union[
Runnable[Dict[str, Any], Any],
Callable[[Dict[str, Any]], Any],
Mapping[
str,
Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]],
],
],
) -> "RunnableAssign":
"""입력값을 매핑 결과와 병합하는 새로운 Runnable을 생성합니다.
Args:
**kwargs: 키-값 형태의 매핑.
- 값은 Runnable 객체, Callable 함수, 또는 또 다른 매핑일 수 있습니다.
- 매핑은 각 키에 대해 실행 가능한 작업(Runnable 또는 Callable)을 정의합니다.
Returns:
RunnableAssign: 입력값의 딕셔너리와 매핑 결과를 병합하여 반환하는 Runnable 객체.
- 입력값은 키-값 쌍의 딕셔너리 형태로 전달됩니다.
- 각 키에 대해 정의된 매핑 작업이 실행됩니다.
- 최종적으로 입력값과 매핑 작업의 출력값을 병합합니다.
"""
return RunnableAssign(RunnableParallel(kwargs))
def invoke(
self, input: Other, config: Optional[RunnableConfig] = None, **kwargs: Any
) -> Other:
"""입력값을 동기적으로 처리합니다.
Args:
input (Other): 처리할 입력값.
- 입력값은 `input_type`으로 정의된 타입과 일치해야 합니다.
config (Optional[RunnableConfig]): 실행 설정.
- 설정이 제공되지 않으면 기본값으로 설정됩니다.
**kwargs: 추가적인 매개변수.
Returns:
Other: 처리된 결과값. 입력값과 동일한 타입으로 반환됩니다.
Raises:
Exception: 동기 함수가 정의되지 않은 경우 에러가 발생할 수 있습니다.
동작 원리:
1. `func`가 정의된 경우, 입력값과 추가 매개변수를 사용하여 실행합니다.
2. 설정값(`config`)은 기본값으로 초기화되거나, 외부에서 제공된 값을 사용합니다.
3. 입력값을 `identity` 함수와 함께 `_call_with_config`로 전달하여 처리합니다.
"""
if self.func is not None:
call_func_with_variable_args(
self.func, input, ensure_config(config), **kwargs
)
return self._call_with_config(identity, input, config)
async def ainvoke(
self,
input: Other,
config: Optional[RunnableConfig] = None,
**kwargs: Optional[Any],
) -> Other:
"""입력값을 비동기적으로 처리합니다.
Args:
input (Other): 처리할 입력값.
- 입력값은 `input_type`으로 정의된 타입과 일치해야 합니다.
config (Optional[RunnableConfig]): 실행 설정.
**kwargs: 추가적인 매개변수.
Returns:
Other: 처리된 결과값. 입력값과 동일한 타입으로 반환됩니다.
동작 원리:
1. `afunc`가 정의된 경우 비동기로 실행합니다.
2. `afunc`가 없지만 `func`가 정의된 경우, 동기 함수를 호출합니다.
3. 입력값과 설정값(`config`)을 `_acall_with_config` 함수에 전달하여 비동기적으로 처리합니다.
"""
if self.afunc is not None:
await acall_func_with_variable_args(
self.afunc, input, ensure_config(config), **kwargs
)
elif self.func is not None:
call_func_with_variable_args(
self.func, input, ensure_config(config), **kwargs
)
return await self._acall_with_config(aidentity, input, config)
def transform(
self,
input: Iterator[Other],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Iterator[Other]:
"""입력 스트림을 동기적으로 처리합니다.
Args:
input (Iterator[Other]): 입력값의 스트림.
- 스트림 형태로 입력 데이터를 처리합니다.
config (Optional[RunnableConfig]): 실행 설정.
**kwargs: 추가적인 매개변수.
Yields:
Iterator[Other]: 처리된 스트림 데이터.
동작 원리:
1. 입력 스트림 데이터를 반복적으로 처리합니다.
2. `func`가 정의된 경우, 스트림 데이터를 집계하여 최종 처리합니다.
3. 스트림의 각 청크를 순차적으로 처리 및 반환합니다.
"""
if self.func is None:
for chunk in self._transform_stream_with_config(input, identity, config):
yield chunk
else:
final: Other
got_first_chunk = False
for chunk in self._transform_stream_with_config(input, identity, config):
yield chunk
if not got_first_chunk:
final = chunk
got_first_chunk = True
else:
try:
final = final + chunk # type: ignore[operator]
except TypeError:
final = chunk
if got_first_chunk:
call_func_with_variable_args(
self.func, final, ensure_config(config), **kwargs
)
async def atransform(
self,
input: AsyncIterator[Other],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> AsyncIterator[Other]:
"""입력 스트림을 비동기적으로 처리합니다.
Args:
input (AsyncIterator[Other]): 비동기 스트림 형태의 입력값.
config (Optional[RunnableConfig]): 실행 설정.
**kwargs: 추가적인 매개변수.
Yields:
AsyncIterator[Other]: 처리된 스트림 데이터.
동작 원리:
1. 비동기 스트림 데이터를 반복적으로 처리합니다.
2. `afunc` 또는 `func`가 정의된 경우, 스트림 데이터를 집계하여 최종 처리합니다.
3. 각 청크 데이터를 비동기적으로 처리 및 반환합니다.
"""
if self.afunc is None and self.func is None:
async for chunk in self._atransform_stream_with_config(
input, identity, config
):
yield chunk
else:
got_first_chunk = False
async for chunk in self._atransform_stream_with_config(
input, identity, config
):
yield chunk
if not got_first_chunk:
final = chunk
got_first_chunk = True
else:
try:
final = final + chunk # type: ignore[operator]
except TypeError:
final = chunk
if got_first_chunk:
config = ensure_config(config)
if self.afunc is not None:
await acall_func_with_variable_args(
self.afunc, final, config, **kwargs
)
elif self.func is not None:
call_func_with_variable_args(self.func, final, config, **kwargs)
def stream(
self,
input: Other,
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Iterator[Other]:
"""입력값을 동기 스트림 형태로 처리합니다.
Args:
input (Other): 처리할 입력값.
- `Other`는 입력 데이터의 타입을 나타냅니다.
config (Optional[RunnableConfig]): 실행 설정.
- 실행에 필요한 추가 설정값을 포함합니다.
- 설정값이 제공되지 않은 경우 기본 설정이 사용됩니다.
**kwargs: 추가적인 매개변수.
- `func`나 기타 커스텀 함수에 전달될 추가 파라미터.
Returns:
Iterator[Other]: 입력값을 스트림 형태로 처리한 결과를 반환하는 이터레이터.
동작 원리:
1. 입력값 `input`을 리스트로 래핑한 후, 이터레이터로 변환합니다.
2. 내부적으로 `transform` 메서드를 호출하여 스트림 처리를 수행합니다.
3. 스트림 처리의 결과를 이터레이터로 반환합니다.
예시:
- 단일 입력값을 스트림 처리:
result = instance.stream("data")
for chunk in result:
print(chunk)
"""
return self.transform(iter([input]), config, **kwargs)
async def astream(
self,
input: Other,
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> AsyncIterator[Other]:
"""입력값을 비동기 스트림 형태로 처리합니다.
Args:
input (Other): 처리할 입력값.
- `Other`는 입력 데이터의 타입을 나타냅니다.
config (Optional[RunnableConfig]): 실행 설정.
- 실행에 필요한 추가 설정값을 포함합니다.
- 설정값이 제공되지 않은 경우 기본 설정이 사용됩니다.
**kwargs: 추가적인 매개변수.
- `afunc`나 기타 커스텀 비동기 함수에 전달될 추가 파라미터.
Returns:
AsyncIterator[Other]: 입력값을 비동기 스트림 형태로 처리한 결과를 반환하는 비동기 이터레이터.
동작 원리:
1. 비동기 생성기를 정의하여 입력값 `input`을 비동기적으로 전달합니다.
2. 내부적으로 `atransform` 메서드를 호출하여 비동기 스트림 처리를 수행합니다.
3. 각 처리된 청크(chunk)를 비동기적으로 반환합니다.
예시:
- 비동기 스트림 처리:
async for chunk in instance.astream("data"):
print(chunk)
"""
async def input_aiter() -> AsyncIterator[Other]:
"""입력값을 비동기 이터레이터로 래핑합니다."""
yield input
async for chunk in self.atransform(input_aiter(), config, **kwargs):
yield chunk
# _graph_passthrough는 데이터를 변형하지 않고 그대로 전달하는 기본 Runnable 객체입니다.
_graph_passthrough: RunnablePassthrough = RunnablePassthrough()
class RunnableAssign(RunnableSerializable[Dict[str, Any], Dict[str, Any]]):
"""입력 딕셔너리에 새로운 키-값 쌍을 추가하는 Runnable 클래스.
이 클래스는 입력 데이터를 받아, 내부의 `RunnableParallel` 객체를 통해 특정 작업을 병렬적으로 수행하고,
해당 결과를 원본 데이터에 병합하여 반환합니다.
Attributes:
mapper (RunnableParallel[Dict[str, Any]]): 입력 데이터를 변환하는 데 사용되는 RunnableParallel 객체.
- 이 객체는 각 키에 대한 작업을 병렬적으로 실행하며, 결과를 반환합니다.
Examples:
- 단일 키-값 추가 작업:
.. code-block:: python
from langchain_core.runnables.base import RunnableLambda
# 입력값에 10을 더하는 함수
def add_ten(x: Dict[str, int]) -> Dict[str, int]:
return {"added": x["input"] + 10}
# 매퍼 생성
mapper = RunnableParallel(
{"add_step": RunnableLambda(add_ten)}
)
# RunnableAssign 생성
runnable_assign = RunnableAssign(mapper)
# 동기 실행
result = runnable_assign.invoke({"input": 5})
print(result) # {'input': 5, 'add_step': {'added': 15}}
# 비동기 실행
result = await runnable_assign.ainvoke({"input": 5})
print(result) # {'input': 5, 'add_step': {'added': 15}}
"""
mapper: RunnableParallel[Dict[str, Any]]
def __init__(self, mapper: RunnableParallel[Dict[str, Any]], **kwargs: Any) -> None:
"""RunnableAssign 객체 초기화.
Args:
mapper (RunnableParallel[Dict[str, Any]]): 입력 데이터를 변환할 `RunnableParallel` 객체.
**kwargs: 추가적인 매개변수.
- 부모 클래스에서 필요한 추가 설정값을 받을 수 있습니다.
"""
super().__init__(mapper=mapper, **kwargs) # type: ignore[call-arg]
@classmethod
def is_lc_serializable(cls) -> bool:
"""LangChain 객체가 직렬화 가능한지 확인합니다.
Returns:
bool: 항상 `True`를 반환하여 이 클래스가 직렬화 가능함을 나타냅니다.
"""
return True
@classmethod
def get_lc_namespace(cls) -> List[str]:
"""LangChain 객체의 네임스페이스를 반환합니다.
Returns:
List[str]: 이 클래스의 네임스페이스 경로.
- 네임스페이스는 ["langchain", "schema", "runnable"]로 설정되어 있습니다.
"""
return ["langchain", "schema", "runnable"]
def get_name(
self, suffix: Optional[str] = None, *, name: Optional[str] = None
) -> str:
"""Runnable 객체의 이름을 생성합니다.
Args:
suffix (Optional[str]): 이름에 추가할 접미사.
name (Optional[str]): 직접 지정할 이름.
Returns:
str: 최종적으로 생성된 이름.
"""
name = (
name
or self.name
or f"RunnableAssign<{','.join(self.mapper.steps__.keys())}>"
)
return super().get_name(suffix, name=name)
def _invoke(
self,
input: Dict[str, Any],
run_manager: CallbackManagerForChainRun,
config: RunnableConfig,
**kwargs: Any,
) -> Dict[str, Any]:
"""입력 데이터를 변환하고 결과를 병합하여 반환합니다.
Args:
input (Dict[str, Any]): 처리할 입력값.
- 반드시 딕셔너리 형식이어야 합니다.
run_manager (CallbackManagerForChainRun): 실행 관리 객체.
- 체인의 실행 중 콜백을 관리하는 데 사용됩니다.
config (RunnableConfig): 실행 설정값.
**kwargs: 추가적인 매개변수.
Returns:
Dict[str, Any]: 원본 데이터와 매핑 작업 결과가 병합된 최종 결과.
Raises:
AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
동작 원리:
1. 입력값이 딕셔너리인지 검증합니다.
2. `mapper`를 호출하여 입력 데이터를 변환합니다.
3. 원본 입력 데이터와 변환 결과를 병합하여 반환합니다.
"""
assert isinstance(
input, dict
), "The input to RunnableAssign._invoke() must be a dict."
return {
**input, # 원본 입력값 유지
**self.mapper.invoke(
input,
patch_config(config, callbacks=run_manager.get_child()), # 실행 설정 패치
**kwargs,
),
}
def invoke(
self,
input: Dict[str, Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""입력값을 동기적으로 처리하고 결과를 반환합니다.
Args:
input (Dict[str, Any]): 처리할 입력 데이터.
- 반드시 딕셔너리 형식이어야 합니다.
config (Optional[RunnableConfig]): 실행 설정값.
- 설정이 제공되지 않은 경우 기본 설정이 사용됩니다.
**kwargs: 추가적인 매개변수.
Returns:
Dict[str, Any]: 입력값과 매핑 작업 결과가 병합된 결과값.
동작 원리:
1. `_call_with_config` 메서드를 호출하여 `_invoke` 메서드와 설정값을 전달합니다.
2. 내부적으로 `_invoke`를 통해 데이터를 처리하고 반환합니다.
Raises:
AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
"""
return self._call_with_config(self._invoke, input, config, **kwargs)
async def ainvoke(
self,
input: Dict[str, Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""입력값을 비동기적으로 처리하고 결과를 반환합니다.
Args:
input (Dict[str, Any]): 처리할 입력 데이터.
- 반드시 딕셔너리 형식이어야 합니다.
config (Optional[RunnableConfig]): 실행 설정값.
**kwargs: 추가적인 매개변수.
Returns:
Dict[str, Any]: 입력값과 매핑 작업 결과가 병합된 결과값.
동작 원리:
1. `_acall_with_config` 메서드를 호출하여 `_ainvoke` 메서드와 설정값을 전달합니다.
2. 내부적으로 `_ainvoke`를 통해 데이터를 비동기적으로 처리합니다.
Raises:
AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
"""
return await self._acall_with_config(self._ainvoke, input, config, **kwargs)
def _transform(
self,
input: Iterator[Dict[str, Any]],
run_manager: CallbackManagerForChainRun,
config: RunnableConfig,
**kwargs: Any,
) -> Iterator[Dict[str, Any]]:
"""입력 데이터를 스트림 형태로 처리합니다.
Args:
input (Iterator[Dict[str, Any]]): 처리할 입력 데이터의 이터레이터.
run_manager (CallbackManagerForChainRun): 실행 관리 객체.
- 체인의 실행 중 콜백을 관리합니다.
config (RunnableConfig): 실행 설정값.
**kwargs: 추가적인 매개변수.
Returns:
Iterator[Dict[str, Any]]: 처리된 스트림 데이터를 반환하는 이터레이터.
동작 원리:
1. `input` 데이터를 복제하여 두 개의 스트림으로 분리합니다.
- `for_passthrough`: 원본 데이터를 그대로 전달하는 스트림.
- `for_map`: 매핑 작업에 사용할 스트림.
2. `mapper.transform`을 호출하여 `for_map` 스트림을 처리합니다.
3. 원본 데이터(`for_passthrough`)에서 매퍼와 중복되는 키를 제거한 후 결과를 생성합니다.
4. 매퍼의 결과(`map_output`)를 백그라운드에서 처리하고 반환합니다.
Raises:
AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
"""
# 매퍼의 키 수집
mapper_keys = set(self.mapper.steps__.keys())
# 입력 스트림 복제
for_passthrough, for_map = safetee(input, 2, lock=threading.Lock())
# 매퍼를 통해 처리된 출력 스트림 생성
map_output = self.mapper.transform(
for_map,
patch_config(config, callbacks=run_manager.get_child()),
**kwargs,
)
# 백그라운드에서 매퍼의 첫 번째 청크를 처리
with get_executor_for_config(config) as executor:
first_map_chunk_future = executor.submit(
next, map_output, None
)
# 원본 데이터 스트림 처리
for chunk in for_passthrough:
assert isinstance(
chunk, dict
), "The input to RunnablePassthrough.assign() must be a dict."
# 매퍼 키 제거
filtered = AddableDict(
{k: v for k, v in chunk.items() if k not in mapper_keys}
)
if filtered:
yield filtered
# 매퍼 결과 반환
yield cast(Dict[str, Any], first_map_chunk_future.result())
for chunk in map_output:
yield chunk
async def _atransform(
self,
input: AsyncIterator[Dict[str, Any]],
run_manager: AsyncCallbackManagerForChainRun,
config: RunnableConfig,
**kwargs: Any,
) -> AsyncIterator[Dict[str, Any]]:
"""입력 데이터를 비동기 스트림 형태로 처리합니다.
Args:
input (AsyncIterator[Dict[str, Any]]): 처리할 비동기 입력 데이터.
run_manager (AsyncCallbackManagerForChainRun): 실행 관리 객체.
config (RunnableConfig): 실행 설정값.
**kwargs: 추가적인 매개변수.
Returns:
AsyncIterator[Dict[str, Any]]: 처리된 비동기 스트림 데이터를 반환.
동작 원리:
1. `input` 데이터를 복제하여 두 개의 스트림으로 분리합니다.
- `for_passthrough`: 원본 데이터를 그대로 전달하는 스트림.
- `for_map`: 매핑 작업에 사용할 스트림.
2. `mapper.atransform`을 호출하여 `for_map` 스트림을 비동기적으로 처리합니다.
3. 원본 데이터(`for_passthrough`)에서 매퍼와 중복되는 키를 제거한 후 결과를 생성합니다.
4. 매퍼의 결과(`map_output`)를 비동기적으로 반환합니다.
Raises:
AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
"""
# 매퍼의 키 수집
mapper_keys = set(self.mapper.steps__.keys())
# 입력 스트림 복제
for_passthrough, for_map = atee(input, 2, lock=asyncio.Lock())
# 매퍼를 통해 처리된 출력 스트림 생성
map_output = self.mapper.atransform(
for_map,
patch_config(config, callbacks=run_manager.get_child()),
**kwargs,
)
# 백그라운드에서 매퍼의 첫 번째 청크 처리
first_map_chunk_task: asyncio.Task = asyncio.create_task(
py_anext(map_output, None), # type: ignore[arg-type]
)
# 원본 데이터 스트림 처리
async for chunk in for_passthrough:
assert isinstance(
chunk, dict
), "The input to RunnablePassthrough.assign() must be a dict."
# 매퍼 키 제거
filtered = AddableDict(
{k: v for k, v in chunk.items() if k not in mapper_keys}
)
if filtered:
yield filtered
# 매퍼 결과 반환
yield await first_map_chunk_task
async for chunk in map_output:
yield chunk
class RunnablePick(RunnableSerializable[Dict[str, Any], Dict[str, Any]]):
"""입력 딕셔너리에서 지정된 키를 선택하여 반환하는 Runnable 클래스.
RunnablePick 클래스는 입력 딕셔너리에서 특정 키를 선택적으로 추출하고,
해당 키-값 쌍으로 구성된 새로운 딕셔너리를 반환합니다. 이를 통해
필요한 데이터만 필터링하여 처리할 수 있습니다.
Attributes:
keys (Union[str, List[str]]): 추출할 키 또는 키 목록.
- 단일 키를 문자열로 지정하거나, 다수의 키를 리스트로 지정할 수 있습니다.
Example:
입력값에서 'name'과 'age' 키만 선택하여 반환하는 예제:
.. code-block:: python
from langchain_core.runnables.passthrough import RunnablePick
input_data = {
'name': 'John',
'age': 30,
'city': 'New York',
'country': 'USA'
}
# 'name'과 'age'만 추출
runnable = RunnablePick(keys=['name', 'age'])
output_data = runnable.invoke(input_data)
print(output_data) # Output: {'name': 'John', 'age': 30}
"""
keys: Union[str, List[str]]
def __init__(self, keys: Union[str, List[str]], **kwargs: Any) -> None:
"""RunnablePick 객체 초기화.
Args:
keys (Union[str, List[str]]): 추출할 키 또는 키 목록.
- 문자열로 단일 키를 지정하거나, 리스트로 다수의 키를 지정합니다.
**kwargs: 추가적인 매개변수. 부모 클래스에서 필요한 설정값을 받을 수 있습니다.
"""
super().__init__(keys=keys, **kwargs) # type: ignore[call-arg]
@classmethod
def is_lc_serializable(cls) -> bool:
"""LangChain 객체가 직렬화 가능한지 확인합니다.
Returns:
bool: 항상 `True`를 반환하여 이 클래스가 직렬화 가능함을 나타냅니다.
"""
return True
@classmethod
def get_lc_namespace(cls) -> List[str]:
"""LangChain 객체의 네임스페이스를 반환합니다.
Returns:
List[str]: 이 클래스의 네임스페이스 경로.
- 네임스페이스는 ["langchain", "schema", "runnable"]로 설정되어 있습니다.
"""
return ["langchain", "schema", "runnable"]
def get_name(
self, suffix: Optional[str] = None, *, name: Optional[str] = None
) -> str:
"""Runnable 객체의 이름을 생성합니다.
Args:
suffix (Optional[str]): 이름에 추가할 접미사.
name (Optional[str]): 직접 지정할 이름.
Returns:
str: 최종적으로 생성된 이름.
"""
name = (
name
or self.name
or f"RunnablePick<{','.join([self.keys] if isinstance(self.keys, str) else self.keys)}>" # noqa: E501
)
return super().get_name(suffix, name=name)
def _pick(self, input: Dict[str, Any]) -> Any:
"""입력 딕셔너리에서 지정된 키를 선택합니다.
Args:
input (Dict[str, Any]): 처리할 입력 딕셔너리.
Returns:
Any: 단일 키를 선택한 경우 해당 값, 또는 여러 키를 선택한 경우 새로운 딕셔너리.
Raises:
AssertionError: 입력값이 딕셔너리가 아닌 경우 에러 발생.
동작 원리:
1. 입력값이 딕셔너리인지 검증합니다.
2. 단일 키(`str`)인 경우 해당 키의 값을 반환합니다.
3. 다수의 키(`List[str]`)인 경우, 해당 키-값 쌍만 포함하는 딕셔너리를 생성하여 반환합니다.
"""
assert isinstance(
input, dict
), "The input to RunnablePassthrough.assign() must be a dict."
if isinstance(self.keys, str):
return input.get(self.keys)
else:
picked = {k: input.get(k) for k in self.keys if k in input}
if picked:
return AddableDict(picked)
else:
return None
def invoke(
self,
input: Dict[str, Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""입력값을 동기적으로 처리하고 지정된 키만 추출하여 반환합니다.
Args:
input (Dict[str, Any]): 처리할 입력 딕셔너리.
config (Optional[RunnableConfig]): 실행 설정값.
- 설정값이 제공되지 않은 경우 기본 설정이 사용됩니다.
**kwargs: 추가적인 매개변수.
Returns:
Dict[str, Any]: 선택된 키와 해당 값을 포함하는 딕셔너리.
동작 원리:
1. `_call_with_config` 메서드를 호출하여 `_invoke`와 설정값을 전달합니다.
2. `_pick` 메서드를 사용해 입력값에서 지정된 키만 추출합니다.
"""
return self._call_with_config(self._invoke, input, config, **kwargs)
async def ainvoke(
self,
input: Dict[str, Any],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""입력값을 비동기적으로 처리하고 지정된 키만 추출하여 반환합니다.
Args:
input (Dict[str, Any]): 처리할 입력 딕셔너리.
config (Optional[RunnableConfig]): 실행 설정값.
**kwargs: 추가적인 매개변수.
Returns:
Dict[str, Any]: 선택된 키와 해당 값을 포함하는 딕셔너리.
동작 원리:
1. `_acall_with_config` 메서드를 호출하여 `_ainvoke`와 설정값을 전달합니다.
2. `_pick` 메서드를 사용해 입력값에서 지정된 키만 추출합니다.
"""
return await self._acall_with_config(self._ainvoke, input, config, **kwargs)
def transform(
self,
input: Iterator[Dict[str, Any]],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> Iterator[Dict[str, Any]]:
"""입력값을 스트림 형태로 처리하여 선택된 키만 반환합니다.
Args:
input (Iterator[Dict[str, Any]]): 처리할 입력 데이터의 이터레이터.
config (Optional[RunnableConfig]): 실행 설정값.
**kwargs: 추가적인 매개변수.
Returns:
Iterator[Dict[str, Any]]: 선택된 키와 값을 포함하는 스트림.
"""
yield from self._transform_stream_with_config(
input, self._transform, config, **kwargs
)
async def atransform(
self,
input: AsyncIterator[Dict[str, Any]],
config: Optional[RunnableConfig] = None,
**kwargs: Any,
) -> AsyncIterator[Dict[str, Any]]:
"""입력값을 비동기 스트림 형태로 처리하여 선택된 키만 반환합니다.
Args:
input (AsyncIterator[Dict[str, Any]]): 처리할 비동기 입력 데이터.
config (Optional[RunnableConfig]): 실행 설정값.
**kwargs: 추가적인 매개변수.
Returns:
AsyncIterator[Dict[str, Any]]: 선택된 키와 값을 포함하는 비동기 스트림.
"""
async for chunk in self._atransform_stream_with_config(
input, self._atransform, config, **kwargs
):
yield chunk