langchain 공부

RunnablePassthrough 코드분석

필만이 2024. 11. 25. 00:07

배경

  • 내부 원리를 알기위해 RunnablePassthrough 코드 분석하고자함

코드

"""Implementation of the RunnablePassthrough."""

from __future__ import annotations

import asyncio
import inspect
import threading
from typing import (
    TYPE_CHECKING,
    Any,
    AsyncIterator,
    Awaitable,
    Callable,
    Dict,
    Iterator,
    List,
    Mapping,
    Optional,
    Type,
    Union,
    cast,
)

from langchain_core.pydantic_v1 import BaseModel
from langchain_core.runnables.base import (
    Other,
    Runnable,
    RunnableParallel,
    RunnableSerializable,
)

from langchain_core.runnables.config import (
    RunnableConfig,
    acall_func_with_variable_args,
    call_func_with_variable_args,
    ensure_config,
    get_executor_for_config,
    patch_config,
)
from langchain_core.runnables.graph import Graph
from langchain_core.runnables.utils import (
    AddableDict,
    ConfigurableFieldSpec,
    create_model,
)
from langchain_core.utils.aiter import atee, py_anext
from langchain_core.utils.iter import safetee

if TYPE_CHECKING:
    from langchain_core.callbacks.manager import (
        AsyncCallbackManagerForChainRun,
        CallbackManagerForChainRun,
    )


def identity(x: Other) -> Other:
    """Identity function.

    Args:
        x (Other): input.

    Returns:
        Other: output.
    """
    return x


async def aidentity(x: Other) -> Other:
    """Async identity function.

    Args:
        x (Other): input.

    Returns:
        Other: output.
    """
    return x


