limitedonly41's picture
Rename csv-manipulator.py to app.py
ad61191 verified
raw
history blame
51.8 kB
#!/usr/bin/env python3
"""
Advanced CSV Manipulation Tool with Gradio Interface
Commercial-ready application for powerful CSV data processing
Features:
- File upload with 1GB limit
- Data preview with selectable rows
- Value replacement based on conditions
- CSV concatenation with column selection
- Advanced statistical analysis and visualization
- Data validation and quality checks
- Export to CSV, Excel, JSON
- Batch operations and operation recipes
- Undo/Redo functionality
- Memory-efficient large file processing
"""
import gradio as gr
import pandas as pd
import numpy as np
import json
import io
import zipfile
from datetime import datetime, timedelta
import re
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import warnings
import os
from typing import Dict, List, Tuple, Optional, Any
import hashlib
import pickle
from pathlib import Path
# Silence every warning category for the whole process.
warnings.filterwarnings('ignore')
# Global plotting defaults applied to every matplotlib/seaborn figure created below.
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
class CSVProcessor:
    """Hold the loaded dataset, the working copy and a bounded edit history.

    Attributes:
        original_df: Copy of the data exactly as it was loaded (or None).
        current_df: Working copy that callers replace after each operation.
        history: List of snapshots appended by ``save_state``; each entry is
            a dict with ``operation``, ``timestamp`` and ``df`` keys.
        recipes: Initialised to an empty dict; not used by any method here.
        batch_files: Initialised to an empty list; not used by any method here.
    """

    def __init__(self):
        self.original_df = None
        self.current_df = None
        self.history = []
        self.recipes = {}
        self.batch_files = []

    def load_data(self, file, preview_rows=100, encoding='utf-8'):
        """Load a CSV/Excel/JSON/Parquet file and reset the processor state.

        Args:
            file: Either an object exposing a ``name`` attribute holding the
                path, or anything whose ``str()`` is the path.
            preview_rows: Number of leading rows to return as preview; a value
                of 0 or less returns the whole frame.
            encoding: First encoding tried for CSV files before the fallbacks.

        Returns:
            Always a 3-tuple ``(preview, message, info)``. On failure
            ``preview`` is None and ``info`` is an empty dict, so callers can
            unpack three values on every path.
        """
        try:
            if file is None:
                return None, "No file provided", {}
            file_path = file.name if hasattr(file, 'name') else str(file)
            file_extension = Path(file_path).suffix.lower()
            if file_extension == '.csv':
                # Try the requested encoding first, then common fallbacks.
                # The file is read in one go (no chunking).
                encodings = [encoding, 'utf-8', 'latin-1', 'cp1252']
                df = None
                for enc in encodings:
                    try:
                        df = pd.read_csv(file_path, encoding=enc, low_memory=False)
                        break
                    except UnicodeDecodeError:
                        continue
                if df is None:
                    return None, "Failed to decode file with supported encodings", {}
            elif file_extension in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            elif file_extension == '.json':
                df = pd.read_json(file_path)
            elif file_extension == '.parquet':
                df = pd.read_parquet(file_path)
            else:
                return None, f"Unsupported file format: {file_extension}", {}

            # A fresh load discards any previous history.
            self.original_df = df.copy()
            self.current_df = df.copy()
            self.history = []

            # Create preview (int() guards against float values from UI sliders).
            if preview_rows > 0:
                preview = df.head(int(preview_rows))
            else:
                preview = df

            # Memory and quality summary shown to the user.
            memory_mb = df.memory_usage(deep=True).sum() / 1024**2
            info = {
                'rows': len(df),
                'columns': len(df.columns),
                'memory_usage': f"{memory_mb:.2f} MB",
                'dtypes': dict(df.dtypes.astype(str)),
                'null_counts': dict(df.isnull().sum()),
                'duplicates': df.duplicated().sum()
            }
            success_msg = f"βœ… File loaded successfully!\n"
            success_msg += f"πŸ“Š {info['rows']:,} rows Γ— {info['columns']} columns\n"
            success_msg += f"πŸ’Ύ Memory usage: {info['memory_usage']}\n"
            success_msg += f"πŸ”„ Duplicates: {info['duplicates']:,}\n"
            success_msg += f"❌ Missing values: {sum(info['null_counts'].values()):,}"
            return preview, success_msg, info
        except Exception as e:
            return None, f"❌ Error loading file: {str(e)}", {}

    def save_state(self, operation_name: str):
        """Append a snapshot of ``current_df`` labelled with ``operation_name``.

        Once more than 50 snapshots exist, only the newest 25 are kept before
        the new one is appended, to bound memory use.
        """
        if len(self.history) > 50:  # Limit history to prevent memory issues
            self.history = self.history[-25:]  # Keep last 25 operations
        self.history.append({
            'operation': operation_name,
            'timestamp': datetime.now(),
            'df': self.current_df.copy() if self.current_df is not None else None
        })

    def undo_operation(self):
        """Revert the most recent operation.

        Returns:
            ``(current_df, message)``. With several snapshots, the newest is
            dropped and the previous one restored; the message names the
            operation that was actually undone. With one snapshot, the
            original data is restored. With none, nothing changes.
        """
        if len(self.history) > 1:
            undone = self.history.pop()
            self.current_df = self.history[-1]['df'].copy()
            return self.current_df, f"βœ… Undone: {undone['operation']}"
        elif len(self.history) == 1:
            self.current_df = self.original_df.copy()
            self.history = []
            return self.current_df, "βœ… Reset to original data"
        else:
            return self.current_df, "❌ No operations to undo"

    def reset_to_original(self):
        """Restore the originally loaded data and clear the history.

        Returns:
            ``(current_df, message)``, or ``(None, message)`` when no data
            has been loaded yet.
        """
        if self.original_df is not None:
            self.current_df = self.original_df.copy()
            self.history = []
            return self.current_df, "βœ… Reset to original data"
        return None, "❌ No original data available"
