카테고리 없음
Parquet 기능
유키공
2025. 4. 30. 08:08
import sys
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import traceback
from concurrent.futures import ThreadPoolExecutor
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QTableView, QFileDialog,
QVBoxLayout, QWidget, QPushButton, QLabel,
QStatusBar, QMessageBox, QLineEdit, QHBoxLayout,
QComboBox, QHeaderView, QProgressDialog, QCheckBox
)
from PyQt5.QtCore import (
Qt, QAbstractTableModel, QSortFilterProxyModel,
QThread, pyqtSignal, QObject, QRunnable, QThreadPool
)
from PyQt5.QtGui import QColor, QFont
class WorkerSignals(QObject):
    """Signal bundle used by the background workers in this file.

    The signals live on a separate QObject so that workers can expose them
    through a ``signals`` attribute.
    """

    # Integer progress value; the workers in this file emit 0-100.
    progress = pyqtSignal(int)
    # Result payload of a completed job (any Python object).
    finished = pyqtSignal(object)
    # Human-readable description of a failure.
    error = pyqtSignal(str)
    # Short status text.
    message = pyqtSignal(str)
class ExportWorker(QRunnable):
    """Runnable that writes a DataFrame to disk in one of several formats.

    Args:
        df: The pandas DataFrame to export.
        file_path: Destination path.
        file_type: The save-dialog filter string that selects the format,
            e.g. ``"CSV (*.csv)"``.

    Progress, status text, completion and failures are reported through
    ``self.signals``. Cancellation is cooperative: ``stop()`` clears a flag
    that the chunked exporters check between chunks.
    """

    # Rows written per chunk by the chunked exporters.
    CSV_CHUNK_ROWS = 100000
    JSON_CHUNK_ROWS = 50000

    def __init__(self, df, file_path, file_type):
        super().__init__()
        self.df = df
        self.file_path = file_path
        self.file_type = file_type
        self.signals = WorkerSignals()
        self._is_running = True

    def run(self):
        """Dispatch to the exporter for ``file_type`` and report the outcome.

        Emits ``finished`` with the file path (followed by a success message)
        unless the job was stopped; any exception is turned into an ``error``
        signal rather than propagating.
        """
        try:
            if not self._is_running:
                return
            self.signals.message.emit(f"Exporting to {self.file_type}...")

            exporters = {
                "CSV (*.csv)": self.export_csv,
                "Excel (*.xlsx)": self.export_excel,
                "Parquet (*.parquet)": self.export_parquet,
                "JSON (*.json)": self.export_json,
            }
            exporter = exporters.get(self.file_type)
            if exporter is None:
                # Without this, an unknown filter would write nothing and
                # still be reported as a successful export.
                raise ValueError(f"Unsupported export format: {self.file_type!r}")
            exporter()

            if self._is_running:
                self.signals.finished.emit(self.file_path)
                self.signals.message.emit("Export completed successfully")
        except Exception as e:
            self.signals.error.emit(f"Export failed: {str(e)}")
            self.signals.message.emit("Export failed")

    def export_csv(self):
        """Write the frame as CSV in chunks, emitting progress per chunk."""
        total_rows = len(self.df)
        if total_rows == 0:
            # The chunk loop would not run at all, leaving no file behind;
            # write a header-only file instead.
            self.df.to_csv(self.file_path, index=False)
            self.signals.progress.emit(100)
            return

        chunksize = self.CSV_CHUNK_ROWS
        for start in range(0, total_rows, chunksize):
            if not self._is_running:
                return
            is_first = start == 0
            # The first chunk creates the file and writes the header; later
            # chunks append rows only.
            self.df.iloc[start:start + chunksize].to_csv(
                self.file_path,
                mode='w' if is_first else 'a',
                header=is_first,
                index=False
            )
            done = min(start + chunksize, total_rows)
            self.signals.progress.emit(int(done / total_rows * 100))

    def export_excel(self):
        """Write the frame as an Excel workbook in a single call."""
        if not self._is_running:
            return
        self.signals.progress.emit(20)
        try:
            self.df.to_excel(self.file_path, index=False, engine='openpyxl')
        except ImportError:
            # openpyxl unavailable: let pandas choose whichever engine it has.
            self.df.to_excel(self.file_path, index=False)
        self.signals.progress.emit(100)

    def export_parquet(self):
        """Write the frame as Parquet using the pyarrow engine."""
        if not self._is_running:
            return
        self.signals.progress.emit(30)
        self.df.to_parquet(self.file_path, engine='pyarrow')
        self.signals.progress.emit(100)

    def export_json(self):
        """Write the frame as JSON Lines (one record per line) in chunks."""
        if not self._is_running:
            return
        self.signals.progress.emit(10)
        chunksize = self.JSON_CHUNK_ROWS
        total_rows = len(self.df)
        with open(self.file_path, 'w', encoding='utf-8') as f:
            for start in range(0, total_rows, chunksize):
                if not self._is_running:
                    return
                chunk = self.df.iloc[start:start + chunksize]
                json_str = chunk.to_json(orient='records', lines=True, force_ascii=False)
                # Guarantee a line break between chunks so the last record of
                # one chunk never fuses with the first record of the next
                # (NOTE(review): some pandas versions omit the trailing
                # newline - confirm against the versions you support).
                if json_str and not json_str.endswith("\n"):
                    json_str += "\n"
                f.write(json_str)
                done = min(start + chunksize, total_rows)
                self.signals.progress.emit(int(done / total_rows * 100))

    def stop(self):
        """Request cancellation; takes effect at the next check of the flag."""
        self._is_running = False
