# 카테고리 없음 (Uncategorized)
#
# Parquet 기능 (Parquet viewer features)
#
# 유키공 2025. 4. 30. 08:08 (author, posted 2025-04-30 08:08)
import sys
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import traceback
from concurrent.futures import ThreadPoolExecutor
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QTableView, QFileDialog,
    QVBoxLayout, QWidget, QPushButton, QLabel,
    QStatusBar, QMessageBox, QLineEdit, QHBoxLayout,
    QComboBox, QHeaderView, QProgressDialog, QCheckBox
)
from PyQt5.QtCore import (
    Qt, QAbstractTableModel, QSortFilterProxyModel, 
    QThread, pyqtSignal, QObject, QRunnable, QThreadPool
)
from PyQt5.QtGui import QColor, QFont

class WorkerSignals(QObject):
    """Bundle of Qt signals used by the background workers.

    ExportWorker (a QRunnable) and ParquetLoader each create one instance
    and emit through it; the main window connects its slots to these.
    """
    # Completion percentage in the range 0-100.
    progress = pyqtSignal(int)
    # Result payload: the written file path (export) or the DataFrame (load).
    finished = pyqtSignal(object)
    # Human-readable failure description.
    error = pyqtSignal(str)
    # Free-form status text, shown in the status bar by the viewer.
    message = pyqtSignal(str)

class ExportWorker(QRunnable):
    """QRunnable that writes a DataFrame to disk on a thread-pool thread.

    Progress (0-100), status text, completion (the output path) and failures
    are reported through ``self.signals`` (a WorkerSignals instance).
    :meth:`stop` requests cancellation: the CSV and JSON writers check it
    between chunks, the Excel and Parquet writers only before they start.
    A cancelled export emits neither ``finished`` nor ``error`` and may
    leave a partially written file behind.
    """

    # Rows written per chunk by the chunked writers.
    CSV_CHUNK_ROWS = 100000
    JSON_CHUNK_ROWS = 50000

    def __init__(self, df, file_path, file_type):
        super().__init__()
        self.df = df
        self.file_path = file_path
        # One of the save-dialog filter strings dispatched on in run().
        self.file_type = file_type
        self.signals = WorkerSignals()
        self._is_running = True

    def run(self):
        """Dispatch to the writer for ``self.file_type`` and report the outcome."""
        exporters = {
            "CSV (*.csv)": self.export_csv,
            "Excel (*.xlsx)": self.export_excel,
            "Parquet (*.parquet)": self.export_parquet,
            "JSON (*.json)": self.export_json,
        }
        try:
            if not self._is_running:
                return

            self.signals.message.emit(f"Exporting to {self.file_type}...")

            exporter = exporters.get(self.file_type)
            if exporter is None:
                # An unrecognised type must not be reported as a successful
                # export of a file that was never written.
                raise ValueError(f"Unsupported export type: {self.file_type!r}")
            exporter()

            if self._is_running:
                self.signals.finished.emit(self.file_path)
                self.signals.message.emit("Export completed successfully")

        except Exception as e:
            self.signals.error.emit(f"Export failed: {str(e)}")
            self.signals.message.emit("Export failed")

    @staticmethod
    def _percent(done, total):
        """Return ``done / total`` as an integer percentage clamped to 0-100."""
        if total <= 0:
            return 100
        return min(100, int(done / total * 100))

    def export_csv(self):
        """Write ``self.df`` as CSV, appending one chunk of rows at a time."""
        total_rows = len(self.df)
        if total_rows == 0:
            # The chunk loop below never runs for an empty frame; still write
            # the header row so the output file actually exists.
            self.df.to_csv(self.file_path, index=False)
            self.signals.progress.emit(100)
            return

        chunksize = self.CSV_CHUNK_ROWS
        for start in range(0, total_rows, chunksize):
            if not self._is_running:
                return

            chunk = self.df.iloc[start:start + chunksize]
            first_chunk = start == 0
            # The first chunk creates/truncates the file and writes the header;
            # later chunks append data rows only.
            chunk.to_csv(
                self.file_path,
                mode='w' if first_chunk else 'a',
                header=first_chunk,
                index=False
            )
            self.signals.progress.emit(self._percent(start + len(chunk), total_rows))

    def export_excel(self):
        """Write ``self.df`` to an .xlsx file in one call (not cancellable mid-write)."""
        if not self._is_running:
            return

        self.signals.progress.emit(20)
        try:
            self.df.to_excel(self.file_path, index=False, engine='openpyxl')
        except ImportError:
            # Fall back to whichever engine pandas picks by default.
            self.df.to_excel(self.file_path, index=False)
        self.signals.progress.emit(100)

    def export_parquet(self):
        """Write ``self.df`` to Parquet with the pyarrow engine in one call."""
        if not self._is_running:
            return

        self.signals.progress.emit(30)
        self.df.to_parquet(self.file_path, engine='pyarrow')
        self.signals.progress.emit(100)

    def export_json(self):
        """Write ``self.df`` as JSON Lines (one record per line), chunk by chunk."""
        if not self._is_running:
            return

        self.signals.progress.emit(10)

        chunksize = self.JSON_CHUNK_ROWS
        total_rows = len(self.df)

        with open(self.file_path, 'w', encoding='utf-8') as f:
            for start in range(0, total_rows, chunksize):
                if not self._is_running:
                    return

                chunk = self.df.iloc[start:start + chunksize]
                json_str = chunk.to_json(orient='records', lines=True, force_ascii=False)
                # pandas has not always terminated lines-mode output with a
                # newline; without one, this chunk's last record and the next
                # chunk's first record would end up on the same line.
                if json_str and not json_str.endswith('\n'):
                    json_str += '\n'
                f.write(json_str)

                self.signals.progress.emit(self._percent(start + len(chunk), total_rows))

        if total_rows == 0:
            self.signals.progress.emit(100)

    def stop(self):
        """Request cancellation; checked between chunks and before each writer starts."""
        self._is_running = False

