|
import asyncio
import os
import re
from datetime import datetime
from typing import List, Optional

import aiograpi
from aiograpi import Client
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
|
|
|
# The ASGI application object; route decorators and middleware further down
# in this module attach to it. docs_url puts the interactive docs at /docs.
app = FastAPI(
    docs_url="/docs",
    version="1.0.0",
    title="Instagram Scraper API",
    description="An API to scrape Instagram posts with Apify-like formatting",
)
|
|
|
|
|
# Allowed CORS origins come from the comma-separated CORS_ALLOW_ORIGINS
# environment variable; when it is unset or empty, every origin is allowed.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    # A wildcard origin must not be combined with credentialed requests
    # (see the FastAPI CORS documentation), so cookies/authorization headers
    # are only allowed once an explicit origin list has been configured.
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
class ScraperRequest(BaseModel):
    # Request body of the POST /scrape endpoint.
    # (Plain comments rather than a docstring, so the generated schema
    # description stays unchanged.)

    # Instagram username whose posts are requested.
    username: str

    # How many posts to fetch; 10 when the caller omits it.
    post_count: int = 10

    # Optional Instagram session ID supplied by the caller.
    session_id: Optional[str] = None
|
|
|
class TaggedUser(BaseModel):
    # A user tagged in a post, as listed under InstagramPost.taggedUsers.

    # Username of the tagged account.
    username: str

    # Account identifier, carried as a string.
    id: str
|
|
|
|
|
|
|
class ChildPost(BaseModel):
    # One item nested inside a post, as listed under InstagramPost.childPosts.

    # Image/thumbnail URL, when available.
    displayUrl: Optional[HttpUrl] = None

    # Video URL, when available.
    videoUrl: Optional[HttpUrl] = None

    # Media kind label, e.g. "Image" or "Video".
    type: str
|
|
|
|
|
class InstagramPost(BaseModel):
    # A single scraped post in the Apify-like output shape; the dictionaries
    # built by format_media_data are validated against this model.
    # (Plain comments rather than a docstring, so the generated schema
    # description stays unchanged.)

    # Debugging aids: raw primary key and numeric media type.
    debug_id: str
    debug_mediaType: int

    # Identity of the post.
    type: str
    id: str
    shortCode: str

    # Media locations.
    displayUrl: Optional[HttpUrl] = None
    videoUrl: Optional[HttpUrl] = None

    # Caption text and the tags/mentions extracted from it.
    caption: Optional[str] = None
    hashtags: List[str]
    mentions: List[str]

    # Permalink of the post.
    url: HttpUrl

    # Engagement and dimensions.
    commentsCount: int
    dimensionsHeight: Optional[int] = None
    dimensionsWidth: Optional[int] = None
    likesCount: int

    # ISO-8601 creation time. Optional because format_media_data emits None
    # when the media carries no taken_at value; a required field would turn
    # that case into a response-validation error.
    timestamp: Optional[str] = None

    # Nested items of the post (empty when there are none).
    childPosts: List[ChildPost]

    # Location, when attached to the post.
    locationName: Optional[str] = None
    locationId: Optional[str] = None

    # Owner of the post.
    ownerUsername: str
    ownerId: str
    ownerFullName: Optional[str] = None

    # Miscellaneous metadata.
    viewCount: Optional[int] = None
    albumId: Optional[str] = None
    alt: Optional[str] = None
    sponsored: bool
    taggedUsers: List[TaggedUser]
|
|
|
|
|
# "#tag" tokens: capture only the word characters after the hash so trailing
# punctuation ("#beach,") is dropped and glued tags ("#a#b") are both found.
_HASHTAG_RE = re.compile(r"#(\w+)")

# "@user" tokens: must not follow a word character (skips e-mail addresses);
# dots are allowed inside the name but not as its last character.
_MENTION_RE = re.compile(r"(?<![\w@])@(\w(?:[\w.]*\w)?)")

# Numeric media_type -> label; any other value is reported as "Carousel".
_MEDIA_TYPE_NAMES = {1: "Image", 2: "Video"}


def _media_type_name(media_type):
    """Return the display label for a numeric media type."""
    return _MEDIA_TYPE_NAMES.get(media_type, "Carousel")


def _url_or_none(obj, attr):
    """Return ``str(obj.<attr>)`` when the attribute exists and is truthy, else None."""
    value = getattr(obj, attr, None)
    return str(value) if value else None


