Notice
														
												
											
												
												
													Recent Posts
													
											
												
												
													Recent Comments
													
											
												
												
													Link
													
											
									| 일 | 월 | 화 | 수 | 목 | 금 | 토 | 
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | |||
| 5 | 6 | 7 | 8 | 9 | 10 | 11 | 
| 12 | 13 | 14 | 15 | 16 | 17 | 18 | 
| 19 | 20 | 21 | 22 | 23 | 24 | 25 | 
| 26 | 27 | 28 | 29 | 30 | 31 | 
													Tags
													
											
												
												- 유니티
- 함수
- javascript
- 라즈베리파이
- 리눅스
- python
- PyQt
- mssql
- GIT
- pandas
- 날짜
- 다이어트
- 맛집
- PyQt5
- MySQL
- flutter
- PER
- Linux
- ASP
- urllib
- swift
- ubuntu
- MS-SQL
- IOS
- sqlite
- tensorflow
- port
- node.js
- Unity
- Excel
													Archives
													
											
											
											
											아미(아름다운미소)
Parquet 기능 본문
import sys
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import traceback
from concurrent.futures import ThreadPoolExecutor
from PyQt5.QtWidgets import (
    QApplication, QMainWindow, QTableView, QFileDialog,
    QVBoxLayout, QWidget, QPushButton, QLabel,
    QStatusBar, QMessageBox, QLineEdit, QHBoxLayout,
    QComboBox, QHeaderView, QProgressDialog, QCheckBox
)
from PyQt5.QtCore import (
    Qt, QAbstractTableModel, QSortFilterProxyModel, 
    QThread, pyqtSignal, QObject, QRunnable, QThreadPool
)
from PyQt5.QtGui import QColor, QFont
class WorkerSignals(QObject):
    """Signals emitted by the background workers in this file."""
    # Carries an int; the emitters in this file send percentages in 0-100.
    progress = pyqtSignal(int)
    # Carries an arbitrary object: ExportWorker sends the written file path,
    # ParquetLoader sends the loaded DataFrame.
    finished = pyqtSignal(object)
    # Carries a human-readable error description.
    error = pyqtSignal(str)
    # Carries a short status message.
    message = pyqtSignal(str)
