project-asclepius / api_clients /openfda_client.py
mgbam's picture
Update api_clients/openfda_client.py
e998c1c verified
# api_clients/openfda_client.py
"""
Client for the OpenFDA API.
This module specializes in fetching critical, real-world drug safety data,
including the most frequent adverse events and active enforcement reports (recalls).
"""
import asyncio
import aiohttp
from urllib.parse import quote
from .config import OPENFDA_BASE_URL, REQUEST_HEADERS
async def get_adverse_events(session: aiohttp.ClientSession, drug_name: str, top_n: int = 5) -> list[dict]:
    """
    Finds the most frequently reported adverse events for a given drug.

    This function uses the 'count' feature of the OpenFDA API to get a summary
    of the most common patient reactions, which is far more efficient than
    downloading individual reports.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        drug_name (str): The brand or generic name of the drug.
        top_n (int): The number of top adverse events to return.

    Returns:
        list[dict]: A list of top adverse events, e.g., [{'term': 'Nausea', 'count': 5000}].
                    Returns an empty list on failure (HTTP error, connection
                    error or timeout) or if no results are found.
    """
    if not drug_name:
        return []

    # The name is embedded in a double-quoted Lucene phrase, so backslashes and
    # double quotes are escaped to keep the phrase well-formed.
    safe_name = drug_name.replace('\\', '\\\\').replace('"', '\\"')

    # OpenFDA uses Lucene query syntax. We search in both brand name and generic name fields.
    search_query = (
        f'(patient.drug.openfda.brand_name:"{safe_name}" '
        f'OR patient.drug.openfda.generic_name:"{safe_name}")'
    )
    params = {
        'search': search_query,
        'count': 'patient.reaction.reactionmeddrapt.exact',  # The field for patient reactions
        'limit': top_n,
    }
    url = f"{OPENFDA_BASE_URL}/drug/event.json"

    try:
        async with session.get(
            url,
            params=params,
            headers=REQUEST_HEADERS,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as resp:
            if resp.status == 404:  # 404 means no results found for the query
                return []
            resp.raise_for_status()
            data = await resp.json()
            return data.get('results', [])
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # A total-timeout expiry surfaces as asyncio.TimeoutError, which is not
        # an aiohttp.ClientError, so it has to be listed explicitly.
        print(f"An error occurred fetching adverse events for '{drug_name}': {e}")
        return []
async def check_for_recalls(session: aiohttp.ClientSession, drug_name: str, limit: int = 3) -> list[dict]:
    """
    Checks for recent, ongoing drug enforcement reports (recalls) for a given drug.
    It prioritizes finding active and serious recalls.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        drug_name (str): The brand or generic name of the drug.
        limit (int): The maximum number of recall reports to return.

    Returns:
        list[dict]: A list of recall reports, each with 'reason',
                    'classification' and 'report_date' keys ("N/A" when the
                    field is missing from the API record).
                    Returns an empty list on failure (HTTP error, connection
                    error or timeout) or if no recalls are found.
    """
    if not drug_name:
        return []

    # The name is embedded in a double-quoted Lucene phrase, so backslashes and
    # double quotes are escaped to keep the phrase well-formed.
    safe_name = drug_name.replace('\\', '\\\\').replace('"', '\\"')

    # We search for the drug name and filter for 'Ongoing' status to find active recalls.
    # The value is NOT percent-encoded here: aiohttp encodes everything passed
    # via `params`, so pre-quoting would double-encode (a space would reach the
    # API as the literal text "%20").
    search_query = f'"{safe_name}" AND status:Ongoing'
    params = {
        'search': search_query,
        'sort': 'report_date:desc',  # Get the most recent ones first
        'limit': limit,
    }
    url = f"{OPENFDA_BASE_URL}/drug/enforcement.json"

    try:
        async with session.get(
            url,
            params=params,
            headers=REQUEST_HEADERS,
            timeout=aiohttp.ClientTimeout(total=10),
        ) as resp:
            if resp.status == 404:  # No matching enforcement reports
                return []
            resp.raise_for_status()
            data = await resp.json()
            results = data.get('results', [])
            # We parse the complex result into a clean, simple structure
            return [
                {
                    "reason": r.get("reason_for_recall", "N/A"),
                    "classification": r.get("classification", "N/A"),  # Class I is most serious
                    "report_date": r.get("report_date", "N/A"),
                }
                for r in results
            ]
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        # A total-timeout expiry surfaces as asyncio.TimeoutError, which is not
        # an aiohttp.ClientError, so it has to be listed explicitly.
        print(f"An error occurred fetching recalls for '{drug_name}': {e}")
        return []
async def get_safety_profile(session: aiohttp.ClientSession, drug_name: str) -> dict:
    """
    A high-level orchestrator that gathers a complete safety profile for a single drug
    by concurrently fetching adverse events and recalls.

    Args:
        session (aiohttp.ClientSession): The active HTTP session.
        drug_name (str): The drug to profile.

    Returns:
        dict: A dictionary containing 'adverse_events' and 'recalls' keys.
              Each value is a list; a sub-task that fails or is cancelled
              contributes an empty list.
    """
    # Run both API calls in parallel for maximum efficiency
    tasks = {
        "adverse_events": get_adverse_events(session, drug_name),
        "recalls": check_for_recalls(session, drug_name),
    }
    results = await asyncio.gather(*tasks.values(), return_exceptions=True)

    # Map results back, handling potential errors from gather().
    safety_data = {}
    for key, value in zip(tasks, results):
        # BaseException rather than Exception: asyncio.CancelledError derives
        # from BaseException on Python 3.8+, and gather() can return it here
        # when a sub-task is cancelled.
        if isinstance(value, BaseException):
            print(f"Sub-task for {key} failed for {drug_name}: {value}")
            value = []  # Ensure return type is consistent (list)
        safety_data[key] = value
    return safety_data