오류 해결 과정

(ProcessPoolExecutor 사용 오류) A process in the process pool was terminated abruptly while the future was running or pending

필만이 2024. 11. 4. 22:56

배경

  • 멀티스레드 대신 멀티프로세서(ProcessPoolExecutor) 방식을 사용하려 했다.
  • 그런데 아래 오류가 뜨면서 동작하지 않았다.
BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

해결과정

  1. 어떻게 해도 ProcessPoolExecutor가 동작하지 않았다.

  2. 오랜시간 검색끝에, 대화형 환경(주피터)에서 실행한 것이 문제라는 문서를 발견했다.

  3. 공식문서 내용 요약

    • ProcessPoolExecutor는 각 프로세스를 독립적으로 실행하므로, 프로세스가 시작될 때마다 메인 코드(즉, main 모듈)를 다시 불러와야 합니다. 이 main 모듈은 프로세스가 작업을 제대로 실행하는 데 필요한 환경을 설정함
    • 만약 main 모듈을 찾지 못하면 프로세스가 원하는 대로 작동하지 않을 수 있습니다. 이 때문에, ProcessPoolExecutor는 대화형 인터프리터(예: Jupyter Notebook이나 Python REPL)처럼 명확한 main 모듈이 없는 환경에서는 제대로 작동하지 않습니다.
    • 즉 주피터가 아닌 .py 환경에서 if name == 'main': 를 위에 입력한 뒤 실행
  4. 공식문서 출처
    https://docs.python.org/ko/3.7/library/concurrent.futures.html

코드

import json
import os
import pandas as pd
import cProfile
import pstats
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor,as_completed

# JSON 파일을 읽고 DataFrame으로 변환하는 함수
def read_json_to_df(filename):
    try:
        with open(filename) as f:
            data = json.load(f)
            users_df = pd.json_normalize(data['users'], sep='_')
            orders_df = pd.json_normalize(data['users'], 'orders', ['user_id', 'name', 'email'], sep='_')
            items_df = pd.json_normalize(
                data['users'],
                record_path=['orders', 'items'],
                meta=[['user_id'], ['name'], ['email'], ['orders', 'order_id'], ['orders', 'date']],
                sep='_',
                meta_prefix='user_'
            )
        return users_df, orders_df, items_df
    except Exception as e:
        print(f"Error processing file {filename}: {e}")
        # 빈 데이터프레임 반환하여 실패한 파일에 대한 결과 방지
        return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
max_thread =0
# JSON 파일들을 읽고 병합 (배치 처리로 나눔)
def merge_data_from_files(output_dir, num_files, batch_size, max_workers):
    buffer_users = []
    buffer_orders = []
    buffer_items = []
    global max_thread

    start_total = time.time()
    # 전체 파일을 batch_size 단위로 나누어 처리
    for i in range(0, num_files, batch_size):
        batch_files = [os.path.join(output_dir, f"data_{j}.json") for j in range(i, min(i + batch_size, num_files))]

        start_batch = time.time()
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            results = executor.map(read_json_to_df, batch_files)
            for users_df, orders_df, items_df in results:
                buffer_users.append(users_df)
                buffer_orders.append(orders_df)
                buffer_items.append(items_df)
            if max_thread < executor._max_workers:
                max_thread = executor._max_workers
        end_batch = time.time()
        print(f"Time for batch {i // batch_size}: {end_batch - start_batch:.4f} seconds")

    start_merge = time.time()
    # 최종 병합 및 CSV 파일 저장
    # 각 데이터가 존재할 때만 병합하여 저장
    pd.concat(buffer_users).to_csv("C:/Users/makenow/json/users_all.csv", index=False)
    pd.concat(buffer_orders).to_csv("C:/Users/makenow/json/orders_all.csv", index=False)
    pd.concat(buffer_items).to_csv("C:/Users/makenow/json/items_all.csv", index=False)
    end_merge = time.time()
    # 최대 프로세스 수 출력
    print(f"사용된 최대 프로세스 수: {max_thread}")
    print(f"Time for merging data: {end_merge - start_merge:.4f} seconds")
    end_total = time.time()
    print(f"Total execution time: {end_total - start_total:.4f} seconds")

if __name__ == '__main__':
    # 단계별 시간 측정(프로파일링)
    profiler = cProfile.Profile()
    profiler.enable()

    # 실행 및 시간 측정
    start_time = time.time()
    output_dir = "C:\\Users\\makenow\\json"  # 출력 디렉토리 지정
    merge_data_from_files(output_dir, 10000, 2000, 4)
    end_time = time.time()
    t = end_time - start_time
    print(f"1만 번 실행 완료, 총 소요 시간: {t} 초")
    print(f"50만 번 실시 완료 예상 시간: {t * 50} 초")

    # 프로파일링 종료
    profiler.disable()

    # 프로파일링 결과 출력
    stats = pstats.Stats(profiler)
    stats.strip_dirs()
    stats.sort_stats('cumtime')  # 누적 시간 기준 정렬
    stats.print_stats(10)  # 상위 10개의 기록만 출력