|
import time |
|
import json |
|
import requests |
|
import re |
|
import logging |
|
from flask import Flask, jsonify |
|
from apscheduler.schedulers.background import BackgroundScheduler |
|
from datetime import datetime, timedelta |
|
from db import deeper, signals |
|
from ai import analyze_forex_pairs |
|
|
|
|
|
# Configure root logging once at import time; every module logger inherits
# this level and format.
logging.basicConfig(

    level=logging.INFO,

    format='%(asctime)s - %(levelname)s - %(message)s'

)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
# Flask app exposing the health check, manual trigger, and status endpoints.
app = Flask(__name__)
|
|
|
|
|
# Endpoint of the external message-forwarding service used to broadcast signals.
MESSAGE_API_URL = "https://aoamrnuwara.pythonanywhere.com/api/send-message"

# NOTE(review): API key hard-coded in source — consider loading from an
# environment variable or secrets store instead of committing it.
MESSAGE_API_KEY = "Seakp0683asppoit"
|
|
|
def send_message_to_api(message):
    """Send a message via the message forwarding API.

    Args:
        message: Text body to forward to the API.

    Returns:
        dict: ``{"success": True, "response": <parsed JSON>}`` on success,
        or ``{"success": False, "error": <str>}`` on any failure.
    """
    headers = {
        "Content-Type": "application/json",
        "X-API-Key": MESSAGE_API_KEY
    }

    payload = {
        "message": message
    }

    try:
        logger.info("Sending message via API")
        # timeout keeps a hung endpoint from blocking the scheduler thread forever.
        response = requests.post(
            MESSAGE_API_URL,
            headers=headers,
            data=json.dumps(payload),
            timeout=30,
        )
        response.raise_for_status()
        response_json = response.json()
        logger.info(f"Message sent to API successfully. Status Code: {response.status_code}")
        return {"success": True, "response": response_json}
    except requests.exceptions.RequestException as e:
        # Covers connection errors, timeouts, and non-2xx statuses.
        logger.error(f"API request error: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
    except Exception as e:
        # e.g. a non-JSON response body from response.json().
        logger.error(f"Unexpected error sending message to API: {e}", exc_info=True)
        return {"success": False, "error": str(e)}
|
|
|
def get_active_signals():
    """Return a newline-joined Arabic summary of all active signals.

    Falls back to the Arabic "no active deals" sentence when the signals
    file cannot be fetched or contains no entries.
    """
    fetched = signals.fetch_json_from_github()

    if not fetched["success"]:
        logger.error(f"Error fetching signals data: {fetched['message']}")
        return "لا توجد صفقات نشطة حالياً."

    data = fetched["data"]

    if not isinstance(data, list) or len(data) == 0:
        logger.warning("No active signals found in database")
        return "لا توجد صفقات نشطة حالياً."

    # One summary line per signal; missing fields render as empty strings.
    deal_lines = [
        f"الزوج: {entry.get('pair', '')}, "
        f"النوع: {entry.get('type', '')}, "
        f"سعر الدخول: {entry.get('entry', '')}, "
        f"وقف الخسارة: {entry.get('stop_loss', '')}, "
        f"الهدف: {entry.get('take_profit', '')}"
        for entry in data
    ]

    if not deal_lines:
        return "لا توجد صفقات نشطة حالياً."

    return "\n".join(deal_lines)
|
|
|
def _extract_tag(text, tag, default=""):
    """Return the stripped contents of ``<tag>...</tag>`` in *text*, or *default*."""
    match = re.search(rf'<{tag}>(.*?)</{tag}>', text, re.DOTALL)
    return match.group(1).strip() if match else default


def extract_signal_from_ai_response(response):
    """Extract signal data from AI response if present.

    Args:
        response: Raw text returned by the AI analysis.

    Returns:
        dict with the signal fields (pair, timeframe, type, entry, stop_loss,
        take_profit, duration, reason, status), or ``None`` when the response
        contains no ``<signal>`` block.
    """
    match = re.search(r'<signal>(.*?)</signal>', response, re.DOTALL)

    if not match:
        return None

    signal_text = match.group(1)

    # Each field is pulled from its own XML-ish tag; absent tags fall back
    # to the stated default (empty string unless noted).
    return {
        "pair": _extract_tag(signal_text, "pair"),
        "timeframe": "15min",
        "type": _extract_tag(signal_text, "type"),
        "entry": _extract_tag(signal_text, "entry"),
        "stop_loss": _extract_tag(signal_text, "stop_loss"),
        "take_profit": _extract_tag(signal_text, "take_profit"),
        "duration": _extract_tag(signal_text, "duration", "1-3 ساعات"),
        "reason": _extract_tag(signal_text, "reason", "تم التحليل بواسطة النظام الآلي"),
        "status": "starting",
    }
|
|
|
def check_if_pairs_exist_in_signals(pairs_list):
    """Return True when a stored signal was created from exactly these pairs.

    Comparison is order-insensitive: both pair lists are sorted and
    JSON-serialized before being compared.
    """
    fetched = signals.fetch_json_from_github()

    if not fetched["success"]:
        logger.error(f"Error fetching signals data: {fetched['message']}")
        return False

    stored = fetched["data"]

    if not isinstance(stored, list):
        logger.warning(f"signals_data is not a list (type: {type(stored)}), converting to empty list")
        stored = []

    # Canonical representation of the queried pair set.
    target = json.dumps(sorted(pairs_list), ensure_ascii=False)

    return any(
        "pairs" in entry
        and json.dumps(sorted(entry.get("pairs", [])), ensure_ascii=False) == target
        for entry in stored
    )
|
|
|
def update_signals_file(signal_data, pairs_list):
    """Append a new signal (tagged with its source pairs) to the signals file.

    Returns True when the remote JSON file was updated successfully,
    False on any fetch/auth/update failure.
    """
    fetched = signals.fetch_json_from_github()

    if not fetched["success"]:
        logger.error(f"Error fetching signals data: {fetched['message']}")
        return False

    stored = fetched["data"]

    if not isinstance(stored, list):
        logger.warning(f"signals_data is not a list (type: {type(stored)}), converting to empty list")
        stored = []

    # Record which group pairs produced this signal so later runs can
    # detect duplicates (see check_if_pairs_exist_in_signals).
    signal_data["pairs"] = pairs_list
    stored.append(signal_data)

    auth_token, commit_oid = signals.fetch_authenticity_token_and_commit_oid()
    if not auth_token or not commit_oid:
        logger.error("Failed to get authenticity token or commit OID for signals file")
        return False

    serialized = json.dumps(stored, ensure_ascii=False, separators=(',', ':'))
    result = signals.update_user_json_file(auth_token, commit_oid, serialized)

    return result["success"]
|
|
|
def remove_group_from_deeper(group_key):
    """Remove an analyzed group from the deeper.json file.

    Args:
        group_key: Key of the group inside the file's "forwards" mapping.

    Returns:
        bool: True when the group is absent from the file afterwards
        (including when it was never present), False on any failure.
    """
    deeper_response = deeper.fetch_json_from_github()

    if not deeper_response["success"]:
        logger.error(f"Error fetching deeper data: {deeper_response['message']}")
        return False

    deeper_data = deeper_response["data"]

    forwards = deeper_data.get("forwards", {})
    if group_key not in forwards:
        # Nothing to delete — skip the pointless round-trip that would
        # otherwise commit an unchanged file.
        return True
    del forwards[group_key]

    auth_token, commit_oid = deeper.fetch_authenticity_token_and_commit_oid()
    if not auth_token or not commit_oid:
        logger.error("Failed to get authenticity token or commit OID for deeper file")
        return False

    update_response = deeper.update_user_json_file(
        auth_token,
        commit_oid,
        json.dumps(deeper_data, ensure_ascii=False, separators=(',', ':'))
    )

    return update_response["success"]
|
|
|
def format_telegram_message(signal_data):
    """Format a forex signal as an HTML Telegram message with Arabic labels.

    Args:
        signal_data: dict with 'pair', 'type', 'entry', 'stop_loss' and
            'take_profit' keys (raises KeyError if any is missing).

    Returns:
        str: HTML-formatted message ready to send via the message API.
    """
    message = "🔔 <b>إشارة فوركس جديدة</b> 🔔\n\n"
    message += f"<b>🔹 الزوج:</b> {signal_data['pair']}\n"
    message += f"<b>📊 النوع:</b> {signal_data['type']}\n"
    # Fixed mojibake in the label below: "الد??ول" -> "الدخول" ("entry",
    # matching the wording used in get_active_signals).
    message += f"<b>🎯 الدخول:</b> {signal_data['entry']}\n"
    message += f"<b>🛡️ وقف الخسارة:</b> {signal_data['stop_loss']}\n"
    message += f"<b>💰 الهدف:</b> {signal_data['take_profit']}\n"

    return message
|
|
|
def _remove_group_and_log(group_key):
    """Remove a processed group from deeper.json and log the outcome."""
    if remove_group_from_deeper(group_key):
        logger.info(f"Group {group_key} removed from deeper.json successfully")
    else:
        logger.error(f"Failed to remove group {group_key} from deeper.json")


def _publish_signal(group_key, signal_data, pairs):
    """Persist a freshly detected signal, broadcast it, and retire its group."""
    if not update_signals_file(signal_data, pairs):
        logger.error(f"Failed to save signal for group {group_key}")
        return

    logger.info(f"Signal for group {group_key} saved successfully")

    formatted_message = format_telegram_message(signal_data)
    logger.info("Attempting to send message via API...")
    api_response = send_message_to_api(formatted_message)

    if api_response["success"]:
        logger.info(f"Message for group {group_key} sent successfully via API")
    else:
        logger.error(f"Failed to send message for group {group_key} via API. Error: {api_response.get('error')}")

    # The group is retired only after its signal was saved successfully.
    _remove_group_and_log(group_key)


def analyze_forex_groups():
    """Analyze every pending forex group queued in the deeper.json file.

    For each group in "forwards": skip (and retire) it when a signal for the
    same pair set already exists; otherwise run the AI analysis and, when a
    signal comes back, save it, broadcast it, and retire the group.
    Never raises — all failures are logged.
    """
    logger.info("Starting forex group analysis cycle")

    try:
        deeper_response = deeper.fetch_json_from_github()

        if not deeper_response["success"]:
            logger.error(f"Error fetching deeper data: {deeper_response['message']}")
            return

        deeper_data = deeper_response["data"]

        # Global on/off switch stored inside the file itself.
        if not deeper_data.get("status", False):
            logger.info("System is currently turned OFF. Please turn it ON to continue.")
            return

        # Summary of current open trades, passed to the AI for context.
        active_deals = get_active_signals()
        logger.info(f"Active deals: {active_deals}")

        for group_key, group_data in deeper_data.get("forwards", {}).items():
            pairs = group_data.get("pairs", [])
            message = group_data.get("message", "")

            if not pairs:
                logger.warning(f"Group {group_key} has no pairs. Skipping.")
                continue

            logger.info(f"Analyzing group {group_key} with pairs: {', '.join(pairs)}")

            # A signal for this exact pair set already exists — just retire
            # the group without re-analyzing.
            if check_if_pairs_exist_in_signals(pairs):
                logger.info(f"Signal for group {group_key} already exists in database. Skipping analysis.")
                _remove_group_and_log(group_key)
                continue

            ai_response = analyze_forex_pairs(pairs, message, active_deals)
            signal_data = extract_signal_from_ai_response(ai_response)

            if signal_data:
                logger.info(f"Signal detected for group {group_key}")
                _publish_signal(group_key, signal_data, pairs)
            else:
                logger.info(f"No signal detected in AI response for group {group_key}")

        logger.info("Analysis cycle completed successfully")

    except Exception as e:
        logger.error(f"Error in analyze_forex_groups: {e}", exc_info=True)
|
|
|
|
|
@app.route('/')
def health_check():
    """Health check endpoint confirming that the service is up."""
    payload = {
        "status": "running",
        "message": "Forex Analysis System is active"
    }
    return jsonify(payload)
|
|
|
@app.route('/analyze/now')
def trigger_analysis():
    """Endpoint to run one analysis cycle on demand."""
    try:
        analyze_forex_groups()
        ok = {
            "status": "success",
            "message": "Analysis triggered successfully"
        }
        return jsonify(ok)
    except Exception as e:
        logger.error(f"Error triggering analysis: {e}", exc_info=True)
        err = {
            "status": "error",
            "message": f"Error triggering analysis: {str(e)}"
        }
        return jsonify(err), 500
|
|
|
@app.route('/status')
def get_status():
    """Endpoint reporting the system on/off flag from deeper.json."""
    try:
        fetched = deeper.fetch_json_from_github()
        if not fetched["success"]:
            # Could not read the remote file; report the error but stay "running".
            return jsonify({
                "service_status": "running",
                "error": fetched["message"]
            })
        return jsonify({
            "system_enabled": fetched["data"].get("status", False),
            "service_status": "running"
        })
    except Exception as e:
        logger.error(f"Error getting status: {e}", exc_info=True)
        return jsonify({
            "service_status": "running",
            "error": str(e)
        })
|
|
|
def schedule_candle_analysis():
    """Book the next one-shot analysis run with the background scheduler.

    Runs fire at minutes :17, :32 and :47 of the current hour — a couple of
    minutes after the 15-minute candle closes — or at the top of the next
    hour once :47 has passed.
    NOTE(review): the hour rollover fires at :00 rather than :02, so that
    run has no post-close buffer — confirm whether that is intended.
    """
    now = datetime.now()
    minute = now.minute

    # First in-hour checkpoint still ahead of us, if any.
    upcoming = [m for m in (17, 32, 47) if minute < m]

    if upcoming:
        target_time = now.replace(minute=upcoming[0], second=0, microsecond=0)
    else:
        # Past :47 — roll over to the top of the next hour.
        target_time = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)

    time_diff = (target_time - now).total_seconds()

    logger.info(f"Scheduling analysis to run at {target_time.strftime('%H:%M:%S')} (in {time_diff:.1f} seconds)")

    scheduler.add_job(
        run_analysis_and_reschedule,
        'date',
        run_date=target_time,
        id='candle_analysis'
    )
