그때그때 CS 정리

ProcessPoolExecutor의 코드 및 동작원리

필만이 2024. 11. 23. 00:09

1. 전반적인 동작 방식

1.1. 초기화

  • 초기화 시, 다음과 같은 중요한 속성들이 설정됩니다:
    • max_workers: 사용할 최대 워커 프로세스 수를 결정. 기본값은 CPU 코어 수를 기반으로 설정.
    • mp_context: 멀티프로세싱 컨텍스트(spawn, fork 등) 설정. 플랫폼 및 옵션에 따라 안전한 방식을 사용.
    • max_tasks_per_child: 각 워커가 처리할 최대 작업 수 설정. 초과 시 프로세스는 종료되고 새로운 프로세스가 생성됨.

1.2. 작업 제출 (submit)

  • 사용자로부터 작업이 제출되면 다음 과정이 진행됩니다:
    1. Future 객체 생성: 작업의 결과를 비동기적으로 관리하기 위한 객체.
    2. 작업 항목 생성 및 저장: _WorkItem으로 작업 정의 후 대기 목록(_pending_work_items)에 추가.
    3. 작업 ID 큐에 추가: 작업 추적을 위해 ID를 큐에 추가.
    4. 프로세스 관리:
      • 유휴 워커가 없을 경우 새로운 워커를 생성 (_adjust_process_count).
      • 관리 스레드가 없으면 생성 및 시작 (_start_executor_manager_thread).

1.3. 작업 처리

  • 워커 프로세스는 _process_worker 함수에서 다음을 수행:
    1. 작업 요청 수신: 작업 요청 큐(_call_queue)에서 작업을 가져옴.
    2. 작업 실행: 작업 항목의 함수와 인자를 사용해 결과를 생성.
    3. 결과 반환: 결과 큐(_result_queue)를 통해 결과 또는 예외를 반환.
    4. 작업 수 초과 처리: max_tasks_per_child 설정이 있다면, 워커는 작업 수 초과 시 종료.

1.4. 작업 결과 수집

  • 관리 스레드(_ExecutorManagerThread)는 워커와 통신하여 결과를 수집:
    • 결과 큐에서 데이터를 읽어 Future 객체에 결과 또는 예외를 설정.
    • 실패한 작업이나 손상된 워커를 처리하여 안정성을 유지.

1.5. 종료 (shutdown)

  • Executor가 종료 요청을 받으면 다음 과정을 수행:
    1. 종료 상태 설정: _shutdown_thread 플래그를 설정하고 모든 작업이 종료되었음을 관리 스레드에 알림.
    2. 작업 취소: cancel_futures가 True인 경우 대기 중인 Future 객체를 취소.
    3. 워커 종료: 종료 신호(sentinel)를 워커들에게 보내 모든 프로세스를 안전하게 종료.
    4. 리소스 정리: 통신 채널, 관리 스레드, 프로세스 참조 등 해제.

2. 주요 특징

2.1. 병렬 작업 관리

  • 작업 큐결과 큐를 사용해 작업 요청과 결과 반환을 분리하여 병렬 작업을 효율적으로 관리.
  • 작업 제출과 결과 수집이 비동기로 처리되며, Future 객체를 통해 작업 상태를 추적 가능.

2.2. 유휴 워커 관리

  • Semaphore를 활용해 유휴 워커 상태를 확인.
  • 유휴 워커가 없을 경우 새로운 프로세스를 동적으로 생성.

2.3. 프로세스 안정성

  • 워커 손상 시 BrokenProcessPool 예외를 통해 호출자에게 알림.
  • max_tasks_per_child 설정으로 오래 실행된 프로세스를 주기적으로 재생성해 안정성 확보.

2.4. 플랫폼 호환성

  • mp_context를 활용해 플랫폼별 적합한 멀티프로세싱 방식(spawn, fork)을 선택.
  • Windows 환경에서는 spawn 방식을 기본으로 설정해 안정성을 높임.

3. 동작 흐름 요약

3.1. 작업 제출

  1. 사용자로부터 작업이 제출되면 작업 항목을 생성하고 작업 ID를 큐에 추가.
  2. Future 객체를 반환하여 작업 결과를 비동기로 확인할 수 있도록 설정.
  3. 필요 시 새로운 워커 프로세스를 생성.

3.2. 작업 처리

  1. 워커 프로세스가 작업 요청 큐에서 작업을 가져와 실행.
  2. 실행 결과 또는 예외를 결과 큐를 통해 반환.

3.3. 결과 수집

  1. 관리 스레드가 결과 큐를 통해 결과를 수집.
  2. 결과를 Future 객체에 설정하여 호출자가 결과를 확인 가능.

3.4. 종료

  1. 모든 작업이 완료되었거나 취소된 후 워커 종료.
  2. 통신 채널, 관리 스레드 등 내부 리소스 정리.

4. 장점

  • 병렬 처리 효율성: CPU 코어를 최대한 활용.
  • 안정성: 손상된 워커를 처리하고, 오래 실행된 프로세스를 재생성.
  • 유연성: 사용자 정의 초기화 함수, 작업 제한, 동적 워커 생성 지원.
  • 플랫폼 호환성: 다양한 멀티프로세싱 방식(spawn, fork) 지원.

5. 단점 및 제한

  • 프로세스 생성 비용: 워커 프로세스 생성은 스레드보다 비용이 큼.
  • 공유 메모리 제약: 프로세스 간 통신은 큐를 통해 이루어지므로 대규모 데이터 공유에 비효율적.
  • 복잡성: 관리 스레드, 큐, 락 등 여러 메커니즘을 사용해 코드가 복잡.

ThreadPoolExecutor와 비교

1. 실행 단위

  • ProcessPoolExecutor:
    • 프로세스를 실행 단위로 사용.
    • 각 작업이 별도의 프로세스에서 실행되며, 각 프로세스는 독립된 메모리 공간을 가짐.
    • multiprocessing 모듈을 기반으로 동작.
  • ThreadPoolExecutor:
    • 스레드를 실행 단위로 사용.
    • 스레드는 동일한 프로세스 내에서 실행되며, 공유 메모리를 사용.
    • threading 모듈을 기반으로 동작.

해석

  • ProcessPoolExecutor는 작업이 독립적인 프로세스에서 실행되므로, GIL(Global Interpreter Lock)의 영향을 받지 않음.
  • ThreadPoolExecutor는 GIL의 영향을 받으므로, CPU 바운드 작업에서 비효율적일 수 있음.

2. 메모리 구조

  • ProcessPoolExecutor:
    • 각 프로세스는 독립된 메모리 공간을 사용.
    • 프로세스 간 데이터를 전달하기 위해 또는 피클링(serialization)이 필요.
    • 데이터 전송은 IPC(Inter-Process Communication)를 통해 이루어지며, 비용이 크다.
  • ThreadPoolExecutor:
    • 모든 스레드는 동일한 메모리 공간을 공유.
    • 데이터 전달은 메모리 복사를 하지 않고 직접 접근 가능하므로 빠름.

