랭귀지/pandas
np.select 멀티프로세싱 적용
유키공
2025. 6. 12. 08:30
import numpy as np
import pandas as pd
import multiprocessing as mp
from functools import partial
def process_chunk(df_chunk, conditions, choices, default):
    """Apply ``np.select`` to a single chunk of rows.

    Args:
        df_chunk: The slice of the DataFrame handled by this call. Only its
            length and index are used here.
        conditions: Sequence of boolean conditions, as accepted by
            ``np.select``. A pandas Series whose length differs from the
            chunk (e.g. one built on the full DataFrame) is restricted to the
            chunk's index labels first; this label lookup assumes the labels
            are unique. Anything else is passed to ``np.select`` unchanged
            and must already match the chunk.
        choices: Sequence of values (scalars or row-aligned arrays/Series),
            one per condition. Series are restricted the same way as
            conditions.
        default: Value used where no condition is true.

    Returns:
        A ``pd.Series`` with the selected values, indexed like ``df_chunk``.
    """
    chunk_index = df_chunk.index
    n_rows = len(df_chunk)

    def _align(value):
        # A Series built on the whole DataFrame is longer than the chunk;
        # evaluating it as-is would produce a full-length result that cannot
        # be attached to the chunk's index.
        if isinstance(value, pd.Series) and len(value) != n_rows:
            return value.loc[chunk_index]
        return value

    # Apply np.select to this chunk only.
    result = np.select(
        [_align(cond) for cond in conditions],
        [_align(choice) for choice in choices],
        default=_align(default),
    )
    return pd.Series(result, index=chunk_index)
def _slice_rows(value, start, stop, n_rows):
    """Return rows ``start:stop`` of a row-aligned value; pass others through."""
    # Scalars (including strings) and anything that is not one-entry-per-row
    # are left for np.select to broadcast.
    if np.ndim(value) == 0 or len(value) != n_rows:
        return value
    if isinstance(value, (pd.Series, pd.DataFrame)):
        return value.iloc[start:stop]
    return np.asarray(value)[start:stop]


def parallel_select(df, conditions, choices, default='default', num_processes=None):
    """Evaluate ``np.select`` over ``df`` in row chunks using a process pool.

    The rows are split into contiguous positional chunks. Each condition (and
    any row-aligned choice/default) is sliced to the same positions, so every
    worker receives only the data for its own rows.

    Args:
        df: DataFrame whose index labels the result.
        conditions: Sequence of boolean arrays/Series with one entry per row
            of ``df``, matched to rows by position (as ``np.select`` does).
        choices: Sequence of scalars or row-aligned arrays/Series, one per
            condition.
        default: Value used where no condition is true.
        num_processes: Number of worker processes; defaults to
            ``mp.cpu_count()``. Clamped to the range ``[1, len(df)]``.

    Returns:
        A ``pd.Series`` indexed like ``df`` holding the selected values.

    Raises:
        ValueError: If a non-scalar condition does not have ``len(df)`` rows.
    """
    n_rows = len(df)
    for cond in conditions:
        if np.ndim(cond) != 0 and len(cond) != n_rows:
            raise ValueError(
                f"condition has {len(cond)} rows but the DataFrame has {n_rows}"
            )

    if num_processes is None:
        num_processes = mp.cpu_count()
    # Never create more chunks than rows, and always create at least one.
    n_chunks = max(1, min(int(num_processes), n_rows))

    # Split the data: the first `extra` chunks get one additional row.
    # Slicing with iloc avoids np.array_split on a DataFrame, which relies on
    # the deprecated DataFrame.swapaxes.
    base, extra = divmod(n_rows, n_chunks)
    tasks = []
    start = 0
    for i in range(n_chunks):
        stop = start + base + (1 if i < extra else 0)
        tasks.append((
            df.iloc[start:stop],
            [_slice_rows(cond, start, stop, n_rows) for cond in conditions],
            [_slice_rows(choice, start, stop, n_rows) for choice in choices],
            _slice_rows(default, start, stop, n_rows),
        ))
        start = stop

    if n_chunks == 1:
        # A single chunk gains nothing from a pool; run it in-process.
        results = [process_chunk(*tasks[0])]
    else:
        with mp.Pool(n_chunks) as pool:
            results = pool.starmap(process_chunk, tasks)

    # Merge the per-chunk results back into one Series (original row order).
    return pd.concat(results)
# The driver code is guarded so that worker processes, which re-import the
# main module under the "spawn" start method, do not rebuild the data set and
# try to start pools of their own.
if __name__ == "__main__":
    # Build a large synthetic data set (10 million rows).
    df = pd.DataFrame({
        'col1': np.random.randint(0, 100, 10_000_000),
        'col2': np.random.choice(['A', 'B', 'C'], 10_000_000),
        'col3': np.random.randn(10_000_000),
    })

    # Define the conditions; np.select uses the first one that is true per row.
    conditions = [
        (df['col1'] > 50) & (df['col2'] == 'A'),
        (df['col1'] < 20) | (df['col3'].abs() > 2),
        df['col2'].isin(['B', 'C']),
    ]
    choices = ['High A', 'Low or Outlier', 'B or C']

    # Run the selection across worker processes and store the labels.
    df['category'] = parallel_select(df, conditions, choices, default='Other')