|
|
|
|
|
import streamlit as st |
|
import numpy as np |
|
import pandas as pd |
|
from smolagents import CodeAgent, tool |
|
from typing import Union, List, Dict, Optional |
|
import matplotlib.pyplot as plt |
|
import seaborn as sns |
|
import plotly.express as px |
|
import plotly.graph_objects as go |
|
import os |
|
from groq import Groq |
|
from dataclasses import dataclass |
|
import tempfile |
|
import base64 |
|
import io |
|
from sklearn.model_selection import train_test_split |
|
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc |
|
import joblib |
|
import pdfkit |
|
import uuid |
|
|
|
|
|
|
|
|
|
class GroqLLM:
    """Callable wrapper around the Groq chat-completions API.

    Instances are handed to the agent as its ``model`` and are invoked like a
    function with a prompt, returning the generated text.
    """

    def __init__(self, model_name: str = "llama-3.1-8B-Instant"):
        """
        Initialize the GroqLLM with a specified model.

        Args:
            model_name (str): The name of the language model to use.
        """
        # The API key is read from the environment; it is never hard-coded.
        self.client = Groq(api_key=os.environ.get("GROQ_API_KEY"))
        self.model_name = model_name

    def __call__(
        self,
        prompt: Union[str, dict, List[Dict]],
        stop_sequences: Optional[List[str]] = None,
        **kwargs,
    ) -> str:
        """
        Generate a completion for ``prompt``.

        Args:
            prompt (Union[str, dict, List[Dict]]): The input prompt. Whatever its
                type, it is stringified and sent as a single user message.
            stop_sequences (Optional[List[str]]): Optional sequences at which
                generation should stop. Agent frameworks may pass these as a
                keyword argument; previously that raised ``TypeError`` because
                the method accepted no keyword arguments.
            **kwargs: Any further keyword arguments are accepted and ignored so
                that callers passing extra options do not crash.

        Returns:
            str: The generated text, or a string starting with ``"Error"`` when
            the request fails or yields no content.
        """
        try:
            request = {
                "model": self.model_name,
                "messages": [{"role": "user", "content": str(prompt)}],
                "temperature": 0.7,
                "max_tokens": 1500,
                "stream": False,
            }
            if stop_sequences:
                # The chat-completions API accepts at most four stop sequences.
                request["stop"] = list(stop_sequences)[:4]

            completion = self.client.chat.completions.create(**request)

            if not completion.choices:
                return "Error: No response generated"

            content = completion.choices[0].message.content
            # ``content`` can be None; keep the declared ``str`` return type.
            return content if content is not None else "Error: No response generated"

        except Exception as e:
            # Best-effort by design: report the failure as text instead of raising.
            error_msg = f"Error generating response: {str(e)}"
            print(error_msg)
            return error_msg
