Aigg / app.py
Athspi's picture
Update app.py
a9f9924 verified
raw
history blame
8.21 kB
import os
import zipfile
import uuid
import google.generativeai as genai
from flask import Flask, request, jsonify, render_template, session, send_from_directory
from flask_session import Session # Import the extension
from dotenv import load_dotenv
# --- API and App Configuration ---
load_dotenv()
app = Flask(__name__)
# IMPORTANT: The secret key will be set via Hugging Face secrets, not the .env file
app.config["SECRET_KEY"] = os.environ.get("FLASK_SECRET_KEY", 'a-strong-default-secret-key')
# --- SERVER-SIDE SESSION CONFIGURATION ---
app.config["SESSION_PERMANENT"] = False
app.config["SESSION_TYPE"] = "filesystem"
# On Hugging Face, the path must be persistent across reloads if possible
# A temporary directory is fine for this stateless app
app.config['SESSION_FILE_DIR'] = './flask_session'
Session(app)
# --- END OF SESSION CONFIGURATION ---
# Configure the Gemini API client
# The API key will be set via Hugging Face secrets
try:
gemini_api_key = os.environ.get("GEMINI_API_KEY")
if not gemini_api_key:
print("Warning: GEMINI_API_KEY not found. The AI will not function.")
model = None
else:
genai.configure(api_key=gemini_api_key)
model = genai.GenerativeModel('gemini-1.5-flash')
except Exception as e:
print(f"Error configuring Gemini API: {e}")
model = None
# Configure folders
UPLOAD_FOLDER = 'uploads'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
if not os.path.exists('./flask_session'):
os.makedirs('./flask_session')
# --- Helper Functions (No Changes) ---
def get_project_files(project_path):
file_data = {}
for root, _, files in os.walk(project_path):
for file in files:
if file in ['.DS_Store', '__MACOSX']: continue
file_path = os.path.join(root, file)
relative_path = os.path.relpath(file_path, project_path)
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
file_data[relative_path] = content
except Exception as e:
file_data[relative_path] = f"Error reading file: {e}"
return file_data
# --- Flask Routes (No Changes from previous final version) ---
@app.route('/')
def index():
if 'project_id' not in session:
session['project_id'] = str(uuid.uuid4())
session['chat_history'] = []
return render_template('index.html')
@app.route('/upload', methods=['POST'])
def upload_project():
if 'project_id' not in session:
session['project_id'] = str(uuid.uuid4())
session['chat_history'] = []
if 'project_zip' not in request.files: return jsonify({"error": "No file part"}), 400
file = request.files['project_zip']
if file.filename == '': return jsonify({"error": "No selected file"}), 400
if file and file.filename.endswith('.zip'):
project_id = session['project_id']
project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
if not os.path.exists(project_path): os.makedirs(project_path)
zip_path = os.path.join(project_path, file.filename)
file.save(zip_path)
with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(project_path)
os.remove(zip_path)
project_files = get_project_files(project_path)
initial_context = "The user has uploaded a new project. Here is the file structure and content:\n\n"
for path, content in project_files.items(): initial_context += f"**File:** `{path}`\n```\n{content}\n```\n\n"
session['chat_history'] = [
{"role": "user", "parts": [initial_context]},
{"role": "model", "parts": ["I have analyzed the project. I am ready to help. How can I assist you with your code?"]}
]
frontend_chat_history = [
{"role": "user", "content": initial_context},
{"role": "assistant", "content": "I have analyzed the project. I am ready to help. How can I assist you with your code?"}
]
return jsonify({ "message": "Project uploaded and analyzed.", "file_tree": list(project_files.keys()), "chat_history": frontend_chat_history })
return jsonify({"error": "Invalid file type. Please upload a .zip file."}), 400
@app.route('/chat', methods=['POST'])
def chat():
if not model: return jsonify({"error": "AI model not configured. The admin needs to set the API key."}), 500
user_message = request.json.get('message')
if not user_message: return jsonify({"error": "Empty message"}), 400
project_id = session.get('project_id')
if not project_id: return jsonify({"error": "Your session has expired. Please upload your project again."}), 400
project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
session['chat_history'].append({"role": "user", "parts": [user_message]})
chat_session = model.start_chat(history=session['chat_history'])
try:
response = chat_session.send_message(user_message)
ai_response = response.text
session['chat_history'].append({"role": "model", "parts": [ai_response]})
if "```" in ai_response:
lines = ai_response.split('\n')
if lines[0].strip().startswith('`') and lines[0].strip().endswith('`'):
file_path = lines[0].strip().strip('`')
try:
code_content = '\n'.join(ai_response.split('```')[1].split('\n')[1:])
full_path = os.path.join(project_path, file_path)
os.makedirs(os.path.dirname(full_path), exist_ok=True)
with open(full_path, 'w', encoding='utf-8') as f: f.write(code_content)
except IndexError: pass
elif ai_response.strip().startswith("DELETE:"):
file_to_delete = ai_response.strip().split(":", 1)[1].strip()
full_path = os.path.join(project_path, file_to_delete)
if os.path.exists(full_path):
try: os.remove(full_path)
except OSError as e: print(f"Error deleting file {full_path}: {e}")
return jsonify({"reply": ai_response})
except Exception as e: return jsonify({"error": str(e)}), 500
@app.route('/file_tree')
def get_file_tree():
project_id = session.get('project_id')
if not project_id: return jsonify({"error": "No project in session."}), 400
project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
if not os.path.exists(project_path): return jsonify({"file_tree": []})
return jsonify({"file_tree": list(get_project_files(project_path).keys())})
@app.route('/file_content')
def get_file_content():
project_id = session.get('project_id')
file_path = request.args.get('path')
if not project_id or not file_path: return jsonify({"error": "Missing project/file path"}), 400
full_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id, file_path)
if not os.path.exists(full_path): return jsonify({"error": "File not found"}), 404
try:
with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: content = f.read()
return jsonify({"path": file_path, "content": content})
except Exception as e: return jsonify({"error": str(e)}), 500
@app.route('/download')
def download_project():
project_id = session.get('project_id')
if not project_id: return "No project to download.", 404
project_path = os.path.join(app.config['UPLOAD_FOLDER'], project_id)
zip_filename = f"project_{project_id}.zip"
zip_path = os.path.join(app.config['UPLOAD_FOLDER'], zip_filename)
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(project_path):
for file in files:
file_path = os.path.join(root, file)
arcname = os.path.relpath(file_path, project_path)
zipf.write(file_path, arcname)
return send_from_directory(app.config['UPLOAD_FOLDER'], zip_filename, as_attachment=True)
# The following is not needed for Hugging Face deployment but good for local testing
if __name__ == '__main__':
app.run(host="0.0.0.0", port=7860)