배경
- 다층구조 json 데이터를 3가지 테이블로, 분리하는 작업 필요.
- 예전 프로젝트 당시 glue를 통한 분산처리를 했었음.
- 그게 최선이었냐는 질문을 받음.
- 로컬에서 빠르게 할 수 있는 방법을 찾아서, 멀티스레드를 적용하기로함.
해결과정
- ThreadPoolExecutor를 적용해 코드 작성
- for문을 10000번 실행 하는데, 한번에 배치실행수(스레드수)를 조절하며 비교
- 오히려 스레드 수가 작을때, 빠른 현상... 왜지..
스레드 수 |
총 소요 시간 |
총 함수 호출 수 |
wait 호출 수 |
acquire 호출 수 |
as_completed 호출 누적 시간 |
1 |
13.69 초 |
3,796 |
2 |
15 |
13.54 초 |
2 |
15.50 초 |
5,207 |
10 |
28 |
15.35 초 |
5 |
15.23 초 |
7,494 |
10 |
67 |
15.07 초 |
10 |
16.33 초 |
11,334 |
20 |
132 |
16.18 초 |
20 |
16.11 초 |
19,000 |
39 |
258 |
15.72 초 |
- GPT 답변
- 스레드 수가 늘어날수록 wait와 acquire 호출이 빈번해지며, 이는 GIL(Global Interpreter Lock)과 락 경합으로 인한 대기 시간이 늘어났음
- 특히 acquire의 호출 횟수가 스레드 수가 20일 때 258회로 크게 증가하여, 스레드 간의 락 점유 경쟁이 성능 저하의 원인이 되고 있음.
- as_completed를 사용하여 각 스레드가 완료될 때까지 기다리는 구조로 인해, 스레드 수가 증가할수록 락을 해제하고 점유하는 시간이 증가
- 저 답변을 제대로 이해하기 위해 다음편에 ThreadPoolExecutor를 구조를 공부 하기로함.
코드
import pandas as pd
import cProfile
import pstats
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from concurrent.futures import ProcessPoolExecutor
# JSON 데이터 예시
data = {
"users": [
{
"user_id": "U001",
"name": "Alice",
"email": "alice@example.com",
"orders": [
{
"order_id": "O1001",
"date": "2024-10-21",
"total": 150.0,
"items": [
{"item_id": "I001", "name": "Laptop", "quantity": 1, "price": 1000.0},
{"item_id": "I002", "name": "Mouse", "quantity": 2, "price": 25.0}
]
}
]
}
]
}
# 각 레벨의 JSON 데이터를 풀어내는 함수 정의
def create_users_df():
try:
return pd.json_normalize(data['users'], sep='_')
except Exception as e:
print(f"Error in create_users_df: {e}")
return pd.DataFrame() # 빈 데이터프레임 반환
def create_orders_df():
return pd.json_normalize(
data['users'],
'orders',
['user_id', 'name', 'email'],
sep='_'
)
def create_items_df():
# name 중복을 방지하기 위해 user_name과 order_name으로 구분
return pd.json_normalize(
data['users'],
record_path=['orders', 'items'],
meta=[['user_id'], ['name'], ['email'], ['orders', 'order_id'], ['orders', 'date']],
sep='_',
meta_prefix='user_' # user 필드에 접두사 추가
)
max_thread=0
# 개별 작업 함수: 각 루프 작업을 하나로 묶음
def process_single_iteration():
users_df = create_users_df()
orders_df = create_orders_df()
items_df = create_items_df()
return users_df, orders_df, items_df
# batch_process 함수는 스레드 풀에 의해 각각의 스레드에서 실행됨
def batch_process(batch_size):
# batch_size에 따라 지정된 수만큼 반복하면서 create_users_df, create_orders_df, create_items_df 함수를 호출하여 데이터를 생성
buffer_users, buffer_orders, buffer_items = [], [], []
for _ in range(batch_size):
# 각 스레드는 create_users_df, create_orders_df, create_items_df를 호출해 데이터를 생성하고 이를 버퍼(buffer_users, buffer_orders, buffer_items)에 저장
buffer_users.append(create_users_df())
buffer_orders.append(create_orders_df())
buffer_items.append(create_items_df())
# 모든 데이터가 수집되면 pd.concat()을 사용해 데이터를 하나로 병합한 뒤, 스레드가 이 병합된 데이터프레임을 반환
return pd.concat(buffer_users), pd.concat(buffer_orders), pd.concat(buffer_items)
# 10만 번 실행하는 함수
# 10000번 실행하는 병렬 처리 함수
def run_100k_times_parallel():
batch_size = 500
total_batches = 20
global max_thread # 전역 변수 사용 선언
buffer_users = []
buffer_orders = []
buffer_items = []
with ThreadPoolExecutor() as executor: # 필요에 따라 max_workers 조정
# executor.submit(batch_process, batch_size)를 통해 batch_process 함수를 반복 실행하는 작업을 병렬로 제출합니다.
# 각 batch_process 함수 호출이 독립된 작업으로 스레드 풀에 제출됩니다.
# 결과적으로, 각 배치가 병렬적으로 처리됨
futures = [executor.submit(batch_process, batch_size) for _ in range(total_batches)]
# 작업 완료 시마다 결과를 버퍼에 추가
for future in as_completed(futures):
users_df, orders_df, items_df = future.result()
buffer_users.append(users_df)
buffer_orders.append(orders_df)
buffer_items.append(items_df)
# 모든 batch_process 함수가 완료된 후 각 데이터 버퍼(buffer_users, buffer_orders, buffer_items)를 pd.concat()으로 병합하여 최종 데이터를 만듦
pd.concat(buffer_users).to_csv("C:/Users/makenow/json/users_all.csv", index=False)
pd.concat(buffer_orders).to_csv("C:/Users/makenow/json/orders_all.csv", index=False)
pd.concat(buffer_items).to_csv("C:/Users/makenow/json/items_all.csv", index=False)
# 최대 스레드 수를 기록
if max_thread < len(executor._threads):
max_thread = len(executor._threads)
# 단계별 시간 측정(프로파일링)
profiler = cProfile.Profile()
profiler.enable()
# 실행 및 시간 측정
start_time = time.time()
run_100k_times_parallel()
end_time = time.time()
t=end_time - start_time
print(f"1만 번 실행 완료, 총 소요 시간: {t} 초")
print(f"50만 번 실시 완료{t*50}")
print(f'사용된 최대 스레드 숫자. {max_thread}')
# 프로파일링 종료
profiler.disable()
# 프로파일링 결과 출력
stats = pstats.Stats(profiler)
stats.strip_dirs()
stats.sort_stats('cumtime') # 누적 시간 기준 정렬
stats.print_stats(10) # 상위 10개의 기록만 출력