Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
Advanced CSV Manipulation Tool with Gradio Interface | |
Commercial-ready application for powerful CSV data processing | |
Features: | |
- File upload with 1GB limit | |
- Data preview with selectable rows | |
- Value replacement based on conditions | |
- CSV concatenation with column selection | |
- Advanced statistical analysis and visualization | |
- Data validation and quality checks | |
- Export to CSV, Excel, JSON | |
- Batch operations and operation recipes | |
- Undo/Redo functionality | |
- Memory-efficient large file processing | |
""" | |
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import json | |
import io | |
import zipfile | |
from datetime import datetime, timedelta | |
import re | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import plotly.express as px | |
import plotly.graph_objects as go | |
from plotly.subplots import make_subplots | |
import warnings | |
import os | |
from typing import Dict, List, Tuple, Optional, Any | |
import hashlib | |
import pickle | |
from pathlib import Path | |
# Suppress every Python warning for the whole process (this also hides
# deprecation warnings emitted by pandas/matplotlib).
warnings.filterwarnings('ignore')
# Apply the 'seaborn-v0_8' matplotlib style sheet to all figures created below.
plt.style.use('seaborn-v0_8')
# Use seaborn's "husl" colour palette as the default for plots.
sns.set_palette("husl")
class CSVProcessor:
    """Hold the loaded dataset, the working copy and a history of snapshots.

    Attributes are plain public fields:
        original_df: DataFrame exactly as loaded, or None before any load.
        current_df: the working DataFrame that operations replace.
        history: list of dicts with keys 'operation', 'timestamp' and 'df',
            one per call to save_state().
        recipes / batch_files: initialised empty; not used inside this class.
    """

    def __init__(self):
        self.original_df = None
        self.current_df = None
        self.history = []
        self.recipes = {}
        self.batch_files = []

    def load_data(self, file, preview_rows=100, encoding='utf-8'):
        """Load a CSV/Excel/JSON/Parquet file and reset the processor state.

        Args:
            file: an object with a ``name`` attribute holding the path, or
                anything whose ``str()`` is the path.
            preview_rows: number of leading rows to return as the preview;
                a value <= 0 returns the whole frame.
            encoding: first encoding tried for CSV files.

        Returns:
            Always a 3-tuple ``(preview, message, info)``. On failure
            ``preview`` is None and ``info`` is an empty dict, so callers can
            unpack three values on every path.
        """
        try:
            if file is None:
                return None, "No file provided", {}
            file_path = file.name if hasattr(file, 'name') else str(file)
            file_extension = Path(file_path).suffix.lower()
            if file_extension == '.csv':
                df = None
                # dict.fromkeys de-duplicates while keeping order, so the
                # default 'utf-8' is not attempted twice.
                for enc in dict.fromkeys([encoding, 'utf-8', 'latin-1', 'cp1252']):
                    try:
                        df = pd.read_csv(file_path, encoding=enc, low_memory=False)
                        break
                    except UnicodeDecodeError:
                        continue
                if df is None:
                    return None, "Failed to decode file with supported encodings", {}
            elif file_extension in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            elif file_extension == '.json':
                df = pd.read_json(file_path)
            elif file_extension == '.parquet':
                df = pd.read_parquet(file_path)
            else:
                return None, f"Unsupported file format: {file_extension}", {}
            # Independent copies so later edits never touch the original.
            self.original_df = df.copy()
            self.current_df = df.copy()
            self.history = []
            # Create preview
            if preview_rows > 0:
                preview = df.head(preview_rows)
            else:
                preview = df
            # Memory and performance info
            memory_mb = df.memory_usage(deep=True).sum() / 1024**2
            info = {
                'rows': len(df),
                'columns': len(df.columns),
                'memory_usage': f"{memory_mb:.2f} MB",
                'dtypes': dict(df.dtypes.astype(str)),
                'null_counts': dict(df.isnull().sum()),
                'duplicates': df.duplicated().sum()
            }
            success_msg = f"β File loaded successfully!\n"
            success_msg += f"π {info['rows']:,} rows Γ {info['columns']} columns\n"
            success_msg += f"πΎ Memory usage: {info['memory_usage']}\n"
            success_msg += f"π Duplicates: {info['duplicates']:,}\n"
            success_msg += f"β Missing values: {sum(info['null_counts'].values()):,}"
            return preview, success_msg, info
        except Exception as e:
            return None, f"β Error loading file: {str(e)}", {}

    def save_state(self, operation_name: str):
        """Append a snapshot of ``current_df`` to the history.

        When more than 50 snapshots are stored, only the most recent 25 are
        kept before the new one is appended.
        """
        if len(self.history) > 50:  # Limit history to prevent memory issues
            self.history = self.history[-25:]  # Keep last 25 operations
        self.history.append({
            'operation': operation_name,
            'timestamp': datetime.now(),
            'df': self.current_df.copy() if self.current_df is not None else None
        })

    def undo_operation(self):
        """Drop the latest snapshot and restore the one before it.

        Returns:
            ``(current_df, message)``. With exactly one snapshot the data is
            reset to ``original_df``; with none, nothing changes.
        """
        if len(self.history) > 1:
            # Report the operation that was actually removed, not the one
            # that is now on top of the history.
            undone = self.history.pop()
            self.current_df = self.history[-1]['df'].copy()
            return self.current_df, f"β Undone: {undone['operation']}"
        elif len(self.history) == 1:
            self.current_df = self.original_df.copy()
            self.history = []
            return self.current_df, "β Reset to original data"
        else:
            return self.current_df, "β No operations to undo"

    def reset_to_original(self):
        """Discard all history and restore a copy of the originally loaded data."""
        if self.original_df is not None:
            self.current_df = self.original_df.copy()
            self.history = []
            return self.current_df, "β Reset to original data"
        return None, "β No original data available"
# Global processor instance.
# The module-level functions below write their result to
# processor.current_df and call processor.save_state() after each operation.
processor = CSVProcessor()
def create_download_file(df: pd.DataFrame, format_type: str, filename: str = "processed_data"):
    """Serialise ``df`` for download in the requested format.

    Args:
        df: data to export.
        format_type: one of "csv", "excel" or "json".
        filename: base name; a ``_YYYYmmdd_HHMMSS`` timestamp and the
            extension are appended.

    Returns:
        ``(payload, name)`` where payload is a str (csv/json) or bytes
        (excel). On failure, or for an unsupported ``format_type``, returns
        ``(None, error_message)`` so callers can always unpack two values.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename_with_timestamp = f"{filename}_{timestamp}"
    try:
        if format_type == "csv":
            csv_data = df.to_csv(index=False)
            return csv_data, f"{filename_with_timestamp}.csv"
        elif format_type == "excel":
            buffer = io.BytesIO()
            with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
                df.to_excel(writer, index=False, sheet_name='Data')
            buffer.seek(0)
            return buffer.getvalue(), f"{filename_with_timestamp}.xlsx"
        elif format_type == "json":
            json_data = df.to_json(orient='records', indent=2, date_format='iso')
            return json_data, f"{filename_with_timestamp}.json"
        # Previously this path fell off the end and returned a bare None.
        return None, f"Unsupported export format: {format_type}"
    except Exception as e:
        return None, f"Error creating {format_type} file: {str(e)}"