class ArrowTableConverter:
    """Convert PyArrow tables to pandas DataFrames across PyArrow versions.

    All methods are static; the class only serves as a namespace.
    """

    @staticmethod
    def get_columns(table):
        """Return the column names of *table*.

        Uses ``table.column_names`` when present, otherwise reads each field
        name from ``table.schema``.
        """
        if hasattr(table, 'column_names'):
            return table.column_names
        return [table.schema[i].name for i in range(table.num_columns)]

    @staticmethod
    def get_column_data(table, col):
        """Return the data for column *col* (a column name) of *table*.

        Prefers ``table.column(col)``; falls back to ``table[col]``.
        """
        if hasattr(table, 'column'):
            return table.column(col)
        return table[col]

    @staticmethod
    def to_dataframe(table):
        """Convert *table* to a DataFrame, falling back to per-column conversion.

        ``table.to_pandas()`` is tried first. If it raises an ``Exception``,
        :meth:`manual_conversion` is used instead. ``KeyboardInterrupt`` and
        ``SystemExit`` are deliberately not caught.
        """
        try:
            return table.to_pandas()
        except Exception:
            return ArrowTableConverter.manual_conversion(table)

    @staticmethod
    def manual_conversion(table):
        """Build a DataFrame column by column as a last resort.

        Each column is converted with its own ``to_pandas()`` when it has
        one, otherwise each value is stringified. A column whose conversion
        raises is filled with ``"[Conversion Error]"`` for every row.
        """
        data = {}
        columns = ArrowTableConverter.get_columns(table)

        for col in columns:
            try:
                col_data = ArrowTableConverter.get_column_data(table, col)
                if hasattr(col_data, 'to_pandas'):
                    data[col] = col_data.to_pandas()
                else:
                    data[col] = [str(x) for x in col_data]
            except Exception:
                # Keep the remaining columns usable; mark only this one.
                data[col] = ["[Conversion Error]"] * len(table)

        return pd.DataFrame(data)

