from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
from typing import List, Optional
import asyncio
import os
import aiograpi
from aiograpi import Client
from datetime import datetime

app = FastAPI(
    title="Instagram Scraper API",
    description="An API to scrape Instagram posts with Apify-like formatting",
    version="1.0.0",
    docs_url="/docs",
)

# Add CORS middleware.
# NOTE(review): wildcard origins together with allow_credentials=True is very
# permissive; FastAPI's CORS documentation advises listing explicit origins
# when credentials are allowed. Left unchanged here -- confirm intent.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Pydantic models for request and response
class ScraperRequest(BaseModel):
    """Request body accepted by the /scrape endpoint."""

    username: str
    post_count: int = 10
    session_id: Optional[str] = None


class TaggedUser(BaseModel):
    """A user tagged in a post."""

    username: str
    id: str


class ChildPost(BaseModel):
    """One resource of a multi-resource post; either URL may be absent."""

    displayUrl: Optional[HttpUrl] = None
    videoUrl: Optional[HttpUrl] = None
    type: str


class InstagramPost(BaseModel):
    """A single post in the shape produced by format_media_data."""

    debug_id: str
    debug_mediaType: int
    type: str
    id: str
    shortCode: str
    displayUrl: Optional[HttpUrl] = None
    videoUrl: Optional[HttpUrl] = None
    caption: Optional[str] = None
    hashtags: List[str]
    mentions: List[str]
    url: HttpUrl
    commentsCount: int
    dimensionsHeight: Optional[int] = None
    dimensionsWidth: Optional[int] = None
    likesCount: int
    # Optional because format_media_data emits None when taken_at is missing.
    timestamp: Optional[str] = None
    childPosts: List[ChildPost]
    locationName: Optional[str] = None
    locationId: Optional[str] = None
    ownerUsername: str
    ownerId: str
    ownerFullName: Optional[str] = None
    viewCount: Optional[int] = None
    albumId: Optional[str] = None
    alt: Optional[str] = None
    sponsored: bool
    taggedUsers: List[TaggedUser]


def _media_type_label(media_type) -> str:
    """Map a numeric media type to its label (1 -> Image, 2 -> Video, else Carousel)."""
    if media_type == 1:
        return "Image"
    if media_type == 2:
        return "Video"
    return "Carousel"


def _prefixed_tokens(text, prefix: str) -> List[str]:
    """Return whitespace-separated tokens of *text* starting with *prefix*, prefix removed.

    A token consisting of the prefix alone is skipped so that no empty
    string ends up in the result. Returns [] for empty or None text.
    """
    if not text:
        return []
    return [
        token[len(prefix):]
        for token in text.split()
        if token.startswith(prefix) and len(token) > len(prefix)
    ]


def _url_or_none(value) -> Optional[str]:
    """Stringify a URL-like value, mapping falsy values to None."""
    return str(value) if value else None


async def format_media_data(media) -> dict:
    """Format media data to match Apify Instagram scraper output.

    Args:
        media: A media object as returned by the aiograpi client. Only the
            attributes read below are required.

    Returns:
        A dict whose keys correspond to the fields of InstagramPost.
    """
    caption = media.caption_text
    location = media.location

    child_posts = [
        {
            # Resources may lack one of the URL attributes entirely.
            "displayUrl": _url_or_none(getattr(resource, "thumbnail_url", None)),
            "videoUrl": _url_or_none(getattr(resource, "video_url", None)),
            "type": _media_type_label(resource.media_type),
        }
        for resource in (media.resources or [])
    ]

    tagged_users = [
        {"username": tag.user.username, "id": str(tag.user.pk)}
        for tag in (media.usertags or [])
    ]

    # Avoid emitting the literal string "None" when a location has no pk.
    location_id = None
    if location and location.pk is not None:
        location_id = str(location.pk)

    return {
        "debug_id": str(media.pk),
        "debug_mediaType": media.media_type,
        "type": _media_type_label(media.media_type),
        "id": str(media.pk),
        "shortCode": media.code,
        "displayUrl": _url_or_none(media.thumbnail_url),
        "videoUrl": _url_or_none(media.video_url),
        "caption": caption,
        "hashtags": _prefixed_tokens(caption, "#"),
        "mentions": _prefixed_tokens(caption, "@"),
        "url": f"https://www.instagram.com/p/{media.code}/",
        "commentsCount": media.comment_count,
        "dimensionsHeight": None,
        "dimensionsWidth": None,
        "likesCount": media.like_count,
        "timestamp": media.taken_at.isoformat() if media.taken_at else None,
        "childPosts": child_posts,
        "locationName": location.name if location else None,
        "locationId": location_id,
        "ownerUsername": media.user.username,
        "ownerId": str(media.user.pk),
        "ownerFullName": getattr(media.user, "full_name", None),
        "viewCount": media.view_count or None,
        "albumId": None,
        "alt": media.accessibility_caption,
        # Coerce both operands: is_paid_partnership may be None, which would
        # otherwise leak through `or` and fail the `sponsored: bool` field.
        "sponsored": bool(media.sponsor_tags) or bool(media.is_paid_partnership),
        "taggedUsers": tagged_users,
    }


class ScraperResponse(BaseModel):
    """Response body returned by the /scrape endpoint."""

    success: bool
    message: str
    posts_count: int
    posts: List[InstagramPost]


@app.post("/scrape", response_model=ScraperResponse, tags=["Scraper"])
async def scrape_instagram(request: ScraperRequest):
    """
    Scrape Instagram posts for a given username

    - **username**: Instagram username to scrape
    - **post_count**: Number of posts to scrape (default: 10)
    - **session_id**: Instagram session ID for authentication
    """
    # The 'session_id' environment variable, when set, takes precedence over
    # the value supplied in the request body.
    session_id = os.getenv('session_id', request.session_id)
    if not session_id:
        # Raised outside the try block so the broad handler below does not
        # rewrap this client error as a 500.
        raise HTTPException(
            status_code=400,
            detail="No Instagram session ID provided",
        )

    try:
        cl = Client()
        await cl.login_by_sessionid(session_id)
        user_id = await cl.user_id_from_username(request.username)
        medias = await cl.user_medias(user_id, request.post_count)

        formatted_posts = [await format_media_data(media) for media in medias]

        return ScraperResponse(
            success=True,
            message=f"Successfully scraped {len(formatted_posts)} posts from {request.username}",
            posts_count=len(formatted_posts),
            posts=formatted_posts,
        )
    except Exception as e:
        # Top-level boundary: surface any scraping/validation failure as a 500.
        raise HTTPException(status_code=500, detail=str(e)) from e


@app.get("/health", tags=["Health"])
async def health_check():
    """Check if the API is running"""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)