Spaces:
Sleeping
Sleeping
import html
import json
import logging
import os
import re
import time
from datetime import datetime, timedelta

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify

from ai import analyze_forex_pairs
from db import deeper, signals
# Logging setup: emit INFO-and-above records in a
# "timestamp - level - message" layout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# Flask application object; it is served from the ``__main__`` guard.
app = Flask(__name__)
# Telegram configuration.
# SECURITY: the bot token and chat IDs below are committed in source. Each
# value is read from an environment variable of the same name when set; the
# literals are kept only as backward-compatible fallbacks. The committed token
# should be revoked and the fallbacks removed once the deployment provides
# the environment variables.
TELEGRAM_TOKEN = os.environ.get(
    "TELEGRAM_TOKEN",
    "7750258010:AAEfEn1Hc1h0n6uRc1KcPdZf7ozBEkehnEY",
)
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "6859142642")
TELEGRAM_CHAT_ID2 = os.environ.get("TELEGRAM_CHAT_ID2", "5666511049")
def send_telegram_message(message):
    """Send ``message`` to both configured Telegram chats.

    The text is posted with ``parse_mode`` set to ``HTML`` to
    ``TELEGRAM_CHAT_ID`` and then to ``TELEGRAM_CHAT_ID2``. Each chat is
    attempted independently, so a failure for one recipient neither blocks
    the other attempt nor changes its reported outcome.

    Args:
        message: Message text to send.

    Returns:
        The decoded JSON reply for ``TELEGRAM_CHAT_ID`` when that reply has a
        truthy ``ok`` field, otherwise ``None``. The outcome for
        ``TELEGRAM_CHAT_ID2`` is only logged.
    """
    url = f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage"
    primary_reply = None

    for position, chat_id in enumerate((TELEGRAM_CHAT_ID, TELEGRAM_CHAT_ID2)):
        payload = {
            "chat_id": chat_id,
            "text": message,
            "parse_mode": "HTML",
        }
        try:
            logger.info(f"Sending Telegram message to chat ID: {chat_id}")
            # Without a timeout a stalled connection would block the caller
            # (including the scheduler job) indefinitely.
            response = requests.post(url, data=payload, timeout=15)
            reply = response.json()
        except Exception as e:
            # Best-effort delivery: log and move on to the next recipient.
            logger.error(f"Error sending Telegram message: {e}", exc_info=True)
            continue

        if reply.get("ok"):
            logger.info("Telegram message sent successfully")
            if position == 0:
                primary_reply = reply
        else:
            logger.error(f"Telegram API error: {reply.get('description', 'Unknown error')}")

    return primary_reply
def get_active_signals():
    """Describe the stored signals as text, one line per signal.

    Fetches the signals JSON through ``signals.fetch_json_from_github()`` and
    renders the ``pair``, ``type``, ``entry``, ``stop_loss`` and
    ``take_profit`` fields of every entry. Entries that are not dicts are
    skipped instead of raising.

    Returns:
        A newline-joined string with one line per signal, or a fixed
        "no active deals" message when the fetch fails, the payload is not a
        non-empty list, or no usable entry remains.
    """
    no_active_deals = "لا توجد صفقات نشطة حالياً."

    signals_response = signals.fetch_json_from_github()
    if not signals_response["success"]:
        logger.error(f"Error fetching signals data: {signals_response['message']}")
        return no_active_deals

    signals_data = signals_response["data"]
    # Defensive check - the payload must be a non-empty list
    if not isinstance(signals_data, list) or not signals_data:
        logger.warning("No active signals found in database")
        return no_active_deals

    deals = [
        f"الزوج: {signal.get('pair', '')}, "
        f"النوع: {signal.get('type', '')}, "
        f"سعر الدخول: {signal.get('entry', '')}, "
        f"وقف الخسارة: {signal.get('stop_loss', '')}, "
        f"الهدف: {signal.get('take_profit', '')}"
        for signal in signals_data
        if isinstance(signal, dict)
    ]
    if not deals:
        # Every entry was malformed, so there is nothing to report
        return no_active_deals
    return "\n".join(deals)