class RunnablePassthrough(RunnableSerializable[Other, Other]):
    """입력을 그대로 전달하거나 추가 키와 함께 반환하는 Runnable 클래스.

    이 클래스는 거의 동일한 함수처럼 동작하지만, 입력값이 사전(dict)일 경우
    출력에 추가 키를 포함하도록 구성할 수 있습니다.

    아래 예제에서는 간단한 람다 함수를 사용한 간단한 체인 예제를 통해 이 Runnable이 어떻게 작동하는지 설명합니다.
    예제들은 실행 및 실험이 쉽게 구성되어 있습니다.

    Parameters:
        func (Callable[[Other], None], optional): 입력값을 처리할 함수.
        afunc (Callable[[Other], Awaitable[None]], optional): 비동기로 입력값을 처리할 함수.
        input_type (Optional[Type[Other]], optional): 입력값의 타입을 명시하는 선택적 속성.
        **kwargs (Any): 추가적인 키워드 인수들.

    Examples:

        .. code-block:: python

            from langchain_core.runnables import (
                RunnableLambda,
                RunnableParallel,
                RunnablePassthrough,
            )

            # 입력을 그대로 전달하는 origin과 입력값에 +1을 한 modified를 병렬로 실행
            runnable = RunnableParallel(
                origin=RunnablePassthrough(),
                modified=lambda x: x+1
            )

            runnable.invoke(1) # {'origin': 1, 'modified': 2}


            def fake_llm(prompt: str) -> str: # 예제용 가짜 LLM 함수
                return "completion"

            # 원본 텍스트와 역순으로 파싱된 텍스트를 동시에 반환하는 체인
            chain = RunnableLambda(fake_llm) | {
                'original': RunnablePassthrough(), # 원본 LLM 출력
                'parsed': lambda text: text[::-1] # 파싱 로직
            }

            chain.invoke('hello') # {'original': 'completion', 'parsed': 'noitelpmoc'}

    일부 경우에는 입력을 그대로 전달하면서 추가적인 키를 출력에 포함시키는 것이 유용할 수 있습니다.
    이 경우 `assign` 메서드를 사용할 수 있습니다:

        .. code-block:: python

            from langchain_core.runnables import RunnablePassthrough

            def fake_llm(prompt: str) -> str: # 예제용 가짜 LLM 함수
                return "completion"

            # 두 개의 LLM 출력을 포함하고 총 문자 수를 계산하여 반환하는 체인
            runnable = {
                'llm1':  fake_llm,
                'llm2':  fake_llm,
            } | RunnablePassthrough.assign(
                total_chars=lambda inputs: len(inputs['llm1'] + inputs['llm2'])
            )

            runnable.invoke('hello')
            # {'llm1': 'completion', 'llm2': 'completion', 'total_chars': 20}
    """

    """입력을 그대로 전달하거나 추가 키와 함께 반환하는 Runnable 클래스."""
    input_type: Optional[Type[Other]] = None
    """입력값의 타입을 명시하는 선택적 속성.
    - 입력 데이터의 타입을 지정하여 타입 안전성을 높이는 데 사용됩니다.
    - 기본값은 `None`이며, 입력값의 타입이 명시되지 않을 수도 있습니다.
    """

    func: Optional[
        Union[Callable[[Other], None], Callable[[Other, RunnableConfig], None]]
    ] = None
    """동기 함수로, 입력값을 처리하는 데 사용됩니다.
    - `Callable[[Other], None]`: 하나의 입력값을 처리하는 동기 함수.
    - `Callable[[Other, RunnableConfig], None]`: 입력값과 추가 설정(`RunnableConfig`)을 처리하는 동기 함수.
    """

    afunc: Optional[
        Union[
            Callable[[Other], Awaitable[None]], 
            Callable[[Other, RunnableConfig], Awaitable[None]],
        ]
    ] = None
    """비동기 함수로, 입력값을 비동기로 처리하는 데 사용됩니다.
    - `Callable[[Other], Awaitable[None]]`: 하나의 입력값을 처리하는 비동기 함수.
    - `Callable[[Other, RunnableConfig], Awaitable[None]]`: 입력값과 추가 설정(`RunnableConfig`)을 처리하는 비동기 함수.
    """

    def __repr_args__(self) -> Any:
        """객체의 문자열 표현을 생성하는 메서드.
        - 기본적으로 `repr(self)` 호출 시 무한 재귀 문제가 발생할 수 있어 이를 방지합니다.
        - 관련 이슈: https://github.com/pydantic/pydantic/issues/7327
        - 현재는 빈 리스트를 반환하도록 설정되어 있습니다.
        """
        return []

    # __init__ 메서드는 동기 함수와 비동기 함수 중 어떤 것을 사용할지를 결정합니다.
    def __init__(
        self,
        func: Optional[
            Union[
                Union[Callable[[Other], None], Callable[[Other, RunnableConfig], None]],
                Union[
                    Callable[[Other], Awaitable[None]],
                    Callable[[Other, RunnableConfig], Awaitable[None]],
                ],
            ]
        ] = None,
        afunc: Optional[
            Union[
                Callable[[Other], Awaitable[None]],
                Callable[[Other, RunnableConfig], Awaitable[None]],
            ]
        ] = None,
        *,
        input_type: Optional[Type[Other]] = None,
        **kwargs: Any,
    ) -> None:
        """
        초기화 메서드.

        Args:
            func (Optional[Union[Callable]]): 입력값을 처리할 동기 함수.
            afunc (Optional[Union[Callable]]): 입력값을 처리할 비동기 함수.
            input_type (Optional[Type[Other]]): 입력값의 타입.
            kwargs (Any): 추가적인 키워드 인자.

        동작 방식:
            - `func`가 비동기 함수인지 확인하기 위해 `inspect.iscoroutinefunction`을 사용합니다.
            - `func`가 비동기 함수인 경우, `afunc`에 할당하고 `func`는 `None`으로 설정합니다.
            - `super().__init__`를 호출하여 부모 클래스의 초기화를 수행합니다.
            - `type: ignore[call-arg]`는 타입 검사 도구에서 발생할 수 있는 경고를 무시합니다.
        """
        # `func`가 비동기 함수인지 확인
        if inspect.iscoroutinefunction(func):
            afunc = func  # `afunc`에 비동기 함수를 할당
            func = None   # `func`를 None으로 설정

        # 부모 클래스의 초기화 메서드 호출
        super().__init__(func=func, afunc=afunc, input_type=input_type, **kwargs)  # type: ignore[call-arg]

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """LangChain 객체가 직렬화 가능한지 확인합니다.

        Returns:
            bool: 항상 `True`를 반환하며, 이 클래스가 직렬화 가능한 객체임을 나타냅니다.
                - LangChain에서 객체를 저장하거나 전송할 때 직렬화를 지원하도록 설계되었습니다.
        """
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """LangChain 객체의 네임스페이스를 반환합니다.

        Returns:
            List[str]: LangChain에서 이 클래스가 속한 네임스페이스를 나타내는 문자열 목록.
                - 네임스페이스는 ["langchain", "schema", "runnable"]로 설정되어 있습니다.
                - 네임스페이스는 객체를 분류하거나 참조 경로를 설정하는 데 사용됩니다.
        """
        return ["langchain", "schema", "runnable"]

    @property
    def InputType(self) -> Any:
        """입력값의 타입을 반환합니다.

        Returns:
            Any: `input_type`이 설정되어 있으면 해당 타입을 반환하고, 설정되지 않은 경우 기본값 `Any`를 반환합니다.
                - 타입 힌트를 통해 입력값의 유형을 명시적으로 관리할 수 있습니다.
                - 유연한 입력값 처리를 위해 기본값은 `Any`로 설정되어 있습니다.
        """
        return self.input_type or Any

    @property
    def OutputType(self) -> Any:
        """출력값의 타입을 반환합니다.

        Returns:
            Any: `input_type`이 설정되어 있으면 동일한 타입을 반환하며, 기본값은 `Any`입니다.
                - 입력값과 출력값의 타입이 동일하다는 가정 하에 동작하도록 설계되었습니다.
                - 출력값의 명시적 타입 지정을 통해 데이터 처리의 안정성을 높일 수 있습니다.
        """
        return self.input_type or Any

    @classmethod
    def assign(
        cls,
        **kwargs: Union[
            Runnable[Dict[str, Any], Any],
            Callable[[Dict[str, Any]], Any],
            Mapping[
                str,
                Union[Runnable[Dict[str, Any], Any], Callable[[Dict[str, Any]], Any]],
            ],
        ],
    ) -> "RunnableAssign":
        """입력값을 매핑 결과와 병합하는 새로운 Runnable을 생성합니다.

        Args:
            **kwargs: 키-값 형태의 매핑.
                - 값은 Runnable 객체, Callable 함수, 또는 또 다른 매핑일 수 있습니다.
                - 매핑은 각 키에 대해 실행 가능한 작업(Runnable 또는 Callable)을 정의합니다.

        Returns:
            RunnableAssign: 입력값의 딕셔너리와 매핑 결과를 병합하여 반환하는 Runnable 객체.
                - 입력값은 키-값 쌍의 딕셔너리 형태로 전달됩니다.
                - 각 키에 대해 정의된 매핑 작업이 실행됩니다.
                - 최종적으로 입력값과 매핑 작업의 출력값을 병합합니다.
        """
        return RunnableAssign(RunnableParallel(kwargs))

    def invoke(
        self, input: Other, config: Optional[RunnableConfig] = None, **kwargs: Any
    ) -> Other:
        """입력값을 동기적으로 처리합니다.

        Args:
            input (Other): 처리할 입력값.
                - 입력값은 `input_type`으로 정의된 타입과 일치해야 합니다.
            config (Optional[RunnableConfig]): 실행 설정.
                - 설정이 제공되지 않으면 기본값으로 설정됩니다.
            **kwargs: 추가적인 매개변수.

        Returns:
            Other: 처리된 결과값. 입력값과 동일한 타입으로 반환됩니다.

        Raises:
            Exception: 동기 함수가 정의되지 않은 경우 에러가 발생할 수 있습니다.

        동작 원리:
            1. `func`가 정의된 경우, 입력값과 추가 매개변수를 사용하여 실행합니다.
            2. 설정값(`config`)은 기본값으로 초기화되거나, 외부에서 제공된 값을 사용합니다.
            3. 입력값을 `identity` 함수와 함께 `_call_with_config`로 전달하여 처리합니다.
        """
        if self.func is not None:
            call_func_with_variable_args(
                self.func, input, ensure_config(config), **kwargs
            )
        return self._call_with_config(identity, input, config)

    async def ainvoke(
        self,
        input: Other,
        config: Optional[RunnableConfig] = None,
        **kwargs: Optional[Any],
    ) -> Other:
        """입력값을 비동기적으로 처리합니다.

        Args:
            input (Other): 처리할 입력값.
                - 입력값은 `input_type`으로 정의된 타입과 일치해야 합니다.
            config (Optional[RunnableConfig]): 실행 설정.
            **kwargs: 추가적인 매개변수.

        Returns:
            Other: 처리된 결과값. 입력값과 동일한 타입으로 반환됩니다.

        동작 원리:
            1. `afunc`가 정의된 경우 비동기로 실행합니다.
            2. `afunc`가 없지만 `func`가 정의된 경우, 동기 함수를 호출합니다.
            3. 입력값과 설정값(`config`)을 `_acall_with_config` 함수에 전달하여 비동기적으로 처리합니다.
        """
        if self.afunc is not None:
            await acall_func_with_variable_args(
                self.afunc, input, ensure_config(config), **kwargs
            )
        elif self.func is not None:
            call_func_with_variable_args(
                self.func, input, ensure_config(config), **kwargs
            )
        return await self._acall_with_config(aidentity, input, config)

    def transform(
        self,
        input: Iterator[Other],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Other]:
        """입력 스트림을 동기적으로 처리합니다.

        Args:
            input (Iterator[Other]): 입력값의 스트림.
                - 스트림 형태로 입력 데이터를 처리합니다.
            config (Optional[RunnableConfig]): 실행 설정.
            **kwargs: 추가적인 매개변수.

        Yields:
            Iterator[Other]: 처리된 스트림 데이터.

        동작 원리:
            1. 입력 스트림 데이터를 반복적으로 처리합니다.
            2. `func`가 정의된 경우, 스트림 데이터를 집계하여 최종 처리합니다.
            3. 스트림의 각 청크를 순차적으로 처리 및 반환합니다.
        """
        if self.func is None:
            for chunk in self._transform_stream_with_config(input, identity, config):
                yield chunk
        else:
            final: Other
            got_first_chunk = False

            for chunk in self._transform_stream_with_config(input, identity, config):
                yield chunk

                if not got_first_chunk:
                    final = chunk
                    got_first_chunk = True
                else:
                    try:
                        final = final + chunk  # type: ignore[operator]
                    except TypeError:
                        final = chunk

            if got_first_chunk:
                call_func_with_variable_args(
                    self.func, final, ensure_config(config), **kwargs
                )

    async def atransform(
        self,
        input: AsyncIterator[Other],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Other]:
        """입력 스트림을 비동기적으로 처리합니다.

        Args:
            input (AsyncIterator[Other]): 비동기 스트림 형태의 입력값.
            config (Optional[RunnableConfig]): 실행 설정.
            **kwargs: 추가적인 매개변수.

        Yields:
            AsyncIterator[Other]: 처리된 스트림 데이터.

        동작 원리:
            1. 비동기 스트림 데이터를 반복적으로 처리합니다.
            2. `afunc` 또는 `func`가 정의된 경우, 스트림 데이터를 집계하여 최종 처리합니다.
            3. 각 청크 데이터를 비동기적으로 처리 및 반환합니다.
        """
        if self.afunc is None and self.func is None:
            async for chunk in self._atransform_stream_with_config(
                input, identity, config
            ):
                yield chunk
        else:
            got_first_chunk = False

            async for chunk in self._atransform_stream_with_config(
                input, identity, config
            ):
                yield chunk

                if not got_first_chunk:
                    final = chunk
                    got_first_chunk = True
                else:
                    try:
                        final = final + chunk  # type: ignore[operator]
                    except TypeError:
                        final = chunk

            if got_first_chunk:
                config = ensure_config(config)
                if self.afunc is not None:
                    await acall_func_with_variable_args(
                        self.afunc, final, config, **kwargs
                    )
                elif self.func is not None:
                    call_func_with_variable_args(self.func, final, config, **kwargs)