# Global processor instance.
# The manipulation functions and UI handlers below read and write this single
# module-level object directly (processor.current_df / processor.save_state).
processor = CSVProcessor()
def create_download_file(df: pd.DataFrame, format_type: str, filename: str = "processed_data"):
    """Serialise ``df`` in the requested format and build a timestamped name.

    Args:
        df: Data to export.
        format_type: One of ``"csv"``, ``"excel"`` or ``"json"``.
        filename: Base name; ``_YYYYmmdd_HHMMSS`` and the extension are appended.

    Returns:
        ``(data, file_name)`` where ``data`` is ``str`` for CSV/JSON and
        ``bytes`` for Excel. On any failure, including an unsupported
        ``format_type``, returns ``(None, error_message)`` so callers can
        always unpack two values.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename_with_timestamp = f"{filename}_{timestamp}"
    try:
        if format_type == "csv":
            csv_data = df.to_csv(index=False)
            return csv_data, f"{filename_with_timestamp}.csv"
        elif format_type == "excel":
            buffer = io.BytesIO()
            with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
                df.to_excel(writer, index=False, sheet_name='Data')
            buffer.seek(0)
            return buffer.getvalue(), f"{filename_with_timestamp}.xlsx"
        elif format_type == "json":
            json_data = df.to_json(orient='records', indent=2, date_format='iso')
            return json_data, f"{filename_with_timestamp}.json"
        # Previously fell through and returned a bare None here.
        return None, f"Unsupported export format: {format_type}"
    except Exception as e:
        return None, f"Error creating {format_type} file: {str(e)}"
def get_data_info(df: pd.DataFrame) -> str:
    """Summarise a frame as one ``label: value`` line per metric.

    Returns the fixed text ``"No data loaded"`` when ``df`` is None or empty;
    otherwise seven lines covering shape, memory, duplicates, missing values
    and the number of numeric, object and datetime columns.
    """
    if df is None or df.empty:
        return "No data loaded"

    row_count, col_count = df.shape[0], df.shape[1]
    memory_mb = df.memory_usage(deep=True).sum() / 1024**2
    duplicate_rows = df.duplicated().sum()
    missing_cells = df.isnull().sum().sum()
    numeric_count = len(df.select_dtypes(include=[np.number]).columns)
    text_count = len(df.select_dtypes(include=['object']).columns)
    date_count = len(df.select_dtypes(include=['datetime64']).columns)

    entries = [
        ('πŸ“Š Shape', f"{row_count:,} rows Γ— {col_count} columns"),
        ('πŸ’Ύ Memory', f"{memory_mb:.2f} MB"),
        ('πŸ”„ Duplicates', f"{duplicate_rows:,}"),
        ('❌ Missing Values', f"{missing_cells:,}"),
        ('πŸ“ˆ Numeric Columns', f"{numeric_count}"),
        ('πŸ“ Text Columns', f"{text_count}"),
        ('πŸ“… Date Columns', f"{date_count}"),
    ]
    lines = []
    for label, value in entries:
        lines.append(f"{label}: {value}")
    return "\n".join(lines)
def get_column_options(df: pd.DataFrame) -> List[str]:
    """Return the frame's column labels as a list (empty when ``df`` is None)."""
    if df is None:
        return []
    return [*df.columns]
# ===========================================
# CORE DATA MANIPULATION FUNCTIONS
# ===========================================
def rename_values_conditional(df: pd.DataFrame, target_col: str, condition_col: str,
                              condition_value: str, new_value: str, match_type: str = "exact") -> Tuple[pd.DataFrame, str]:
    """Set ``target_col`` to ``new_value`` on rows whose ``condition_col`` matches.

    Args:
        df: Input frame (left unmodified; a copy is changed).
        target_col: Column that receives ``new_value``.
        condition_col: Column tested against ``condition_value``.
        condition_value: Text to match.
        new_value: Replacement written into ``target_col``.
        match_type: ``"exact"``, ``"contains"``, ``"starts_with"``,
            ``"ends_with"`` (all literal text) or ``"regex"`` (pattern
            anchored at the start of the value, via ``str.match``).

    Returns:
        ``(result_df, message)``. On any validation failure or error the
        original ``df`` is returned together with an error message. On
        success the result is also stored on the module-level ``processor``
        and a history snapshot is saved.
    """
    try:
        if df is None or df.empty:
            return df, "❌ No data available"
        if target_col not in df.columns or condition_col not in df.columns:
            return df, "❌ One or more columns not found"
        df_result = df.copy()
        condition_series = df_result[condition_col]
        condition_text = condition_series.astype(str)
        if match_type == "exact":
            # The UI always supplies text, so also compare the textual form;
            # otherwise numeric columns could never match.
            mask = (condition_series == condition_value) | (
                condition_series.notna() & (condition_text == str(condition_value))
            )
        elif match_type == "contains":
            # Literal substring: a separate "regex" option exists for patterns.
            mask = condition_text.str.contains(condition_value, na=False, regex=False)
        elif match_type == "regex":
            mask = condition_text.str.match(condition_value, na=False)
        elif match_type == "starts_with":
            mask = condition_text.str.startswith(condition_value, na=False)
        elif match_type == "ends_with":
            mask = condition_text.str.endswith(condition_value, na=False)
        else:
            # Previously left `mask` unbound and surfaced a NameError message.
            return df, f"❌ Unknown match type: {match_type}"
        affected_rows = mask.sum()
        df_result.loc[mask, target_col] = new_value
        processor.current_df = df_result
        processor.save_state(f"Renamed values in '{target_col}' based on '{condition_col}'")
        return df_result, f"βœ… Updated {affected_rows:,} rows in column '{target_col}'"
    except Exception as e:
        return df, f"❌ Error: {str(e)}"
def concatenate_csvs(files: List, selected_columns: str, join_type: str = "outer") -> Tuple[pd.DataFrame, str]:
    """Stack several CSV/Excel files into one frame.

    Args:
        files: Items that either expose a ``name`` attribute with the path or
            are themselves path strings (same convention as
            ``CSVProcessor.load_data``).
        selected_columns: Comma-separated column names to keep; empty keeps
            all. Files containing none of the requested columns are skipped.
        join_type: ``"inner"`` keeps only columns common to all files; any
            other value keeps the union of columns.

    Returns:
        ``(result_df, message)``, or ``(None, error_message)`` when nothing
        could be combined. Each kept row gets a ``_source_file`` column with
        the file stem. On success the result is stored on the module-level
        ``processor`` and a history snapshot is saved.
    """
    try:
        if not files:
            return None, "❌ No files provided"
        dfs = []
        columns_to_use = [col.strip() for col in selected_columns.split(",") if col.strip()] if selected_columns else None
        for file in files:
            # Accept both file-like objects and plain path strings.
            file_path = file.name if hasattr(file, 'name') else str(file)
            suffix = Path(file_path).suffix.lower()  # case-insensitive extension check
            if suffix == '.csv':
                df = None
                for enc in ('utf-8', 'latin-1'):
                    try:
                        df = pd.read_csv(file_path, encoding=enc, low_memory=False)
                        break
                    except UnicodeDecodeError:
                        continue
                if df is None:
                    continue
            elif suffix in ('.xlsx', '.xls'):
                df = pd.read_excel(file_path)
            else:
                continue
            # Select specific columns if specified
            if columns_to_use:
                available_cols = [col for col in columns_to_use if col in df.columns]
                if not available_cols:
                    continue
                # Copy so the assignment below does not write into a slice.
                df = df[available_cols].copy()
            # Add source file identifier
            df['_source_file'] = Path(file_path).stem
            dfs.append(df)
        if not dfs:
            return None, "❌ No valid files found or columns don't exist"
        # Concatenate with specified join type
        join = 'inner' if join_type == "inner" else 'outer'
        result_df = pd.concat(dfs, ignore_index=True, join=join)
        processor.current_df = result_df
        processor.save_state(f"Concatenated {len(dfs)} files")
        return result_df, f"βœ… Successfully concatenated {len(dfs)} files with {len(result_df):,} total rows"
    except Exception as e:
        return None, f"❌ Error concatenating files: {str(e)}"
