Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
 | | | | 1 | 2 | 3 |
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- 리눅스
- node.js
- 맛집
- GIT
- 라즈베리파이
- MS-SQL
- 유니티
- MySQL
- ubuntu
- PyQt
- IOS
- javascript
- Excel
- 날짜
- PyQt5
- ASP
- pandas
- PER
- mssql
- tensorflow
- python
- Unity
- 다이어트
- port
- flutter
- Linux
- sqlite
- 함수
- swift
- urllib
Archives
아미(아름다운미소)
Parquet 기능 본문
import sys
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import traceback
from concurrent.futures import ThreadPoolExecutor
from PyQt5.QtWidgets import (
QApplication, QMainWindow, QTableView, QFileDialog,
QVBoxLayout, QWidget, QPushButton, QLabel,
QStatusBar, QMessageBox, QLineEdit, QHBoxLayout,
QComboBox, QHeaderView, QProgressDialog, QCheckBox
)
from PyQt5.QtCore import (
Qt, QAbstractTableModel, QSortFilterProxyModel,
QThread, pyqtSignal, QObject, QRunnable, QThreadPool
)
from PyQt5.QtGui import QColor, QFont
class WorkerSignals(QObject):
    """Qt signals used by the background workers to report back to the UI.

    The workers in this file create one instance each and expose it as
    ``.signals`` so the main window can connect to it.
    """

    # Status reporting while the work is in flight.
    message = pyqtSignal(str)   # short human-readable status text
    progress = pyqtSignal(int)  # completion percentage

    # Terminal outcomes.
    error = pyqtSignal(str)        # description of the failure
    finished = pyqtSignal(object)  # result payload of the finished job
class ExportWorker(QRunnable):
    """Runnable that writes a DataFrame to disk on a thread-pool thread.

    Progress, status text, success and failure are reported through
    ``self.signals`` (a ``WorkerSignals`` instance). Cancellation is
    cooperative: ``stop()`` clears a flag that the exporters poll between
    chunks, and a cancelled export emits neither ``finished`` nor ``error``.

    Args:
        df: The pandas DataFrame to export.
        file_path: Destination path.
        file_type: The file-dialog filter string chosen by the user, e.g.
            ``"CSV (*.csv)"``. If it is not one of the known filters the
            format is inferred from the extension of ``file_path``.
    """

    # Rows written per chunk; the cancel flag is polled between chunks.
    CSV_CHUNK_ROWS = 100000
    JSON_CHUNK_ROWS = 50000

    def __init__(self, df, file_path, file_type):
        super().__init__()
        self.df = df
        self.file_path = file_path
        self.file_type = file_type
        self.signals = WorkerSignals()
        self._is_running = True

    def run(self):
        """Run the export and emit ``finished`` or ``error`` accordingly."""
        try:
            if not self._is_running:
                return
            self.signals.message.emit(f"Exporting to {self.file_type}...")
            exporter = self._resolve_exporter()
            exporter()
            # A cancelled export returns early from the exporter; do not
            # report it as a success.
            if self._is_running:
                self.signals.finished.emit(self.file_path)
                self.signals.message.emit("Export completed successfully")
        except Exception as e:
            self.signals.error.emit(f"Export failed: {str(e)}")
            self.signals.message.emit("Export failed")

    def _resolve_exporter(self):
        """Return the bound export method for the requested format.

        The dialog filter string is tried first, then the extension of the
        destination path.

        Raises:
            ValueError: If neither identifies a supported format. Previously
                an unknown filter wrote nothing and still reported success.
        """
        by_filter = {
            "CSV (*.csv)": self.export_csv,
            "Excel (*.xlsx)": self.export_excel,
            "Parquet (*.parquet)": self.export_parquet,
            "JSON (*.json)": self.export_json,
        }
        exporter = by_filter.get(self.file_type)
        if exporter is not None:
            return exporter

        by_extension = {
            ".csv": self.export_csv,
            ".xlsx": self.export_excel,
            ".parquet": self.export_parquet,
            ".json": self.export_json,
        }
        extension = os.path.splitext(str(self.file_path))[1].lower()
        exporter = by_extension.get(extension)
        if exporter is None:
            raise ValueError(f"Unsupported export format: {self.file_type!r}")
        return exporter

    def export_csv(self):
        """Write the DataFrame as CSV in chunks, emitting progress per chunk."""
        chunksize = self.CSV_CHUNK_ROWS
        total_rows = len(self.df)

        if total_rows == 0:
            # The chunk loop below would not run at all, leaving no file on
            # disk; write a header-only file instead.
            self.df.to_csv(self.file_path, mode='w', header=True, index=False)
            self.signals.progress.emit(100)
            return

        for start in range(0, total_rows, chunksize):
            if not self._is_running:
                return
            first_chunk = (start == 0)
            chunk = self.df.iloc[start:start + chunksize]
            # The first chunk creates/truncates the file and writes the
            # header; later chunks append rows only.
            chunk.to_csv(
                self.file_path,
                mode='w' if first_chunk else 'a',
                header=first_chunk,
                index=False,
            )
            progress = int((start + chunksize) / total_rows * 100)
            self.signals.progress.emit(min(progress, 100))

    def export_excel(self):
        """Write the DataFrame as an Excel workbook in a single call."""
        if not self._is_running:
            return
        self.signals.progress.emit(20)
        try:
            self.df.to_excel(self.file_path, index=False, engine='openpyxl')
        except ImportError:
            # openpyxl is not installed: let pandas choose its default engine.
            self.df.to_excel(self.file_path, index=False)
        self.signals.progress.emit(100)

    def export_parquet(self):
        """Write the DataFrame as Parquet using the pyarrow engine."""
        if not self._is_running:
            return
        self.signals.progress.emit(30)
        self.df.to_parquet(self.file_path, engine='pyarrow')
        self.signals.progress.emit(100)

    def export_json(self):
        """Write the DataFrame as JSON Lines (one record per line) in chunks."""
        if not self._is_running:
            return
        self.signals.progress.emit(10)
        chunksize = self.JSON_CHUNK_ROWS
        total_rows = len(self.df)
        with open(self.file_path, 'w', encoding='utf-8') as f:
            for start in range(0, total_rows, chunksize):
                if not self._is_running:
                    return
                chunk = self.df.iloc[start:start + chunksize]
                json_str = chunk.to_json(
                    orient='records', lines=True, force_ascii=False)
                # Some pandas versions omit the trailing newline; without it
                # the last record of this chunk and the first record of the
                # next would end up on the same line.
                if json_str and not json_str.endswith('\n'):
                    json_str += '\n'
                f.write(json_str)
                progress = int((start + chunksize) / total_rows * 100)
                self.signals.progress.emit(min(progress, 100))
        if total_rows == 0:
            # No chunk was written, so no progress was reported in the loop.
            self.signals.progress.emit(100)

    def stop(self):
        """Request cancellation; takes effect at the next chunk boundary."""
        self._is_running = False
