Spaces:
Running
Running
import os | |
import httpx | |
from typing import Dict, Optional, Tuple | |
from fastapi import APIRouter, HTTPException | |
from pydantic import BaseModel | |
router = APIRouter() | |
# 📱 Instagram API Config | |
INSTAGRAM_API_BASE = "https://graph.instagram.com/v23.0" | |
INSTAGRAM_PAGE_ID = "17841464166934843" # Seu Page ID | |
INSTAGRAM_TOKEN = os.getenv("INSTAGRAM_ACCESS_TOKEN") | |
if not INSTAGRAM_TOKEN: | |
raise ValueError("❌ INSTAGRAM_ACCESS_TOKEN não foi definido nas variáveis de ambiente!") | |
# 📝 Modelos de dados | |
class InstagramPost(BaseModel): | |
image_url: str | |
caption: Optional[str] = None | |
class PublishResponse(BaseModel): | |
success: bool | |
media_id: Optional[str] = None | |
post_id: Optional[str] = None | |
post_url: Optional[str] = None | |
message: str | |
comment_posted: bool = False | |
comment_id: Optional[str] = None | |
def convert_to_bold_unicode(text: str) -> str: | |
""" | |
Converte texto normal para caracteres Unicode sans-serif bold. | |
Usa uma fonte mais próxima da padrão do Instagram. | |
Funciona para letras A-Z, a-z e números 0-9. | |
""" | |
# Mapeamento usando Mathematical Sans-Serif Bold (mais próximo da fonte do Instagram) | |
bold_map = { | |
'A': '𝗔', 'B': '𝗕', 'C': '𝗖', 'D': '𝗗', 'E': '𝗘', 'F': '𝗙', 'G': '𝗚', 'H': '𝗛', | |
'I': '𝗜', 'J': '𝗝', 'K': '𝗞', 'L': '𝗟', 'M': '𝗠', 'N': '𝗡', 'O': '𝗢', 'P': '𝗣', | |
'Q': '𝗤', 'R': '𝗥', 'S': '𝗦', 'T': '𝗧', 'U': '𝗨', 'V': '𝗩', 'W': '𝗪', 'X': '𝗫', | |
'Y': '𝗬', 'Z': '𝗭', | |
'a': '𝗮', 'b': '𝗯', 'c': '𝗰', 'd': '𝗱', 'e': '𝗲', 'f': '𝗳', 'g': '𝗴', 'h': '𝗵', | |
'i': '𝗶', 'j': '𝗷', 'k': '𝗸', 'l': '𝗹', 'm': '𝗺', 'n': '𝗻', 'o': '𝗼', 'p': '𝗽', | |
'q': '𝗾', 'r': '𝗿', 's': '𝘀', 't': '𝘁', 'u': '𝘂', 'v': '𝘃', 'w': '𝘄', 'x': '𝘅', | |
'y': '𝘆', 'z': '𝘇', | |
'0': '𝟬', '1': '𝟭', '2': '𝟮', '3': '𝟯', '4': '𝟰', '5': '𝟱', '6': '𝟲', '7': '𝟳', | |
'8': '𝟴', '9': '𝟵' | |
} | |
return ''.join(bold_map.get(char, char) for char in text) | |
def convert_to_italic_unicode(text: str) -> str: | |
""" | |
Converte texto normal para caracteres Unicode sans-serif itálico. | |
Usa uma fonte mais próxima da padrão do Instagram. | |
Funciona para letras A-Z, a-z e números 0-9. | |
""" | |
# Mapeamento usando Mathematical Sans-Serif Italic (mais próximo da fonte do Instagram) | |
italic_map = { | |
'A': '𝘈', 'B': '𝘉', 'C': '𝘊', 'D': '𝘋', 'E': '𝘌', 'F': '𝘍', 'G': '𝘎', 'H': '𝘏', | |
'I': '𝘐', 'J': '𝘑', 'K': '𝘒', 'L': '𝘓', 'M': '𝘔', 'N': '𝘕', 'O': '𝘖', 'P': '𝘗', | |
'Q': '𝘘', 'R': '𝘙', 'S': '𝘚', 'T': '𝘛', 'U': '𝘜', 'V': '𝘝', 'W': '𝘞', 'X': '𝘟', | |
'Y': '𝘠', 'Z': '𝘡', | |
'a': '𝘢', 'b': '𝘣', 'c': '𝘤', 'd': '𝘥', 'e': '𝘦', 'f': '𝘧', 'g': '𝘨', 'h': '𝘩', | |
'i': '𝘪', 'j': '𝘫', 'k': '𝘬', 'l': '𝘭', 'm': '𝘮', 'n': '𝘯', 'o': '𝘰', 'p': '𝘱', | |
'q': '𝘲', 'r': '𝘳', 's': '𝘴', 't': '𝘵', 'u': '𝘶', 'v': '𝘷', 'w': '𝘸', 'x': '𝘹', | |
'y': '𝘺', 'z': '𝘻', | |
# Números itálicos (mesmo conjunto do negrito, pois não há itálico específico) | |
'0': '𝟬', '1': '𝟭', '2': '𝟮', '3': '𝟯', '4': '𝟰', '5': '𝟱', '6': '𝟲', '7': '𝟳', | |
'8': '𝟴', '9': '𝟵' | |
} | |
return ''.join(italic_map.get(char, char) for char in text) | |
def clean_html_tags(text: str) -> str: | |
""" | |
Remove todas as tags HTML exceto <strong> e <em>. | |
Converte <h2> para <strong> (negrito). | |
Converte <p> para quebras de linha. | |
Remove completamente outras tags como <li>, <h3>, etc. | |
Returns: | |
str: Texto limpo com apenas <strong> e <em> | |
""" | |
if not text: | |
return "" | |
import re | |
# Converte <h2> para <strong> (mantendo o conteúdo) | |
text = re.sub(r'<h2[^>]*>(.*?)</h2>', r'<strong>\1</strong>', text, flags=re.DOTALL | re.IGNORECASE) | |
# Converte <p> para quebras de linha | |
text = re.sub(r'<p[^>]*>(.*?)</p>', r'\1\n\n', text, flags=re.DOTALL | re.IGNORECASE) | |
# Converte <br> e <br/> para quebra de linha simples | |
text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE) | |
# Remove todas as outras tags HTML, mantendo apenas o conteúdo | |
# Exclui <strong>, </strong>, <em>, </em> da remoção | |
text = re.sub(r'<(?!/?(?:strong|em)\b)[^>]*>', '', text, flags=re.IGNORECASE) | |
# Remove quebras de linha excessivas e espaços extras | |
text = re.sub(r'\n\s*\n\s*\n+', '\n\n', text) # Máximo de 2 quebras consecutivas | |
text = re.sub(r'[ \t]+', ' ', text) # Remove espaços/tabs extras | |
text = text.strip() | |
return text | |
def format_text_for_instagram(text: str) -> Tuple[str, Optional[str]]: | |
""" | |
Formata o texto para o Instagram: | |
1. Limpa tags HTML indesejadas | |
2. Converte <strong> para negrito Unicode | |
3. Converte <em> para itálico Unicode | |
4. Corta se necessário e retorna o texto principal e o resto para comentário | |
Returns: | |
Tuple[str, Optional[str]]: (texto_principal, resto_para_comentario) | |
""" | |
if not text: | |
return "", None | |
# 🧹 Primeiro, limpa as tags HTML indesejadas | |
text = clean_html_tags(text) | |
# 🔤 Converte tags <strong> para negrito Unicode do Instagram | |
import re | |
def replace_strong_tags(match): | |
content = match.group(1) # Conteúdo entre as tags <strong> | |
return convert_to_bold_unicode(content) | |
def replace_em_tags(match): | |
content = match.group(1) # Conteúdo entre as tags <em> | |
return convert_to_italic_unicode(content) | |
# Substitui todas as ocorrências de <strong>conteudo</strong> | |
text = re.sub(r'<strong>(.*?)</strong>', replace_strong_tags, text, flags=re.DOTALL) | |
# Substitui todas as ocorrências de <em>conteudo</em> | |
text = re.sub(r'<em>(.*?)</em>', replace_em_tags, text, flags=re.DOTALL) | |
max_length = 2200 | |
suffix = '\n\n💬 Continua nos comentários!' | |
if len(text) <= max_length: | |
return text, None | |
cutoff_length = max_length - len(suffix) | |
if cutoff_length <= 0: | |
return suffix.strip(), text | |
trimmed = text[:cutoff_length] | |
def is_inside_quotes(s: str, index: int) -> bool: | |
"""Verifica se há aspas abertas não fechadas até o índice""" | |
up_to_index = s[:index + 1] | |
quote_count = up_to_index.count('"') | |
return quote_count % 2 != 0 | |
# Encontra o último ponto final fora de aspas | |
last_valid_dot = -1 | |
for i in range(len(trimmed) - 1, -1, -1): | |
if trimmed[i] == '.' and not is_inside_quotes(trimmed, i): | |
last_valid_dot = i | |
break | |
if last_valid_dot > 100: | |
main_text = trimmed[:last_valid_dot + 1] | |
remaining_text = text[last_valid_dot + 1:].strip() | |
else: | |
main_text = trimmed | |
remaining_text = text[cutoff_length:].strip() | |
final_main_text = f"{main_text}{suffix}" | |
return final_main_text, remaining_text if remaining_text else None | |
async def post_comment(client: httpx.AsyncClient, post_id: str, comment_text: str) -> Optional[str]: | |
""" | |
Posta um comentário no post do Instagram | |
Returns: | |
Optional[str]: ID do comentário se postado com sucesso | |
""" | |
try: | |
comment_url = f"{INSTAGRAM_API_BASE}/{post_id}/comments" | |
headers = { | |
"Content-Type": "application/json", | |
"Authorization": f"Bearer {INSTAGRAM_TOKEN}" | |
} | |
comment_payload = { | |
"message": comment_text | |
} | |
print(f"💬 Postando comentário no post {post_id}") | |
comment_response = await client.post( | |
comment_url, | |
headers=headers, | |
json=comment_payload | |
) | |
if comment_response.status_code == 200: | |
comment_data = comment_response.json() | |
comment_id = comment_data.get("id") | |
print(f"✅ Comentário postado com ID: {comment_id}") | |
return comment_id | |
else: | |
error_detail = comment_response.text | |
print(f"⚠️ Erro ao postar comentário: {error_detail}") | |
return None | |
except Exception as e: | |
print(f"⚠️ Erro inesperado ao postar comentário: {str(e)}") | |
return None | |
# 🚀 Endpoint principal para publicar no Instagram | |
async def publish_instagram_post(post: InstagramPost) -> PublishResponse: | |
""" | |
Publica uma imagem no Instagram em duas etapas: | |
1. Cria o media container | |
2. Publica o post | |
3. Se necessário, posta o resto do texto como comentário | |
""" | |
async with httpx.AsyncClient(timeout=30.0) as client: | |
try: | |
# 📝 Processa o texto da caption | |
main_caption, remaining_text = format_text_for_instagram(post.caption) if post.caption else ("", None) | |
# 🎯 ETAPA 1: Criar o media container | |
media_payload = { | |
"image_url": post.image_url | |
} | |
# Adiciona caption processada se fornecida | |
if main_caption: | |
media_payload["caption"] = main_caption | |
media_url = f"{INSTAGRAM_API_BASE}/{INSTAGRAM_PAGE_ID}/media" | |
headers = { | |
"Content-Type": "application/json", | |
"Authorization": f"Bearer {INSTAGRAM_TOKEN}" | |
} | |
print(f"📤 Criando media container para: {post.image_url}") | |
if remaining_text: | |
print(f"✂️ Texto cortado - será postado comentário com {len(remaining_text)} caracteres") | |
media_response = await client.post( | |
media_url, | |
headers=headers, | |
json=media_payload | |
) | |
if media_response.status_code != 200: | |
error_detail = media_response.text | |
print(f"❌ Erro ao criar media container: {error_detail}") | |
raise HTTPException( | |
status_code=media_response.status_code, | |
detail=f"Erro ao criar media container: {error_detail}" | |
) | |
media_data = media_response.json() | |
media_id = media_data.get("id") | |
if not media_id: | |
raise HTTPException( | |
status_code=500, | |
detail="ID do media container não retornado" | |
) | |
print(f"✅ Media container criado com ID: {media_id}") | |
# 🎯 ETAPA 2: Publicar o post | |
publish_payload = { | |
"creation_id": media_id | |
} | |
publish_url = f"{INSTAGRAM_API_BASE}/{INSTAGRAM_PAGE_ID}/media_publish" | |
print(f"📤 Publicando post com creation_id: {media_id}") | |
publish_response = await client.post( | |
publish_url, | |
headers=headers, | |
json=publish_payload | |
) | |
if publish_response.status_code != 200: | |
error_detail = publish_response.text | |
print(f"❌ Erro ao publicar post: {error_detail}") | |
raise HTTPException( | |
status_code=publish_response.status_code, | |
detail=f"Erro ao publicar post: {error_detail}" | |
) | |
publish_data = publish_response.json() | |
post_id = publish_data.get("id") | |
# 🔗 ETAPA 3: Obter detalhes do post para construir URL | |
post_url = None | |
if post_id: | |
try: | |
# Query para obter o permalink do post | |
post_details_url = f"{INSTAGRAM_API_BASE}/{post_id}?fields=permalink" | |
details_response = await client.get(post_details_url, headers=headers) | |
if details_response.status_code == 200: | |
details_data = details_response.json() | |
post_url = details_data.get("permalink") | |
print(f"🔗 Link do post: {post_url}") | |
else: | |
print(f"⚠️ Não foi possível obter o link do post: {details_response.text}") | |
except Exception as e: | |
print(f"⚠️ Erro ao obter link do post: {str(e)}") | |
# 💬 ETAPA 4: Postar comentário com o resto do texto (se necessário) | |
comment_posted = False | |
comment_id = None | |
if remaining_text and post_id: | |
comment_id = await post_comment(client, post_id, remaining_text) | |
comment_posted = comment_id is not None | |
success_message = "Post publicado com sucesso no Instagram!" | |
if comment_posted: | |
success_message += " Texto adicional postado como comentário." | |
elif remaining_text and not comment_posted: | |
success_message += " ATENÇÃO: Não foi possível postar o comentário com o resto do texto." | |
print(f"🎉 {success_message} Post ID: {post_id}") | |
return PublishResponse( | |
success=True, | |
media_id=media_id, | |
post_id=post_id, | |
post_url=post_url, | |
message=success_message, | |
comment_posted=comment_posted, | |
comment_id=comment_id | |
) | |
except httpx.TimeoutException: | |
print("⏰ Timeout na requisição para Instagram API") | |
raise HTTPException( | |
status_code=408, | |
detail="Timeout na comunicação com a API do Instagram" | |
) | |
except httpx.RequestError as e: | |
print(f"🌐 Erro de conexão: {str(e)}") | |
raise HTTPException( | |
status_code=502, | |
detail=f"Erro de conexão: {str(e)}" | |
) | |
except Exception as e: | |
print(f"💥 Erro inesperado: {str(e)}") | |
raise HTTPException( | |
status_code=500, | |
detail=f"Erro interno do servidor: {str(e)}" | |
) |