|
|
|
|
|
|
|
|
|
class DataAnalysisAgent(CodeAgent):
    """CodeAgent that carries a dataset and a registry of trained models."""

    def __init__(self, dataset: pd.DataFrame, *args, **kwargs):
        """
        Initialize the DataAnalysisAgent with the provided dataset.

        Args:
            dataset (pd.DataFrame): The dataset to analyze.
            *args: Positional arguments forwarded to ``CodeAgent``.
            **kwargs: Keyword arguments forwarded to ``CodeAgent``.
        """
        super().__init__(*args, **kwargs)
        self._dataset = dataset
        # Trained models keyed by the ID reported to the user.
        self.models = {}
        # The tool functions in this module read ``tool.agent.dataset`` and
        # ``tool.agent.models``; nothing else in this file assigns that
        # attribute, so bind it here to make those lookups resolve.
        tool.agent = self

    @property
    def dataset(self) -> pd.DataFrame:
        """Access the stored dataset.

        Returns:
            pd.DataFrame: The dataset stored in the agent.
        """
        return self._dataset

    def run(self, prompt: str, *args, **kwargs) -> str:
        """
        Run a task with a description of the dataset prepended to the prompt.

        Args:
            prompt (str): The task prompt for analysis.
            *args: Extra positional arguments forwarded to ``CodeAgent.run``.
            **kwargs: Extra keyword arguments forwarded to ``CodeAgent.run``.

        Returns:
            str: The result of the analysis.
        """
        # Re-bind on every run so the tools see *this* agent even if another
        # agent instance was created in the meantime.
        tool.agent = self

        # Column labels are not guaranteed to be strings, so stringify them
        # before joining instead of letting ``str.join`` raise TypeError.
        column_names = ', '.join(map(str, self.dataset.columns))
        dataset_info = (
            f"\nDataset Shape: {self.dataset.shape}\n"
            f"Columns: {column_names}\n"
            f"Data Types: {self.dataset.dtypes.to_dict()}\n"
        )
        enhanced_prompt = (
            "\nAnalyze the following dataset:\n"
            f"{dataset_info}\n"
            "\n"
            f"Task: {prompt}\n"
            "\n"
            "Use the provided tools to analyze this specific dataset and return detailed results.\n"
        )
        return super().run(enhanced_prompt, *args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
@tool
def analyze_basic_stats(data: Optional[pd.DataFrame] = None) -> str:
    """
    Calculate and visualize basic statistical measures for numerical columns.

    This function computes the mean, median, standard deviation, skewness, and
    count of missing values for every numerical column in the DataFrame. It also
    renders a bar chart of the mean, median, and standard deviation per feature
    and embeds it in the returned markdown as a base64 PNG image.

    Args:
        data (Optional[pd.DataFrame]):
            A pandas DataFrame containing the dataset to analyze.
            If None, the agent's stored dataset will be used.
            The DataFrame should contain at least one numerical column
            for meaningful analysis.

    Returns:
        str: A markdown-formatted string containing the statistics table and the
        embedded plot, or a short notice when there are no numerical columns.
    """
    if data is None:
        data = tool.agent.dataset

    numeric_cols = data.select_dtypes(include=[np.number]).columns
    if len(numeric_cols) == 0:
        # Without this guard the column selection below raises KeyError.
        return "### Basic Statistics\nNo numerical columns were found in the dataset.\n\n"

    stats = {
        col: {
            'mean': float(data[col].mean()),
            'median': float(data[col].median()),
            'std': float(data[col].std()),
            'skew': float(data[col].skew()),
            'missing': int(data[col].isnull().sum()),
        }
        for col in numeric_cols
    }

    # One row per feature; the feature name becomes a regular column for the table.
    per_feature = pd.DataFrame(stats).T.rename_axis('Feature')
    stats_df = per_feature.reset_index()

    # Draw through the figure/axes objects rather than global pyplot state, and
    # always close the figure so repeated calls do not accumulate open figures.
    buf = io.BytesIO()
    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        per_feature[['mean', 'median', 'std']].plot(kind='bar', ax=ax)
        ax.set_title('Basic Statistics')
        ax.set_ylabel('Values')
        fig.tight_layout()
        fig.savefig(buf, format='png')
    finally:
        plt.close(fig)
    stats_plot = base64.b64encode(buf.getvalue()).decode()

    # The encoded chart was previously computed and then dropped; embed it so the
    # return value matches the documented contract.
    return (
        f"### Basic Statistics\n{stats_df.to_markdown()}\n\n"
        f"![Basic Statistics](data:image/png;base64,{stats_plot})\n\n"
    )
|
|
|
@tool
def generate_correlation_matrix(data: Optional[pd.DataFrame] = None) -> str:
    """
    Generate an interactive correlation matrix using Plotly.

    This function creates an interactive heatmap of the pairwise correlations
    between all numerical columns in the dataset. Users can hover over cells to
    see correlation values and interact with the plot (zoom, pan).

    Args:
        data (Optional[pd.DataFrame]):
            A pandas DataFrame containing the dataset to analyze.
            If None, the agent's stored dataset will be used.
            The DataFrame should contain at least two numerical columns
            for correlation analysis.

    Returns:
        str: An HTML string representing the interactive correlation matrix plot,
        or a short HTML notice when fewer than two numerical columns exist.
    """
    if data is None:
        data = tool.agent.dataset

    numeric_data = data.select_dtypes(include=[np.number])
    numeric_count = numeric_data.shape[1]
    if numeric_count < 2:
        # A correlation matrix needs at least one pair of numerical columns.
        return (
            "<p>Correlation analysis requires at least two numerical columns; "
            f"found {numeric_count}.</p>"
        )

    corr = numeric_data.corr()

    fig = px.imshow(
        corr,
        text_auto='.2f',  # two decimals keep the cell labels readable
        aspect="auto",
        color_continuous_scale='RdBu',
        # Pin the diverging scale to the full correlation range so that the
        # neutral colour always corresponds to zero correlation.
        zmin=-1,
        zmax=1,
        title='Correlation Matrix',
    )
    fig.update_layout(width=800, height=600)

    return fig.to_html(full_html=False)
|
|
|
@tool
def analyze_categorical_columns(data: Optional[pd.DataFrame] = None) -> str:
    """
    Analyze categorical columns with visualizations.

    This function examines categorical columns to identify unique values, top
    categories, and missing value counts. It also renders a bar chart of the top
    5 categories of each categorical feature and embeds it in the returned
    markdown as a base64 PNG image.

    Args:
        data (Optional[pd.DataFrame]):
            A pandas DataFrame containing the dataset to analyze.
            If None, the agent's stored dataset will be used.
            The DataFrame should contain at least one categorical column
            for meaningful analysis.

    Returns:
        str: A markdown-formatted string containing analysis results and embedded
        plots, or a short notice when there are no categorical columns.
    """
    if data is None:
        data = tool.agent.dataset

    categorical_cols = data.select_dtypes(include=['object', 'category']).columns
    if len(categorical_cols) == 0:
        return "### Categorical Columns Analysis\nNo categorical columns were found in the dataset."

    analysis = {}
    sections = []

    for col in categorical_cols:
        # Count once and reuse for both the summary and the plot ordering.
        top_counts = data[col].value_counts().head(5)
        top_categories = top_counts.to_dict()
        unique_vals = data[col].nunique()
        missing = data[col].isnull().sum()

        analysis[col] = {
            'unique_values': int(unique_vals),
            'top_categories': top_categories,
            'missing': int(missing),
        }

        section = [
            f"### {col}\n",
            f"- **Unique Values:** {unique_vals}\n",
            f"- **Missing Values:** {missing}\n",
            f"- **Top Categories:** {top_categories}\n",
        ]

        # A column holding only missing values has nothing to plot.
        if not top_counts.empty:
            buf = io.BytesIO()
            fig, ax = plt.subplots(figsize=(8, 4))
            try:
                sns.countplot(data=data, x=col, order=top_counts.index, ax=ax)
                ax.set_title(f'Top 5 Categories in {col}')
                ax.tick_params(axis='x', labelrotation=45)
                fig.tight_layout()
                fig.savefig(buf, format='png')
            finally:
                # Always release the figure, even when plotting fails.
                plt.close(fig)
            plot_img = base64.b64encode(buf.getvalue()).decode()
            # The encoded chart used to be discarded; embed it as documented.
            section.append(f"![Top 5 Categories in {col}](data:image/png;base64,{plot_img})")

        section.append("\n\n")
        sections.append("".join(section))

    summary_table = pd.DataFrame(analysis).T.to_markdown()
    return "".join(sections) + f"### Categorical Columns Analysis\n{summary_table}"
|
|
|
@tool
def suggest_features(data: Optional[pd.DataFrame] = None) -> str:
    """
    Suggest potential feature engineering steps based on data characteristics.

    This function inspects the dataset's column types, skewness, and missing
    values to recommend feature engineering steps that could improve model
    performance.

    Args:
        data (Optional[pd.DataFrame]):
            A pandas DataFrame containing the dataset to analyze.
            If None, the agent's stored dataset will be used.
            The DataFrame can contain both numerical and categorical columns.

    Returns:
        str: A newline-separated list of bullet-point suggestions based on the
        characteristics of the input data.
    """
    if data is None:
        data = tool.agent.dataset

    suggestions = []
    numeric_cols = data.select_dtypes(include=[np.number]).columns
    categorical_cols = data.select_dtypes(include=['object', 'category']).columns

    if len(numeric_cols) >= 2:
        suggestions.append("• **Interaction Terms:** Consider creating interaction terms between numerical features to capture combined effects.")

    if len(categorical_cols) > 0:
        suggestions.append("• **One-Hot Encoding:** Apply one-hot encoding to categorical variables to convert them into numerical format.")
        suggestions.append("• **Label Encoding:** For ordinal categorical variables, consider label encoding to maintain order information.")

    for col in numeric_cols:
        skewness = data[col].skew()  # computed once per column
        # NaN skewness (e.g. too few values) compares False and is skipped.
        if not abs(skewness) > 1:
            continue
        if skewness > 1 and data[col].min() > 0:
            suggestions.append(f"• **Log Transformation:** Apply log transformation to `{col}` to reduce skewness and stabilize variance.")
        else:
            # A log is undefined for non-positive values and does not reduce
            # left skew, so recommend a transform that handles both cases.
            suggestions.append(f"• **Power Transformation:** Apply a power transformation such as Yeo-Johnson to `{col}` to reduce skewness; a plain log transformation is unsuitable because the column is left-skewed or contains non-positive values.")

    for col in data.columns[data.isnull().any()]:
        suggestions.append(f"• **Imputation:** Consider imputing missing values in `{col}` using mean, median, or advanced imputation techniques.")

    # Scaling advice only makes sense when numerical features exist.
    if len(numeric_cols) > 0:
        suggestions.append("• **Feature Scaling:** Apply feature scaling (Standardization or Normalization) to numerical features to ensure uniformity.")

    if not suggestions:
        return "No feature engineering suggestions could be derived from this dataset."

    return "\n".join(suggestions)
|
|
|
@tool
def predictive_analysis(data: Optional[pd.DataFrame] = None, target: Optional[str] = None) -> str:
    """
    Perform predictive analytics by training a classification model.

    This function builds a Random Forest classifier, evaluates it on a held-out
    20% split, and reports the classification report and confusion matrix. For
    two-class targets it also reports the ROC curve and AUC score. The input
    DataFrame is never modified. The fitted model is stored in the agent's model
    registry and saved as a joblib file named after the Model ID in the system
    temporary directory.

    Args:
        data (Optional[pd.DataFrame]):
            A pandas DataFrame containing the dataset to analyze.
            If None, the agent's stored dataset will be used.
            The DataFrame should contain the target variable for prediction.
        target (Optional[str]):
            The name of the target variable column in the dataset.
            If None, the agent must provide the target variable through the prompt.

    Returns:
        str: A markdown-formatted string containing the classification report,
        confusion matrix, ROC curve, AUC score, and a unique Model ID, or a
        string starting with "Error" when the target is unusable.
    """
    from sklearn.ensemble import RandomForestClassifier

    if data is None:
        data = tool.agent.dataset

    if target is None or target not in data.columns:
        return f"Error: Target column not specified or `{target}` not found in the dataset."

    # Drop rows with a missing target BEFORE encoding: category codes turn NaN
    # into -1, which would otherwise survive as a bogus extra class. Working on
    # the filtered copy also keeps the caller's (shared) DataFrame untouched.
    frame = data.dropna(subset=[target])
    if frame.empty:
        return f"Error: Target column `{target}` contains no non-missing values."

    y = frame[target]
    category_names = None
    if y.dtype == 'object' or y.dtype.name == 'category':
        encoded = y.astype('category')
        category_names = [str(name) for name in encoded.cat.categories]
        y = encoded.cat.codes

    if y.nunique() < 2:
        return f"Error: Target column `{target}` needs at least two distinct classes."

    X = frame.drop(columns=[target])
    # numeric_only avoids a TypeError when non-numeric feature columns exist.
    X = X.fillna(X.median(numeric_only=True))
    X = pd.get_dummies(X, drop_first=True)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    clf = RandomForestClassifier(n_estimators=100, random_state=42)
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)

    report = classification_report(y_test, y_pred, output_dict=True)
    report_df = pd.DataFrame(report).transpose()

    classes = list(clf.classes_)
    is_binary = len(classes) == 2
    if is_binary:
        axis_labels = ["Negative", "Positive"]
    elif category_names is not None:
        axis_labels = [f"Class {category_names[int(code)]}" for code in classes]
    else:
        axis_labels = [f"Class {label}" for label in classes]

    # Fix the label order so the matrix always matches the axis labels.
    cm = confusion_matrix(y_test, y_pred, labels=classes)
    fig_cm = px.imshow(
        cm,
        text_auto=True,
        labels=dict(x="Predicted", y="Actual", color="Count"),
        x=axis_labels,
        y=axis_labels,
        title="Confusion Matrix",
    )
    cm_html = fig_cm.to_html(full_html=False)

    if is_binary:
        # Column 1 of predict_proba belongs to classes[1]; name it explicitly so
        # targets that are not encoded as {0, 1} are scored correctly.
        y_proba = clf.predict_proba(X_test)[:, 1]
        fpr, tpr, _ = roc_curve(y_test, y_proba, pos_label=classes[1])
        roc_auc = auc(fpr, tpr)
        fig_roc = go.Figure()
        fig_roc.add_trace(go.Scatter(x=fpr, y=tpr, mode='lines', name=f'ROC Curve (AUC = {roc_auc:.2f})'))
        fig_roc.add_trace(go.Scatter(x=[0, 1], y=[0, 1], mode='lines', name='Random Guess', line=dict(dash='dash')))
        fig_roc.update_layout(
            title='Receiver Operating Characteristic (ROC) Curve',
            xaxis_title='False Positive Rate',
            yaxis_title='True Positive Rate',
        )
        roc_html = fig_roc.to_html(full_html=False)
        auc_text = f"{roc_auc:.2f}"
    else:
        roc_html = "Not available: ROC analysis requires exactly two classes."
        auc_text = "N/A"

    # Save the model under its ID so the file can be found again later; the
    # previous anonymous temporary file was never referenced anywhere.
    model_id = str(uuid.uuid4())
    model_path = os.path.join(tempfile.gettempdir(), f"{model_id}.joblib")
    joblib.dump(clf, model_path)
    tool.agent.models[model_id] = clf

    # Built line by line so no source indentation leaks into the markdown.
    summary_lines = [
        f"### Predictive Analytics Report for Target: `{target}`",
        "",
        "**Model Used:** Random Forest Classifier",
        "",
        "**Classification Report:**",
        "",
        report_df.to_markdown(),
        "",
        "**Confusion Matrix:**",
        cm_html,
        "",
        "**ROC Curve:**",
        roc_html,
        "",
        f"**AUC Score:** {auc_text}",
        "",
        f"**Model ID:** `{model_id}`",
        "",
        "*You can use this Model ID to retrieve or update the model in future analyses.*",
    ]
    return "\n" + "\n".join(summary_lines) + "\n"