class ArrowTableConverter:
    """Helpers that turn an Arrow-style table into a pandas DataFrame.

    Every accessor probes for the attribute it needs with ``hasattr`` and
    falls back to an alternative, so the helpers work with any object that
    offers one of the supported access styles.
    """

    @staticmethod
    def get_columns(table):
        """Return the column names of ``table``.

        Uses ``table.column_names`` when present, otherwise reads the names
        from ``table.schema`` by position.
        """
        if hasattr(table, 'column_names'):
            return table.column_names
        return [table.schema[i].name for i in range(table.num_columns)]

    @staticmethod
    def get_column_data(table, col):
        """Return the data of column ``col``.

        Uses ``table.column(col)`` when present, otherwise ``table[col]``.
        """
        if hasattr(table, 'column'):
            return table.column(col)
        return table[col]

    @staticmethod
    def to_dataframe(table):
        """Convert ``table`` to a DataFrame, column by column if needed.

        ``table.to_pandas()`` is tried first. If it raises an ordinary
        exception, ``manual_conversion`` is used instead. Only ``Exception``
        is caught so that ``KeyboardInterrupt`` and ``SystemExit`` are not
        swallowed.
        """
        try:
            return table.to_pandas()
        except Exception:
            return ArrowTableConverter.manual_conversion(table)

    @staticmethod
    def manual_conversion(table):
        """Build a DataFrame one column at a time (last-resort fallback).

        A column exposing ``to_pandas`` is converted with it; any other
        column is iterated and each value is stringified. A column whose
        conversion raises is filled with the placeholder
        ``"[Conversion Error]"`` so the remaining columns are still usable.
        """
        data = {}
        for col in ArrowTableConverter.get_columns(table):
            try:
                col_data = ArrowTableConverter.get_column_data(table, col)
                if hasattr(col_data, 'to_pandas'):
                    data[col] = col_data.to_pandas()
                else:
                    data[col] = [str(x) for x in col_data]
            except Exception:
                # Keep the row count consistent with the other columns.
                data[col] = ["[Conversion Error]"] * len(table)
        return pd.DataFrame(data)
