Yakova committed on
Commit
2d508dd
·
verified ·
1 Parent(s): 35950bd

Create app.py

Browse files
Files changed (1) hide show
  1. app.py +162 -0
app.py ADDED
@@ -0,0 +1,162 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import asyncio
import os
import re
from datetime import datetime
from typing import List, Optional

import aiograpi
from aiograpi import Client
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, HttpUrl
10
+
11
# Application instance; the interactive OpenAPI documentation is served at /docs.
app = FastAPI(
    title="Instagram Scraper API",
    description="An API to scrape Instagram posts with Apify-like formatting",
    version="1.0.0",
    docs_url="/docs",
)

# Add CORS middleware.
# Any origin, method and header is accepted so that arbitrary front ends can
# call the API. Credentialed (cookie-bearing) cross-origin requests are turned
# off: combining a wildcard origin with credentials is not permitted by the
# CORS rules, and this API receives its Instagram session in the request body
# rather than through browser cookies.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
26
+
27
+ # Pydantic models for request and response
28
class ScraperRequest(BaseModel):
    # Request body accepted by the /scrape endpoint.

    # Instagram username whose posts should be scraped.
    username: str
    # Number of posts to fetch (default 10). Negative counts are rejected
    # during request validation instead of being forwarded to the Instagram
    # client, where they have no sensible meaning.
    post_count: int = Field(default=10, ge=0)
    # Instagram session ID used to log in; may be omitted from the request.
    session_id: Optional[str] = None
32
+
33
class TaggedUser(BaseModel):
    # One account tagged in a post: its handle and its identifier as a string.
    username: str
    id: str
36
+
37
+
38
+ # Update the ChildPost model to handle None values for URLs
39
class ChildPost(BaseModel):
    # A single item nested inside a post (see InstagramPost.childPosts).
    # Both URLs are optional and default to None when a URL is unavailable.
    displayUrl: Optional[HttpUrl] = None
    videoUrl: Optional[HttpUrl] = None
    # Human-readable media kind label.
    type: str
43
+
44
+ # Update the InstagramPost model similarly
45
class InstagramPost(BaseModel):
    # A scraped post in the Apify-like layout returned by the /scrape endpoint.

    # Debugging aids: the raw identifier and numeric media type of the post.
    debug_id: str
    debug_mediaType: int

    # Identity and media.
    type: str
    id: str
    shortCode: str
    displayUrl: Optional[HttpUrl] = None
    videoUrl: Optional[HttpUrl] = None

    # Caption text and the tags / mentions extracted from it.
    caption: Optional[str] = None
    hashtags: List[str]
    mentions: List[str]

    # Permalink and engagement figures.
    url: HttpUrl
    commentsCount: int
    dimensionsHeight: Optional[int] = None
    dimensionsWidth: Optional[int] = None
    likesCount: int

    # ISO-8601 creation time. Optional because the formatter emits None when
    # the media has no creation time; a required field would make the whole
    # response fail validation for a single such post.
    timestamp: Optional[str] = None

    # Items nested inside the post.
    childPosts: List[ChildPost]

    # Location, when the post has one.
    locationName: Optional[str] = None
    locationId: Optional[str] = None

    # Owner of the post.
    ownerUsername: str
    ownerId: str
    ownerFullName: Optional[str] = None

    # Miscellaneous metadata.
    viewCount: Optional[int] = None
    albumId: Optional[str] = None
    alt: Optional[str] = None
    sponsored: bool
    taggedUsers: List[TaggedUser]
73
+
74
+ # Update the format_media_data function to handle URLs properly
75
# "#" followed by word characters, wherever it occurs in the caption, so that
# "(#tag)" and "#tag," both yield "tag" and a bare "#" yields nothing.
_HASHTAG_RE = re.compile(r"#(\w+)")
# "@" that does not follow a word character or dot (which skips e-mail
# addresses), followed by a handle of word characters with optional inner dots.
_MENTION_RE = re.compile(r"(?<![\w.])@(\w(?:[\w.]*\w)?)")


def _media_type_label(media_type):
    """Return "Image" for type 1, "Video" for type 2 and "Carousel" otherwise."""
    if media_type == 1:
        return "Image"
    if media_type == 2:
        return "Video"
    return "Carousel"


def _str_or_none(value):
    """Return ``str(value)`` for a truthy value, otherwise ``None``."""
    return str(value) if value else None


