# gimmeDat / app.py
# Yakova's picture
# Create app.py
# 2d508dd verified
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
from typing import List, Optional
import asyncio
import os
import aiograpi
from aiograpi import Client
from datetime import datetime
# Application instance; the interactive documentation is served at /docs.
app = FastAPI(
    title="Instagram Scraper API",
    description="An API to scrape Instagram posts with Apify-like formatting",
    version="1.0.0",
    docs_url="/docs",
)

# Add CORS middleware.
# Every origin, method and header is allowed, so credentialed (cookie) requests
# are switched off: a wildcard origin must not be combined with
# allow_credentials=True, and none of the endpoints visible in this file read
# cookies. If cookie-based access is ever required, list the trusted origins
# explicitly instead of using "*".
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Pydantic models for request and response
class ScraperRequest(BaseModel):
    # Request body accepted by POST /scrape.

    # Instagram username whose posts are fetched.
    username: str
    # Amount of posts requested from the client library; 10 unless overridden.
    post_count: int = 10
    # Instagram session id used to log in; may be omitted from the body.
    session_id: Optional[str] = None
class TaggedUser(BaseModel):
    # One user tagged in a post (an entry of InstagramPost.taggedUsers).

    # Username of the tagged account.
    username: str
    # Primary key of the tagged account, carried as a string.
    id: str
# A single resource of a multi-item post; both URLs are optional so that a
# missing thumbnail or video is represented as None instead of failing
# validation.
class ChildPost(BaseModel):
    # Thumbnail URL of the resource, when available.
    displayUrl: Optional[HttpUrl] = None
    # Video URL of the resource, when available.
    videoUrl: Optional[HttpUrl] = None
    # Human-readable media type label such as "Image" or "Video".
    type: str
# One scraped post in the Apify-like output format. URL fields are optional so
# that missing media URLs are represented as None instead of failing validation.
class InstagramPost(BaseModel):
    # Debug copies of the media primary key and numeric media type.
    debug_id: str
    debug_mediaType: int

    # Media type label and identifiers.
    type: str
    id: str
    shortCode: str

    # Media URLs.
    displayUrl: Optional[HttpUrl] = None
    videoUrl: Optional[HttpUrl] = None

    # Caption text and the hashtags / mentions extracted from it.
    caption: Optional[str] = None
    hashtags: List[str]
    mentions: List[str]

    # Permalink of the post.
    url: HttpUrl

    # Engagement counters and dimensions.
    commentsCount: int
    dimensionsHeight: Optional[int] = None
    dimensionsWidth: Optional[int] = None
    likesCount: int

    # ISO-8601 creation time. Optional because format_media_data emits None
    # when the media has no taken_at value; a required str would reject that.
    timestamp: Optional[str] = None

    # Resources of a multi-item post (empty for single-media posts).
    childPosts: List[ChildPost]

    # Location, when the post has one.
    locationName: Optional[str] = None
    locationId: Optional[str] = None

    # Post owner.
    ownerUsername: str
    ownerId: str
    ownerFullName: Optional[str] = None

    # Miscellaneous metadata.
    viewCount: Optional[int] = None
    albumId: Optional[str] = None
    alt: Optional[str] = None
    sponsored: bool
    taggedUsers: List[TaggedUser]
# Punctuation that commonly trails a hashtag or mention in running text
# ("#sun," / "@bob.") and is not part of the tag itself.
_TRAILING_PUNCTUATION = ".,;:!?)]}\"'"


def _media_type_label(media_type):
    """Return "Image" for type 1, "Video" for type 2 and "Carousel" otherwise."""
    if media_type == 1:
        return "Image"
    if media_type == 2:
        return "Video"
    return "Carousel"


def _prefixed_words(caption, prefix):
    """Return whitespace-separated words of *caption* that start with *prefix*.

    The prefix and any trailing punctuation are removed; tokens that end up
    empty (a bare "#" or "@") are skipped.
    """
    if not caption:
        return []
    words = []
    for token in caption.split():
        if not token.startswith(prefix):
            continue
        word = token[len(prefix):].rstrip(_TRAILING_PUNCTUATION)
        if word:
            words.append(word)
    return words


def _url_or_none(value):
    """Return ``str(value)``, or None when *value* is missing or empty."""
    return str(value) if value else None