class ParquetLoader(QThread):
    """Background thread that reads a Parquet file into a display DataFrame.

    The file is read one row group at a time so the load can report progress
    and be cancelled between row groups. The result is passed through
    ``clean_data`` (every column becomes text) and delivered via
    ``signals.finished``; failures are delivered via ``signals.error``.

    Args:
        file_path: Path of the Parquet file to read.
        max_rows: Optional cap on the number of rows to load. ``None`` (or
            any other falsy value) loads the whole file.
    """

    def __init__(self, file_path, max_rows=None):
        super().__init__()
        self.file_path = file_path
        self.max_rows = max_rows
        self.signals = WorkerSignals()
        self._is_running = True

    def run(self):
        """Read the file and emit ``finished`` with the cleaned DataFrame."""
        parquet_file = None
        try:
            self.signals.progress.emit(5)
            parquet_file = pq.ParquetFile(self.file_path)
            num_row_groups = parquet_file.num_row_groups
            self.signals.progress.emit(10)

            chunks = []
            loaded_rows = 0
            for i in range(num_row_groups):
                if not self._is_running:
                    return
                # Reading the row groups accounts for the 10%..80% range.
                self.signals.progress.emit(
                    10 + int((i + 1) / num_row_groups * 70))
                table = parquet_file.read_row_group(i)
                df_chunk = ArrowTableConverter.to_dataframe(table)
                if self.max_rows:
                    remaining = self.max_rows - loaded_rows
                    if remaining <= 0:
                        break
                    df_chunk = df_chunk.head(remaining)
                chunks.append(df_chunk)
                loaded_rows += len(df_chunk)
                if self.max_rows and loaded_rows >= self.max_rows:
                    break

            self.signals.progress.emit(85)
            if chunks:
                combined_df = pd.concat(chunks, ignore_index=True)
            else:
                # A file with zero row groups yields no chunks, and
                # pd.concat([]) raises; build an empty frame that still
                # carries the column names from the file schema.
                combined_df = pd.DataFrame(
                    columns=list(parquet_file.schema_arrow.names))
            final_df = self.clean_data(combined_df)

            # The cancel request may have arrived after the last row group
            # was read; do not deliver a result the user asked to abandon.
            if not self._is_running:
                return
            self.signals.progress.emit(100)
            self.signals.finished.emit(final_df)
        except Exception as e:
            error_trace = traceback.format_exc()
            self.signals.error.emit(f"로딩 실패:\n{str(e)}\n\n{error_trace}")
        finally:
            # Release the file handle where the installed pyarrow offers
            # close(); older releases do not have it, hence the getattr.
            close = getattr(parquet_file, 'close', None)
            if callable(close):
                close()

    def clean_data(self, df):
        """Convert every column of ``df`` to display text, in place.

        Nulls become empty strings. Columns whose first non-null value is a
        dict or list are serialised with ``safe_json_dumps``; other
        non-string columns are cast with ``astype(str)``. A column that
        fails to convert is stringified and prefixed with ``"[Error] "``.

        Returns:
            The same DataFrame object that was passed in.
        """
        for col in df.columns:
            try:
                # Take the sample before nulls are replaced so a leading
                # null does not hide a dict/list column.
                non_null = df[col].dropna()
                sample = non_null.iloc[0] if len(non_null) > 0 else None
                df[col] = df[col].fillna("")
                if isinstance(sample, (dict, list)):
                    df[col] = df[col].apply(self.safe_json_dumps)
                elif not pd.api.types.is_string_dtype(df[col]):
                    df[col] = df[col].astype(str)
            except Exception as col_error:
                print(f"컬럼 {col} 정제 오류: {col_error}")
                df[col] = "[Error] " + df[col].astype(str)
        return df

    def safe_json_dumps(self, value):
        """Return ``value`` as text, using JSON for dicts and lists.

        ``None`` becomes an empty string. JSON output is truncated to 2000
        characters. If conversion raises, ``"[Conversion Error]"`` is
        returned instead.
        """
        try:
            if value is None:
                return ""
            if isinstance(value, (dict, list)):
                return json.dumps(
                    value, ensure_ascii=False, default=str)[:2000]
            return str(value)
        except Exception:
            return "[Conversion Error]"

    def stop(self):
        """Request cancellation; checked before each row group is read."""
        self._is_running = False
