그때그때 CS 정리

멀티스레드(ThreadPoolExecutor), 멀티프로세서(ProcessPoolExecutor) 비교

필만이 2024. 11. 6. 16:20

배경

  • 다층구조 json 데이터를 3가지 테이블로, 분리하는 작업이 오래걸림(25초)
  • 일반실행, 멀티스레드, 멀티프로세서 각각 방식에서 어떤 방식이 빠른지 관찰하려 한다.

일반 실행시간

  • 병렬 처리 없이 단일 스레드에서 모든 데이터 처리를 수행하니, 25초가 걸렸다.

멀티스레드 실행 시간 분석

스레드 수 총 소요 시간 총 함수 호출 수 wait 호출 수 acquire 호출 수 as_completed 호출 누적 시간
1 13.69 초 3,796 2 15 13.54 초
2 15.50 초 5,207 10 28 15.35 초
5 15.23 초 7,494 10 67 15.07 초
10 16.33 초 11,334 20 132 16.18 초
20 16.11 초 19,000 39 258 15.72 초
  • 주요 관찰 사항
    • 스레드 수 1일 때 최단 시간(13.69초)이 소요되었습니다.
    • 스레드 수 증가에 따라 오히려 실행 시간이 길어지는 현상이 나타났습니다. 예를 들어, 스레드 수 2 이상부터 13초대에서 15초 이상으로 증가하였습니다.
    • wait와 acquire 호출 수가 스레드 수 증가에 따라 급증했습니다. 특히 acquire 호출 수가 늘어나면서 전체 소요 시간에 영향을 미쳤습니다.
    • 분석
      • 멀티스레드 방식에서 스레드 수가 증가함에 따라 실행 시간이 길어지는 이유는 스레드 경합과 GIL(Global Interpreter Lock)의 영향 때문으로 보입니다. Python의 GIL은 멀티스레딩에서 스레드가 동시에 실행되는 것을 방지하고, 한 번에 하나의 스레드만 실행되도록 제어하므로, 특히 CPU 집약적인 작업에서 스레드 수가 많아질수록 성능이 저하될 수 있습니다.
        • 스레드 수 1에서는 오히려 GIL의 영향을 덜 받으며 최단 시간에 완료되었습니다.
        • 스레드 수가 2 이상이 되면, 스레드 간의 자원 경합과 문맥 전환 비용이 늘어나면서 GIL에 의한 제약으로 실행 시간이 오히려 늘어납니다.
        • wait와 acquire 호출 수가 늘어남에 따라, 각 스레드가 자원을 획득하기 위해 대기하는 시간이 증가해 성능이 떨어졌습니다.

멀티프로세서 실행 시간 분석

최대 프로세스 수 데이터 병합 시간 (초) 1만 번 실행 총 소요 시간 (초) 50만 번 예상 실행 시간 (초) acquire 호출 수 acquire 소요 시간 (초) result_iterator 소요 시간 (초) concat 소요 시간 (초)
1 2.4495 15.08 754.01 60,017 12.068 12.264 2.287
4 2.6034 12.38 618.75 29,092 8.603 8.320 2.430
10 2.4498 13.75 687.66 29,478 9.344 8.556 2.291
15 2.5561 16.43 821.37 28,564 11.195 9.897 2.383
  • 주요 관찰 사항
    • 프로세스 수 4에서 최단 시간(12.38초)을 기록했습니다.
    • 프로세스 수 10 이상이 되면 오히려 실행 시간이 증가하는 현상이 나타났습니다.
    • acquire 호출 수와 소요 시간은 프로세스 수에 따라 약간의 차이를 보였지만, 상대적으로 멀티스레드 방식보다는 효율적이었습니다.
  • 분석
    • 멀티프로세서 방식은 GIL의 영향을 받지 않기 때문에 멀티스레드 방식보다 높은 성능을 발휘합니다. 각 프로세스가 독립적인 메모리 공간을 사용하여 작업을 수행하므로, CPU 집약적인 작업에서 더욱 효과적입니다.
    • 프로세스 수 4에서 최적의 성능을 보였으며, 프로세스 수가 10 이상이 되면 프로세스 간의 자원 경합과 통신 비용이 증가하여 오히려 성능이 저하되었습니다.
    • 데이터 병합 시간(약 2.5초)은 프로세스 수에 크게 영향을 받지 않았으며, 이는 프로세스 간에 분할된 작업을 모두 완료한 후 데이터가 병합되는 단계에서 시간이 소요되는 것을 의미합니다.
    • acquire와 result_iterator의 시간이 상대적으로 짧게 나타나, 멀티프로세서 방식에서는 프로세스별로 작업이 분리되어 효율적으로 관리됨을 확인할 수 있습니다