해석

  • ProcessPoolExecutor는 메모리 고립을 통해 안정성이 높지만, 데이터 전달 비용이 큼.
  • ThreadPoolExecutor메모리 공유로 데이터 전달이 빠르지만, 잘못된 접근으로 충돌 위험이 있음.

3. 작업 처리

  • ProcessPoolExecutor:
    1. 작업이 제출되면 작업 요청 큐에 추가.
    2. 워커 프로세스가 큐에서 작업을 가져와 실행.
    3. 결과는 결과 큐를 통해 전달.
    4. 워커는 지정된 작업 수(max_tasks_per_child)를 초과하면 종료되고 새로 생성.
  • ThreadPoolExecutor:
    1. 작업이 제출되면 스레드풀에서 유휴 스레드를 선택.
    2. 스레드가 작업을 실행하고 결과를 반환.
    3. 스레드는 작업이 끝난 후 재사용되며, 종료되거나 새로 생성되지 않음.

4. 표 비교

특징 ProcessPoolExecutor ThreadPoolExecutor
실행 단위 프로세스 스레드
메모리 구조 독립 메모리 공간 공유 메모리 공간
데이터 전달 비용 높음 (큐, 피클링 필요) 낮음 (직접 메모리 접근)
작업 유형 CPU 바운드 작업 I/O 바운드 작업
GIL 영향 없음 있음
워커 관리 동적 워커 생성/종료 스레드 재사용 (고정 스레드풀)
플랫폼 호환성 플랫폼별 멀티프로세싱 방식 지원 (spawn, fork) 플랫폼 독립적
장기 실행 안정성 높음 (워커가 재생성됨) 중간 (스레드 재생성 없음)
작업 처리 비용 높음 (프로세스 생성 및 데이터 전송) 낮음 (스레드 생성 및 공유 메모리 접근)

코드

# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""ProcessPoolExecutor를 구현한 모듈입니다.

아래 다이어그램과 설명은 시스템의 데이터 흐름을 나타냅니다:

|======================= 프로세스 내부 ===================|== 프로세스 외부 ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() 호출 시:
- 고유 번호를 가진 `_WorkItem`을 생성하고 "Work Items" 딕셔너리에 추가
- 생성된 `_WorkItem`의 ID를 "Work Ids" 큐에 추가

로컬 워커 스레드(Local worker thread):
- "Work Ids" 큐에서 작업 ID를 읽고 "Work Items" 딕셔너리에서 해당 WorkItem을 조회:
  - 작업 항목이 취소된 경우 딕셔너리에서 제거
  - 그렇지 않으면 `_CallItem`으로 재구성하여 "Call Q"에 추가
- 새로운 `_CallItem`은 "Call Q"가 가득 차지 않을 때까지 추가됨.
  - **참고**: "Call Q"의 크기는 작게 유지됨. "Call Q"에 들어간 호출은 `Future.cancel()`로 취소할 수 없기 때문.
- "Result Q"에서 `_ResultItems`를 읽어 "Work Items" 딕셔너리에 저장된 Future 객체를 업데이트한 후 해당 항목 삭제.