class DataFrameModel(QAbstractTableModel):
    """Read-only Qt table model backed by a pandas DataFrame.

    Args:
        data: The DataFrame to expose. It is referenced, not copied.
    """

    def __init__(self, data):
        super().__init__()
        self._data = data

    def rowCount(self, parent=None):
        """Return the number of DataFrame rows (0 for a valid parent)."""
        # A flat table has no child items under any cell.
        if parent is not None and parent.isValid():
            return 0
        return len(self._data)

    def columnCount(self, parent=None):
        """Return the number of DataFrame columns (0 for a valid parent)."""
        if parent is not None and parent.isValid():
            return 0
        return len(self._data.columns)

    def data(self, index, role=Qt.DisplayRole):
        """Return the cell text, background colour or alignment for ``index``.

        Returns ``None`` for an invalid index and for any other role.
        """
        if not index.isValid():
            return None
        if role == Qt.TextAlignmentRole:
            return Qt.AlignLeft | Qt.AlignVCenter
        if role not in (Qt.DisplayRole, Qt.BackgroundRole):
            # Skip the DataFrame lookup for roles that do not use the value.
            return None

        # .iat is the positional scalar accessor; it returns the same value
        # as .iloc[row, col] without the general-purpose indexing overhead.
        value = self._data.iat[index.row(), index.column()]

        if role == Qt.BackgroundRole:
            if isinstance(value, (dict, list)):
                return QColor(240, 248, 255)
            return QColor(255, 255, 255)

        # DisplayRole. pd.isna() returns an array for list-like values, whose
        # truth value is ambiguous and raises, so only scalars are tested
        # for missingness.
        if pd.api.types.is_scalar(value) and pd.isna(value):
            return ""
        return str(value)

    def headerData(self, section, orientation, role=Qt.DisplayRole):
        """Return the column name or the row index label as header text."""
        if role != Qt.DisplayRole:
            return None
        if orientation == Qt.Horizontal:
            return str(self._data.columns[section])
        return str(self._data.index[section])