def get_data_info(df: pd.DataFrame) -> str:
    """Return a multi-line text summary of ``df``.

    The summary has one "label: value" line each for shape, memory use,
    duplicate rows, missing cells and the number of numeric, object and
    datetime64 columns. Returns "No data loaded" for None or an empty frame.
    """
    if df is None or df.empty:
        return "No data loaded"

    def column_count(kinds) -> int:
        # Number of columns whose dtype matches the given selection.
        return len(df.select_dtypes(include=kinds).columns)

    entries = [
        ('π Shape', f"{df.shape[0]:,} rows Γ {df.shape[1]} columns"),
        ('πΎ Memory', f"{df.memory_usage(deep=True).sum() / 1024**2:.2f} MB"),
        ('π Duplicates', f"{df.duplicated().sum():,}"),
        ('β Missing Values', f"{df.isnull().sum().sum():,}"),
        ('π Numeric Columns', f"{column_count([np.number])}"),
        ('π Text Columns', f"{column_count(['object'])}"),
        ('π Date Columns', f"{column_count(['datetime64'])}"),
    ]
    return "\n".join(f"{label}: {value}" for label, value in entries)
def get_column_options(df: pd.DataFrame) -> List[str]:
    """Return the column labels of ``df`` as a list, or ``[]`` when ``df`` is None."""
    if df is None:
        return []
    return list(df.columns)
# =========================================== | |
# CORE DATA MANIPULATION FUNCTIONS | |
# =========================================== | |
def rename_values_conditional(df: pd.DataFrame, target_col: str, condition_col: str,
                              condition_value: str, new_value: str, match_type: str = "exact") -> Tuple[pd.DataFrame, str]:
    """Set ``target_col`` to ``new_value`` on rows whose ``condition_col`` matches.

    Args:
        df: input data; it is not modified, a copy is returned.
        target_col: column whose values are overwritten.
        condition_col: column tested against ``condition_value``.
        condition_value: value or pattern to match.
        new_value: replacement written into ``target_col``.
        match_type: "exact" (``==`` on the raw values), "contains" (literal
            substring), "regex" (pattern anchored at the start, via
            ``str.match``), "starts_with" or "ends_with". All but "exact"
            compare against the string form of the column.

    Returns:
        ``(result_df, message)``. On any validation failure or exception the
        original ``df`` is returned unchanged with an error message. On
        success the result is also stored on the global ``processor``.
    """
    try:
        if df is None or df.empty:
            return df, "β No data available"
        if target_col not in df.columns or condition_col not in df.columns:
            return df, "β One or more columns not found"
        df_result = df.copy()
        if match_type == "exact":
            mask = df_result[condition_col] == condition_value
        else:
            as_text = df_result[condition_col].astype(str)
            if match_type == "contains":
                # regex=False: "contains" is a literal substring test; the
                # separate "regex" mode exists for pattern matching.
                mask = as_text.str.contains(condition_value, regex=False, na=False)
            elif match_type == "regex":
                mask = as_text.str.match(condition_value, na=False)
            elif match_type == "starts_with":
                mask = as_text.str.startswith(condition_value, na=False)
            elif match_type == "ends_with":
                mask = as_text.str.endswith(condition_value, na=False)
            else:
                # Without this branch `mask` would be unbound below.
                return df, f"β Unknown match type: {match_type}"
        affected_rows = mask.sum()
        df_result.loc[mask, target_col] = new_value
        processor.current_df = df_result
        processor.save_state(f"Renamed values in '{target_col}' based on '{condition_col}'")
        return df_result, f"β Updated {affected_rows:,} rows in column '{target_col}'"
    except Exception as e:
        return df, f"β Error: {str(e)}"
def concatenate_csvs(files: List, selected_columns: str, join_type: str = "outer") -> Tuple[pd.DataFrame, str]:
    """Stack several CSV/Excel files row-wise into one DataFrame.

    Args:
        files: iterable of upload objects (with a ``name`` path attribute) or
            plain path strings. Files with other extensions are skipped.
        selected_columns: comma-separated column names to keep; empty keeps
            all columns. A file containing none of the names is skipped.
        join_type: "inner" keeps only columns common to all files; anything
            else performs an outer join.

    Returns:
        ``(result_df, message)``; ``result_df`` is None on failure. Every
        row gets a ``_source_file`` column holding the file's stem. On
        success the result is also stored on the global ``processor``.
    """
    try:
        if not files:
            return None, "β No files provided"
        dfs = []
        columns_to_use = [col.strip() for col in selected_columns.split(",") if col.strip()] if selected_columns else None
        for file in files:
            # Accept both upload objects and plain paths, as load_data does.
            file_path = file.name if hasattr(file, 'name') else str(file)
            # Compare the lower-cased suffix so "DATA.CSV" is recognised too.
            suffix = Path(file_path).suffix.lower()
            if suffix == '.csv':
                df = pd.read_csv(file_path, encoding='utf-8', low_memory=False)
            elif suffix in ('.xlsx', '.xls'):
                df = pd.read_excel(file_path)
            else:
                continue
            # Select specific columns if specified
            if columns_to_use:
                available_cols = [col for col in columns_to_use if col in df.columns]
                if not available_cols:
                    continue
                # Copy so the column assignment below does not write to a
                # slice of the frame that was just read.
                df = df[available_cols].copy()
            # Add source file identifier
            df['_source_file'] = Path(file_path).stem
            dfs.append(df)
        if not dfs:
            return None, "β No valid files found or columns don't exist"
        # Concatenate with specified join type
        join = 'inner' if join_type == "inner" else 'outer'
        result_df = pd.concat(dfs, ignore_index=True, join=join)
        processor.current_df = result_df
        processor.save_state(f"Concatenated {len(dfs)} files")
        return result_df, f"β Successfully concatenated {len(dfs)} files with {len(result_df):,} total rows"
    except Exception as e:
        return None, f"β Error concatenating files: {str(e)}"
