Spaces:
Sleeping
Sleeping
from string import Template | |
from pathlib import Path | |
import sys | |
import json | |
import os | |
import time | |
from tqdm import tqdm | |
from typing import List, Any, Optional, Union | |
import numpy as np | |
import pandas as pd | |
import gc | |
import re | |
from dataclasses import dataclass | |
from enum import Enum | |
from json.decoder import JSONDecodeError | |
# Hex address string; presumably the market-creator account used to filter
# subgraph results -- TODO confirm against the queries that use it.
CREATOR = "0x89c5cc945dd550BcFfb72Fe42BfF002429F46Fec"

# Presumably the page size for paginated subgraph queries -- TODO confirm.
BATCH_SIZE = 1000

# Previous endpoint, kept for reference only:
# OMEN_SUBGRAPH = "https://api.thegraph.com/subgraphs/name/protofire/omen-xdai"
# Gateway URL template; fill `subgraph_api_key` via `.substitute(...)`.
OMEN_SUBGRAPH_URL = Template(
    "https://gateway-arbitrum.network.thegraph.com/api/${subgraph_api_key}"
    "/subgraphs/id/9fUVQpFwzpdWS9bq5WkAnmKbNNcoBwatMR4yZq81pbbz"
)

# Filesystem layout, resolved relative to this file's location.
SCRIPTS_DIR = Path(__file__).parent
ROOT_DIR = SCRIPTS_DIR.parent
DATA_DIR = ROOT_DIR.joinpath("live_data")

# Hex literal with every digit set to "f".
MAX_UINT_HEX = "0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"

# API key read from the environment; None when the variable is not set.
SUBGRAPH_API_KEY = os.environ.get("SUBGRAPH_API_KEY")
def measure_execution_time(func):
    """Decorator that prints how long each call to ``func`` takes.

    The returned wrapper forwards all positional and keyword arguments to
    ``func`` and returns its result unchanged. After a successful call it
    prints ``Execution time: <seconds> seconds`` to stdout. If ``func``
    raises, the exception propagates and nothing is printed.

    Args:
        func: The callable to time.

    Returns:
        A wrapper with the same call signature and metadata as ``func``.
    """
    # Local import keeps this block self-contained without touching the
    # module's import section.
    from functools import wraps

    @wraps(func)  # preserve func's __name__/__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic and high-resolution; time.time() follows
        # the wall clock and can jump if the system clock is adjusted.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        print(f"Execution time: {execution_time:.6f} seconds")
        return result

    return wrapper
def _to_content(q: str) -> dict[str, Any]: | |
"""Convert the given query string to payload content, i.e., add it under a `queries` key and convert it to bytes.""" | |
finalized_query = { | |
"query": q, | |
"variables": None, | |
"extensions": {"headers": None}, | |
} | |
return finalized_query | |