async def format_media_data(media):
    """Format media data to match Apify Instagram scraper output.

    Args:
        media: Object exposing the attributes read below (``pk``,
            ``media_type``, ``code``, ``thumbnail_url``, ``video_url``,
            ``caption_text``, ``comment_count``, ``like_count``, ``taken_at``,
            ``resources``, ``location``, ``user``, ``view_count``,
            ``accessibility_caption``, ``sponsor_tags``,
            ``is_paid_partnership`` and ``usertags``).

    Returns:
        A dict whose keys mirror the fields of ``InstagramPost``.
    """
    caption = media.caption_text
    post_id = str(media.pk)
    location = media.location

    # Nested items may lack either URL attribute, hence the getattr defaults.
    child_posts = [
        {
            "displayUrl": _str_or_none(getattr(resource, "thumbnail_url", None)),
            "videoUrl": _str_or_none(getattr(resource, "video_url", None)),
            "type": _media_type_label(resource.media_type),
        }
        for resource in (media.resources or [])
    ]

    tagged_users = [
        {"username": tag.user.username, "id": str(tag.user.pk)}
        for tag in (media.usertags or [])
    ]

    # A location without a pk must give None, not the literal string "None".
    location_pk = location.pk if location else None

    return {
        "debug_id": post_id,
        "debug_mediaType": media.media_type,
        "type": _media_type_label(media.media_type),
        "id": post_id,
        "shortCode": media.code,
        "displayUrl": _str_or_none(media.thumbnail_url),
        "videoUrl": _str_or_none(media.video_url),
        "caption": caption,
        "hashtags": _HASHTAG_RE.findall(caption) if caption else [],
        "mentions": _MENTION_RE.findall(caption) if caption else [],
        "url": f"https://www.instagram.com/p/{media.code}/",
        "commentsCount": media.comment_count,
        "dimensionsHeight": None,
        "dimensionsWidth": None,
        "likesCount": media.like_count,
        "timestamp": media.taken_at.isoformat() if media.taken_at else None,
        "childPosts": child_posts,
        "locationName": location.name if location else None,
        "locationId": str(location_pk) if location_pk is not None else None,
        "ownerUsername": media.user.username,
        "ownerId": str(media.user.pk),
        "ownerFullName": getattr(media.user, "full_name", None),
        "viewCount": media.view_count or None,
        "albumId": None,
        "alt": media.accessibility_caption,
        # Coerce both operands: a None partnership flag with no sponsor tags
        # must produce False, because the response field is a strict bool.
        "sponsored": bool(media.sponsor_tags) or bool(media.is_paid_partnership),
        "taggedUsers": tagged_users,
    }
117
+
118
class ScraperResponse(BaseModel):
    # Envelope returned by the /scrape endpoint.

    # Outcome flag and a human-readable summary of the run.
    success: bool
    message: str
    # Number of entries in `posts`, followed by the posts themselves.
    posts_count: int
    posts: List[InstagramPost]
123
+
124
+
125
@app.post("/scrape", response_model=ScraperResponse, tags=["Scraper"])
async def scrape_instagram(request: ScraperRequest):
    """
    Scrape Instagram posts for a given username

    - **username**: Instagram username to scrape
    - **post_count**: Number of posts to scrape (default: 10)
    - **session_id**: Instagram session ID for authentication
    """
    # The `session_id` environment variable takes precedence; the value from
    # the request body is only the fallback.
    session_id = os.getenv('session_id', request.session_id)
    if not session_id:
        # Fail fast with a client error rather than letting the login call
        # receive None and surface as an opaque 500. Raised outside the try
        # block so the generic handler below cannot rewrap it.
        raise HTTPException(
            status_code=400,
            detail="An Instagram session_id is required to scrape posts",
        )

    try:
        cl = Client()
        await cl.login_by_sessionid(session_id)
        user_id = await cl.user_id_from_username(request.username)
        medias = await cl.user_medias(user_id, request.post_count)

        formatted_posts = [await format_media_data(media) for media in medias]

        return ScraperResponse(
            success=True,
            message=f"Successfully scraped {len(formatted_posts)} posts from {request.username}",
            posts_count=len(formatted_posts),
            posts=formatted_posts
        )

    except Exception as e:
        # Any failure while logging in, fetching or formatting is reported as
        # a 500 carrying the original message; the cause is chained for logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
154
+
155
@app.get("/health", tags=["Health"])
async def health_check():
    """Check if the API is running"""
    # Report the server's current local time alongside a fixed status marker.
    checked_at = datetime.now()
    payload = {"status": "healthy", "timestamp": checked_at.isoformat()}
    return payload
159
+
160
if __name__ == "__main__":
    # uvicorn is imported here so it is only needed when the module is
    # executed directly as a script.
    import uvicorn

    # Serve the app on every network interface, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)