def get_value_counts(df: pd.DataFrame, column: str, top_n: int = 20, normalize: bool = False) -> Tuple[pd.DataFrame, str]:
    """Tabulate the most frequent values of ``column``.

    Missing values are counted as their own entry. The result has a 'Value'
    column plus 'Count', or 'Percentage' (formatted like "12.34%") when
    ``normalize`` is true, limited to the first ``top_n`` entries.

    Returns:
        ``(table, message)``; ``table`` is None when there is no data, the
        column is missing, or an exception occurs.
    """
    try:
        if df is None or df.empty:
            return None, "β No data available"
        if column not in df.columns:
            return None, f"β Column '{column}' not found"
        measure = 'Percentage' if normalize else 'Count'
        frequencies = df[column].value_counts(normalize=normalize, dropna=False)
        leading = frequencies.head(top_n)
        # Convert to DataFrame for better display
        result_df = pd.DataFrame({'Value': leading.index, measure: leading.values})
        if normalize:
            result_df[measure] = result_df[measure].map(lambda share: f"{share:.2%}")
        shown = min(top_n, len(result_df))
        return result_df, f"β Value counts for '{column}' (Top {shown})"
    except Exception as e:
        return None, f"β Error: {str(e)}"
def _equality_mask(series: pd.Series, value: str) -> pd.Series:
    """Return a boolean mask of entries in ``series`` equal to the text ``value``.

    ``value`` arrives from a text box and is therefore always a string. Besides
    the direct comparison, non-missing entries are compared by their string
    form, and numeric columns are additionally compared against ``value``
    parsed as a float, so "5" matches both 5 and 5.0.
    """
    mask = series == value
    mask = mask | (series.notna() & (series.astype(str) == value))
    if pd.api.types.is_numeric_dtype(series):
        try:
            number = float(value)
        except (TypeError, ValueError):
            pass  # not a number: only the textual comparisons apply
        else:
            mask = mask | (series == number)
    return mask


def filter_data(df: pd.DataFrame, column: str, condition: str, value: str) -> Tuple[pd.DataFrame, str]:
    """Keep only the rows of ``df`` where ``column`` satisfies ``condition``.

    Args:
        df: input data (not modified).
        column: column to test.
        condition: one of "equals", "not_equals", "contains", "not_contains",
            "starts_with", "ends_with", "greater_than", "less_than",
            "is_null", "is_not_null".
        value: comparison operand as text; parsed with ``float()`` for the
            greater/less conditions and ignored for the null checks.

    Returns:
        ``(filtered_df, message)``. On validation failure or exception the
        original ``df`` is returned with an error message. On success the
        result is also stored on the global ``processor``.
    """
    try:
        if df is None or df.empty:
            return df, "β No data available"
        if column not in df.columns:
            return df, f"β Column '{column}' not found"
        df_result = df.copy()
        series = df_result[column]
        if condition == "equals":
            mask = _equality_mask(series, value)
        elif condition == "not_equals":
            mask = ~_equality_mask(series, value)
        elif condition == "contains":
            mask = series.astype(str).str.contains(value, na=False)
        elif condition == "not_contains":
            mask = ~series.astype(str).str.contains(value, na=False)
        elif condition == "starts_with":
            mask = series.astype(str).str.startswith(value, na=False)
        elif condition == "ends_with":
            mask = series.astype(str).str.endswith(value, na=False)
        elif condition == "greater_than":
            mask = pd.to_numeric(series, errors='coerce') > float(value)
        elif condition == "less_than":
            mask = pd.to_numeric(series, errors='coerce') < float(value)
        elif condition == "is_null":
            mask = series.isnull()
        elif condition == "is_not_null":
            mask = series.notnull()
        else:
            return df, f"β Unknown condition: {condition}"
        filtered_df = df_result[mask]
        processor.current_df = filtered_df
        processor.save_state(f"Filtered data: {column} {condition} {value}")
        return filtered_df, f"β Filtered to {len(filtered_df):,} rows (removed {len(df) - len(filtered_df):,} rows)"
    except Exception as e:
        return df, f"β Error: {str(e)}"
def handle_missing_values(df: pd.DataFrame, column: str, method: str, fill_value: str = "") -> Tuple[pd.DataFrame, str]:
    """Drop or fill missing values in one column, or in every column.

    Args:
        df: input data (not modified).
        column: a column name, or "ALL" to process every column in turn.
        method: "drop_rows", "fill_value", "fill_mean", "fill_median",
            "fill_mode", "forward_fill" or "backward_fill".
        fill_value: replacement used by the "fill_value" method.

    Returns:
        ``(result_df, message)`` with before/after missing-cell totals. On
        validation failure or exception the original ``df`` is returned with
        an error message. On success the result is also stored on the global
        ``processor``.

    Notes:
        "fill_mean"/"fill_median" only touch numeric (non-boolean) columns;
        other columns are left as they are.
    """
    known_methods = ("drop_rows", "fill_value", "fill_mean", "fill_median",
                     "fill_mode", "forward_fill", "backward_fill")
    try:
        if df is None or df.empty:
            return df, "β No data available"
        if column != "ALL" and column not in df.columns:
            return df, f"β Column '{column}' not found"
        if method not in known_methods:
            # Previously an unknown method changed nothing yet reported success.
            return df, f"β Unknown method: {method}"
        df_result = df.copy()
        columns_to_process = [column] if column != "ALL" else df_result.columns.tolist()
        total_missing_before = df_result.isnull().sum().sum()
        for col in columns_to_process:
            if method == "drop_rows":
                df_result = df_result.dropna(subset=[col])
            elif method == "fill_value":
                df_result[col] = df_result[col].fillna(fill_value)
            elif method in ("fill_mean", "fill_median"):
                series = df_result[col]
                # Any numeric dtype qualifies (float32, nullable Int64, ...),
                # not just int64/float64; booleans are excluded as before.
                is_number = (pd.api.types.is_numeric_dtype(series)
                             and not pd.api.types.is_bool_dtype(series))
                if is_number:
                    centre = series.mean() if method == "fill_mean" else series.median()
                    df_result[col] = series.fillna(centre)
            elif method == "fill_mode":
                mode_val = df_result[col].mode()
                if len(mode_val) > 0:
                    df_result[col] = df_result[col].fillna(mode_val[0])
            elif method == "forward_fill":
                # fillna(method=...) is deprecated in pandas; use ffill/bfill.
                df_result[col] = df_result[col].ffill()
            elif method == "backward_fill":
                df_result[col] = df_result[col].bfill()
        total_missing_after = df_result.isnull().sum().sum()
        processor.current_df = df_result
        processor.save_state(f"Handle missing values: {method}")
        return df_result, f"β Processed missing values. Before: {total_missing_before:,}, After: {total_missing_after:,}"
    except Exception as e:
        return df, f"β Error: {str(e)}"
