File size: 1,683 Bytes
aeb70ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import pathway as pw
import threading
from fastapi import FastAPI, HTTPException
import uvicorn
import os
from exporter import export_loop

# Column layout handed to pw.io.jsonlines.read() in run_pipeline().
# Only _id, name, price, category and promoCopy are selected downstream in
# this file; the remaining fields are declared but not used here.
# NOTE(review): field names presumably mirror the keys of the exported
# product documents (camelCase, Mongo-style `_id`) -- confirm against the
# exporter before renaming anything.
class ProductSchema(pw.Schema):
    _id: str  # product identifier; also used to derive the table row id
    name: str
    price: float
    description: str
    category: str
    stock: int
    imageUrl: str
    isNew: bool
    isActive: bool
    createdBy: str
    ratings: dict
    createdAt: str
    updatedAt: str
    promoCopy: str  # exposed to API clients under the name `promo`

# In-memory snapshot of the pipeline output: string identifier -> row.
# Written by the subscribe callback inside run_pipeline() and read by the
# HTTP handlers below.
cache = {}
# Every read or write of `cache` in this file happens while holding this
# lock, since the pipeline and the web server run on different threads.
cache_lock = threading.Lock()
# FastAPI application that the route decorators below register against.
app = FastAPI()

def run_pipeline():
    """Stream products from the JSON Lines export into the in-memory cache.

    Reads ``mongo_exports/products.jsonl`` in streaming mode, keeps only the
    fields served by the API (``promoCopy`` is renamed to ``promo``), and
    subscribes a callback that mirrors each row addition or removal into the
    module-level ``cache``. Finishes by calling ``pw.run()`` to execute the
    dataflow; the script entry point starts this function on a background
    thread.
    """
    products = pw.io.jsonlines.read(
        "mongo_exports/products.jsonl",
        schema=ProductSchema,
        mode="streaming"
    )
    transformed = products.select(
        _id=pw.this._id,
        name=pw.this.name,
        price=pw.this.price,
        category=pw.this.category,
        promo=pw.this.promoCopy
    ).with_id_from(pw.this._id)

    def update_cache(key, row, time, is_addition):
        """Apply one change from the ``transformed`` table to ``cache``.

        Args:
            key: Pathway's row pointer for the changed row (unused).
            row: Mapping of column name to value for the changed row.
            time: Processing time of the change (unused).
            is_addition: True when the row was added, False when retracted.
        """
        # Key by the product's own `_id`, not by str(key): `key` is Pathway's
        # internal row pointer, and its string form is not the id clients
        # send to GET /products/{product_id}, so lookups would never match.
        product_id = str(row["_id"])
        with cache_lock:
            if is_addition:
                cache[product_id] = row
            else:
                cache.pop(product_id, None)

    pw.io.subscribe(transformed, on_change=update_cache)
    pw.run()

@app.get("/products")
def get_all_products():
    """Return every product row currently held in the cache as a list."""
    # Copy the values while holding the lock so the response is built from a
    # consistent snapshot even if the pipeline thread updates the cache.
    with cache_lock:
        snapshot = [*cache.values()]
    return snapshot

@app.get("/products/{product_id}")
def get_product(product_id: str):
    """Return the cached row for a single product.

    Args:
        product_id: Cache key taken from the URL path.

    Returns:
        The cached row stored under ``product_id``.

    Raises:
        HTTPException: 404 when no entry exists for ``product_id``.
    """
    # Hold the lock only for the dictionary lookup itself.
    with cache_lock:
        product = cache.get(product_id)
    # Test for absence explicitly so a present-but-falsy entry is not
    # mistaken for a missing one.
    if product is None:
        raise HTTPException(status_code=404, detail="Product not found")
    return product

if __name__ == "__main__":
    # Start the exporter first, then the Pathway pipeline, each on its own
    # daemon thread so neither keeps the process alive once the web server
    # stops.
    for background_job in (export_loop, run_pipeline):
        worker = threading.Thread(target=background_job, daemon=True)
        worker.start()
    # Serve the API from the main thread; this call blocks.
    uvicorn.run(app, host="0.0.0.0", port=7860)