프로세스 #1..n:
- "Call Q"에서 `_CallItems`를 읽어 실행
- 실행 결과를 `_ResultItems`로 구성하여 "Result Q"에 추가
"""


__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
from traceback import format_exception


_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False

# 목적:스레드를 대기 상태에서 깨우는 메커니즘을 제공하는 클래스. 파이프(Pipe)를 사용해 스레드 간 간단한 통신을 구현.
class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        # 단방향 파이프(Pipe) 생성 (스레드 간 통신에 사용)
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        # 파이프를 닫아 리소스를 해제
        if not self._closed:
            self._closed = True
            self._writer.close()  # 쓰기용 파이프 닫기
            self._reader.close()  # 읽기용 파이프 닫기

    def wakeup(self):
        # 파이프를 통해 스레드에 신호를 보내 깨어나도록 유도
        if not self._closed:
            self._writer.send_bytes(b"")  # 빈 바이트 데이터 전송

    def clear(self):
        # 파이프에 대기 중인 신호를 모두 제거
        if not self._closed:
            while self._reader.poll():  # 읽을 데이터가 있는 동안
                self._reader.recv_bytes()  # 데이터를 읽어서 제거


def _python_exit():
    global _global_shutdown
    _global_shutdown = True  # 글로벌 상태를 종료로 설정

    # 등록된 스레드와 관련된 wakeup 객체를 모두 가져옴
    items = list(_threads_wakeups.items())

    # 모든 wakeup 객체에 신호를 보내 대기 중인 스레드들을 깨움
    for _, thread_wakeup in items:
        thread_wakeup.wakeup()

    # 각 스레드가 종료될 때까지 대기
    for t, _ in items:
        t.join()

# 모든 non-daemon 스레드가 종료되기 전에 `_python_exit()`를 호출하도록 등록.
# 이는 `atexit.register()` 대신 사용되며, 하위 인터프리터와의 호환성을 위해 설계됨.
# 하위 인터프리터는 더 이상 데몬 스레드를 지원하지 않음. 관련 내용은 bpo-39812 참조.
threading._register_atexit(_python_exit)

# Call queue에 프로세스 수보다 더 많은 호출을 대기시키는 수를 제어.
# - 작은 값: 프로세스가 작업을 기다리는 시간이 늘어남.
# - 큰 값: Call queue에 있는 작업은 `Future.cancel()`로 취소 불가, 따라서 취소 성공률 감소.
EXTRA_QUEUED_CALLS = 1

# Windows에서는 `WaitForMultipleObjects`를 사용하여 프로세스 종료를 대기.
# 최대 63개의 객체를 대기할 수 있음.
# 추가로 아래 두 객체의 오버헤드가 있음:
# - 결과 큐 리더 (result queue reader)
# - 스레드 웨이크업 리더 (thread wakeup reader)
_MAX_WINDOWS_WORKERS = 63 - 2

# 원격 트레이스백을 로컬 트레이스백에 문자열 형태로 포함시키는 해킹 클래스.
# _RemoteTraceback은 병렬 처리 또는 원격 호출에서 원격 프로세스의 에러를 추적하고, 디버깅을 용이하게 하기 위해 
# 사용되는 유틸리티 클래스입니다. 원격 프로세스에서 발생한 예외를 부모 프로세스에서 확인해야 하는 경우 유용
class _RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb  # 원격 프로세스에서 생성된 트레이스백 저장
    def __str__(self):
        return self.tb  # 트레이스백을 문자열로 반환

# 원격/비동기 작업 중 발생한 예외와 트레이스백을 포맷팅하고, 이후 재구성 가능하도록 관리.
# 동작 : 예외와 트레이스백을 문자열로 변환해 저장 및 복구를 위한 로직 제공.
class _ExceptionWithTraceback:
    def __init__(self, exc, tb):
        # 트레이스백 객체를 받아, traceback.format_exception으로 포맷팅된 문자열로 변환
        tb = ''.join(format_exception(type(exc), exc, tb))  # 예외와 트레이스백 포맷팅
        self.exc = exc  # 예외 객체 저장

        # 트레이스백 객체는 메모리에서 삭제, 참조를 방지하여 가비지 컬렉션 가능
        # 순환 참조를 방지하고 메모리 누수를 막기 위해 예외 객체의 트레이스백 참조를 제거
        self.exc.__traceback__ = None
        # 포맷팅된 트레이스백을 문자열로 저장
        self.tb = '\n"""\n%s"""' % tb  # 문자열 형태로 변환

    # 이 메서드는 객체를 저장할 때 self 전체를 직접 저장하지 않고, 대신 이 객체를 복원하는 데 필요한 정보와 로직만 저장하도록 만듬
    # _rebuild_exc 함수와 예외 객체 및 트레이스백 문자열을 반환해, 객체를 역직렬화하거나 복구할 때 사용.
    def __reduce__(self):
        # _rebuild_exc: 직렬화된 데이터를 기반으로 예외 객체를 재구성하기 위한 함수입니다.
        # (self.exc, self.tb): 예외 객체와 포맷팅된 트레이스백 문자열을 _rebuild_exc 함수에 전달
        return _rebuild_exc, (self.exc, self.tb)

# 예외 객체(exc)와 포맷팅된 트레이스백(tb)을 받아 원래의 예외 객체를 복구.
# exc.__cause__에 _RemoteTraceback(tb)를 추가해 원격 트레이스백을 포함한 예외 객체로 반환.
def _rebuild_exc(exc, tb):
    exc.__cause__ = _RemoteTraceback(tb)  # 원격 트레이스백 추가
    return exc  # 예외 객체 반환

# 비동기 작업의 실행을 위해 작업과 관련된 정보를 저장(실행할 함수, 인자, 결과 저장을 위한 Future 객체 등)
# 동작 : 작업을 정의하고 Future를 통해 작업 결과 추적.
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future  # 작업 결과를 저장할 Future 객체
        self.fn = fn          # 실행할 함수
        self.args = args      # 함수의 위치 인자
        self.kwargs = kwargs  # 함수의 키워드 인자

# 작업 실행 결과, 발생한 예외, 종료 프로세스 ID 등 작업의 결과 정보를 저장.
# 동작 : 작업 ID 기반으로 결과와 예외를 추적 및 관리.
class _ResultItem(object):
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id    # 작업 ID
        self.exception = exception  # 작업 실행 중 발생한 예외
        self.result = result      # 작업 실행 결과
        self.exit_pid = exit_pid  # 작업을 종료한 프로세스 ID (필요한 경우)

# 작업에서 호출할 함수와 해당 인자 정보를 저장.
# 동작 : 함수 호출의 세부 정보 저장 및 실행을 위한 기본 구조 제공.
class _CallItem(object):
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id  # 작업 ID
        self.fn = fn            # 실행할 함수
        self.args = args        # 함수의 위치 인자
        self.kwargs = kwargs    # 함수의 키워드 인자

# 큐에서 작업 항목을 처리 중 발생하는 예외를 안전하게 관리하고,
# 작업 항목과 연결된 Future 객체에 예외 정보를 설정.
# 스레드 간 동기화와 작업 종료 시 안전한 관리를 지원.
class _SafeQueue(Queue):

    # 큐 초기화와 함께, 작업 항목(pending_work_items), 종료 동기화 잠금(shutdown_lock), 
    # 스레드 웨이크업 메커니즘(thread_wakeup)을 설정
    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
                 thread_wakeup):
        self.pending_work_items = pending_work_items  # 처리 중인 작업 항목들을 관리하는 딕셔너리.
        self.shutdown_lock = shutdown_lock            # 종료 시 동기화를 보장하기 위한 잠금 객체.
        self.thread_wakeup = thread_wakeup            # 대기 중인 스레드를 깨우는 메커니즘.
        super().__init__(max_size, ctx=ctx)           # 부모 클래스 초기화

    # 큐 피더(feed) 중 예외가 발생했을 때 이를 처리.
    def _on_queue_feeder_error(self, e, obj):
        # 큐 작업 중 예외가 발생했을 때 호출. 예외와 연관된 작업 항목(_CallItem)의 Future 객체에 
        # 예외 정보를 설정하고, 작업 항목을 제거. 스레드 웨이크업 메커니즘을 호출해 대기 중인 스레드가 예외 상황에 반응하도록 처리.

        # obj가 _CallItem 객체인지 확인해, 큐에 추가된 작업 항목에서 발생한 예외인지 판단
        # _CallItem은 실행할 함수(fn)와 관련 인자(args, kwargs), 그리고 작업 ID(work_id)를 포함한 작업 정의 객체
        if isinstance(obj, _CallItem):  # 호출 항목일 경우
            # format_exception을 사용해 예외(e)와 그 트레이스백 정보를 문자열로 변환.
            # 이 문자열은 사람이 읽기 쉽고, 디버깅에 유용한 형태로 저장
            tb = format_exception(type(e), e, e.__traceback__)  # 예외 정보 포맷팅
            # _RemoteTraceback 객체를 생성해 예외 객체의 __cause__로 설정.
            # 이 작업은 원격 작업이나 비동기 작업에서 발생한 예외를 추적 가능하게 만듦.
            # 트레이스백 문자열(tb)을 포맷팅해 포함
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))

            # pending_work_items 딕셔너리에서 작업 ID(obj.work_id)에 해당하는 작업 항목을 제거.
            # 작업 ID는 _CallItem 생성 시 지정된 고유 값으로, 작업 항목을 관리하는 데 사용.
            # 작업 항목을 제거함으로써, 실패한 작업이 다시 실행되지 않도록 보장
            work_item = self.pending_work_items.pop(obj.work_id, None)

            # shutdown_lock을 사용해 멀티스레드 환경에서 동기화 보장.
            # 작업 항목을 수정하거나 스레드를 깨울 때, 다른 스레드와 충돌하지 않도록 보호.
            with self.shutdown_lock:
                # 스레드를 깨워 대기 중인 스레드가 예외 상황에 즉각 반응할 수 있도록 처리
                self.thread_wakeup.wakeup() 
            # 제거된 작업 항목(work_item)이 존재하는지 확인
            if work_item is not None:
                # 작업 항목이 존재하면, Future 객체에 예외를 설정
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)  # 부모 클래스의 기본 처리 호출

# 반복 가능한 객체를 지정된 크기(chunksize)로 분할해 청크 단위로 작업을 수행할 수 있도록 제공.
def _get_chunks(*iterables, chunksize):
    """zip()으로 묶인 반복 가능한 객체를 청크로 나눔."""
    it = zip(*iterables)  # 여러 반복 가능한 객체를 묶음
    while True:
        # itertools.islice를 사용해 반복 가능한 객체를 슬라이싱. 입력 데이터를 chunksize 단위로 나눈 튜플을 생성하고 반환
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:  # 더 이상 데이터가 없으면 종료
            return
        yield chunk  # 생성된 청크 반환

# 특정 함수(fn)를 입력 데이터 청크의 각 항목에 대해 실행. 입력 데이터를 청크 단위로 병렬 처리하기 위한 기초 제공
def _process_chunk(fn, chunk):
    """작업 청크를 처리하는 함수.

    주어진 함수(fn)를 청크의 각 항목에 대해 실행합니다.

    Args:
        fn: 청크 내 각 항목에 대해 실행할 함수.
        chunk: 처리할 데이터 청크.

    Returns:
        처리된 청크 결과 리스트.
    """
    return [fn(*args) for args in chunk]  # 청크의 각 항목에 대해 함수를 실행하고 결과를 반환

# 작업의 결과(result) 또는 예외(exception)를 안전하게 결과 큐(result_queue)로 반환.
# 작업의 **ID(work_id)**와 함께 결과를 관리.
def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """결과 또는 예외를 안전하게 결과 큐로 반환합니다.

    Args:
        result_queue: 결과를 저장할 큐.
        work_id: 작업 ID.
        result: 작업 실행 결과(기본값 None).
        exception: 작업 실행 중 발생한 예외(기본값 None).
        exit_pid: 작업을 종료한 프로세스 ID(기본값 None).
    """
    try:
        # 작업 결과를 `_ResultItem` 형태로 결과 큐에 추가
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        # 예외 발생 시 `_ExceptionWithTraceback`을 통해 예외 정보를 처리
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))

# 작업 큐(call_queue)에서 작업을 가져와 처리하고 결과를 결과 큐(result_queue)로 반환.
# 프로세스별 워커로 실행되며, 초기화 함수와 최대 작업 수 등을 관리.
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """
    작업 큐에서 작업을 읽고 결과를 결과 큐에 반환하는 워커 함수.

    이 함수는 별도의 프로세스에서 실행되며, 작업 큐(call_queue)에서 작업 항목을 가져와 실행하고
    결과를 결과 큐(result_queue)에 반환합니다. 작업 실행 중 예외가 발생한 경우, 해당 예외를 결과 큐에 반환합니다.

    Args:
        call_queue: 작업이 저장된 큐(`_CallItem` 객체).
        result_queue: 작업 결과가 저장될 큐(`_ResultItem` 객체).
        initializer: 워커 초기화 시 호출할 함수(없으면 None).
        initargs: 초기화 함수의 인자(튜플 형태).
        max_tasks: 프로세스가 처리할 최대 작업 수. None인 경우 무제한 실행.
    """
    # 초기화 함수가 있을 경우 실행
    if initializer is not None:
        try:
            # initializer에 전달받은 인자를 사용하여 초기화 작업 실행
            initializer(*initargs)
        except BaseException:
            # 초기화 중 예외 발생 시 로깅 및 프로세스 종료
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            return  # 부모 프로세스에 풀(pool)의 손상을 알림

    num_tasks = 0  # 현재 워커가 처리한 작업 수
    exit_pid = None  # 작업 종료 시 현재 프로세스 ID 저장

    while True:
        # 작업 큐에서 작업 항목(_CallItem 객체)을 가져옴 (blocking)
        call_item = call_queue.get(block=True)

        # 작업 항목이 None일 경우 워커 종료 신호로 간주
        if call_item is None:
            result_queue.put(os.getpid())  # 현재 프로세스 ID를 결과 큐에 추가
            return  # 프로세스 종료

        # 최대 작업 수 제한이 있는 경우 처리된 작업 수를 증가
        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()  # 최대 작업 수 초과 시 현재 프로세스 ID 저장

        try:
            # 작업 실행: call_item에 저장된 함수(fn)와 인자(args, kwargs)를 실행
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            # 작업 실행 중 예외 발생 시 예외를 결과 큐에 반환
            exc = _ExceptionWithTraceback(e, e.__traceback__)  # 예외와 트레이스백 포맷팅
            _sendback_result(
                result_queue,
                call_item.work_id,
                exception=exc,
                exit_pid=exit_pid
            )
        else:
            # 작업 성공 시 결과를 결과 큐에 반환
            _sendback_result(
                result_queue,
                call_item.work_id,
                result=r,
                exit_pid=exit_pid
            )
            del r  # 작업 결과(r)를 삭제하여 메모리 해제

        # 작업 항목(call_item)을 삭제하여 메모리 해제
        del call_item

        # 최대 작업 수 초과 시 프로세스 종료
        if exit_pid is not None:
            return  # 프로세스 종료


class _ExecutorManagerThread(threading.Thread):
    """이 프로세스와 워커 프로세스 간의 통신을 관리하는 관리자 스레드.

    이 스레드는 로컬 스레드에서 실행됩니다.

    Args:
        executor: 이 스레드의 소유자인 ProcessPoolExecutor에 대한 참조.
                  이 참조는 약한 참조(weakref)를 통해 관리됩니다.
    """

    def __init__(self, executor):
        # Executor의 내부 객체 참조를 저장
        # 스레드 웨이크업 메커니즘
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # 약한 참조를 사용하여 ProcessPoolExecutor에 대한 참조 저장
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       shutdown_lock=self.shutdown_lock):
            mp.util.debug('Executor collected: triggering callback for '
                          'QueueManager wakeup')
            with shutdown_lock:
                thread_wakeup.wakeup()  # 종료 시 스레드를 깨움
        #  executor를 약한 참조로 만들고, 객체가 삭제될 때 weakref_cb 함수를 호출
        self.executor_reference = weakref.ref(executor, weakref_cb)

        # 워커로 사용할 ctx.Process 인스턴스 목록
        self.processes = executor._processes

        # 작업 요청이 저장될 큐
        self.call_queue = executor._call_queue

        # 작업 결과가 저장될 큐
        self.result_queue = executor._result_queue

        # 작업 ID가 저장된 큐
        self.work_ids_queue = executor._work_ids

        # 워커 프로세스가 안전하게 종료되기 전 처리할 수 있는 최대 작업 수
        self.max_tasks_per_child = executor._max_tasks_per_child

        # 작업 ID를 _WorkItem 객체에 매핑하는 딕셔너리
        self.pending_work_items = executor._pending_work_items

        super().__init__()  # 부모 클래스 초기화

    def run(self):
        """Executor 관리 스레드의 메인 루프."""
        while True:
            self.add_call_item_to_queue()  # 작업 항목을 큐에 추가

            # 결과 큐에서 결과를 기다리거나, 워커 프로세스 종료 또는 웨이크업 신호를 대기
            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                # 큐가 손상된 경우 종료 처리
                self.terminate_broken(cause)
                return

            if result_item is not None:
                # 결과 항목 처리
                self.process_result_item(result_item)

                process_exited = result_item.exit_pid is not None
                if process_exited:
                    # 종료된 워커 프로세스를 정리
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # 결과 항목의 참조를 제거하여 메모리 관리
                del result_item

                # Executor 참조를 통해 프로세스를 조정
                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()  # 프로세스 수 조정
                    else:
                        executor._idle_worker_semaphore.release()  # 워커 상태 갱신
                    del executor

            if self.is_shutting_down():
                # 종료 상태로 표시
                self.flag_executor_shutting_down()

                # 남아 있는 작업 항목이 없을 때까지 반복적으로 작업 큐를 비움
                self.add_call_item_to_queue()

                # 더 이상 대기 작업이 없으면 내부 정리 후 스레드 종료
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        """작업 큐에 _WorkItems를 추가."""
        while True:
            if self.call_queue.full():  # 작업 큐가 가득 찬 경우 종료
                return
            try:
                # 작업 ID를 작업 ID 큐에서 가져옴
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                # 작업 ID 큐가 비어 있으면 종료
                return
            else:
                # 작업 ID에 해당하는 _WorkItem 가져오기
                work_item = self.pending_work_items[work_id]

                # 작업이 실행 가능한 상태인지 확인
                if work_item.future.set_running_or_notify_cancel():
                    # 실행 가능한 경우 작업 큐에 _CallItem 추가
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    # 작업이 취소된 경우 딕셔너리에서 제거
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        """
        결과 대기, 워커 상태 확인 또는 웨이크업 신호 대기.

        이 메서드는 작업의 결과를 대기하거나 워커 프로세스가 정상인지 확인하며,
        웨이크업 신호가 발생하면 즉시 처리합니다.

        Returns:
            result_item: 결과 큐에서 읽은 결과 항목(없으면 None).
            is_broken: 시스템이 손상되었는지 여부(True: 손상됨, False: 정상).
            cause: 예외가 발생한 경우 예외의 트레이스백 문자열(없으면 None).
        """
        # 결과 큐의 리더(reader) 파이프 핸들
        result_reader = self.result_queue._reader

        # 웨이크업 메커니즘이 닫혀 있지 않은지 확인
        assert not self.thread_wakeup._closed

        # 웨이크업 신호의 리더(reader) 파이프 핸들
        wakeup_reader = self.thread_wakeup._reader

        # `mp.connection.wait`로 대기할 리더 목록 설정
        # - 결과 큐 리더(result_reader)
        # - 웨이크업 리더(wakeup_reader)
        readers = [result_reader, wakeup_reader]

        # 워커 프로세스의 sentinel(프로세스 종료 감지 핸들) 추가
        # - 각 프로세스의 sentinel을 워커 상태 확인에 사용
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]

        # 결과 큐, 웨이크업 신호, 워커 프로세스 상태를 감시
        # - `readers`와 `worker_sentinels`에 대해 블로킹(wait) 호출
        ready = mp.connection.wait(readers + worker_sentinels)

        # 초기 상태 변수 설정
        cause = None  # 예외 원인 (발생하지 않으면 None)
        is_broken = True  # 시스템이 손상되었는지 여부(True: 손상됨, False: 정상)
        result_item = None  # 결과 항목 (결과 큐에서 읽은 데이터)

        # 1. 결과 큐에서 결과 항목을 읽을 준비가 된 경우
        if result_reader in ready:
            try:
                # 결과 큐에서 결과 항목 읽기
                result_item = result_reader.recv()
                # 결과를 정상적으로 읽었으므로 시스템 상태를 "정상"으로 설정
                is_broken = False
            except BaseException as e:
                # 결과를 읽는 동안 예외가 발생한 경우
                # 예외의 타입, 메시지, 트레이스백을 문자열로 포맷
                cause = format_exception(type(e), e, e.__traceback__)

        # 2. 웨이크업 신호가 발생한 경우
        elif wakeup_reader in ready:
            # 웨이크업 신호가 정상적으로 전달되었으므로 시스템 상태를 "정상"으로 설정
            is_broken = False

        # 종료 동작 또는 상태 초기화를 위한 잠금 처리
        with self.shutdown_lock:
            # 웨이크업 신호를 초기화
            self.thread_wakeup.clear()

        # 최종 결과 반환:
        # - `result_item`: 결과 큐에서 읽은 항목 (없으면 None)
        # - `is_broken`: 시스템 손상 상태 (True: 손상됨, False: 정상)
        # - `cause`: 예외 발생 시 원인 (없으면 None)
        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        """결과 항목을 처리.

        주어진 결과 항목(`result_item`)을 처리합니다. 워커의 종료 여부와 결과 큐에서 가져온 작업 결과를 관리합니다.
        결과가 워커의 종료 신호인지, 작업의 결과인지를 구분하여 적절히 처리합니다.

        Args:
            result_item: 처리할 결과 항목. 이는 워커의 PID(int) 또는 `_ResultItem` 객체일 수 있습니다.
        """
        if isinstance(result_item, int):
            # 결과 항목이 정수(PID)인 경우: 워커 프로세스가 종료를 알림
            # 워커가 PID를 반환한 경우, 이를 정상 종료로 간주
            assert self.is_shutting_down()  # 현재 시스템이 종료 중인지 확인
            p = self.processes.pop(result_item)  # 종료된 프로세스를 `processes`에서 제거
            p.join()  # 프로세스 종료 대기(join)
            if not self.processes:
                # 모든 워커가 종료되었으면 내부 정리 수행
                self.join_executor_internals()  # 실행자 내부 리소스 정리
                return  # 종료

        else:
            # 결과 항목이 `_ResultItem` 객체인 경우
            # _ResultItem을 처리하여 Future 객체 상태를 갱신
            work_item = self.pending_work_items.pop(result_item.work_id, None)  # 작업 ID로 작업 항목 검색
            if work_item is not None:
                if result_item.exception:
                    # 작업 실행 중 예외가 발생한 경우
                    # Future 객체에 예외를 설정하여 호출자가 이를 확인할 수 있도록 함
                    work_item.future.set_exception(result_item.exception)
                else:
                    # 작업이 성공적으로 완료된 경우
                    # Future 객체에 작업 결과를 설정하여 호출자가 결과를 받을 수 있도록 함
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Executor가 종료 상태인지 확인하는 메서드
        executor = self.executor_reference()
        # 아래 조건 중 하나라도 충족되면 종료 상태로 간주:
        # 1. `_global_shutdown` 플래그가 활성화된 경우 (인터프리터 전체가 종료 중)
        # 2. Executor 객체가 가비지 수집(GC)으로 제거된 경우
        # 3. Executor가 명시적으로 종료 상태로 설정된 경우 (`_shutdown_thread`가 True)
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Executor가 손상된 상태에서 종료 처리를 수행하는 메서드
        # - 손상된 이유를 나타내는 `cause`를 사용해 디버깅 정보 제공 가능

        # 1. Executor를 손상된 상태로 표시하여 추가 작업 제출을 차단
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('하위 프로세스가 비정상적으로 종료되었습니다. '
                                '프로세스 풀을 더 이상 사용할 수 없습니다.')
            executor._shutdown_thread = True  # Executor 종료 상태로 설정
            executor = None

        # 2. 대기 중인 모든 작업에 실패 예외를 설정
        bpe = BrokenProcessPool("프로세스 풀의 작업이 실행 중 또는 대기 중에 "
                                "비정상적으로 종료되었습니다.")
        if cause is not None:
            # 원격에서 발생한 트레이스백 정보를 포함시킴
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # 각 대기 작업에 BrokenProcessPool 예외를 설정
        for work_id, work_item in self.pending_work_items.items():
            work_item.future.set_exception(bpe)  # Future 객체에 실패 상태 설정
            del work_item  # 메모리에서 제거
        self.pending_work_items.clear()  # 모든 작업 항목 제거

        # 3. 모든 워커 프로세스를 강제로 종료
        for p in self.processes.values():
            p.terminate()  # 프로세스 강제 종료

        # 4. 더 이상 읽을 수 없는 파이프에 데이터 쓰기를 방지
        self.call_queue._reader.close()

        # 5. 내부 리소스 정리
        self.join_executor_internals()
    def flag_executor_shutting_down(self):
        """
        Executor를 종료 상태로 표시하고 대기 중인 작업을 취소.

        목적:
            - Executor의 `_shutdown_thread` 플래그를 True로 설정하여 종료 상태를 표시.
            - 대기 중인 작업 중 취소되지 않은 작업만 `pending_work_items`에 남김.
            - 작업 ID 큐를 비우고, `_cancel_pending_futures` 플래그를 재설정하여 재실행 방지.

        동작:
            - Executor 종료 상태를 플래그로 표시.
            - 대기 중인 Future 객체의 취소를 시도하고, 취소되지 않은 작업만 남김.
            - 작업 ID 큐를 초기화하여 종료 준비 완료.
        """
        executor = self.executor_reference()  # 약한 참조로 Executor 객체 가져오기
        if executor is not None:
            executor._shutdown_thread = True  # Executor가 종료 중임을 표시

            if executor._cancel_pending_futures:  # 대기 작업을 취소해야 하는 경우
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():  # 작업 취소 시도
                        new_pending_work_items[work_id] = work_item
                self.pending_work_items = new_pending_work_items  # 취소되지 않은 작업만 유지

                # 작업 ID 큐 비우기
                while True:
                    try:
                        self.work_ids_queue.get_nowait()  # 작업 ID 제거
                    except queue.Empty:  # 큐가 비었으면 종료
                        break

                executor._cancel_pending_futures = False  # 재실행 방지

    def shutdown_workers(self):
        """
        현재 실행 중인 워커 프로세스를 종료.

        목적:
            - 모든 워커 프로세스가 종료되도록 sentinel(종료 신호)을 작업 큐에 추가.
            - 큐가 가득 찼을 경우 대기하며 종료 신호를 계속 보냄.

        동작:
            - 현재 실행 중인 워커 수를 확인하고, 필요한 종료 신호를 큐에 추가.
            - 워커가 종료될 때까지 sentinel 전송을 반복.
        """
        n_children_to_stop = self.get_n_children_alive()  # 현재 살아 있는 워커 수
        n_sentinels_sent = 0  # 보낸 종료 신호(sentinel) 수

        # 워커 프로세스가 모두 종료될 때까지 종료 신호를 큐에 추가
        while n_sentinels_sent < n_children_to_stop and self.get_n_children_alive() > 0:
            for _ in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)  # 종료 신호(sentinel) 전송
                    n_sentinels_sent += 1
                except queue.Full:  # 큐가 가득 찬 경우 대기
                    break

    def join_executor_internals(self):
        """
        Executor 내부 리소스를 정리.

        목적:
            - 워커 프로세스 종료를 처리하고, 모든 내부 큐와 객체를 닫음.
            - 워커 프로세스가 종료될 때까지 기다려 리소스 누수 방지.

        동작:
            - `shutdown_workers`를 호출하여 모든 워커를 종료.
            - 작업 큐와 연결된 스레드 및 웨이크업 객체를 닫음.
            - 모든 워커 프로세스가 종료될 때까지 `join` 호출로 기다림.
        """
        self.shutdown_workers()  # 워커 프로세스 종료
        self.call_queue.close()  # 작업 큐 닫기
        self.call_queue.join_thread()  # 작업 큐와 연결된 스레드 종료
        with self.shutdown_lock:  # 동기화 잠금
            self.thread_wakeup.close()  # 웨이크업 객체 종료

        # 모든 프로세스가 종료되었는지 확인하고 종료 대기
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        """
        현재 살아 있는 워커 프로세스 수를 반환.

        목적:
            - 워커 프로세스가 얼마나 남아 있는지 확인.

        동작:
            - `self.processes` 딕셔너리에서 살아 있는 프로세스(`is_alive()`)를 카운트.
        """
        return sum(p.is_alive() for p in self.processes.values())

    def _check_system_limits():
        """
        플랫폼에서 사용 가능한 시스템 리소스 제한 확인.

        목적:
            - 시스템에서 동기화 객체나 세마포어 사용 가능 여부 확인.
            - 제한이 있을 경우 예외를 발생시켜 실행을 방지.

        동작:
            - `multiprocessing.synchronize` 모듈 확인으로 동기화 객체 지원 여부 확인.
            - `os.sysconf`를 사용해 세마포어 제한 확인. 최소 256개 이상 필요.
        """
        global _system_limits_checked, _system_limited
        if _system_limits_checked:  # 이미 확인되었으면 재확인 방지
            if _system_limited:
                raise NotImplementedError(_system_limited)
        _system_limits_checked = True

        try:
            import multiprocessing.synchronize  # 동기화 객체 확인
        except ImportError:
            _system_limited = (
                "이 Python 빌드는 multiprocessing.synchronize를 지원하지 않습니다. "
                "이는 플랫폼에서 named semaphore를 사용할 수 없기 때문입니다."
            )
            raise NotImplementedError(_system_limited)

        try:
            nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")  # 세마포어 제한 확인
        except (AttributeError, ValueError):
            return  # 제한 없음 또는 sysconf 미지원

        if nsems_max == -1:
            return  # 제한 없음으로 간주
        if nsems_max >= 256:
            return  # POSIX 표준에 따라 최소 256개 세마포어 필요
        _system_limited = (f"시스템에서 사용할 수 있는 세마포어가 부족합니다 "
                           f"({nsems_max}개 사용 가능, 최소 256개 필요).")
        raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    반복 가능한 리스트 객체를 순회하며, 각 리스트의 요소를 역순으로 순차적으로 반환.

    목적:
        - 여러 리스트를 하나의 반복 가능한 객체로 연결하여 요소를 순차적으로 반환.
        - 리스트를 역순으로 처리하여 요소를 효율적으로 제거하며 반환.

    Args:
        iterable: 반복 가능한 리스트 객체.

    Yields:
        리스트의 각 요소를 순차적으로 반환.
    """
    for element in iterable:
        element.reverse()  # 리스트를 역순으로 변환
        while element:
            yield element.pop()  # 리스트의 마지막 요소를 반환하고 제거


