그때그때 CS 정리

json 평활화 작업을 통한, 멀티스레드 속도 비교(ThreadPoolExecutor)

필만이 2024. 11. 3. 23:41

배경

  • 다층구조 json 데이터를 3가지 테이블로, 분리하는 작업 필요.
  • 예전 프로젝트 당시 glue를 통한 분산처리를 했었음.
  • 그게 최선이었냐는 질문을 받음.
  • 로컬에서 빠르게 할 수 있는 방법을 찾아서, 멀티스레드를 적용하기로함.

해결과정

  1. ThreadPoolExecutor를 적용해 코드 작성
  2. for문을 10000번 실행 하는데, 한번에 배치실행수(스레드수)를 조절하며 비교
  3. 오히려 스레드 수가 작을때, 빠른 현상... 왜지..
스레드 수 총 소요 시간 총 함수 호출 수 wait 호출 수 acquire 호출 수 as_completed 호출 누적 시간
1 13.69 초 3,796 2 15 13.54 초
2 15.50 초 5,207 10 28 15.35 초
5 15.23 초 7,494 10 67 15.07 초
10 16.33 초 11,334 20 132 16.18 초
20 16.11 초 19,000 39 258 15.72 초
  1. GPT 답변
    • 스레드 수가 늘어날수록 wait와 acquire 호출이 빈번해지며, 이는 GIL(Global Interpreter Lock)과 락 경합으로 인한 대기 시간이 늘어났음
    • 특히 acquire의 호출 횟수가 스레드 수가 20일 때 258회로 크게 증가하여, 스레드 간의 락 점유 경쟁이 성능 저하의 원인이 되고 있음.
    • as_completed를 사용하여 각 스레드가 완료될 때까지 기다리는 구조로 인해, 스레드 수가 증가할수록 락을 해제하고 점유하는 시간이 증가
  2. 저 답변을 제대로 이해하기 위해 다음편에 ThreadPoolExecutor를 구조를 공부 하기로함.

코드

import pandas as pd

import cProfile
import pstats
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from concurrent.futures import ProcessPoolExecutor

# JSON 데이터 예시
data = {
    "users": [
        {
            "user_id": "U001",
            "name": "Alice",
            "email": "alice@example.com",
            "orders": [
                {
                    "order_id": "O1001",
                    "date": "2024-10-21",
                    "total": 150.0,
                    "items": [
                        {"item_id": "I001", "name": "Laptop", "quantity": 1, "price": 1000.0},
                        {"item_id": "I002", "name": "Mouse", "quantity": 2, "price": 25.0}
                    ]
                }
            ]
        }
    ]
}

# 각 레벨의 JSON 데이터를 풀어내는 함수 정의
def create_users_df():
    try:
        return pd.json_normalize(data['users'], sep='_')
    except Exception as e:
        print(f"Error in create_users_df: {e}")
        return pd.DataFrame()  # 빈 데이터프레임 반환

def create_orders_df():
    return pd.json_normalize(
        data['users'],
        'orders',
        ['user_id', 'name', 'email'],
        sep='_'
    )

def create_items_df():
    # name 중복을 방지하기 위해 user_name과 order_name으로 구분
    return pd.json_normalize(
        data['users'],
        record_path=['orders', 'items'],
        meta=[['user_id'], ['name'], ['email'], ['orders', 'order_id'], ['orders', 'date']],
        sep='_',
        meta_prefix='user_'  # user 필드에 접두사 추가
    )

max_thread=0

# 개별 작업 함수: 각 루프 작업을 하나로 묶음
def process_single_iteration():
    users_df = create_users_df()
    orders_df = create_orders_df()
    items_df = create_items_df()
    return users_df, orders_df, items_df

# batch_process 함수는 스레드 풀에 의해 각각의 스레드에서 실행됨
def batch_process(batch_size):
    # batch_size에 따라 지정된 수만큼 반복하면서 create_users_df, create_orders_df, create_items_df 함수를 호출하여 데이터를 생성
    buffer_users, buffer_orders, buffer_items = [], [], []
    for _ in range(batch_size):
        # 각 스레드는 create_users_df, create_orders_df, create_items_df를 호출해 데이터를 생성하고 이를 버퍼(buffer_users, buffer_orders, buffer_items)에 저장
        buffer_users.append(create_users_df())
        buffer_orders.append(create_orders_df())
        buffer_items.append(create_items_df())
    # 모든 데이터가 수집되면 pd.concat()을 사용해 데이터를 하나로 병합한 뒤, 스레드가 이 병합된 데이터프레임을 반환
    return pd.concat(buffer_users), pd.concat(buffer_orders), pd.concat(buffer_items)


# 10만 번 실행하는 함수
# 10000번 실행하는 병렬 처리 함수
def run_100k_times_parallel():
    batch_size = 500
    total_batches = 20
    global max_thread  # 전역 변수 사용 선언
    buffer_users = []
    buffer_orders = []
    buffer_items = []

    with ThreadPoolExecutor() as executor:  # 필요에 따라 max_workers 조정
        # executor.submit(batch_process, batch_size)를 통해 batch_process 함수를 반복 실행하는 작업을 병렬로 제출합니다.
        # 각 batch_process 함수 호출이 독립된 작업으로 스레드 풀에 제출됩니다.
        # 결과적으로, 각 배치가 병렬적으로 처리됨
        futures = [executor.submit(batch_process, batch_size) for _ in range(total_batches)]

        # 작업 완료 시마다 결과를 버퍼에 추가
        for future in as_completed(futures):
            users_df, orders_df, items_df = future.result()
            buffer_users.append(users_df)
            buffer_orders.append(orders_df)
            buffer_items.append(items_df)

    # 모든 batch_process 함수가 완료된 후 각 데이터 버퍼(buffer_users, buffer_orders, buffer_items)를 pd.concat()으로 병합하여 최종 데이터를 만듦
    pd.concat(buffer_users).to_csv("C:/Users/makenow/json/users_all.csv", index=False)
    pd.concat(buffer_orders).to_csv("C:/Users/makenow/json/orders_all.csv", index=False)
    pd.concat(buffer_items).to_csv("C:/Users/makenow/json/items_all.csv", index=False)
    # 최대 스레드 수를 기록
    if max_thread < len(executor._threads):
        max_thread = len(executor._threads)

# 단계별 시간 측정(프로파일링)
profiler = cProfile.Profile()
profiler.enable()

# 실행 및 시간 측정
start_time = time.time()
run_100k_times_parallel()
end_time = time.time()
t=end_time - start_time
print(f"1만 번 실행 완료, 총 소요 시간: {t} 초")
print(f"50만 번 실시 완료{t*50}")
print(f'사용된 최대 스레드 숫자. {max_thread}')

# 프로파일링 종료
profiler.disable()

# 프로파일링 결과 출력
stats = pstats.Stats(profiler)
stats.strip_dirs()
stats.sort_stats('cumtime')  # 누적 시간 기준 정렬
stats.print_stats(10)  # 상위 10개의 기록만 출력