# app.py — uploaded by lucifer7210 ("Create app.py", commit 75ea76e, verified)
import pandas as pd
import numpy as np
import yfinance as yf
import gradio as gr
from datetime import datetime, timedelta
import warnings
import logging
from typing import List, Dict, Tuple
import os
import json
# Hugging Face and LangChain imports
from langchain.docstore.document import Document
from langchain_huggingface import HuggingFaceEmbeddings, HuggingFacePipeline
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import transformers
from transformers import AutoTokenizer
warnings.filterwarnings('ignore')
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MutualFundRAG:
    """RAG system for mutual fund portfolio optimization with LLM.

    Fetches fund prices and metadata from Yahoo Finance, derives risk/return
    metrics, indexes one text document per fund in a Chroma vector store and
    answers questions through a LangChain ``RetrievalQA`` chain.
    """

    # Annualization factor applied to daily returns.
    _TRADING_DAYS = 252
    # Values reported when live market data cannot be fetched.
    _DEFAULT_INTEREST_RATE = 3.5
    _DEFAULT_VIX = 20.0
    _STATIC_INFLATION_RATE = 3.2  # Static for demo
    # Location of the persisted vector store.
    _COLLECTION_NAME = "mutual_fund_optimization"
    _PERSIST_DIRECTORY = "./mutual_fund_db"
    _SECTOR_KEYS = (
        'Technology_%',
        'Healthcare_%',
        'Finance_%',
        'Energy_%',
        'Consumer_%',
        'Real_Estate_%',
    )
    # Prompt handed to the "stuff" QA chain; {context} receives the retrieved
    # fund documents and {question} the profile-enriched user query.
    _PROMPT_TEMPLATE = "\n".join([
        "",
        "You are a financial advisor specializing in mutual fund portfolio optimization.",
        "Based on the following mutual fund data, provide specific investment recommendations.",
        "Context: {context}",
        "Question: {question}",
        "Please provide:",
        "1. Recommended portfolio allocation percentages",
        "2. Risk assessment based on the user's profile",
        "3. Expected returns analysis",
        "4. Sector diversification recommendations",
        "5. Specific fund recommendations with rationale",
        "Keep your response concise and actionable.",
        "Answer:",
        "",
    ])

    def __init__(self):
        """Set default tickers, empty RAG components and a default user profile."""
        # Popular mutual fund tickers
        # NOTE(review): several labels below do not appear to match their
        # tickers; confirm against the fund provider before relying on them.
        self.fund_tickers = [
            'VTIAX',  # Vanguard Total International Stock Index
            'VTSAX',  # Vanguard Total Stock Market Index
            'VBTLX',  # Vanguard Total Bond Market Index
            'VTBLX',  # Vanguard Total International Bond Index (NOTE(review): presumably VTABX - verify)
            'VGIAX',  # Vanguard Growth Index (NOTE(review): presumably VIGAX - verify)
            'VIMAX',  # Vanguard Mid-Cap Index
            'VSMAX',  # Vanguard Small-Cap Index
            'VGSLX',  # Vanguard Real Estate Index
            'VHDYX',  # Vanguard High Dividend Yield Index
            'VTAPX',  # Vanguard Target Retirement 2065 (NOTE(review): presumably VLXVX - verify)
        ]
        # Additional popular funds
        self.extended_tickers = [
            'FXNAX',  # Fidelity US Bond Index
            'FSKAX',  # Fidelity Total Market Index
            'FTIHX',  # Fidelity Total International Index
            'SPY',    # SPDR S&P 500 ETF
            'QQQ',    # Invesco QQQ Trust
            'VTI',    # Vanguard Total Stock Market ETF
            'BND',    # Vanguard Total Bond Market ETF
        ]
        self.fund_data = None
        self.embeddings = None
        self.vectorstore = None
        self.qa_chain = None
        self.llm = None
        # Market indicators
        self.market_indicators = {}
        # User profile
        self.user_profile = {
            'risk_tolerance': 'moderate',
            'investment_amount': 50000,
            'investment_horizon': 5,
            'preferred_sectors': []
        }

    def initialize_llm(self, model_name="Qwen/Qwen3-0.6B-Base"):
        """Load a causal LM and wrap it as ``self.llm``.

        Args:
            model_name: Hugging Face model id to load.

        Returns:
            A human-readable status string; failures are reported in the
            string rather than raised.
        """
        try:
            logger.info(f"Initializing LLM: {model_name}")
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            if tokenizer.pad_token is None:
                # Generation needs a pad token; fall back to EOS when absent.
                tokenizer.pad_token = tokenizer.eos_token
            model = transformers.AutoModelForCausalLM.from_pretrained(
                model_name,
                device_map="auto",
                torch_dtype="auto"
            )
            text_pipeline = transformers.pipeline(
                "text-generation",
                model=model,
                tokenizer=tokenizer,
                max_new_tokens=512,
                temperature=0.7,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id
            )
            self.llm = HuggingFacePipeline(pipeline=text_pipeline)
            logger.info("LLM initialized successfully")
            return "✅ LLM initialized successfully"
        except Exception as e:
            logger.exception(f"Error initializing LLM: {e}")
            return f"❌ Error initializing LLM: {str(e)}"

    def fetch_fund_data(self, tickers: List[str] = None, period: str = '1y') -> pd.DataFrame:
        """Fetch real mutual fund data from Yahoo Finance.

        Args:
            tickers: Symbols to fetch; defaults to ``self.fund_tickers``.
            period: History window passed to ``Ticker.history``.

        Returns:
            One row per fund that could be fetched (also stored in
            ``self.fund_data``). Funds that fail are logged and skipped.
        """
        if tickers is None:
            tickers = self.fund_tickers
        records = []
        logger.info("Fetching mutual fund data from Yahoo Finance...")
        for ticker in tickers:
            try:
                record = self._build_fund_record(ticker, period)
            except Exception as e:
                # Best effort: one bad ticker must not abort the whole fetch.
                logger.error(f"Error fetching {ticker}: {e}")
                continue
            if record is not None:
                records.append(record)
                logger.info(f"Successfully fetched data for {ticker}")
        self.fund_data = pd.DataFrame(records)
        return self.fund_data

    def _build_fund_record(self, ticker: str, period: str):
        """Return the metrics row for one ticker, or None without usable history."""
        fund = yf.Ticker(ticker)
        hist = fund.history(period=period)
        if hist.empty:
            logger.warning("No price history returned for %s; skipping", ticker)
            return None
        returns = hist['Close'].pct_change().dropna()
        if len(returns) < 2:
            # A standard deviation needs at least two observations.
            logger.warning("Not enough price history for %s; skipping", ticker)
            return None

        avg_return = returns.mean() * self._TRADING_DAYS  # Annualized
        volatility = returns.std() * np.sqrt(self._TRADING_DAYS)  # Annualized
        sharpe_ratio = avg_return / volatility if volatility != 0 else 0
        latest_nav = hist['Close'].iloc[-1]

        if volatility < 0.1:
            risk_level = 'Low'
        elif volatility < 0.2:
            risk_level = 'Medium'
        else:
            risk_level = 'High'

        # Metadata is optional: keep the price-based metrics even when the
        # info lookup fails or returns None for individual fields.
        try:
            info = fund.info or {}
        except Exception as e:
            logger.warning("Metadata unavailable for %s: %s", ticker, e)
            info = {}
        fund_name = info.get('longName') or ticker
        category = info.get('category') or 'Unknown'

        # Estimate sector exposure (simplified)
        sector_exposure = self.estimate_sector_exposure(fund_name, category)
        return {
            'Ticker': ticker,
            'Name': fund_name[:50] + '...' if len(fund_name) > 50 else fund_name,
            'Category': category,
            'NAV': round(float(latest_nav), 2),
            'Annual_Return_%': round(float(avg_return) * 100, 2),
            'Volatility_%': round(float(volatility) * 100, 2),
            'Sharpe_Ratio': round(float(sharpe_ratio), 3),
            'Risk_Level': risk_level,
            'Expense_Ratio_%': self._format_expense_ratio(info.get('annualReportExpenseRatio')),
            **sector_exposure
        }

    @staticmethod
    def _format_expense_ratio(value):
        """Return the ratio as a percentage rounded to 2 places, or 'N/A'."""
        try:
            ratio = float(value)
        except (TypeError, ValueError):
            return 'N/A'
        if np.isnan(ratio):
            return 'N/A'
        return round(ratio * 100, 2)

    def estimate_sector_exposure(self, fund_name: str, category: str) -> Dict:
        """Estimate sector exposure based on fund type.

        The figures are randomly generated placeholders keyed off the fund
        name and category; they are not real holdings data.

        Args:
            fund_name: Fund display name (None is treated as empty).
            category: Fund category (None is treated as empty).

        Returns:
            Dict mapping each ``*_%`` sector key to a percentage rounded to
            one decimal place.
        """
        sector_exposure = {sector: 0 for sector in self._SECTOR_KEYS}
        name_lower = (fund_name or '').lower()
        category_lower = (category or '').lower()

        if 'tech' in name_lower or 'tech' in category_lower:
            sector_exposure['Technology_%'] = round(np.random.uniform(60, 90), 1)
        elif ('real estate' in name_lower or 'reit' in name_lower
              or 'real estate' in category_lower):
            sector_exposure['Real_Estate_%'] = round(np.random.uniform(70, 95), 1)
        elif 'total' in name_lower or 'market' in name_lower:
            # Diversified fund: 10-25% per sector until the budget runs out.
            total = 100
            for sector in sector_exposure:
                if total > 0:
                    allocation = min(np.random.uniform(10, 25), total)
                    sector_exposure[sector] = round(allocation, 1)
                    total -= allocation
        else:
            # Random allocation for other funds; the last sector absorbs
            # whatever is left so the parts add up to 100.
            remaining = 100
            sectors = list(sector_exposure)
            for sector in sectors[:-1]:
                if remaining > 0:
                    allocation = np.random.uniform(0, min(30, remaining))
                    sector_exposure[sector] = round(allocation, 1)
                    remaining -= allocation
            sector_exposure[sectors[-1]] = round(remaining, 1)
        return sector_exposure

    @staticmethod
    def _latest_close(symbol: str, default: float) -> float:
        """Return the most recent non-NaN close for ``symbol`` or ``default``."""
        hist = yf.Ticker(symbol).history(period="5d")
        if hist.empty:
            return default
        closes = hist['Close'].dropna()
        return float(closes.iloc[-1]) if not closes.empty else default

    def get_market_indicators(self) -> Dict:
        """Fetch current market indicators.

        Returns:
            Dict with the 10-year treasury yield (^TNX), a static inflation
            figure, the VIX and a timestamp. The same dict is stored in
            ``self.market_indicators`` on success and on fallback, so callers
            reading the attribute never see an empty dict after this call.
        """
        try:
            interest_rate = self._latest_close("^TNX", self._DEFAULT_INTEREST_RATE)
            market_volatility = self._latest_close("^VIX", self._DEFAULT_VIX)
        except Exception as e:
            logger.exception(f"Error fetching market indicators: {e}")
            interest_rate = self._DEFAULT_INTEREST_RATE
            market_volatility = self._DEFAULT_VIX
        self.market_indicators = {
            'Interest_Rate_%': round(interest_rate, 2),
            'Inflation_Rate_%': self._STATIC_INFLATION_RATE,
            'Market_Volatility_VIX': round(market_volatility, 2),
            'Last_Updated': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }
        return self.market_indicators

    @staticmethod
    def _format_fund_document(row, market_indicators: Dict) -> str:
        """Render one fund row plus market context as retrievable text."""
        expense = row['Expense_Ratio_%']
        # 'N/A' is already a string; only numeric ratios get a percent sign.
        expense_text = expense if isinstance(expense, str) else f"{expense}%"
        lines = [
            f"Fund: {row['Ticker']} - {row['Name']}",
            f"Category: {row['Category']}",
            f"NAV: ${row['NAV']}",
            f"Annual Return: {row['Annual_Return_%']}%",
            f"Volatility: {row['Volatility_%']}%",
            f"Sharpe Ratio: {row['Sharpe_Ratio']}",
            f"Risk Level: {row['Risk_Level']}",
            f"Expense Ratio: {expense_text}",
            f"Sector Allocation - Technology: {row['Technology_%']}%, Healthcare: {row['Healthcare_%']}%,",
            f"Finance: {row['Finance_%']}%, Energy: {row['Energy_%']}%,",
            f"Consumer: {row['Consumer_%']}%, Real Estate: {row['Real_Estate_%']}%",
            f"Market Context - Interest Rate: {market_indicators.get('Interest_Rate_%', 'N/A')}%,",
            f"Inflation: {market_indicators.get('Inflation_Rate_%', 'N/A')}%,",
            f"VIX: {market_indicators.get('Market_Volatility_VIX', 'N/A')}",
        ]
        return "\n".join(lines).strip()

    def prepare_documents(self) -> "List[Document]":
        """Convert fund data to documents for ChromaDB.

        Returns:
            One ``Document`` per row of ``self.fund_data``; an empty list
            when no fund data has been fetched.
        """
        if self.fund_data is None or self.fund_data.empty:
            return []
        return [
            Document(page_content=self._format_fund_document(row, self.market_indicators))
            for _, row in self.fund_data.iterrows()
        ]

    def _drop_stale_collection(self):
        """Delete the persisted collection so re-indexing does not duplicate funds."""
        try:
            Chroma(
                collection_name=self._COLLECTION_NAME,
                embedding_function=self.embeddings,
                persist_directory=self._PERSIST_DIRECTORY,
            ).delete_collection()
        except Exception as e:
            # Not fatal: indexing can proceed, at the risk of stale entries.
            logger.warning("Could not clear previous fund collection: %s", e)

    def setup_rag_system(self):
        """Setup the complete RAG system.

        Rebuilds the vector store from ``self.fund_data`` and, when an LLM
        is loaded, the retrieval QA chain.

        Returns:
            A human-readable status string; failures are reported in the
            string rather than raised.
        """
        try:
            logger.info("Setting up RAG system...")
            documents = self.prepare_documents()
            if not documents:
                return "❌ No documents to process. Please fetch fund data first."

            # The embedding model is loaded once and reused across re-fetches.
            if self.embeddings is None:
                self.embeddings = HuggingFaceEmbeddings(
                    model_name="sentence-transformers/all-MiniLM-L6-v2"
                )

            # The collection is persistent, so without this every fetch would
            # append another copy of each fund document.
            self._drop_stale_collection()
            self.vectorstore = Chroma.from_documents(
                documents=documents,
                collection_name=self._COLLECTION_NAME,
                embedding=self.embeddings,
                persist_directory=self._PERSIST_DIRECTORY
            )

            # Any earlier chain points at the collection that was just dropped.
            self.qa_chain = None
            if self.llm is None:
                logger.warning("Vector store ready but no LLM is loaded")
                return ("⚠️ Fund data indexed, but the LLM is not initialized; "
                        "AI recommendations are unavailable.")

            prompt = PromptTemplate(
                input_variables=["context", "question"],
                template=self._PROMPT_TEMPLATE
            )
            self.qa_chain = RetrievalQA.from_chain_type(
                llm=self.llm,
                chain_type="stuff",
                retriever=self.vectorstore.as_retriever(search_kwargs={"k": 5}),
                chain_type_kwargs={"prompt": prompt}
            )
            logger.info("RAG system setup complete")
            return "✅ RAG system initialized successfully"
        except Exception as e:
            logger.exception(f"Error setting up RAG system: {e}")
            return f"❌ Error setting up RAG system: {str(e)}"

    def get_ai_recommendations(self, user_query: str) -> str:
        """Get AI-powered investment recommendations.

        Args:
            user_query: The user's free-text question.

        Returns:
            The chain's answer, or a status string when the chain is missing
            or the call fails.
        """
        try:
            if self.qa_chain is None:
                return "❌ AI system not initialized. Please setup the RAG system first."
            # Add user profile context to query
            profile = self.user_profile
            contextual_query = "\n".join([
                "",
                "User Profile:",
                f"- Risk Tolerance: {profile['risk_tolerance']}",
                f"- Investment Amount: ${profile['investment_amount']:,}",
                f"- Investment Horizon: {profile['investment_horizon']} years",
                "Market Context:",
                f"- Interest Rate: {self.market_indicators.get('Interest_Rate_%', 'N/A')}%",
                f"- Market Volatility (VIX): {self.market_indicators.get('Market_Volatility_VIX', 'N/A')}",
                f"User Question: {user_query}",
                "",
            ])
            logger.info("Generating AI recommendations...")
            # invoke() replaces the deprecated direct call on the chain.
            result = self.qa_chain.invoke({"query": contextual_query})
            return result.get('result', 'No recommendation generated')
        except Exception as e:
            logger.exception(f"Error getting AI recommendations: {e}")
            return f"❌ Error generating recommendations: {str(e)}"

    def calculate_portfolio_metrics(self, selected_funds: List[str], weights: List[float]) -> Dict:
        """Calculate portfolio-level metrics.

        Args:
            selected_funds: Ticker symbols, in the same order as ``weights``.
            weights: Allocation weights; they are normalized to sum to 1.

        Returns:
            Dict of annualized return, volatility, Sharpe ratio, daily 95%
            VaR and max drawdown, or ``{"error": message}`` on any failure.
        """
        try:
            if not selected_funds:
                return {"error": "No funds selected"}
            if len(selected_funds) != len(weights):
                return {"error": "Number of funds and weights must match"}
            weight_array = np.asarray(weights, dtype=float)
            total_weight = weight_array.sum()
            if not np.isfinite(total_weight) or total_weight == 0:
                return {"error": "Weights must sum to a non-zero value"}

            # Fetch historical data for selected funds
            data = yf.download(' '.join(selected_funds), period='1y', progress=False)['Close']
            if data.empty:
                return {"error": "No data available for selected funds"}
            return self._metrics_from_prices(data, list(selected_funds), weight_array)
        except Exception as e:
            return {"error": f"Error calculating portfolio metrics: {str(e)}"}

    def _metrics_from_prices(self, prices, selected_funds: List[str], weights) -> Dict:
        """Compute portfolio metrics from a close-price table.

        Columns are re-ordered to ``selected_funds`` before weighting, so the
        column order of ``prices`` cannot misassign weights.
        """
        if isinstance(prices, pd.Series):
            # A single ticker may come back as a Series rather than a frame.
            prices = prices.to_frame(name=selected_funds[0])
        prices = prices.dropna(axis=1, how='all')
        missing = [ticker for ticker in selected_funds if ticker not in prices.columns]
        if missing:
            return {"error": f"No data available for: {', '.join(missing)}"}

        returns = prices[selected_funds].pct_change().dropna()
        if len(returns) < 2:
            return {"error": "Not enough price history for selected funds"}

        normalized = np.asarray(weights, dtype=float)
        normalized = normalized / normalized.sum()
        portfolio_returns = returns.dot(normalized)

        annual_return = portfolio_returns.mean() * self._TRADING_DAYS
        annual_volatility = portfolio_returns.std() * np.sqrt(self._TRADING_DAYS)
        sharpe_ratio = annual_return / annual_volatility if annual_volatility != 0 else 0
        # 5th percentile of daily returns, i.e. the one-day 95% VaR.
        var_95 = np.percentile(portfolio_returns, 5)
        max_drawdown = self.calculate_max_drawdown(portfolio_returns)
        return {
            'Annual Return (%)': round(float(annual_return) * 100, 2),
            'Annual Volatility (%)': round(float(annual_volatility) * 100, 2),
            'Sharpe Ratio': round(float(sharpe_ratio), 3),
            'VaR (95%)': round(float(var_95) * 100, 2),
            'Max Drawdown (%)': round(float(max_drawdown) * 100, 2)
        }

    def calculate_max_drawdown(self, returns: pd.Series) -> float:
        """Calculate maximum drawdown.

        Args:
            returns: Periodic simple returns.

        Returns:
            The most negative peak-to-trough decline of the compounded
            series, as a fraction (0 when the series never declines).
        """
        cumulative = (1 + returns).cumprod()
        rolling_max = cumulative.expanding().max()
        drawdowns = (cumulative - rolling_max) / rolling_max
        return drawdowns.min()