def detect_and_remove_duplicates(df: pd.DataFrame, columns: str = "", keep: str = "first") -> Tuple[pd.DataFrame, str]:
    """Remove duplicate rows from ``df``.

    Args:
        df: input data (not modified).
        columns: comma-separated column names that define a duplicate;
            names not present in ``df`` are ignored, and an empty (or fully
            invalid) list means all columns are compared.
        keep: "first" or "last" to keep one row of each duplicate group, or
            "false" (also the boolean False) to drop every duplicated row.

    Returns:
        ``(result_df, message)``. On validation failure or exception the
        original ``df`` is returned with an error message. When rows are
        removed the result is also stored on the global ``processor``.
    """
    try:
        if df is None or df.empty:
            return df, "β No data available"
        # The UI delivers the option as text, but pandas only accepts the
        # boolean False (not the string "false") for "drop all duplicates".
        keep_text = str(keep).strip().lower()
        if keep_text in ("false", "none"):
            keep_arg = False
        elif keep_text in ("first", "last"):
            keep_arg = keep_text
        else:
            return df, f"β Invalid keep option: {keep}"
        df_result = df.copy()
        # Parse columns
        if columns.strip():
            cols_list = [col.strip() for col in columns.split(",") if col.strip() in df.columns]
            subset = cols_list if cols_list else None
        else:
            subset = None
        # Counting with the same `keep` makes the number equal the rows that
        # drop_duplicates will actually remove.
        duplicates_before = df_result.duplicated(subset=subset, keep=keep_arg).sum()
        if duplicates_before == 0:
            return df_result, "β No duplicate rows found"
        df_result = df_result.drop_duplicates(subset=subset, keep=keep_arg)
        processor.current_df = df_result
        processor.save_state(f"Removed {duplicates_before:,} duplicate rows")
        return df_result, f"β Removed {duplicates_before:,} duplicate rows. Remaining: {len(df_result):,} rows"
    except Exception as e:
        return df, f"β Error: {str(e)}"
# Arithmetic operations: both operands are coerced to numbers first.
_NUMERIC_COLUMN_OPS = {
    "add": lambda left, right: left + right,
    "subtract": lambda left, right: left - right,
    "multiply": lambda left, right: left * right,
    "divide": lambda left, right: left / right,
}

# Single-column operations applied to the string form of the column.
_TEXT_COLUMN_OPS = {
    "extract_numbers": lambda text: text.str.extract(r'(\d+)')[0],
    "upper": lambda text: text.str.upper(),
    "lower": lambda text: text.str.lower(),
    "title": lambda text: text.str.title(),
    "length": lambda text: text.str.len(),
}


def perform_column_operations(df: pd.DataFrame, operation: str, col1: str, col2: str = "",
                              new_col_name: str = "", constant: str = "") -> Tuple[pd.DataFrame, str]:
    """Create a new column from an arithmetic or string operation.

    Args:
        df: input data (not modified).
        operation: "add", "subtract", "multiply", "divide", "concatenate",
            "extract_numbers", "upper", "lower", "title" or "length".
        col1: primary column.
        col2: second operand column for the two-operand operations; takes
            precedence over ``constant`` when it names an existing column.
        new_col_name: name of the created column; defaults to
            ``"{col1}_{operation}"``.
        constant: second operand used when ``col2`` is not given; parsed with
            ``float()`` for arithmetic, appended as-is for "concatenate".

    Returns:
        ``(result_df, message)``. On validation failure or exception the
        original ``df`` is returned with an error message. On success the
        result is also stored on the global ``processor``.
    """
    try:
        if df is None or df.empty:
            return df, "β No data available"
        if col1 not in df.columns:
            return df, f"β Column '{col1}' not found"
        df_result = df.copy()
        if not new_col_name:
            new_col_name = f"{col1}_{operation}"
        has_second_column = bool(col2) and col2 in df.columns
        needs_operand = operation in _NUMERIC_COLUMN_OPS or operation == "concatenate"
        if needs_operand and not has_second_column and not constant:
            # Previously this case created nothing but still reported success.
            return df, f"β Operation '{operation}' requires a second column or a constant value"
        if operation in _NUMERIC_COLUMN_OPS:
            left = pd.to_numeric(df_result[col1], errors='coerce')
            if has_second_column:
                right = pd.to_numeric(df_result[col2], errors='coerce')
            else:
                right = float(constant)
            df_result[new_col_name] = _NUMERIC_COLUMN_OPS[operation](left, right)
        elif operation == "concatenate":
            if has_second_column:
                df_result[new_col_name] = df_result[col1].astype(str) + " " + df_result[col2].astype(str)
            else:
                df_result[new_col_name] = df_result[col1].astype(str) + constant
        elif operation in _TEXT_COLUMN_OPS:
            df_result[new_col_name] = _TEXT_COLUMN_OPS[operation](df_result[col1].astype(str))
        else:
            return df, f"β Unknown operation: {operation}"
        processor.current_df = df_result
        processor.save_state(f"Column operation: {operation} on {col1}")
        return df_result, f"β Created new column '{new_col_name}' using {operation} operation"
    except Exception as e:
        return df, f"β Error: {str(e)}"
def convert_data_types(df: pd.DataFrame, column: str, target_type: str) -> Tuple[pd.DataFrame, str]:
    """Convert one column of ``df`` to another data type.

    ``target_type`` is one of "string", "integer", "float", "datetime",
    "boolean" or "category". Numeric and datetime conversions coerce
    unparseable entries to missing values.

    Returns:
        ``(result_df, message)``. On validation failure or exception the
        original ``df`` is returned with an error message. On success the
        result is also stored on the global ``processor``.
    """
    # One converter per supported target type; each maps a Series to a Series.
    converters = {
        "string": lambda s: s.astype(str),
        "integer": lambda s: pd.to_numeric(s, errors='coerce').astype('Int64'),
        "float": lambda s: pd.to_numeric(s, errors='coerce'),
        "datetime": lambda s: pd.to_datetime(s, errors='coerce'),
        "boolean": lambda s: s.astype(bool),
        "category": lambda s: s.astype('category'),
    }
    try:
        if df is None or df.empty:
            return df, "β No data available"
        if column not in df.columns:
            return df, f"β Column '{column}' not found"
        convert = converters.get(target_type)
        if convert is None:
            return df, f"β Unknown data type: {target_type}"
        df_result = df.copy()
        df_result[column] = convert(df_result[column])
        processor.current_df = df_result
        processor.save_state(f"Converted '{column}' to {target_type}")
        return df_result, f"β Converted column '{column}' to {target_type}"
    except Exception as e:
        return df, f"β Error: {str(e)}"