async def format_media_data(media):
    """Format media data to match Apify Instagram scraper output.

    Args:
        media: Media object as returned by the scraping client. The attributes
            read are pk, media_type, code, thumbnail_url, video_url,
            caption_text, comment_count, like_count, taken_at, resources,
            location, user, view_count, accessibility_caption, sponsor_tags,
            is_paid_partnership and usertags.

    Returns:
        A dict whose keys match the fields of the InstagramPost model.
    """
    caption = media.caption_text
    location = media.location

    # Avoid emitting the literal string "None" when a location has no pk.
    location_pk = getattr(location, "pk", None) if location else None

    child_posts = [
        {
            "displayUrl": _url_or_none(getattr(resource, "thumbnail_url", None)),
            "videoUrl": _url_or_none(getattr(resource, "video_url", None)),
            "type": _media_type_label(resource.media_type),
        }
        for resource in (media.resources or [])
    ]
    tagged_users = [
        {"username": tag.user.username, "id": str(tag.user.pk)}
        for tag in (media.usertags or [])
    ]

    return {
        "debug_id": str(media.pk),
        "debug_mediaType": media.media_type,
        "type": _media_type_label(media.media_type),
        "id": str(media.pk),
        "shortCode": media.code,
        "displayUrl": _url_or_none(media.thumbnail_url),
        "videoUrl": _url_or_none(media.video_url),
        "caption": caption,
        "hashtags": _prefixed_words(caption, "#"),
        "mentions": _prefixed_words(caption, "@"),
        "url": f"https://www.instagram.com/p/{media.code}/",
        "commentsCount": media.comment_count,
        "dimensionsHeight": None,
        "dimensionsWidth": None,
        "likesCount": media.like_count,
        "timestamp": media.taken_at.isoformat() if media.taken_at else None,
        "childPosts": child_posts,
        "locationName": location.name if location else None,
        "locationId": str(location_pk) if location_pk is not None else None,
        "ownerUsername": media.user.username,
        "ownerId": str(media.user.pk),
        "ownerFullName": getattr(media.user, "full_name", None),
        # A zero/absent view count is reported as None, not 0.
        "viewCount": media.view_count or None,
        "albumId": None,
        "alt": media.accessibility_caption,
        # Always a real bool: a None partnership flag must not leak through.
        "sponsored": bool(
            media.sponsor_tags or getattr(media, "is_paid_partnership", False)
        ),
        "taggedUsers": tagged_users,
    }
class ScraperResponse(BaseModel):
    # Response body returned by POST /scrape.

    # Whether the scrape completed.
    success: bool
    # Human-readable summary of the outcome.
    message: str
    # Number of entries in `posts`.
    posts_count: int
    # The scraped posts.
    posts: List[InstagramPost]
@app.post("/scrape", response_model=ScraperResponse, tags=["Scraper"])
async def scrape_instagram(request: ScraperRequest):
    """
    Scrape Instagram posts for a given username

    - **username**: Instagram username to scrape
    - **post_count**: Number of posts to scrape (default: 10)
    - **session_id**: Instagram session ID for authentication (optional when
      the server is configured with a `session_id` environment variable)
    """
    # The server-side `session_id` environment variable wins when it is set to
    # a non-empty value; otherwise the session id from the request is used.
    session_id = os.getenv('session_id') or request.session_id
    if not session_id:
        # Raised outside the try block so it is not re-wrapped as a 500.
        raise HTTPException(
            status_code=400,
            detail="An Instagram session_id is required to scrape posts",
        )

    try:
        cl = Client()
        await cl.login_by_sessionid(session_id)
        user_id = await cl.user_id_from_username(request.username)
        medias = await cl.user_medias(user_id, request.post_count)
        formatted_posts = [await format_media_data(media) for media in medias]
        return ScraperResponse(
            success=True,
            message=f"Successfully scraped {len(formatted_posts)} posts from {request.username}",
            posts_count=len(formatted_posts),
            posts=formatted_posts
        )
    except Exception as e:
        # Top-level boundary: any client, formatting or validation failure is
        # reported as a 500 carrying the error text, with the cause chained.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/health", tags=["Health"])
async def health_check():
    """Check if the API is running"""
    # Report a fixed status together with the server's current local time.
    current_time = datetime.now()
    payload = {"status": "healthy"}
    payload["timestamp"] = current_time.isoformat()
    return payload
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 8000.
    # uvicorn is imported here so it is only required when run as a script.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)