class ArrowTableConverter:
    """Helpers that convert a PyArrow-style table into a pandas DataFrame.

    Every accessor probes for the attribute it wants and falls back to an
    alternative spelling, so tables lacking ``column_names`` or ``column``
    are still handled.
    """

    @staticmethod
    def get_columns(table):
        """Return the table's column names.

        Uses ``table.column_names`` when present, otherwise reads the names
        from ``table.schema`` by position.
        """
        if hasattr(table, 'column_names'):
            return table.column_names
        return [table.schema[i].name for i in range(table.num_columns)]

    @staticmethod
    def get_column_data(table, col):
        """Return the data of column ``col`` via ``column()`` or indexing."""
        if hasattr(table, 'column'):
            return table.column(col)
        return table[col]

    @staticmethod
    def to_dataframe(table):
        """Convert ``table`` to a DataFrame, falling back to per-column work.

        Only ordinary errors trigger the fallback; ``KeyboardInterrupt`` and
        ``SystemExit`` propagate to the caller.
        """
        try:
            return table.to_pandas()
        except Exception:
            return ArrowTableConverter.manual_conversion(table)

    @staticmethod
    def manual_conversion(table):
        """Build a DataFrame column by column (last-resort path).

        A column that cannot be converted is filled with the placeholder
        ``"[Conversion Error]"`` so the remaining columns stay usable.
        """
        data = {}
        for col in ArrowTableConverter.get_columns(table):
            try:
                col_data = ArrowTableConverter.get_column_data(table, col)
                if hasattr(col_data, 'to_pandas'):
                    data[col] = col_data.to_pandas()
                else:
                    data[col] = [str(x) for x in col_data]
            except Exception:
                data[col] = ["[Conversion Error]"] * len(table)
        return pd.DataFrame(data)