class ExportWorker(QRunnable):
    """Runnable that writes a DataFrame to disk in the requested format.

    Progress, completion, errors and status text are reported through
    ``self.signals``. Cancellation is cooperative: ``stop()`` clears a flag
    that is checked before each format starts and between CSV/JSON chunks.
    The Excel and Parquet writers are single calls and cannot be interrupted
    once they have started.
    """

    def __init__(self, df, file_path, file_type):
        """Store the export parameters.

        Args:
            df: DataFrame to export.
            file_path: Destination path.
            file_type: Save-dialog filter label selecting the format, e.g.
                ``"CSV (*.csv)"``.
        """
        super().__init__()
        self.df = df
        self.file_path = file_path
        self.file_type = file_type
        self.signals = WorkerSignals()
        self._is_running = True

    def run(self):
        """Dispatch to the exporter for ``file_type`` and report the outcome."""
        try:
            if not self._is_running:
                return
            self.signals.message.emit(f"Exporting to {self.file_type}...")

            exporters = {
                "CSV (*.csv)": self.export_csv,
                "Excel (*.xlsx)": self.export_excel,
                "Parquet (*.parquet)": self.export_parquet,
                "JSON (*.json)": self.export_json,
            }
            exporter = exporters.get(self.file_type)
            if exporter is None:
                # Without this, an unknown label would write nothing and
                # still be reported as a successful export.
                raise ValueError(f"Unsupported export format: {self.file_type!r}")
            exporter()

            if self._is_running:
                self.signals.finished.emit(self.file_path)
                self.signals.message.emit("Export completed successfully")
        except Exception as e:
            # Top-level boundary of the worker: report instead of crashing the pool thread.
            self.signals.error.emit(f"Export failed: {str(e)}")
            self.signals.message.emit("Export failed")

    def export_csv(self):
        """Write the DataFrame as CSV in chunks, emitting progress per chunk."""
        chunksize = 100000
        total_rows = len(self.df)

        if total_rows == 0:
            # The chunk loop would not run at all; still produce a header-only file.
            self.df.to_csv(self.file_path, index=False)
            self.signals.progress.emit(100)
            return

        for start in range(0, total_rows, chunksize):
            if not self._is_running:
                return

            first_chunk = start == 0
            self.df.iloc[start:start + chunksize].to_csv(
                self.file_path,
                mode='w' if first_chunk else 'a',  # later chunks append
                header=first_chunk,                # header only once
                index=False
            )

            progress = int((start + chunksize) / total_rows * 100)
            self.signals.progress.emit(min(progress, 100))

    def export_excel(self):
        """Write the DataFrame as an Excel workbook."""
        if not self._is_running:
            return

        self.signals.progress.emit(20)
        try:
            self.df.to_excel(self.file_path, index=False, engine='openpyxl')
        except ImportError:
            # openpyxl missing: let pandas pick whatever engine is available.
            self.df.to_excel(self.file_path, index=False)
        self.signals.progress.emit(100)

    def export_parquet(self):
        """Write the DataFrame as Parquet using the pyarrow engine."""
        if not self._is_running:
            return

        self.signals.progress.emit(30)
        self.df.to_parquet(self.file_path, engine='pyarrow')
        self.signals.progress.emit(100)

    def export_json(self):
        """Write the DataFrame as JSON Lines (one record per line) in chunks."""
        if not self._is_running:
            return

        self.signals.progress.emit(10)

        chunksize = 50000
        total_rows = len(self.df)

        with open(self.file_path, 'w', encoding='utf-8') as f:
            for start in range(0, total_rows, chunksize):
                if not self._is_running:
                    return

                chunk = self.df.iloc[start:start + chunksize]
                json_str = chunk.to_json(orient='records', lines=True, force_ascii=False)
                f.write(json_str)
                # NOTE(review): pandas versions appear to differ on whether
                # lines=True output ends with a newline; make sure the last
                # record of this chunk never fuses with the next chunk's first.
                if json_str and not json_str.endswith("\n"):
                    f.write("\n")

                progress = int((start + chunksize) / total_rows * 100)
                self.signals.progress.emit(min(progress, 100))

        if total_rows == 0:
            # No chunk ran, so no progress beyond the initial 10 was reported.
            self.signals.progress.emit(100)

    def stop(self):
        """Request cancellation; takes effect at the next checkpoint."""
        self._is_running = False
class ArrowTableConverter:
    """Helpers that convert a PyArrow table to a DataFrame defensively.

    Every accessor probes for the attribute it needs and falls back to an
    alternative, so the helpers work with any table-like object exposing
    either API shape.
    """

    @staticmethod
    def get_columns(table):
        """Return the table's column names.

        Uses ``table.column_names`` when present, otherwise reads the names
        from ``table.schema`` by position.
        """
        if hasattr(table, 'column_names'):
            return table.column_names
        return [table.schema[i].name for i in range(table.num_columns)]

    @staticmethod
    def get_column_data(table, col):
        """Return the data of column ``col`` via ``table.column`` or indexing."""
        if hasattr(table, 'column'):
            return table.column(col)
        return table[col]

    @staticmethod
    def to_dataframe(table):
        """Convert ``table`` to a DataFrame, falling back to per-column conversion.

        Only ``Exception`` subclasses trigger the fallback; interpreter-level
        signals such as ``KeyboardInterrupt`` propagate to the caller.
        """
        try:
            return table.to_pandas()
        except Exception:
            return ArrowTableConverter.manual_conversion(table)

    @staticmethod
    def manual_conversion(table):
        """Build a DataFrame column by column (last resort).

        A column that cannot be converted is filled with the placeholder
        string ``"[Conversion Error]"`` so that the remaining columns are
        still usable.
        """
        data = {}
        # Computed once up front so the error path below cannot fail on it.
        row_count = table.num_rows

        for col in ArrowTableConverter.get_columns(table):
            try:
                col_data = ArrowTableConverter.get_column_data(table, col)
                if hasattr(col_data, 'to_pandas'):
                    data[col] = col_data.to_pandas()
                else:
                    data[col] = [str(x) for x in col_data]
            except Exception:
                data[col] = ["[Conversion Error]"] * row_count

        return pd.DataFrame(data)
