# File size: 5,758 Bytes
# 2d508dd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
from typing import List, Optional
import asyncio
import os
import aiograpi
from aiograpi import Client
from datetime import datetime

# Application instance. The title/description/version metadata and the
# docs URL are supplied here; routes are registered further down via the
# @app.post / @app.get decorators.
app = FastAPI(
    title="Instagram Scraper API",
    description="An API to scrape Instagram posts with Apify-like formatting",
    version="1.0.0",
    docs_url="/docs",
)

# Add CORS middleware
# NOTE(review): every origin, method and header is allowed while
# allow_credentials is also True. That is a very permissive policy --
# confirm that credentialed cross-origin access from any site is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Pydantic models for request and response
# Request body accepted by the POST /scrape endpoint.
class ScraperRequest(BaseModel):
    # Instagram username whose posts should be scraped.
    username: str
    # Number of posts requested; passed straight to the client's
    # user_medias() call by scrape_instagram. Defaults to 10.
    post_count: int = 10
    # Optional Instagram session id. scrape_instagram looks up the
    # `session_id` environment variable first and uses this value as the
    # fallback.
    session_id: Optional[str] =None  # may be omitted by the caller

# A user tagged in a post; one entry of InstagramPost.taggedUsers.
class TaggedUser(BaseModel):
    # Username of the tagged account.
    username: str
    # Primary key of the tagged account, carried as a string
    # (format_media_data fills it with str(tag.user.pk)).
    id: str


# One item of a multi-item post; entries of InstagramPost.childPosts.
# Both URLs are optional so an item without a thumbnail or video validates.
class ChildPost(BaseModel):
    # Thumbnail URL of the item, if any.
    displayUrl: Optional[HttpUrl] = None
    # Video URL of the item, if any.
    videoUrl: Optional[HttpUrl] = None
    # "Image", "Video" or "Carousel", as produced by format_media_data.
    type: str

# Apify-style representation of a single Instagram post, as returned in
# ScraperResponse.posts. The field names intentionally use camelCase to
# match the output keys built by format_media_data.
class InstagramPost(BaseModel):
    # Debug copies of the raw media primary key and numeric media type.
    debug_id: str
    debug_mediaType: int
    # "Image", "Video" or "Carousel".
    type: str
    # Media primary key (as a string) and the short code used in the URL.
    id: str
    shortCode: str
    # Media URLs; either may be absent.
    displayUrl: Optional[HttpUrl] = None
    videoUrl: Optional[HttpUrl] = None
    # Caption text plus the hashtags / mentions extracted from it.
    caption: Optional[str] = None
    hashtags: List[str]
    mentions: List[str]
    # Canonical post URL (https://www.instagram.com/p/<shortCode>/).
    url: HttpUrl
    commentsCount: int
    # Dimensions are currently always emitted as None by format_media_data.
    dimensionsHeight: Optional[int] = None
    dimensionsWidth: Optional[int] = None
    likesCount: int
    # ISO-8601 timestamp of when the media was taken. Optional because
    # format_media_data emits None when the media has no `taken_at`; a
    # required `str` here would turn such a post into a validation error.
    timestamp: Optional[str] = None
    # Items of a multi-item post (empty list for single-item posts).
    childPosts: List[ChildPost]
    locationName: Optional[str] = None
    locationId: Optional[str] = None
    # Owner of the post.
    ownerUsername: str
    ownerId: str
    ownerFullName: Optional[str] = None
    viewCount: Optional[int] = None
    # Currently always emitted as None by format_media_data.
    albumId: Optional[str] = None
    # Accessibility caption of the media, if any.
    alt: Optional[str] = None
    # True when the post carries sponsor tags or is a paid partnership.
    sponsored: bool
    taggedUsers: List[TaggedUser]

