deeper / app.py
Dooratre's picture
Update app.py
c1f9f48 verified
import json
import logging
import os
import re
import time
from datetime import datetime, timedelta

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify

from ai import analyze_forex_pairs
from db import deeper, signals
# Configure logging once at import time: INFO level, with each record
# rendered as "timestamp - level - message".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# Initialize Flask app; the @app.route handlers in this file register on it.
app = Flask(__name__)
# Message API configuration.
# Both values may be overridden through environment variables of the same
# name, so the endpoint and credential can change without editing source.
# NOTE(review): the literal fallbacks keep existing deployments working, but
# a credential committed to source control should be rotated and its fallback
# removed once MESSAGE_API_KEY is set in every environment.
MESSAGE_API_URL = os.environ.get(
    "MESSAGE_API_URL",
    "https://aoamrnuwara.pythonanywhere.com/api/send-message",
)
MESSAGE_API_KEY = os.environ.get("MESSAGE_API_KEY", "Seakp0683asppoit")
def send_message_to_api(message, timeout=30):
    """Send a message via the message forwarding API.

    Args:
        message: Text to forward; sent as the ``message`` field of a JSON body.
        timeout: Seconds to wait for the API before giving up. ``requests``
            waits indefinitely when no timeout is given, which would stall
            the caller if the remote host stopped responding.

    Returns:
        ``{"success": True, "response": <decoded JSON body>}`` when the API
        answers with a non-error status and a JSON body, otherwise
        ``{"success": False, "error": <error text>}``. Ordinary exceptions
        are logged and reported through the return value, not raised.
    """
    headers = {
        "Content-Type": "application/json",
        "X-API-Key": MESSAGE_API_KEY,
    }
    payload = {"message": message}
    try:
        logger.info("Sending message via API")
        response = requests.post(
            MESSAGE_API_URL,
            headers=headers,
            data=json.dumps(payload),
            timeout=timeout,
        )
        # Turn 4xx/5xx answers into exceptions so they take the error path.
        response.raise_for_status()
        response_json = response.json()
        logger.info(f"Message sent to API successfully. Status Code: {response.status_code}")
        return {"success": True, "response": response_json}
    except requests.exceptions.RequestException as e:
        # Network failures, timeouts and HTTP error statuses.
        logger.error(f"API request error: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
    except Exception as e:
        # Anything else, e.g. a body that is not valid JSON.
        logger.error(f"Unexpected error sending message to API: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
def get_active_signals():
    """Summarize every stored signal as one line of text per signal.

    Returns:
        A newline-joined string with one line per entry in the signals
        database (pair, type, entry, stop loss, take profit), or a fixed
        "no active deals" sentence when the fetch fails or the stored data
        is not a non-empty list.
    """
    no_deals = "لا توجد صفقات نشطة حالياً."

    fetched = signals.fetch_json_from_github()
    if not fetched["success"]:
        logger.error(f"Error fetching signals data: {fetched['message']}")
        return no_deals

    entries = fetched["data"]
    # Anything other than a populated list counts as "nothing to report".
    if not isinstance(entries, list) or not entries:
        logger.warning("No active signals found in database")
        return no_deals

    lines = [
        ", ".join((
            f"الزوج: {entry.get('pair', '')}",
            f"النوع: {entry.get('type', '')}",
            f"سعر الدخول: {entry.get('entry', '')}",
            f"وقف الخسارة: {entry.get('stop_loss', '')}",
            f"الهدف: {entry.get('take_profit', '')}",
        ))
        for entry in entries
    ]
    return "\n".join(lines) if lines else no_deals
def extract_signal_from_ai_response(response):
    """Extract signal data from an AI response, if present.

    The response is searched for the first ``<signal>...</signal>`` section;
    inside it the ``<pair>``, ``<type>``, ``<entry>``, ``<stop_loss>``,
    ``<take_profit>``, ``<duration>`` and ``<reason>`` tags are read.

    Args:
        response: Text returned by the AI helper. Any non-string value
            (for example ``None``) is treated as "no signal" instead of
            letting ``re.search`` raise ``TypeError``.

    Returns:
        ``None`` when there is no ``<signal>`` section, otherwise a dict with
        the keys ``pair``, ``timeframe``, ``type``, ``entry``, ``stop_loss``,
        ``take_profit``, ``duration``, ``reason`` and ``status``. Missing
        tags yield ``""``, except ``duration`` and ``reason`` which fall back
        to fixed default texts. ``timeframe`` is always ``"15min"`` and
        ``status`` is always ``"starting"``.
    """
    if not isinstance(response, str):
        return None

    signal_match = re.search(r'<signal>(.*?)</signal>', response, re.DOTALL)
    if not signal_match:
        return None
    signal_text = signal_match.group(1)

    def field(tag, default=""):
        # Stripped content of <tag>...</tag>, or `default` when absent.
        found = re.search(rf'<{tag}>(.*?)</{tag}>', signal_text, re.DOTALL)
        return found.group(1).strip() if found else default

    return {
        "pair": field("pair"),
        "timeframe": "15min",  # Default timeframe
        "type": field("type"),
        "entry": field("entry"),
        "stop_loss": field("stop_loss"),
        "take_profit": field("take_profit"),
        "duration": field("duration", "1-3 ساعات"),
        "reason": field("reason", "تم التحليل بواسطة النظام الآلي"),
        "status": "starting",
    }
def check_if_pairs_exist_in_signals(pairs_list):
    """Tell whether some stored signal carries the same set of pairs.

    Pair lists are compared order-insensitively: both sides are sorted and
    serialized to JSON before the comparison.

    Returns:
        True when a stored signal has a matching ``pairs`` list; False when
        none does or when the signals data cannot be fetched.
    """
    fetched = signals.fetch_json_from_github()
    if not fetched["success"]:
        logger.error(f"Error fetching signals data: {fetched['message']}")
        return False

    stored = fetched["data"]
    # Defensive check: anything that is not a list is treated as "no signals".
    if not isinstance(stored, list):
        logger.warning(f"signals_data is not a list (type: {type(stored)}), converting to empty list")
        stored = []

    def canonical(pairs):
        # Sorted JSON text gives a stable form to compare.
        return json.dumps(sorted(pairs), ensure_ascii=False)

    wanted = canonical(pairs_list)
    return any(
        "pairs" in entry and canonical(entry.get("pairs", [])) == wanted
        for entry in stored
    )
def update_signals_file(signal_data, pairs_list):
    """Append a signal (tagged with its pairs list) to the signals JSON file.

    Note that ``signal_data`` is modified in place: it gains a ``"pairs"``
    key holding ``pairs_list`` before being appended.

    Returns:
        The ``"success"`` value reported by the file update, or False when
        the current data or the token/commit OID cannot be obtained.
    """
    fetched = signals.fetch_json_from_github()
    if not fetched["success"]:
        logger.error(f"Error fetching signals data: {fetched['message']}")
        return False

    stored = fetched["data"]
    # Defensive check: start from an empty list if the stored value is not one.
    if not isinstance(stored, list):
        logger.warning(f"signals_data is not a list (type: {type(stored)}), converting to empty list")
        stored = []

    # Attach the pairs list, then add the signal to the collection.
    signal_data["pairs"] = pairs_list
    stored.append(signal_data)

    auth_token, commit_oid = signals.fetch_authenticity_token_and_commit_oid()
    if not (auth_token and commit_oid):
        logger.error("Failed to get authenticity token or commit OID for signals file")
        return False

    # Compact separators keep the serialized file on a single line.
    serialized = json.dumps(stored, ensure_ascii=False, separators=(',', ':'))
    result = signals.update_user_json_file(auth_token, commit_oid, serialized)
    return result["success"]
def remove_group_from_deeper(group_key):
    """Drop one group from the ``forwards`` section of deeper.json.

    The file is written back whether or not the group was present.

    Returns:
        The ``"success"`` value reported by the file update, or False when
        the current data or the token/commit OID cannot be obtained.
    """
    fetched = deeper.fetch_json_from_github()
    if not fetched["success"]:
        logger.error(f"Error fetching deeper data: {fetched['message']}")
        return False

    deeper_data = fetched["data"]
    forwards = deeper_data.get("forwards", {})
    if group_key in forwards:
        del forwards[group_key]

    auth_token, commit_oid = deeper.fetch_authenticity_token_and_commit_oid()
    if not (auth_token and commit_oid):
        logger.error("Failed to get authenticity token or commit OID for deeper file")
        return False

    # Compact separators keep the serialized file on a single line.
    serialized = json.dumps(deeper_data, ensure_ascii=False, separators=(',', ':'))
    result = deeper.update_user_json_file(auth_token, commit_oid, serialized)
    return result["success"]
def format_telegram_message(signal_data):
    """Format a forex signal as an HTML-tagged message for Telegram.

    Args:
        signal_data: Mapping with the keys ``pair``, ``type``, ``entry``,
            ``stop_loss`` and ``take_profit``.

    Returns:
        A multi-line string: a bold header, a blank line, then one labelled
        line per field, each terminated by a newline.

    Raises:
        KeyError: If one of the required keys is missing.
    """
    # Function-scope import so this block does not depend on file-level imports.
    import html

    def value(key):
        # The labels use <b> markup, so "<", ">" and "&" inside a value are
        # escaped to keep it from being read as (possibly broken) markup.
        # NOTE(review): assumes the receiver renders the text as HTML - confirm.
        return html.escape(str(signal_data[key]), quote=False)

    lines = [
        "🔔 <b>إشارة فوركس جديدة</b> 🔔",
        "",
        f"<b>🔹 الزوج:</b> {value('pair')}",
        f"<b>📊 النوع:</b> {value('type')}",
        # This label held corrupted characters; restored to the word used for
        # "entry" elsewhere in this file.
        f"<b>🎯 الدخول:</b> {value('entry')}",
        f"<b>🛡️ وقف الخسارة:</b> {value('stop_loss')}",
        f"<b>💰 الهدف:</b> {value('take_profit')}",
    ]
    return "\n".join(lines) + "\n"
def _remove_group_and_log(group_key):
    """Remove *group_key* from deeper.json and log whether that worked."""
    if remove_group_from_deeper(group_key):
        logger.info(f"Group {group_key} removed from deeper.json successfully")
    else:
        logger.error(f"Failed to remove group {group_key} from deeper.json")


def _process_forex_group(group_key, group_data, active_deals):
    """Analyze one group; when a signal results, save it, announce it and drop the group."""
    pairs = group_data.get("pairs", [])
    message = group_data.get("message", "")
    if not pairs:
        logger.warning(f"Group {group_key} has no pairs. Skipping.")
        return

    logger.info(f"Analyzing group {group_key} with pairs: {', '.join(pairs)}")

    # A group whose pairs already have a stored signal is not analyzed again;
    # it is only removed from deeper.json.
    if check_if_pairs_exist_in_signals(pairs):
        logger.info(f"Signal for group {group_key} already exists in database. Skipping analysis.")
        _remove_group_and_log(group_key)
        return

    # Call AI to analyze the forex pairs with active deals
    ai_response = analyze_forex_pairs(pairs, message, active_deals)
    signal_data = extract_signal_from_ai_response(ai_response)
    if not signal_data:
        # The group stays in deeper.json, so a later cycle sees it again.
        logger.info(f"No signal detected in AI response for group {group_key}")
        return

    logger.info(f"Signal detected for group {group_key}")
    if not update_signals_file(signal_data, pairs):
        logger.error(f"Failed to save signal for group {group_key}")
        return
    logger.info(f"Signal for group {group_key} saved successfully")

    # Format message and send via API instead of directly to Telegram
    formatted_message = format_telegram_message(signal_data)
    logger.info("Attempting to send message via API...")
    api_response = send_message_to_api(formatted_message)
    if api_response["success"]:
        logger.info(f"Message for group {group_key} sent successfully via API")
    else:
        logger.error(f"Failed to send message for group {group_key} via API. Error: {api_response.get('error')}")

    # The signal is stored at this point, so the group is removed whether or
    # not the notification went out.
    _remove_group_and_log(group_key)


def analyze_forex_groups():
    """Analyze every forex group listed under ``forwards`` in deeper.json.

    Does nothing when deeper.json cannot be fetched or when its ``status``
    flag is falsy. The active-signals summary is fetched once per cycle and
    passed to the AI for every group. Each group is processed independently:
    an exception raised for one group is logged and the remaining groups are
    still processed.

    Returns:
        None. Exceptions are logged rather than raised.
    """
    logger.info("Starting forex group analysis cycle")
    try:
        # Fetch data from deeper.json
        deeper_response = deeper.fetch_json_from_github()
        if not deeper_response["success"]:
            logger.error(f"Error fetching deeper data: {deeper_response['message']}")
            return
        deeper_data = deeper_response["data"]

        # Check if system is enabled
        if not deeper_data.get("status", False):
            logger.info("System is currently turned OFF. Please turn it ON to continue.")
            return

        # Get active signals to pass to AI
        active_deals = get_active_signals()
        logger.info(f"Active deals: {active_deals}")

        failed_groups = 0
        for group_key, group_data in deeper_data.get("forwards", {}).items():
            try:
                _process_forex_group(group_key, group_data, active_deals)
            except Exception as e:
                # Isolate failures so one bad group cannot abort the cycle.
                failed_groups += 1
                logger.error(f"Error analyzing group {group_key}: {e}", exc_info=True)

        if failed_groups:
            logger.warning(f"Analysis cycle completed with {failed_groups} failed group(s)")
        else:
            logger.info("Analysis cycle completed successfully")
    except Exception as e:
        logger.error(f"Error in analyze_forex_groups: {e}", exc_info=True)
# Flask routes
@app.route('/')
def health_check():
    """Health check endpoint: always answers with a fixed "running" payload."""
    payload = dict(
        status="running",
        message="Forex Analysis System is active",
    )
    return jsonify(payload)
@app.route('/analyze/now')
def trigger_analysis():
    """Run one analysis cycle inside the request and report the outcome.

    Returns a success payload after the cycle returns, or an error payload
    with HTTP status 500 if an exception is raised.
    """
    success_body = {
        "status": "success",
        "message": "Analysis triggered successfully",
    }
    try:
        analyze_forex_groups()
        return jsonify(success_body)
    except Exception as exc:
        logger.error(f"Error triggering analysis: {exc}", exc_info=True)
        failure_body = {
            "status": "error",
            "message": f"Error triggering analysis: {str(exc)}",
        }
        return jsonify(failure_body), 500
@app.route('/status')
def get_status():
    """Report whether the system is enabled according to deeper.json.

    On success the payload carries ``system_enabled`` (the ``status`` flag,
    False when absent). On a failed fetch or an exception it carries an
    ``error`` text instead. ``service_status`` is "running" in every case.
    """
    try:
        fetched = deeper.fetch_json_from_github()
        if not fetched["success"]:
            return jsonify({
                "service_status": "running",
                "error": fetched["message"],
            })
        enabled = fetched["data"].get("status", False)
        return jsonify({
            "system_enabled": enabled,
            "service_status": "running",
        })
    except Exception as exc:
        logger.error(f"Error getting status: {exc}", exc_info=True)
        return jsonify({
            "service_status": "running",
            "error": str(exc),
        })
def schedule_candle_analysis():
    """Schedule the next one-shot analysis run on the module-level scheduler.

    The run is placed at minute 17, 32 or 47 of the current hour, whichever
    comes first after the current minute, or at minute 00 of the next hour
    once minute 47 has been reached.

    NOTE(review): 17/32/47 look like the 15-minute candle closes plus two
    minutes, but the top-of-hour run has no such offset - confirm whether
    minute 02 was intended.

    The job is registered under the fixed id ``candle_analysis`` with
    ``replace_existing=True``. Without it, a leftover job carrying that id
    would make ``add_job`` raise ``ConflictingIdError``, and because the job
    reschedules itself, the chain of runs would then stop.
    """
    now = datetime.now()
    hour_start = datetime(now.year, now.month, now.day, now.hour)

    # First trigger minute of this hour that is still ahead of the clock.
    next_minute = next((m for m in (17, 32, 47) if now.minute < m), None)
    if next_minute is None:
        # Past the last mark of this hour: run at the top of the next one.
        target_time = hour_start + timedelta(hours=1)
    else:
        target_time = hour_start + timedelta(minutes=next_minute)

    # Calculate seconds until next run
    time_diff = (target_time - now).total_seconds()
    logger.info(f"Scheduling analysis to run at {target_time.strftime('%H:%M:%S')} (in {time_diff:.1f} seconds)")

    # Schedule the job
    scheduler.add_job(
        run_analysis_and_reschedule,
        'date',
        run_date=target_time,
        id='candle_analysis',
        replace_existing=True,
    )
def run_analysis_and_reschedule():
    """Scheduler job body: run one analysis cycle, then queue the next one.

    The next run is queued in a ``finally`` clause, so it is scheduled even
    when the analysis raises.
    """
    logger.info("Running scheduled analysis at candle close")
    try:
        analyze_forex_groups()
    except Exception as exc:
        logger.error(f"Error in scheduled analysis: {exc}", exc_info=True)
    finally:
        # Keep the chain of one-shot jobs going regardless of the outcome.
        schedule_candle_analysis()
# Initialize scheduler
# Single module-level BackgroundScheduler instance: schedule_candle_analysis()
# adds jobs to it and start_scheduler() starts it.
scheduler = BackgroundScheduler()
def start_scheduler():
    """Queue the first analysis run and start the scheduler if it is idle."""
    logger.info("Starting scheduler for forex analysis at candle close times (00, 15, 30, 45)")

    # Register the first one-shot job.
    schedule_candle_analysis()

    # Nothing more to do when the scheduler is already running.
    if scheduler.running:
        return
    scheduler.start()
    logger.info("Scheduler started successfully")
# Script entry point: only runs when this file is executed directly.
if __name__ == "__main__":
    logger.info("Starting Forex Analysis System...")
    # Start the scheduler first so analysis jobs are queued before the
    # blocking app.run() call below.
    start_scheduler()
    # Start the Flask application, listening on all interfaces, port 7860,
    # with debug mode off.
    app.run(host='0.0.0.0', port=7860, debug=False)