|
import requests |
|
import json |
|
import time |
|
import re |
|
import logging |
|
from datetime import datetime, timedelta |
|
from flask import Flask, jsonify |
|
from apscheduler.schedulers.background import BackgroundScheduler |
|
from apscheduler.triggers.cron import CronTrigger |
|
from db.signals import fetch_json_from_github, update_user_json_file, fetch_authenticity_token_and_commit_oid |
|
from ai import analyze_forex_pairs |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format='%(asctime)s - %(levelname)s - %(message)s' |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
MESSAGE_API_URL = "https://aoamrnuwara.pythonanywhere.com/api/send-message" |
|
MESSAGE_API_KEY = "Seakp0683asppoit" |
|
|
|
def send_message_to_api(message): |
|
"""Send a message via the message forwarding API.""" |
|
headers = { |
|
"Content-Type": "application/json", |
|
"X-API-Key": MESSAGE_API_KEY |
|
} |
|
|
|
payload = { |
|
"message": message |
|
} |
|
|
|
try: |
|
response = requests.post(MESSAGE_API_URL, headers=headers, data=json.dumps(payload)) |
|
response.raise_for_status() |
|
logger.info(f"Message sent to API successfully. Status Code: {response.status_code}") |
|
return {"success": True, "response": response.json()} |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"API request error: {e}") |
|
return {"success": False, "error": str(e)} |
|
except Exception as e: |
|
logger.error(f"Unexpected error sending message to API: {e}") |
|
return {"success": False, "error": str(e)} |
|
|
|
def send_to_api(pair_name, message): |
|
"""Send the message to the external API.""" |
|
url = "https://forextrade-app.hf.space/api/message" |
|
data = { |
|
"pair_name": pair_name, |
|
"message": message |
|
} |
|
|
|
headers = { |
|
"Content-Type": "application/json" |
|
} |
|
|
|
try: |
|
response = requests.post(url, json=data, headers=headers, timeout=30) |
|
response.raise_for_status() |
|
return {"success": True, "response": response.json()} |
|
except requests.exceptions.RequestException as e: |
|
logger.error(f"API request error: {e}") |
|
return {"success": False, "error": str(e)} |
|
except Exception as e: |
|
logger.error(f"Unexpected error sending to API: {e}") |
|
return {"success": False, "error": str(e)} |
|
|
|
def extract_message_content(analysis_text, pair_name): |
|
"""Extract message, reason, prediction, and advice from the analysis text.""" |
|
try: |
|
|
|
message_match = re.search(r'<send_message>\s*<message>(.*?)</message>\s*<reason>(.*?)</reason>\s*<Prediction>(.*?)</Prediction>\s*<Advice>(.*?)</Advice>\s*</send_message>', |
|
analysis_text, re.DOTALL) |
|
|
|
if message_match: |
|
message = message_match.group(1).strip() |
|
reason = message_match.group(2).strip() |
|
prediction = message_match.group(3).strip() |
|
advice = message_match.group(4).strip() |
|
|
|
return { |
|
"type": "regular", |
|
"message": message, |
|
"reason": reason, |
|
"prediction": prediction, |
|
"advice": advice, |
|
"full_message": f"🔹 {pair_name} 🔹\n\n💬 الرسالة: {message}\n\n📝 النصيحة: {advice}" |
|
} |
|
|
|
|
|
close_match = re.search(r'<close_deal>(.*?)</close_deal>', analysis_text, re.DOTALL) |
|
if close_match: |
|
close_reason = close_match.group(1).strip() |
|
return { |
|
"type": "close", |
|
"reason": close_reason, |
|
"full_message": f"🔴 إغلاق صفقة 🔴\n\n🔹 {pair_name} 🔹\n\n⛔️ تم إغلاق الصفقة\n\n🧠 السبب: {close_reason}" |
|
} |
|
|
|
return None |
|
except Exception as e: |
|
logger.error(f"Error extracting message content: {e}") |
|
return None |
|
|
|
def extract_all_messages(analysis_text): |
|
"""Extract all messages for different pairs from the analysis text.""" |
|
try: |
|
|
|
pair_sections = re.findall(r'🔹 ([A-Z]+) 🔹(.*?)(?=🔹 [A-Z]+ 🔹|$)', analysis_text, re.DOTALL) |
|
|
|
if pair_sections: |
|
results = {} |
|
for pair, content in pair_sections: |
|
|
|
message_match = re.search(r'💬 الرسالة: (.*?)(?=🧠 السبب:|$)', content, re.DOTALL) |
|
reason_match = re.search(r'🧠 السبب: (.*?)(?=🔮 التوقع:|$)', content, re.DOTALL) |
|
prediction_match = re.search(r'🔮 التوقع: (.*?)(?=📝 النصيحة:|$)', content, re.DOTALL) |
|
advice_match = re.search(r'📝 النصيحة: (.*?)$', content, re.DOTALL) |
|
|
|
close_match = re.search(r'⛔️ تم إغلاق الصفقة\s*🧠 السبب: (.*?)$', content, re.DOTALL) |
|
|
|
if message_match and reason_match and prediction_match and advice_match: |
|
message = message_match.group(1).strip() |
|
reason = reason_match.group(1).strip() |
|
prediction = prediction_match.group(1).strip() |
|
advice = advice_match.group(1).strip() |
|
|
|
results[pair] = { |
|
"type": "regular", |
|
"message": message, |
|
"reason": reason, |
|
"prediction": prediction, |
|
"advice": advice, |
|
"full_message": f"🔹 {pair} 🔹\n\n💬 الرسالة: {message}\n\n📝 النصيحة: {advice}" |
|
} |
|
elif close_match: |
|
close_reason = close_match.group(1).strip() |
|
results[pair] = { |
|
"type": "close", |
|
"reason": close_reason, |
|
"full_message": f"🔴 إغلاق صفقة 🔴\n\n🔹 {pair} 🔹\n\n⛔️ تم إغلاق الصفقة\n\n🧠 السبب: {close_reason}" |
|
} |
|
|
|
return results |
|
else: |
|
|
|
return None |
|
except Exception as e: |
|
logger.error(f"Error extracting all messages: {e}") |
|
return None |
|
|
|
def format_deal_details(signal): |
|
"""Format the signal details for the AI analysis.""" |
|
details = f"زوج: {signal.get('pair')}, " |
|
details += f"نوع: {signal.get('type')}, " |
|
details += f"سعر الدخول: {signal.get('entry')}, " |
|
details += f"وقف الخسارة: {signal.get('stop_loss')}, " |
|
details += f"هدف الربح: {signal.get('take_profit')}, " |
|
details += f"المدة: {signal.get('duration')}, " |
|
details += f"السبب: {signal.get('reason')}" |
|
|
|
return details |
|
|
|
def check_and_process_signals(): |
|
"""Check for signals and process them.""" |
|
current_time = datetime.now() |
|
logger.info(f"Processing signals at candle close: {current_time.strftime('%Y-%m-%d %H:%M:%S')}") |
|
|
|
try: |
|
|
|
result = fetch_json_from_github() |
|
|
|
if not result["success"]: |
|
logger.error(f"Error fetching signals: {result['message']}") |
|
return |
|
|
|
signals = result["data"] |
|
updated = False |
|
signals_to_remove = [] |
|
|
|
for i, signal in enumerate(signals): |
|
|
|
if signal.get("status") == "starting": |
|
pairs = signal.get("pairs", [signal.get("pair")]) |
|
main_pair = signal.get("pair") |
|
logger.info(f"Processing signal for main pair {main_pair} with group: {pairs}") |
|
|
|
try: |
|
|
|
deal_details = format_deal_details(signal) |
|
|
|
|
|
analysis = analyze_forex_pairs(pairs, deal_details) |
|
|
|
|
|
all_messages = extract_all_messages(analysis) |
|
|
|
if all_messages: |
|
|
|
close_signal = False |
|
|
|
for pair, message_data in all_messages.items(): |
|
|
|
api_msg_result = send_message_to_api(message_data["full_message"]) |
|
if api_msg_result["success"]: |
|
logger.info(f"Message sent via API for {pair}") |
|
else: |
|
logger.error(f"Failed to send message via API: {api_msg_result.get('error')}") |
|
|
|
|
|
if message_data["type"] == "regular": |
|
|
|
api_result = send_to_api(pair, message_data["message"]) |
|
if api_result["success"]: |
|
logger.info(f"API message sent for {pair}") |
|
else: |
|
logger.error(f"Failed to send to API: {api_result['error']}") |
|
elif message_data["type"] == "close" and pair == main_pair: |
|
|
|
close_signal = True |
|
|
|
if close_signal: |
|
signals_to_remove.append(i) |
|
updated = True |
|
logger.info(f"Signal for {main_pair} marked for removal due to close deal") |
|
else: |
|
|
|
if "<send_message>" in analysis or "<close_deal>" in analysis: |
|
message_data = extract_message_content(analysis, main_pair) |
|
|
|
if message_data: |
|
|
|
api_msg_result = send_message_to_api(message_data["full_message"]) |
|
if api_msg_result["success"]: |
|
logger.info(f"Message sent via API for {main_pair}") |
|
else: |
|
logger.error(f"Failed to send message via API: {api_msg_result.get('error')}") |
|
|
|
|
|
if message_data["type"] == "regular": |
|
|
|
api_result = send_to_api(main_pair, message_data["message"]) |
|
if api_result["success"]: |
|
logger.info(f"API message sent for {main_pair}") |
|
else: |
|
logger.error(f"Failed to send to API: {api_result['error']}") |
|
elif message_data["type"] == "close": |
|
|
|
signals_to_remove.append(i) |
|
updated = True |
|
logger.info(f"Signal for {main_pair} marked for removal due to close deal") |
|
else: |
|
logger.warning(f"Could not extract message content for {main_pair}") |
|
else: |
|
logger.info(f"No message to send for {main_pair}") |
|
except Exception as e: |
|
logger.error(f"Error processing signal for {main_pair}: {e}", exc_info=True) |
|
|
|
|
|
|
|
for index in sorted(signals_to_remove, reverse=True): |
|
signals.pop(index) |
|
|
|
|
|
if updated: |
|
try: |
|
|
|
updated_content = json.dumps(signals, separators=(',', ':')) |
|
|
|
|
|
authenticity_token, commit_oid = fetch_authenticity_token_and_commit_oid() |
|
|
|
if authenticity_token and commit_oid: |
|
|
|
update_result = update_user_json_file(authenticity_token, commit_oid, updated_content) |
|
if update_result["success"]: |
|
logger.info("GitHub signals file updated successfully") |
|
else: |
|
logger.error(f"Failed to update GitHub file: {update_result['message']}") |
|
else: |
|
logger.error("Failed to get authenticity token and commit OID") |
|
except Exception as e: |
|
logger.error(f"Error updating GitHub file: {e}", exc_info=True) |
|
else: |
|
logger.info("No signals needed updating") |
|
|
|
except Exception as e: |
|
logger.error(f"Unexpected error in check_and_process_signals: {e}", exc_info=True) |
|
|
|
|
|
@app.route('/') |
|
def health_check(): |
|
"""Health check endpoint to verify the service is running""" |
|
return jsonify({ |
|
"status": "running", |
|
"message": "Forex Signal Processing System is active", |
|
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
}) |
|
|
|
@app.route('/process/now') |
|
def trigger_processing(): |
|
"""Endpoint to manually trigger signal processing""" |
|
try: |
|
check_and_process_signals() |
|
return jsonify({ |
|
"status": "success", |
|
"message": "Signal processing triggered successfully", |
|
"time": datetime.now().strftime("%Y-%m-%d %H:%M:%S") |
|
}) |
|
except Exception as e: |
|
logger.error(f"Error triggering signal processing: {e}", exc_info=True) |
|
return jsonify({ |
|
"status": "error", |
|
"message": f"Error triggering signal processing: {str(e)}" |
|
}), 500 |
|
|
|
|
|
scheduler = BackgroundScheduler(daemon=True) |
|
|
|
def add_candle_close_jobs(): |
|
"""Add jobs to run at 5-minute candle closes""" |
|
logger.info("Setting up scheduler jobs for candle closes") |
|
|
|
|
|
scheduler.add_job( |
|
check_and_process_signals, |
|
CronTrigger(minute='0,5,10,15,20,25,30,35,40,45,50,55'), |
|
id='candle_close_processing', |
|
replace_existing=True, |
|
) |
|
|
|
|
|
now = datetime.now() |
|
|
|
current_minute = now.minute |
|
next_5min = ((current_minute // 5) + 1) * 5 |
|
if next_5min >= 60: |
|
next_5min = 0 |
|
|
|
next_run = now.replace(minute=next_5min, second=5, microsecond=0) |
|
if next_run <= now: |
|
next_run = next_run + timedelta(hours=1) |
|
|
|
time_to_next = (next_run - now).total_seconds() |
|
|
|
logger.info(f"Next candle close processing scheduled for: {next_run.strftime('%Y-%m-%d %H:%M:%S')} " |
|
f"(in {time_to_next:.2f} seconds)") |
|
|
|
|
|
scheduler.add_job( |
|
check_and_process_signals, |
|
'date', |
|
run_date=datetime.now() + timedelta(seconds=10), |
|
id='initial_check', |
|
replace_existing=True |
|
) |
|
|
|
def start_scheduler(): |
|
"""Start the scheduler with the candle close processing jobs""" |
|
logger.info("Starting scheduler for candle close signal processing") |
|
|
|
|
|
add_candle_close_jobs() |
|
|
|
|
|
if not scheduler.running: |
|
scheduler.start() |
|
logger.info("Scheduler started successfully") |
|
|
|
if __name__ == "__main__": |
|
logger.info("Starting Forex Signal Processing System...") |
|
|
|
|
|
start_scheduler() |
|
|
|
|
|
app.run(host='0.0.0.0', port=7860, debug=False) |