# Build the Apify-style dictionary for a single media object.
async def format_media_data(media):
    """Format media data to match Apify Instagram scraper output.

    Args:
        media: Media object from the Instagram client. The attributes read
            here are ``pk``, ``media_type``, ``code``, ``thumbnail_url``,
            ``video_url``, ``caption_text``, ``comment_count``,
            ``like_count``, ``taken_at``, ``resources``, ``location``,
            ``user``, ``view_count``, ``accessibility_caption``,
            ``sponsor_tags``, ``is_paid_partnership`` and ``usertags``.

    Returns:
        dict: Plain dictionary whose keys match the ``InstagramPost``
        fields. ``timestamp`` is ``None`` when the media has no
        ``taken_at``; ``dimensionsHeight``, ``dimensionsWidth`` and
        ``albumId`` are always ``None``.
    """

    def type_label(media_type):
        # 1 -> Image, 2 -> Video; every other value is reported as Carousel.
        if media_type == 1:
            return "Image"
        if media_type == 2:
            return "Video"
        return "Carousel"

    def url_or_none(value):
        # URLs may be rich URL objects; normalise to str, falsy -> None.
        return str(value) if value else None

    caption = media.caption_text
    # Split the caption once and reuse the tokens for hashtags and mentions.
    words = caption.split() if caption else []
    location = media.location
    owner = media.user

    return {
        "debug_id": str(media.pk),
        "debug_mediaType": media.media_type,
        "type": type_label(media.media_type),
        "id": str(media.pk),
        "shortCode": media.code,
        "displayUrl": url_or_none(media.thumbnail_url),
        "videoUrl": url_or_none(media.video_url),
        "caption": caption,
        # len(word) > 1 skips a bare "#" / "@", which would otherwise
        # yield an empty hashtag or mention.
        "hashtags": [word[1:] for word in words if word.startswith("#") and len(word) > 1],
        "mentions": [word[1:] for word in words if word.startswith("@") and len(word) > 1],
        "url": f"https://www.instagram.com/p/{media.code}/",
        "commentsCount": media.comment_count,
        "dimensionsHeight": None,
        "dimensionsWidth": None,
        "likesCount": media.like_count,
        "timestamp": media.taken_at.isoformat() if media.taken_at else None,
        "childPosts": [
            {
                # Resources may lack one of the URL attributes entirely.
                "displayUrl": url_or_none(getattr(resource, 'thumbnail_url', None)),
                "videoUrl": url_or_none(getattr(resource, 'video_url', None)),
                "type": type_label(resource.media_type),
            }
            for resource in media.resources
        ] if media.resources else [],
        "locationName": location.name if location else None,
        # A location without a pk must give None, not the string "None".
        "locationId": str(location.pk) if location and location.pk is not None else None,
        "ownerUsername": owner.username,
        "ownerId": str(owner.pk),
        "ownerFullName": getattr(owner, 'full_name', None),
        "viewCount": media.view_count if media.view_count else None,
        "albumId": None,
        "alt": media.accessibility_caption,
        # Coerce to a real bool: `[] or None` would otherwise leak None
        # into a field that the response model declares as `bool`.
        "sponsored": bool(media.sponsor_tags or media.is_paid_partnership),
        "taggedUsers": [
            {
                "username": tag.user.username,
                "id": str(tag.user.pk),
            }
            for tag in media.usertags
        ] if media.usertags else [],
    }

# Response envelope returned by the POST /scrape endpoint.
class ScraperResponse(BaseModel):
    # Whether the scrape completed; scrape_instagram sets this to True.
    success: bool
    # Human-readable summary of the outcome.
    message: str
    # Number of entries in `posts`.
    posts_count: int
    # The scraped posts in Apify-like format.
    posts: List[InstagramPost]


@app.post("/scrape", response_model=ScraperResponse, tags=["Scraper"])
async def scrape_instagram(request: ScraperRequest):
    """
    Scrape Instagram posts for a given username
    
    - **username**: Instagram username to scrape
    - **post_count**: Number of posts to scrape (default: 10)
    - **session_id**: Instagram session ID for authentication (the
      `session_id` environment variable, when set, takes precedence)
    """
    # The environment variable wins over the request value, as before; an
    # unset or empty variable falls back to the value from the request body.
    session_id = os.getenv('session_id') or request.session_id
    if not session_id:
        # Fail with a clear client error instead of handing None to the
        # login call and surfacing an opaque 500.
        raise HTTPException(
            status_code=400,
            detail="No Instagram session id provided (request field or session_id environment variable)",
        )

    try:
        cl = Client()
        await cl.login_by_sessionid(session_id)
        user_id = await cl.user_id_from_username(request.username)
        medias = await cl.user_medias(user_id, request.post_count)

        formatted_posts = [await format_media_data(media) for media in medias]

        return ScraperResponse(
            success=True,
            message=f"Successfully scraped {len(formatted_posts)} posts from {request.username}",
            posts_count=len(formatted_posts),
            posts=formatted_posts
        )

    except Exception as e:
        # Top-level boundary: report any client or validation failure as a
        # 500 while keeping the original exception as the cause.
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.get("/health", tags=["Health"])
async def health_check():
    """Check if the API is running"""
    # Liveness probe: a fixed status plus the server's current local time
    # in ISO-8601 form.
    checked_at = datetime.now()
    payload = {"status": "healthy"}
    payload["timestamp"] = checked_at.isoformat()
    return payload

if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000. uvicorn is imported lazily so that importing this module
    # does not require it.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=8000)