class BrokenProcessPool(_base.BrokenExecutor):
    """
    ProcessPoolExecutor에서 일부 워커 프로세스가 실패하거나 비정상적으로 종료되었을 때 발생하는 예외 클래스.

    목적:
        - 프로세스 풀에서 워커가 실행 도중 예기치 않게 종료되었음을 알림.
        - Executor가 더 이상 작업을 안정적으로 처리할 수 없는 상태를 나타냄.
    """
    pass


class ProcessPoolExecutor(_base.Executor):
    """
    ProcessPoolExecutor는 멀티프로세싱 기반으로 병렬 작업을 관리하는 클래스입니다.

    목적:
        - 여러 워커 프로세스를 사용하여 작업을 병렬로 처리.
        - 각 워커 프로세스는 독립적인 작업 실행 환경을 가지며, 작업 결과를 수집하거나 예외를 처리.
        - 시스템 리소스를 효율적으로 활용하면서 안정성과 확장성을 제공.

    Args:
        max_workers (int): 최대 워커 프로세스 수. 기본값은 CPU 코어 수.
        mp_context: 멀티프로세싱 컨텍스트 객체 (예: spawn, fork).
        initializer (callable): 워커 프로세스를 초기화할 함수.
        initargs (tuple): 초기화 함수에 전달할 인수.
        max_tasks_per_child (int): 워커 프로세스가 처리할 최대 작업 수. 초과 시 프로세스가 종료되고 새로운 프로세스로 교체.
    """
    def __init__(self, max_workers=None, mp_context=None,
                 initializer=None, initargs=(), *, max_tasks_per_child=None):
        _check_system_limits()  # 시스템 자원 제한 확인

        # 최대 워커 수 설정
        if max_workers is None:
            self._max_workers = os.cpu_count() or 1  # CPU 코어 수 계산
            if sys.platform == 'win32':  # Windows의 경우 최대 워커 수 제한
                self._max_workers = min(_MAX_WINDOWS_WORKERS, self._max_workers)
        else:
            if max_workers <= 0:
                raise ValueError("max_workers는 0보다 커야 합니다.")
            if sys.platform == 'win32' and max_workers > _MAX_WINDOWS_WORKERS:
                raise ValueError(
                    f"max_workers는 {_MAX_WINDOWS_WORKERS} 이하여야 합니다."
                )
            self._max_workers = max_workers

        # 멀티프로세싱 컨텍스트 설정
        if mp_context is None:
            if max_tasks_per_child is not None:
                mp_context = mp.get_context("spawn")  # 기본적으로 spawn 방식 사용
            else:
                mp_context = mp.get_context()  # 플랫폼 기본 컨텍스트
        self._mp_context = mp_context

        # 동적 워커 생성이 안전한지 여부 확인 (fork가 아닌 경우)
        self._safe_to_dynamically_spawn_children = (
            self._mp_context.get_start_method(allow_none=False) != "fork"
        )

        # 초기화 함수 유효성 검사
        if initializer is not None and not callable(initializer):
            raise TypeError("initializer는 callable 객체여야 합니다.")
        self._initializer = initializer
        self._initargs = initargs

        # max_tasks_per_child 설정 및 유효성 검사
        if max_tasks_per_child is not None:
            if not isinstance(max_tasks_per_child, int):
                raise TypeError("max_tasks_per_child는 정수여야 합니다.")
            if max_tasks_per_child <= 0:
                raise ValueError("max_tasks_per_child는 1 이상이어야 합니다.")
            if self._mp_context.get_start_method(allow_none=False) == "fork":
                raise ValueError("max_tasks_per_child는 'fork' 방식과 호환되지 않습니다.")
        self._max_tasks_per_child = max_tasks_per_child

        # 내부 속성 초기화
        self._executor_manager_thread = None  # 워커 프로세스를 관리하는 스레드
        self._processes = {}  # PID와 프로세스를 매핑하는 딕셔너리
        self._shutdown_thread = False  # Executor 종료 여부 플래그
        self._shutdown_lock = threading.Lock()  # 종료 상태 보호 락
        self._idle_worker_semaphore = threading.Semaphore(0)  # 유휴 워커 제어
        self._broken = False  # Executor 손상 여부
        self._queue_count = 0  # 작업 큐 카운터
        self._pending_work_items = {}  # 대기 중인 작업 항목 딕셔너리
        self._cancel_pending_futures = False  # Future 객체 취소 여부

        # Wakeup 채널 초기화
        self._executor_manager_thread_wakeup = _ThreadWakeup()

        # 통신 채널 초기화
        queue_size = self._max_workers + EXTRA_QUEUED_CALLS  # 작업 큐 크기 설정
        self._call_queue = _SafeQueue(
            max_size=queue_size,
            ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
            shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup
        )
        self._call_queue._ignore_epipe = True  # Broken pipe 오류 무시
        self._result_queue = mp_context.SimpleQueue()  # 작업 결과 큐 생성
        self._work_ids = queue.Queue()  # 작업 ID를 저장하는 큐

    def _start_executor_manager_thread(self):
        """
        Executor 관리 스레드를 생성하고 시작.

        - 관리 스레드가 존재하지 않을 경우 새로 생성합니다.
        - 'fork' 방식에서는 워커 프로세스를 미리 시작하여 데드락 방지.
        - 관리 스레드를 시작한 후, wakeup 채널을 등록하여 관리.
        """
        if self._executor_manager_thread is None:
            if not self._safe_to_dynamically_spawn_children:
                self._launch_processes()  # 모든 워커 프로세스 시작
            self._executor_manager_thread = _ExecutorManagerThread(self)  # 관리 스레드 생성
            self._executor_manager_thread.start()  # 관리 스레드 시작
            # 관리 스레드와 wakeup 채널을 연결
            _threads_wakeups[self._executor_manager_thread] = \
                self._executor_manager_thread_wakeup


    def _adjust_process_count(self):
        """
        현재 워커 수를 확인하고 필요한 경우 새 프로세스를 생성.

        - 유휴 워커가 있는 경우 추가 프로세스 생성이 불필요.
        - 현재 워커 수가 최대 워커 수보다 적을 경우 새로운 프로세스를 생성.
        """
        if self._idle_worker_semaphore.acquire(blocking=False):
            return  # 유휴 워커가 있는 경우 종료

        process_count = len(self._processes)  # 현재 실행 중인 프로세스 수 확인
        if process_count < self._max_workers:  # 최대 워커 수를 초과하지 않는 경우
            self._spawn_process()  # 새 프로세스 생성


    def _launch_processes(self):
        """
        관리 스레드 시작 전에 모든 워커 프로세스를 시작.

        - 관리 스레드 실행 중 fork() 호출 시 데드락 방지를 위해 실행 전에 워커 생성.
        - 현재 프로세스 수를 확인하여 최대 워커 수까지 프로세스를 추가 생성.
        """
        assert not self._executor_manager_thread, (
            '관리 스레드가 시작된 후에는 fork()를 사용할 수 없습니다. '
            '이로 인해 자식 프로세스에서 데드락이 발생할 수 있습니다.'
        )
        for _ in range(len(self._processes), self._max_workers):
            self._spawn_process()  # 프로세스 생성


    def _spawn_process(self):
        """
        새로운 워커 프로세스를 생성하고 시작.

        - `_process_worker` 함수를 실행할 프로세스를 생성.
        - 작업 요청 큐와 결과 큐를 프로세스와 연결.
        - 생성된 프로세스를 실행하고, 프로세스 PID를 프로세스 목록에 저장.
        """
        p = self._mp_context.Process(
            target=_process_worker,  # 작업 처리 함수
            args=(self._call_queue,  # 작업 요청 큐
                  self._result_queue,  # 작업 결과 큐
                  self._initializer,  # 초기화 함수
                  self._initargs,  # 초기화 함수 인수
                  self._max_tasks_per_child))  # 프로세스당 최대 작업 수
        p.start()  # 프로세스 시작
        self._processes[p.pid] = p  # PID와 프로세스를 프로세스 목록에 추가


    def submit(self, fn, /, *args, **kwargs):
        """
        새 작업을 제출하고 Future 객체를 반환.

        - 작업을 비동기로 처리하기 위한 Future 객체를 생성하고 작업 항목으로 저장.
        - 관리 스레드를 시작하고 필요한 경우 새로운 프로세스를 생성.
        - 작업이 종료된 상태에서는 제출할 수 없으며, Future 객체를 통해 결과를 반환.

        Args:
            fn: 실행할 함수.
            *args, **kwargs: 함수에 전달할 위치 및 키워드 인수.

        Returns:
            작업 결과를 비동기로 관리하는 Future 객체.
        """
        with self._shutdown_lock:  # 종료 보호 락 사용
            if self._broken:
                raise BrokenProcessPool(self._broken)  # Executor 손상 상태 예외 발생
            if self._shutdown_thread:
                raise RuntimeError('종료된 후에는 새로운 작업을 제출할 수 없습니다.')
            if _global_shutdown:
                raise RuntimeError('인터프리터 종료 후 작업을 제출할 수 없습니다.')

            f = _base.Future()  # Future 객체 생성
            w = _WorkItem(f, fn, args, kwargs)  # 작업 항목 생성
            self._pending_work_items[self._queue_count] = w  # 작업 항목 저장
            self._work_ids.put(self._queue_count)  # 작업 ID 큐에 추가
            self._queue_count += 1  # 작업 ID 카운터 증가
            self._executor_manager_thread_wakeup.wakeup()  # 관리 스레드에 신호 보내기

            if self._safe_to_dynamically_spawn_children:
                self._adjust_process_count()  # 필요 시 프로세스 생성
            self._start_executor_manager_thread()  # 관리 스레드 시작
            return f  # Future 객체 반환


    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """
        제공된 함수를 여러 입력 값에 병렬로 적용하여 결과를 반환.

        - 입력 값을 `chunksize` 크기로 나눠 작업을 병렬로 처리.
        - 결과는 순서를 유지하며 `map(func, *iterables)`와 동일한 인터페이스 제공.

        Args:
            fn: 호출 가능한 함수.
            *iterables: 병렬로 처리할 입력 값.
            timeout: 작업 제한 시간(초). 기본값은 None.
            chunksize: 처리 단위 크기. 기본값은 1.

        Returns:
            작업 결과를 반환하는 이터레이터.
        """
        if chunksize < 1:  # chunksize는 1 이상이어야 함
            raise ValueError("chunksize must be >= 1.")
        results = super().map(
            partial(_process_chunk, fn),  # 청크 단위로 함수 적용
            _get_chunks(*iterables, chunksize=chunksize),  # 입력 값을 청크로 분리
            timeout=timeout  # 제한 시간
        )
        return _chain_from_iterable_of_lists(results)  # 결과를 단일 이터레이터로 병합


    def shutdown(self, wait=True, *, cancel_futures=False):
        """
        Executor를 종료하고 리소스를 정리.

        - 모든 워커를 종료하고, 실행 중이거나 대기 중인 작업을 정리.
        - 종료 상태 플래그를 설정하고 관리 스레드 종료 대기.

        Args:
            wait: 모든 작업이 완료될 때까지 기다릴지 여부 (기본값: True).
            cancel_futures: 실행 대기 중인 작업을 취소할지 여부 (기본값: False).
        """
        with self._shutdown_lock:  # 종료 보호 락 사용
            self._cancel_pending_futures = cancel_futures  # 작업 취소 플래그 설정
            self._shutdown_thread = True  # 종료 상태 플래그 설정
            if self._executor_manager_thread_wakeup is not None:
                self._executor_manager_thread_wakeup.wakeup()  # 관리 스레드에 종료 신호

        if self._executor_manager_thread is not None and wait:
            self._executor_manager_thread.join()  # 관리 스레드 종료 대기

        # 리소스 정리
        self._executor_manager_thread = None
        self._call_queue = None  # 작업 요청 큐 제거
        if self._result_queue is not None and wait:
            self._result_queue.close()  # 결과 큐 닫기
        self._result_queue = None
        self._processes = None  # 프로세스 목록 제거
        self._executor_manager_thread_wakeup = None  # wakeup 채널 제거

    shutdown.__doc__ = _base.Executor.shutdown.__doc__  # 부모 클래스 shutdown 문서 가져오기