# =========================================== | |
# ANALYSIS AND VISUALIZATION FUNCTIONS | |
# =========================================== | |
def generate_statistical_summary(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
    """Build a descriptive-statistics table for the numeric columns of ``df``.

    The table is ``DataFrame.describe()`` extended with 'variance',
    'skewness', 'kurtosis' and 'missing' rows, rounded to 4 decimals.

    Returns:
        ``(stats_df, message)``; ``stats_df`` is None when there is no data,
        no numeric column, or an exception occurs.
    """
    try:
        if df is None or df.empty:
            return None, "β No data available"
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) == 0:
            return None, "β No numeric columns found"
        numeric = df[numeric_cols]
        stats_df = numeric.describe()
        # Add additional statistics, appended in this order.
        extra_rows = (
            ('variance', numeric.var),
            ('skewness', numeric.skew),
            ('kurtosis', numeric.kurtosis),
            ('missing', lambda: numeric.isnull().sum()),
        )
        for row_label, compute in extra_rows:
            stats_df.loc[row_label] = compute()
        return stats_df.round(4), "β Statistical summary generated"
    except Exception as e:
        return None, f"β Error: {str(e)}"
def create_correlation_matrix(df: pd.DataFrame) -> Tuple[str, str]:
    """Render a correlation heatmap of the numeric columns to a PNG file.

    The image is written to 'correlation_matrix.png' in the current working
    directory. Only the lower triangle is drawn: the upper triangle,
    including the diagonal, is masked out.

    Returns:
        ``(image_path, message)``; ``image_path`` is None when there is no
        data, fewer than two numeric columns, or an exception occurs.
    """
    try:
        if df is None or df.empty:
            return None, "β No data available"
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) < 2:
            return None, "β Need at least 2 numeric columns for correlation"
        # Calculate correlation matrix
        corr_matrix = df[numeric_cols].corr()
        # Create heatmap
        fig = plt.figure(figsize=(12, 8))
        try:
            mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
            sns.heatmap(corr_matrix, mask=mask, annot=True, cmap='coolwarm', center=0,
                        square=True, linewidths=0.5, cbar_kws={"shrink": 0.8})
            plt.title('Correlation Matrix Heatmap', fontsize=16, fontweight='bold')
            plt.tight_layout()
            # Save plot
            plt.savefig('correlation_matrix.png', dpi=300, bbox_inches='tight')
        finally:
            # Close the figure even when plotting or saving fails, so failed
            # requests do not leave open figures behind.
            plt.close(fig)
        return 'correlation_matrix.png', "β Correlation matrix created"
    except Exception as e:
        return None, f"β Error: {str(e)}"
def create_distribution_plots(df: pd.DataFrame, column: str, plot_type: str = "histogram") -> Tuple[str, str]:
    """Plot the distribution of ``column`` next to a box plot and save it as PNG.

    Args:
        df: input data.
        column: column to plot.
        plot_type: "histogram" (histogram + box plot) or "density"
            (density curve + box plot).

    Returns:
        ``(image_path, message)``; ``image_path`` is None when there is no
        data, the column is missing, the plot type is unknown, or an
        exception occurs. The file is named
        ``distribution_<column>_<plot_type>.png`` in the current working
        directory, with characters other than letters, digits, '_', '.' and
        '-' in the column name replaced by '_'.
    """
    try:
        if df is None or df.empty:
            return None, "β No data available"
        if column not in df.columns:
            return None, f"β Column '{column}' not found"
        if plot_type not in ("histogram", "density"):
            # Previously an empty figure was saved and reported as success.
            return None, f"β Unknown plot type: {plot_type}"
        # Column names may contain path separators or other characters that
        # are unsafe in a file name.
        safe_column = re.sub(r'[^\w.-]+', '_', str(column))
        output_path = f'distribution_{safe_column}_{plot_type}.png'
        fig = plt.figure(figsize=(12, 6))
        try:
            if plot_type == "histogram":
                plt.subplot(1, 2, 1)
                df[column].hist(bins=30, edgecolor='black', alpha=0.7)
                plt.title(f'Histogram of {column}')
                plt.xlabel(column)
                plt.ylabel('Frequency')
                plt.subplot(1, 2, 2)
                df.boxplot(column=column)
                plt.title(f'Box Plot of {column}')
            else:
                plt.subplot(1, 2, 1)
                df[column].plot(kind='density')
                plt.title(f'Density Plot of {column}')
                plt.xlabel(column)
                plt.subplot(1, 2, 2)
                df[column].plot(kind='box')
                plt.title(f'Box Plot of {column}')
            plt.tight_layout()
            plt.savefig(output_path, dpi=300, bbox_inches='tight')
        finally:
            # Close the figure even when plotting fails (e.g. a non-numeric
            # column), so failed requests do not leave open figures behind.
            plt.close(fig)
        return output_path, f"β Distribution plot created for {column}"
    except Exception as e:
        return None, f"β Error: {str(e)}"
