Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | 7 |
8 | 9 | 10 | 11 | 12 | 13 | 14 |
15 | 16 | 17 | 18 | 19 | 20 | 21 |
22 | 23 | 24 | 25 | 26 | 27 | 28 |
29 | 30 |
Tags
- python
- 다이어트
- ASP
- javascript
- port
- 리눅스
- 함수
- ubuntu
- MySQL
- PyQt5
- MS-SQL
- node.js
- flutter
- urllib
- sqlite
- swift
- 라즈베리파이
- Excel
- GIT
- tensorflow
- 맛집
- Linux
- 날짜
- 유니티
- PER
- Unity
- PyQt
- mssql
- pandas
- IOS
Archives
아미(아름다운미소)
merge 본문
from concurrent.futures import ThreadPoolExecutor
import numpy as np
import pandas as pd
def safe_parallel_merge(df1, df2, left_key, right_key=None, n_partitions=4, how='left'):
"""
개선된 병렬 merge 함수 - 안정성 강화 버전
Parameters:
- df1: 왼쪽 DataFrame
- df2: 오른쪽 DataFrame
- left_key: df1의 조인 키 (컬럼명 또는 컬럼 리스트)
- right_key: df2의 조인 키 (None이면 left_key와 동일)
- n_partitions: 분할 개수
- how: merge 방식 ('left', 'right', 'inner', 'outer')
"""
# 1. 키 컬럼 검증 및 표준화
right_key = right_key if right_key is not None else left_key
left_keys = [left_key] if isinstance(left_key, str) else list(left_key)
right_keys = [right_key] if isinstance(right_key, str) else list(right_key)
# 2. 키 컬럼 존재 여부 확인
missing_in_left = set(left_keys) - set(df1.columns)
missing_in_right = set(right_keys) - set(df2.columns)
if missing_in_left:
raise ValueError(f"df1에 다음 키 컬럼이 없습니다: {missing_in_left}")
if missing_in_right:
raise ValueError(f"df2에 다음 키 컬럼이 없습니다: {missing_in_right}")
# 3. 키 컬럼 타입 통일 (중요!)
for lk, rk in zip(left_keys, right_keys):
df1[lk] = df1[lk].astype(df2[rk].dtype)
# 4. 안정적인 데이터 분할
df1 = df1.reset_index(drop=True)
split_indices = np.linspace(0, len(df1), n_partitions + 1, dtype=int)
chunks = [df1.iloc[split_indices[i]:split_indices[i+1]] for i in range(n_partitions)]
# 5. 병렬 처리
results = []
with ThreadPoolExecutor(max_workers=n_partitions) as executor:
futures = []
for chunk in chunks:
futures.append(
executor.submit(
pd.merge,
chunk.copy(), # 안정성을 위해 복사본 사용
df2.copy(),
left_on=left_keys,
right_on=right_keys,
how=how
)
)
for future in futures:
try:
result = future.result()
results.append(result)
except Exception as e:
executor.shutdown(wait=False)
raise RuntimeError(f"병렬 merge 실패: {str(e)}")
# 6. 결과 병합 및 중복 처리
final_df = pd.concat(results, ignore_index=True)
if how in ['outer', 'right']:
final_df = final_df.drop_duplicates(subset=left_keys if how == 'right' else right_keys)
return final_df
# 같은 키 이름
result = safe_parallel_merge(df1, df2, left_key='id', n_partitions=4)
# 다른 키 이름
result = safe_parallel_merge(df1, df2, left_key='df1_id', right_key='df2_id')
# 여러 컬럼으로 조인
result = safe_parallel_merge(
df1, df2,
left_key=['date', 'user_id'],
right_key=['transaction_date', 'customer_id']
)
'랭귀지 > pandas' 카테고리의 다른 글
join (0) | 2025.06.02 |
---|---|
object 타입 컬럼을 모두 문자열(str)로 변환 (0) | 2025.05.26 |
df count (0) | 2025.05.15 |
parquet (0) | 2025.04.23 |
두 데이터프레임 df와 df2에서 'a'와 'b' 컬럼을 비교하여 매핑이 다른 행을 찾는 예제 (0) | 2025.04.14 |
Comments