"""Forex signal analysis service.

Polls deeper.json for groups of forex pairs to analyze, asks the AI module
for a trade signal, persists new signals to signals.json, forwards them to a
message API, and exposes a small Flask control surface. Analysis runs shortly
after every 15-minute candle close.
"""

import json
import logging
import os
import re
import time
from datetime import datetime, timedelta

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from flask import Flask, jsonify

from ai import analyze_forex_pairs
from db import deeper, signals

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Initialize Flask app
app = Flask(__name__)

# Message API configuration.
# SECURITY NOTE(review): the API key was hardcoded in source. The literal is
# kept only as a fallback so existing deployments keep working — set the
# MESSAGE_API_KEY environment variable and rotate the leaked key.
MESSAGE_API_URL = "https://aoamrnuwara.pythonanywhere.com/api/send-message"
MESSAGE_API_KEY = os.environ.get("MESSAGE_API_KEY", "Seakp0683asppoit")

# Arabic for "No active deals at the moment." — returned whenever the signals
# database is empty or unreachable.
_NO_ACTIVE_DEALS = "لا توجد صفقات نشطة حالياً."


def send_message_to_api(message):
    """Send a message via the message-forwarding API.

    Args:
        message: Text to forward.

    Returns:
        dict: ``{"success": True, "response": <parsed JSON>}`` on success,
        ``{"success": False, "error": <str>}`` on any failure (the function
        never raises — callers branch on the ``success`` flag).
    """
    headers = {
        "Content-Type": "application/json",
        "X-API-Key": MESSAGE_API_KEY,
    }
    payload = {"message": message}
    try:
        logger.info("Sending message via API")
        # timeout added so a hung endpoint cannot stall the scheduler thread
        # indefinitely (the original call had no timeout).
        response = requests.post(
            MESSAGE_API_URL,
            headers=headers,
            data=json.dumps(payload),
            timeout=30,
        )
        response.raise_for_status()
        response_json = response.json()
        logger.info(f"Message sent to API successfully. Status Code: {response.status_code}")
        return {"success": True, "response": response_json}
    except requests.exceptions.RequestException as e:
        logger.error(f"API request error: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
    except Exception as e:
        logger.error(f"Unexpected error sending message to API: {e}", exc_info=True)
        return {"success": False, "error": str(e)}


def get_active_signals():
    """Return a human-readable summary of all active signals.

    Returns:
        str: One line per active signal (Arabic labels), or the
        "no active deals" message when the database is empty or unreachable.
    """
    signals_response = signals.fetch_json_from_github()
    if not signals_response["success"]:
        logger.error(f"Error fetching signals data: {signals_response['message']}")
        return _NO_ACTIVE_DEALS

    signals_data = signals_response["data"]
    # Defensive check - ensure signals_data is a non-empty list
    if not isinstance(signals_data, list) or not signals_data:
        logger.warning("No active signals found in database")
        return _NO_ACTIVE_DEALS

    deals = []
    for signal in signals_data:
        deals.append(
            f"الزوج: {signal.get('pair', '')}, "
            f"النوع: {signal.get('type', '')}, "
            f"سعر الدخول: {signal.get('entry', '')}, "
            f"وقف الخسارة: {signal.get('stop_loss', '')}, "
            f"الهدف: {signal.get('take_profit', '')}"
        )
    if not deals:
        return _NO_ACTIVE_DEALS
    return "\n".join(deals)


def extract_signal_from_ai_response(response):
    """Extract signal data from an AI response, if one is present.

    NOTE(review): every pattern in the original had been garbled to
    ``r'(.*?)'`` — a regex that always matches the empty string at position 0,
    so a "signal" was detected in ANY response and all fields came back empty.
    The patterns are reconstructed here as ``<field>...</field>`` XML-style
    tags matching the field names below — confirm against the actual format
    emitted by ai.analyze_forex_pairs.

    Returns:
        dict | None: Parsed signal fields, or None when no signal tag is found.
    """
    match = re.search(r'<signal>(.*?)</signal>', response, re.DOTALL)
    if not match:
        return None
    signal_text = match.group(1)

    def _field(tag, default=""):
        # Extract one <tag>...</tag> value from the signal body.
        m = re.search(rf'<{tag}>(.*?)</{tag}>', signal_text, re.DOTALL)
        return m.group(1).strip() if m else default

    signal_data = {
        "pair": _field("pair"),
        "timeframe": "15min",  # Default timeframe
        "type": _field("type"),
        "entry": _field("entry"),
        "stop_loss": _field("stop_loss"),
        "take_profit": _field("take_profit"),
        # Arabic defaults: "1-3 hours" / "analyzed by the automated system".
        "duration": _field("duration", "1-3 ساعات"),
        "reason": _field("reason", "تم التحليل بواسطة النظام الآلي"),
        "status": "starting",
    }
    return signal_data


def check_if_pairs_exist_in_signals(pairs_list):
    """Return True if a signal with exactly this set of pairs already exists.

    Comparison is order-insensitive: both sides are sorted and serialized to
    a canonical JSON string before comparing.
    """
    signals_response = signals.fetch_json_from_github()
    if not signals_response["success"]:
        logger.error(f"Error fetching signals data: {signals_response['message']}")
        return False

    signals_data = signals_response["data"]
    # Defensive check - ensure signals_data is a list
    if not isinstance(signals_data, list):
        logger.warning(f"signals_data is not a list (type: {type(signals_data)}), converting to empty list")
        signals_data = []

    pairs_json = json.dumps(sorted(pairs_list), ensure_ascii=False)
    return any(
        "pairs" in signal
        and json.dumps(sorted(signal.get("pairs", [])), ensure_ascii=False) == pairs_json
        for signal in signals_data
    )


def update_signals_file(signal_data, pairs_list):
    """Append *signal_data* (tagged with *pairs_list*) to the signals file.

    Args:
        signal_data: Signal dict; mutated in place to carry ``"pairs"``.
        pairs_list: Pairs analyzed for this signal, stored for dedup checks.

    Returns:
        bool: True when the remote signals file was updated successfully.
    """
    signals_response = signals.fetch_json_from_github()
    if not signals_response["success"]:
        logger.error(f"Error fetching signals data: {signals_response['message']}")
        return False

    signals_data = signals_response["data"]
    # Defensive check - ensure signals_data is a list
    if not isinstance(signals_data, list):
        logger.warning(f"signals_data is not a list (type: {type(signals_data)}), converting to empty list")
        signals_data = []

    signal_data["pairs"] = pairs_list
    signals_data.append(signal_data)

    auth_token, commit_oid = signals.fetch_authenticity_token_and_commit_oid()
    if not auth_token or not commit_oid:
        logger.error("Failed to get authenticity token or commit OID for signals file")
        return False

    # Compact separators keep the committed JSON on a single line.
    update_response = signals.update_user_json_file(
        auth_token,
        commit_oid,
        json.dumps(signals_data, ensure_ascii=False, separators=(',', ':')),
    )
    return update_response["success"]


def remove_group_from_deeper(group_key):
    """Remove an analyzed group from the deeper.json file.

    Returns:
        bool: True when the remote deeper file was updated successfully
        (also True when the key was already absent — removal is idempotent).
    """
    deeper_response = deeper.fetch_json_from_github()
    if not deeper_response["success"]:
        logger.error(f"Error fetching deeper data: {deeper_response['message']}")
        return False

    deeper_data = deeper_response["data"]
    # Remove the group from forwards if it exists
    if group_key in deeper_data.get("forwards", {}):
        del deeper_data["forwards"][group_key]

    auth_token, commit_oid = deeper.fetch_authenticity_token_and_commit_oid()
    if not auth_token or not commit_oid:
        logger.error("Failed to get authenticity token or commit OID for deeper file")
        return False

    # Compact separators keep the committed JSON on a single line.
    update_response = deeper.update_user_json_file(
        auth_token,
        commit_oid,
        json.dumps(deeper_data, ensure_ascii=False, separators=(',', ':')),
    )
    return update_response["success"]


def format_telegram_message(signal_data):
    """Format a forex signal for Telegram delivery (Arabic, emoji labels)."""
    # BUGFIX: the entry-price label contained mojibake ("الد??ول");
    # restored to "الدخول" (entry).
    message = "🔔 إشارة فوركس جديدة 🔔\n\n"
    message += f"🔹 الزوج: {signal_data['pair']}\n"
    message += f"📊 النوع: {signal_data['type']}\n"
    message += f"🎯 الدخول: {signal_data['entry']}\n"
    message += f"🛡️ وقف الخسارة: {signal_data['stop_loss']}\n"
    message += f"💰 الهدف: {signal_data['take_profit']}\n"
    return message


def _process_group(group_key, group_data, active_deals):
    """Analyze one forex group: dedup, run AI, persist and broadcast a signal.

    Every outcome is logged; failures never raise so one bad group cannot
    abort the rest of the cycle (the caller also guards with try/except).
    """
    pairs = group_data.get("pairs", [])
    message = group_data.get("message", "")
    if not pairs:
        logger.warning(f"Group {group_key} has no pairs. Skipping.")
        return

    logger.info(f"Analyzing group {group_key} with pairs: {', '.join(pairs)}")

    # Skip (and drop) groups whose pairs already have a stored signal.
    if check_if_pairs_exist_in_signals(pairs):
        logger.info(f"Signal for group {group_key} already exists in database. Skipping analysis.")
        if remove_group_from_deeper(group_key):
            logger.info(f"Group {group_key} removed from deeper.json successfully")
        else:
            logger.error(f"Failed to remove group {group_key} from deeper.json")
        return

    # Call AI to analyze the forex pairs with active deals
    ai_response = analyze_forex_pairs(pairs, message, active_deals)
    signal_data = extract_signal_from_ai_response(ai_response)
    if not signal_data:
        logger.info(f"No signal detected in AI response for group {group_key}")
        return

    logger.info(f"Signal detected for group {group_key}")
    if not update_signals_file(signal_data, pairs):
        logger.error(f"Failed to save signal for group {group_key}")
        return

    logger.info(f"Signal for group {group_key} saved successfully")

    # Format message and send via API instead of directly to Telegram
    formatted_message = format_telegram_message(signal_data)
    logger.info("Attempting to send message via API...")
    api_response = send_message_to_api(formatted_message)
    if api_response["success"]:
        logger.info(f"Message for group {group_key} sent successfully via API")
    else:
        logger.error(f"Failed to send message for group {group_key} via API. Error: {api_response.get('error')}")

    # Remove the group from deeper.json
    if remove_group_from_deeper(group_key):
        logger.info(f"Group {group_key} removed from deeper.json successfully")
    else:
        logger.error(f"Failed to remove group {group_key} from deeper.json")


def analyze_forex_groups():
    """Run one analysis cycle over every pending group in deeper.json."""
    logger.info("Starting forex group analysis cycle")
    try:
        deeper_response = deeper.fetch_json_from_github()
        if not deeper_response["success"]:
            logger.error(f"Error fetching deeper data: {deeper_response['message']}")
            return

        deeper_data = deeper_response["data"]
        # Remote kill switch: the "status" flag in deeper.json gates all work.
        if not deeper_data.get("status", False):
            logger.info("System is currently turned OFF. Please turn it ON to continue.")
            return

        # Active signals are passed to the AI so it avoids duplicate trades.
        active_deals = get_active_signals()
        logger.info(f"Active deals: {active_deals}")

        for group_key, group_data in deeper_data.get("forwards", {}).items():
            _process_group(group_key, group_data, active_deals)

        logger.info("Analysis cycle completed successfully")
    except Exception as e:
        logger.error(f"Error in analyze_forex_groups: {e}", exc_info=True)


# Flask routes
@app.route('/')
def health_check():
    """Health check endpoint to verify the service is running."""
    return jsonify({
        "status": "running",
        "message": "Forex Analysis System is active"
    })


@app.route('/analyze/now')
def trigger_analysis():
    """Endpoint to manually trigger one analysis cycle."""
    try:
        analyze_forex_groups()
        return jsonify({
            "status": "success",
            "message": "Analysis triggered successfully"
        })
    except Exception as e:
        logger.error(f"Error triggering analysis: {e}", exc_info=True)
        return jsonify({
            "status": "error",
            "message": f"Error triggering analysis: {str(e)}"
        }), 500


@app.route('/status')
def get_status():
    """Endpoint reporting whether the remote system switch is enabled."""
    try:
        deeper_response = deeper.fetch_json_from_github()
        if deeper_response["success"]:
            system_status = deeper_response["data"].get("status", False)
            return jsonify({
                "system_enabled": system_status,
                "service_status": "running"
            })
        return jsonify({
            "service_status": "running",
            "error": deeper_response["message"]
        })
    except Exception as e:
        logger.error(f"Error getting status: {e}", exc_info=True)
        return jsonify({
            "service_status": "running",
            "error": str(e)
        })


def schedule_candle_analysis():
    """Schedule the next analysis after the upcoming 15-minute candle close.

    Despite what the original comment claimed ("00, 15, 30, 45"), runs are
    scheduled at minutes 17, 32 and 47 — two minutes after the quarter-hour
    candle closes — and at the top of the hour.
    NOTE(review): the :00 slot fires exactly at candle close rather than at
    :02 like the others; confirm this asymmetry is intentional.
    """
    now = datetime.now()
    current_minute = now.minute

    # Pick the next slot in the 17/32/47/00 cycle.
    if current_minute < 17:
        next_minute = 17
    elif current_minute < 32:
        next_minute = 32
    elif current_minute < 47:
        next_minute = 47
    else:
        next_minute = 0  # Next hour

    if next_minute == 0:
        target_time = datetime(now.year, now.month, now.day, now.hour, 0, 0) + timedelta(hours=1)
    else:
        target_time = datetime(now.year, now.month, now.day, now.hour, next_minute, 0)

    time_diff = (target_time - now).total_seconds()
    logger.info(f"Scheduling analysis to run at {target_time.strftime('%H:%M:%S')} (in {time_diff:.1f} seconds)")

    # One-shot job; run_analysis_and_reschedule re-arms it after each run.
    scheduler.add_job(
        run_analysis_and_reschedule,
        'date',
        run_date=target_time,
        id='candle_analysis'
    )


def run_analysis_and_reschedule():
    """Run the analysis and always schedule the next run."""
    logger.info("Running scheduled analysis at candle close")
    try:
        analyze_forex_groups()
    except Exception as e:
        logger.error(f"Error in scheduled analysis: {e}", exc_info=True)
    finally:
        # Always reschedule the next run, even after a failure.
        schedule_candle_analysis()


# Initialize scheduler
scheduler = BackgroundScheduler()


def start_scheduler():
    """Arm the first analysis job and start the background scheduler."""
    logger.info("Starting scheduler for forex analysis at candle close times (00, 15, 30, 45)")
    schedule_candle_analysis()
    if not scheduler.running:
        scheduler.start()
        logger.info("Scheduler started successfully")


if __name__ == "__main__":
    logger.info("Starting Forex Analysis System...")
    start_scheduler()
    app.run(host='0.0.0.0', port=7860, debug=False)