def get_value_counts(df: pd.DataFrame, column: str, top_n: int = 20, normalize: bool = False) -> Tuple[pd.DataFrame, str]:
    """Tabulate the most frequent values of ``column``.

    Missing values are counted as their own entry. The result has a
    ``Value`` column plus either ``Count`` or, when ``normalize`` is true,
    ``Percentage`` formatted as text with two decimals.

    Returns:
        ``(table, message)``, or ``(None, error_message)`` when the frame is
        missing/empty, the column is unknown, or an exception occurs.
    """
    try:
        if df is None or df.empty:
            return None, "❌ No data available"
        if column not in df.columns:
            return None, f"❌ Column '{column}' not found"

        frequencies = df[column].value_counts(normalize=normalize, dropna=False)
        leading = frequencies.head(top_n)

        # Two-column table for display; the metric column name depends on mode.
        metric_label = 'Percentage' if normalize else 'Count'
        result_df = pd.DataFrame({'Value': leading.index, metric_label: leading.values})
        if normalize:
            result_df['Percentage'] = result_df['Percentage'].map("{:.2%}".format)

        shown = min(top_n, len(result_df))
        return result_df, f"βœ… Value counts for '{column}' (Top {shown})"
    except Exception as e:
        return None, f"❌ Error: {str(e)}"
def filter_data(df: pd.DataFrame, column: str, condition: str, value: str) -> Tuple[pd.DataFrame, str]:
    """Keep only the rows of ``df`` where ``column`` satisfies ``condition``.

    Args:
        df: Input frame.
        column: Column to test.
        condition: One of ``equals``, ``not_equals``, ``contains``,
            ``not_contains``, ``starts_with``, ``ends_with`` (literal text
            comparisons), ``greater_than``, ``less_than`` (numeric; ``value``
            must parse as a float), ``is_null`` or ``is_not_null``.
        value: Comparison text (ignored for the null checks).

    Returns:
        ``(filtered_df, message)``. On validation failure or error the
        original ``df`` is returned with an error message. On success the
        result is stored on the module-level ``processor`` and a history
        snapshot is saved.
    """
    try:
        if df is None or df.empty:
            return df, "❌ No data available"
        if column not in df.columns:
            return df, f"❌ Column '{column}' not found"
        df_result = df.copy()
        series = df_result[column]
        text = series.astype(str)
        if condition in ("equals", "not_equals"):
            # The UI supplies text, so also compare the textual form;
            # otherwise numeric columns could never be matched.
            is_equal = (series == value) | (series.notna() & (text == str(value)))
            mask = is_equal if condition == "equals" else ~is_equal
        elif condition == "contains":
            # regex=False: treat the value as literal text, not a pattern.
            mask = text.str.contains(value, na=False, regex=False)
        elif condition == "not_contains":
            mask = ~text.str.contains(value, na=False, regex=False)
        elif condition == "starts_with":
            mask = text.str.startswith(value, na=False)
        elif condition == "ends_with":
            mask = text.str.endswith(value, na=False)
        elif condition == "greater_than":
            mask = pd.to_numeric(series, errors='coerce') > float(value)
        elif condition == "less_than":
            mask = pd.to_numeric(series, errors='coerce') < float(value)
        elif condition == "is_null":
            mask = series.isnull()
        elif condition == "is_not_null":
            mask = series.notnull()
        else:
            return df, f"❌ Unknown condition: {condition}"
        filtered_df = df_result[mask]
        processor.current_df = filtered_df
        processor.save_state(f"Filtered data: {column} {condition} {value}")
        return filtered_df, f"βœ… Filtered to {len(filtered_df):,} rows (removed {len(df) - len(filtered_df):,} rows)"
    except Exception as e:
        return df, f"❌ Error: {str(e)}"
def handle_missing_values(df: pd.DataFrame, column: str, method: str, fill_value: str = "") -> Tuple[pd.DataFrame, str]:
    """Drop or fill missing values in one column, or in every column.

    Args:
        df: Input frame.
        column: Column name, or ``"ALL"`` to process every column in turn.
        method: ``drop_rows``, ``fill_value``, ``fill_mean``, ``fill_median``,
            ``fill_mode``, ``forward_fill`` or ``backward_fill``. Mean and
            median are only applied to numeric (non-boolean) columns.
        fill_value: Replacement used by the ``fill_value`` method.

    Returns:
        ``(result_df, message)`` with before/after missing-value totals. On
        validation failure or error the original ``df`` is returned with an
        error message. On success the result is stored on the module-level
        ``processor`` and a history snapshot is saved.
    """
    known_methods = ("drop_rows", "fill_value", "fill_mean", "fill_median",
                     "fill_mode", "forward_fill", "backward_fill")
    try:
        if df is None or df.empty:
            return df, "❌ No data available"
        if column != "ALL" and column not in df.columns:
            return df, f"❌ Column '{column}' not found"
        if method not in known_methods:
            # Previously an unknown method was a silent no-op reported as success.
            return df, f"❌ Unknown method: {method}"
        df_result = df.copy()
        columns_to_process = [column] if column != "ALL" else df_result.columns.tolist()
        total_missing_before = df_result.isnull().sum().sum()
        for col in columns_to_process:
            if method == "drop_rows":
                df_result = df_result.dropna(subset=[col])
            elif method == "fill_value":
                df_result[col] = df_result[col].fillna(fill_value)
            elif method in ("fill_mean", "fill_median"):
                # Dtype-agnostic numeric check (covers float32, Int64, ...).
                is_numeric = (pd.api.types.is_numeric_dtype(df_result[col])
                              and not pd.api.types.is_bool_dtype(df_result[col]))
                if is_numeric:
                    if method == "fill_mean":
                        replacement = df_result[col].mean()
                    else:
                        replacement = df_result[col].median()
                    df_result[col] = df_result[col].fillna(replacement)
            elif method == "fill_mode":
                mode_val = df_result[col].mode()
                if len(mode_val) > 0:
                    df_result[col] = df_result[col].fillna(mode_val[0])
            elif method == "forward_fill":
                # fillna(method=...) is deprecated in recent pandas.
                df_result[col] = df_result[col].ffill()
            elif method == "backward_fill":
                df_result[col] = df_result[col].bfill()
        total_missing_after = df_result.isnull().sum().sum()
        processor.current_df = df_result
        processor.save_state(f"Handle missing values: {method}")
        return df_result, f"βœ… Processed missing values. Before: {total_missing_before:,}, After: {total_missing_after:,}"
    except Exception as e:
        return df, f"❌ Error: {str(e)}"