|
|
|
|
|
|
|
|
|
def export_report(content: str, filename: str) -> None:
    """
    Export the given content as a PDF report.

    The content is written to a temporary HTML file, converted to PDF with
    pdfkit, and offered to the user through a Streamlit download button. All
    intermediate files live in a private temporary directory that is removed
    afterwards. Any failure is shown with ``st.error`` instead of being raised.

    Args:
        content (str): The report content to be included in the PDF.
        filename (str): The desired name for the exported PDF file, without
            the ``.pdf`` extension.

    Returns:
        None
    """
    download_name = f"{filename}.pdf"

    try:
        # A private directory avoids collisions between concurrent sessions that
        # a fixed file name in the working directory would cause, and it is
        # cleaned up automatically even when the conversion fails.
        with tempfile.TemporaryDirectory() as work_dir:
            html_path = os.path.join(work_dir, "report.html")
            pdf_path = os.path.join(work_dir, "report.pdf")

            with open(html_path, "wb") as html_file:
                html_file.write(content.encode('utf-8'))

            config = pdfkit.configuration()
            # The file is UTF-8; say so explicitly so non-ASCII text survives.
            pdfkit.from_file(
                html_path,
                pdf_path,
                configuration=config,
                options={"encoding": "UTF-8"},
            )

            with open(pdf_path, "rb") as pdf_file:
                pdf_bytes = pdf_file.read()

        st.download_button(
            label="📥 Download Report as PDF",
            data=pdf_bytes,
            file_name=download_name,
            mime='application/pdf',
        )
    except Exception as e:
        st.error(f"⚠️ Error exporting report: {str(e)}")
