Spaces:
Sleeping
Sleeping
| from string import Template | |
| from pathlib import Path | |
| import sys | |
| import json | |
| import os | |
| import time | |
| from tqdm import tqdm | |
| from typing import List, Any, Optional, Union | |
| import numpy as np | |
| import pandas as pd | |
| import gc | |
| import re | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from json.decoder import JSONDecodeError | |
# Address used as the creator identifier elsewhere in this module.
CREATOR = "0x89c5cc945dd550BcFfb72Fe42BfF002429F46Fec"

# Batch size constant (1000) used elsewhere in this module.
BATCH_SIZE = 1000

# Previous hosted-service endpoint, kept for reference only:
#   https://api.thegraph.com/subgraphs/name/protofire/omen-xdai
# The gateway URL below needs an API key substituted for
# ``${subgraph_api_key}`` before it can be used.
OMEN_SUBGRAPH_URL = Template(
    "https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}"
    "/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"
)

# Filesystem layout, derived from this file's location:
# <root>/<scripts dir>/<this file> and <root>/live_data.
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR.joinpath("live_data")

# Hex string made of "0x" followed by all-"f" digits.
MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"

# API key read from the environment; ``None`` when the variable is unset.
SUBGRAPH_API_KEY = os.getenv("SUBGRAPH_API_KEY")
def measure_execution_time(func):
    """Decorator that prints how long each call to ``func`` takes.

    Arguments are forwarded to ``func`` unchanged and its return value is
    handed back to the caller. The elapsed time is printed to stdout after
    every call that returns normally; if ``func`` raises, the exception
    propagates and nothing is printed.

    Args:
        func: The callable to time.

    Returns:
        A wrapper around ``func`` that keeps its ``__name__``, ``__doc__``
        and other metadata, and exposes the original as ``__wrapped__``.
    """
    # Imported locally so this decorator does not depend on a module-level
    # ``functools`` import being present.
    from functools import wraps

    @wraps(func)  # keep the decorated function's identity for debugging/introspection
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic and high-resolution, so the measured
        # interval cannot be skewed by system clock adjustments.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        print(f"Execution time: {execution_time:.6f} seconds")
        return result

    return wrapper
| def _to_content(q: str) -> dict[str, Any]: | |
| """Convert the given query string to payload content, i.e., add it under a `queries` key and convert it to bytes.""" | |
| finalized_query = { | |
| "query": q, | |
| "variables": None, | |
| "extensions": {"headers": None}, | |
| } | |
| return finalized_query | |