def detect_and_remove_duplicates(df: pd.DataFrame, columns: str = "", keep: str = "first") -> Tuple[pd.DataFrame, str]:
    """Remove duplicate rows, optionally judged on a subset of columns.

    Args:
        df: Input frame.
        columns: Comma-separated column names to compare; names not present
            in ``df`` are ignored, and an empty result means all columns.
        keep: ``"first"`` or ``"last"`` to keep one row per duplicate group,
            or ``"false"``/``False`` to drop every member of a group.

    Returns:
        ``(result_df, message)``. When no duplicates exist a copy of ``df``
        is returned and nothing is recorded. On error the original ``df`` is
        returned with an error message. Otherwise the result is stored on the
        module-level ``processor`` and a history snapshot is saved.
    """
    try:
        if df is None or df.empty:
            return df, "❌ No data available"
        df_result = df.copy()
        # Parse columns
        cols_list = [col.strip() for col in (columns or "").split(",") if col.strip() in df.columns]
        subset = cols_list if cols_list else None
        # The UI passes the text "false"; pandas requires the boolean False.
        if keep is False or str(keep).strip().lower() in ("false", "none"):
            keep_option = False
        else:
            keep_option = keep
        # Counting with the same `keep` makes the reported number equal the
        # number of rows actually removed.
        duplicates_before = int(df_result.duplicated(subset=subset, keep=keep_option).sum())
        if duplicates_before == 0:
            return df_result, "βœ… No duplicate rows found"
        df_result = df_result.drop_duplicates(subset=subset, keep=keep_option)
        processor.current_df = df_result
        processor.save_state(f"Removed {duplicates_before:,} duplicate rows")
        return df_result, f"βœ… Removed {duplicates_before:,} duplicate rows. Remaining: {len(df_result):,} rows"
    except Exception as e:
        return df, f"❌ Error: {str(e)}"
def perform_column_operations(df: pd.DataFrame, operation: str, col1: str, col2: str = "",
                              new_col_name: str = "", constant: str = "") -> Tuple[pd.DataFrame, str]:
    """Create a new column from an arithmetic or text operation on ``col1``.

    Args:
        df: Input frame.
        operation: ``add``, ``subtract``, ``multiply``, ``divide`` (numeric,
            non-numeric cells become NaN), ``concatenate``,
            ``extract_numbers``, ``upper``, ``lower``, ``title`` or ``length``.
        col1: Primary column.
        col2: Optional second operand column; takes precedence over
            ``constant`` when it exists in ``df``.
        new_col_name: Name of the created column; defaults to
            ``"<col1>_<operation>"``.
        constant: Optional constant operand (parsed as float for arithmetic,
            appended as-is for ``concatenate``).

    Returns:
        ``(result_df, message)``. On validation failure or error the original
        ``df`` is returned with an error message. On success the result is
        stored on the module-level ``processor`` and a history snapshot is
        saved.
    """
    arithmetic = {
        "add": lambda left, right: left + right,
        "subtract": lambda left, right: left - right,
        "multiply": lambda left, right: left * right,
        "divide": lambda left, right: left / right,
    }
    text_transforms = {
        "extract_numbers": lambda text: text.str.extract(r'(\d+)')[0],
        "upper": lambda text: text.str.upper(),
        "lower": lambda text: text.str.lower(),
        "title": lambda text: text.str.title(),
        "length": lambda text: text.str.len(),
    }
    try:
        if df is None or df.empty:
            return df, "❌ No data available"
        if col1 not in df.columns:
            return df, f"❌ Column '{col1}' not found"
        df_result = df.copy()
        if not new_col_name:
            new_col_name = f"{col1}_{operation}"
        has_second_column = bool(col2) and col2 in df.columns
        # Previously a binary operation with no operand created nothing but
        # still reported success.
        missing_operand = f"❌ Operation '{operation}' needs a second column or a constant value"
        if operation in arithmetic:
            left = pd.to_numeric(df_result[col1], errors='coerce')
            if has_second_column:
                right = pd.to_numeric(df_result[col2], errors='coerce')
            elif constant:
                right = float(constant)
            else:
                return df, missing_operand
            df_result[new_col_name] = arithmetic[operation](left, right)
        elif operation == "concatenate":
            if has_second_column:
                df_result[new_col_name] = df_result[col1].astype(str) + " " + df_result[col2].astype(str)
            elif constant:
                df_result[new_col_name] = df_result[col1].astype(str) + constant
            else:
                return df, missing_operand
        elif operation in text_transforms:
            df_result[new_col_name] = text_transforms[operation](df_result[col1].astype(str))
        else:
            return df, f"❌ Unknown operation: {operation}"
        processor.current_df = df_result
        processor.save_state(f"Column operation: {operation} on {col1}")
        return df_result, f"βœ… Created new column '{new_col_name}' using {operation} operation"
    except Exception as e:
        return df, f"❌ Error: {str(e)}"
def _coerce_boolean(series: pd.Series) -> pd.Series:
    """Convert a column to pandas' nullable boolean dtype by meaning, not truthiness."""
    true_tokens = {"true", "t", "yes", "y", "1"}
    false_tokens = {"false", "f", "no", "n", "0"}

    def parse(value):
        if pd.isna(value):
            return None
        if isinstance(value, (bool, np.bool_, int, float, np.number)):
            return bool(value)
        token = str(value).strip().lower()
        if token in true_tokens:
            return True
        if token in false_tokens:
            return False
        return None  # unrecognised text becomes missing

    parsed = [parse(value) for value in series]
    return pd.Series(pd.array(parsed, dtype="boolean"), index=series.index, name=series.name)