# Module-level instance shared by the interface functions below. Constructing
# it only sets default attributes; no model is loaded and no data is fetched.
rag_system = MutualFundRAG()
def initialize_system():
    """Load the LLM, refresh market indicators and return the LLM status text.

    Any exception raised along the way is reported in the returned string
    instead of propagating.
    """
    try:
        status_message = rag_system.initialize_llm()
        # Populate rag_system.market_indicators for later use.
        rag_system.get_market_indicators()
    except Exception as exc:
        return f"❌ Error initializing system: {exc}"
    return status_message
def fetch_data_interface(include_extended: bool = False):
    """Interface function to fetch fund data.

    Args:
        include_extended: Also fetch ``rag_system.extended_tickers``.

    Returns:
        ``(status_text, dataframe)``; the dataframe is None when nothing was
        fetched or an error occurred.
    """
    try:
        # Copy so the shared default ticker list is never mutated.
        tickers = list(rag_system.fund_tickers)
        if include_extended:
            tickers.extend(rag_system.extended_tickers)
        df = rag_system.fetch_fund_data(tickers)
        if df.empty:
            return "❌ No data fetched. Please check your internet connection.", None
        # Setup RAG system after fetching data
        rag_status = rag_system.setup_rag_system()
        status = f"✅ Successfully fetched data for {len(df)} funds\n{rag_status}"
        return status, df
    except Exception as e:
        return f"❌ Error fetching data: {str(e)}", None
