배경
- 다층구조 json 데이터를 3가지 테이블로, 분리하는 작업 필요.
- 예전 프로젝트 당시 glue를 통한 분산처리를 했었음.
- 그게 최선이었냐는 질문을 받음.
- 로컬에서 빠르게 할 수 있는 방법을 찾아서, 멀티스레드를 적용하기로함.
해결과정
- ThreadPoolExecutor를 적용해 코드 작성
- for문을 10000번 실행 하는데, 한번에 배치실행수(스레드수)를 조절하며 비교
- 오히려 스레드 수가 작을때, 빠른 현상... 왜지..
스레드 수 |
총 소요 시간 |
총 함수 호출 수 |
wait 호출 수 |
acquire 호출 수 |
as_completed 호출 누적 시간 |
1 |
13.69 초 |
3,796 |
2 |
15 |
13.54 초 |
2 |
15.50 초 |
5,207 |
10 |
28 |
15.35 초 |
5 |
15.23 초 |
7,494 |
10 |
67 |
15.07 초 |
10 |
16.33 초 |
11,334 |
20 |
132 |
16.18 초 |
20 |
16.11 초 |
19,000 |
39 |
258 |
15.72 초 |
- GPT 답변
- 스레드 수가 늘어날수록 wait와 acquire 호출이 빈번해지며, 이는 GIL(Global Interpreter Lock)과 락 경합으로 인한 대기 시간이 늘어났음
- 특히 acquire의 호출 횟수가 스레드 수가 20일 때 258회로 크게 증가하여, 스레드 간의 락 점유 경쟁이 성능 저하의 원인이 되고 있음.
- as_completed를 사용하여 각 스레드가 완료될 때까지 기다리는 구조로 인해, 스레드 수가 증가할수록 락을 해제하고 점유하는 시간이 증가
- 저 답변을 제대로 이해하기 위해 다음편에 ThreadPoolExecutor를 구조를 공부 하기로함.
코드
import pandas as pd
import cProfile
import pstats
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from concurrent.futures import ProcessPoolExecutor
data = {
"users": [
{
"user_id": "U001",
"name": "Alice",
"email": "alice@example.com",
"orders": [
{
"order_id": "O1001",
"date": "2024-10-21",
"total": 150.0,
"items": [
{"item_id": "I001", "name": "Laptop", "quantity": 1, "price": 1000.0},
{"item_id": "I002", "name": "Mouse", "quantity": 2, "price": 25.0}
]
}
]
}
]
}
def create_users_df():
try:
return pd.json_normalize(data['users'], sep='_')
except Exception as e:
print(f"Error in create_users_df: {e}")
return pd.DataFrame()
def create_orders_df():
return pd.json_normalize(
data['users'],
'orders',
['user_id', 'name', 'email'],
sep='_'
)
def create_items_df():
return pd.json_normalize(
data['users'],
record_path=['orders', 'items'],
meta=[['user_id'], ['name'], ['email'], ['orders', 'order_id'], ['orders', 'date']],
sep='_',
meta_prefix='user_'
)
max_thread=0
def process_single_iteration():
users_df = create_users_df()
orders_df = create_orders_df()
items_df = create_items_df()
return users_df, orders_df, items_df
def batch_process(batch_size):
buffer_users, buffer_orders, buffer_items = [], [], []
for _ in range(batch_size):
buffer_users.append(create_users_df())
buffer_orders.append(create_orders_df())
buffer_items.append(create_items_df())
return pd.concat(buffer_users), pd.concat(buffer_orders), pd.concat(buffer_items)
def run_100k_times_parallel():
batch_size = 500
total_batches = 20
global max_thread
buffer_users = []
buffer_orders = []
buffer_items = []
with ThreadPoolExecutor() as executor:
futures = [executor.submit(batch_process, batch_size) for _ in range(total_batches)]
for future in as_completed(futures):
users_df, orders_df, items_df = future.result()
buffer_users.append(users_df)
buffer_orders.append(orders_df)
buffer_items.append(items_df)
pd.concat(buffer_users).to_csv("C:/Users/makenow/json/users_all.csv", index=False)
pd.concat(buffer_orders).to_csv("C:/Users/makenow/json/orders_all.csv", index=False)
pd.concat(buffer_items).to_csv("C:/Users/makenow/json/items_all.csv", index=False)
if max_thread < len(executor._threads):
max_thread = len(executor._threads)
profiler = cProfile.Profile()
profiler.enable()
start_time = time.time()
run_100k_times_parallel()
end_time = time.time()
t=end_time - start_time
print(f"1만 번 실행 완료, 총 소요 시간: {t} 초")
print(f"50만 번 실시 완료{t*50}")
print(f'사용된 최대 스레드 숫자. {max_thread}')
profiler.disable()
stats = pstats.Stats(profiler)
stats.strip_dirs()
stats.sort_stats('cumtime')
stats.print_stats(10)