class ParquetLoader(QThread):
    """Background thread that reads a Parquet file row group by row group.

    Results are delivered through ``self.signals``: ``progress`` (0-100),
    ``finished`` (the cleaned DataFrame) or ``error`` (message plus
    traceback). After ``stop()`` has been called, no further result is
    emitted.
    """

    def __init__(self, file_path, max_rows=None):
        """Store the load parameters.

        Args:
            file_path: Path of the Parquet file to read.
            max_rows: Optional cap on the number of rows loaded; a falsy
                value means no cap.
        """
        super().__init__()
        self.file_path = file_path
        self.max_rows = max_rows
        self.signals = WorkerSignals()
        self._is_running = True

    def run(self):
        """Read the file, clean the data and emit the result."""
        parquet_file = None
        try:
            self.signals.progress.emit(5)
            parquet_file = pq.ParquetFile(self.file_path)
            num_row_groups = parquet_file.num_row_groups
            self.signals.progress.emit(10)
            chunks = []
            loaded_rows = 0

            for i in range(num_row_groups):
                if not self._is_running:
                    return

                # Reading accounts for the 10-80 % span of the progress bar.
                self.signals.progress.emit(10 + int((i + 1) / num_row_groups * 70))
                table = parquet_file.read_row_group(i)
                df_chunk = ArrowTableConverter.to_dataframe(table)

                if self.max_rows:
                    remaining = self.max_rows - loaded_rows
                    if remaining <= 0:
                        break
                    df_chunk = df_chunk.head(remaining)

                chunks.append(df_chunk)
                loaded_rows += len(df_chunk)

                if self.max_rows and loaded_rows >= self.max_rows:
                    break

            # A stop request may have arrived while the last row group was being read.
            if not self._is_running:
                return

            self.signals.progress.emit(85)
            if chunks:
                combined_df = pd.concat(chunks, ignore_index=True)
            else:
                # pd.concat raises on an empty list; a file without row
                # groups still has a schema, so show its columns with no rows.
                combined_df = pd.DataFrame(columns=parquet_file.schema_arrow.names)
            final_df = self.clean_data(combined_df)

            if not self._is_running:
                return
            self.signals.progress.emit(100)
            self.signals.finished.emit(final_df)

        except Exception as e:
            # Thread boundary: report the failure instead of letting it escape.
            error_trace = traceback.format_exc()
            self.signals.error.emit(f"로딩 실패:\n{str(e)}\n\n{error_trace}")
        finally:
            # Release the file handle where this pyarrow version offers close().
            close = getattr(parquet_file, "close", None)
            if callable(close):
                close()

    def clean_data(self, df):
        """Normalise every column to display-friendly strings, in place.

        Nulls become ``""``; columns holding dicts/lists are serialised as
        JSON; all other non-string columns are cast to ``str``. A column
        whose clean-up fails is replaced by its string form prefixed with
        ``"[Error] "``.

        Returns:
            The same DataFrame object that was passed in.
        """
        for col in df.columns:
            try:
                # Sample the first non-null value *before* nulls are replaced,
                # otherwise a leading null hides a nested column.
                non_null = df[col].dropna()
                sample = non_null.iloc[0] if len(non_null) > 0 else None
                df[col] = df[col].fillna("")
                if isinstance(sample, (dict, list)):
                    df[col] = df[col].apply(self.safe_json_dumps)
                elif not pd.api.types.is_string_dtype(df[col]):
                    df[col] = df[col].astype(str)
            except Exception as col_error:
                print(f"컬럼 {col} 정제 오류: {col_error}")
                df[col] = "[Error] " + df[col].astype(str)
        return df

    def safe_json_dumps(self, value):
        """Return ``value`` as a string, JSON-encoding dicts and lists.

        JSON output is truncated to 2000 characters. ``None`` becomes ``""``
        and any conversion failure yields ``"[Conversion Error]"``.
        """
        try:
            if value is None:
                return ""
            if isinstance(value, (dict, list)):
                return json.dumps(value, ensure_ascii=False, default=str)[:2000]
            return str(value)
        except Exception:
            return "[Conversion Error]"

    def stop(self):
        """Request cancellation; honoured between row groups and before emitting."""
        self._is_running = False