def get_ai_recommendation_interface(user_query: str, risk_tolerance: str, investment_amount: float, horizon: int):
    """Interface function for AI recommendations.

    Validates the form inputs, stores them in ``rag_system.user_profile`` and
    returns the recommendation text.

    Args:
        user_query: Free-text question; must not be blank.
        risk_tolerance: Selected risk label; None falls back to "moderate".
        investment_amount: Amount in dollars; must be a positive number.
        horizon: Investment horizon in years.

    Returns:
        The recommendation, or a message describing what is wrong.
    """
    try:
        # Cleared form fields arrive as None, not as empty values.
        if not user_query or not user_query.strip():
            return "❌ Please enter a question about your investment needs."
        if investment_amount is None or investment_amount <= 0:
            return "❌ Please enter a positive investment amount."
        # Update user profile
        rag_system.user_profile.update({
            'risk_tolerance': (risk_tolerance or 'moderate').lower(),
            'investment_amount': investment_amount,
            'investment_horizon': horizon
        })
        # Get AI recommendations
        return rag_system.get_ai_recommendations(user_query)
    except Exception as e:
        return f"❌ Error getting AI recommendations: {str(e)}"
def calculate_metrics_interface(selected_funds_text: str, weights_text: str):
    """Interface function to calculate portfolio metrics.

    Args:
        selected_funds_text: Comma-separated tickers, e.g. "VTSAX, VTIAX".
        weights_text: Comma-separated weights in the same order, e.g.
            "50, 30" (a trailing "%" on each value is accepted).

    Returns:
        The formatted metrics, or a message describing what is wrong.
    """
    missing_input = "Please provide both fund tickers and weights"
    try:
        if not selected_funds_text or not weights_text:
            return missing_input
        # Parse inputs; empty items (e.g. from a trailing comma) are ignored.
        selected_funds = [
            ticker.strip().upper()
            for ticker in selected_funds_text.split(',')
            if ticker.strip()
        ]
        weight_tokens = [
            token.strip().rstrip('%').strip()
            for token in weights_text.split(',')
            if token.strip()
        ]
        if not selected_funds or not weight_tokens:
            return missing_input
        try:
            weights = [float(token) for token in weight_tokens]
        except ValueError:
            return "Weights must be numbers separated by commas (e.g., 50, 30, 20)"
        if len(selected_funds) != len(weights):
            return "Number of funds and weights must match"

        metrics = rag_system.calculate_portfolio_metrics(selected_funds, weights)
        if 'error' in metrics:
            return metrics['error']
        # Format metrics for display
        formatted_metrics = "\n".join(f"{key}: {value}" for key, value in metrics.items())
        return f"📊 Portfolio Metrics:\n\n{formatted_metrics}"
    except Exception as e:
        return f"❌ Error calculating metrics: {str(e)}"
