1. 전반적인 동작 방식
1.1. 초기화
- 초기화 시, 다음과 같은 중요한 속성들이 설정됩니다:
max_workers
: 사용할 최대 워커 프로세스 수를 결정. 기본값은 CPU 코어 수를 기반으로 설정.
mp_context
: 멀티프로세싱 컨텍스트(spawn
, fork
등) 설정. 플랫폼 및 옵션에 따라 안전한 방식을 사용.
max_tasks_per_child
: 각 워커가 처리할 최대 작업 수 설정. 초과 시 프로세스는 종료되고 새로운 프로세스가 생성됨.
1.2. 작업 제출 (submit
)
- 사용자로부터 작업이 제출되면 다음 과정이 진행됩니다:
- Future 객체 생성: 작업의 결과를 비동기적으로 관리하기 위한 객체.
- 작업 항목 생성 및 저장:
_WorkItem
으로 작업 정의 후 대기 목록(_pending_work_items
)에 추가.
- 작업 ID 큐에 추가: 작업 추적을 위해 ID를 큐에 추가.
- 프로세스 관리:
- 유휴 워커가 없을 경우 새로운 워커를 생성 (
_adjust_process_count
).
- 관리 스레드가 없으면 생성 및 시작 (
_start_executor_manager_thread
).
1.3. 작업 처리
- 워커 프로세스는
_process_worker
함수에서 다음을 수행:
- 작업 요청 수신: 작업 요청 큐(
_call_queue
)에서 작업을 가져옴.
- 작업 실행: 작업 항목의 함수와 인자를 사용해 결과를 생성.
- 결과 반환: 결과 큐(
_result_queue
)를 통해 결과 또는 예외를 반환.
- 작업 수 초과 처리:
max_tasks_per_child
설정이 있다면, 워커는 작업 수 초과 시 종료.
1.4. 작업 결과 수집
- 관리 스레드(
_ExecutorManagerThread
)는 워커와 통신하여 결과를 수집:
- 결과 큐에서 데이터를 읽어
Future
객체에 결과 또는 예외를 설정.
- 실패한 작업이나 손상된 워커를 처리하여 안정성을 유지.
1.5. 종료 (shutdown
)
- Executor가 종료 요청을 받으면 다음 과정을 수행:
- 종료 상태 설정:
_shutdown_thread
플래그를 설정하고 모든 작업이 종료되었음을 관리 스레드에 알림.
- 작업 취소:
cancel_futures
가 True인 경우 대기 중인 Future 객체를 취소.
- 워커 종료: 종료 신호(sentinel)를 워커들에게 보내 모든 프로세스를 안전하게 종료.
- 리소스 정리: 통신 채널, 관리 스레드, 프로세스 참조 등 해제.
2. 주요 특징
2.1. 병렬 작업 관리
- 작업 큐와 결과 큐를 사용해 작업 요청과 결과 반환을 분리하여 병렬 작업을 효율적으로 관리.
- 작업 제출과 결과 수집이 비동기로 처리되며, Future 객체를 통해 작업 상태를 추적 가능.
2.2. 유휴 워커 관리
Semaphore
를 활용해 유휴 워커 상태를 확인.
- 유휴 워커가 없을 경우 새로운 프로세스를 동적으로 생성.
2.3. 프로세스 안정성
- 워커 손상 시
BrokenProcessPool
예외를 통해 호출자에게 알림.
max_tasks_per_child
설정으로 오래 실행된 프로세스를 주기적으로 재생성해 안정성 확보.
2.4. 플랫폼 호환성
mp_context
를 활용해 플랫폼별 적합한 멀티프로세싱 방식(spawn
, fork
)을 선택.
- Windows 환경에서는
spawn
방식을 기본으로 설정해 안정성을 높임.
3. 동작 흐름 요약
3.1. 작업 제출
- 사용자로부터 작업이 제출되면 작업 항목을 생성하고 작업 ID를 큐에 추가.
- Future 객체를 반환하여 작업 결과를 비동기로 확인할 수 있도록 설정.
- 필요 시 새로운 워커 프로세스를 생성.
3.2. 작업 처리
- 워커 프로세스가 작업 요청 큐에서 작업을 가져와 실행.
- 실행 결과 또는 예외를 결과 큐를 통해 반환.
3.3. 결과 수집
- 관리 스레드가 결과 큐를 통해 결과를 수집.
- 결과를 Future 객체에 설정하여 호출자가 결과를 확인 가능.
3.4. 종료
- 모든 작업이 완료되었거나 취소된 후 워커 종료.
- 통신 채널, 관리 스레드 등 내부 리소스 정리.
4. 장점
- 병렬 처리 효율성: CPU 코어를 최대한 활용.
- 안정성: 손상된 워커를 처리하고, 오래 실행된 프로세스를 재생성.
- 유연성: 사용자 정의 초기화 함수, 작업 제한, 동적 워커 생성 지원.
- 플랫폼 호환성: 다양한 멀티프로세싱 방식(
spawn
, fork
) 지원.
5. 단점 및 제한
- 프로세스 생성 비용: 워커 프로세스 생성은 스레드보다 비용이 큼.
- 공유 메모리 제약: 프로세스 간 통신은 큐를 통해 이루어지므로 대규모 데이터 공유에 비효율적.
- 복잡성: 관리 스레드, 큐, 락 등 여러 메커니즘을 사용해 코드가 복잡.
ThreadPoolExecutor와 비교
1. 실행 단위
ProcessPoolExecutor
:
- 프로세스를 실행 단위로 사용.
- 각 작업이 별도의 프로세스에서 실행되며, 각 프로세스는 독립된 메모리 공간을 가짐.
multiprocessing
모듈을 기반으로 동작.
ThreadPoolExecutor
:
- 스레드를 실행 단위로 사용.
- 스레드는 동일한 프로세스 내에서 실행되며, 공유 메모리를 사용.
threading
모듈을 기반으로 동작.
해석
ProcessPoolExecutor
는 작업이 독립적인 프로세스에서 실행되므로, GIL(Global Interpreter Lock)의 영향을 받지 않음.
ThreadPoolExecutor
는 GIL의 영향을 받으므로, CPU 바운드 작업에서 비효율적일 수 있음.
2. 메모리 구조
ProcessPoolExecutor
:
- 각 프로세스는 독립된 메모리 공간을 사용.
- 프로세스 간 데이터를 전달하기 위해 큐 또는 피클링(serialization)이 필요.
- 데이터 전송은 IPC(Inter-Process Communication)를 통해 이루어지며, 비용이 크다.
ThreadPoolExecutor
:
- 모든 스레드는 동일한 메모리 공간을 공유.
- 데이터 전달은 메모리 복사를 하지 않고 직접 접근 가능하므로 빠름.
해석
ProcessPoolExecutor
는 메모리 고립을 통해 안정성이 높지만, 데이터 전달 비용이 큼.
ThreadPoolExecutor
는 메모리 공유로 데이터 전달이 빠르지만, 잘못된 접근으로 충돌 위험이 있음.
3. 작업 처리
ProcessPoolExecutor
:
- 작업이 제출되면 작업 요청 큐에 추가.
- 워커 프로세스가 큐에서 작업을 가져와 실행.
- 결과는 결과 큐를 통해 전달.
- 워커는 지정된 작업 수(
max_tasks_per_child
)를 초과하면 종료되고 새로 생성.
ThreadPoolExecutor
:
- 작업이 제출되면 스레드풀에서 유휴 스레드를 선택.
- 스레드가 작업을 실행하고 결과를 반환.
- 스레드는 작업이 끝난 후 재사용되며, 종료되거나 새로 생성되지 않음.
4. 표 비교
특징 |
ProcessPoolExecutor |
ThreadPoolExecutor |
실행 단위 |
프로세스 |
스레드 |
메모리 구조 |
독립 메모리 공간 |
공유 메모리 공간 |
데이터 전달 비용 |
높음 (큐, 피클링 필요) |
낮음 (직접 메모리 접근) |
작업 유형 |
CPU 바운드 작업 |
I/O 바운드 작업 |
GIL 영향 |
없음 |
있음 |
워커 관리 |
동적 워커 생성/종료 |
스레드 재사용 (고정 스레드풀) |
플랫폼 호환성 |
플랫폼별 멀티프로세싱 방식 지원 (spawn , fork ) |
플랫폼 독립적 |
장기 실행 안정성 |
높음 (워커가 재생성됨) |
중간 (스레드 재생성 없음) |
작업 처리 비용 |
높음 (프로세스 생성 및 데이터 전송) |
낮음 (스레드 생성 및 공유 메모리 접근) |
코드
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""ProcessPoolExecutor를 구현한 모듈입니다.
아래 다이어그램과 설명은 시스템의 데이터 흐름을 나타냅니다:
|======================= 프로세스 내부 ===================|== 프로세스 외부 ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | | | | Call Q | | Process |
| | +----------+ | | +-----------+ | Pool |
| | | ... | | | | ... | +---------+
| | | 6 | => | | => | 5, call() | => | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
Executor.submit() 호출 시:
- 고유 번호를 가진 `_WorkItem`을 생성하고 "Work Items" 딕셔너리에 추가
- 생성된 `_WorkItem`의 ID를 "Work Ids" 큐에 추가
로컬 워커 스레드(Local worker thread):
- "Work Ids" 큐에서 작업 ID를 읽고 "Work Items" 딕셔너리에서 해당 WorkItem을 조회:
- 작업 항목이 취소된 경우 딕셔너리에서 제거
- 그렇지 않으면 `_CallItem`으로 재구성하여 "Call Q"에 추가
- 새로운 `_CallItem`은 "Call Q"가 가득 차지 않을 때까지 추가됨.
- **참고**: "Call Q"의 크기는 작게 유지됨. "Call Q"에 들어간 호출은 `Future.cancel()`로 취소할 수 없기 때문.
- "Result Q"에서 `_ResultItems`를 읽어 "Work Items" 딕셔너리에 저장된 Future 객체를 업데이트한 후 해당 항목 삭제.
프로세스 #1..n:
- "Call Q"에서 `_CallItems`를 읽어 실행
- 실행 결과를 `_ResultItems`로 구성하여 "Result Q"에 추가
"""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
from traceback import format_exception
_threads_wakeups = weakref.WeakKeyDictionary()
_global_shutdown = False
# 목적:스레드를 대기 상태에서 깨우는 메커니즘을 제공하는 클래스. 파이프(Pipe)를 사용해 스레드 간 간단한 통신을 구현.
class _ThreadWakeup:
def __init__(self):
self._closed = False
# 단방향 파이프(Pipe) 생성 (스레드 간 통신에 사용)
self._reader, self._writer = mp.Pipe(duplex=False)
def close(self):
# 파이프를 닫아 리소스를 해제
if not self._closed:
self._closed = True
self._writer.close() # 쓰기용 파이프 닫기
self._reader.close() # 읽기용 파이프 닫기
def wakeup(self):
# 파이프를 통해 스레드에 신호를 보내 깨어나도록 유도
if not self._closed:
self._writer.send_bytes(b"") # 빈 바이트 데이터 전송
def clear(self):
# 파이프에 대기 중인 신호를 모두 제거
if not self._closed:
while self._reader.poll(): # 읽을 데이터가 있는 동안
self._reader.recv_bytes() # 데이터를 읽어서 제거
def _python_exit():
global _global_shutdown
_global_shutdown = True # 글로벌 상태를 종료로 설정
# 등록된 스레드와 관련된 wakeup 객체를 모두 가져옴
items = list(_threads_wakeups.items())
# 모든 wakeup 객체에 신호를 보내 대기 중인 스레드들을 깨움
for _, thread_wakeup in items:
thread_wakeup.wakeup()
# 각 스레드가 종료될 때까지 대기
for t, _ in items:
t.join()
# 모든 non-daemon 스레드가 종료되기 전에 `_python_exit()`를 호출하도록 등록.
# 이는 `atexit.register()` 대신 사용되며, 하위 인터프리터와의 호환성을 위해 설계됨.
# 하위 인터프리터는 더 이상 데몬 스레드를 지원하지 않음. 관련 내용은 bpo-39812 참조.
threading._register_atexit(_python_exit)
# Call queue에 프로세스 수보다 더 많은 호출을 대기시키는 수를 제어.
# - 작은 값: 프로세스가 작업을 기다리는 시간이 늘어남.
# - 큰 값: Call queue에 있는 작업은 `Future.cancel()`로 취소 불가, 따라서 취소 성공률 감소.
EXTRA_QUEUED_CALLS = 1
# Windows에서는 `WaitForMultipleObjects`를 사용하여 프로세스 종료를 대기.
# 최대 63개의 객체를 대기할 수 있음.
# 추가로 아래 두 객체의 오버헤드가 있음:
# - 결과 큐 리더 (result queue reader)
# - 스레드 웨이크업 리더 (thread wakeup reader)
_MAX_WINDOWS_WORKERS = 63 - 2
# 원격 트레이스백을 로컬 트레이스백에 문자열 형태로 포함시키는 해킹 클래스.
# _RemoteTraceback은 병렬 처리 또는 원격 호출에서 원격 프로세스의 에러를 추적하고, 디버깅을 용이하게 하기 위해
# 사용되는 유틸리티 클래스입니다. 원격 프로세스에서 발생한 예외를 부모 프로세스에서 확인해야 하는 경우 유용
class _RemoteTraceback(Exception):
def __init__(self, tb):
self.tb = tb # 원격 프로세스에서 생성된 트레이스백 저장
def __str__(self):
return self.tb # 트레이스백을 문자열로 반환
# 원격/비동기 작업 중 발생한 예외와 트레이스백을 포맷팅하고, 이후 재구성 가능하도록 관리.
# 동작 : 예외와 트레이스백을 문자열로 변환해 저장 및 복구를 위한 로직 제공.
class _ExceptionWithTraceback:
def __init__(self, exc, tb):
# 트레이스백 객체를 받아, traceback.format_exception으로 포맷팅된 문자열로 변환
tb = ''.join(format_exception(type(exc), exc, tb)) # 예외와 트레이스백 포맷팅
self.exc = exc # 예외 객체 저장
# 트레이스백 객체는 메모리에서 삭제, 참조를 방지하여 가비지 컬렉션 가능
# 순환 참조를 방지하고 메모리 누수를 막기 위해 예외 객체의 트레이스백 참조를 제거
self.exc.__traceback__ = None
# 포맷팅된 트레이스백을 문자열로 저장
self.tb = '\n"""\n%s"""' % tb # 문자열 형태로 변환
# 이 메서드는 객체를 저장할 때 self 전체를 직접 저장하지 않고, 대신 이 객체를 복원하는 데 필요한 정보와 로직만 저장하도록 만듬
# _rebuild_exc 함수와 예외 객체 및 트레이스백 문자열을 반환해, 객체를 역직렬화하거나 복구할 때 사용.
def __reduce__(self):
# _rebuild_exc: 직렬화된 데이터를 기반으로 예외 객체를 재구성하기 위한 함수입니다.
# (self.exc, self.tb): 예외 객체와 포맷팅된 트레이스백 문자열을 _rebuild_exc 함수에 전달
return _rebuild_exc, (self.exc, self.tb)
# 예외 객체(exc)와 포맷팅된 트레이스백(tb)을 받아 원래의 예외 객체를 복구.
# exc.__cause__에 _RemoteTraceback(tb)를 추가해 원격 트레이스백을 포함한 예외 객체로 반환.
def _rebuild_exc(exc, tb):
exc.__cause__ = _RemoteTraceback(tb) # 원격 트레이스백 추가
return exc # 예외 객체 반환
# 비동기 작업의 실행을 위해 작업과 관련된 정보를 저장(실행할 함수, 인자, 결과 저장을 위한 Future 객체 등)
# 동작 : 작업을 정의하고 Future를 통해 작업 결과 추적.
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future # 작업 결과를 저장할 Future 객체
self.fn = fn # 실행할 함수
self.args = args # 함수의 위치 인자
self.kwargs = kwargs # 함수의 키워드 인자
# 작업 실행 결과, 발생한 예외, 종료 프로세스 ID 등 작업의 결과 정보를 저장.
# 동작 : 작업 ID 기반으로 결과와 예외를 추적 및 관리.
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None, exit_pid=None):
self.work_id = work_id # 작업 ID
self.exception = exception # 작업 실행 중 발생한 예외
self.result = result # 작업 실행 결과
self.exit_pid = exit_pid # 작업을 종료한 프로세스 ID (필요한 경우)
# 작업에서 호출할 함수와 해당 인자 정보를 저장.
# 동작 : 함수 호출의 세부 정보 저장 및 실행을 위한 기본 구조 제공.
class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
self.work_id = work_id # 작업 ID
self.fn = fn # 실행할 함수
self.args = args # 함수의 위치 인자
self.kwargs = kwargs # 함수의 키워드 인자
# 큐에서 작업 항목을 처리 중 발생하는 예외를 안전하게 관리하고,
# 작업 항목과 연결된 Future 객체에 예외 정보를 설정.
# 스레드 간 동기화와 작업 종료 시 안전한 관리를 지원.
class _SafeQueue(Queue):
# 큐 초기화와 함께, 작업 항목(pending_work_items), 종료 동기화 잠금(shutdown_lock),
# 스레드 웨이크업 메커니즘(thread_wakeup)을 설정
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
thread_wakeup):
self.pending_work_items = pending_work_items # 처리 중인 작업 항목들을 관리하는 딕셔너리.
self.shutdown_lock = shutdown_lock # 종료 시 동기화를 보장하기 위한 잠금 객체.
self.thread_wakeup = thread_wakeup # 대기 중인 스레드를 깨우는 메커니즘.
super().__init__(max_size, ctx=ctx) # 부모 클래스 초기화
# 큐 피더(feed) 중 예외가 발생했을 때 이를 처리.
def _on_queue_feeder_error(self, e, obj):
# 큐 작업 중 예외가 발생했을 때 호출. 예외와 연관된 작업 항목(_CallItem)의 Future 객체에
# 예외 정보를 설정하고, 작업 항목을 제거. 스레드 웨이크업 메커니즘을 호출해 대기 중인 스레드가 예외 상황에 반응하도록 처리.
# obj가 _CallItem 객체인지 확인해, 큐에 추가된 작업 항목에서 발생한 예외인지 판단
# _CallItem은 실행할 함수(fn)와 관련 인자(args, kwargs), 그리고 작업 ID(work_id)를 포함한 작업 정의 객체
if isinstance(obj, _CallItem): # 호출 항목일 경우
# format_exception을 사용해 예외(e)와 그 트레이스백 정보를 문자열로 변환.
# 이 문자열은 사람이 읽기 쉽고, 디버깅에 유용한 형태로 저장
tb = format_exception(type(e), e, e.__traceback__) # 예외 정보 포맷팅
# _RemoteTraceback 객체를 생성해 예외 객체의 __cause__로 설정.
# 이 작업은 원격 작업이나 비동기 작업에서 발생한 예외를 추적 가능하게 만듦.
# 트레이스백 문자열(tb)을 포맷팅해 포함
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
# pending_work_items 딕셔너리에서 작업 ID(obj.work_id)에 해당하는 작업 항목을 제거.
# 작업 ID는 _CallItem 생성 시 지정된 고유 값으로, 작업 항목을 관리하는 데 사용.
# 작업 항목을 제거함으로써, 실패한 작업이 다시 실행되지 않도록 보장
work_item = self.pending_work_items.pop(obj.work_id, None)
# shutdown_lock을 사용해 멀티스레드 환경에서 동기화 보장.
# 작업 항목을 수정하거나 스레드를 깨울 때, 다른 스레드와 충돌하지 않도록 보호.
with self.shutdown_lock:
# 스레드를 깨워 대기 중인 스레드가 예외 상황에 즉각 반응할 수 있도록 처리
self.thread_wakeup.wakeup()
# 제거된 작업 항목(work_item)이 존재하는지 확인
if work_item is not None:
# 작업 항목이 존재하면, Future 객체에 예외를 설정
work_item.future.set_exception(e)
else:
super()._on_queue_feeder_error(e, obj) # 부모 클래스의 기본 처리 호출
# 반복 가능한 객체를 지정된 크기(chunksize)로 분할해 청크 단위로 작업을 수행할 수 있도록 제공.
def _get_chunks(*iterables, chunksize):
"""zip()으로 묶인 반복 가능한 객체를 청크로 나눔."""
it = zip(*iterables) # 여러 반복 가능한 객체를 묶음
while True:
# itertools.islice를 사용해 반복 가능한 객체를 슬라이싱. 입력 데이터를 chunksize 단위로 나눈 튜플을 생성하고 반환
chunk = tuple(itertools.islice(it, chunksize))
if not chunk: # 더 이상 데이터가 없으면 종료
return
yield chunk # 생성된 청크 반환
# 특정 함수(fn)를 입력 데이터 청크의 각 항목에 대해 실행. 입력 데이터를 청크 단위로 병렬 처리하기 위한 기초 제공
def _process_chunk(fn, chunk):
"""작업 청크를 처리하는 함수.
주어진 함수(fn)를 청크의 각 항목에 대해 실행합니다.
Args:
fn: 청크 내 각 항목에 대해 실행할 함수.
chunk: 처리할 데이터 청크.
Returns:
처리된 청크 결과 리스트.
"""
return [fn(*args) for args in chunk] # 청크의 각 항목에 대해 함수를 실행하고 결과를 반환
# 작업의 결과(result) 또는 예외(exception)를 안전하게 결과 큐(result_queue)로 반환.
# 작업의 **ID(work_id)**와 함께 결과를 관리.
def _sendback_result(result_queue, work_id, result=None, exception=None,
exit_pid=None):
"""결과 또는 예외를 안전하게 결과 큐로 반환합니다.
Args:
result_queue: 결과를 저장할 큐.
work_id: 작업 ID.
result: 작업 실행 결과(기본값 None).
exception: 작업 실행 중 발생한 예외(기본값 None).
exit_pid: 작업을 종료한 프로세스 ID(기본값 None).
"""
try:
# 작업 결과를 `_ResultItem` 형태로 결과 큐에 추가
result_queue.put(_ResultItem(work_id, result=result,
exception=exception, exit_pid=exit_pid))
except BaseException as e:
# 예외 발생 시 `_ExceptionWithTraceback`을 통해 예외 정보를 처리
exc = _ExceptionWithTraceback(e, e.__traceback__)
result_queue.put(_ResultItem(work_id, exception=exc,
exit_pid=exit_pid))
# 작업 큐(call_queue)에서 작업을 가져와 처리하고 결과를 결과 큐(result_queue)로 반환.
# 프로세스별 워커로 실행되며, 초기화 함수와 최대 작업 수 등을 관리.
def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
"""
작업 큐에서 작업을 읽고 결과를 결과 큐에 반환하는 워커 함수.
이 함수는 별도의 프로세스에서 실행되며, 작업 큐(call_queue)에서 작업 항목을 가져와 실행하고
결과를 결과 큐(result_queue)에 반환합니다. 작업 실행 중 예외가 발생한 경우, 해당 예외를 결과 큐에 반환합니다.
Args:
call_queue: 작업이 저장된 큐(`_CallItem` 객체).
result_queue: 작업 결과가 저장될 큐(`_ResultItem` 객체).
initializer: 워커 초기화 시 호출할 함수(없으면 None).
initargs: 초기화 함수의 인자(튜플 형태).
max_tasks: 프로세스가 처리할 최대 작업 수. None인 경우 무제한 실행.
"""
# 초기화 함수가 있을 경우 실행
if initializer is not None:
try:
# initializer에 전달받은 인자를 사용하여 초기화 작업 실행
initializer(*initargs)
except BaseException:
# 초기화 중 예외 발생 시 로깅 및 프로세스 종료
_base.LOGGER.critical('Exception in initializer:', exc_info=True)
return # 부모 프로세스에 풀(pool)의 손상을 알림
num_tasks = 0 # 현재 워커가 처리한 작업 수
exit_pid = None # 작업 종료 시 현재 프로세스 ID 저장
while True:
# 작업 큐에서 작업 항목(_CallItem 객체)을 가져옴 (blocking)
call_item = call_queue.get(block=True)
# 작업 항목이 None일 경우 워커 종료 신호로 간주
if call_item is None:
result_queue.put(os.getpid()) # 현재 프로세스 ID를 결과 큐에 추가
return # 프로세스 종료
# 최대 작업 수 제한이 있는 경우 처리된 작업 수를 증가
if max_tasks is not None:
num_tasks += 1
if num_tasks >= max_tasks:
exit_pid = os.getpid() # 최대 작업 수 초과 시 현재 프로세스 ID 저장
try:
# 작업 실행: call_item에 저장된 함수(fn)와 인자(args, kwargs)를 실행
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
# 작업 실행 중 예외 발생 시 예외를 결과 큐에 반환
exc = _ExceptionWithTraceback(e, e.__traceback__) # 예외와 트레이스백 포맷팅
_sendback_result(
result_queue,
call_item.work_id,
exception=exc,
exit_pid=exit_pid
)
else:
# 작업 성공 시 결과를 결과 큐에 반환
_sendback_result(
result_queue,
call_item.work_id,
result=r,
exit_pid=exit_pid
)
del r # 작업 결과(r)를 삭제하여 메모리 해제
# 작업 항목(call_item)을 삭제하여 메모리 해제
del call_item
# 최대 작업 수 초과 시 프로세스 종료
if exit_pid is not None:
return # 프로세스 종료
class _ExecutorManagerThread(threading.Thread):
"""이 프로세스와 워커 프로세스 간의 통신을 관리하는 관리자 스레드.
이 스레드는 로컬 스레드에서 실행됩니다.
Args:
executor: 이 스레드의 소유자인 ProcessPoolExecutor에 대한 참조.
이 참조는 약한 참조(weakref)를 통해 관리됩니다.
"""
def __init__(self, executor):
# Executor의 내부 객체 참조를 저장
# 스레드 웨이크업 메커니즘
self.thread_wakeup = executor._executor_manager_thread_wakeup
self.shutdown_lock = executor._shutdown_lock
# 약한 참조를 사용하여 ProcessPoolExecutor에 대한 참조 저장
def weakref_cb(_,
thread_wakeup=self.thread_wakeup,
shutdown_lock=self.shutdown_lock):
mp.util.debug('Executor collected: triggering callback for '
'QueueManager wakeup')
with shutdown_lock:
thread_wakeup.wakeup() # 종료 시 스레드를 깨움
# executor를 약한 참조로 만들고, 객체가 삭제될 때 weakref_cb 함수를 호출
self.executor_reference = weakref.ref(executor, weakref_cb)
# 워커로 사용할 ctx.Process 인스턴스 목록
self.processes = executor._processes
# 작업 요청이 저장될 큐
self.call_queue = executor._call_queue
# 작업 결과가 저장될 큐
self.result_queue = executor._result_queue
# 작업 ID가 저장된 큐
self.work_ids_queue = executor._work_ids
# 워커 프로세스가 안전하게 종료되기 전 처리할 수 있는 최대 작업 수
self.max_tasks_per_child = executor._max_tasks_per_child
# 작업 ID를 _WorkItem 객체에 매핑하는 딕셔너리
self.pending_work_items = executor._pending_work_items
super().__init__() # 부모 클래스 초기화
def run(self):
"""Executor 관리 스레드의 메인 루프."""
while True:
self.add_call_item_to_queue() # 작업 항목을 큐에 추가
# 결과 큐에서 결과를 기다리거나, 워커 프로세스 종료 또는 웨이크업 신호를 대기
result_item, is_broken, cause = self.wait_result_broken_or_wakeup()
if is_broken:
# 큐가 손상된 경우 종료 처리
self.terminate_broken(cause)
return
if result_item is not None:
# 결과 항목 처리
self.process_result_item(result_item)
process_exited = result_item.exit_pid is not None
if process_exited:
# 종료된 워커 프로세스를 정리
p = self.processes.pop(result_item.exit_pid)
p.join()
# 결과 항목의 참조를 제거하여 메모리 관리
del result_item
# Executor 참조를 통해 프로세스를 조정
if executor := self.executor_reference():
if process_exited:
with self.shutdown_lock:
executor._adjust_process_count() # 프로세스 수 조정
else:
executor._idle_worker_semaphore.release() # 워커 상태 갱신
del executor
if self.is_shutting_down():
# 종료 상태로 표시
self.flag_executor_shutting_down()
# 남아 있는 작업 항목이 없을 때까지 반복적으로 작업 큐를 비움
self.add_call_item_to_queue()
# 더 이상 대기 작업이 없으면 내부 정리 후 스레드 종료
if not self.pending_work_items:
self.join_executor_internals()
return
def add_call_item_to_queue(self):
"""작업 큐에 _WorkItems를 추가."""
while True:
if self.call_queue.full(): # 작업 큐가 가득 찬 경우 종료
return
try:
# 작업 ID를 작업 ID 큐에서 가져옴
work_id = self.work_ids_queue.get(block=False)
except queue.Empty:
# 작업 ID 큐가 비어 있으면 종료
return
else:
# 작업 ID에 해당하는 _WorkItem 가져오기
work_item = self.pending_work_items[work_id]
# 작업이 실행 가능한 상태인지 확인
if work_item.future.set_running_or_notify_cancel():
# 실행 가능한 경우 작업 큐에 _CallItem 추가
self.call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
else:
# 작업이 취소된 경우 딕셔너리에서 제거
del self.pending_work_items[work_id]
continue
def wait_result_broken_or_wakeup(self):
"""
결과 대기, 워커 상태 확인 또는 웨이크업 신호 대기.
이 메서드는 작업의 결과를 대기하거나 워커 프로세스가 정상인지 확인하며,
웨이크업 신호가 발생하면 즉시 처리합니다.
Returns:
result_item: 결과 큐에서 읽은 결과 항목(없으면 None).
is_broken: 시스템이 손상되었는지 여부(True: 손상됨, False: 정상).
cause: 예외가 발생한 경우 예외의 트레이스백 문자열(없으면 None).
"""
# 결과 큐의 리더(reader) 파이프 핸들
result_reader = self.result_queue._reader
# 웨이크업 메커니즘이 닫혀 있지 않은지 확인
assert not self.thread_wakeup._closed
# 웨이크업 신호의 리더(reader) 파이프 핸들
wakeup_reader = self.thread_wakeup._reader
# `mp.connection.wait`로 대기할 리더 목록 설정
# - 결과 큐 리더(result_reader)
# - 웨이크업 리더(wakeup_reader)
readers = [result_reader, wakeup_reader]
# 워커 프로세스의 sentinel(프로세스 종료 감지 핸들) 추가
# - 각 프로세스의 sentinel을 워커 상태 확인에 사용
worker_sentinels = [p.sentinel for p in list(self.processes.values())]
# 결과 큐, 웨이크업 신호, 워커 프로세스 상태를 감시
# - `readers`와 `worker_sentinels`에 대해 블로킹(wait) 호출
ready = mp.connection.wait(readers + worker_sentinels)
# 초기 상태 변수 설정
cause = None # 예외 원인 (발생하지 않으면 None)
is_broken = True # 시스템이 손상되었는지 여부(True: 손상됨, False: 정상)
result_item = None # 결과 항목 (결과 큐에서 읽은 데이터)
# 1. 결과 큐에서 결과 항목을 읽을 준비가 된 경우
if result_reader in ready:
try:
# 결과 큐에서 결과 항목 읽기
result_item = result_reader.recv()
# 결과를 정상적으로 읽었으므로 시스템 상태를 "정상"으로 설정
is_broken = False
except BaseException as e:
# 결과를 읽는 동안 예외가 발생한 경우
# 예외의 타입, 메시지, 트레이스백을 문자열로 포맷
cause = format_exception(type(e), e, e.__traceback__)
# 2. 웨이크업 신호가 발생한 경우
elif wakeup_reader in ready:
# 웨이크업 신호가 정상적으로 전달되었으므로 시스템 상태를 "정상"으로 설정
is_broken = False
# 종료 동작 또는 상태 초기화를 위한 잠금 처리
with self.shutdown_lock:
# 웨이크업 신호를 초기화
self.thread_wakeup.clear()
# 최종 결과 반환:
# - `result_item`: 결과 큐에서 읽은 항목 (없으면 None)
# - `is_broken`: 시스템 손상 상태 (True: 손상됨, False: 정상)
# - `cause`: 예외 발생 시 원인 (없으면 None)
return result_item, is_broken, cause
def process_result_item(self, result_item):
"""결과 항목을 처리.
주어진 결과 항목(`result_item`)을 처리합니다. 워커의 종료 여부와 결과 큐에서 가져온 작업 결과를 관리합니다.
결과가 워커의 종료 신호인지, 작업의 결과인지를 구분하여 적절히 처리합니다.
Args:
result_item: 처리할 결과 항목. 이는 워커의 PID(int) 또는 `_ResultItem` 객체일 수 있습니다.
"""
if isinstance(result_item, int):
# 결과 항목이 정수(PID)인 경우: 워커 프로세스가 종료를 알림
# 워커가 PID를 반환한 경우, 이를 정상 종료로 간주
assert self.is_shutting_down() # 현재 시스템이 종료 중인지 확인
p = self.processes.pop(result_item) # 종료된 프로세스를 `processes`에서 제거
p.join() # 프로세스 종료 대기(join)
if not self.processes:
# 모든 워커가 종료되었으면 내부 정리 수행
self.join_executor_internals() # 실행자 내부 리소스 정리
return # 종료
else:
# 결과 항목이 `_ResultItem` 객체인 경우
# _ResultItem을 처리하여 Future 객체 상태를 갱신
work_item = self.pending_work_items.pop(result_item.work_id, None) # 작업 ID로 작업 항목 검색
if work_item is not None:
if result_item.exception:
# 작업 실행 중 예외가 발생한 경우
# Future 객체에 예외를 설정하여 호출자가 이를 확인할 수 있도록 함
work_item.future.set_exception(result_item.exception)
else:
# 작업이 성공적으로 완료된 경우
# Future 객체에 작업 결과를 설정하여 호출자가 결과를 받을 수 있도록 함
work_item.future.set_result(result_item.result)
def is_shutting_down(self):
# Executor가 종료 상태인지 확인하는 메서드
executor = self.executor_reference()
# 아래 조건 중 하나라도 충족되면 종료 상태로 간주:
# 1. `_global_shutdown` 플래그가 활성화된 경우 (인터프리터 전체가 종료 중)
# 2. Executor 객체가 가비지 수집(GC)으로 제거된 경우
# 3. Executor가 명시적으로 종료 상태로 설정된 경우 (`_shutdown_thread`가 True)
return (_global_shutdown or executor is None
or executor._shutdown_thread)
def terminate_broken(self, cause):
# Executor가 손상된 상태에서 종료 처리를 수행하는 메서드
# - 손상된 이유를 나타내는 `cause`를 사용해 디버깅 정보 제공 가능
# 1. Executor를 손상된 상태로 표시하여 추가 작업 제출을 차단
executor = self.executor_reference()
if executor is not None:
executor._broken = ('하위 프로세스가 비정상적으로 종료되었습니다. '
'프로세스 풀을 더 이상 사용할 수 없습니다.')
executor._shutdown_thread = True # Executor 종료 상태로 설정
executor = None
# 2. 대기 중인 모든 작업에 실패 예외를 설정
bpe = BrokenProcessPool("프로세스 풀의 작업이 실행 중 또는 대기 중에 "
"비정상적으로 종료되었습니다.")
if cause is not None:
# 원격에서 발생한 트레이스백 정보를 포함시킴
bpe.__cause__ = _RemoteTraceback(
f"\n'''\n{''.join(cause)}'''")
# 각 대기 작업에 BrokenProcessPool 예외를 설정
for work_id, work_item in self.pending_work_items.items():
work_item.future.set_exception(bpe) # Future 객체에 실패 상태 설정
del work_item # 메모리에서 제거
self.pending_work_items.clear() # 모든 작업 항목 제거
# 3. 모든 워커 프로세스를 강제로 종료
for p in self.processes.values():
p.terminate() # 프로세스 강제 종료
# 4. 더 이상 읽을 수 없는 파이프에 데이터 쓰기를 방지
self.call_queue._reader.close()
# 5. 내부 리소스 정리
self.join_executor_internals()
def flag_executor_shutting_down(self):
"""
Executor를 종료 상태로 표시하고 대기 중인 작업을 취소.
목적:
- Executor의 `_shutdown_thread` 플래그를 True로 설정하여 종료 상태를 표시.
- 대기 중인 작업 중 취소되지 않은 작업만 `pending_work_items`에 남김.
- 작업 ID 큐를 비우고, `_cancel_pending_futures` 플래그를 재설정하여 재실행 방지.
동작:
- Executor 종료 상태를 플래그로 표시.
- 대기 중인 Future 객체의 취소를 시도하고, 취소되지 않은 작업만 남김.
- 작업 ID 큐를 초기화하여 종료 준비 완료.
"""
executor = self.executor_reference() # 약한 참조로 Executor 객체 가져오기
if executor is not None:
executor._shutdown_thread = True # Executor가 종료 중임을 표시
if executor._cancel_pending_futures: # 대기 작업을 취소해야 하는 경우
new_pending_work_items = {}
for work_id, work_item in self.pending_work_items.items():
if not work_item.future.cancel(): # 작업 취소 시도
new_pending_work_items[work_id] = work_item
self.pending_work_items = new_pending_work_items # 취소되지 않은 작업만 유지
# 작업 ID 큐 비우기
while True:
try:
self.work_ids_queue.get_nowait() # 작업 ID 제거
except queue.Empty: # 큐가 비었으면 종료
break
executor._cancel_pending_futures = False # 재실행 방지
def shutdown_workers(self):
"""
현재 실행 중인 워커 프로세스를 종료.
목적:
- 모든 워커 프로세스가 종료되도록 sentinel(종료 신호)을 작업 큐에 추가.
- 큐가 가득 찼을 경우 대기하며 종료 신호를 계속 보냄.
동작:
- 현재 실행 중인 워커 수를 확인하고, 필요한 종료 신호를 큐에 추가.
- 워커가 종료될 때까지 sentinel 전송을 반복.
"""
n_children_to_stop = self.get_n_children_alive() # 현재 살아 있는 워커 수
n_sentinels_sent = 0 # 보낸 종료 신호(sentinel) 수
# 워커 프로세스가 모두 종료될 때까지 종료 신호를 큐에 추가
while n_sentinels_sent < n_children_to_stop and self.get_n_children_alive() > 0:
for _ in range(n_children_to_stop - n_sentinels_sent):
try:
self.call_queue.put_nowait(None) # 종료 신호(sentinel) 전송
n_sentinels_sent += 1
except queue.Full: # 큐가 가득 찬 경우 대기
break
def join_executor_internals(self):
"""
Executor 내부 리소스를 정리.
목적:
- 워커 프로세스 종료를 처리하고, 모든 내부 큐와 객체를 닫음.
- 워커 프로세스가 종료될 때까지 기다려 리소스 누수 방지.
동작:
- `shutdown_workers`를 호출하여 모든 워커를 종료.
- 작업 큐와 연결된 스레드 및 웨이크업 객체를 닫음.
- 모든 워커 프로세스가 종료될 때까지 `join` 호출로 기다림.
"""
self.shutdown_workers() # 워커 프로세스 종료
self.call_queue.close() # 작업 큐 닫기
self.call_queue.join_thread() # 작업 큐와 연결된 스레드 종료
with self.shutdown_lock: # 동기화 잠금
self.thread_wakeup.close() # 웨이크업 객체 종료
# 모든 프로세스가 종료되었는지 확인하고 종료 대기
for p in self.processes.values():
p.join()
def get_n_children_alive(self):
"""
현재 살아 있는 워커 프로세스 수를 반환.
목적:
- 워커 프로세스가 얼마나 남아 있는지 확인.
동작:
- `self.processes` 딕셔너리에서 살아 있는 프로세스(`is_alive()`)를 카운트.
"""
return sum(p.is_alive() for p in self.processes.values())
def _check_system_limits():
"""
플랫폼에서 사용 가능한 시스템 리소스 제한 확인.
목적:
- 시스템에서 동기화 객체나 세마포어 사용 가능 여부 확인.
- 제한이 있을 경우 예외를 발생시켜 실행을 방지.
동작:
- `multiprocessing.synchronize` 모듈 확인으로 동기화 객체 지원 여부 확인.
- `os.sysconf`를 사용해 세마포어 제한 확인. 최소 256개 이상 필요.
"""
global _system_limits_checked, _system_limited
if _system_limits_checked: # 이미 확인되었으면 재확인 방지
if _system_limited:
raise NotImplementedError(_system_limited)
_system_limits_checked = True
try:
import multiprocessing.synchronize # 동기화 객체 확인
except ImportError:
_system_limited = (
"이 Python 빌드는 multiprocessing.synchronize를 지원하지 않습니다. "
"이는 플랫폼에서 named semaphore를 사용할 수 없기 때문입니다."
)
raise NotImplementedError(_system_limited)
try:
nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") # 세마포어 제한 확인
except (AttributeError, ValueError):
return # 제한 없음 또는 sysconf 미지원
if nsems_max == -1:
return # 제한 없음으로 간주
if nsems_max >= 256:
return # POSIX 표준에 따라 최소 256개 세마포어 필요
_system_limited = (f"시스템에서 사용할 수 있는 세마포어가 부족합니다 "
f"({nsems_max}개 사용 가능, 최소 256개 필요).")
raise NotImplementedError(_system_limited)
def _chain_from_iterable_of_lists(iterable):
"""
반복 가능한 리스트 객체를 순회하며, 각 리스트의 요소를 역순으로 순차적으로 반환.
목적:
- 여러 리스트를 하나의 반복 가능한 객체로 연결하여 요소를 순차적으로 반환.
- 리스트를 역순으로 처리하여 요소를 효율적으로 제거하며 반환.
Args:
iterable: 반복 가능한 리스트 객체.
Yields:
리스트의 각 요소를 순차적으로 반환.
"""
for element in iterable:
element.reverse() # 리스트를 역순으로 변환
while element:
yield element.pop() # 리스트의 마지막 요소를 반환하고 제거
class BrokenProcessPool(_base.BrokenExecutor):
"""
ProcessPoolExecutor에서 일부 워커 프로세스가 실패하거나 비정상적으로 종료되었을 때 발생하는 예외 클래스.
목적:
- 프로세스 풀에서 워커가 실행 도중 예기치 않게 종료되었음을 알림.
- Executor가 더 이상 작업을 안정적으로 처리할 수 없는 상태를 나타냄.
"""
pass
class ProcessPoolExecutor(_base.Executor):
"""
ProcessPoolExecutor는 멀티프로세싱 기반으로 병렬 작업을 관리하는 클래스입니다.
목적:
- 여러 워커 프로세스를 사용하여 작업을 병렬로 처리.
- 각 워커 프로세스는 독립적인 작업 실행 환경을 가지며, 작업 결과를 수집하거나 예외를 처리.
- 시스템 리소스를 효율적으로 활용하면서 안정성과 확장성을 제공.
Args:
max_workers (int): 최대 워커 프로세스 수. 기본값은 CPU 코어 수.
mp_context: 멀티프로세싱 컨텍스트 객체 (예: spawn, fork).
initializer (callable): 워커 프로세스를 초기화할 함수.
initargs (tuple): 초기화 함수에 전달할 인수.
max_tasks_per_child (int): 워커 프로세스가 처리할 최대 작업 수. 초과 시 프로세스가 종료되고 새로운 프로세스로 교체.
"""
def __init__(self, max_workers=None, mp_context=None,
initializer=None, initargs=(), *, max_tasks_per_child=None):
_check_system_limits() # 시스템 자원 제한 확인
# 최대 워커 수 설정
if max_workers is None:
self._max_workers = os.cpu_count() or 1 # CPU 코어 수 계산
if sys.platform == 'win32': # Windows의 경우 최대 워커 수 제한
self._max_workers = min(_MAX_WINDOWS_WORKERS, self._max_workers)
else:
if max_workers <= 0:
raise ValueError("max_workers는 0보다 커야 합니다.")
if sys.platform == 'win32' and max_workers > _MAX_WINDOWS_WORKERS:
raise ValueError(
f"max_workers는 {_MAX_WINDOWS_WORKERS} 이하여야 합니다."
)
self._max_workers = max_workers
# 멀티프로세싱 컨텍스트 설정
if mp_context is None:
if max_tasks_per_child is not None:
mp_context = mp.get_context("spawn") # 기본적으로 spawn 방식 사용
else:
mp_context = mp.get_context() # 플랫폼 기본 컨텍스트
self._mp_context = mp_context
# 동적 워커 생성이 안전한지 여부 확인 (fork가 아닌 경우)
self._safe_to_dynamically_spawn_children = (
self._mp_context.get_start_method(allow_none=False) != "fork"
)
# 초기화 함수 유효성 검사
if initializer is not None and not callable(initializer):
raise TypeError("initializer는 callable 객체여야 합니다.")
self._initializer = initializer
self._initargs = initargs
# max_tasks_per_child 설정 및 유효성 검사
if max_tasks_per_child is not None:
if not isinstance(max_tasks_per_child, int):
raise TypeError("max_tasks_per_child는 정수여야 합니다.")
if max_tasks_per_child <= 0:
raise ValueError("max_tasks_per_child는 1 이상이어야 합니다.")
if self._mp_context.get_start_method(allow_none=False) == "fork":
raise ValueError("max_tasks_per_child는 'fork' 방식과 호환되지 않습니다.")
self._max_tasks_per_child = max_tasks_per_child
# 내부 속성 초기화
self._executor_manager_thread = None # 워커 프로세스를 관리하는 스레드
self._processes = {} # PID와 프로세스를 매핑하는 딕셔너리
self._shutdown_thread = False # Executor 종료 여부 플래그
self._shutdown_lock = threading.Lock() # 종료 상태 보호 락
self._idle_worker_semaphore = threading.Semaphore(0) # 유휴 워커 제어
self._broken = False # Executor 손상 여부
self._queue_count = 0 # 작업 큐 카운터
self._pending_work_items = {} # 대기 중인 작업 항목 딕셔너리
self._cancel_pending_futures = False # Future 객체 취소 여부
# Wakeup 채널 초기화
self._executor_manager_thread_wakeup = _ThreadWakeup()
# 통신 채널 초기화
queue_size = self._max_workers + EXTRA_QUEUED_CALLS # 작업 큐 크기 설정
self._call_queue = _SafeQueue(
max_size=queue_size,
ctx=self._mp_context,
pending_work_items=self._pending_work_items,
shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup
)
self._call_queue._ignore_epipe = True # Broken pipe 오류 무시
self._result_queue = mp_context.SimpleQueue() # 작업 결과 큐 생성
self._work_ids = queue.Queue() # 작업 ID를 저장하는 큐
def _start_executor_manager_thread(self):
"""
Executor 관리 스레드를 생성하고 시작.
- 관리 스레드가 존재하지 않을 경우 새로 생성합니다.
- 'fork' 방식에서는 워커 프로세스를 미리 시작하여 데드락 방지.
- 관리 스레드를 시작한 후, wakeup 채널을 등록하여 관리.
"""
if self._executor_manager_thread is None:
if not self._safe_to_dynamically_spawn_children:
self._launch_processes() # 모든 워커 프로세스 시작
self._executor_manager_thread = _ExecutorManagerThread(self) # 관리 스레드 생성
self._executor_manager_thread.start() # 관리 스레드 시작
# 관리 스레드와 wakeup 채널을 연결
_threads_wakeups[self._executor_manager_thread] = \
self._executor_manager_thread_wakeup
def _adjust_process_count(self):
"""
현재 워커 수를 확인하고 필요한 경우 새 프로세스를 생성.
- 유휴 워커가 있는 경우 추가 프로세스 생성이 불필요.
- 현재 워커 수가 최대 워커 수보다 적을 경우 새로운 프로세스를 생성.
"""
if self._idle_worker_semaphore.acquire(blocking=False):
return # 유휴 워커가 있는 경우 종료
process_count = len(self._processes) # 현재 실행 중인 프로세스 수 확인
if process_count < self._max_workers: # 최대 워커 수를 초과하지 않는 경우
self._spawn_process() # 새 프로세스 생성
def _launch_processes(self):
"""
관리 스레드 시작 전에 모든 워커 프로세스를 시작.
- 관리 스레드 실행 중 fork() 호출 시 데드락 방지를 위해 실행 전에 워커 생성.
- 현재 프로세스 수를 확인하여 최대 워커 수까지 프로세스를 추가 생성.
"""
assert not self._executor_manager_thread, (
'관리 스레드가 시작된 후에는 fork()를 사용할 수 없습니다. '
'이로 인해 자식 프로세스에서 데드락이 발생할 수 있습니다.'
)
for _ in range(len(self._processes), self._max_workers):
self._spawn_process() # 프로세스 생성
def _spawn_process(self):
"""
새로운 워커 프로세스를 생성하고 시작.
- `_process_worker` 함수를 실행할 프로세스를 생성.
- 작업 요청 큐와 결과 큐를 프로세스와 연결.
- 생성된 프로세스를 실행하고, 프로세스 PID를 프로세스 목록에 저장.
"""
p = self._mp_context.Process(
target=_process_worker, # 작업 처리 함수
args=(self._call_queue, # 작업 요청 큐
self._result_queue, # 작업 결과 큐
self._initializer, # 초기화 함수
self._initargs, # 초기화 함수 인수
self._max_tasks_per_child)) # 프로세스당 최대 작업 수
p.start() # 프로세스 시작
self._processes[p.pid] = p # PID와 프로세스를 프로세스 목록에 추가
def submit(self, fn, /, *args, **kwargs):
"""
새 작업을 제출하고 Future 객체를 반환.
- 작업을 비동기로 처리하기 위한 Future 객체를 생성하고 작업 항목으로 저장.
- 관리 스레드를 시작하고 필요한 경우 새로운 프로세스를 생성.
- 작업이 종료된 상태에서는 제출할 수 없으며, Future 객체를 통해 결과를 반환.
Args:
fn: 실행할 함수.
*args, **kwargs: 함수에 전달할 위치 및 키워드 인수.
Returns:
작업 결과를 비동기로 관리하는 Future 객체.
"""
with self._shutdown_lock: # 종료 보호 락 사용
if self._broken:
raise BrokenProcessPool(self._broken) # Executor 손상 상태 예외 발생
if self._shutdown_thread:
raise RuntimeError('종료된 후에는 새로운 작업을 제출할 수 없습니다.')
if _global_shutdown:
raise RuntimeError('인터프리터 종료 후 작업을 제출할 수 없습니다.')
f = _base.Future() # Future 객체 생성
w = _WorkItem(f, fn, args, kwargs) # 작업 항목 생성
self._pending_work_items[self._queue_count] = w # 작업 항목 저장
self._work_ids.put(self._queue_count) # 작업 ID 큐에 추가
self._queue_count += 1 # 작업 ID 카운터 증가
self._executor_manager_thread_wakeup.wakeup() # 관리 스레드에 신호 보내기
if self._safe_to_dynamically_spawn_children:
self._adjust_process_count() # 필요 시 프로세스 생성
self._start_executor_manager_thread() # 관리 스레드 시작
return f # Future 객체 반환
def map(self, fn, *iterables, timeout=None, chunksize=1):
"""
제공된 함수를 여러 입력 값에 병렬로 적용하여 결과를 반환.
- 입력 값을 `chunksize` 크기로 나눠 작업을 병렬로 처리.
- 결과는 순서를 유지하며 `map(func, *iterables)`와 동일한 인터페이스 제공.
Args:
fn: 호출 가능한 함수.
*iterables: 병렬로 처리할 입력 값.
timeout: 작업 제한 시간(초). 기본값은 None.
chunksize: 처리 단위 크기. 기본값은 1.
Returns:
작업 결과를 반환하는 이터레이터.
"""
if chunksize < 1: # chunksize는 1 이상이어야 함
raise ValueError("chunksize must be >= 1.")
results = super().map(
partial(_process_chunk, fn), # 청크 단위로 함수 적용
_get_chunks(*iterables, chunksize=chunksize), # 입력 값을 청크로 분리
timeout=timeout # 제한 시간
)
return _chain_from_iterable_of_lists(results) # 결과를 단일 이터레이터로 병합
def shutdown(self, wait=True, *, cancel_futures=False):
"""
Executor를 종료하고 리소스를 정리.
- 모든 워커를 종료하고, 실행 중이거나 대기 중인 작업을 정리.
- 종료 상태 플래그를 설정하고 관리 스레드 종료 대기.
Args:
wait: 모든 작업이 완료될 때까지 기다릴지 여부 (기본값: True).
cancel_futures: 실행 대기 중인 작업을 취소할지 여부 (기본값: False).
"""
with self._shutdown_lock: # 종료 보호 락 사용
self._cancel_pending_futures = cancel_futures # 작업 취소 플래그 설정
self._shutdown_thread = True # 종료 상태 플래그 설정
if self._executor_manager_thread_wakeup is not None:
self._executor_manager_thread_wakeup.wakeup() # 관리 스레드에 종료 신호
if self._executor_manager_thread is not None and wait:
self._executor_manager_thread.join() # 관리 스레드 종료 대기
# 리소스 정리
self._executor_manager_thread = None
self._call_queue = None # 작업 요청 큐 제거
if self._result_queue is not None and wait:
self._result_queue.close() # 결과 큐 닫기
self._result_queue = None
self._processes = None # 프로세스 목록 제거
self._executor_manager_thread_wakeup = None # wakeup 채널 제거
shutdown.__doc__ = _base.Executor.shutdown.__doc__ # 부모 클래스 shutdown 문서 가져오기