def extract_signal_from_ai_response(response):
    """Parse the first ``<signal>...</signal>`` section of an AI response.

    Args:
        response: Text returned by the AI. Any non-string value (for example
            ``None``) is treated as "no signal" instead of raising.

    Returns:
        ``None`` when no ``<signal>`` section is found. Otherwise a dict with
        the keys ``pair``, ``timeframe``, ``type``, ``entry``, ``stop_loss``,
        ``take_profit``, ``duration``, ``reason`` and ``status``. Tag values
        are whitespace-stripped; a missing tag yields ``""``, except
        ``duration`` and ``reason``, which fall back to fixed default texts.
        ``timeframe`` is always ``"15min"`` and ``status`` is always
        ``"starting"``.
    """
    if not isinstance(response, str):
        return None

    signal_match = re.search(r'<signal>(.*?)</signal>', response, re.DOTALL)
    if signal_match is None:
        return None
    signal_text = signal_match.group(1)

    def tag_value(tag, default=""):
        """Return the stripped content of ``<tag>`` in the signal, or ``default``."""
        found = re.search(rf'<{tag}>(.*?)</{tag}>', signal_text, re.DOTALL)
        return found.group(1).strip() if found else default

    return {
        "pair": tag_value("pair"),
        "timeframe": "15min",  # Default timeframe
        "type": tag_value("type"),
        "entry": tag_value("entry"),
        "stop_loss": tag_value("stop_loss"),
        "take_profit": tag_value("take_profit"),
        "duration": tag_value("duration", "1-3 ساعات"),
        "reason": tag_value("reason", "تم التحليل بواسطة النظام الآلي"),
        "status": "starting",
    }
def check_if_pairs_exist_in_signals(pairs_list):
    """Report whether a stored signal already carries the same set of pairs.

    Pair lists are compared order-insensitively by sorting each one and
    comparing their JSON encodings.

    Args:
        pairs_list: Pairs of the group being checked.

    Returns:
        ``True`` when some stored signal has a ``pairs`` entry equal to
        ``pairs_list`` after sorting; ``False`` otherwise, including when the
        signals data cannot be fetched.
    """
    fetched = signals.fetch_json_from_github()
    if not fetched["success"]:
        logger.error(f"Error fetching signals data: {fetched['message']}")
        return False

    stored_signals = fetched["data"]
    # Anything other than a list is treated as "no stored signals"
    if not isinstance(stored_signals, list):
        logger.warning(f"signals_data is not a list (type: {type(stored_signals)}), converting to empty list")
        stored_signals = []

    def canonical(pairs):
        # Sorted JSON text gives an order-independent comparison key
        return json.dumps(sorted(pairs), ensure_ascii=False)

    wanted = canonical(pairs_list)
    return any(
        "pairs" in entry and canonical(entry.get("pairs", [])) == wanted
        for entry in stored_signals
    )
def update_signals_file(signal_data, pairs_list):
    """Append a signal, tagged with its pairs list, to the signals JSON file.

    Note that ``signal_data`` is modified in place: a ``pairs`` key holding
    ``pairs_list`` is added to it before it is appended.

    Args:
        signal_data: Signal dict to store.
        pairs_list: Pairs of the group the signal was produced for.

    Returns:
        The ``success`` value of the update call, or ``False`` when the
        current data, the authenticity token or the commit OID cannot be
        obtained.
    """
    fetched = signals.fetch_json_from_github()
    if not fetched["success"]:
        logger.error(f"Error fetching signals data: {fetched['message']}")
        return False

    stored_signals = fetched["data"]
    # Start from an empty list when the stored payload has an unexpected type
    if not isinstance(stored_signals, list):
        logger.warning(f"signals_data is not a list (type: {type(stored_signals)}), converting to empty list")
        stored_signals = []

    # Tag the signal with its pairs, then queue it for storage
    signal_data["pairs"] = pairs_list
    stored_signals.append(signal_data)

    # Both values are required by the update call below
    auth_token, commit_oid = signals.fetch_authenticity_token_and_commit_oid()
    if not (auth_token and commit_oid):
        logger.error("Failed to get authenticity token or commit OID for signals file")
        return False

    # Compact separators keep the serialized JSON on a single line
    serialized = json.dumps(stored_signals, ensure_ascii=False, separators=(',', ':'))
    result = signals.update_user_json_file(auth_token, commit_oid, serialized)
    return result["success"]
def remove_group_from_deeper(group_key):
    """Delete one group from the ``forwards`` section of deeper.json.

    The file is rewritten even when ``group_key`` is not present.

    Args:
        group_key: Key of the group inside ``forwards``.

    Returns:
        The ``success`` value of the update call, or ``False`` when the
        current data, the authenticity token or the commit OID cannot be
        obtained.
    """
    fetched = deeper.fetch_json_from_github()
    if not fetched["success"]:
        logger.error(f"Error fetching deeper data: {fetched['message']}")
        return False

    deeper_data = fetched["data"]
    forwards = deeper_data.get("forwards", {})
    if group_key in forwards:
        del forwards[group_key]

    # Both values are required by the update call below
    auth_token, commit_oid = deeper.fetch_authenticity_token_and_commit_oid()
    if not (auth_token and commit_oid):
        logger.error("Failed to get authenticity token or commit OID for deeper file")
        return False

    # Compact separators keep the serialized JSON on a single line
    serialized = json.dumps(deeper_data, ensure_ascii=False, separators=(',', ':'))
    result = deeper.update_user_json_file(auth_token, commit_oid, serialized)
    return result["success"]