class ParquetLoader(QThread):
    """Thread that reads a Parquet file row group by row group.

    Args:
        file_path: Path of the Parquet file to read.
        max_rows: Optional cap on the number of rows to load; ``None`` (or
            any falsy value) loads everything.

    The loaded, cleaned DataFrame is delivered through
    ``signals.finished``; failures are delivered through ``signals.error``
    together with the formatted traceback.
    """

    def __init__(self, file_path, max_rows=None):
        super().__init__()
        self.file_path = file_path
        self.max_rows = max_rows
        self.signals = WorkerSignals()
        self._is_running = True

    def run(self):
        """Read the file, clean the result and emit it.

        Nothing is emitted on ``finished`` once ``stop()`` has been called.
        """
        parquet_file = None
        try:
            self.signals.progress.emit(5)
            parquet_file = pq.ParquetFile(self.file_path)
            num_row_groups = parquet_file.num_row_groups
            self.signals.progress.emit(10)

            chunks = []
            loaded_rows = 0
            for i in range(num_row_groups):
                if not self._is_running:
                    return
                # Row-group reading covers the 10-80 span of the progress bar.
                self.signals.progress.emit(10 + int((i + 1) / num_row_groups * 70))
                table = parquet_file.read_row_group(i)
                df_chunk = ArrowTableConverter.to_dataframe(table)
                if self.max_rows:
                    remaining = self.max_rows - loaded_rows
                    if remaining <= 0:
                        break
                    df_chunk = df_chunk.head(remaining)
                chunks.append(df_chunk)
                loaded_rows += len(df_chunk)
                if self.max_rows and loaded_rows >= self.max_rows:
                    break

            self.signals.progress.emit(85)
            if chunks:
                combined_df = pd.concat(chunks, ignore_index=True)
            else:
                # A file with no row groups yields no chunks, and
                # pd.concat([]) raises; produce an empty frame that still
                # carries the file's column names.
                combined_df = pd.DataFrame(
                    columns=list(parquet_file.schema_arrow.names)
                )
            final_df = self.clean_data(combined_df)

            # A stop request that arrived after the loop must not deliver data.
            if not self._is_running:
                return
            self.signals.progress.emit(100)
            self.signals.finished.emit(final_df)
        except Exception as e:
            error_trace = traceback.format_exc()
            self.signals.error.emit(f"로딩 실패:\n{str(e)}\n\n{error_trace}")
        finally:
            # Close the reader when the installed pyarrow offers close();
            # otherwise the reference simply goes out of scope here.
            close = getattr(parquet_file, "close", None)
            if callable(close):
                close()

    def clean_data(self, df):
        """Normalise every column for display and return the same frame.

        Nulls become empty strings, dict/list columns are serialised with
        ``safe_json_dumps`` and remaining non-string columns are cast to
        ``str``. A column that fails is kept, stringified and prefixed with
        ``"[Error] "``.
        """
        for col in df.columns:
            try:
                # Decide on the first non-null value: a nested column whose
                # first row is null must still be recognised as nested.
                non_null = df[col].dropna()
                sample = non_null.iloc[0] if len(non_null) > 0 else None
                df[col] = df[col].fillna("")
                if isinstance(sample, (dict, list)):
                    df[col] = df[col].apply(self.safe_json_dumps)
                elif not pd.api.types.is_string_dtype(df[col]):
                    df[col] = df[col].astype(str)
            except Exception as col_error:
                print(f"컬럼 {col} 정제 오류: {col_error}")
                df[col] = "[Error] " + df[col].astype(str)
        return df

    def safe_json_dumps(self, value):
        """Return ``value`` as text without ever raising.

        ``None`` becomes ``""``; dicts and lists become JSON text truncated
        to 2000 characters; anything else goes through ``str``. On failure
        the placeholder ``"[Conversion Error]"`` is returned.
        """
        try:
            if value is None:
                return ""
            if isinstance(value, (dict, list)):
                return json.dumps(value, ensure_ascii=False, default=str)[:2000]
            return str(value)
        except Exception:
            return "[Conversion Error]"

    def stop(self):
        """Request cancellation; checked between row groups and before emit."""
        self._is_running = False