|
|
|
|
|
|
|
|
|
def main():
    """Run the Streamlit Business Intelligence Assistant.

    Lets the user upload a CSV file, builds a ``DataAnalysisAgent`` for it, runs
    the analysis selected in the sidebar, and accumulates the results into a
    report that can be exported as a PDF. Errors are shown with ``st.error``.
    """
    st.set_page_config(page_title="📊 Business Intelligence Assistant", layout="wide")
    st.title("📊 **Business Intelligence Assistant**")
    st.write("Upload your dataset and receive comprehensive analyses, interactive visualizations, and predictive insights.")

    # Streamlit re-executes this function on every interaction, so everything
    # that must survive a rerun lives in session state.
    session_defaults = {
        'data': None,
        'agent': None,
        'report_content': "",
        'upload_key': None,      # identifies the file the agent was built for
        'analysis_cache': {},    # analysis name -> result for the current file
    }
    for key, default in session_defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default

    def append_to_report(section: str) -> None:
        """Append one section to the accumulated report."""
        st.session_state['report_content'] += section + "\n\n"

    def run_cached(analysis_name: str, spinner_text: str, prompt: str, report_heading: str = "") -> str:
        """Run a sidebar-selected analysis once per uploaded file.

        These analyses start as soon as they are selected. Without the cache
        every rerun would call the LLM again and append a duplicate section
        to the report.
        """
        cache = st.session_state['analysis_cache']
        if analysis_name not in cache:
            with st.spinner(spinner_text):
                # The agent may return a non-string object; normalize it.
                cache[analysis_name] = str(st.session_state['agent'].run(prompt))
            append_to_report(report_heading + cache[analysis_name])
        return cache[analysis_name]

    uploaded_file = st.file_uploader("📥 **Upload a CSV file**", type="csv")

    try:
        if uploaded_file is not None:
            upload_key = (
                uploaded_file.name,
                uploaded_file.size,
                getattr(uploaded_file, "file_id", None),
            )
            # Only parse the CSV and rebuild the agent when the file changed;
            # rebuilding on every rerun discarded the agent's stored models.
            if st.session_state['upload_key'] != upload_key or st.session_state['agent'] is None:
                with st.spinner('🔄 Loading and processing your data...'):
                    data = pd.read_csv(uploaded_file)
                    st.session_state['data'] = data
                    st.session_state['agent'] = DataAnalysisAgent(
                        dataset=data,
                        tools=[analyze_basic_stats, generate_correlation_matrix,
                               analyze_categorical_columns, suggest_features, predictive_analysis],
                        model=GroqLLM(),
                        additional_authorized_imports=["pandas", "numpy", "matplotlib", "seaborn", "plotly"]
                    )
                    st.session_state['upload_key'] = upload_key
                    st.session_state['analysis_cache'] = {}

            data = st.session_state['data']
            st.success(f"✅ Successfully loaded dataset with {data.shape[0]} rows and {data.shape[1]} columns")
            st.subheader("🔍 **Data Preview**")
            st.dataframe(data.head())

        if st.session_state['data'] is not None:
            st.sidebar.header("🛠️ **Select Analysis Type**")
            analysis_type = st.sidebar.selectbox(
                "Choose analysis type",
                ["Basic Statistics", "Correlation Analysis", "Categorical Analysis",
                 "Feature Engineering", "Predictive Analytics", "Custom Question"]
            )

            if analysis_type == "Basic Statistics":
                result = run_cached(
                    analysis_type,
                    '📊 Analyzing basic statistics...',
                    "Use the analyze_basic_stats tool to analyze this dataset and "
                    "provide insights about the numerical distributions.",
                )
                st.markdown(result, unsafe_allow_html=True)

            elif analysis_type == "Correlation Analysis":
                result = run_cached(
                    analysis_type,
                    '📊 Generating correlation matrix...',
                    "Use the generate_correlation_matrix tool to analyze correlations "
                    "and explain any strong relationships found.",
                    report_heading="### Correlation Analysis\n",
                )
                # The tool returns Plotly HTML, which needs an HTML component.
                st.components.v1.html(result, height=600)

            elif analysis_type == "Categorical Analysis":
                result = run_cached(
                    analysis_type,
                    '📊 Analyzing categorical columns...',
                    "Use the analyze_categorical_columns tool to examine the "
                    "categorical variables and explain the distributions.",
                )
                st.markdown(result, unsafe_allow_html=True)

            elif analysis_type == "Feature Engineering":
                result = run_cached(
                    analysis_type,
                    '🔧 Generating feature suggestions...',
                    "Use the suggest_features tool to recommend potential "
                    "feature engineering steps for this dataset.",
                )
                st.markdown(result, unsafe_allow_html=True)

            elif analysis_type == "Predictive Analytics":
                with st.form("Predictive Analytics Form"):
                    st.write("🔮 **Predictive Analytics**")
                    target = st.selectbox("Select the target variable for prediction:", options=st.session_state['data'].columns)
                    submit = st.form_submit_button("🚀 Run Predictive Analysis")

                if submit:
                    with st.spinner('📊 Performing predictive analysis...'):
                        result = str(st.session_state['agent'].run(
                            f"Use the predictive_analysis tool to build a classification model with `{target}` as the target variable."
                        ))
                    st.markdown(result, unsafe_allow_html=True)
                    append_to_report(result)
                    export_report(result, "Predictive_Analysis_Report")

            elif analysis_type == "Custom Question":
                with st.expander("📝 **Ask a Custom Question**"):
                    question = st.text_input("What would you like to know about your data?")
                    if st.button("🔍 Get Answer"):
                        if question:
                            with st.spinner('🧠 Processing your question...'):
                                result = str(st.session_state['agent'].run(question))
                            st.markdown(result, unsafe_allow_html=True)
                            append_to_report(f"### Custom Question: {question}\n{result}")
                        else:
                            st.warning("Please enter a question.")

        if st.session_state['report_content']:
            st.sidebar.markdown("---")
            if st.sidebar.button("📤 **Export Analysis Report**"):
                export_report(st.session_state['report_content'], "Business_Intelligence_Report")
                st.sidebar.success("✅ Report exported successfully!")

    except Exception as e:
        # Top-level UI boundary: surface the problem instead of crashing the app.
        st.error(f"⚠️ An error occurred: {str(e)}")
|
|
|
|
|
|
|
|
|
# Script entry point: start the app only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
|