Spaces:
Running
Running
File size: 2,987 Bytes
22dad4e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
import os
import stripe
import uvicorn
from fastapi import FastAPI, Request, Header
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import time
app = FastAPI()
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")
stripe.api_key = os.environ["STRIPE_KEY"]
# This is a terrible idea, only used for demo purposes!
app.state.stripe_customer_id = None
@app.get("/")
def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request, "hasCustomer": app.state.stripe_customer_id is not None})
@app.get("/success")
async def success(request: Request):
return templates.TemplateResponse("success.html", {"request": request})
@app.get("/cancel")
async def cancel(request: Request):
return templates.TemplateResponse("cancel.html", {"request": request})
@app.post("/create-checkout-session")
async def create_checkout_session(request: Request):
data = await request.json()
if not app.state.stripe_customer_id:
customer = stripe.Customer.create(
description="Demo customer",
)
app.state.stripe_customer_id = customer["id"]
checkout_session = stripe.checkout.Session.create(
customer=app.state.stripe_customer_id,
success_url="http://localhost:8000/success?session_id={CHECKOUT_SESSION_ID}",
cancel_url="http://localhost:8000/cancel",
payment_method_types=["card"],
mode="subscription",
line_items=[{
"price": data["priceId"],
"quantity": 1
}],
)
return {"sessionId": checkout_session["id"]}
@app.post("/create-portal-session")
async def create_portal_session():
session = stripe.billing_portal.Session.create(
customer=app.state.stripe_customer_id,
return_url="http://localhost:8000"
)
return {"url": session.url}
@app.post("/webhook")
async def webhook_received(request: Request, stripe_signature: str = Header(None)):
webhook_secret = os.environ["STRIPE_WEBHOOK_SECRET"]
print("Entré al webhook 182...")
time.sleep(1)
data = await request.body()
try:
event = stripe.Webhook.construct_event(
payload=data,
sig_header=stripe_signature,
secret=webhook_secret
)
event_data = event['data']
except Exception as e:
return {"error": str(e)}
event_type = event['type']
print("Voy a imprimir el event type:")
print(event_type)
time.sleep(1)
if event_type == 'checkout.session.completed':
print('checkout session completed')
elif event_type == 'invoice.paid':
print('invoice paid')
elif event_type == 'invoice.payment_failed':
print('invoice payment failed')
else:
print(f'unhandled event: {event_type}')
return {"status": "success"}
if __name__ == '__main__':
uvicorn.run("app:app", reload=True) |