class ParquetLoader(QThread):
    """QThread that reads a Parquet file into a display-ready DataFrame.

    Row groups are read one at a time so that :meth:`stop` is honoured
    between them. Results are reported through ``self.signals``:
    ``progress`` (0-100), ``finished`` (the cleaned DataFrame) or ``error``
    (message plus traceback). ``finished`` is not emitted once :meth:`stop`
    has been called.
    """

    def __init__(self, file_path, max_rows=None):
        super().__init__()
        self.file_path = file_path
        # Row cap (preview mode); a falsy value (None or 0) loads every row.
        self.max_rows = max_rows
        self.signals = WorkerSignals()
        self._is_running = True

    def run(self):
        """Read, concatenate and clean the row groups on the worker thread."""
        parquet_file = None
        try:
            self.signals.progress.emit(5)
            parquet_file = pq.ParquetFile(self.file_path)
            num_row_groups = parquet_file.num_row_groups
            self.signals.progress.emit(10)

            chunks = []
            loaded_rows = 0

            for i in range(num_row_groups):
                if not self._is_running:
                    return

                self.signals.progress.emit(10 + int((i + 1) / num_row_groups * 70))
                table = parquet_file.read_row_group(i)
                df_chunk = ArrowTableConverter.to_dataframe(table)

                if self.max_rows:
                    remaining = self.max_rows - loaded_rows
                    if remaining <= 0:
                        break
                    df_chunk = df_chunk.head(remaining)

                chunks.append(df_chunk)
                loaded_rows += len(df_chunk)

                if self.max_rows and loaded_rows >= self.max_rows:
                    break

            self.signals.progress.emit(85)
            if chunks:
                combined_df = pd.concat(chunks, ignore_index=True)
            else:
                # A file with zero row groups is valid; pd.concat([]) would raise.
                combined_df = self._empty_frame(parquet_file)
            final_df = self.clean_data(combined_df)

            # stop() may have been requested while concatenating/cleaning;
            # do not deliver a result the user already cancelled.
            if not self._is_running:
                return

            self.signals.progress.emit(100)
            self.signals.finished.emit(final_df)

        except Exception as e:
            error_trace = traceback.format_exc()
            self.signals.error.emit(f"로딩 실패:\n{str(e)}\n\n{error_trace}")
        finally:
            self._close_file(parquet_file)

    @staticmethod
    def _empty_frame(parquet_file):
        """Return a zero-row DataFrame carrying the file's column names."""
        arrow_schema = getattr(parquet_file, 'schema_arrow', None)
        if arrow_schema is not None:
            names = arrow_schema.names
        else:
            names = parquet_file.schema.names
        return pd.DataFrame(columns=list(names))

    @staticmethod
    def _close_file(parquet_file):
        """Release the handle held by *parquet_file* when the API offers close().

        ``close`` is looked up dynamically because not every PyArrow version
        provides it. Errors while closing are ignored on purpose: the data
        has already been read (or the load already failed and was reported).
        """
        close = getattr(parquet_file, 'close', None)
        if callable(close):
            try:
                close()
            except Exception:
                pass

    def clean_data(self, df):
        """Make every column safe to render as text, in place, and return *df*.

        Nulls become ``""``. A column whose first non-null value is list-like
        (e.g. dict, list, tuple or NumPy array) is rendered as JSON via
        :meth:`safe_json_dumps`; other non-string columns are cast with
        ``astype(str)``. A column that fails is logged and rendered as
        ``"[Error] " + str(value)``.
        """
        for col in df.columns:
            try:
                # Sample the first *non-null* value: row 0 may be null, which
                # would otherwise hide that the column holds nested values.
                non_null = df[col].dropna()
                sample = non_null.iloc[0] if len(non_null) > 0 else None

                df[col] = df[col].fillna("")
                if pd.api.types.is_list_like(sample):
                    df[col] = df[col].apply(self.safe_json_dumps)
                elif not pd.api.types.is_string_dtype(df[col]):
                    df[col] = df[col].astype(str)
            except Exception as col_error:
                print(f"컬럼 {col} 정제 오류: {col_error}")
                df[col] = "[Error] " + df[col].astype(str)
        return df

    @staticmethod
    def _json_default(obj):
        """``json.dumps`` fallback: array-likes via ``tolist()``, sets as lists, else ``str``."""
        if isinstance(obj, (set, frozenset)):
            return list(obj)
        to_list = getattr(obj, 'tolist', None)
        if callable(to_list):
            return to_list()
        return str(obj)

    def safe_json_dumps(self, value):
        """Render one cell as text, using JSON (max 2000 chars) for list-like values.

        Returns ``""`` for ``None``, strings unchanged, and
        ``"[Conversion Error]"`` if serialisation fails.
        """
        try:
            if value is None:
                return ""
            if isinstance(value, str):
                return value
            if pd.api.types.is_list_like(value):
                # NumPy arrays are not JSON-serialisable; convert to lists first.
                if hasattr(value, 'tolist'):
                    value = value.tolist()
                return json.dumps(value, ensure_ascii=False, default=self._json_default)[:2000]
            return str(value)
        except Exception:
            return "[Conversion Error]"

    def stop(self):
        """Request cancellation; honoured between row groups and before emitting."""
        self._is_running = False