def stream(
    self,
    input: Other,
    config: Optional[RunnableConfig] = None,
    **kwargs: Any,
) -> Iterator[Other]:
    """입력값을 동기 스트림 형태로 처리합니다.

    Args:
        input (Other): 처리할 입력값.
            - `Other`는 입력 데이터의 타입을 나타냅니다.
        config (Optional[RunnableConfig]): 실행 설정.
            - 실행에 필요한 추가 설정값을 포함합니다.
            - 설정값이 제공되지 않은 경우 기본 설정이 사용됩니다.
        **kwargs: 추가적인 매개변수.
            - `func`나 기타 커스텀 함수에 전달될 추가 파라미터.

    Returns:
        Iterator[Other]: 입력값을 스트림 형태로 처리한 결과를 반환하는 이터레이터.

    동작 원리:
        1. 입력값 `input`을 리스트로 래핑한 후, 이터레이터로 변환합니다.
        2. 내부적으로 `transform` 메서드를 호출하여 스트림 처리를 수행합니다.
        3. 스트림 처리의 결과를 이터레이터로 반환합니다.

    예시:
        - 단일 입력값을 스트림 처리:
            result = instance.stream("data")
            for chunk in result:
                print(chunk)
    """
    return self.transform(iter([input]), config, **kwargs)