# Initialize system on startup: loads the LLM and market indicators at import
# time so the UI below can show the resulting status.
print("🚀 Initializing Mutual Fund RAG System...")
init_status = initialize_system()
print(init_status)
# Create the Gradio interface: five tabs (data fetch, AI advisor, portfolio
# analysis, system status, user guide) plus a footer, wired to the interface
# functions defined above.
with gr.Blocks(title="AI-Powered Mutual Fund Optimizer", theme="default") as app:
    gr.Markdown("""
# 🤖 AI-Powered Mutual Fund Portfolio Optimizer
Get personalized investment recommendations using real Yahoo Finance data and advanced AI analysis.
""")
    with gr.Tabs():
        # Data Fetching Tab
        with gr.Tab("📊 Fund Data"):
            gr.Markdown("### Fetch Real-Time Mutual Fund Data")
            with gr.Row():
                with gr.Column():
                    include_extended = gr.Checkbox(
                        label="Include Extended Fund List",
                        value=False,
                        info="Include additional ETFs and funds"
                    )
                    fetch_btn = gr.Button("🔄 Fetch Fund Data", variant="primary")
                with gr.Column():
                    fetch_status = gr.Textbox(
                        label="Status",
                        interactive=False,
                        placeholder="Click 'Fetch Fund Data' to start",
                        lines=3
                    )
            fund_data_display = gr.Dataframe(
                label="📋 Available Mutual Funds",
                interactive=False,
                wrap=True
            )
            fetch_btn.click(
                fn=fetch_data_interface,
                inputs=[include_extended],
                outputs=[fetch_status, fund_data_display]
            )
        # AI Recommendations Tab
        with gr.Tab("🤖 AI Investment Advisor"):
            gr.Markdown("### Get Personalized AI Investment Recommendations")
            with gr.Row():
                with gr.Column():
                    user_query = gr.Textbox(
                        label="Your Investment Question",
                        placeholder="e.g., 'I want to invest $50,000 for retirement in 20 years with moderate risk'",
                        lines=3,
                        info="Ask about portfolio allocation, fund selection, or investment strategy"
                    )
                    with gr.Row():
                        risk_tolerance = gr.Radio(
                            choices=["Conservative", "Moderate", "Aggressive"],
                            label="Risk Tolerance",
                            value="Moderate"
                        )
                        investment_amount = gr.Number(
                            label="Investment Amount ($)",
                            value=50000,
                            minimum=1000
                        )
                    investment_horizon = gr.Slider(
                        label="Investment Horizon (Years)",
                        minimum=1,
                        maximum=30,
                        value=5,
                        step=1
                    )
                    get_recommendation_btn = gr.Button("🧠 Get AI Recommendation", variant="primary")
                with gr.Column():
                    ai_recommendation = gr.Textbox(
                        label="💡 AI Investment Recommendation",
                        interactive=False,
                        lines=15,
                        placeholder="AI recommendations will appear here..."
                    )
            # Example questions
            gr.Markdown("### 💡 Example Questions:")
            with gr.Row():
                example1 = gr.Button("Conservative portfolio for retirement", size="sm")
                example2 = gr.Button("Growth-focused portfolio for young investor", size="sm")
                example3 = gr.Button("Balanced portfolio with international exposure", size="sm")
            # Connect example buttons: each one fills the question box.
            example1.click(
                lambda: "I'm 55 years old and want a conservative portfolio for retirement in 10 years. What funds should I choose?",
                outputs=[user_query]
            )
            example2.click(
                lambda: "I'm 25 years old and want an aggressive growth portfolio for long-term wealth building. What's your recommendation?",
                outputs=[user_query]
            )
            example3.click(
                lambda: "I want a balanced portfolio with both US and international exposure. What allocation do you recommend?",
                outputs=[user_query]
            )
            get_recommendation_btn.click(
                fn=get_ai_recommendation_interface,
                inputs=[user_query, risk_tolerance, investment_amount, investment_horizon],
                outputs=[ai_recommendation]
            )
        # Portfolio Analysis Tab
        with gr.Tab("📈 Portfolio Analysis"):
            gr.Markdown("### Analyze Custom Portfolio Metrics")
            with gr.Row():
                with gr.Column():
                    gr.Markdown("**Enter your fund selection:**")
                    custom_funds = gr.Textbox(
                        label="Fund Tickers",
                        placeholder="e.g., VTSAX, VTIAX, VBTLX",
                        info="Comma-separated list of fund tickers"
                    )
                    custom_weights = gr.Textbox(
                        label="Allocation Weights",
                        placeholder="e.g., 50, 30, 20",
                        info="Comma-separated percentages (should sum to 100)"
                    )
                    analyze_btn = gr.Button("📊 Calculate Metrics", variant="primary")
                with gr.Column():
                    metrics_output = gr.Textbox(
                        label="Portfolio Metrics",
                        interactive=False,
                        lines=10,
                        placeholder="Enter fund tickers and weights, then click 'Calculate Metrics'"
                    )
            analyze_btn.click(
                fn=calculate_metrics_interface,
                inputs=[custom_funds, custom_weights],
                outputs=[metrics_output]
            )
        # System Status Tab
        with gr.Tab("⚙️ System Status"):
            gr.Markdown("### AI System Status and Information")
            with gr.Row():
                with gr.Column():
                    system_status = gr.Textbox(
                        label="🤖 AI System Status",
                        value=init_status,
                        interactive=False,
                        lines=3
                    )
                    market_indicators = gr.JSON(
                        label="📊 Current Market Indicators",
                        value=rag_system.market_indicators
                    )
                with gr.Column():
                    gr.Markdown("""
### 🧠 AI Capabilities
**LLM Model**: Qwen/Qwen3-0.6B-Base
**Embeddings**: Sentence Transformers
**Vector Database**: ChromaDB
**Data Source**: Yahoo Finance
**What the AI can help with:**
- Personalized portfolio recommendations
- Risk assessment and analysis
- Fund selection based on your goals
- Market-aware investment strategies
- Sector allocation suggestions
""")
            refresh_status_btn = gr.Button("🔄 Refresh Status", variant="secondary")

            def refresh_system_status():
                """Re-fetch market indicators and report whether the LLM is loaded."""
                # Use the returned dict so fallback values are shown as well.
                indicators = rag_system.get_market_indicators()
                if rag_system.llm is None:
                    return "⚠️ Market data refreshed, but the LLM is not initialized", indicators
                return "✅ System operational", indicators

            refresh_status_btn.click(
                fn=refresh_system_status,
                outputs=[system_status, market_indicators]
            )
        # User Guide Tab
        with gr.Tab("📖 User Guide"):
            gr.Markdown("""
## How to Use the AI-Powered Mutual Fund Optimizer
### 1. 📊 **Setup Data**
- Go to "Fund Data" tab and click "Fetch Fund Data"
- This loads real-time data and initializes the AI system
- Review available funds and their characteristics
### 2. 🤖 **Get AI Recommendations**
- Use the "AI Investment Advisor" tab
- Describe your investment goals and situation
- Set your risk tolerance and investment parameters
- Get personalized AI-powered recommendations
### 3. 📈 **Analyze Portfolios**
- Use "Portfolio Analysis" for custom calculations
- Enter specific fund combinations and weights
- Get detailed risk and return metrics
### 🤖 **AI System Architecture**
**RAG (Retrieval-Augmented Generation)**:
- Real fund data stored in vector database
- AI retrieves relevant information for your query
- Generates contextual recommendations
**Components**:
- **LLM**: Language model for generating advice
- **Embeddings**: Convert fund data to vectors
- **Vector Database**: ChromaDB for similarity search
- **Real Data**: Live Yahoo Finance integration
### 💡 **Sample Queries**
- "I have $100k to invest for 15 years, what's the best allocation?"
- "Compare growth vs value funds for my situation"
- "Should I include international funds in my portfolio?"
- "What's the optimal bond allocation for a 40-year-old?"
- "How should I adjust my portfolio during market volatility?"
### ⚠️ **Important Notes**
- AI recommendations are for educational purposes
- Always verify suggestions with financial advisors
- Past performance doesn't guarantee future results
- Consider your complete financial situation
- The AI learns from real fund data and market conditions
### 🔧 **Technical Details**
- **Data Source**: Yahoo Finance API
- **AI Model**: Qwen/Qwen3-0.6B-Base (can be upgraded)
- **Embeddings**: Sentence Transformers all-MiniLM-L6-v2
- **Vector DB**: ChromaDB with persistent storage
- **Update Frequency**: Real-time when data is refreshed
""")
    # Footer
    gr.Markdown("""
---
**🤖 AI-Powered**: This system uses advanced AI to analyze real market data and provide personalized investment recommendations.
**⚠️ Disclaimer**: AI recommendations are for educational purposes only. Always consult with qualified financial advisors before making investment decisions.
""")
# Launch the app
if __name__ == "__main__":
    print("🚀 Starting AI-Powered Mutual Fund Portfolio Optimizer...")
    print("🤖 LLM and RAG system initialized")
    print("📊 Real-time Yahoo Finance data integration enabled")
    print("🧠 AI investment advisor ready")
    # Binds to all interfaces and also requests a public share link.
    app.launch(
        share=True,
        server_name="0.0.0.0",
        show_error=True,
    )