async def format_media_data(media):
    """Format media data to match Apify Instagram scraper output.

    Args:
        media: Media object exposing the attributes read below (``pk``,
            ``media_type``, ``code``, ``caption_text``, ``user``,
            ``resources``, ``usertags``, ...). Presumably an aiograpi media
            object, as returned by ``Client.user_medias`` -- confirm against
            the caller.

    Returns:
        A plain ``dict`` with the keys of the ``InstagramPost`` model.
        ``dimensionsHeight``, ``dimensionsWidth`` and ``albumId`` are always
        ``None``.
    """
    caption = media.caption_text
    location = media.location

    return {
        "debug_id": str(media.pk),
        "debug_mediaType": media.media_type,
        "type": _media_type_name(media.media_type),
        "id": str(media.pk),
        "shortCode": media.code,
        "displayUrl": _url_or_none(media, "thumbnail_url"),
        "videoUrl": _url_or_none(media, "video_url"),
        "caption": caption,
        "hashtags": _HASHTAG_RE.findall(caption) if caption else [],
        "mentions": _MENTION_RE.findall(caption) if caption else [],
        "url": f"https://www.instagram.com/p/{media.code}/",
        "commentsCount": media.comment_count,
        "dimensionsHeight": None,
        "dimensionsWidth": None,
        "likesCount": media.like_count,
        "timestamp": media.taken_at.isoformat() if media.taken_at else None,
        "childPosts": [
            {
                "displayUrl": _url_or_none(resource, "thumbnail_url"),
                "videoUrl": _url_or_none(resource, "video_url"),
                "type": _media_type_name(resource.media_type),
            }
            for resource in media.resources
        ] if media.resources else [],
        "locationName": location.name if location else None,
        "locationId": str(location.pk) if location else None,
        "ownerUsername": media.user.username,
        "ownerId": str(media.user.pk),
        "ownerFullName": getattr(media.user, 'full_name', None),
        # A zero/empty view count is reported as None.
        "viewCount": media.view_count or None,
        "albumId": None,
        "alt": media.accessibility_caption,
        # Coerce to a real bool: with no sponsor tags and an unset
        # is_paid_partnership the bare `or` would yield None.
        "sponsored": bool(media.sponsor_tags or media.is_paid_partnership),
        "taggedUsers": [
            {
                "username": tag.user.username,
                "id": str(tag.user.pk),
            }
            for tag in media.usertags
        ] if media.usertags else [],
    }
|
|
|
class ScraperResponse(BaseModel):
    # Response envelope declared as the response_model of POST /scrape.

    # Outcome flag for the scrape.
    success: bool

    # Human-readable summary of the outcome.
    message: str

    # Number of entries in `posts`.
    posts_count: int

    # The scraped posts.
    posts: List[InstagramPost]
|
|
|
|
|
@app.post("/scrape", response_model=ScraperResponse, tags=["Scraper"])
async def scrape_instagram(request: ScraperRequest):
    """
    Scrape Instagram posts for a given username

    - **username**: Instagram username to scrape
    - **post_count**: Number of posts to scrape (default: 10)
    - **session_id**: Instagram session ID for authentication (used when the
      server has no `session_id` environment variable configured)
    """
    # Raises HTTPException 400 when no session ID is available from either
    # source, and HTTPException 500 (detail = the underlying error message)
    # when logging in, resolving the user or fetching the media fails.

    # The `session_id` environment variable takes precedence; the value from
    # the request body is only the fallback.
    session_id = os.getenv('session_id', request.session_id)
    if not session_id:
        # Fail fast with a client error instead of letting the login call
        # blow up on a missing value and surface as an opaque 500.
        raise HTTPException(
            status_code=400,
            detail="An Instagram session_id is required: pass it in the request "
                   "or set the session_id environment variable",
        )

    try:
        cl = Client()
        await cl.login_by_sessionid(session_id)
        user_id = await cl.user_id_from_username(request.username)
        medias = await cl.user_medias(user_id, request.post_count)

        formatted_posts = [await format_media_data(media) for media in medias]

        return ScraperResponse(
            success=True,
            message=f"Successfully scraped {len(formatted_posts)} posts from {request.username}",
            posts_count=len(formatted_posts),
            posts=formatted_posts,
        )
    except HTTPException:
        # Never re-wrap an HTTP error that already carries its own status.
        raise
    except Exception as e:
        # Endpoint boundary: convert any scraping failure into a 500 while
        # keeping the original exception chained for logs and debugging.
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
@app.get("/health", tags=["Health"])
async def health_check():
    """Check if the API is running"""
    # Report a fixed "healthy" status together with the current time from
    # datetime.now(), rendered as an ISO-8601 string.
    current_time = datetime.now()
    payload = {"status": "healthy", "timestamp": current_time.isoformat()}
    return payload
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported here so that merely importing this
    # module does not require it.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)