배경
- 다층구조 json 데이터를 3가지 테이블로, 분리하는 작업이 오래걸림(25초)
- 일반실행, 멀티스레드, 멀티프로세서 각각 방식에서 어떤 방식이 빠른지 관찰하려 한다.
일반 실행시간
- 병렬 처리 없이 단일 스레드에서 모든 데이터 처리를 수행하니, 25초가 걸렸다.
멀티스레드 실행 시간 분석
스레드 수 |
총 소요 시간 |
총 함수 호출 수 |
wait 호출 수 |
acquire 호출 수 |
as_completed 호출 누적 시간 |
1 |
13.69 초 |
3,796 |
2 |
15 |
13.54 초 |
2 |
15.50 초 |
5,207 |
10 |
28 |
15.35 초 |
5 |
15.23 초 |
7,494 |
10 |
67 |
15.07 초 |
10 |
16.33 초 |
11,334 |
20 |
132 |
16.18 초 |
20 |
16.11 초 |
19,000 |
39 |
258 |
15.72 초 |
- 주요 관찰 사항
- 스레드 수 1일 때 최단 시간(13.69초)이 소요되었습니다.
- 스레드 수 증가에 따라 오히려 실행 시간이 길어지는 현상이 나타났습니다. 예를 들어, 스레드 수 2 이상부터 13초대에서 15초 이상으로 증가하였습니다.
- wait와 acquire 호출 수가 스레드 수 증가에 따라 급증했습니다. 특히 acquire 호출 수가 늘어나면서 전체 소요 시간에 영향을 미쳤습니다.
- 분석
- 멀티스레드 방식에서 스레드 수가 증가함에 따라 실행 시간이 길어지는 이유는 스레드 경합과 GIL(Global Interpreter Lock)의 영향 때문으로 보입니다. Python의 GIL은 멀티스레딩에서 스레드가 동시에 실행되는 것을 방지하고, 한 번에 하나의 스레드만 실행되도록 제어하므로, 특히 CPU 집약적인 작업에서 스레드 수가 많아질수록 성능이 저하될 수 있습니다.
- 스레드 수 1에서는 오히려 GIL의 영향을 덜 받으며 최단 시간에 완료되었습니다.
- 스레드 수가 2 이상이 되면, 스레드 간의 자원 경합과 문맥 전환 비용이 늘어나면서 GIL에 의한 제약으로 실행 시간이 오히려 늘어납니다.
- wait와 acquire 호출 수가 늘어남에 따라, 각 스레드가 자원을 획득하기 위해 대기하는 시간이 증가해 성능이 떨어졌습니다.
멀티프로세서 실행 시간 분석
최대 프로세스 수 |
데이터 병합 시간 (초) |
1만 번 실행 총 소요 시간 (초) |
50만 번 예상 실행 시간 (초) |
acquire 호출 수 |
acquire 소요 시간 (초) |
result_iterator 소요 시간 (초) |
concat 소요 시간 (초) |
1 |
2.4495 |
15.08 |
754.01 |
60,017 |
12.068 |
12.264 |
2.287 |
4 |
2.6034 |
12.38 |
618.75 |
29,092 |
8.603 |
8.320 |
2.430 |
10 |
2.4498 |
13.75 |
687.66 |
29,478 |
9.344 |
8.556 |
2.291 |
15 |
2.5561 |
16.43 |
821.37 |
28,564 |
11.195 |
9.897 |
2.383 |
- 주요 관찰 사항
- 프로세스 수 4에서 최단 시간(12.38초)을 기록했습니다.
- 프로세스 수 10 이상이 되면 오히려 실행 시간이 증가하는 현상이 나타났습니다.
- acquire 호출 수와 소요 시간은 프로세스 수에 따라 약간의 차이를 보였지만, 상대적으로 멀티스레드 방식보다는 효율적이었습니다.
- 분석
- 멀티프로세서 방식은 GIL의 영향을 받지 않기 때문에 멀티스레드 방식보다 높은 성능을 발휘합니다. 각 프로세스가 독립적인 메모리 공간을 사용하여 작업을 수행하므로, CPU 집약적인 작업에서 더욱 효과적입니다.
- 프로세스 수 4에서 최적의 성능을 보였으며, 프로세스 수가 10 이상이 되면 프로세스 간의 자원 경합과 통신 비용이 증가하여 오히려 성능이 저하되었습니다.
- 데이터 병합 시간(약 2.5초)은 프로세스 수에 크게 영향을 받지 않았으며, 이는 프로세스 간에 분할된 작업을 모두 완료한 후 데이터가 병합되는 단계에서 시간이 소요되는 것을 의미합니다.
- acquire와 result_iterator의 시간이 상대적으로 짧게 나타나, 멀티프로세서 방식에서는 프로세스별로 작업이 분리되어 효율적으로 관리됨을 확인할 수 있습니다
결론
- 최적의 방식: 멀티프로세서 방식에서 프로세스 수 4로 설정했을 때 가장 빠른 성능(12.38초)을 보였습니다. 이 방식은 GIL의 제약을 받지 않고 각 프로세스가 독립적으로 작업을 수행하여, CPU를 효과적으로 활용할 수 있었습니다.
- 멀티스레드 방식 한계: 멀티스레드는 스레드 간 자원 경합과 GIL로 인해 성능이 저하되었습니다. 특히 스레드 수가 증가할수록 오히려 성능이 떨어지는 현상이 두드러졌습니다.
- 일반 실행 대비 효율성: 멀티프로세서 방식(4 프로세스 사용)이 일반 실행(25초)보다 약 2배 빠르며, 특히 대규모 데이터 처리에서 적합한 방법임을 확인했습니다.
코드
import json
import os
import pandas as pd
import cProfile
import pstats
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor,as_completed
# JSON 파일을 읽고 DataFrame으로 변환하는 함수
def read_json_to_df(filename):
try:
with open(filename) as f:
data = json.load(f)
users_df = pd.json_normalize(data['users'], sep='_')
orders_df = pd.json_normalize(data['users'], 'orders', ['user_id', 'name', 'email'], sep='_')
items_df = pd.json_normalize(
data['users'],
record_path=['orders', 'items'],
meta=[['user_id'], ['name'], ['email'], ['orders', 'order_id'], ['orders', 'date']],
sep='_',
meta_prefix='user_'
)
return users_df, orders_df, items_df
except Exception as e:
print(f"Error processing file {filename}: {e}")
# 빈 데이터프레임 반환하여 실패한 파일에 대한 결과 방지
return pd.DataFrame(), pd.DataFrame(), pd.DataFrame()
max_thread =0
# JSON 파일들을 읽고 병합 (배치 처리로 나눔)
def merge_data_from_files(output_dir, num_files, batch_size, max_workers):
buffer_users = []
buffer_orders = []
buffer_items = []
global max_thread
start_total = time.time()
# 전체 파일을 batch_size 단위로 나누어 처리
for i in range(0, num_files, batch_size):
batch_files = [os.path.join(output_dir, f"data_{j}.json") for j in range(i, min(i + batch_size, num_files))]
start_batch = time.time()
with ProcessPoolExecutor(max_workers=max_workers) as executor:
results = executor.map(read_json_to_df, batch_files)
for users_df, orders_df, items_df in results:
buffer_users.append(users_df)
buffer_orders.append(orders_df)
buffer_items.append(items_df)
if max_thread < executor._max_workers:
max_thread = executor._max_workers
end_batch = time.time()
print(f"Time for batch {i // batch_size}: {end_batch - start_batch:.4f} seconds")
start_merge = time.time()
# 최종 병합 및 CSV 파일 저장
# 각 데이터가 존재할 때만 병합하여 저장
pd.concat(buffer_users).to_csv("C:/Users/makenow/json/users_all.csv", index=False)
pd.concat(buffer_orders).to_csv("C:/Users/makenow/json/orders_all.csv", index=False)
pd.concat(buffer_items).to_csv("C:/Users/makenow/json/items_all.csv", index=False)
end_merge = time.time()
# 최대 프로세스 수 출력
print(f"사용된 최대 프로세스 수: {max_thread}")
print(f"Time for merging data: {end_merge - start_merge:.4f} seconds")
end_total = time.time()
print(f"Total execution time: {end_total - start_total:.4f} seconds")
if __name__ == '__main__':
# 단계별 시간 측정(프로파일링)
profiler = cProfile.Profile()
profiler.enable()
# 실행 및 시간 측정
start_time = time.time()
output_dir = "C:\\Users\\makenow\\json" # 출력 디렉토리 지정
merge_data_from_files(output_dir, 10000, 2000, 4)
end_time = time.time()
t = end_time - start_time
print(f"1만 번 실행 완료, 총 소요 시간: {t} 초")
print(f"50만 번 실시 완료 예상 시간: {t * 50} 초")
# 프로파일링 종료
profiler.disable()
# 프로파일링 결과 출력
stats = pstats.Stats(profiler)
stats.strip_dirs()
stats.sort_stats('cumtime') # 누적 시간 기준 정렬
stats.print_stats(10) # 상위 10개의 기록만 출력