import os import textwrap from datetime import date, datetime, timedelta import gradio as gr import pandas as pd from apscheduler.schedulers.background import BackgroundScheduler from datasets import Dataset from dotenv import load_dotenv from huggingface_hub import HfApi from predibench.agent.agent import run_smolagent from predibench.polymarket_api import ( MarketRequest, filter_interesting_questions, filter_out_resolved_markets, get_open_markets, ) load_dotenv() # Configuration WEEKLY_MARKETS_REPO = "m-ric/predibench-weekly-markets" AGENT_CHOICES_REPO = "m-ric/predibench-agent-choices" N_MARKETS = 10 def restart_space(): """Restart the HuggingFace space""" try: HfApi(token=os.getenv("HF_TOKEN", None)).restart_space( repo_id="m-ric/predibench-backend" # Update with your actual space repo ) print(f"Space restarted on {datetime.now()}") except Exception as e: print(f"Failed to restart space: {e}") def get_top_polymarket_questions(n_markets: int = 10) -> list: """Fetch top questions from Polymarket API""" end_date = date.today() + timedelta(days=7) request = MarketRequest( limit=n_markets * 10, active=True, closed=False, order="volumeNum", ascending=False, end_date_min=end_date, end_date_max=end_date + timedelta(days=21), ) markets = get_open_markets(request) markets = filter_out_resolved_markets(markets) # Filter for interesting questions interesting_questions = filter_interesting_questions( [market.question for market in markets] ) markets = [market for market in markets if market.question in interesting_questions] return markets[:n_markets] def upload_weekly_markets(markets: list): """Upload weekly markets to HuggingFace dataset""" markets_data = [] for market in markets: markets_data.append( { "id": market.id, "question": market.question, "slug": market.slug, "description": market.description, "end_date": market.end_date, "active": market.active, "closed": market.closed, "volume": market.volume, "liquidity": market.liquidity, "week_selected": date.today().strftime("%Y-%m-%d"), "timestamp": datetime.now(), } ) df = pd.DataFrame(markets_data) dataset = Dataset.from_pandas(df) try: # Try to load existing dataset and append existing_dataset = Dataset.from_parquet(f"hf://datasets/{WEEKLY_MARKETS_REPO}") combined_dataset = existing_dataset.concatenate(dataset) combined_dataset.push_to_hub(WEEKLY_MARKETS_REPO, private=False) except: # If no existing dataset, create new one dataset.push_to_hub(WEEKLY_MARKETS_REPO, private=False) print(f"Uploaded {len(markets_data)} markets to {WEEKLY_MARKETS_REPO}") def run_agent_decisions(markets: list, model_ids: list = None): """Run agent decision-making logic on the selected markets""" if model_ids is None: model_ids = [ "gpt-4o", "gpt-4.1", "anthropic/claude-sonnet-4-20250514", "huggingface/Qwen/Qwen3-30B-A3B-Instruct-2507", ] today = date.today() decisions = [] for model_id in model_ids: for market in markets: # Create the investment decision prompt full_question = textwrap.dedent( f"""Let's say we are the {today.strftime("%B %d, %Y")}. Please answer the below question by yes or no. But first, run a detailed analysis. Here is the question: {market.question} More details: {market.description} Invest in yes only if you think the yes is underrated, and invest in no only if you think that the yes is overrated. What would you decide: buy yes, buy no, or do nothing? """ ) try: response = run_smolagent(model_id, full_question, cutoff_date=today) choice = response.output.lower() # Standardize choice format if "yes" in choice: choice_standardized = 1 choice_raw = "yes" elif "no" in choice: choice_standardized = -1 choice_raw = "no" else: choice_standardized = 0 choice_raw = "nothing" decisions.append( { "agent_name": f"smolagent_{model_id}".replace("/", "--"), "date": today, "question": market.question, "question_id": market.id, "choice": choice_standardized, "choice_raw": choice_raw, "messages_count": len(response.messages), "has_reasoning": len(response.messages) > 0, "timestamp": datetime.now(), } ) print(f"Completed decision for {model_id} on {market.question[:50]}...") except Exception as e: print(f"Error processing {model_id} on {market.question[:50]}: {e}") continue return decisions def upload_agent_choices(decisions: list): """Upload agent choices to HuggingFace dataset""" df = pd.DataFrame(decisions) dataset = Dataset.from_pandas(df) try: # Try to load existing dataset and append existing_dataset = Dataset.from_parquet(f"hf://datasets/{AGENT_CHOICES_REPO}") combined_dataset = existing_dataset.concatenate(dataset) combined_dataset.push_to_hub(AGENT_CHOICES_REPO, private=False) except: # If no existing dataset, create new one dataset.push_to_hub(AGENT_CHOICES_REPO, private=False) print(f"Uploaded {len(decisions)} agent decisions to {AGENT_CHOICES_REPO}") def weekly_pipeline(): """Main weekly pipeline that runs every Sunday""" print(f"Starting weekly pipeline at {datetime.now()}") try: # 1. Get top 10 questions from Polymarket markets = get_top_polymarket_questions(N_MARKETS) print(f"Retrieved {len(markets)} markets from Polymarket") # 2. Upload to weekly markets dataset upload_weekly_markets(markets) # 3. Run agent decision-making decisions = run_agent_decisions(markets) print(f"Generated {len(decisions)} agent decisions") # 4. Upload agent choices upload_agent_choices(decisions) print("Weekly pipeline completed successfully") except Exception as e: print(f"Weekly pipeline failed: {e}") # Set up scheduler to run every Sunday at 8:30 AM scheduler = BackgroundScheduler() scheduler.add_job(restart_space, "cron", day_of_week="sun", hour=8, minute=0) scheduler.add_job(weekly_pipeline, "cron", day_of_week="sun", hour=8, minute=30) scheduler.start() # Simple Gradio interface for monitoring def get_status(): """Get current status of the backend""" try: # Check if datasets exist and get their info api = HfApi() weekly_info = "Not available" choices_info = "Not available" try: weekly_dataset = Dataset.from_parquet( f"hf://datasets/{WEEKLY_MARKETS_REPO}" ) weekly_info = f"{len(weekly_dataset)} markets" except: pass try: choices_dataset = Dataset.from_parquet( f"hf://datasets/{AGENT_CHOICES_REPO}" ) choices_info = f"{len(choices_dataset)} decisions" except: pass return f""" Backend Status (Last updated: {datetime.now()}) Weekly Markets Dataset: {weekly_info} Agent Choices Dataset: {choices_info} Next scheduled run: Next Sunday at 8:30 AM UTC """ except Exception as e: return f"Error getting status: {e}" def manual_run(): """Manually trigger the weekly pipeline""" weekly_pipeline() return "Manual pipeline run completed. Check logs for details." # Create Gradio interface with gr.Blocks(title="PrediBench Backend") as demo: gr.Markdown("# PrediBench Backend Monitor") gr.Markdown( "This backend automatically fetches new Polymarket questions every Sunday and runs agent predictions." ) with gr.Row(): status_text = gr.Textbox(label="Status", value=get_status(), lines=10) refresh_btn = gr.Button("Refresh Status") refresh_btn.click(get_status, outputs=status_text) with gr.Row(): manual_btn = gr.Button("Run Manual Pipeline", variant="primary") manual_output = gr.Textbox(label="Manual Run Output", lines=5) manual_btn.click(manual_run, outputs=manual_output) if __name__ == "__main__": print("Starting PrediBench backend...") demo.launch(server_name="0.0.0.0", server_port=7860)