File size: 5,034 Bytes
8c94afd
b70d3fd
 
 
 
8c94afd
b70d3fd
e35b779
 
 
8c94afd
b70d3fd
 
 
8c94afd
e35b779
 
 
 
b70d3fd
e35b779
b70d3fd
e35b779
b70d3fd
 
 
e35b779
b70d3fd
8c94afd
e35b779
 
 
 
b70d3fd
 
e35b779
b70d3fd
 
 
e35b779
b70d3fd
8c94afd
e35b779
 
 
b70d3fd
 
8c94afd
e35b779
8c94afd
b70d3fd
8c94afd
e35b779
b70d3fd
8c94afd
e35b779
 
 
b70d3fd
 
 
 
 
 
 
e35b779
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# genesis/funding.py
"""
Global Funding Intelligence for GENESIS-AI
Fetches worldwide biotech & synthetic biology grant opportunities.
"""

import requests
import logging

# Configure the root logger at INFO level. This runs as an import-time side
# effect of loading this module.
logging.basicConfig(level=logging.INFO)

# Endpoints queried by the fetchers below.
# NIH endpoint: used with HTTP POST and a JSON search payload.
NIH_FUNDING_API = "https://api.reporter.nih.gov/v2/projects/search"
# EU CORDIS endpoint: used with HTTP GET and query-string parameters.
EU_CORDIS_API = "https://cordis.europa.eu/api/project"  # Horizon Europe
# Wellcome endpoint: used with HTTP GET; marked by the author as a placeholder,
# so it may not correspond to a live service.
WELLCOME_TRUST_API = "https://api.wellcome.org/grants"  # Example placeholder

# -----------------------
# NIH Funding
# -----------------------
def get_nih_funding(query="synthetic biology", limit=10):
    """Search the NIH endpoint for projects matching *query*.

    Sends a POST request to ``NIH_FUNDING_API`` with a JSON search body and a
    30-second timeout.

    Args:
        query: Free-text search string placed in the request criteria.
        limit: Value sent as the ``limit`` field of the request body.

    Returns:
        The decoded JSON response on success. On any exception (network
        failure, HTTP error status, invalid JSON) the error is logged and
        ``{"error": <message>}`` is returned instead of raising.
    """
    search_body = {
        "criteria": {
            "textCriteria": {"searchText": query},
        },
        "offset": 0,
        "limit": limit,
    }
    try:
        response = requests.post(NIH_FUNDING_API, json=search_body, timeout=30)
        # Turn 4xx/5xx responses into exceptions so they take the error path.
        response.raise_for_status()
        decoded = response.json()
    except Exception as exc:
        logging.error(f"[Funding] NIH API error: {exc}")
        return {"error": str(exc)}
    return decoded

# -----------------------
# EU Horizon Europe / CORDIS Funding
# -----------------------
def get_eu_funding(query="synthetic biology", limit=10):
    """Fetch Horizon Europe / EU CORDIS funding calls.

    Sends a GET request to ``EU_CORDIS_API`` with a 30-second timeout.

    Args:
        query: Free-text search string, sent as the ``q`` parameter.
        limit: Value sent as the ``num`` parameter.

    Returns:
        The decoded JSON response on success. On any exception (network
        failure, HTTP error status, invalid JSON) the error is logged and
        ``{"error": <message>}`` is returned instead of raising.
    """
    try:
        # Hand the parameters to requests instead of interpolating them into
        # the URL: reserved characters in the query ("&", "#", "+", "=") are
        # then percent-encoded rather than corrupting the query string.
        r = requests.get(
            EU_CORDIS_API,
            params={"q": query, "num": limit},
            timeout=30,
        )
        r.raise_for_status()
        return r.json()
    except Exception as e:
        logging.error(f"[Funding] EU CORDIS API error: {e}")
        return {"error": str(e)}

# -----------------------
# Wellcome Trust Grants
# -----------------------
def get_wellcome_funding(query="synthetic biology"):
    """Fetch Wellcome Trust grants.

    Sends a GET request to ``WELLCOME_TRUST_API`` with a 30-second timeout.

    Args:
        query: Free-text search string, sent as the ``query`` parameter.

    Returns:
        The decoded JSON response on success. On any exception (network
        failure, HTTP error status, invalid JSON) the error is logged and
        ``{"error": <message>}`` is returned instead of raising.
    """
    try:
        # Hand the parameter to requests instead of interpolating it into the
        # URL: reserved characters in the query ("&", "#", "+", "=") are then
        # percent-encoded rather than corrupting the query string.
        r = requests.get(
            WELLCOME_TRUST_API,
            params={"query": query},
            timeout=30,
        )
        r.raise_for_status()
        return r.json()
    except Exception as e:
        logging.error(f"[Funding] Wellcome Trust API error: {e}")
        return {"error": str(e)}

