배경
- 멀티 스레딩에 이해가 부족해서, ThreadPoolExecutor의 내부 구조를 알아보고자함
분석
- 요약 : ThreadPoolExecutor`는 Python의 표준 라이브러리에서 제공하는 멀티스레딩 도구로, 여러 스레드가 동시에 작업을 실행할 수 있도록 관리하는 역할을 함. 각 스레드가 작업 큐에서 작업을 가져와 수행하고, 스레드 풀이 종료될 때까지 작업을 처리하는게 기본적인 흐름
- 초기화 단계
- ThreadPoolExecutor가 생성될 때, 최대 스레드 수, 작업 큐, 세마포어 등이 설정됩니다.
- max_workers 인자로 최대 스레드 수를 설정하며, 지정하지 않으면 시스템 CPU 코어 수 + 4 또는 최대 32로 기본 설정됩니다.
- 초기화 함수(initializer)와 초기화 인자(initargs)가 제공되면, 각 스레드가 시작될 때 이 함수와 인자를 사용해 초기화 작업을 수행할 수 있습니다.
- 세마포어(self._idle_semaphore)는 유휴 상태의 스레드를 추적하는 데 사용되며, 이를 통해 현재 대기 상태인 스레드 수를 관리합니다.
- 작업 제출 단계
- submit() 메서드를 통해 새로운 작업을 제출할 수 있습니다.
- 이 메서드는 새로운 작업 항목을 생성하고 이를 작업 큐에 추가합니다. 작업 항목에는 실행할 함수와 그 함수에 전달할 인자 및 결과를 저장할 Future 객체가 포함됩니다.
- 스레드 풀이 이미 종료되었거나 초기화 실패로 인해 손상된 상태라면, submit() 메서드는 예외를 발생시켜 추가 작업 제출을 막습니다.
- 스레드 수 조정 단계
- 작업을 큐에 추가할 때마다 _adjust_thread_count()가 호출되어 스레드 수를 동적으로 관리합니다.
- _adjust_thread_count()는 유휴 스레드가 있는지 세마포어를 사용해 확인합니다. 유휴 스레드가 있다면 추가 스레드 생성이 불필요하므로 새 스레드를 생성하지 않습니다.
- 유휴 스레드가 없고 현재 실행 중인 스레드 수가 최대 스레드 수보다 적다면, 새로운 스레드를 생성하여 작업을 처리할 준비를 합니다.
- 각 스레드는 고유한 이름을 가지며, _worker 함수가 실행될 수 있도록 설정됩니다. _worker는 큐에서 작업을 가져와 이를 수행하고, 작업이 완료되면 대기 상태로 돌아갑니다.
- 작업자 스레드의 작업 처리 단계
- 스레드가 생성되면 _worker() 함수가 실행됩니다. _worker()는 큐에서 작업 항목을 가져와 작업을 실행하고, 종료 신호를 받을 때까지 계속 대기합니다.
- 작업 항목에는 Future 객체가 포함되어 있어, 작업의 결과 또는 예외를 Future 객체에 저장합니다.
- initializer 함수와 initargs가 제공된 경우, 스레드가 시작될 때 초기화 작업을 수행합니다. 만약 초기화 중 오류가 발생하면, ThreadPoolExecutor는 초기화 실패 상태가 되며, 이후 작업 제출이 차단됩니다.
- 스레드는 작업이 완료될 때마다 유휴 세마포어를 해제하여 유휴 상태임을 표시합니다. 이를 통해 추가 작업 제출 시 유휴 스레드를 재사용할 수 있게 됩니다.
- 초기화 실패 시 예외 처리 단계
- 초기화 함수 실행 중 예외가 발생할 경우, _initializer_failed() 메서드가 호출됩니다.
- 이 메서드는 스레드 풀의 상태를 "손상된 상태(broken)"로 설정하여 이후 작업이 더 이상 제출되지 않도록 합니다.
- 작업 큐에 남아 있는 모든 작업을 취소하고, 작업 항목의 Future 객체에 예외를 설정하여 초기화 실패 사실을 사용자에게 전달합니다.
- 스레드 풀 종료 단계
- shutdown() 메서드는 스레드 풀 종료를 위한 절차를 제공합니다.
- shutdown()이 호출되면, _shutdown 플래그가 True로 설정되어 이후 작업이 더 이상 제출되지 않도록 제한합니다.
- cancel_futures=True로 설정하면 작업 큐에 남아 있는 작업을 모두 취소하고, 각 Future 객체의 상태를 "취소됨"으로 표시하여 대기 중인 작업이 실행되지 않도록 합니다.
- 대기 중인 스레드가 종료 신호를 인식할 수 있도록 큐에 None을 삽입하여, 큐에 대기 중인 스레드가 종료 절차를 인식하도록 합니다.
- wait=True로 설정하면, 모든 작업이 완료될 때까지 대기하여 스레드가 완전히 종료된 후에만 shutdown을 완료합니다. 각 스레드는 join()을 통해 메인 스레드에서 안전하게 종료됩니다.
- 자원 해제 및 종료 완료
- 모든 스레드가 종료되고, 큐에 남은 작업이 없으면 스레드 풀이 완전히 종료됩니다.
- shutdown()이 완료되면 ThreadPoolExecutor 인스턴스는 더 이상 작업을 수행하지 않으며, 이후 작업 제출은 차단됩니다.
- 종료 절차가 완료됨으로써, 스레드 풀에 사용된 모든 자원이 안전하게 해제됩니다.
코드
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ThreadPoolExecutor."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from concurrent.futures import _base
import itertools
import queue
import threading
import types
import weakref
import os
# 스레드와 해당 작업 큐를 관리하기 위한 약한 참조 딕셔너리
# - 약한 참조로 각 스레드와 작업 큐를 추적 및 관리
# - WeakKeyDictionary를 사용하는 이유:
# 1. **메모리 관리**: 더 이상 사용되지 않는 스레드는 딕셔너리에서 자동 제거되어 메모리를 절약
# 2. **동적 자원 관리**: 참조가 사라진 스레드의 큐도 자동으로 제거하여 불필요한 리소스 점유 방지
_threads_queues = weakref.WeakKeyDictionary()
# 인터프리터가 종료 중인지 나타내는 플래그
# 목적: 인터프리터 종료 시 안전하게 스레드 풀을 종료하기 위해 상태를 확인
_shutdown = False
# 인터프리터 종료 시 새로운 작업자 스레드가 생성되지 않도록 보장하는 잠금
# - 목적: 여러 스레드가 동시에 작업을 제출하거나 스레드 풀 상태를 검사하고 변경할 때,
# `_shutdown` 플래그에 안전하게 접근하도록 함
# - 필요성: 이 잠금이 없으면 상태 불일치나 충돌이 발생할 수 있으며,
# 스레드 풀의 안정적인 운영과 종료 시 안전한 상태 관리를 위해 필수적
_global_shutdown_lock = threading.Lock()
# 인터프리터 종료 시 호출되는 함수
# - 목적: 모든 스레드가 안전하게 종료되도록 작업 큐에 종료 신호(None)를 삽입하고 각 스레드를 종료
def _python_exit():
global _shutdown
with _global_shutdown_lock:
_shutdown = True # 인터프리터 종료 상태 설정
items = list(_threads_queues.items()) # 스레드와 큐의 항목을 리스트로 가져옴
for t, q in items:
# Queue는 FIFO(선입선출) 방식으로 데이터를 처리하며 None을 만나면 종료신호로 인식
# 따라서 모두 None으로 만듦
q.put(None)
for t, q in items:
# join()은 각 스레드가 종료될 때까지 메인 스레드를 대기 상태로 유지합니다. 이로써 모든 작업이 안전하게 종료
# 스레드가 수행하는 작업이 종료되지 않은 상태에서 메인 스레드가 종료되면 메모리 누수, 데이터 손상 또는 리소스 충돌이 발생할 수 있음
t.join()
# 인터프리터 종료 시 `_python_exit()` 호출을 등록
# _register_atexit는 Python threading 모듈 내부에서 사용되는 함수로, 프로그램 종료 시점에서 필요한 정리 작업(cleanup)을 예약합니다.
# 이 함수는 Python 종료 단계에서 특정 작업(예: 스레드 정리)을 수행하기 위해 atexit 모듈을 활용
threading._register_atexit(_python_exit)
# 이제부터, 부모, 자식 프로세서를 생성할꺼다.
# 이유는 부모(ThreadPoolExecutor)는 작업 큐에 작업을 추가하고, 자식 스레드가 이를 병렬로 처리하기 위함
# fork()는 현재 프로세스를 복제하여 새로운 자식 프로세스를 생성하는 시스템 호출입니다.
# - 부모와 자식 프로세스는 독립적인 메모리 공간을 가지며, 자식은 부모의 메모리 복사본으로 시작합니다.
# - 부모 프로세스는 자원을 관리하고, 자식 프로세스는 독립적으로 병렬 작업을 수행하거나 특정 작업을 처리합니다.
# 목적: os 모듈이 register_at_fork 메서드를 지원하는지 확인합니다.
# - 이유: 특정 플랫폼(예: Unix)에서만 지원되며, 프로그램이 fork 이벤트를 처리할 수 있는지 보장하기 위함입니다.
if hasattr(os, 'register_at_fork'):
# 목적: fork() 호출 전후에 실행할 콜백을 등록하여 잠금 상태와 자원을 적절히 관리합니다.
# - 이유: fork 호출 시 자식 프로세스가 올바르게 동작하도록 잠금을 초기화하고 리소스를 보호합니다.
os.register_at_fork(
# fork 호출 전에 _global_shutdown_lock 잠금을 획득하여 데이터 일관성을 유지합니다.
before=_global_shutdown_lock.acquire,
# 자식 프로세스에서 fork 호출 후 잠금을 재초기화하여 부모와 자식 간의 잠금 충돌을 방지합니다.
after_in_child=_global_shutdown_lock._at_fork_reinit,
# 부모 프로세스에서 fork 호출 후 잠금을 해제하여 기존 상태를 유지하고 다음 작업을 진행합니다.
after_in_parent=_global_shutdown_lock.release
)
# 작업 항목을 나타내는 클래스
# - 목적: 스레드가 실행할 작업을 캡슐화하여 큐에 저장할 수 있도록 함.
# 각 작업은 실행할 함수(fn)와 그 함수에 전달할 인자(args, kwargs),
# 그리고 작업 결과를 담을 Future 객체를 포함함.
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future # 작업 결과를 담을 Future 객체. 작업 성공 시 결과를, 실패 시 예외를 저장
self.fn = fn # 실행할 함수. 스레드에서 수행할 작업을 정의
self.args = args # 함수에 전달할 위치 인자. 작업을 실행할 때 필요한 값
self.kwargs = kwargs # 함수에 전달할 키워드 인자. 추가적인 설정이나 옵션을 담을 수 있음
# 작업 실행 메서드
# - 목적: 큐에서 작업을 가져와 실행한 뒤, 결과를 Future 객체에 저장.
# 작업이 성공하면 Future 객체에 결과를 설정하고, 예외 발생 시 예외 정보를 설정
def run(self):
# Future 객체가 이미 실행 중이거나 취소된 경우 작업을 실행하지 않고 종료
# - set_running_or_notify_cancel(): Future가 취소되지 않았을 경우 True를 반환하며,
# 실행 상태로 설정. 취소된 경우 False를 반환하여 작업을 수행하지 않도록 함
if not self.future.set_running_or_notify_cancel():
return # Future가 이미 취소되었을 경우 실행하지 않음
try:
# 함수(fn)를 인자(args, kwargs)와 함께 실행하여 결과를 얻음
# - *self.args: 튜플 형태의 위치 인자를 개별 인자로 분해해 전달
# - **self.kwargs: 딕셔너리 형태의 키워드 인자를 분해해 전달
# 예: args = (1, 2), kwargs = {'x': 10}일 경우, fn(*args, **kwargs)는 fn(1, 2, x=10)으로 호출
result = self.fn(*self.args, **self.kwargs) # 함수 실행 후 결과 저장
except BaseException as exc:
# 예외가 발생하면 Future 객체에 예외 정보를 설정
self.future.set_exception(exc) # Future에 예외 설정
self = None # 참조 순환 방지를 위해 self를 None으로 설정
else:
# 함수가 성공적으로 실행되면 Future 객체에 결과를 저장
self.future.set_result(result) # 성공 시 Future에 결과 설정
# types.GenericAlias : 제네릭 클래스와 관련된 타입 매핑 작업을 자동으로 처리
# - 목적: 클래스 인스턴스를 생성할 때 타입 힌트를 추가할 수 있도록 지원
__class_getitem__ = classmethod(types.GenericAlias)
# 스레드 풀 내 작업자 스레드의 실행 함수
# - 목적: 큐에서 작업 항목을 가져와 실행하며, 종료 조건이 만족되면 스레드를 안전하게 종료
# - 흐름:
# 1. 큐에서 작업을 가져와 실행
# 2. 작업이 끝나면 유휴 상태로 전환
# 3. 종료 조건이 만족될 때까지 반복 수행
def _worker(executor_reference, work_queue, initializer, initargs):
# 초기화 함수(initializer)가 제공된 경우, 스레드 시작 시 이를 실행
if initializer is not None:
try:
initializer(*initargs) # initializer 함수를 initargs 인자와 함께 실행
except BaseException:
# 초기화 도중 예외 발생 시, 예외 내용을 로깅하고 스레드 종료
# 로깅 수준이 **"치명적(Critical)"**인 메시지를 기록
# exc_info가 True일 때, Python은 현재 발생한 예외의 정보를 자동으로 포함
_base.LOGGER.critical('Exception in initializer:', exc_info=True) # 예외 정보 로깅
executor = executor_reference() # executor 참조 가져오기
if executor is not None:
executor._initializer_failed() # 초기화 실패 상태로 설정
return # 초기화 실패로 스레드 종료
try:
# 무한 루프를 통해 작업 큐에서 항목을 계속 가져옴
while True:
# 작업 큐에서 항목을 가져옴. 큐가 비어 있으면 새로운 작업이 들어올 때까지 대기
work_item = work_queue.get(block=True)
if work_item is not None: # None이 아닌 유효한 작업 항목일 경우
# 작업 항목 실행 (run 메서드 호출하여 작업 수행)
work_item.run()
del work_item # 작업 완료 후 참조 해제하여 메모리 해제
# 작업 완료 후 executor 참조 가져오기
executor = executor_reference()
if executor is not None:
# **세마포어(Semaphore)**는 동시에 접근할 수 있는 최대 리소스 수를 제어하는 동기화 도구
# 유휴 스레드 세마포어를 해제하여, 스레드가 유휴 상태임을 표시
executor._idle_semaphore.release()
del executor # 참조 해제
continue # 다음 작업을 기다리기 위해 반복문 계속
# 작업 항목이 None일 경우 종료 조건 확인
executor = executor_reference()
# 종료 조건:
# - _shutdown 플래그가 True로 설정되었거나
# - executor가 더 이상 존재하지 않거나 (가비지 컬렉터에 의해 수집됨)
# - executor의 _shutdown 플래그가 True로 설정된 경우
if _shutdown or executor is None or executor._shutdown:
if executor is not None:
executor._shutdown = True # executor 종료 상태로 설정
# 다른 스레드들에게도 종료 신호를 알리기 위해 큐에 None 삽입
work_queue.put(None)
return # 종료 조건 만족 시 스레드 종료
del executor # 참조 해제
except BaseException:
# 작업 실행 중 예외가 발생할 경우 예외 내용을 로깅
_base.LOGGER.critical('Exception in worker', exc_info=True) # 예외 발생 시 로깅
class BrokenThreadPool(_base.BrokenExecutor):
"""
Raised when a worker thread in a ThreadPoolExecutor failed initializing.
"""
class ThreadPoolExecutor(_base.Executor):
# itertools.count()는 무한 증가 카운터를 반환하며, .__next__는 그 값을 차례로 반환할 수 있도록 합니다.
# 이를 통해 ThreadPoolExecutor의 각 스레드가 고유한 이름을 가질 수 있도록 보장
# 목적: 각 스레드의 이름이 중복되지 않도록 보장
_counter = itertools.count().__next__
# ThreadPoolExecutor 클래스 초기화 메서드
# 목적: 스레드 풀의 최대 스레드 수, 작업 큐, 세마포어 및 기타 내부 상태를 설정
def __init__(self, max_workers=None, thread_name_prefix='',
initializer=None, initargs=()):
"""새로운 ThreadPoolExecutor 인스턴스 초기화
Args:
max_workers: 실행할 수 있는 최대 스레드 수.
thread_name_prefix: 스레드에 부여할 이름의 접두사.
initializer: 작업자 스레드 초기화 시 호출할 함수.
initargs: 초기화 함수에 전달할 인자 튜플.
"""
if max_workers is None:
# 최대 스레드 수를 CPU 코어 수 + 4로 설정하되 최대 32로 제한
# 목적: 리소스 사용을 최적화하여 과도한 자원 소모 방지
max_workers = min(32, (os.cpu_count() or 1) + 4)
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0") # 유효한 스레드 수 검사
if initializer is not None and not callable(initializer):
raise TypeError("initializer must be a callable") # 초기화 함수가 호출 가능한지 검사
# 클래스 속성 초기화
# 목적: 스레드 풀의 상태와 설정 초기화
self._max_workers = max_workers
self._work_queue = queue.SimpleQueue() # 작업 큐 생성
self._idle_semaphore = threading.Semaphore(0) # 유휴 스레드 카운트 관리
self._threads = set() # 실행 중인 스레드 집합
self._broken = False # 스레드 풀 상태 플래그
self._shutdown = False # 종료 상태 플래그
self._shutdown_lock = threading.Lock() # 종료 시 동시성 보호 잠금
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter())) # 스레드 이름 접두사
self._initializer = initializer # 초기화 함수
self._initargs = initargs # 초기화 함수 인자
# 작업 제출 메서드로 새로운 작업을 작업 큐에 추가하고 스레드 풀을 통해 실행함.
# /는 fn 인자가 위치 전용 인자임을 의미합니다. 즉, fn은 키워드 인자 형태로 사용할 수 없고, 위치 인자로만 전달
# 위치인자는 함수를 변수 순서대로 호출하는 인자, 키워드인자는 함수를 이름과 값의 쌍으로 호출하는 인자
def submit(self, fn, /, *args, **kwargs):
# 작업을 스레드 풀에 제출하여 실행하고, Future 객체를 반환하는 메서드
# fn: 실행할 함수
# *args: 함수에 전달할 위치 기반 인자
# **kwargs: 함수에 전달할 키워드 인자
# 두 잠금이 모두 확보될 때까지 다른 스레드에서 관련 작업을 진행하지 못함.
# _shutdown_lock: 스레드 풀의 종료 상태와 관련된 잠금.
# _global_shutdown_lock: 전역적으로 스레드 풀이나 리소스를 관리하기 위한 잠금
with self._shutdown_lock, _global_shutdown_lock:
# 스레드 풀이 "손상된 상태"라면, 예외를 발생시킴
# BrokenThreadPool은 스레드 풀 초기화 중 오류가 발생한 경우를 나타냄
if self._broken:
raise BrokenThreadPool(self._broken) # 스레드 풀 초기화 실패 시 예외 발생
# 스레드 풀이 종료된 상태라면, 새 작업을 제출할 수 없으므로 예외 발생
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown') # 종료 후 작업 제출 방지
# Python 인터프리터가 종료 중인 상태라면, 작업 제출이 불가능하므로 예외 발생
if _shutdown:
raise RuntimeError('cannot schedule new futures after '
'interpreter shutdown') # 인터프리터 종료 시 작업 제출 방지
# Future 객체 생성: 작업의 실행 상태(완료 여부, 결과 등)를 추적
f = _base.Future() # 새 Future 객체 생성
# 작업 항목 생성: 실행할 함수(fn), 위치 인자(args), 키워드 인자(kwargs)를 포함
w = _WorkItem(f, fn, args, kwargs) # 작업 항목 생성
# 작업 항목을 스레드 풀의 작업 큐에 추가
# 작업 큐에 추가된 작업은 스레드 풀의 스레드에 의해 비동기로 처리됨
self._work_queue.put(w) # 작업 큐에 작업 추가
# 스레드 풀의 스레드 수를 조정
# 작업의 양에 따라 필요한 스레드를 추가 생성하거나 조정
self._adjust_thread_count() # 스레드 수 조정
# Future 객체를 반환
# 호출자는 Future 객체를 통해 작업의 상태나 결과를 비동기로 확인 가능
return f # Future 객체 반환
# 메서드의 문서화를 위해 상위 클래스의 `submit` 메서드 문서를 참조
# submit 메서드는 현재 클래스에 새롭게 정의되었지만, 상위 클래스(_base.Executor)의 동일 메서드 submit의 문서 문자열을 그대로 재사용
submit.__doc__ = _base.Executor.submit.__doc__
# 스레드 수 조정 메서드
# - 목적: 실행 가능한 스레드 수가 부족할 때 새로운 스레드를 생성하여 작업이 원활히 처리되도록 보장
def _adjust_thread_count(self):
# 유휴 스레드 확인
# - 유휴 스레드 세마포어를 확인하여 사용 가능한 스레드가 있으면 반환
# (유휴 스레드가 있다면 추가 스레드 생성 불필요)
if self._idle_semaphore.acquire(timeout=0):
return
# 스레드가 해제될 때 작업자 스레드에게 종료 신호를 전달
# - 목적: 스레드가 종료될 때 None을 큐에 삽입하여 다른 스레드가 이를 인식하고 종료 절차를 진행
def weakref_cb(_, q=self._work_queue):
q.put(None) # 종료 신호 전달
num_threads = len(self._threads) # 현재 활성 스레드 수 계산
# 현재 스레드 수가 최대 스레드 수보다 작을 경우 새 스레드 생성
if num_threads < self._max_workers:
# 각 스레드에 고유한 이름을 할당하여 생성 (중복 방지)
thread_name = '%s_%d' % (self._thread_name_prefix or self, num_threads)
# 새로운 작업자 스레드를 생성하고 _worker 함수를 실행하도록 설정
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb), # 스레드 풀이 해제되면 weakref_cb 호출
self._work_queue, # 작업 큐
self._initializer, # 초기화 함수
self._initargs)) # 초기화 함수 인자들
t.start() # 새 스레드를 시작하여 작업 대기 상태로 만듦
self._threads.add(t) # 새 스레드를 집합에 추가하여 추적
_threads_queues[t] = self._work_queue # 새 스레드를 작업 큐와 연결하여 관리
# 초기화 실패 처리 메서드
# - 목적: 초기화 함수가 실패할 경우 스레드 풀의 상태를 "손상된 상태(broken)"로 설정하고,
# 큐에 남아 있는 작업 항목들을 정리하여 오류 상태를 명확히 전달
def _initializer_failed(self):
with self._shutdown_lock:
# 스레드 풀이 손상(broken) 상태임을 나타내는 플래그 설정
# - 설명: 초기화 과정 중 오류가 발생하면, 이후 작업이 정상적으로 진행되지 않도록
# broken 플래그를 설정하여 스레드 풀의 손상 여부를 다른 메서드에서도 인식하게 함
self._broken = ('A thread initializer failed, the thread pool '
'is not usable anymore') # 손상된 상태 플래그 설정
# 작업 큐를 비우고, 모든 Future 객체에 예외를 설정하여 초기화 실패 상태 전달
while True:
try:
# 큐에서 작업 항목을 하나씩 제거 (get_nowait()로 대기 없이 즉시 가져옴)
work_item = self._work_queue.get_nowait()
except queue.Empty:
# 큐가 비어 있으면 작업 제거 완료, 루프 종료
break
if work_item is not None:
# 작업 항목의 Future 객체에 예외를 설정하여 초기화 실패를 알림
# - 설명: 이 Future 객체는 이후에 이 작업의 결과를 기다리는 코드가 초기화 실패 예외를 받게 됨
work_item.future.set_exception(BrokenThreadPool(self._broken))
# 스레드 풀 종료 메서드
# - 목적: 더 이상 작업을 받지 않으며, 대기 중인 모든 작업을 안전하게 종료하고,
# 모든 스레드가 종료될 때까지 대기하여 자원을 정리
def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
# 스레드 풀의 종료 상태를 나타내는 플래그 설정
# - 설명: _shutdown 플래그가 True로 설정되면 더 이상 새로운 작업을 받지 않도록 제한
self._shutdown = True
if cancel_futures:
# 큐에 남은 작업들을 모두 제거하고, 각 Future 객체를 "취소됨" 상태로 설정
# - 목적: 실행 대기 중인 작업이 있다면, 이를 모두 취소하여 사용자가 알 수 있도록 함
while True:
try:
# 대기 중인 작업 항목을 하나씩 제거 (get_nowait() 사용)
work_item = self._work_queue.get_nowait()
except queue.Empty:
# 큐가 비었으면 루프 종료
break
if work_item is not None:
# Future 객체를 취소 상태로 설정
# - 설명: 해당 작업이 취소되었음을 나타내며, 이를 기다리는 코드에서 확인 가능
work_item.future.cancel()
# 작업 큐에 None을 삽입하여 블로킹된 스레드가 종료를 인식하도록 함
# - 목적: 대기 중인 스레드가 큐에서 None을 받고 종료 루프를 빠져나오도록 유도
# - 설명: 이를 통해 대기 중인 스레드가 영구적으로 멈추는 상황을 방지
self._work_queue.put(None)
if wait:
# 모든 스레드가 종료될 때까지 메인 스레드가 대기하여 안전한 종료 보장
# - 설명: 각 스레드의 join()을 호출하여, 모든 작업 스레드가 완전히 종료될 때까지 기다림
for t in self._threads:
t.join() # 스레드가 종료될 때까지 대기
shutdown.__doc__ = _base.Executor.shutdown.__doc__