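"""AI Risk Analyst Agent.

A Gradio app that reads a financial CSV (Revenue, Profit, Loss, Expenses,
Risk_Level), trains a gradient-boosting classifier on PCA-reduced features,
and uses a small GAN to produce synthetic samples reported as anomalies.
"""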
import gradio as gr
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, accuracy_score
import torch
from torch import nn

# GAN-based anomaly detection for financial analysis: a small generator/
# discriminator pair is trained on the PCA-reduced features, and samples
# drawn from the trained generator are reported as candidate anomalies.
class GANRiskAnalyzer:
    def __init__(self, input_dim, hidden_dim, output_dim):
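        # Generator: maps a noise vector to a synthetic record in feature space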
        self.generator = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
            nn.Tanh()
        )
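        # Discriminator: scores how "real" a record looks (1 = real, 0 = fake)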
        self.discriminator = nn.Sequential(
            nn.Linear(output_dim, hidden_dim),
            nn.LeakyReLU(0.2),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )
        self.loss = nn.BCELoss()
        self.generator_optimizer = torch.optim.Adam(self.generator.parameters(), lr=0.0002)
        self.discriminator_optimizer = torch.optim.Adam(self.discriminator.parameters(), lr=0.0002)

    def train(self, data, epochs=100):
        # Label tensors: 1 for real samples, 0 for generated ones
        real_labels = torch.ones(data.size(0), 1)
        fake_labels = torch.zeros(data.size(0), 1)
        for epoch in range(epochs):
            # Train Discriminator
            self.discriminator_optimizer.zero_grad()
            real_output = self.discriminator(data)
            real_loss = self.loss(real_output, real_labels)

            z = torch.randn(data.size(0), data.size(1))
            fake_data = self.generator(z)
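            # Detach the fakes so this backward pass updates only the discriminator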
            fake_output = self.discriminator(fake_data.detach())
            fake_loss = self.loss(fake_output, fake_labels)

            d_loss = real_loss + fake_loss
            d_loss.backward()
            self.discriminator_optimizer.step()

            # Train Generator
            self.generator_optimizer.zero_grad()
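            # Re-score the fakes without detaching so gradients reach the
            # generator; it is rewarded when the discriminator labels them real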
            fake_output = self.discriminator(fake_data)
            g_loss = self.loss(fake_output, real_labels)
            g_loss.backward()
            self.generator_optimizer.step()

    def generate(self, n_samples, input_dim):
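        # Draw noise and map it through the trained generator; these are
        # synthetic samples from the learned distribution, which this app
        # surfaces as candidate anomalies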
        z = torch.randn(n_samples, input_dim)
        generated_data = self.generator(z)
        return generated_data.detach().numpy()

# Risk Analysis
def analyze_financial_data(file):
    if file is None:
        return "Please upload a CSV file."
    try:
        # Attempt to read the CSV file
        data = pd.read_csv(file, encoding="utf-8")
    except UnicodeDecodeError:
        # Fallback for files with non-UTF-8 encodings
        data = pd.read_csv(file, encoding="ISO-8859-1")
    
    # Check required columns
    required_columns = ["Revenue", "Profit", "Loss", "Expenses", "Risk_Level"]
    if not all(column in data.columns for column in required_columns):
        return "The uploaded CSV must include these columns: Revenue, Profit, Loss, Expenses, Risk_Level."

    # Data Preprocessing
    X = data[["Revenue", "Profit", "Loss", "Expenses"]]
    y = data["Risk_Level"]
    
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # Dimensionality Reduction
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X_scaled)
    
    # Train-Test Split
    X_train, X_test, y_train, y_test = train_test_split(X_pca, y, test_size=0.2, random_state=42)
    
    # Gradient Boosting Classifier
    model = GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, max_depth=5)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    
    accuracy = accuracy_score(y_test, y_pred)
    report = classification_report(y_test, y_pred, output_dict=True)

    # GAN-based Anomaly Detection
    gan = GANRiskAnalyzer(input_dim=X_pca.shape[1], hidden_dim=128, output_dim=X_pca.shape[1])
    gan.train(torch.tensor(X_pca, dtype=torch.float32), epochs=200)
    anomalies = gan.generate(n_samples=5, input_dim=X_pca.shape[1])

    # Analysis Insights
    total_revenue = data["Revenue"].sum()
    total_profit = data["Profit"].sum()
    total_loss = data["Loss"].sum()

    insights = {
        "Accuracy": f"{accuracy * 100:.2f}%",
        "Classification Report": report,
        "Generated Anomalies (GAN)": anomalies.tolist(),
        "Financial Summary": {
            "Total Revenue": f"${total_revenue:,.2f}",
            "Total Profit": f"${total_profit:,.2f}",
            "Total Loss": f"${total_loss:,.2f}",
            "Net Balance": f"${(total_revenue - total_loss):,.2f}"
        }
    }
    return insights

with gr.Blocks(theme=gr.themes.Monochrome()) as interface:
    gr.Markdown("# **AI Risk Analyst Agent**")
    gr.Markdown(
        "Analyze your financial risks and identify anomalies using advanced AI models. Upload financial data in CSV format to get started."
    )
    
    with gr.Row():
        with gr.Column():
            data_file = gr.File(label="Upload Financial Data (CSV)")
            submit_button = gr.Button("Analyze")
        with gr.Column():
            output = gr.JSON(label="Risk Analysis Insights")

    submit_button.click(analyze_financial_data, inputs=[data_file], outputs=output)

if __name__ == "__main__":
    interface.launch()
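
# A minimal smoke-test input (hypothetical values) you could save as
# sample.csv and upload through the UI; analyze_financial_data only
# requires these column names:
#
#   Revenue,Profit,Loss,Expenses,Risk_Level
#   120000,30000,5000,85000,Low
#   95000,-2000,15000,82000,High
#   150000,45000,2000,103000,Low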