# -----------------------
# Aggregator
# -----------------------
def get_global_funding(query="synthetic biology"):
    """Collect the raw response of every funding source for *query*.

    Each source is queried in turn (NIH, then EU CORDIS, then Wellcome Trust)
    using that fetcher's default result limit.

    Args:
        query: Free-text search string forwarded unchanged to each fetcher.

    Returns:
        A dict with the keys ``"NIH"``, ``"EU_CORDIS"`` and
        ``"Wellcome_Trust"``, each holding whatever the corresponding fetcher
        returned (including its ``{"error": ...}`` dict on failure).
    """
    aggregated = {}
    aggregated["NIH"] = get_nih_funding(query)
    aggregated["EU_CORDIS"] = get_eu_funding(query)
    aggregated["Wellcome_Trust"] = get_wellcome_funding(query)
    return aggregated

# -----------------------
# Pipeline-Compatible Function
# -----------------------
def _funding_records(payload, keys, limit=None):
    """Return the dict records stored under the first list-valued key of *keys*.

    Anything that is not shaped as expected (non-dict payload, missing key,
    non-list container, non-dict entries) is skipped instead of raising, so a
    malformed response from one source cannot discard the other sources.
    When *limit* is a non-negative int, at most that many records are kept.
    """
    if not isinstance(payload, dict):
        return []
    for key in keys:
        records = payload.get(key)
        if isinstance(records, list):
            kept = [rec for rec in records if isinstance(rec, dict)]
            if isinstance(limit, int) and limit >= 0:
                kept = kept[:limit]
            return kept
    return []


def _first_field(record, names, default):
    """Return the first non-None value among *names* in *record*, else *default*."""
    for name in names:
        value = record.get(name)
        if value is not None:
            return value
    return default


def fetch_funding_data(query, max_results=10):
    """Return a flat list of funding entries gathered from all sources.

    Queries NIH, EU CORDIS and Wellcome Trust in that order and normalizes
    every record into a dict with the keys ``source``, ``title``, ``amount``,
    ``date`` and ``link``.

    Args:
        query: Free-text search string forwarded to each fetcher.
        max_results: Per-source cap. It is passed as ``limit`` to the NIH and
            EU fetchers and also applied locally to every source, because the
            Wellcome fetcher takes no limit argument.

    Returns:
        A list of entry dicts. If no source yields any record, two mock
        entries are returned instead; they carry the same keys as live
        entries plus the legacy ``company`` key. If an unexpected exception
        occurs, it is logged and an empty list is returned.
    """
    logging.info(f"[Funding] Fetching global funding data for: {query}")
    try:
        results = []

        # NOTE(review): the public RePORTER v2 documentation describes a
        # `results` list with snake_case fields (project_title, award_amount,
        # project_start_date, appl_id, project_num) -- confirm against a live
        # response. The original `projects` / camelCase names are tried first
        # so any payload the previous code handled is handled identically.
        nih_data = get_nih_funding(query, limit=max_results)
        for proj in _funding_records(nih_data, ("projects", "results"), max_results):
            project_ref = _first_field(
                proj, ("projectNumber", "appl_id", "project_num"), ""
            )
            results.append({
                "source": "NIH",
                "title": _first_field(
                    proj, ("projectTitle", "project_title"), "No title"
                ),
                "amount": _first_field(
                    proj, ("awardAmount", "award_amount"), "Unknown"
                ),
                "date": _first_field(
                    proj, ("projectStartDate", "project_start_date"), ""
                ),
                "link": f"https://reporter.nih.gov/project-details/{project_ref}",
            })

        eu_data = get_eu_funding(query, limit=max_results)
        for proj in _funding_records(eu_data, ("projects",), max_results):
            results.append({
                "source": "EU CORDIS",
                "title": _first_field(proj, ("title",), "No title"),
                "amount": _first_field(proj, ("totalCost",), "Unknown"),
                "date": _first_field(proj, ("startDate",), ""),
                # The record's `rcn` value is passed through unchanged, as before.
                "link": _first_field(proj, ("rcn",), ""),
            })

        wellcome_data = get_wellcome_funding(query)
        for grant in _funding_records(wellcome_data, ("grants",), max_results):
            results.append({
                "source": "Wellcome Trust",
                "title": _first_field(grant, ("title",), "No title"),
                "amount": _first_field(grant, ("amountAwarded",), "Unknown"),
                "date": _first_field(grant, ("awardDate",), ""),
                "link": _first_field(grant, ("url",), ""),
            })

        # If no real data found, return mock data
        if not results:
            logging.warning("[Funding] No live API results — returning mock data.")
            # Mock entries expose the same keys as live entries (including
            # `title`) so consumers can treat both uniformly; `company` is
            # kept for callers that already read it.
            results = [
                {
                    "source": "Mock Funding API",
                    "title": "Example Biotech Inc.",
                    "company": "Example Biotech Inc.",
                    "amount": "$5M",
                    "date": "2024-05-10",
                    "link": "https://example.com/funding/123",
                },
                {
                    "source": "Mock Funding API",
                    "title": "Synthetic Bio Solutions",
                    "company": "Synthetic Bio Solutions",
                    "amount": "$2.5M",
                    "date": "2023-12-20",
                    "link": "https://example.com/funding/456",
                },
            ]

        return results

    except Exception as e:
        # Best-effort boundary for the pipeline: report and return no entries.
        logging.error(f"[Funding] fetch_funding_data failed: {e}")
        return []