def format_telegram_message(signal_data):
    """Format a forex signal as an HTML message for Telegram.

    The message is sent with ``parse_mode`` ``HTML`` (see
    ``send_telegram_message``), so every interpolated value is HTML-escaped:
    a stray ``<``, ``>`` or ``&`` in a value would otherwise be interpreted
    as markup.

    Args:
        signal_data: Mapping with the keys ``pair``, ``type``, ``entry``,
            ``stop_loss`` and ``take_profit``.

    Returns:
        The message text: a bold header, a blank line, then one labelled line
        per field, each terminated by a newline.

    Raises:
        KeyError: If one of the required keys is missing.
    """
    def escaped(key):
        # quote=False: only <, > and & need escaping in element content
        return html.escape(str(signal_data[key]), quote=False)

    parts = [
        "🔔 <b>إشارة فوركس جديدة</b> 🔔\n\n",
        f"<b>🔹 الزوج:</b> {escaped('pair')}\n",
        f"<b>📊 النوع:</b> {escaped('type')}\n",
        f"<b>🎯 الدخول:</b> {escaped('entry')}\n",
        f"<b>🛡️ وقف الخسارة:</b> {escaped('stop_loss')}\n",
        f"<b>✨ الهدف:</b> {escaped('take_profit')}\n",
    ]
    return "".join(parts)
def _remove_processed_group(group_key):
    """Remove ``group_key`` from deeper.json and log the outcome."""
    if remove_group_from_deeper(group_key):
        logger.info(f"Group {group_key} removed from deeper.json successfully")
    else:
        logger.error(f"Failed to remove group {group_key} from deeper.json")


def _analyze_group(group_key, group_data, active_deals):
    """Analyze one group: ask the AI, save any signal, notify, then remove it."""
    pairs = group_data.get("pairs", [])
    message = group_data.get("message", "")
    if not pairs:
        logger.warning(f"Group {group_key} has no pairs. Skipping.")
        return

    logger.info(f"Analyzing group {group_key} with pairs: {', '.join(pairs)}")

    # A group whose pairs already have a stored signal is dropped unanalyzed
    if check_if_pairs_exist_in_signals(pairs):
        logger.info(f"Signal for group {group_key} already exists in database. Skipping analysis.")
        _remove_processed_group(group_key)
        return

    # Call AI to analyze the forex pairs with active deals
    ai_response = analyze_forex_pairs(pairs, message, active_deals)
    signal_data = extract_signal_from_ai_response(ai_response)
    if not signal_data:
        # The group stays in deeper.json in this case
        logger.info(f"No signal detected in AI response for group {group_key}")
        return

    logger.info(f"Signal detected for group {group_key}")
    if not update_signals_file(signal_data, pairs):
        logger.error(f"Failed to save signal for group {group_key}")
        return
    logger.info(f"Signal for group {group_key} saved successfully")

    # Notify only AFTER the signal has been saved successfully
    telegram_message = format_telegram_message(signal_data)
    logger.info("Attempting to send Telegram message...")
    telegram_response = send_telegram_message(telegram_message)
    if telegram_response and telegram_response.get("ok"):
        logger.info(f"Telegram message for group {group_key} sent successfully")
    else:
        logger.error(f"Failed to send Telegram message for group {group_key}. Response: {telegram_response}")

    _remove_processed_group(group_key)


def analyze_forex_groups():
    """Run one analysis cycle over the groups listed in deeper.json.

    Does nothing when deeper.json cannot be fetched or its ``status`` flag is
    falsy. Otherwise every entry of ``forwards`` is processed in turn. A
    failure while processing one group is logged and does not prevent the
    remaining groups from being processed.

    Exceptions are logged rather than propagated to the caller.
    """
    logger.info("Starting forex group analysis cycle")
    try:
        # Fetch data from deeper.json
        deeper_response = deeper.fetch_json_from_github()
        if not deeper_response["success"]:
            logger.error(f"Error fetching deeper data: {deeper_response['message']}")
            return
        deeper_data = deeper_response["data"]

        # Check if system is enabled
        if not deeper_data.get("status", False):
            logger.info("System is currently turned OFF. Please turn it ON to continue.")
            return

        # Get active signals to pass to AI
        active_deals = get_active_signals()
        logger.info(f"Active deals: {active_deals}")

        for group_key, group_data in deeper_data.get("forwards", {}).items():
            try:
                _analyze_group(group_key, group_data, active_deals)
            except Exception as e:
                # Isolate the failure so later groups are still analyzed
                logger.error(f"Error analyzing group {group_key}: {e}", exc_info=True)

        logger.info("Analysis cycle completed successfully")
    except Exception as e:
        logger.error(f"Error in analyze_forex_groups: {e}", exc_info=True)