def convert_data_types(df: pd.DataFrame, column: str, target_type: str) -> Tuple[pd.DataFrame, str]:
    """Convert ``column`` to ``target_type``.

    Args:
        df: Input frame.
        column: Column to convert.
        target_type: ``string``, ``integer`` (nullable ``Int64``), ``float``,
            ``datetime``, ``boolean`` or ``category``. For integer, float and
            datetime, unparseable cells become missing. For boolean, common
            true/false spellings are recognised, numbers use their
            truthiness, and missing or unrecognised cells become ``<NA>``
            (plain ``astype(bool)`` would turn "False" and NaN into True).

    Returns:
        ``(result_df, message)``. On validation failure or error the original
        ``df`` is returned with an error message. On success the result is
        stored on the module-level ``processor`` and a history snapshot is
        saved.
    """
    try:
        if df is None or df.empty:
            return df, "❌ No data available"
        if column not in df.columns:
            return df, f"❌ Column '{column}' not found"
        df_result = df.copy()
        if target_type == "string":
            df_result[column] = df_result[column].astype(str)
        elif target_type == "integer":
            df_result[column] = pd.to_numeric(df_result[column], errors='coerce').astype('Int64')
        elif target_type == "float":
            df_result[column] = pd.to_numeric(df_result[column], errors='coerce')
        elif target_type == "datetime":
            df_result[column] = pd.to_datetime(df_result[column], errors='coerce')
        elif target_type == "boolean":
            df_result[column] = _coerce_boolean(df_result[column])
        elif target_type == "category":
            df_result[column] = df_result[column].astype('category')
        else:
            return df, f"❌ Unknown data type: {target_type}"
        processor.current_df = df_result
        processor.save_state(f"Converted '{column}' to {target_type}")
        return df_result, f"βœ… Converted column '{column}' to {target_type}"
    except Exception as e:
        return df, f"❌ Error: {str(e)}"
# ===========================================
# ANALYSIS AND VISUALIZATION FUNCTIONS
# ===========================================
def generate_statistical_summary(df: pd.DataFrame) -> Tuple[pd.DataFrame, str]:
    """Build a descriptive-statistics table for the numeric columns.

    The table is ``describe()`` extended with ``variance``, ``skewness``,
    ``kurtosis`` and ``missing`` rows, rounded to four decimals.

    Returns:
        ``(stats_table, message)``, or ``(None, error_message)`` when there
        is no data, no numeric column, or an exception occurs.
    """
    try:
        if df is None or df.empty:
            return None, "❌ No data available"

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if not len(numeric_cols):
            return None, "❌ No numeric columns found"

        numeric_part = df[numeric_cols]
        stats_df = numeric_part.describe()

        # Extra rows, appended in this fixed order below the describe() output.
        extra_rows = (
            ('variance', numeric_part.var),
            ('skewness', numeric_part.skew),
            ('kurtosis', numeric_part.kurtosis),
            ('missing', lambda: numeric_part.isnull().sum()),
        )
        for row_label, compute in extra_rows:
            stats_df.loc[row_label] = compute()

        return stats_df.round(4), "βœ… Statistical summary generated"
    except Exception as e:
        return None, f"❌ Error: {str(e)}"
def create_correlation_matrix(df: pd.DataFrame) -> Tuple[Optional[str], str]:
    """Render a lower-triangle correlation heatmap of the numeric columns.

    Returns:
        ``(image_path, message)`` where ``image_path`` points at a PNG in the
        system temp directory, or ``(None, error_message)`` when there is no
        data, fewer than two numeric columns, or an exception occurs.
    """
    import tempfile

    try:
        if df is None or df.empty:
            return None, "❌ No data available"
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) < 2:
            return None, "❌ Need at least 2 numeric columns for correlation"
        # Calculate correlation matrix
        corr_matrix = df[numeric_cols].corr()
        # Create heatmap
        fig = plt.figure(figsize=(12, 8))
        try:
            # Hide the upper triangle (and diagonal): it mirrors the lower one.
            mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
            sns.heatmap(corr_matrix, mask=mask, annot=True, cmap='coolwarm', center=0,
                        square=True, linewidths=0.5, cbar_kws={"shrink": 0.8})
            plt.title('Correlation Matrix Heatmap', fontsize=16, fontweight='bold')
            plt.tight_layout()
            # A unique temp file avoids concurrent requests overwriting one
            # shared 'correlation_matrix.png' in the working directory.
            with tempfile.NamedTemporaryFile(prefix='correlation_matrix_', suffix='.png', delete=False) as tmp:
                output_path = tmp.name
            fig.savefig(output_path, dpi=300, bbox_inches='tight')
        finally:
            # Always release the figure, even if plotting or saving fails.
            plt.close(fig)
        return output_path, "βœ… Correlation matrix created"
    except Exception as e:
        return None, f"❌ Error: {str(e)}"
def create_distribution_plots(df: pd.DataFrame, column: str, plot_type: str = "histogram") -> Tuple[Optional[str], str]:
    """Plot the distribution of ``column`` next to a box plot and save it as PNG.

    Args:
        df: Input frame.
        column: Column to plot.
        plot_type: ``"histogram"`` or ``"density"``.

    Returns:
        ``(image_path, message)`` where ``image_path`` points at a PNG in the
        system temp directory, or ``(None, error_message)`` when there is no
        data, the column or plot type is unknown, or an exception occurs.
    """
    import tempfile

    try:
        if df is None or df.empty:
            return None, "❌ No data available"
        if column not in df.columns:
            return None, f"❌ Column '{column}' not found"
        if plot_type not in ("histogram", "density"):
            # Previously an unknown type saved an empty figure and reported success.
            return None, f"❌ Unknown plot type: {plot_type}"
        fig = plt.figure(figsize=(12, 6))
        try:
            if plot_type == "histogram":
                plt.subplot(1, 2, 1)
                df[column].hist(bins=30, edgecolor='black', alpha=0.7)
                plt.title(f'Histogram of {column}')
                plt.xlabel(column)
                plt.ylabel('Frequency')
                plt.subplot(1, 2, 2)
                df.boxplot(column=column)
                plt.title(f'Box Plot of {column}')
            else:
                plt.subplot(1, 2, 1)
                df[column].plot(kind='density')
                plt.title(f'Density Plot of {column}')
                plt.xlabel(column)
                plt.subplot(1, 2, 2)
                df[column].plot(kind='box')
                plt.title(f'Box Plot of {column}')
            plt.tight_layout()
            # The column name is not used in the path: it may contain
            # separators or other characters that are invalid in file names.
            with tempfile.NamedTemporaryFile(prefix=f'distribution_{plot_type}_', suffix='.png', delete=False) as tmp:
                output_path = tmp.name
            fig.savefig(output_path, dpi=300, bbox_inches='tight')
        finally:
            # Always release the figure, even if plotting or saving fails.
            plt.close(fig)
        return output_path, f"βœ… Distribution plot created for {column}"
    except Exception as e:
        return None, f"❌ Error: {str(e)}"