class DataFrameModel(QAbstractTableModel):
    """Read-only Qt table model exposing a pandas DataFrame to a view."""

    def __init__(self, data):
        super().__init__()
        self._data = data

    def rowCount(self, parent=None):
        """Number of DataFrame rows; 0 for any valid parent, as table models require."""
        if parent is not None and parent.isValid():
            return 0
        return len(self._data)

    def columnCount(self, parent=None):
        """Number of DataFrame columns; 0 for any valid parent, as table models require."""
        if parent is not None and parent.isValid():
            return 0
        return len(self._data.columns)

    def data(self, index, role=Qt.DisplayRole):
        """Return cell text, background colour or alignment for *role*."""
        if not index.isValid():
            return None

        if role == Qt.TextAlignmentRole:
            return Qt.AlignLeft | Qt.AlignVCenter
        if role not in (Qt.DisplayRole, Qt.BackgroundRole):
            # Views query many roles per cell; skip the cell lookup for
            # roles this model does not provide.
            return None

        # Positional scalar access; cheaper than iloc for a single cell.
        value = self._data.iat[index.row(), index.column()]

        if role == Qt.DisplayRole:
            # pd.isna on a list-like returns an array, whose truth value is
            # ambiguous, so only test scalars for missing-ness.
            if pd.api.types.is_scalar(value) and pd.isna(value):
                return ""
            return str(value)

        # Qt.BackgroundRole: tint nested (dict/list) values.
        if isinstance(value, (dict, list)):
            return QColor(240, 248, 255)
        return QColor(255, 255, 255)

    def headerData(self, section, orientation, role=Qt.DisplayRole):
        """Column names for the horizontal header, index labels for the vertical one."""
        if role != Qt.DisplayRole:
            return None
        if orientation == Qt.Horizontal:
            return str(self._data.columns[section])
        return str(self._data.index[section])

