아미(아름다운미소)

merge 본문

랭귀지/pandas

merge

유키공 2025. 5. 28. 16:04
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd

def safe_parallel_merge(df1, df2, left_key, right_key=None, n_partitions=4, how='left'):
    """
    개선된 병렬 merge 함수 - 안정성 강화 버전
    
    Parameters:
    - df1: 왼쪽 DataFrame
    - df2: 오른쪽 DataFrame
    - left_key: df1의 조인 키 (컬럼명 또는 컬럼 리스트)
    - right_key: df2의 조인 키 (None이면 left_key와 동일)
    - n_partitions: 분할 개수
    - how: merge 방식 ('left', 'right', 'inner', 'outer')
    """
    # 1. 키 컬럼 검증 및 표준화
    right_key = right_key if right_key is not None else left_key
    
    left_keys = [left_key] if isinstance(left_key, str) else list(left_key)
    right_keys = [right_key] if isinstance(right_key, str) else list(right_key)
    
    # 2. 키 컬럼 존재 여부 확인
    missing_in_left = set(left_keys) - set(df1.columns)
    missing_in_right = set(right_keys) - set(df2.columns)
    
    if missing_in_left:
        raise ValueError(f"df1에 다음 키 컬럼이 없습니다: {missing_in_left}")
    if missing_in_right:
        raise ValueError(f"df2에 다음 키 컬럼이 없습니다: {missing_in_right}")
    
    # 3. 키 컬럼 타입 통일 (중요!)
    for lk, rk in zip(left_keys, right_keys):
        df1[lk] = df1[lk].astype(df2[rk].dtype)
    
    # 4. 안정적인 데이터 분할
    df1 = df1.reset_index(drop=True)
    split_indices = np.linspace(0, len(df1), n_partitions + 1, dtype=int)
    chunks = [df1.iloc[split_indices[i]:split_indices[i+1]] for i in range(n_partitions)]
    
    # 5. 병렬 처리
    results = []
    with ThreadPoolExecutor(max_workers=n_partitions) as executor:
        futures = []
        for chunk in chunks:
            futures.append(
                executor.submit(
                    pd.merge,
                    chunk.copy(),  # 안정성을 위해 복사본 사용
                    df2.copy(),
                    left_on=left_keys,
                    right_on=right_keys,
                    how=how
                )
            )
        
        for future in futures:
            try:
                result = future.result()
                results.append(result)
            except Exception as e:
                executor.shutdown(wait=False)
                raise RuntimeError(f"병렬 merge 실패: {str(e)}")
    
    # 6. 결과 병합 및 중복 처리
    final_df = pd.concat(results, ignore_index=True)
    
    if how in ['outer', 'right']:
        final_df = final_df.drop_duplicates(subset=left_keys if how == 'right' else right_keys)
    
    return final_df
# 같은 키 이름
result = safe_parallel_merge(df1, df2, left_key='id', n_partitions=4)

# 다른 키 이름
result = safe_parallel_merge(df1, df2, left_key='df1_id', right_key='df2_id')
# 여러 컬럼으로 조인
result = safe_parallel_merge(
    df1, df2, 
    left_key=['date', 'user_id'],
    right_key=['transaction_date', 'customer_id']
)
Comments