# ===========================================
# GRADIO INTERFACE SETUP
# ===========================================
def create_interface():
    """Build and return the Gradio Blocks application.

    The UI is organised in tabs (upload, value replacement, concatenation,
    value analysis, data cleaning, column operations, statistics, export).
    The working DataFrame is passed between handlers through the
    ``current_data`` state; the manipulation functions additionally record
    their result on the module-level ``processor``.

    NOTE(review): ``processor`` is a single module-level object, so it looks
    like undo/reset and the ``processor.current_df`` read-backs are shared by
    every connected session — confirm whether per-session state is needed.
    NOTE(review): the nesting of rows/columns below was reconstructed from a
    source whose indentation was lost — confirm the layout.

    Returns:
        The ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(title="Advanced CSV Manipulation Tool", theme=gr.themes.Soft()) as demo:
        gr.HTML("""
        <div style="text-align: center; padding: 20px;">
        <h1 style="color: #2e7d32; margin-bottom: 10px;">πŸ”₯ Advanced CSV Manipulation Tool</h1>
        <p style="font-size: 18px; color: #666;">Commercial-ready data processing with advanced analytics</p>
        <hr style="margin: 20px 0;">
        </div>
        """)
        # Global state variables
        current_data = gr.State(None)
        data_info = gr.State({})
        with gr.Tabs():
            # ===== FILE UPLOAD TAB =====
            with gr.TabItem("πŸ“ File Upload & Preview"):
                with gr.Row():
                    with gr.Column(scale=1):
                        file_upload = gr.File(
                            label="Upload CSV/Excel/JSON file (Max 1GB)",
                            file_types=[".csv", ".xlsx", ".xls", ".json"],
                            file_count="single"
                        )
                        preview_rows = gr.Slider(
                            minimum=0,
                            maximum=1000,
                            value=100,
                            step=50,
                            label="Preview Rows (0 = All)",
                            info="Number of rows to display in preview"
                        )
                        upload_btn = gr.Button("πŸ“Š Load & Analyze Data", variant="primary", size="lg")
                    with gr.Column(scale=2):
                        upload_status = gr.Textbox(label="Status", lines=5, interactive=False)
                        data_info_display = gr.Textbox(label="Data Information", lines=8, interactive=False)
                data_preview = gr.DataFrame(label="Data Preview", interactive=False, height=400)

                def load_file_handler(file, rows):
                    if file is None:
                        return None, "Please upload a file first", "", None, {}
                    # Tolerate both 2- and 3-element results from load_data so
                    # a failed load shows its message instead of raising.
                    result = processor.load_data(file, rows)
                    preview, status = result[0], result[1]
                    info = result[2] if len(result) > 2 else {}
                    if preview is None:
                        return None, status, "", None, info
                    info_text = get_data_info(processor.current_df) if processor.current_df is not None else ""
                    return preview, status, info_text, processor.current_df, info

                upload_btn.click(
                    load_file_handler,
                    inputs=[file_upload, preview_rows],
                    outputs=[data_preview, upload_status, data_info_display, current_data, data_info]
                )

            # ===== VALUE REPLACEMENT TAB =====
            with gr.TabItem("πŸ”„ Value Replacement"):
                gr.HTML("<h3>Replace values in one column based on conditions in another column</h3>")
                with gr.Row():
                    with gr.Column():
                        target_col = gr.Dropdown(label="Target Column (to modify)", choices=[], interactive=True)
                        condition_col = gr.Dropdown(label="Condition Column (to check)", choices=[], interactive=True)
                        condition_value = gr.Textbox(label="Condition Value", placeholder="Value to match in condition column")
                        new_value = gr.Textbox(label="New Value", placeholder="Replacement value for target column")
                        match_type = gr.Radio(
                            choices=["exact", "contains", "starts_with", "ends_with", "regex"],
                            value="exact",
                            label="Match Type"
                        )
                        replace_btn = gr.Button("πŸ”„ Replace Values", variant="primary")
                    with gr.Column():
                        replace_status = gr.Textbox(label="Status", lines=3, interactive=False)

                # Update column choices when data changes
                def update_columns(df):
                    if df is not None:
                        cols = list(df.columns)
                        return gr.Dropdown(choices=cols), gr.Dropdown(choices=cols)
                    return gr.Dropdown(choices=[]), gr.Dropdown(choices=[])

                current_data.change(
                    update_columns,
                    inputs=[current_data],
                    outputs=[target_col, condition_col]
                )

                def replace_values_handler(df, tcol, ccol, cval, nval, mtype):
                    if df is None:
                        return None, "❌ No data loaded", ""
                    result_df, status = rename_values_conditional(df, tcol, ccol, cval, nval, mtype)
                    info_text = get_data_info(result_df) if result_df is not None else ""
                    return result_df, status, info_text

                replace_btn.click(
                    replace_values_handler,
                    inputs=[current_data, target_col, condition_col, condition_value, new_value, match_type],
                    outputs=[current_data, replace_status, data_info_display]
                )

            # ===== CSV CONCATENATION TAB =====
            with gr.TabItem("πŸ“‹ CSV Concatenation"):
                gr.HTML("<h3>Combine multiple CSV files with column selection</h3>")
                with gr.Row():
                    with gr.Column():
                        multi_files = gr.File(
                            label="Upload Multiple Files",
                            file_types=[".csv", ".xlsx", ".xls"],
                            file_count="multiple"
                        )
                        selected_columns = gr.Textbox(
                            label="Columns to Include",
                            placeholder="column1, column2, column3 (leave empty for all)",
                            info="Comma-separated list of column names"
                        )
                        join_type = gr.Radio(
                            choices=["outer", "inner"],
                            value="outer",
                            label="Join Type",
                            info="Outer: keep all columns, Inner: only common columns"
                        )
                        concat_btn = gr.Button("πŸ“‹ Concatenate Files", variant="primary")
                    with gr.Column():
                        concat_status = gr.Textbox(label="Status", lines=5, interactive=False)

                def concat_handler(files, cols, jtype):
                    if not files:
                        return None, "❌ Please upload files first", ""
                    result_df, status = concatenate_csvs(files, cols, jtype)
                    info_text = get_data_info(result_df) if result_df is not None else ""
                    return result_df, status, info_text

                concat_btn.click(
                    concat_handler,
                    inputs=[multi_files, selected_columns, join_type],
                    outputs=[current_data, concat_status, data_info_display]
                )

            # ===== VALUE COUNTS TAB =====
            with gr.TabItem("πŸ“Š Value Analysis"):
                gr.HTML("<h3>Analyze value frequencies and distributions</h3>")
                with gr.Row():
                    with gr.Column():
                        analysis_col = gr.Dropdown(label="Column to Analyze", choices=[], interactive=True)
                        top_n = gr.Slider(minimum=5, maximum=100, value=20, step=5, label="Top N Values")
                        normalize_counts = gr.Checkbox(label="Show Percentages", value=False)
                        analyze_btn = gr.Button("πŸ“Š Analyze Values", variant="primary")
                    with gr.Column():
                        analysis_status = gr.Textbox(label="Status", lines=3, interactive=False)
                        analysis_results = gr.DataFrame(label="Value Counts", height=400)

                # Update analysis column choices
                current_data.change(
                    lambda df: gr.Dropdown(choices=list(df.columns) if df is not None else []),
                    inputs=[current_data],
                    outputs=[analysis_col]
                )

                def analysis_handler(df, col, n, norm):
                    if df is None:
                        return None, "❌ No data loaded"
                    return get_value_counts(df, col, n, norm)

                analyze_btn.click(
                    analysis_handler,
                    inputs=[current_data, analysis_col, top_n, normalize_counts],
                    outputs=[analysis_results, analysis_status]
                )

            # ===== DATA CLEANING TAB =====
            with gr.TabItem("🧹 Data Cleaning"):
                gr.HTML("<h3>Clean and preprocess your data</h3>")
                with gr.Tabs():
                    # Missing Values
                    with gr.TabItem("Missing Values"):
                        with gr.Row():
                            with gr.Column():
                                missing_col = gr.Dropdown(label="Column", choices=["ALL"], value="ALL", interactive=True)
                                missing_method = gr.Radio(
                                    choices=["drop_rows", "fill_value", "fill_mean", "fill_median", "fill_mode", "forward_fill", "backward_fill"],
                                    value="drop_rows",
                                    label="Method"
                                )
                                fill_value_input = gr.Textbox(label="Fill Value", placeholder="For fill_value method")
                                missing_btn = gr.Button("🧹 Handle Missing Values", variant="primary")
                            with gr.Column():
                                missing_status = gr.Textbox(label="Status", lines=4, interactive=False)
                    # Duplicates
                    with gr.TabItem("Duplicates"):
                        with gr.Row():
                            with gr.Column():
                                duplicate_cols = gr.Textbox(
                                    label="Columns to Check",
                                    placeholder="column1, column2 (empty = all columns)"
                                )
                                keep_method = gr.Radio(
                                    choices=["first", "last", "false"],
                                    value="first",
                                    label="Keep Method"
                                )
                                duplicate_btn = gr.Button("πŸ—‘οΈ Remove Duplicates", variant="primary")
                            with gr.Column():
                                duplicate_status = gr.Textbox(label="Status", lines=4, interactive=False)
                    # Data Filtering
                    with gr.TabItem("Filtering"):
                        with gr.Row():
                            with gr.Column():
                                filter_col = gr.Dropdown(label="Column", choices=[], interactive=True)
                                filter_condition = gr.Dropdown(
                                    choices=["equals", "not_equals", "contains", "not_contains", "starts_with", "ends_with",
                                             "greater_than", "less_than", "is_null", "is_not_null"],
                                    value="equals",
                                    label="Condition"
                                )
                                filter_value = gr.Textbox(label="Value")
                                filter_btn = gr.Button("πŸ” Filter Data", variant="primary")
                            with gr.Column():
                                filter_status = gr.Textbox(label="Status", lines=4, interactive=False)

                # Update dropdown choices
                current_data.change(
                    lambda df: (
                        gr.Dropdown(choices=["ALL"] + list(df.columns) if df is not None else ["ALL"]),
                        gr.Dropdown(choices=list(df.columns) if df is not None else [])
                    ),
                    inputs=[current_data],
                    outputs=[missing_col, filter_col]
                )

                # Event handlers: each click shows the status text, then
                # refreshes the state from processor.current_df, then
                # refreshes the data summary.
                missing_btn.click(
                    lambda df, col, method, val: handle_missing_values(df, col, method, val)[1] if df is not None else "❌ No data",
                    inputs=[current_data, missing_col, missing_method, fill_value_input],
                    outputs=[missing_status]
                ).then(
                    lambda: processor.current_df,
                    outputs=[current_data]
                ).then(
                    lambda df: get_data_info(df),
                    inputs=[current_data],
                    outputs=[data_info_display]
                )
                duplicate_btn.click(
                    # The radio yields the text "false"; pandas needs the boolean False.
                    lambda df, cols, keep: detect_and_remove_duplicates(df, cols, False if keep == "false" else keep)[1] if df is not None else "❌ No data",
                    inputs=[current_data, duplicate_cols, keep_method],
                    outputs=[duplicate_status]
                ).then(
                    lambda: processor.current_df,
                    outputs=[current_data]
                ).then(
                    lambda df: get_data_info(df),
                    inputs=[current_data],
                    outputs=[data_info_display]
                )
                filter_btn.click(
                    lambda df, col, cond, val: filter_data(df, col, cond, val)[1] if df is not None else "❌ No data",
                    inputs=[current_data, filter_col, filter_condition, filter_value],
                    outputs=[filter_status]
                ).then(
                    lambda: processor.current_df,
                    outputs=[current_data]
                ).then(
                    lambda df: get_data_info(df),
                    inputs=[current_data],
                    outputs=[data_info_display]
                )

            # ===== COLUMN OPERATIONS TAB =====
            with gr.TabItem("βš™οΈ Column Operations"):
                gr.HTML("<h3>Perform operations on columns</h3>")
                with gr.Row():
                    with gr.Column():
                        op_type = gr.Dropdown(
                            choices=["add", "subtract", "multiply", "divide", "concatenate",
                                     "extract_numbers", "upper", "lower", "title", "length"],
                            value="add",
                            label="Operation"
                        )
                        op_col1 = gr.Dropdown(label="Primary Column", choices=[], interactive=True)
                        op_col2 = gr.Dropdown(label="Second Column (optional)", choices=[], interactive=True)
                        op_constant = gr.Textbox(label="Constant Value (optional)")
                        op_new_name = gr.Textbox(label="New Column Name")
                        op_btn = gr.Button("βš™οΈ Execute Operation", variant="primary")
                    with gr.Column():
                        op_status = gr.Textbox(label="Status", lines=5, interactive=False)
                        # Data type conversion
                        gr.HTML("<hr><h4>Data Type Conversion</h4>")
                        convert_col = gr.Dropdown(label="Column", choices=[], interactive=True)
                        convert_type = gr.Dropdown(
                            choices=["string", "integer", "float", "datetime", "boolean", "category"],
                            value="string",
                            label="Target Type"
                        )
                        convert_btn = gr.Button("πŸ”„ Convert Type", variant="secondary")
                        convert_status = gr.Textbox(label="Conversion Status", lines=2, interactive=False)

                # Update column choices
                current_data.change(
                    lambda df: (
                        gr.Dropdown(choices=list(df.columns) if df is not None else []),
                        gr.Dropdown(choices=list(df.columns) if df is not None else []),
                        gr.Dropdown(choices=list(df.columns) if df is not None else [])
                    ),
                    inputs=[current_data],
                    outputs=[op_col1, op_col2, convert_col]
                )

                # Event handlers
                def operation_handler(df, op, col1, col2, const, new_name):
                    if df is None:
                        return None, "❌ No data loaded", ""
                    result_df, status = perform_column_operations(df, op, col1, col2, new_name, const)
                    info_text = get_data_info(result_df) if result_df is not None else ""
                    return result_df, status, info_text

                op_btn.click(
                    operation_handler,
                    inputs=[current_data, op_type, op_col1, op_col2, op_constant, op_new_name],
                    outputs=[current_data, op_status, data_info_display]
                )

                def convert_handler(df, col, target_type):
                    if df is None:
                        return None, "❌ No data loaded", ""
                    result_df, status = convert_data_types(df, col, target_type)
                    info_text = get_data_info(result_df) if result_df is not None else ""
                    return result_df, status, info_text

                convert_btn.click(
                    convert_handler,
                    inputs=[current_data, convert_col, convert_type],
                    outputs=[current_data, convert_status, data_info_display]
                )

            # ===== STATISTICS TAB =====
            with gr.TabItem("πŸ“ˆ Statistics & Analysis"):
                gr.HTML("<h3>Statistical analysis and insights</h3>")
                with gr.Row():
                    with gr.Column():
                        stats_btn = gr.Button("πŸ“Š Generate Statistical Summary", variant="primary")
                        corr_btn = gr.Button("πŸ”— Create Correlation Matrix", variant="secondary")
                        # Distribution plots
                        gr.HTML("<hr><h4>Distribution Analysis</h4>")
                        dist_col = gr.Dropdown(label="Column", choices=[], interactive=True)
                        plot_type = gr.Radio(choices=["histogram", "density"], value="histogram", label="Plot Type")
                        dist_btn = gr.Button("πŸ“ˆ Create Distribution Plot", variant="secondary")
                    with gr.Column():
                        stats_status = gr.Textbox(label="Status", lines=3, interactive=False)
                        plot_output = gr.Image(label="Visualization")
                stats_results = gr.DataFrame(label="Statistical Summary", height=400)

                # Update column choices (numeric columns only)
                current_data.change(
                    lambda df: gr.Dropdown(choices=list(df.select_dtypes(include=[np.number]).columns) if df is not None else []),
                    inputs=[current_data],
                    outputs=[dist_col]
                )

                # Event handlers
                stats_btn.click(
                    lambda df: generate_statistical_summary(df) if df is not None else (None, "❌ No data"),
                    inputs=[current_data],
                    outputs=[stats_results, stats_status]
                )
                corr_btn.click(
                    lambda df: create_correlation_matrix(df) if df is not None else (None, "❌ No data"),
                    inputs=[current_data],
                    outputs=[plot_output, stats_status]
                )
                dist_btn.click(
                    lambda df, col, ptype: create_distribution_plots(df, col, ptype) if df is not None else (None, "❌ No data"),
                    inputs=[current_data, dist_col, plot_type],
                    outputs=[plot_output, stats_status]
                )

            # ===== EXPORT TAB =====
            with gr.TabItem("πŸ’Ύ Export & Download"):
                gr.HTML("<h3>Export your processed data</h3>")
                with gr.Row():
                    with gr.Column():
                        export_format = gr.Radio(
                            choices=["csv", "excel", "json"],
                            value="csv",
                            label="Export Format"
                        )
                        export_filename = gr.Textbox(
                            label="Filename (without extension)",
                            value="processed_data",
                            placeholder="Enter filename"
                        )
                        export_btn = gr.Button("πŸ’Ύ Create Download File", variant="primary", size="lg")
                    with gr.Column():
                        export_status = gr.Textbox(label="Status", lines=3, interactive=False)
                        download_file = gr.File(label="Download", visible=False)
                # History and Undo/Redo
                with gr.Row():
                    with gr.Column():
                        gr.HTML("<hr><h4>History & Undo Operations</h4>")
                        undo_btn = gr.Button("β†Ά Undo Last Operation", variant="secondary")
                        reset_btn = gr.Button("πŸ”„ Reset to Original", variant="secondary")
                    with gr.Column():
                        history_status = gr.Textbox(label="History Status", lines=3, interactive=False)

                def export_handler(df, fmt, filename):
                    if df is None:
                        return gr.File(visible=False), "❌ No data to export"
                    try:
                        # The base name comes from a textbox: strip anything
                        # that could act as a path component before writing.
                        safe_name = re.sub(r'[^A-Za-z0-9_.-]+', '_', filename or '').strip('._') or "processed_data"
                        file_data, file_name = create_download_file(df, fmt, safe_name)
                        if file_data is None:
                            # On failure the second element is the error text
                            # (or nothing usable), not a file name.
                            return gr.File(visible=False), f"❌ Export error: {file_name}"
                        # Save file temporarily
                        if isinstance(file_data, bytes):
                            Path(file_name).write_bytes(file_data)
                        else:
                            Path(file_name).write_text(file_data, encoding='utf-8')
                        return gr.File(value=file_name, visible=True), f"βœ… File created successfully: {file_name}"
                    except Exception as e:
                        return gr.File(visible=False), f"❌ Export error: {str(e)}"

                export_btn.click(
                    export_handler,
                    inputs=[current_data, export_format, export_filename],
                    # Each component is listed once; the returned gr.File
                    # carries both the value and the visibility.
                    outputs=[download_file, export_status]
                )

                def undo_handler():
                    result_df, status = processor.undo_operation()
                    info_text = get_data_info(result_df) if result_df is not None else ""
                    return result_df, status, info_text

                def reset_handler():
                    result_df, status = processor.reset_to_original()
                    info_text = get_data_info(result_df) if result_df is not None else ""
                    return result_df, status, info_text

                undo_btn.click(
                    undo_handler,
                    outputs=[current_data, history_status, data_info_display]
                )
                reset_btn.click(
                    reset_handler,
                    outputs=[current_data, history_status, data_info_display]
                )
        # Footer
        gr.HTML("""
        <div style="text-align: center; padding: 20px; margin-top: 30px; border-top: 1px solid #ddd;">
        <p style="color: #666; font-size: 14px;">
        πŸš€ <strong>Advanced CSV Manipulation Tool</strong> |
        Commercial-ready data processing with enterprise features |
        Built with Gradio & Python
        </p>
        </div>
        """)
    return demo
if __name__ == "__main__":
    # Create and launch the interface (only when executed as a script).
    demo = create_interface()
    demo.launch(
        share=True,  # NOTE(review): presumably requests a public share link — confirm this is intended
        inbrowser=True,
        server_name="0.0.0.0",  # NOTE(review): looks like this listens on all interfaces — confirm
        server_port=7860,
        max_file_size="1gb"  # same limit as the "Max 1GB" label on the upload widget
    )