class ParquetViewer(QMainWindow):
    """Main window: open a Parquet file, browse and filter it, and export it.

    Loading runs on a ParquetLoader QThread; exporting runs as an
    ExportWorker on the global QThreadPool. Both report back via signals.
    """

    # Save-dialog filter string -> file extension. The filter strings are the
    # values ExportWorker compares its ``file_type`` against.
    _EXPORT_FILTERS = {
        "CSV (*.csv)": ".csv",
        "Excel (*.xlsx)": ".xlsx",
        "Parquet (*.parquet)": ".parquet",
        "JSON (*.json)": ".json",
    }

    def __init__(self):
        super().__init__()
        self.setWindowTitle("Universal Parquet Viewer")
        self.setGeometry(100, 100, 1400, 900)
        self.loader = None            # active ParquetLoader, if any
        self.source_model = None      # keeps the current DataFrameModel alive
        self.progress = None          # loading progress dialog
        self.export_progress = None   # export progress dialog
        self.setup_ui()
        self.thread_pool = QThreadPool.globalInstance()
        self.thread_pool.setMaxThreadCount(2)
        self.export_worker = None

    def setup_ui(self):
        """Build the control panel, table view, status bar and proxy model."""
        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        layout = QVBoxLayout(self.central_widget)

        # Control panel
        panel = QWidget()
        panel_layout = QHBoxLayout(panel)

        self.btn_open = QPushButton("Open Parquet")
        self.btn_open.clicked.connect(self.open_file)
        panel_layout.addWidget(self.btn_open)

        self.preview_check = QCheckBox("Preview Mode (First 1,000 rows)")
        self.preview_check.setChecked(True)
        panel_layout.addWidget(self.preview_check)

        panel_layout.addWidget(QLabel("Search:"))
        self.search_input = QLineEdit()
        self.search_input.setPlaceholderText("Search...")
        self.search_input.textChanged.connect(self.apply_filter)
        self.search_input.setEnabled(False)
        panel_layout.addWidget(self.search_input)

        self.column_combo = QComboBox()
        self.column_combo.addItem("All Columns")
        self.column_combo.setEnabled(False)
        # Re-run the search when the target column changes, not only when
        # the search text changes.
        self.column_combo.currentIndexChanged.connect(self._on_column_changed)
        panel_layout.addWidget(self.column_combo)

        self.btn_export = QPushButton("Export")
        self.btn_export.clicked.connect(self.export_data)
        self.btn_export.setEnabled(False)
        panel_layout.addWidget(self.btn_export)

        layout.addWidget(panel)

        # Table view
        self.table_view = QTableView()
        self.table_view.setSortingEnabled(True)
        self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
        self.table_view.setStyleSheet("""
            QTableView {
                font-size: 10pt;
                selection-background-color: #3498db;
                selection-color: white;
            }
            QHeaderView::section {
                background-color: #34495e;
                color: white;
                padding: 5px;
                font-weight: bold;
            }
        """)
        layout.addWidget(self.table_view)

        # Status bar
        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)

        # Proxy model used for sorting and case-insensitive filtering
        self.proxy_model = QSortFilterProxyModel()
        self.proxy_model.setFilterCaseSensitivity(Qt.CaseInsensitive)

    @staticmethod
    def _close_quietly(dialog):
        """Close a progress dialog without running its ``canceled`` handler.

        QProgressDialog emits ``canceled`` from its close event, which would
        otherwise invoke the cancel slot after a successful load/export.
        """
        if dialog is None:
            return
        dialog.blockSignals(True)
        dialog.close()

    def _stop_loader(self):
        """Stop the current loader thread, mute its signals and wait for it to exit.

        Dropping the last reference to a QThread that is still running makes
        Qt report "QThread: Destroyed while thread is still running" and
        usually crashes the application, so a loader is joined before it is
        replaced. This blocks until the row group currently being read
        finishes.
        """
        loader = self.loader
        if loader is None:
            return
        loader.stop()
        loader.signals.blockSignals(True)
        if loader.isRunning():
            loader.wait()

    def open_file(self):
        """Ask the user for a Parquet file and load it."""
        options = QFileDialog.Options()
        file_path, _ = QFileDialog.getOpenFileName(
            self, "Open Parquet File", "",
            "Parquet Files (*.parquet);;All Files (*)",
            options=options)

        if file_path:
            self.load_parquet(file_path)

    def load_parquet(self, file_path):
        """Start loading *file_path* on a background ParquetLoader thread.

        Any previous loader is stopped and joined first. With "Preview Mode"
        checked only the first 1,000 rows are loaded.
        """
        self._stop_loader()

        self.progress = QProgressDialog("Loading...", "Cancel", 0, 100, self)
        self.progress.setWindowModality(Qt.WindowModal)
        self.progress.canceled.connect(self.cancel_loading)

        max_rows = 1000 if self.preview_check.isChecked() else None
        self.loader = ParquetLoader(file_path, max_rows=max_rows)
        self.loader.signals.progress.connect(self.update_progress)
        self.loader.signals.finished.connect(self.on_load_complete)
        self.loader.signals.error.connect(self.on_load_error)
        self.loader.start()

        self.progress.show()

    def update_progress(self, value):
        """Forward loader progress (0-100) to the loading dialog."""
        self.progress.setValue(value)

    def cancel_loading(self):
        """Cancel the running load and ignore anything the loader emits afterwards."""
        if self.loader is not None:
            self.loader.stop()
            # A load already past its last cancellation check could still
            # emit ``finished``; muting its signals keeps the cancel effective.
            self.loader.signals.blockSignals(True)
        self._close_quietly(self.progress)
        self.status_bar.showMessage("Loading canceled", 3000)

    def on_load_complete(self, df):
        """Show the loaded DataFrame and enable searching and exporting."""
        self._close_quietly(self.progress)

        # Keep a Python reference so the source model outlives this method.
        self.source_model = DataFrameModel(df)
        self.proxy_model.setSourceModel(self.source_model)
        self.table_view.setModel(self.proxy_model)
        self.table_view.resizeColumnsToContents()

        # Repopulating the combo resets it to "All Columns", which (through
        # _on_column_changed) re-applies the current search to the new data.
        self.column_combo.clear()
        self.column_combo.addItem("All Columns")
        self.column_combo.addItems([str(col) for col in df.columns])
        self.column_combo.setEnabled(True)

        self.search_input.setEnabled(True)
        self.btn_export.setEnabled(True)

        file_size = os.path.getsize(self.loader.file_path) / (1024 * 1024)
        self.status_bar.showMessage(
            f"Loaded: {len(df):,} rows | {len(df.columns)} cols | {file_size:.2f} MB",
            5000
        )

    def on_load_error(self, error_msg):
        """Report a failed load."""
        self._close_quietly(self.progress)
        QMessageBox.critical(self, "Load Error", error_msg)
        self.status_bar.showMessage("Load failed", 5000)

    def _on_column_changed(self, _index):
        """Re-apply the current search text after the target column changes."""
        self.apply_filter(self.search_input.text())

    def apply_filter(self, text):
        """Filter rows containing *text* (case-insensitive) in the chosen column(s)."""
        proxy = getattr(self, 'proxy_model', None)
        if proxy is None or proxy.sourceModel() is None:
            return

        combo_index = self.column_combo.currentIndex()
        if combo_index < 0:
            return

        # Item 0 is "All Columns" (-1 = every column); item k >= 1 maps to
        # source column k - 1. Using the position rather than the text keeps
        # duplicate or non-string column names working.
        proxy.setFilterKeyColumn(combo_index - 1)
        proxy.setFilterFixedString(text)

    @classmethod
    def _resolve_export_type(cls, file_path, selected_filter):
        """Return the export filter string to use, or None if unsupported.

        The dialog's selected filter wins; when it is empty or unknown, the
        type is inferred from the file extension (case-insensitive).
        """
        if selected_filter in cls._EXPORT_FILTERS:
            return selected_filter
        lowered = file_path.lower()
        for filter_name, extension in cls._EXPORT_FILTERS.items():
            if lowered.endswith(extension):
                return filter_name
        return None

    def export_data(self):
        """Ask for a destination and export the loaded (unfiltered) DataFrame."""
        proxy = getattr(self, 'proxy_model', None)
        source_model = proxy.sourceModel() if proxy is not None else None
        if source_model is None:
            QMessageBox.warning(
                self,
                "No Data",
                "Please load a Parquet file first before exporting.",
                QMessageBox.Ok
            )
            self.status_bar.showMessage("Export failed: No data loaded", 3000)
            return

        df = source_model._data

        options = QFileDialog.Options()
        file_path, selected_filter = QFileDialog.getSaveFileName(
            self, "Export Data", "",
            ";;".join(self._EXPORT_FILTERS),
            options=options)

        if not file_path:
            return

        file_type = self._resolve_export_type(file_path, selected_filter)
        if file_type is None:
            QMessageBox.warning(
                self,
                "Unsupported Format",
                "Please choose a CSV, Excel, Parquet or JSON file.",
                QMessageBox.Ok
            )
            return

        extension = self._EXPORT_FILTERS[file_type]
        if not file_path.lower().endswith(extension):
            file_path += extension

        self.export_progress = QProgressDialog("Exporting...", "Cancel", 0, 100, self)
        self.export_progress.setWindowModality(Qt.WindowModal)
        self.export_progress.canceled.connect(self.cancel_export)

        self.export_worker = ExportWorker(df, file_path, file_type)
        self.export_worker.signals.progress.connect(self.export_progress.setValue)
        self.export_worker.signals.message.connect(self.status_bar.showMessage)
        self.export_worker.signals.finished.connect(self.on_export_complete)
        self.export_worker.signals.error.connect(self.on_export_error)

        self.thread_pool.start(self.export_worker)
        self.export_progress.show()

    def cancel_export(self):
        """Request cancellation of the running export."""
        if self.export_worker is not None:
            self.export_worker.stop()
        self._close_quietly(self.export_progress)
        self.status_bar.showMessage("Export canceled", 3000)

    def on_export_complete(self, file_path):
        """Report a successful export."""
        self._close_quietly(self.export_progress)
        QMessageBox.information(
            self,
            "Success",
            f"Data exported to:\n{file_path}"
        )
        self.status_bar.showMessage(f"Exported to {file_path}", 5000)

    def on_export_error(self, error_msg):
        """Report a failed export."""
        self._close_quietly(self.export_progress)
        QMessageBox.critical(
            self,
            "Export Error",
            f"Export failed:\n{error_msg}"
        )
        self.status_bar.showMessage("Export failed", 5000)

    def closeEvent(self, event):
        """Stop a running load before the window (and its QThread) goes away."""
        self._stop_loader()
        super().closeEvent(event)

if __name__ == "__main__":
    # High-DPI application attributes only take effect when set before the
    # QApplication is constructed, so apply them first via the static setter.
    if hasattr(Qt, 'AA_EnableHighDpiScaling'):
        QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
    if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
        QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)

    app = QApplication(sys.argv)
    app.setStyle('Fusion')

    viewer = ParquetViewer()
    viewer.show()
    sys.exit(app.exec_())