# =========================================== | |
# GRADIO INTERFACE SETUP | |
# =========================================== | |
def create_interface(): | |
"""Create the main Gradio interface""" | |
with gr.Blocks(title="Advanced CSV Manipulation Tool", theme=gr.themes.Soft()) as demo: | |
gr.HTML(""" | |
<div style="text-align: center; padding: 20px;"> | |
<h1 style="color: #2e7d32; margin-bottom: 10px;">π₯ Advanced CSV Manipulation Tool</h1> | |
<p style="font-size: 18px; color: #666;">Commercial-ready data processing with advanced analytics</p> | |
<hr style="margin: 20px 0;"> | |
</div> | |
""") | |
# Global state variables | |
current_data = gr.State(None) | |
data_info = gr.State({}) | |
with gr.Tabs(): | |
# ===== FILE UPLOAD TAB ===== | |
with gr.TabItem("π File Upload & Preview"): | |
with gr.Row(): | |
with gr.Column(scale=1): | |
file_upload = gr.File( | |
label="Upload CSV/Excel/JSON file (Max 1GB)", | |
file_types=[".csv", ".xlsx", ".xls", ".json"], | |
file_count="single" | |
) | |
preview_rows = gr.Slider( | |
minimum=0, | |
maximum=1000, | |
value=100, | |
step=50, | |
label="Preview Rows (0 = All)", | |
info="Number of rows to display in preview" | |
) | |
upload_btn = gr.Button("π Load & Analyze Data", variant="primary", size="lg") | |
with gr.Column(scale=2): | |
upload_status = gr.Textbox(label="Status", lines=5, interactive=False) | |
data_info_display = gr.Textbox(label="Data Information", lines=8, interactive=False) | |
data_preview = gr.DataFrame(label="Data Preview", interactive=False, height=400) | |
def load_file_handler(file, rows): | |
if file is None: | |
return None, "Please upload a file first", "", None, {} | |
preview, status, info = processor.load_data(file, rows) | |
info_text = get_data_info(processor.current_df) if processor.current_df is not None else "" | |
return preview, status, info_text, processor.current_df, info | |
upload_btn.click( | |
load_file_handler, | |
inputs=[file_upload, preview_rows], | |
outputs=[data_preview, upload_status, data_info_display, current_data, data_info] | |
) | |
# ===== VALUE REPLACEMENT TAB ===== | |
with gr.TabItem("π Value Replacement"): | |
gr.HTML("<h3>Replace values in one column based on conditions in another column</h3>") | |
with gr.Row(): | |
with gr.Column(): | |
target_col = gr.Dropdown(label="Target Column (to modify)", choices=[], interactive=True) | |
condition_col = gr.Dropdown(label="Condition Column (to check)", choices=[], interactive=True) | |
condition_value = gr.Textbox(label="Condition Value", placeholder="Value to match in condition column") | |
new_value = gr.Textbox(label="New Value", placeholder="Replacement value for target column") | |
match_type = gr.Radio( | |
choices=["exact", "contains", "starts_with", "ends_with", "regex"], | |
value="exact", | |
label="Match Type" | |
) | |
replace_btn = gr.Button("π Replace Values", variant="primary") | |
with gr.Column(): | |
replace_status = gr.Textbox(label="Status", lines=3, interactive=False) | |
# Update column choices when data changes | |
def update_columns(df): | |
if df is not None: | |
cols = list(df.columns) | |
return gr.Dropdown(choices=cols), gr.Dropdown(choices=cols) | |
return gr.Dropdown(choices=[]), gr.Dropdown(choices=[]) | |
current_data.change( | |
update_columns, | |
inputs=[current_data], | |
outputs=[target_col, condition_col] | |
) | |
def replace_values_handler(df, tcol, ccol, cval, nval, mtype): | |
if df is None: | |
return None, "β No data loaded", "" | |
result_df, status = rename_values_conditional(df, tcol, ccol, cval, nval, mtype) | |
info_text = get_data_info(result_df) if result_df is not None else "" | |
return result_df, status, info_text | |
replace_btn.click( | |
replace_values_handler, | |
inputs=[current_data, target_col, condition_col, condition_value, new_value, match_type], | |
outputs=[current_data, replace_status, data_info_display] | |
) | |
# ===== CSV CONCATENATION TAB ===== | |
with gr.TabItem("π CSV Concatenation"): | |
gr.HTML("<h3>Combine multiple CSV files with column selection</h3>") | |
with gr.Row(): | |
with gr.Column(): | |
multi_files = gr.File( | |
label="Upload Multiple Files", | |
file_types=[".csv", ".xlsx", ".xls"], | |
file_count="multiple" | |
) | |
selected_columns = gr.Textbox( | |
label="Columns to Include", | |
placeholder="column1, column2, column3 (leave empty for all)", | |
info="Comma-separated list of column names" | |
) | |
join_type = gr.Radio( | |
choices=["outer", "inner"], | |
value="outer", | |
label="Join Type", | |
info="Outer: keep all columns, Inner: only common columns" | |
) | |
concat_btn = gr.Button("π Concatenate Files", variant="primary") | |
with gr.Column(): | |
concat_status = gr.Textbox(label="Status", lines=5, interactive=False) | |
def concat_handler(files, cols, jtype): | |
if not files: | |
return None, "β Please upload files first", "" | |
result_df, status = concatenate_csvs(files, cols, jtype) | |
info_text = get_data_info(result_df) if result_df is not None else "" | |
return result_df, status, info_text | |
concat_btn.click( | |
concat_handler, | |
inputs=[multi_files, selected_columns, join_type], | |
outputs=[current_data, concat_status, data_info_display] | |
) | |
# ===== VALUE COUNTS TAB ===== | |
with gr.TabItem("π Value Analysis"): | |
gr.HTML("<h3>Analyze value frequencies and distributions</h3>") | |
with gr.Row(): | |
with gr.Column(): | |
analysis_col = gr.Dropdown(label="Column to Analyze", choices=[], interactive=True) | |
top_n = gr.Slider(minimum=5, maximum=100, value=20, step=5, label="Top N Values") | |
normalize_counts = gr.Checkbox(label="Show Percentages", value=False) | |
analyze_btn = gr.Button("π Analyze Values", variant="primary") | |
with gr.Column(): | |
analysis_status = gr.Textbox(label="Status", lines=3, interactive=False) | |
analysis_results = gr.DataFrame(label="Value Counts", height=400) | |
# Update analysis column choices | |
current_data.change( | |
lambda df: gr.Dropdown(choices=list(df.columns) if df is not None else []), | |
inputs=[current_data], | |
outputs=[analysis_col] | |
) | |
def analysis_handler(df, col, n, norm): | |
if df is None: | |
return None, "β No data loaded" | |
return get_value_counts(df, col, n, norm) | |
analyze_btn.click( | |
analysis_handler, | |
inputs=[current_data, analysis_col, top_n, normalize_counts], | |
outputs=[analysis_results, analysis_status] | |
) | |
# ===== DATA CLEANING TAB ===== | |
with gr.TabItem("π§Ή Data Cleaning"): | |
gr.HTML("<h3>Clean and preprocess your data</h3>") | |
with gr.Tabs(): | |
# Missing Values | |
with gr.TabItem("Missing Values"): | |
with gr.Row(): | |
with gr.Column(): | |
missing_col = gr.Dropdown(label="Column", choices=["ALL"], value="ALL", interactive=True) | |
missing_method = gr.Radio( | |
choices=["drop_rows", "fill_value", "fill_mean", "fill_median", "fill_mode", "forward_fill", "backward_fill"], | |
value="drop_rows", | |
label="Method" | |
) | |
fill_value_input = gr.Textbox(label="Fill Value", placeholder="For fill_value method") | |
missing_btn = gr.Button("π§Ή Handle Missing Values", variant="primary") | |
with gr.Column(): | |
missing_status = gr.Textbox(label="Status", lines=4, interactive=False) | |
# Duplicates | |
with gr.TabItem("Duplicates"): | |
with gr.Row(): | |
with gr.Column(): | |
duplicate_cols = gr.Textbox( | |
label="Columns to Check", | |
placeholder="column1, column2 (empty = all columns)" | |
) | |
keep_method = gr.Radio( | |
choices=["first", "last", "false"], | |
value="first", | |
label="Keep Method" | |
) | |
duplicate_btn = gr.Button("ποΈ Remove Duplicates", variant="primary") | |
with gr.Column(): | |
duplicate_status = gr.Textbox(label="Status", lines=4, interactive=False) | |
# Data Filtering | |
with gr.TabItem("Filtering"): | |
with gr.Row(): | |
with gr.Column(): | |
filter_col = gr.Dropdown(label="Column", choices=[], interactive=True) | |
filter_condition = gr.Dropdown( | |
choices=["equals", "not_equals", "contains", "not_contains", "starts_with", "ends_with", | |
"greater_than", "less_than", "is_null", "is_not_null"], | |
value="equals", | |
label="Condition" | |
) | |
filter_value = gr.Textbox(label="Value") | |
filter_btn = gr.Button("π Filter Data", variant="primary") | |
with gr.Column(): | |
filter_status = gr.Textbox(label="Status", lines=4, interactive=False) | |
# Update dropdown choices | |
current_data.change( | |
lambda df: ( | |
gr.Dropdown(choices=["ALL"] + list(df.columns) if df is not None else ["ALL"]), | |
gr.Dropdown(choices=list(df.columns) if df is not None else []) | |
), | |
inputs=[current_data], | |
outputs=[missing_col, filter_col] | |
) | |
# Event handlers | |
missing_btn.click( | |
lambda df, col, method, val: handle_missing_values(df, col, method, val)[1] if df is not None else "β No data", | |
inputs=[current_data, missing_col, missing_method, fill_value_input], | |
outputs=[missing_status] | |
).then( | |
lambda: processor.current_df, | |
outputs=[current_data] | |
).then( | |
lambda df: get_data_info(df), | |
inputs=[current_data], | |
outputs=[data_info_display] | |
) | |
duplicate_btn.click( | |
lambda df, cols, keep: detect_and_remove_duplicates(df, cols, keep)[1] if df is not None else "β No data", | |
inputs=[current_data, duplicate_cols, keep_method], | |
outputs=[duplicate_status] | |
).then( | |
lambda: processor.current_df, | |
outputs=[current_data] | |
).then( | |
lambda df: get_data_info(df), | |
inputs=[current_data], | |
outputs=[data_info_display] | |
) | |
filter_btn.click( | |
lambda df, col, cond, val: filter_data(df, col, cond, val)[1] if df is not None else "β No data", | |
inputs=[current_data, filter_col, filter_condition, filter_value], | |
outputs=[filter_status] | |
).then( | |
lambda: processor.current_df, | |
outputs=[current_data] | |
).then( | |
lambda df: get_data_info(df), | |
inputs=[current_data], | |
outputs=[data_info_display] | |
) | |
# ===== COLUMN OPERATIONS TAB ===== | |
with gr.TabItem("βοΈ Column Operations"): | |
gr.HTML("<h3>Perform operations on columns</h3>") | |
with gr.Row(): | |
with gr.Column(): | |
op_type = gr.Dropdown( | |
choices=["add", "subtract", "multiply", "divide", "concatenate", | |
"extract_numbers", "upper", "lower", "title", "length"], | |
value="add", | |
label="Operation" | |
) | |
op_col1 = gr.Dropdown(label="Primary Column", choices=[], interactive=True) | |
op_col2 = gr.Dropdown(label="Second Column (optional)", choices=[], interactive=True) | |
op_constant = gr.Textbox(label="Constant Value (optional)") | |
op_new_name = gr.Textbox(label="New Column Name") | |
op_btn = gr.Button("βοΈ Execute Operation", variant="primary") | |
with gr.Column(): | |
op_status = gr.Textbox(label="Status", lines=5, interactive=False) | |
# Data type conversion | |
gr.HTML("<hr><h4>Data Type Conversion</h4>") | |
convert_col = gr.Dropdown(label="Column", choices=[], interactive=True) | |
convert_type = gr.Dropdown( | |
choices=["string", "integer", "float", "datetime", "boolean", "category"], | |
value="string", | |
label="Target Type" | |
) | |
convert_btn = gr.Button("π Convert Type", variant="secondary") | |
convert_status = gr.Textbox(label="Conversion Status", lines=2, interactive=False) | |
# Update column choices | |
current_data.change( | |
lambda df: ( | |
gr.Dropdown(choices=list(df.columns) if df is not None else []), | |
gr.Dropdown(choices=list(df.columns) if df is not None else []), | |
gr.Dropdown(choices=list(df.columns) if df is not None else []) | |
), | |
inputs=[current_data], | |
outputs=[op_col1, op_col2, convert_col] | |
) | |
# Event handlers | |
def operation_handler(df, op, col1, col2, const, new_name):
    """Apply the chosen column operation and rebuild the data summary.

    Returns a ``(dataframe, status message, info text)`` triple. With no
    data loaded, the dataframe slot is ``None`` and the info text is empty.
    """
    # Guard clause: nothing to operate on.
    if df is None:
        return None, "β No data loaded", ""

    # NOTE: the helper is called with (new_name, const), i.e. the reverse of
    # this handler's own (const, new_name) parameter order.
    updated, message = perform_column_operations(df, op, col1, col2, new_name, const)

    # Only summarise when the helper actually produced a frame.
    if updated is None:
        summary = ""
    else:
        summary = get_data_info(updated)
    return updated, message, summary