async def astream(
    self,
    input: Other,
    config: Optional[RunnableConfig] = None,
    **kwargs: Any,
) -> AsyncIterator[Other]:
    """입력값을 비동기 스트림 형태로 처리합니다.

    Args:
        input (Other): 처리할 입력값.
            - `Other`는 입력 데이터의 타입을 나타냅니다.
        config (Optional[RunnableConfig]): 실행 설정.
            - 실행에 필요한 추가 설정값을 포함합니다.
            - 설정값이 제공되지 않은 경우 기본 설정이 사용됩니다.
        **kwargs: 추가적인 매개변수.
            - `afunc`나 기타 커스텀 비동기 함수에 전달될 추가 파라미터.

    Returns:
        AsyncIterator[Other]: 입력값을 비동기 스트림 형태로 처리한 결과를 반환하는 비동기 이터레이터.

    동작 원리:
        1. 비동기 생성기를 정의하여 입력값 `input`을 비동기적으로 전달합니다.
        2. 내부적으로 `atransform` 메서드를 호출하여 비동기 스트림 처리를 수행합니다.
        3. 각 처리된 청크(chunk)를 비동기적으로 반환합니다.

    예시:
        - 비동기 스트림 처리:
            async for chunk in instance.astream("data"):
                print(chunk)
    """
    async def input_aiter() -> AsyncIterator[Other]:
        """입력값을 비동기 이터레이터로 래핑합니다."""
        yield input

    async for chunk in self.atransform(input_aiter(), config, **kwargs):
        yield chunk