class DataFrameModel(QAbstractTableModel):
    """Read-only Qt table model backed by a pandas DataFrame."""

    def __init__(self, data):
        """Wrap ``data`` (a DataFrame); the frame is referenced, not copied."""
        super().__init__()
        self._data = data

    def rowCount(self, parent=None):
        """Return the number of rows; 0 for a valid parent (cells have no children)."""
        if parent is not None and parent.isValid():
            return 0
        return len(self._data)

    def columnCount(self, parent=None):
        """Return the number of columns; 0 for a valid parent."""
        if parent is not None and parent.isValid():
            return 0
        return len(self._data.columns)

    def data(self, index, role=Qt.DisplayRole):
        """Return the display text, background colour or alignment of a cell."""
        if not index.isValid():
            return None
        if role == Qt.TextAlignmentRole:
            return Qt.AlignLeft | Qt.AlignVCenter
        if role not in (Qt.DisplayRole, Qt.BackgroundRole):
            # Views query many roles per cell; skip the DataFrame lookup for
            # the ones this model does not serve.
            return None

        # .iat is the scalar positional accessor, same cell as iloc[row, col].
        value = self._data.iat[index.row(), index.column()]
        if role == Qt.BackgroundRole:
            if isinstance(value, (dict, list)):
                return QColor(240, 248, 255)
            return QColor(255, 255, 255)

        # pd.isna on a list/array returns an array whose truth value is
        # ambiguous, so only scalars are tested for missingness.
        missing = pd.api.types.is_scalar(value) and pd.isna(value)
        return "" if missing else str(value)

    def headerData(self, section, orientation, role=Qt.DisplayRole):
        """Return column names for horizontal headers, index labels for vertical."""
        if role != Qt.DisplayRole:
            return None
        if orientation == Qt.Horizontal:
            return str(self._data.columns[section])
        return str(self._data.index[section])
