그때그때 CS 정리

ThreadPoolExecutor 내부 구조 분석(큐를 활용한 멀티스레딩)

필만이 2024. 11. 5. 21:15

배경

  • 멀티 스레딩에 이해가 부족해서, ThreadPoolExecutor의 내부 구조를 알아보고자함

분석

  • 요약 : ThreadPoolExecutor`는 Python의 표준 라이브러리에서 제공하는 멀티스레딩 도구로, 여러 스레드가 동시에 작업을 실행할 수 있도록 관리하는 역할을 함. 각 스레드가 작업 큐에서 작업을 가져와 수행하고, 스레드 풀이 종료될 때까지 작업을 처리하는게 기본적인 흐름
  1. 초기화 단계
    • ThreadPoolExecutor가 생성될 때, 최대 스레드 수, 작업 큐, 세마포어 등이 설정됩니다.
    • max_workers 인자로 최대 스레드 수를 설정하며, 지정하지 않으면 시스템 CPU 코어 수 + 4 또는 최대 32로 기본 설정됩니다.
    • 초기화 함수(initializer)와 초기화 인자(initargs)가 제공되면, 각 스레드가 시작될 때 이 함수와 인자를 사용해 초기화 작업을 수행할 수 있습니다.
    • 세마포어(self._idle_semaphore)는 유휴 상태의 스레드를 추적하는 데 사용되며, 이를 통해 현재 대기 상태인 스레드 수를 관리합니다.
  2. 작업 제출 단계
    • submit() 메서드를 통해 새로운 작업을 제출할 수 있습니다.
    • 이 메서드는 새로운 작업 항목을 생성하고 이를 작업 큐에 추가합니다. 작업 항목에는 실행할 함수와 그 함수에 전달할 인자 및 결과를 저장할 Future 객체가 포함됩니다.
    • 스레드 풀이 이미 종료되었거나 초기화 실패로 인해 손상된 상태라면, submit() 메서드는 예외를 발생시켜 추가 작업 제출을 막습니다.
  3. 스레드 수 조정 단계
    • 작업을 큐에 추가할 때마다 _adjust_thread_count()가 호출되어 스레드 수를 동적으로 관리합니다.
    • _adjust_thread_count()는 유휴 스레드가 있는지 세마포어를 사용해 확인합니다. 유휴 스레드가 있다면 추가 스레드 생성이 불필요하므로 새 스레드를 생성하지 않습니다.
    • 유휴 스레드가 없고 현재 실행 중인 스레드 수가 최대 스레드 수보다 적다면, 새로운 스레드를 생성하여 작업을 처리할 준비를 합니다.
    • 각 스레드는 고유한 이름을 가지며, _worker 함수가 실행될 수 있도록 설정됩니다. _worker는 큐에서 작업을 가져와 이를 수행하고, 작업이 완료되면 대기 상태로 돌아갑니다.
  4. 작업자 스레드의 작업 처리 단계
    • 스레드가 생성되면 _worker() 함수가 실행됩니다. _worker()는 큐에서 작업 항목을 가져와 작업을 실행하고, 종료 신호를 받을 때까지 계속 대기합니다.
    • 작업 항목에는 Future 객체가 포함되어 있어, 작업의 결과 또는 예외를 Future 객체에 저장합니다.
    • initializer 함수와 initargs가 제공된 경우, 스레드가 시작될 때 초기화 작업을 수행합니다. 만약 초기화 중 오류가 발생하면, ThreadPoolExecutor는 초기화 실패 상태가 되며, 이후 작업 제출이 차단됩니다.
    • 스레드는 작업이 완료될 때마다 유휴 세마포어를 해제하여 유휴 상태임을 표시합니다. 이를 통해 추가 작업 제출 시 유휴 스레드를 재사용할 수 있게 됩니다.
  5. 초기화 실패 시 예외 처리 단계
    • 초기화 함수 실행 중 예외가 발생할 경우, _initializer_failed() 메서드가 호출됩니다.
    • 이 메서드는 스레드 풀의 상태를 "손상된 상태(broken)"로 설정하여 이후 작업이 더 이상 제출되지 않도록 합니다.
    • 작업 큐에 남아 있는 모든 작업을 취소하고, 작업 항목의 Future 객체에 예외를 설정하여 초기화 실패 사실을 사용자에게 전달합니다.
  6. 스레드 풀 종료 단계
    • shutdown() 메서드는 스레드 풀 종료를 위한 절차를 제공합니다.
    • shutdown()이 호출되면, _shutdown 플래그가 True로 설정되어 이후 작업이 더 이상 제출되지 않도록 제한합니다.
    • cancel_futures=True로 설정하면 작업 큐에 남아 있는 작업을 모두 취소하고, 각 Future 객체의 상태를 "취소됨"으로 표시하여 대기 중인 작업이 실행되지 않도록 합니다.
    • 대기 중인 스레드가 종료 신호를 인식할 수 있도록 큐에 None을 삽입하여, 큐에 대기 중인 스레드가 종료 절차를 인식하도록 합니다.
    • wait=True로 설정하면, 모든 작업이 완료될 때까지 대기하여 스레드가 완전히 종료된 후에만 shutdown을 완료합니다. 각 스레드는 join()을 통해 메인 스레드에서 안전하게 종료됩니다.
  7. 자원 해제 및 종료 완료
    • 모든 스레드가 종료되고, 큐에 남은 작업이 없으면 스레드 풀이 완전히 종료됩니다.
    • shutdown()이 완료되면 ThreadPoolExecutor 인스턴스는 더 이상 작업을 수행하지 않으며, 이후 작업 제출은 차단됩니다.
    • 종료 절차가 완료됨으로써, 스레드 풀에 사용된 모든 자원이 안전하게 해제됩니다.

코드

# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

from concurrent.futures import _base
import itertools
import queue
import threading
import types
import weakref
import os


# 스레드와 해당 작업 큐를 관리하기 위한 약한 참조 딕셔너리
# - 약한 참조로 각 스레드와 작업 큐를 추적 및 관리
# - WeakKeyDictionary를 사용하는 이유:
#   1. **메모리 관리**: 더 이상 사용되지 않는 스레드는 딕셔너리에서 자동 제거되어 메모리를 절약
#   2. **동적 자원 관리**: 참조가 사라진 스레드의 큐도 자동으로 제거하여 불필요한 리소스 점유 방지
_threads_queues = weakref.WeakKeyDictionary()

# 인터프리터가 종료 중인지 나타내는 플래그
# 목적: 인터프리터 종료 시 안전하게 스레드 풀을 종료하기 위해 상태를 확인
_shutdown = False

# 인터프리터 종료 시 새로운 작업자 스레드가 생성되지 않도록 보장하는 잠금
# - 목적: 여러 스레드가 동시에 작업을 제출하거나 스레드 풀 상태를 검사하고 변경할 때, 
#         `_shutdown` 플래그에 안전하게 접근하도록 함
# - 필요성: 이 잠금이 없으면 상태 불일치나 충돌이 발생할 수 있으며, 
#           스레드 풀의 안정적인 운영과 종료 시 안전한 상태 관리를 위해 필수적
_global_shutdown_lock = threading.Lock()


# 인터프리터 종료 시 호출되는 함수
# - 목적: 모든 스레드가 안전하게 종료되도록 작업 큐에 종료 신호(None)를 삽입하고 각 스레드를 종료
def _python_exit():
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True  # 인터프리터 종료 상태 설정
    items = list(_threads_queues.items())  # 스레드와 큐의 항목을 리스트로 가져옴
    for t, q in items:
        # Queue는 FIFO(선입선출) 방식으로 데이터를 처리하며 None을 만나면 종료신호로 인식
        # 따라서 모두 None으로 만듦
        q.put(None)
    for t, q in items:
        # join()은 각 스레드가 종료될 때까지 메인 스레드를 대기 상태로 유지합니다. 이로써 모든 작업이 안전하게 종료
        # 스레드가 수행하는 작업이 종료되지 않은 상태에서 메인 스레드가 종료되면 메모리 누수, 데이터 손상 또는 리소스 충돌이 발생할 수 있음
        t.join()

# 인터프리터 종료 시 `_python_exit()` 호출을 등록
# _register_atexit는 Python threading 모듈 내부에서 사용되는 함수로, 프로그램 종료 시점에서 필요한 정리 작업(cleanup)을 예약합니다. 
# 이 함수는 Python 종료 단계에서 특정 작업(예: 스레드 정리)을 수행하기 위해 atexit 모듈을 활용
threading._register_atexit(_python_exit)

# 이제부터, 부모, 자식 프로세서를 생성할꺼다. 
# 이유는 부모(ThreadPoolExecutor)는 작업 큐에 작업을 추가하고, 자식 스레드가 이를 병렬로 처리하기 위함

# fork()는 현재 프로세스를 복제하여 새로운 자식 프로세스를 생성하는 시스템 호출입니다.
# - 부모와 자식 프로세스는 독립적인 메모리 공간을 가지며, 자식은 부모의 메모리 복사본으로 시작합니다.
# - 부모 프로세스는 자원을 관리하고, 자식 프로세스는 독립적으로 병렬 작업을 수행하거나 특정 작업을 처리합니다.

# 목적: os 모듈이 register_at_fork 메서드를 지원하는지 확인합니다. 
# - 이유: 특정 플랫폼(예: Unix)에서만 지원되며, 프로그램이 fork 이벤트를 처리할 수 있는지 보장하기 위함입니다.
if hasattr(os, 'register_at_fork'):
    # 목적: fork() 호출 전후에 실행할 콜백을 등록하여 잠금 상태와 자원을 적절히 관리합니다.
    # - 이유: fork 호출 시 자식 프로세스가 올바르게 동작하도록 잠금을 초기화하고 리소스를 보호합니다.
    os.register_at_fork(
        # fork 호출 전에 _global_shutdown_lock 잠금을 획득하여 데이터 일관성을 유지합니다.
        before=_global_shutdown_lock.acquire,
        # 자식 프로세스에서 fork 호출 후 잠금을 재초기화하여 부모와 자식 간의 잠금 충돌을 방지합니다.
        after_in_child=_global_shutdown_lock._at_fork_reinit,
        # 부모 프로세스에서 fork 호출 후 잠금을 해제하여 기존 상태를 유지하고 다음 작업을 진행합니다.
        after_in_parent=_global_shutdown_lock.release
    )

# 작업 항목을 나타내는 클래스
# - 목적: 스레드가 실행할 작업을 캡슐화하여 큐에 저장할 수 있도록 함.
#         각 작업은 실행할 함수(fn)와 그 함수에 전달할 인자(args, kwargs), 
#         그리고 작업 결과를 담을 Future 객체를 포함함.
class _WorkItem(object):
    def __init__(self, future, fn, args, kwargs):
        self.future = future  # 작업 결과를 담을 Future 객체. 작업 성공 시 결과를, 실패 시 예외를 저장
        self.fn = fn  # 실행할 함수. 스레드에서 수행할 작업을 정의
        self.args = args  # 함수에 전달할 위치 인자. 작업을 실행할 때 필요한 값
        self.kwargs = kwargs  # 함수에 전달할 키워드 인자. 추가적인 설정이나 옵션을 담을 수 있음

    # 작업 실행 메서드
    # - 목적: 큐에서 작업을 가져와 실행한 뒤, 결과를 Future 객체에 저장.
    #         작업이 성공하면 Future 객체에 결과를 설정하고, 예외 발생 시 예외 정보를 설정
    def run(self):
        # Future 객체가 이미 실행 중이거나 취소된 경우 작업을 실행하지 않고 종료
        # - set_running_or_notify_cancel(): Future가 취소되지 않았을 경우 True를 반환하며,
        #   실행 상태로 설정. 취소된 경우 False를 반환하여 작업을 수행하지 않도록 함
        if not self.future.set_running_or_notify_cancel():
            return  # Future가 이미 취소되었을 경우 실행하지 않음

        try:
            # 함수(fn)를 인자(args, kwargs)와 함께 실행하여 결과를 얻음
            # - *self.args: 튜플 형태의 위치 인자를 개별 인자로 분해해 전달
            # - **self.kwargs: 딕셔너리 형태의 키워드 인자를 분해해 전달
            # 예: args = (1, 2), kwargs = {'x': 10}일 경우, fn(*args, **kwargs)는 fn(1, 2, x=10)으로 호출
            result = self.fn(*self.args, **self.kwargs)  # 함수 실행 후 결과 저장
        except BaseException as exc:
            # 예외가 발생하면 Future 객체에 예외 정보를 설정
            self.future.set_exception(exc)  # Future에 예외 설정
            self = None  # 참조 순환 방지를 위해 self를 None으로 설정
        else:
            # 함수가 성공적으로 실행되면 Future 객체에 결과를 저장
            self.future.set_result(result)  # 성공 시 Future에 결과 설정

    # types.GenericAlias : 제네릭 클래스와 관련된 타입 매핑 작업을 자동으로 처리
    # - 목적: 클래스 인스턴스를 생성할 때 타입 힌트를 추가할 수 있도록 지원
    __class_getitem__ = classmethod(types.GenericAlias)


# 스레드 풀 내 작업자 스레드의 실행 함수
# - 목적: 큐에서 작업 항목을 가져와 실행하며, 종료 조건이 만족되면 스레드를 안전하게 종료
# - 흐름: 
#    1. 큐에서 작업을 가져와 실행
#    2. 작업이 끝나면 유휴 상태로 전환
#    3. 종료 조건이 만족될 때까지 반복 수행
def _worker(executor_reference, work_queue, initializer, initargs):
    # 초기화 함수(initializer)가 제공된 경우, 스레드 시작 시 이를 실행
    if initializer is not None:
        try:
            initializer(*initargs)  # initializer 함수를 initargs 인자와 함께 실행
        except BaseException:
            # 초기화 도중 예외 발생 시, 예외 내용을 로깅하고 스레드 종료
            # 로깅 수준이 **"치명적(Critical)"**인 메시지를 기록
            # exc_info가 True일 때, Python은 현재 발생한 예외의 정보를 자동으로 포함
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)  # 예외 정보 로깅
            executor = executor_reference()  # executor 참조 가져오기
            if executor is not None:
                executor._initializer_failed()  # 초기화 실패 상태로 설정
            return  # 초기화 실패로 스레드 종료

    try:
        # 무한 루프를 통해 작업 큐에서 항목을 계속 가져옴
        while True:
            # 작업 큐에서 항목을 가져옴. 큐가 비어 있으면 새로운 작업이 들어올 때까지 대기
            work_item = work_queue.get(block=True)
            if work_item is not None:  # None이 아닌 유효한 작업 항목일 경우
                # 작업 항목 실행 (run 메서드 호출하여 작업 수행)
                work_item.run()
                del work_item  # 작업 완료 후 참조 해제하여 메모리 해제

                # 작업 완료 후 executor 참조 가져오기
                executor = executor_reference()
                if executor is not None:
                    # **세마포어(Semaphore)**는 동시에 접근할 수 있는 최대 리소스 수를 제어하는 동기화 도구
                    # 유휴 스레드 세마포어를 해제하여, 스레드가 유휴 상태임을 표시
                    executor._idle_semaphore.release()
                del executor  # 참조 해제
                continue  # 다음 작업을 기다리기 위해 반복문 계속

            # 작업 항목이 None일 경우 종료 조건 확인
            executor = executor_reference()
            # 종료 조건:
            # - _shutdown 플래그가 True로 설정되었거나
            # - executor가 더 이상 존재하지 않거나 (가비지 컬렉터에 의해 수집됨)
            # - executor의 _shutdown 플래그가 True로 설정된 경우
            if _shutdown or executor is None or executor._shutdown:
                if executor is not None:
                    executor._shutdown = True  # executor 종료 상태로 설정

                # 다른 스레드들에게도 종료 신호를 알리기 위해 큐에 None 삽입
                work_queue.put(None)
                return  # 종료 조건 만족 시 스레드 종료
            del executor  # 참조 해제

    except BaseException:
        # 작업 실행 중 예외가 발생할 경우 예외 내용을 로깅
        _base.LOGGER.critical('Exception in worker', exc_info=True)  # 예외 발생 시 로깅


class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """


class ThreadPoolExecutor(_base.Executor):

    # itertools.count()는 무한 증가 카운터를 반환하며, .__next__는 그 값을 차례로 반환할 수 있도록 합니다. 
    # 이를 통해 ThreadPoolExecutor의 각 스레드가 고유한 이름을 가질 수 있도록 보장
    # 목적: 각 스레드의 이름이 중복되지 않도록 보장
    _counter = itertools.count().__next__

    # ThreadPoolExecutor 클래스 초기화 메서드
    # 목적: 스레드 풀의 최대 스레드 수, 작업 큐, 세마포어 및 기타 내부 상태를 설정
    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """새로운 ThreadPoolExecutor 인스턴스 초기화

        Args:
            max_workers: 실행할 수 있는 최대 스레드 수.
            thread_name_prefix: 스레드에 부여할 이름의 접두사.
            initializer: 작업자 스레드 초기화 시 호출할 함수.
            initargs: 초기화 함수에 전달할 인자 튜플.
        """
        if max_workers is None:
            # 최대 스레드 수를 CPU 코어 수 + 4로 설정하되 최대 32로 제한
            # 목적: 리소스 사용을 최적화하여 과도한 자원 소모 방지
            max_workers = min(32, (os.cpu_count() or 1) + 4)

        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")  # 유효한 스레드 수 검사

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")  # 초기화 함수가 호출 가능한지 검사

        # 클래스 속성 초기화
        # 목적: 스레드 풀의 상태와 설정 초기화
        self._max_workers = max_workers
        self._work_queue = queue.SimpleQueue()  # 작업 큐 생성
        self._idle_semaphore = threading.Semaphore(0)  # 유휴 스레드 카운트 관리
        self._threads = set()  # 실행 중인 스레드 집합
        self._broken = False  # 스레드 풀 상태 플래그
        self._shutdown = False  # 종료 상태 플래그
        self._shutdown_lock = threading.Lock()  # 종료 시 동시성 보호 잠금
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))  # 스레드 이름 접두사
        self._initializer = initializer  # 초기화 함수
        self._initargs = initargs  # 초기화 함수 인자

    # 작업 제출 메서드로 새로운 작업을 작업 큐에 추가하고 스레드 풀을 통해 실행함.
    # /는 fn 인자가 위치 전용 인자임을 의미합니다. 즉, fn은 키워드 인자 형태로 사용할 수 없고, 위치 인자로만 전달
    # 위치인자는 함수를 변수 순서대로 호출하는 인자, 키워드인자는 함수를 이름과 값의 쌍으로 호출하는 인자
    def submit(self, fn, /, *args, **kwargs):
        # 작업을 스레드 풀에 제출하여 실행하고, Future 객체를 반환하는 메서드
        # fn: 실행할 함수
        # *args: 함수에 전달할 위치 기반 인자
        # **kwargs: 함수에 전달할 키워드 인자

        # 두 잠금이 모두 확보될 때까지 다른 스레드에서 관련 작업을 진행하지 못함.
        # _shutdown_lock: 스레드 풀의 종료 상태와 관련된 잠금.
        # _global_shutdown_lock: 전역적으로 스레드 풀이나 리소스를 관리하기 위한 잠금
        with self._shutdown_lock, _global_shutdown_lock:
            # 스레드 풀이 "손상된 상태"라면, 예외를 발생시킴
            # BrokenThreadPool은 스레드 풀 초기화 중 오류가 발생한 경우를 나타냄
            if self._broken:
                raise BrokenThreadPool(self._broken)  # 스레드 풀 초기화 실패 시 예외 발생

            # 스레드 풀이 종료된 상태라면, 새 작업을 제출할 수 없으므로 예외 발생
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')  # 종료 후 작업 제출 방지

            # Python 인터프리터가 종료 중인 상태라면, 작업 제출이 불가능하므로 예외 발생
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')  # 인터프리터 종료 시 작업 제출 방지

            # Future 객체 생성: 작업의 실행 상태(완료 여부, 결과 등)를 추적
            f = _base.Future()  # 새 Future 객체 생성

            # 작업 항목 생성: 실행할 함수(fn), 위치 인자(args), 키워드 인자(kwargs)를 포함
            w = _WorkItem(f, fn, args, kwargs)  # 작업 항목 생성

            # 작업 항목을 스레드 풀의 작업 큐에 추가
            # 작업 큐에 추가된 작업은 스레드 풀의 스레드에 의해 비동기로 처리됨
            self._work_queue.put(w)  # 작업 큐에 작업 추가

            # 스레드 풀의 스레드 수를 조정
            # 작업의 양에 따라 필요한 스레드를 추가 생성하거나 조정
            self._adjust_thread_count()  # 스레드 수 조정

            # Future 객체를 반환
            # 호출자는 Future 객체를 통해 작업의 상태나 결과를 비동기로 확인 가능
            return f  # Future 객체 반환

    # 메서드의 문서화를 위해 상위 클래스의 `submit` 메서드 문서를 참조
    # submit 메서드는 현재 클래스에 새롭게 정의되었지만, 상위 클래스(_base.Executor)의 동일 메서드 submit의 문서 문자열을 그대로 재사용
    submit.__doc__ = _base.Executor.submit.__doc__


    # 스레드 수 조정 메서드
    # - 목적: 실행 가능한 스레드 수가 부족할 때 새로운 스레드를 생성하여 작업이 원활히 처리되도록 보장
    def _adjust_thread_count(self):
        # 유휴 스레드 확인
        # - 유휴 스레드 세마포어를 확인하여 사용 가능한 스레드가 있으면 반환
        #   (유휴 스레드가 있다면 추가 스레드 생성 불필요)
        if self._idle_semaphore.acquire(timeout=0):
            return

        # 스레드가 해제될 때 작업자 스레드에게 종료 신호를 전달
        # - 목적: 스레드가 종료될 때 None을 큐에 삽입하여 다른 스레드가 이를 인식하고 종료 절차를 진행
        def weakref_cb(_, q=self._work_queue):
            q.put(None)  # 종료 신호 전달

        num_threads = len(self._threads)  # 현재 활성 스레드 수 계산

        # 현재 스레드 수가 최대 스레드 수보다 작을 경우 새 스레드 생성
        if num_threads < self._max_workers:
            # 각 스레드에 고유한 이름을 할당하여 생성 (중복 방지)
            thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads)
            # 새로운 작업자 스레드를 생성하고 _worker 함수를 실행하도록 설정
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),  # 스레드 풀이 해제되면 weakref_cb 호출
                                       self._work_queue,               # 작업 큐
                                       self._initializer,               # 초기화 함수
                                       self._initargs))                 # 초기화 함수 인자들
            t.start()  # 새 스레드를 시작하여 작업 대기 상태로 만듦
            self._threads.add(t)  # 새 스레드를 집합에 추가하여 추적
            _threads_queues[t] = self._work_queue  # 새 스레드를 작업 큐와 연결하여 관리

    # 초기화 실패 처리 메서드
    # - 목적: 초기화 함수가 실패할 경우 스레드 풀의 상태를 "손상된 상태(broken)"로 설정하고,
    #         큐에 남아 있는 작업 항목들을 정리하여 오류 상태를 명확히 전달
    def _initializer_failed(self):
        with self._shutdown_lock:
            # 스레드 풀이 손상(broken) 상태임을 나타내는 플래그 설정
            # - 설명: 초기화 과정 중 오류가 발생하면, 이후 작업이 정상적으로 진행되지 않도록
            #         broken 플래그를 설정하여 스레드 풀의 손상 여부를 다른 메서드에서도 인식하게 함
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')  # 손상된 상태 플래그 설정

            # 작업 큐를 비우고, 모든 Future 객체에 예외를 설정하여 초기화 실패 상태 전달
            while True:
                try:
                    # 큐에서 작업 항목을 하나씩 제거 (get_nowait()로 대기 없이 즉시 가져옴)
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    # 큐가 비어 있으면 작업 제거 완료, 루프 종료
                    break
                if work_item is not None:
                    # 작업 항목의 Future 객체에 예외를 설정하여 초기화 실패를 알림
                    # - 설명: 이 Future 객체는 이후에 이 작업의 결과를 기다리는 코드가 초기화 실패 예외를 받게 됨
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    # 스레드 풀 종료 메서드
    # - 목적: 더 이상 작업을 받지 않으며, 대기 중인 모든 작업을 안전하게 종료하고,
    #         모든 스레드가 종료될 때까지 대기하여 자원을 정리
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            # 스레드 풀의 종료 상태를 나타내는 플래그 설정
            # - 설명: _shutdown 플래그가 True로 설정되면 더 이상 새로운 작업을 받지 않도록 제한
            self._shutdown = True

            if cancel_futures:
                # 큐에 남은 작업들을 모두 제거하고, 각 Future 객체를 "취소됨" 상태로 설정
                # - 목적: 실행 대기 중인 작업이 있다면, 이를 모두 취소하여 사용자가 알 수 있도록 함
                while True:
                    try:
                        # 대기 중인 작업 항목을 하나씩 제거 (get_nowait() 사용)
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        # 큐가 비었으면 루프 종료
                        break
                    if work_item is not None:
                        # Future 객체를 취소 상태로 설정
                        # - 설명: 해당 작업이 취소되었음을 나타내며, 이를 기다리는 코드에서 확인 가능
                        work_item.future.cancel()

            # 작업 큐에 None을 삽입하여 블로킹된 스레드가 종료를 인식하도록 함
            # - 목적: 대기 중인 스레드가 큐에서 None을 받고 종료 루프를 빠져나오도록 유도
            # - 설명: 이를 통해 대기 중인 스레드가 영구적으로 멈추는 상황을 방지
            self._work_queue.put(None)

        if wait:
            # 모든 스레드가 종료될 때까지 메인 스레드가 대기하여 안전한 종료 보장
            # - 설명: 각 스레드의 join()을 호출하여, 모든 작업 스레드가 완전히 종료될 때까지 기다림
            for t in self._threads:
                t.join()  # 스레드가 종료될 때까지 대기
    shutdown.__doc__ = _base.Executor.shutdown.__doc__