class DataFrameModel(QAbstractTableModel):
    """Read-only table model that exposes a pandas DataFrame to Qt views.

    Args:
        data: The DataFrame to display. It is referenced, not copied.
    """

    def __init__(self, data):
        super().__init__()
        self._data = data

    def rowCount(self, parent=None):
        """Return the number of rows in the frame."""
        return len(self._data)

    def columnCount(self, parent=None):
        """Return the number of columns in the frame."""
        return len(self._data.columns)

    def data(self, index, role=Qt.DisplayRole):
        """Return the cell's text, background colour or alignment.

        Returns ``None`` for an invalid index and for any other role.
        """
        if not index.isValid():
            return None
        if role == Qt.TextAlignmentRole:
            return Qt.AlignLeft | Qt.AlignVCenter
        if role != Qt.DisplayRole and role != Qt.BackgroundRole:
            # Skip the cell lookup for roles this model does not serve.
            return None

        value = self._data.iloc[index.row(), index.column()]
        if role == Qt.BackgroundRole:
            # Nested values get a light blue tint; everything else is white.
            if isinstance(value, (dict, list)):
                return QColor(240, 248, 255)
            return QColor(255, 255, 255)
        return "" if self._is_missing(value) else str(value)

    @staticmethod
    def _is_missing(value):
        """Return True when ``value`` is a scalar null (None/NaN/NaT).

        ``pd.isna`` returns an array for array-like cells, and taking the
        truth value of that array raises; non-scalars are therefore treated
        as present.
        """
        return bool(pd.api.types.is_scalar(value) and pd.isna(value))

    def headerData(self, section, orientation, role=Qt.DisplayRole):
        """Return the column name (horizontal) or index label (vertical)."""
        if role != Qt.DisplayRole:
            return None
        if orientation == Qt.Horizontal:
            return str(self._data.columns[section])
        return str(self._data.index[section])