class ParquetViewer(QMainWindow):
    """Main window: open a Parquet file, browse/filter it, and export it.

    Loading runs in a ``ParquetLoader`` thread and exporting in an
    ``ExportWorker`` on the global thread pool; both report back through
    Qt signals connected to the ``on_*`` slots below.
    """

    def __init__(self):
        super().__init__()
        self.setWindowTitle("Universal Parquet Viewer")
        self.setGeometry(100, 100, 1400, 900)
        # Created on demand by load_parquet() / export_data().
        self.loader = None
        self.progress = None
        self.export_progress = None
        self.export_worker = None
        self.setup_ui()
        self.thread_pool = QThreadPool.globalInstance()
        self.thread_pool.setMaxThreadCount(2)

    def setup_ui(self):
        """Build the control panel, the table view and the status bar."""
        self.central_widget = QWidget()
        self.setCentralWidget(self.central_widget)
        layout = QVBoxLayout(self.central_widget)

        # Control panel
        panel = QWidget()
        panel_layout = QHBoxLayout(panel)

        self.btn_open = QPushButton("Open Parquet")
        self.btn_open.clicked.connect(self.open_file)
        panel_layout.addWidget(self.btn_open)

        self.preview_check = QCheckBox("Preview Mode (First 1,000 rows)")
        self.preview_check.setChecked(True)
        panel_layout.addWidget(self.preview_check)

        panel_layout.addWidget(QLabel("Search:"))
        self.search_input = QLineEdit()
        self.search_input.setPlaceholderText("Search...")
        self.search_input.textChanged.connect(self.apply_filter)
        self.search_input.setEnabled(False)
        panel_layout.addWidget(self.search_input)

        self.column_combo = QComboBox()
        self.column_combo.addItem("All Columns")
        self.column_combo.setEnabled(False)
        # Re-run the current search when the user picks another column;
        # otherwise the new choice only takes effect on the next keystroke.
        self.column_combo.currentIndexChanged.connect(
            self._on_filter_column_changed)
        panel_layout.addWidget(self.column_combo)

        self.btn_export = QPushButton("Export")
        self.btn_export.clicked.connect(self.export_data)
        self.btn_export.setEnabled(False)
        panel_layout.addWidget(self.btn_export)

        layout.addWidget(panel)

        # Table view
        self.table_view = QTableView()
        self.table_view.setSortingEnabled(True)
        self.table_view.horizontalHeader().setSectionResizeMode(
            QHeaderView.Interactive)
        self.table_view.setStyleSheet("""
            QTableView {
                font-size: 10pt;
                selection-background-color: #3498db;
                selection-color: white;
            }
            QHeaderView::section {
                background-color: #34495e;
                color: white;
                padding: 5px;
                font-weight: bold;
            }
        """)
        layout.addWidget(self.table_view)

        # Status bar
        self.status_bar = QStatusBar()
        self.setStatusBar(self.status_bar)

        # Proxy model (sorting / filtering layer in front of the data model)
        self.proxy_model = QSortFilterProxyModel()
        self.proxy_model.setFilterCaseSensitivity(Qt.CaseInsensitive)

    @staticmethod
    def _dismiss_dialog(dialog):
        """Close a progress dialog without triggering its ``canceled`` slot.

        QProgressDialog emits ``canceled`` from its close event, so a plain
        ``close()`` on completion would also run the cancel handler.
        """
        if dialog is None:
            return
        previously_blocked = dialog.blockSignals(True)
        dialog.close()
        dialog.blockSignals(previously_blocked)

    def _source_model(self):
        """Return the data model behind the proxy, or ``None`` if none is set."""
        proxy = getattr(self, 'proxy_model', None)
        if proxy is None:
            return None
        return proxy.sourceModel()

    def _on_filter_column_changed(self, _index):
        """Re-apply the current search text to the newly selected column."""
        self.apply_filter(self.search_input.text())

    def open_file(self):
        """Show the file-open dialog and load the chosen Parquet file."""
        options = QFileDialog.Options()
        file_path, _ = QFileDialog.getOpenFileName(
            self, "Open Parquet File", "",
            "Parquet Files (*.parquet);;All Files (*)",
            options=options)
        if file_path:
            self.load_parquet(file_path)

    def load_parquet(self, file_path):
        """Start loading ``file_path`` in the background with a progress dialog."""
        # A cancelled loader may still be finishing its current row group.
        # Wait for it so its QThread object is not dropped while running.
        previous = self.loader
        if previous is not None and previous.isRunning():
            previous.stop()
            previous.wait()

        self.progress = QProgressDialog("Loading...", "Cancel", 0, 100, self)
        self.progress.setWindowModality(Qt.WindowModal)
        self.progress.canceled.connect(self.cancel_loading)

        max_rows = 1000 if self.preview_check.isChecked() else None
        self.loader = ParquetLoader(file_path, max_rows=max_rows)
        self.loader.signals.progress.connect(self.update_progress)
        self.loader.signals.finished.connect(self.on_load_complete)
        self.loader.signals.error.connect(self.on_load_error)
        self.loader.start()
        self.progress.show()

    def update_progress(self, value):
        """Forward the loader's progress percentage to the dialog."""
        self.progress.setValue(value)

    def cancel_loading(self):
        """Ask the loader to stop and dismiss the progress dialog."""
        if self.loader is not None:
            self.loader.stop()
        self._dismiss_dialog(self.progress)
        self.status_bar.showMessage("Loading canceled", 3000)

    def on_load_complete(self, df):
        """Show the loaded DataFrame and enable search and export controls."""
        self._dismiss_dialog(self.progress)

        model = DataFrameModel(df)
        self.proxy_model.setSourceModel(model)
        self.table_view.setModel(self.proxy_model)
        self.table_view.resizeColumnsToContents()

        # Repopulate silently; the filter is re-applied once, below.
        previously_blocked = self.column_combo.blockSignals(True)
        self.column_combo.clear()
        self.column_combo.addItem("All Columns")
        self.column_combo.addItems([str(name) for name in df.columns])
        self.column_combo.blockSignals(previously_blocked)
        self.column_combo.setEnabled(True)
        self.search_input.setEnabled(True)
        self.btn_export.setEnabled(True)

        # Bring the proxy's filter column and text in line with the freshly
        # reset widgets (a previous file may have had a different layout).
        self.apply_filter(self.search_input.text())

        file_size = os.path.getsize(self.loader.file_path) / (1024 * 1024)
        self.status_bar.showMessage(
            f"Loaded: {len(df):,} rows | {len(df.columns)} cols | {file_size:.2f} MB",
            5000
        )

    def on_load_error(self, error_msg):
        """Dismiss the progress dialog and show the loader's error message."""
        self._dismiss_dialog(self.progress)
        QMessageBox.critical(self, "Load Error", error_msg)
        self.status_bar.showMessage("Load failed", 5000)

    def apply_filter(self, text):
        """Filter the table rows by ``text`` in the selected column.

        Combo index 0 is "All Columns" (filter key column -1); index ``i``
        maps to DataFrame column ``i - 1``. Using the position instead of a
        name lookup also works when column names are duplicated.
        """
        if self._source_model() is None:
            return
        combo_index = self.column_combo.currentIndex()
        key_column = combo_index - 1 if combo_index > 0 else -1
        self.proxy_model.setFilterKeyColumn(key_column)
        self.proxy_model.setFilterFixedString(text)

    def export_data(self):
        """Ask for a destination and export the loaded data in the background.

        The DataFrame held by the source model is exported, i.e. all loaded
        rows regardless of the current search filter.
        """
        source_model = self._source_model()
        if source_model is None:
            QMessageBox.warning(
                self,
                "No Data",
                "Please load a Parquet file first before exporting.",
                QMessageBox.Ok
            )
            self.status_bar.showMessage("Export failed: No data loaded", 3000)
            return

        df = source_model._data
        options = QFileDialog.Options()
        file_path, selected_filter = QFileDialog.getSaveFileName(
            self, "Export Data", "",
            "CSV (*.csv);;Excel (*.xlsx);;Parquet (*.parquet);;JSON (*.json)",
            options=options)
        if not file_path:
            return

        # Append the extension that matches the chosen filter if missing.
        if selected_filter == "CSV (*.csv)" and not file_path.endswith('.csv'):
            file_path += '.csv'
        elif selected_filter == "Excel (*.xlsx)" and not file_path.endswith('.xlsx'):
            file_path += '.xlsx'
        elif selected_filter == "Parquet (*.parquet)" and not file_path.endswith('.parquet'):
            file_path += '.parquet'
        elif selected_filter == "JSON (*.json)" and not file_path.endswith('.json'):
            file_path += '.json'

        self.export_progress = QProgressDialog(
            "Exporting...", "Cancel", 0, 100, self)
        self.export_progress.setWindowModality(Qt.WindowModal)
        self.export_progress.canceled.connect(self.cancel_export)

        self.export_worker = ExportWorker(df, file_path, selected_filter)
        self.export_worker.signals.progress.connect(
            self.export_progress.setValue)
        self.export_worker.signals.message.connect(self.status_bar.showMessage)
        self.export_worker.signals.finished.connect(self.on_export_complete)
        self.export_worker.signals.error.connect(self.on_export_error)
        self.thread_pool.start(self.export_worker)
        self.export_progress.show()

    def cancel_export(self):
        """Ask the export worker to stop and dismiss the progress dialog."""
        if self.export_worker:
            self.export_worker.stop()
        self._dismiss_dialog(self.export_progress)
        self.status_bar.showMessage("Export canceled", 3000)

    def on_export_complete(self, file_path):
        """Dismiss the progress dialog and confirm the export destination."""
        self._dismiss_dialog(self.export_progress)
        QMessageBox.information(
            self,
            "Success",
            f"Data exported to:\n{file_path}"
        )
        self.status_bar.showMessage(f"Exported to {file_path}", 5000)

    def on_export_error(self, error_msg):
        """Dismiss the progress dialog and show the export error."""
        self._dismiss_dialog(self.export_progress)
        QMessageBox.critical(
            self,
            "Export Error",
            f"Export failed:\n{error_msg}"
        )
        self.status_bar.showMessage("Export failed", 5000)
if __name__ == "__main__":
    # The high-DPI attributes are set before the QApplication is constructed:
    # Qt only honours AA_EnableHighDpiScaling when it is set beforehand. The
    # hasattr checks keep this working on Qt builds lacking these flags.
    if hasattr(Qt, 'AA_EnableHighDpiScaling'):
        QApplication.setAttribute(Qt.AA_EnableHighDpiScaling, True)
    if hasattr(Qt, 'AA_UseHighDpiPixmaps'):
        QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps, True)

    app = QApplication(sys.argv)
    app.setStyle('Fusion')

    viewer = ParquetViewer()
    viewer.show()
    sys.exit(app.exec_())
Comments