class ParquetViewer(QMainWindow):
    """Main window: loads a Parquet file into a filterable table and exports it.

    Loading runs in a ``ParquetLoader`` thread and exporting in an
    ``ExportWorker`` on the global thread pool; both report back through
    signals handled by the ``on_*`` / ``update_*`` / ``cancel_*`` methods.
    """

    # Save-dialog filter label -> extension appended when the chosen name lacks it.
    EXPORT_EXTENSIONS = {
        "CSV (*.csv)": ".csv",
        "Excel (*.xlsx)": ".xlsx",
        "Parquet (*.parquet)": ".parquet",
        "JSON (*.json)": ".json",
    }

    def __init__(self):
        super().__init__()
        self.setWindowTitle("Universal Parquet Viewer")
        self.setGeometry(100, 100, 1400, 900)

        # Worker state is created up front so every slot can rely on it.
        self.loader = None
        self.progress = None
        self.export_worker = None
        self.export_progress = None
        self._load_cancelled = False
        self._export_cancelled = False
        # Cancelled loaders that are still winding down; keeping a reference
        # prevents a QThread from being destroyed while it is running.
        self._retired_loaders = []

        self.setup_ui()
        self.thread_pool = QThreadPool.globalInstance()
        self.thread_pool.setMaxThreadCount(2)

    def setup_ui(self):
        """Build the widgets and wire their signals."""
        # Proxy model (created first so filter slots can always reach it)
        self.proxy_model = QSortFilterProxyModel()
        self.proxy_model.setFilterCaseSensitivity(Qt.CaseInsensitive)

        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        layout = QVBoxLayout(self.central_widget)

        # Control panel
        panel = QWidget()
        panel_layout = QHBoxLayout(panel)

        self.btn_open = QPushButton("Open Parquet")
        self.btn_open.clicked.connect(self.open_file)
        panel_layout.addWidget(self.btn_open)

        self.preview_check = QCheckBox("Preview Mode (First 1,000 rows)")
        self.preview_check.setChecked(True)
        panel_layout.addWidget(self.preview_check)

        panel_layout.addWidget(QLabel("Search:"))
        self.search_input = QLineEdit()
        self.search_input.setPlaceholderText("Search...")
        self.search_input.textChanged.connect(self.apply_filter)
        self.search_input.setEnabled(False)
        panel_layout.addWidget(self.search_input)

        self.column_combo = QComboBox()
        self.column_combo.addItem("All Columns")
        self.column_combo.setEnabled(False)
        # Re-filter when the target column changes, not only when the text does.
        self.column_combo.currentIndexChanged.connect(self._on_column_changed)
        panel_layout.addWidget(self.column_combo)

        self.btn_export = QPushButton("Export")
        self.btn_export.clicked.connect(self.export_data)
        self.btn_export.setEnabled(False)
        panel_layout.addWidget(self.btn_export)

        layout.addWidget(panel)

        # Table view
        self.table_view = QTableView()
        self.table_view.setSortingEnabled(True)
        self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
        self.table_view.setStyleSheet("""
            QTableView {
                font-size: 10pt;
                selection-background-color: #3498db;
                selection-color: white;
            }
            QHeaderView::section {
                background-color: #34495e;
                color: white;
                padding: 5px;
                font-weight: bold;
            }
        """)
        layout.addWidget(self.table_view)

        # Status bar
        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)

    @staticmethod
    def _dismiss(dialog, cancel_slot):
        """Close a progress dialog without triggering its cancel handler.

        QProgressDialog emits ``canceled`` from its close event, so the
        handler is disconnected before the dialog is closed programmatically.
        """
        if dialog is None:
            return
        try:
            dialog.canceled.disconnect(cancel_slot)
        except TypeError:
            pass  # handler was already disconnected
        dialog.close()

    def _retire_loader(self):
        """Detach the current loader so a new one can replace it safely."""
        self._retired_loaders = [t for t in self._retired_loaders if t.isRunning()]
        previous = self.loader
        if previous is None:
            return
        previous.stop()
        for signal in (previous.signals.progress,
                       previous.signals.finished,
                       previous.signals.error):
            try:
                signal.disconnect()
            except TypeError:
                pass  # nothing was connected
        if previous.isRunning():
            self._retired_loaders.append(previous)
        self.loader = None

    def _on_column_changed(self, _index):
        """Re-apply the current search text to the newly selected column."""
        self.apply_filter(self.search_input.text())

    def _update_export_progress(self, value):
        """Forward export progress unless the export was cancelled."""
        if self._export_cancelled or self.export_progress is None:
            return
        self.export_progress.setValue(value)

    def open_file(self):
        """Ask for a Parquet file and start loading it."""
        options = QFileDialog.Options()
        file_path, _ = QFileDialog.getOpenFileName(
            self, "Open Parquet File", "",
            "Parquet Files (*.parquet);;All Files (*)",
            options=options)

        if file_path:
            self.load_parquet(file_path)

    def load_parquet(self, file_path):
        """Start a background load of ``file_path`` behind a progress dialog.

        In preview mode only the first 1000 rows are requested.
        """
        self._retire_loader()
        self._load_cancelled = False

        self.progress = QProgressDialog("Loading...", "Cancel", 0, 100, self)
        self.progress.setWindowModality(Qt.WindowModal)
        self.progress.canceled.connect(self.cancel_loading)

        max_rows = 1000 if self.preview_check.isChecked() else None
        self.loader = ParquetLoader(file_path, max_rows=max_rows)
        self.loader.signals.progress.connect(self.update_progress)
        self.loader.signals.finished.connect(self.on_load_complete)
        self.loader.signals.error.connect(self.on_load_error)
        self.loader.start()

        self.progress.show()

    def update_progress(self, value):
        """Update the load progress dialog (ignored once the load was cancelled)."""
        if self._load_cancelled or self.progress is None:
            return
        self.progress.setValue(value)

    def cancel_loading(self):
        """Cancel the running load at the user's request."""
        self._load_cancelled = True
        if self.loader is not None:
            self.loader.stop()
        self._dismiss(self.progress, self.cancel_loading)
        self.status_bar.showMessage("Loading canceled", 3000)

    def on_load_complete(self, df):
        """Show the loaded DataFrame and enable search and export."""
        if self._load_cancelled:
            # The loader finished its last row group after the user cancelled.
            return
        self._dismiss(self.progress, self.cancel_loading)

        model = DataFrameModel(df)
        self.proxy_model.setSourceModel(model)
        self.table_view.setModel(self.proxy_model)
        self.table_view.resizeColumnsToContents()

        # Repopulate silently; the filter is re-applied once below.
        self.column_combo.blockSignals(True)
        self.column_combo.clear()
        self.column_combo.addItem("All Columns")
        self.column_combo.addItems([str(name) for name in df.columns])
        self.column_combo.blockSignals(False)
        self.column_combo.setEnabled(True)

        self.search_input.setEnabled(True)
        self.btn_export.setEnabled(True)
        self.apply_filter(self.search_input.text())

        file_size = os.path.getsize(self.loader.file_path) / (1024 * 1024)
        self.status_bar.showMessage(
            f"Loaded: {len(df):,} rows | {len(df.columns)} cols | {file_size:.2f} MB",
            5000
        )

    def on_load_error(self, error_msg):
        """Report a failed load."""
        if self._load_cancelled:
            return
        self._dismiss(self.progress, self.cancel_loading)
        QMessageBox.critical(self, "Load Error", error_msg)
        self.status_bar.showMessage("Load failed", 5000)

    def apply_filter(self, text):
        """Filter the table by ``text`` in the selected column (or all columns)."""
        if self.proxy_model.sourceModel() is None:
            return

        # Combo index 0 is "All Columns" (Qt uses -1 for "every column");
        # index k > 0 maps to DataFrame column k - 1.
        key_column = max(self.column_combo.currentIndex() - 1, -1)
        self.proxy_model.setFilterKeyColumn(key_column)
        self.proxy_model.setFilterFixedString(text)

    def export_data(self):
        """Ask for a destination and export the loaded DataFrame in the background.

        The full source DataFrame is exported, regardless of the active filter.
        """
        source_model = self.proxy_model.sourceModel()
        if source_model is None:
            QMessageBox.warning(
                self,
                "No Data",
                "Please load a Parquet file first before exporting.",
                QMessageBox.Ok
            )
            self.status_bar.showMessage("Export failed: No data loaded", 3000)
            return

        df = source_model._data

        options = QFileDialog.Options()
        file_path, selected_filter = QFileDialog.getSaveFileName(
            self, "Export Data", "",
            "CSV (*.csv);;Excel (*.xlsx);;Parquet (*.parquet);;JSON (*.json)",
            options=options)

        if not file_path:
            return

        # Compare case-insensitively so "data.CSV" is not turned into "data.CSV.csv".
        extension = self.EXPORT_EXTENSIONS.get(selected_filter)
        if extension and not file_path.lower().endswith(extension):
            file_path += extension

        self._export_cancelled = False
        self.export_progress = QProgressDialog("Exporting...", "Cancel", 0, 100, self)
        self.export_progress.setWindowModality(Qt.WindowModal)
        self.export_progress.canceled.connect(self.cancel_export)

        self.export_worker = ExportWorker(df, file_path, selected_filter)
        self.export_worker.signals.progress.connect(self._update_export_progress)
        self.export_worker.signals.message.connect(self.status_bar.showMessage)
        self.export_worker.signals.finished.connect(self.on_export_complete)
        self.export_worker.signals.error.connect(self.on_export_error)

        self.thread_pool.start(self.export_worker)
        self.export_progress.show()

    def cancel_export(self):
        """Cancel the running export at the user's request."""
        self._export_cancelled = True
        if self.export_worker:
            self.export_worker.stop()
        self._dismiss(self.export_progress, self.cancel_export)
        self.status_bar.showMessage("Export canceled", 3000)

    def on_export_complete(self, file_path):
        """Report a finished export."""
        self._dismiss(self.export_progress, self.cancel_export)
        QMessageBox.information(
            self,
            "Success",
            f"Data exported to:\n{file_path}"
        )
        self.status_bar.showMessage(f"Exported to {file_path}", 5000)

    def on_export_error(self, error_msg):
        """Report a failed export."""
        self._dismiss(self.export_progress, self.cancel_export)
        QMessageBox.critical(
            self,
            "Export Error",
            f"Export failed:\n{error_msg}"
        )
        self.status_bar.showMessage("Export failed", 5000)

    def closeEvent(self, event):
        """Stop background work before the window and its threads are destroyed."""
        if self.export_worker is not None:
            self.export_worker.stop()
        loaders = list(self._retired_loaders)
        if self.loader is not None:
            loaders.append(self.loader)
        for loader in loaders:
            loader.stop()
            loader.wait()  # returns immediately if the thread is not running
        super().closeEvent(event)
if __name__ == "__main__":
    # High-DPI attributes only take effect when they are set before the
    # application object is constructed, so apply them on the class first.
    if hasattr(Qt, 'AA_EnableHighDpiScaling'):
        QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
    if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
        QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)

    app = QApplication(sys.argv)
    app.setStyle('Fusion')

    viewer = ParquetViewer()
    viewer.show()
    sys.exit(app.exec_())
			  Comments