class ParquetViewer(QMainWindow):
    """Main window: opens a Parquet file, shows it in a filterable table
    and exports the loaded data to CSV, Excel, Parquet or JSON.
    """

    # Row cap used while the preview checkbox is ticked.
    PREVIEW_ROWS = 1000

    # Save-dialog filter -> extension appended when the chosen name lacks it.
    EXPORT_EXTENSIONS = {
        "CSV (*.csv)": ".csv",
        "Excel (*.xlsx)": ".xlsx",
        "Parquet (*.parquet)": ".parquet",
        "JSON (*.json)": ".json",
    }

    def __init__(self):
        super().__init__()
        self.setWindowTitle("Universal Parquet Viewer")
        self.setGeometry(100, 100, 1400, 900)
        self.loader = None
        self.export_worker = None
        # Set when the user cancels a load so late results are discarded.
        self._load_cancelled = False
        self.setup_ui()
        self.thread_pool = QThreadPool.globalInstance()
        self.thread_pool.setMaxThreadCount(2)

    def setup_ui(self):
        """Build the control panel, the table view and the status bar."""
        # Created first so that filter slots can rely on its existence.
        self.proxy_model = QSortFilterProxyModel()
        self.proxy_model.setFilterCaseSensitivity(Qt.CaseInsensitive)

        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        layout = QVBoxLayout(self.central_widget)

        # Control panel
        panel = QWidget()
        panel_layout = QHBoxLayout(panel)

        self.btn_open = QPushButton("Open Parquet")
        self.btn_open.clicked.connect(self.open_file)
        panel_layout.addWidget(self.btn_open)

        self.preview_check = QCheckBox("Preview Mode (First 1,000 rows)")
        self.preview_check.setChecked(True)
        panel_layout.addWidget(self.preview_check)

        panel_layout.addWidget(QLabel("Search:"))
        self.search_input = QLineEdit()
        self.search_input.setPlaceholderText("Search...")
        self.search_input.textChanged.connect(self.apply_filter)
        self.search_input.setEnabled(False)
        panel_layout.addWidget(self.search_input)

        self.column_combo = QComboBox()
        self.column_combo.addItem("All Columns")
        self.column_combo.setEnabled(False)
        # Picking another column must re-run the filter with the current text.
        self.column_combo.currentIndexChanged.connect(self._reapply_filter)
        panel_layout.addWidget(self.column_combo)

        self.btn_export = QPushButton("Export")
        self.btn_export.clicked.connect(self.export_data)
        self.btn_export.setEnabled(False)
        panel_layout.addWidget(self.btn_export)

        layout.addWidget(panel)

        # Table view
        self.table_view = QTableView()
        self.table_view.setSortingEnabled(True)
        self.table_view.horizontalHeader().setSectionResizeMode(QHeaderView.Interactive)
        self.table_view.setStyleSheet(
            "\n"
            "QTableView {\n"
            "font-size: 10pt;\n"
            "selection-background-color: #3498db;\n"
            "selection-color: white;\n"
            "}\n"
            "QHeaderView::section {\n"
            "background-color: #34495e;\n"
            "color: white;\n"
            "padding: 5px;\n"
            "font-weight: bold;\n"
            "}\n"
        )
        layout.addWidget(self.table_view)

        # Status bar
        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)

    @staticmethod
    def _dismiss_dialog(dialog, cancel_slot):
        """Close a progress dialog without triggering its cancel handler.

        NOTE(review): closing a QProgressDialog appears to emit ``canceled``
        from its close event - confirm against the Qt version in use. The
        handler is detached first so a normal completion is never reported
        as a cancellation.
        """
        try:
            dialog.canceled.disconnect(cancel_slot)
        except TypeError:
            # The handler was already detached.
            pass
        dialog.close()

    def open_file(self):
        """Ask for a Parquet file and start loading it."""
        options = QFileDialog.Options()
        file_path, _ = QFileDialog.getOpenFileName(
            self, "Open Parquet File", "",
            "Parquet Files (*.parquet);;All Files (*)",
            options=options)
        if file_path:
            self.load_parquet(file_path)

    def load_parquet(self, file_path):
        """Start a background load of ``file_path`` with a progress dialog."""
        # A previously cancelled loader may still be inside a row-group read.
        # Replacing the only reference to a running thread object is unsafe,
        # so let it finish first.
        previous = self.loader
        if previous is not None and previous.isRunning():
            previous.stop()
            previous.wait()
        self._load_cancelled = False

        self.progress = QProgressDialog("Loading...", "Cancel", 0, 100, self)
        self.progress.setWindowModality(Qt.WindowModal)
        self.progress.canceled.connect(self.cancel_loading)

        max_rows = self.PREVIEW_ROWS if self.preview_check.isChecked() else None
        self.loader = ParquetLoader(file_path, max_rows=max_rows)
        self.loader.signals.progress.connect(self.update_progress)
        self.loader.signals.finished.connect(self.on_load_complete)
        self.loader.signals.error.connect(self.on_load_error)
        self.loader.start()
        self.progress.show()

    def update_progress(self, value):
        """Forward loader progress to the dialog unless the load was cancelled."""
        if self._load_cancelled:
            return
        self.progress.setValue(value)

    def cancel_loading(self):
        """Ask the loader to stop and dismiss the progress dialog."""
        self._load_cancelled = True
        if self.loader is not None:
            self.loader.stop()
        self._dismiss_dialog(self.progress, self.cancel_loading)
        self.status_bar.showMessage("Loading canceled", 3000)

    def on_load_complete(self, df):
        """Show the loaded frame and enable search and export."""
        if self._load_cancelled:
            # The result arrived after the user cancelled; discard it.
            return
        self._dismiss_dialog(self.progress, self.cancel_loading)

        model = DataFrameModel(df)
        self.proxy_model.setSourceModel(model)
        self.table_view.setModel(self.proxy_model)
        self.table_view.resizeColumnsToContents()

        # Repopulate quietly so the filter is not re-run for every
        # intermediate state of the combo box.
        self.column_combo.blockSignals(True)
        self.column_combo.clear()
        self.column_combo.addItem("All Columns")
        self.column_combo.addItems([str(name) for name in df.columns])
        self.column_combo.blockSignals(False)

        self.column_combo.setEnabled(True)
        self.search_input.setEnabled(True)
        self.btn_export.setEnabled(True)
        # Bring the proxy in line with the reset combo and current text.
        self.apply_filter(self.search_input.text())

        file_size = os.path.getsize(self.loader.file_path) / (1024 * 1024)
        self.status_bar.showMessage(
            f"Loaded: {len(df):,} rows | {len(df.columns)} cols | {file_size:.2f} MB",
            5000
        )

    def on_load_error(self, error_msg):
        """Report a failed load."""
        self._dismiss_dialog(self.progress, self.cancel_loading)
        QMessageBox.critical(self, "Load Error", error_msg)
        self.status_bar.showMessage("Load failed", 5000)

    def _reapply_filter(self, _index=None):
        """Re-run the filter with the current search text."""
        self.apply_filter(self.search_input.text())

    def apply_filter(self, text):
        """Filter the table by ``text`` in the selected column (or all)."""
        proxy = getattr(self, 'proxy_model', None)
        if proxy is None or proxy.sourceModel() is None:
            return
        # Combo entry 0 is "All Columns"; entry k (k >= 1) is column k - 1.
        combo_index = self.column_combo.currentIndex()
        proxy.setFilterKeyColumn(-1 if combo_index <= 0 else combo_index - 1)
        proxy.setFilterFixedString(text)

    def export_data(self):
        """Ask for a destination and export the loaded frame in the background.

        The whole loaded frame is exported; the active table filter is not
        applied to the output.
        """
        proxy = getattr(self, 'proxy_model', None)
        source = proxy.sourceModel() if proxy is not None else None
        if source is None:
            QMessageBox.warning(
                self,
                "No Data",
                "Please load a Parquet file first before exporting.",
                QMessageBox.Ok
            )
            self.status_bar.showMessage("Export failed: No data loaded", 3000)
            return
        df = source._data

        options = QFileDialog.Options()
        file_path, selected_filter = QFileDialog.getSaveFileName(
            self, "Export Data", "",
            "CSV (*.csv);;Excel (*.xlsx);;Parquet (*.parquet);;JSON (*.json)",
            options=options)
        if not file_path:
            return

        # Compare case-insensitively so "report.CSV" is not turned into
        # "report.CSV.csv".
        extension = self.EXPORT_EXTENSIONS.get(selected_filter)
        if extension and not file_path.lower().endswith(extension):
            file_path += extension

        self.export_progress = QProgressDialog("Exporting...", "Cancel", 0, 100, self)
        self.export_progress.setWindowModality(Qt.WindowModal)
        self.export_progress.canceled.connect(self.cancel_export)

        self.export_worker = ExportWorker(df, file_path, selected_filter)
        self.export_worker.signals.progress.connect(self.export_progress.setValue)
        self.export_worker.signals.message.connect(self.status_bar.showMessage)
        self.export_worker.signals.finished.connect(self.on_export_complete)
        self.export_worker.signals.error.connect(self.on_export_error)
        self.thread_pool.start(self.export_worker)
        self.export_progress.show()

    def cancel_export(self):
        """Ask the export worker to stop and dismiss the progress dialog."""
        if self.export_worker:
            self.export_worker.stop()
        self._dismiss_dialog(self.export_progress, self.cancel_export)
        self.status_bar.showMessage("Export canceled", 3000)

    def on_export_complete(self, file_path):
        """Report a successful export."""
        self._dismiss_dialog(self.export_progress, self.cancel_export)
        QMessageBox.information(
            self,
            "Success",
            f"Data exported to:\n{file_path}"
        )
        self.status_bar.showMessage(f"Exported to {file_path}", 5000)

    def on_export_error(self, error_msg):
        """Report a failed export.

        The worker's message already starts with "Export failed:", so it is
        shown as-is instead of being prefixed a second time.
        """
        self._dismiss_dialog(self.export_progress, self.cancel_export)
        QMessageBox.critical(
            self,
            "Export Error",
            error_msg
        )
        self.status_bar.showMessage("Export failed", 5000)
if __name__ == "__main__":
    # Script entry point: create the application, show the viewer and run
    # the event loop until the window is closed.
    #
    # The high-DPI attributes are set on the class before the application
    # object is constructed; Qt documents that AA_EnableHighDpiScaling must
    # be set before the application is created.
    if hasattr(Qt, 'AA_EnableHighDpiScaling'):
        QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
    if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
        QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)

    app = QApplication(sys.argv)
    app.setStyle('Fusion')

    viewer = ParquetViewer()
    viewer.show()
    sys.exit(app.exec_())