op_btn.click( | |
operation_handler, | |
inputs=[current_data, op_type, op_col1, op_col2, op_constant, op_new_name], | |
outputs=[current_data, op_status, data_info_display] | |
) | |
def convert_handler(df, col, target_type):
    """Convert one column to the requested type and rebuild the data summary.

    Returns a ``(dataframe, status message, info text)`` triple. With no
    data loaded, the dataframe slot is ``None`` and the info text is empty.
    """
    # Guard clause: nothing to convert.
    if df is None:
        return None, "β No data loaded", ""

    converted, message = convert_data_types(df, col, target_type)

    # An empty summary is reported when the conversion yields no frame.
    summary = "" if converted is None else get_data_info(converted)
    return converted, message, summary
convert_btn.click( | |
convert_handler, | |
inputs=[current_data, convert_col, convert_type], | |
outputs=[current_data, convert_status, data_info_display] | |
) | |
# ===== STATISTICS TAB ===== | |
with gr.TabItem("π Statistics & Analysis"): | |
gr.HTML("<h3>Statistical analysis and insights</h3>") | |
with gr.Row(): | |
with gr.Column(): | |
stats_btn = gr.Button("π Generate Statistical Summary", variant="primary") | |
corr_btn = gr.Button("π Create Correlation Matrix", variant="secondary") | |
# Distribution plots | |
gr.HTML("<hr><h4>Distribution Analysis</h4>") | |
dist_col = gr.Dropdown(label="Column", choices=[], interactive=True) | |
plot_type = gr.Radio(choices=["histogram", "density"], value="histogram", label="Plot Type") | |
dist_btn = gr.Button("π Create Distribution Plot", variant="secondary") | |
with gr.Column(): | |
stats_status = gr.Textbox(label="Status", lines=3, interactive=False) | |
plot_output = gr.Image(label="Visualization") | |
stats_results = gr.DataFrame(label="Statistical Summary", height=400) | |
# Update column choices | |
current_data.change( | |
lambda df: gr.Dropdown(choices=list(df.select_dtypes(include=[np.number]).columns) if df is not None else []), | |
inputs=[current_data], | |
outputs=[dist_col] | |
) | |
# Event handlers | |
stats_btn.click( | |
lambda df: generate_statistical_summary(df) if df is not None else (None, "β No data"), | |
inputs=[current_data], | |
outputs=[stats_results, stats_status] | |
) | |
corr_btn.click( | |
lambda df: create_correlation_matrix(df) if df is not None else (None, "β No data"), | |
inputs=[current_data], | |
outputs=[plot_output, stats_status] | |
) | |
dist_btn.click( | |
lambda df, col, ptype: create_distribution_plots(df, col, ptype) if df is not None else (None, "β No data"), | |
inputs=[current_data, dist_col, plot_type], | |
outputs=[plot_output, stats_status] | |
) | |
# ===== EXPORT TAB ===== | |
with gr.TabItem("πΎ Export & Download"): | |
gr.HTML("<h3>Export your processed data</h3>") | |
with gr.Row(): | |
with gr.Column(): | |
export_format = gr.Radio( | |
choices=["csv", "excel", "json"], | |
value="csv", | |
label="Export Format" | |
) | |
export_filename = gr.Textbox( | |
label="Filename (without extension)", | |
value="processed_data", | |
placeholder="Enter filename" | |
) | |
export_btn = gr.Button("πΎ Create Download File", variant="primary", size="lg") | |
with gr.Column(): | |
export_status = gr.Textbox(label="Status", lines=3, interactive=False) | |
download_file = gr.File(label="Download", visible=False) | |
# History and Undo/Redo | |
with gr.Row(): | |
with gr.Column(): | |
gr.HTML("<hr><h4>History & Undo Operations</h4>") | |
undo_btn = gr.Button("βΆ Undo Last Operation", variant="secondary") | |
reset_btn = gr.Button("π Reset to Original", variant="secondary") | |
with gr.Column(): | |
history_status = gr.Textbox(label="History Status", lines=3, interactive=False) | |
def export_handler(df, fmt, filename):
    """Serialise the current data and stage it on disk for download.

    Args:
        df: Data to export; ``None`` means nothing is loaded.
        fmt: Export format forwarded to ``create_download_file``. The value
            ``'excel'`` is written in binary mode, anything else as UTF-8 text.
        filename: Base filename (without extension) entered by the user.

    Returns:
        A ``(path, status message, gr.File)`` triple. When there is no data,
        or when any step of the export fails, the path is ``None`` and the
        returned ``gr.File`` is hidden.
    """
    if df is None:
        return None, "β No data to export", gr.File(visible=False)
    try:
        # Function-scope import: only this handler needs tempfile.
        import tempfile

        file_data, file_name = create_download_file(df, fmt, filename)

        # file_name is presumably derived from the user-editable filename
        # textbox, so drop any directory components before touching the
        # filesystem; this prevents writing outside the staging directory.
        safe_name = os.path.basename(str(file_name))
        if safe_name in ("", ".", ".."):
            safe_name = "export"

        # A fresh directory per export keeps concurrent sessions from
        # overwriting each other's files and keeps output out of the
        # application's working directory.
        out_path = os.path.join(tempfile.mkdtemp(prefix="csv_export_"), safe_name)

        binary = fmt == 'excel'
        with open(out_path, 'wb' if binary else 'w', encoding=None if binary else 'utf-8') as f:
            f.write(file_data)

        return out_path, f"β File created successfully: {safe_name}", gr.File(value=out_path, visible=True)
    except Exception as e:
        # UI boundary: surface the failure as a status message instead of raising.
        return None, f"β Export error: {str(e)}", gr.File(visible=False)