# Flask routes | |
def health_check():
    """Return a fixed JSON payload stating that the service is running.

    NOTE(review): no route decorator is visible on this function in this
    file; confirm how it is registered with ``app``.
    """
    payload = {
        "status": "running",
        "message": "Forex Analysis System is active",
    }
    return jsonify(payload)
def trigger_analysis():
    """Run an analysis cycle on demand and report the outcome as JSON.

    Returns:
        A success JSON response, or an error JSON response paired with
        status code 500 when an exception is raised.
    """
    success_body = {
        "status": "success",
        "message": "Analysis triggered successfully",
    }
    try:
        analyze_forex_groups()
        return jsonify(success_body)
    except Exception as e:
        logger.error(f"Error triggering analysis: {e}", exc_info=True)
        error_body = {
            "status": "error",
            "message": f"Error triggering analysis: {str(e)}",
        }
        return jsonify(error_body), 500
def get_status():
    """Report the service state and the ``status`` flag stored in deeper.json.

    Returns:
        A JSON response that always contains ``service_status``. It also
        contains ``system_enabled`` when deeper.json was fetched, or
        ``error`` when the fetch failed or an exception was raised.
    """
    try:
        deeper_response = deeper.fetch_json_from_github()
        if not deeper_response["success"]:
            return jsonify({
                "service_status": "running",
                "error": deeper_response["message"],
            })
        system_status = deeper_response["data"].get("status", False)
        return jsonify({
            "system_enabled": system_status,
            "service_status": "running",
        })
    except Exception as e:
        logger.error(f"Error getting status: {e}", exc_info=True)
        failure = {
            "service_status": "running",
            "error": str(e),
        }
        return jsonify(failure)
def schedule_candle_analysis():
    """Schedule the next one-off analysis run on ``scheduler``.

    The run is targeted at the first of the minute marks :17, :32 and :47
    that lies after the current minute, or at :00 of the next hour when the
    current minute is 47 or later. The job is registered under the fixed id
    ``candle_analysis``; a pending job with that id is replaced rather than
    causing a conflict, so calling this function again is safe.

    NOTE(review): the first three marks sit two minutes after a quarter hour
    while the hourly mark is exactly :00 - confirm whether :02 was intended.
    """
    now = datetime.now()
    top_of_hour = datetime(now.year, now.month, now.day, now.hour)

    # First mark later than the current minute; None means "roll over"
    next_minute = next((mark for mark in (17, 32, 47) if now.minute < mark), None)
    if next_minute is None:
        target_time = top_of_hour + timedelta(hours=1)
    else:
        target_time = top_of_hour + timedelta(minutes=next_minute)

    # Calculate seconds until next run (used for logging only)
    time_diff = (target_time - now).total_seconds()
    logger.info(f"Scheduling analysis to run at {target_time.strftime('%H:%M:%S')} (in {time_diff:.1f} seconds)")

    scheduler.add_job(
        run_analysis_and_reschedule,
        'date',
        run_date=target_time,
        id='candle_analysis',
        # Avoid an id conflict if a previously scheduled run is still pending
        replace_existing=True,
    )
def run_analysis_and_reschedule():
    """Execute one analysis cycle, then queue the following run.

    The next run is scheduled from the ``finally`` clause, so it is queued
    whether or not the analysis raised.
    """
    logger.info("Running scheduled analysis at candle close")
    try:
        analyze_forex_groups()
    except Exception as e:
        logger.error(f"Error in scheduled analysis: {e}", exc_info=True)
    finally:
        # Keep the chain of one-off jobs alive regardless of the outcome
        schedule_candle_analysis()
# Shared background scheduler: schedule_candle_analysis() adds jobs to it and
# start_scheduler() starts it.
scheduler = BackgroundScheduler()
def start_scheduler():
    """Queue the first analysis run and make sure the scheduler is running."""
    logger.info("Starting scheduler for forex analysis at candle close times (00, 15, 30, 45)")
    # Schedule the first analysis
    schedule_candle_analysis()
    # Only start the scheduler when it is not running yet
    already_running = scheduler.running
    if not already_running:
        scheduler.start()
    logger.info("Scheduler started successfully")
if __name__ == "__main__":
    logger.info("Starting Forex Analysis System...")
    # Queue the first analysis run and start the background scheduler
    start_scheduler()
    # Serve the Flask app on all interfaces, port 7860, with debug mode off
    app.run(host='0.0.0.0', port=7860, debug=False)