# _graph_passthrough는 데이터를 변형하지 않고 그대로 전달하는 기본 Runnable 객체입니다.
_graph_passthrough: RunnablePassthrough = RunnablePassthrough()


class RunnableAssign(RunnableSerializable[Dict[str, Any], Dict[str, Any]]):
    """입력 딕셔너리에 새로운 키-값 쌍을 추가하는 Runnable 클래스.

    이 클래스는 입력 데이터를 받아, 내부의 `RunnableParallel` 객체를 통해 특정 작업을 병렬적으로 수행하고,
    해당 결과를 원본 데이터에 병합하여 반환합니다.

    Attributes:
        mapper (RunnableParallel[Dict[str, Any]]): 입력 데이터를 변환하는 데 사용되는 RunnableParallel 객체.
            - 이 객체는 각 키에 대한 작업을 병렬적으로 실행하며, 결과를 반환합니다.

    Examples:
        - 단일 키-값 추가 작업:
            .. code-block:: python

                from langchain_core.runnables.base import RunnableLambda

                # 입력값에 10을 더하는 함수
                def add_ten(x: Dict[str, int]) -> Dict[str, int]:
                    return {"added": x["input"] + 10}

                # 매퍼 생성
                mapper = RunnableParallel(
                    {"add_step": RunnableLambda(add_ten)}
                )

                # RunnableAssign 생성
                runnable_assign = RunnableAssign(mapper)

                # 동기 실행
                result = runnable_assign.invoke({"input": 5})
                print(result)  # {'input': 5, 'add_step': {'added': 15}}

                # 비동기 실행
                result = await runnable_assign.ainvoke({"input": 5})
                print(result)  # {'input': 5, 'add_step': {'added': 15}}
    """

    mapper: RunnableParallel[Dict[str, Any]]

    def __init__(self, mapper: RunnableParallel[Dict[str, Any]], **kwargs: Any) -> None:
        """RunnableAssign 객체 초기화.

        Args:
            mapper (RunnableParallel[Dict[str, Any]]): 입력 데이터를 변환할 `RunnableParallel` 객체.
            **kwargs: 추가적인 매개변수.
                - 부모 클래스에서 필요한 추가 설정값을 받을 수 있습니다.
        """
        super().__init__(mapper=mapper, **kwargs)  # type: ignore[call-arg]

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """LangChain 객체가 직렬화 가능한지 확인합니다.

        Returns:
            bool: 항상 `True`를 반환하여 이 클래스가 직렬화 가능함을 나타냅니다.
        """
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """LangChain 객체의 네임스페이스를 반환합니다.

        Returns:
            List[str]: 이 클래스의 네임스페이스 경로.
                - 네임스페이스는 ["langchain", "schema", "runnable"]로 설정되어 있습니다.
        """
        return ["langchain", "schema", "runnable"]

    def get_name(
        self, suffix: Optional[str] = None, *, name: Optional[str] = None
    ) -> str:
        """Runnable 객체의 이름을 생성합니다.

        Args:
            suffix (Optional[str]): 이름에 추가할 접미사.
            name (Optional[str]): 직접 지정할 이름.

        Returns:
            str: 최종적으로 생성된 이름.
        """
        name = (
            name
            or self.name
            or f"RunnableAssign<{','.join(self.mapper.steps__.keys())}>"
        )
        return super().get_name(suffix, name=name)

    def _invoke(
        self,
        input: Dict[str, Any],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """입력 데이터를 변환하고 결과를 병합하여 반환합니다.

        Args:
            input (Dict[str, Any]): 처리할 입력값.
                - 반드시 딕셔너리 형식이어야 합니다.
            run_manager (CallbackManagerForChainRun): 실행 관리 객체.
                - 체인의 실행 중 콜백을 관리하는 데 사용됩니다.
            config (RunnableConfig): 실행 설정값.
            **kwargs: 추가적인 매개변수.

        Returns:
            Dict[str, Any]: 원본 데이터와 매핑 작업 결과가 병합된 최종 결과.

        Raises:
            AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.

        동작 원리:
            1. 입력값이 딕셔너리인지 검증합니다.
            2. `mapper`를 호출하여 입력 데이터를 변환합니다.
            3. 원본 입력 데이터와 변환 결과를 병합하여 반환합니다.
        """
        assert isinstance(
            input, dict
        ), "The input to RunnableAssign._invoke() must be a dict."

        return {
            **input,  # 원본 입력값 유지
            **self.mapper.invoke(
                input,
                patch_config(config, callbacks=run_manager.get_child()),  # 실행 설정 패치
                **kwargs,
            ),
        }
    def invoke(
        self,
        input: Dict[str, Any],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """입력값을 동기적으로 처리하고 결과를 반환합니다.

        Args:
            input (Dict[str, Any]): 처리할 입력 데이터.
                - 반드시 딕셔너리 형식이어야 합니다.
            config (Optional[RunnableConfig]): 실행 설정값.
                - 설정이 제공되지 않은 경우 기본 설정이 사용됩니다.
            **kwargs: 추가적인 매개변수.

        Returns:
            Dict[str, Any]: 입력값과 매핑 작업 결과가 병합된 결과값.

        동작 원리:
            1. `_call_with_config` 메서드를 호출하여 `_invoke` 메서드와 설정값을 전달합니다.
            2. 내부적으로 `_invoke`를 통해 데이터를 처리하고 반환합니다.

        Raises:
            AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
        """
        return self._call_with_config(self._invoke, input, config, **kwargs)

    async def ainvoke(
        self,
        input: Dict[str, Any],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """입력값을 비동기적으로 처리하고 결과를 반환합니다.

        Args:
            input (Dict[str, Any]): 처리할 입력 데이터.
                - 반드시 딕셔너리 형식이어야 합니다.
            config (Optional[RunnableConfig]): 실행 설정값.
            **kwargs: 추가적인 매개변수.

        Returns:
            Dict[str, Any]: 입력값과 매핑 작업 결과가 병합된 결과값.

        동작 원리:
            1. `_acall_with_config` 메서드를 호출하여 `_ainvoke` 메서드와 설정값을 전달합니다.
            2. 내부적으로 `_ainvoke`를 통해 데이터를 비동기적으로 처리합니다.

        Raises:
            AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
        """
        return await self._acall_with_config(self._ainvoke, input, config, **kwargs)

    def _transform(
        self,
        input: Iterator[Dict[str, Any]],
        run_manager: CallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> Iterator[Dict[str, Any]]:
        """입력 데이터를 스트림 형태로 처리합니다.

        Args:
            input (Iterator[Dict[str, Any]]): 처리할 입력 데이터의 이터레이터.
            run_manager (CallbackManagerForChainRun): 실행 관리 객체.
                - 체인의 실행 중 콜백을 관리합니다.
            config (RunnableConfig): 실행 설정값.
            **kwargs: 추가적인 매개변수.

        Returns:
            Iterator[Dict[str, Any]]: 처리된 스트림 데이터를 반환하는 이터레이터.

        동작 원리:
            1. `input` 데이터를 복제하여 두 개의 스트림으로 분리합니다.
                - `for_passthrough`: 원본 데이터를 그대로 전달하는 스트림.
                - `for_map`: 매핑 작업에 사용할 스트림.
            2. `mapper.transform`을 호출하여 `for_map` 스트림을 처리합니다.
            3. 원본 데이터(`for_passthrough`)에서 매퍼와 중복되는 키를 제거한 후 결과를 생성합니다.
            4. 매퍼의 결과(`map_output`)를 백그라운드에서 처리하고 반환합니다.

        Raises:
            AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
        """
        # 매퍼의 키 수집
        mapper_keys = set(self.mapper.steps__.keys())

        # 입력 스트림 복제
        for_passthrough, for_map = safetee(input, 2, lock=threading.Lock())

        # 매퍼를 통해 처리된 출력 스트림 생성
        map_output = self.mapper.transform(
            for_map,
            patch_config(config, callbacks=run_manager.get_child()),
            **kwargs,
        )

        # 백그라운드에서 매퍼의 첫 번째 청크를 처리
        with get_executor_for_config(config) as executor:
            first_map_chunk_future = executor.submit(
                next, map_output, None
            )

            # 원본 데이터 스트림 처리
            for chunk in for_passthrough:
                assert isinstance(
                    chunk, dict
                ), "The input to RunnablePassthrough.assign() must be a dict."
                # 매퍼 키 제거
                filtered = AddableDict(
                    {k: v for k, v in chunk.items() if k not in mapper_keys}
                )
                if filtered:
                    yield filtered

            # 매퍼 결과 반환
            yield cast(Dict[str, Any], first_map_chunk_future.result())
            for chunk in map_output:
                yield chunk

    async def _atransform(
        self,
        input: AsyncIterator[Dict[str, Any]],
        run_manager: AsyncCallbackManagerForChainRun,
        config: RunnableConfig,
        **kwargs: Any,
    ) -> AsyncIterator[Dict[str, Any]]:
        """입력 데이터를 비동기 스트림 형태로 처리합니다.

        Args:
            input (AsyncIterator[Dict[str, Any]]): 처리할 비동기 입력 데이터.
            run_manager (AsyncCallbackManagerForChainRun): 실행 관리 객체.
            config (RunnableConfig): 실행 설정값.
            **kwargs: 추가적인 매개변수.

        Returns:
            AsyncIterator[Dict[str, Any]]: 처리된 비동기 스트림 데이터를 반환.

        동작 원리:
            1. `input` 데이터를 복제하여 두 개의 스트림으로 분리합니다.
                - `for_passthrough`: 원본 데이터를 그대로 전달하는 스트림.
                - `for_map`: 매핑 작업에 사용할 스트림.
            2. `mapper.atransform`을 호출하여 `for_map` 스트림을 비동기적으로 처리합니다.
            3. 원본 데이터(`for_passthrough`)에서 매퍼와 중복되는 키를 제거한 후 결과를 생성합니다.
            4. 매퍼의 결과(`map_output`)를 비동기적으로 반환합니다.

        Raises:
            AssertionError: 입력값이 딕셔너리가 아닌 경우 에러를 발생시킵니다.
        """
        # 매퍼의 키 수집
        mapper_keys = set(self.mapper.steps__.keys())

        # 입력 스트림 복제
        for_passthrough, for_map = atee(input, 2, lock=asyncio.Lock())

        # 매퍼를 통해 처리된 출력 스트림 생성
        map_output = self.mapper.atransform(
            for_map,
            patch_config(config, callbacks=run_manager.get_child()),
            **kwargs,
        )

        # 백그라운드에서 매퍼의 첫 번째 청크 처리
        first_map_chunk_task: asyncio.Task = asyncio.create_task(
            py_anext(map_output, None),  # type: ignore[arg-type]
        )

        # 원본 데이터 스트림 처리
        async for chunk in for_passthrough:
            assert isinstance(
                chunk, dict
            ), "The input to RunnablePassthrough.assign() must be a dict."
            # 매퍼 키 제거
            filtered = AddableDict(
                {k: v for k, v in chunk.items() if k not in mapper_keys}
            )
            if filtered:
                yield filtered

        # 매퍼 결과 반환
        yield await first_map_chunk_task
        async for chunk in map_output:
            yield chunk

class RunnablePick(RunnableSerializable[Dict[str, Any], Dict[str, Any]]):
    """입력 딕셔너리에서 지정된 키를 선택하여 반환하는 Runnable 클래스.

    RunnablePick 클래스는 입력 딕셔너리에서 특정 키를 선택적으로 추출하고,
    해당 키-값 쌍으로 구성된 새로운 딕셔너리를 반환합니다. 이를 통해
    필요한 데이터만 필터링하여 처리할 수 있습니다.

    Attributes:
        keys (Union[str, List[str]]): 추출할 키 또는 키 목록.
            - 단일 키를 문자열로 지정하거나, 다수의 키를 리스트로 지정할 수 있습니다.

    Example:
        입력값에서 'name'과 'age' 키만 선택하여 반환하는 예제:

        .. code-block:: python

            from langchain_core.runnables.passthrough import RunnablePick

            input_data = {
                'name': 'John',
                'age': 30,
                'city': 'New York',
                'country': 'USA'
            }

            # 'name'과 'age'만 추출
            runnable = RunnablePick(keys=['name', 'age'])
            output_data = runnable.invoke(input_data)

            print(output_data)  # Output: {'name': 'John', 'age': 30}
    """

    keys: Union[str, List[str]]

    def __init__(self, keys: Union[str, List[str]], **kwargs: Any) -> None:
        """RunnablePick 객체 초기화.

        Args:
            keys (Union[str, List[str]]): 추출할 키 또는 키 목록.
                - 문자열로 단일 키를 지정하거나, 리스트로 다수의 키를 지정합니다.
            **kwargs: 추가적인 매개변수. 부모 클래스에서 필요한 설정값을 받을 수 있습니다.
        """
        super().__init__(keys=keys, **kwargs)  # type: ignore[call-arg]

    @classmethod
    def is_lc_serializable(cls) -> bool:
        """LangChain 객체가 직렬화 가능한지 확인합니다.

        Returns:
            bool: 항상 `True`를 반환하여 이 클래스가 직렬화 가능함을 나타냅니다.
        """
        return True

    @classmethod
    def get_lc_namespace(cls) -> List[str]:
        """LangChain 객체의 네임스페이스를 반환합니다.

        Returns:
            List[str]: 이 클래스의 네임스페이스 경로.
                - 네임스페이스는 ["langchain", "schema", "runnable"]로 설정되어 있습니다.
        """
        return ["langchain", "schema", "runnable"]

    def get_name(
        self, suffix: Optional[str] = None, *, name: Optional[str] = None
    ) -> str:
        """Runnable 객체의 이름을 생성합니다.

        Args:
            suffix (Optional[str]): 이름에 추가할 접미사.
            name (Optional[str]): 직접 지정할 이름.

        Returns:
            str: 최종적으로 생성된 이름.
        """
        name = (
            name
            or self.name
            or f"RunnablePick<{','.join([self.keys] if isinstance(self.keys, str) else self.keys)}>"  # noqa: E501
        )
        return super().get_name(suffix, name=name)

    def _pick(self, input: Dict[str, Any]) -> Any:
        """입력 딕셔너리에서 지정된 키를 선택합니다.

        Args:
            input (Dict[str, Any]): 처리할 입력 딕셔너리.

        Returns:
            Any: 단일 키를 선택한 경우 해당 값, 또는 여러 키를 선택한 경우 새로운 딕셔너리.

        Raises:
            AssertionError: 입력값이 딕셔너리가 아닌 경우 에러 발생.

        동작 원리:
            1. 입력값이 딕셔너리인지 검증합니다.
            2. 단일 키(`str`)인 경우 해당 키의 값을 반환합니다.
            3. 다수의 키(`List[str]`)인 경우, 해당 키-값 쌍만 포함하는 딕셔너리를 생성하여 반환합니다.
        """
        assert isinstance(
            input, dict
        ), "The input to RunnablePassthrough.assign() must be a dict."

        if isinstance(self.keys, str):
            return input.get(self.keys)
        else:
            picked = {k: input.get(k) for k in self.keys if k in input}
            if picked:
                return AddableDict(picked)
            else:
                return None

    def invoke(
        self,
        input: Dict[str, Any],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """입력값을 동기적으로 처리하고 지정된 키만 추출하여 반환합니다.

        Args:
            input (Dict[str, Any]): 처리할 입력 딕셔너리.
            config (Optional[RunnableConfig]): 실행 설정값.
                - 설정값이 제공되지 않은 경우 기본 설정이 사용됩니다.
            **kwargs: 추가적인 매개변수.

        Returns:
            Dict[str, Any]: 선택된 키와 해당 값을 포함하는 딕셔너리.

        동작 원리:
            1. `_call_with_config` 메서드를 호출하여 `_invoke`와 설정값을 전달합니다.
            2. `_pick` 메서드를 사용해 입력값에서 지정된 키만 추출합니다.
        """
        return self._call_with_config(self._invoke, input, config, **kwargs)

    async def ainvoke(
        self,
        input: Dict[str, Any],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Dict[str, Any]:
        """입력값을 비동기적으로 처리하고 지정된 키만 추출하여 반환합니다.

        Args:
            input (Dict[str, Any]): 처리할 입력 딕셔너리.
            config (Optional[RunnableConfig]): 실행 설정값.
            **kwargs: 추가적인 매개변수.

        Returns:
            Dict[str, Any]: 선택된 키와 해당 값을 포함하는 딕셔너리.

        동작 원리:
            1. `_acall_with_config` 메서드를 호출하여 `_ainvoke`와 설정값을 전달합니다.
            2. `_pick` 메서드를 사용해 입력값에서 지정된 키만 추출합니다.
        """
        return await self._acall_with_config(self._ainvoke, input, config, **kwargs)

    def transform(
        self,
        input: Iterator[Dict[str, Any]],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> Iterator[Dict[str, Any]]:
        """입력값을 스트림 형태로 처리하여 선택된 키만 반환합니다.

        Args:
            input (Iterator[Dict[str, Any]]): 처리할 입력 데이터의 이터레이터.
            config (Optional[RunnableConfig]): 실행 설정값.
            **kwargs: 추가적인 매개변수.

        Returns:
            Iterator[Dict[str, Any]]: 선택된 키와 값을 포함하는 스트림.
        """
        yield from self._transform_stream_with_config(
            input, self._transform, config, **kwargs
        )

    async def atransform(
        self,
        input: AsyncIterator[Dict[str, Any]],
        config: Optional[RunnableConfig] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Dict[str, Any]]:
        """입력값을 비동기 스트림 형태로 처리하여 선택된 키만 반환합니다.

        Args:
            input (AsyncIterator[Dict[str, Any]]): 처리할 비동기 입력 데이터.
            config (Optional[RunnableConfig]): 실행 설정값.
            **kwargs: 추가적인 매개변수.

        Returns:
            AsyncIterator[Dict[str, Any]]: 선택된 키와 값을 포함하는 비동기 스트림.
        """
        async for chunk in self._atransform_stream_with_config(
            input, self._atransform, config, **kwargs
        ):
            yield chunk