export_btn.click( | |
export_handler, | |
inputs=[current_data, export_format, export_filename], | |
outputs=[download_file, export_status, download_file] | |
) | |
def undo_handler():
    """Call ``processor.undo_operation()`` and rebuild the data summary.

    Returns a ``(dataframe, status message, info text)`` triple; the info
    text is empty when the processor hands back no dataframe.
    """
    restored, message = processor.undo_operation()
    if restored is None:
        summary = ""
    else:
        summary = get_data_info(restored)
    return restored, message, summary
def reset_handler():
    """Call ``processor.reset_to_original()`` and rebuild the data summary.

    Returns a ``(dataframe, status message, info text)`` triple; the info
    text is empty when the processor hands back no dataframe.
    """
    original, message = processor.reset_to_original()
    summary = "" if original is None else get_data_info(original)
    return original, message, summary
undo_btn.click( | |
undo_handler, | |
outputs=[current_data, history_status, data_info_display] | |
) | |
reset_btn.click( | |
reset_handler, | |
outputs=[current_data, history_status, data_info_display] | |
) | |
# Footer | |
gr.HTML(""" | |
<div style="text-align: center; padding: 20px; margin-top: 30px; border-top: 1px solid #ddd;"> | |
<p style="color: #666; font-size: 14px;"> | |
π <strong>Advanced CSV Manipulation Tool</strong> | | |
Commercial-ready data processing with enterprise features | | |
Built with Gradio & Python | |
</p> | |
</div> | |
""") | |
return demo | |
if __name__ == "__main__":
    # Server settings, gathered in one place so they are easy to review.
    launch_options = {
        "share": True,
        "inbrowser": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "max_file_size": "1gb",
    }

    # Build the interface, then start serving it.
    demo = create_interface()
    demo.launch(**launch_options)