결론

  • 최적의 방식: 멀티프로세서 방식에서 프로세스 수 4로 설정했을 때 가장 빠른 성능(12.38초)을 보였습니다. 이 방식은 GIL의 제약을 받지 않고 각 프로세스가 독립적으로 작업을 수행하여, CPU를 효과적으로 활용할 수 있었습니다.
  • 멀티스레드 방식 한계: 멀티스레드는 스레드 간 자원 경합과 GIL로 인해 성능이 저하되었습니다. 특히 스레드 수가 증가할수록 오히려 성능이 떨어지는 현상이 두드러졌습니다.
  • 일반 실행 대비 효율성: 멀티프로세서 방식(4 프로세스 사용)이 일반 실행(25초)보다 약 2배 빠르며, 특히 대규모 데이터 처리에서 적합한 방법임을 확인했습니다.

코드

import json
import os
import pandas as pd
import cProfile
import pstats
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor,as_completed

# JSON 파일을 읽고 DataFrame으로 변환하는 함수
def read_json_to_df(filename):
    try:
        with open(filename) as f:
            data = json.load(f)
            users_df = pd.json_normalize(data['users'], sep='_')
            orders_df = pd.json_normalize(data['users'], 'orders', ['user_id', 'name', 'email'], sep='_')
            items_df = pd.json_normalize(
                data['users'],
                record_path=['orders', 'items'],
                meta=[['user_id'], ['name'], ['email'], ['orders', 'order_id'], ['orders', 'date']],
                sep='_',
                meta_prefix='user_'
            )
        return users_df, orders_df, items_df
    except Exception as e:
        print(f"Error processing file {filename}: {e}")
        # 빈 데이터프레임 반환하여 실패한 파일에 대한 결과 방지
        return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
max_thread =0
# JSON 파일들을 읽고 병합 (배치 처리로 나눔)
def merge_data_from_files(output_dir, num_files, batch_size, max_workers):
    buffer_users = []
    buffer_orders = []
    buffer_items = []
    global max_thread

    start_total = time.time()
    # 전체 파일을 batch_size 단위로 나누어 처리
    for i in range(0, num_files, batch_size):
        batch_files = [os.path.join(output_dir, f"data_{j}.json") for j in range(i, min(i + batch_size, num_files))]

        start_batch = time.time()
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            results = executor.map(read_json_to_df, batch_files)
            for users_df, orders_df, items_df in results:
                buffer_users.append(users_df)
                buffer_orders.append(orders_df)
                buffer_items.append(items_df)
            if max_thread < executor._max_workers:
                max_thread = executor._max_workers
        end_batch = time.time()
        print(f"Time for batch {i // batch_size}: {end_batch - start_batch:.4f} seconds")

    start_merge = time.time()
    # 최종 병합 및 CSV 파일 저장
    # 각 데이터가 존재할 때만 병합하여 저장
    pd.concat(buffer_users).to_csv("C:/Users/makenow/json/users_all.csv", index=False)
    pd.concat(buffer_orders).to_csv("C:/Users/makenow/json/orders_all.csv", index=False)
    pd.concat(buffer_items).to_csv("C:/Users/makenow/json/items_all.csv", index=False)
    end_merge = time.time()
    # 최대 프로세스 수 출력
    print(f"사용된 최대 프로세스 수: {max_thread}")
    print(f"Time for merging data: {end_merge - start_merge:.4f} seconds")
    end_total = time.time()
    print(f"Total execution time: {end_total - start_total:.4f} seconds")

if __name__ == '__main__':
    # 단계별 시간 측정(프로파일링)
    profiler = cProfile.Profile()
    profiler.enable()

    # 실행 및 시간 측정
    start_time = time.time()
    output_dir = "C:\\Users\\makenow\\json"  # 출력 디렉토리 지정
    merge_data_from_files(output_dir, 10000, 2000, 4)
    end_time = time.time()
    t = end_time - start_time
    print(f"1만 번 실행 완료, 총 소요 시간: {t} 초")
    print(f"50만 번 실시 완료 예상 시간: {t * 50} 초")

    # 프로파일링 종료
    profiler.disable()

    # 프로파일링 결과 출력
    stats = pstats.Stats(profiler)
    stats.strip_dirs()
    stats.sort_stats('cumtime')  # 누적 시간 기준 정렬
    stats.print_stats(10)  # 상위 10개의 기록만 출력