|
|
|
def run_analysis_and_reschedule():
    """Run one scheduled analysis pass and always book the next one."""
    logger.info("Running scheduled analysis at candle close")
    try:
        analyze_forex_groups()
    except Exception as exc:
        logger.error(f"Error in scheduled analysis: {exc}", exc_info=True)
    finally:
        # Reschedule unconditionally so a failed pass never stops the cycle.
        schedule_candle_analysis()
|
|
|
|
|
# Single shared scheduler; jobs are added by schedule_candle_analysis()
# and it is started once by start_scheduler().
scheduler = BackgroundScheduler()
|
|
|
def start_scheduler():
    """Queue the first candle-close analysis job and boot the scheduler."""
    logger.info("Starting scheduler for forex analysis at candle close times (00, 15, 30, 45)")

    schedule_candle_analysis()

    # Guard against double-starting a scheduler that is already running.
    if scheduler.running:
        return
    scheduler.start()
    logger.info("Scheduler started successfully")
|
|
|
# Script entry point: boot the background scheduler, then serve Flask.
if __name__ == "__main__":

    logger.info("Starting Forex Analysis System...")

    # Queue the first candle-close analysis and start the scheduler thread.
    start_scheduler()

    # NOTE(review): debug=False — presumably to keep the dev reloader from
    # spawning a second scheduler instance; confirm before enabling debug.
    app.run(host='0.0.0.0', port=7860, debug=False)