Spaces:
Running
Running
# api_clients/openfda_client.py | |
""" | |
Client for the OpenFDA API. | |
This module specializes in fetching critical, real-world drug safety data, | |
including the most frequent adverse events and active enforcement reports (recalls). | |
""" | |
import asyncio | |
import aiohttp | |
from urllib.parse import quote | |
from .config import OPENFDA_BASE_URL, REQUEST_HEADERS | |
async def get_adverse_events(session: aiohttp.ClientSession, drug_name: str, top_n: int = 5) -> list[dict]: | |
""" | |
Finds the most frequently reported adverse events for a given drug. | |
This function uses the 'count' feature of the OpenFDA API to get a summary | |
of the most common patient reactions, which is far more efficient than | |
downloading individual reports. | |
Args: | |
session (aiohttp.ClientSession): The active HTTP session. | |
drug_name (str): The brand or generic name of the drug. | |
top_n (int): The number of top adverse events to return. | |
Returns: | |
list[dict]: A list of top adverse events, e.g., [{'term': 'Nausea', 'count': 5000}]. | |
Returns an empty list on failure or if no results are found. | |
""" | |
if not drug_name: | |
return [] | |
# OpenFDA uses Lucene query syntax. We search in both brand name and generic name fields. | |
search_query = f'(patient.drug.openfda.brand_name:"{drug_name}" OR patient.drug.openfda.generic_name:"{drug_name}")' | |
params = { | |
'search': search_query, | |
'count': 'patient.reaction.reactionmeddrapt.exact', # The field for patient reactions | |
'limit': top_n | |
} | |
url = f"{OPENFDA_BASE_URL}/drug/event.json" | |
try: | |
async with session.get(url, params=params, headers=REQUEST_HEADERS, timeout=10) as resp: | |
if resp.status == 404: # 404 means no results found for the query | |
return [] | |
resp.raise_for_status() | |
data = await resp.json() | |
return data.get('results', []) | |
except aiohttp.ClientError as e: | |
print(f"An error occurred fetching adverse events for '{drug_name}': {e}") | |
return [] | |
async def check_for_recalls(session: aiohttp.ClientSession, drug_name: str, limit: int = 3) -> list[dict]: | |
""" | |
Checks for recent, ongoing drug enforcement reports (recalls) for a given drug. | |
It prioritizes finding active and serious recalls. | |
Args: | |
session (aiohttp.ClientSession): The active HTTP session. | |
drug_name (str): The brand or generic name of the drug. | |
limit (int): The maximum number of recall reports to return. | |
Returns: | |
list[dict]: A list of recall reports, containing reason and severity. | |
Returns an empty list on failure or if no recalls are found. | |
""" | |
if not drug_name: | |
return [] | |
# We search for the drug name and filter for 'Ongoing' status to find active recalls. | |
search_query = f'"{quote(drug_name)}" AND status:Ongoing' | |
params = { | |
'search': search_query, | |
'sort': 'report_date:desc', # Get the most recent ones first | |
'limit': limit | |
} | |
url = f"{OPENFDA_BASE_URL}/drug/enforcement.json" | |
try: | |
async with session.get(url, params=params, headers=REQUEST_HEADERS, timeout=10) as resp: | |
if resp.status == 404: | |
return [] | |
resp.raise_for_status() | |
data = await resp.json() | |
results = data.get('results', []) | |
# We parse the complex result into a clean, simple structure | |
parsed_recalls = [ | |
{ | |
"reason": r.get("reason_for_recall", "N/A"), | |
"classification": r.get("classification", "N/A"), # Class I is most serious | |
"report_date": r.get("report_date", "N/A") | |
} | |
for r in results | |
] | |
return parsed_recalls | |
except aiohttp.ClientError as e: | |
print(f"An error occurred fetching recalls for '{drug_name}': {e}") | |
return [] | |
async def get_safety_profile(session: aiohttp.ClientSession, drug_name: str) -> dict: | |
""" | |
A high-level orchestrator that gathers a complete safety profile for a single drug | |
by concurrently fetching adverse events and recalls. | |
Args: | |
session (aiohttp.ClientSession): The active HTTP session. | |
drug_name (str): The drug to profile. | |
Returns: | |
dict: A dictionary containing 'adverse_events' and 'recalls' keys. | |
""" | |
# Run both API calls in parallel for maximum efficiency | |
tasks = { | |
"adverse_events": get_adverse_events(session, drug_name), | |
"recalls": check_for_recalls(session, drug_name) | |
} | |
results = await asyncio.gather(*tasks.values(), return_exceptions=True) | |
# Map results back, handling potential errors from gather() | |
safety_data = dict(zip(tasks.keys(), results)) | |
for key, value in safety_data.items(): | |
if isinstance(value, Exception): | |
print(f"Sub-task for {key} failed for {drug_name}: {value}") | |
safety_data[key] = [] # Ensure return type is consistent (list) | |
return safety_data |