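"""FastAPI service for receipt batch processing.

Accepts receipt OCR payloads over HTTP, records a batch job row in Supabase,
and schedules a background task that builds per-receipt extraction prompts and
submits them to the OpenAI Batch API.
"""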
from fastapi import FastAPI, Request, BackgroundTasks
import json
import io
from openai import OpenAI
from supabase import create_client, Client
from typing import List, Dict, Any
import logging
from datetime import datetime
import os

# Initialize logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()
# OpenAI and Supabase clients are configured from environment variables
client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('ORG_ID'))
url: str = os.getenv('SUPABASE_URL')
key: str = os.getenv('SUPABASE_KEY')
supabase: Client = create_client(url, key)


@app.post("/send/batch_processing")
async def testv1(request: Request, background_tasks: BackgroundTasks):
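    """Accept a receipt batch payload, record the job in Supabase, and
    schedule background processing via `process_batch_job`."""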
    try:
        body_data = await request.json()

        logger.info(f"Received batch payload: {body_data}")
        # Create initial batch job record
        save_data = {
            'batch_job_id': f"batch_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}",
            "batch_job_status": False,
            "created_at": datetime.utcnow().isoformat()
        }
        
        response = (
            supabase.table("batch_processing_details")
            .insert(save_data)
            .execute()
        )
        
        # Add processing to background tasks
        background_tasks.add_task(process_batch_job, body_data, save_data['batch_job_id'])

        # Include the generated id so the caller can track the job later
        return {'data': 'Batch job is scheduled!', 'batch_job_id': save_data['batch_job_id']}

    except Exception as e:
        return {'error': str(e)}


def receipt_radar_prompt(raw_text: str) -> str:
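    """Build the extraction prompt for a single receipt's OCR text.

    The returned prompt asks the model for a plain-text JSON object with fixed
    keys, plus type-specific metadata for insurance, travel and hotel receipts.
    """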
    insurance_response_structure = """
    {
      "insurance_type": "Classify it into 8 categories travel , health , term , vehicle, property,liability, life , buisness only .Try to find the closest possible based on the receipt text, if you don't understand the type classify it as others.",
      "policy_details": {
        "policyholder_name": "",
        "policy_number": "",
        "insurance_start_date": "",
        "insurance_end_date": "",
        "premium_amount": "",
        "payment_frequency": ""
      },
      "coverage_details": {
        "covered_items": {
          "item_type": "",
          "product_company": "",
          "product_model": "",
          "product_manufacturing_year": ""
        },
        "comprehensive_coverage_type_policy": "yes/no"
      }
    }
    """
    travel_response_structure = """
        travel_type (classify strictly into one of: bus, train, airplane, taxi, bike, rickshaw), travel_company_name, departure_destination, arrival_destination, arrival_city (if the arrival city cannot be found, strictly use the arrival destination for this field), departure_date, arrival_date. If the arrival and departure dates appear to be the same, verify this carefully from the receipt text before using the same date in both fields. If any field is not found, mark it as null.
    """
    hotel_data_points = """ hotel_type (hotel_stay, dine_in, or dine_in + stay (use both keywords strictly)), hotel_brand_name, hotel_location, hotel_checkin_date, hotel_checkout_date. If any field is not found, mark it as null """
    system_prompt = f"""Extract information from the following receipt OCR text and return a JSON object with these exact keys: brand, total_cost, location, purchase_category, brand_category, Date, currency, filename, payment_method, metadata.
    Rules:
    1. For total_cost, use the highest monetary value in the text.
    2. For brand_category, choose the closest match from: ["Fashion and Apparel", "Jewelry and Watches", "Beauty and Personal Care", "Automobiles", "Real Estate", "Travel (may include receipts of airlines, trains, taxis, cruises, etc.)", "Hospitality (includes receipts of hotels (stays), restaurants, cafes, bars, accommodation services and beverage services; do not include food delivery services in hospitality)", "Food Delivery Services (like Swiggy, Zomato, EatSure and any other you can identify from the receipt text)", "Home and Lifestyle", "Technology and Electronics", "Sports and Leisure", "Art and Collectibles", "Health and Wellness", "Stationery and Writing Instruments", "Children and Baby", "Pet Accessories", "Financial Services", "Insurance"]
    3. Format Date strictly as dd-mm-yyyy.
    4. metadata: For insurance receipts, extract the data points given below and return metadata as a JSON object with this structure: \n """ + insurance_response_structure + """
    5. metadata: For travel receipts (flight, bus, train), extract these data points as a JSON object exactly: """ + travel_response_structure + """
    6. metadata: For hotel receipts, extract these data points as a JSON object exactly: """ + hotel_data_points + f"""
    For non-insurance, non-travel and non-hotel receipts, return metadata as null.
    7. Use currency codes (e.g., USD, EUR) instead of symbols.
    8. Generate filename as 'PURCHASE_TYPE_BRAND_DATE' (e.g., 'clothing_gucci_20230715').
    9. If a value is not found, return null.
    10. If all values are null, return null.
    Ensure strictly that the output is a valid JSON object containing only the keys above, without any explanations.
    Analyse the OCR text below and convert it into JSON using the keys from the first line and the rules above.
    Generate the JSON response as plain text, without using a ```json block.
    {raw_text}
    """
    return system_prompt




async def process_batch_job(dataset: Dict[str, Any], batch_job_id: str):
    """
    Background task to process the batch job
    """
    try:
        logger.info(f"Starting batch processing for job {batch_job_id}")
        
        # Build one /v1/chat/completions request per receipt in the payload
        openai_tasks = []
        for ds in dataset.get('data', []):
            message_id = ds.get('message_id')
            user_id = ds.get('user_id')
            receipt_text = ds.get('receipt_text')
            email = ds.get('email')

            prompt = receipt_radar_prompt(receipt_text)
            task = {
                "custom_id": f"{message_id}-{user_id}-{email}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "temperature": 0.1,
                    "response_format": { 
                        "type": "json_object"
                    },
                    "messages": [
                        {
                            "role": "user",
                            "content": description
                        }
                    ]
                }
            }
            openai_tasks.append(task)
        
        # Serialize the tasks as JSONL into an in-memory buffer and upload it
        json_obj = io.BytesIO()
        for obj in openai_tasks:
            json_obj.write((json.dumps(obj) + '\n').encode('utf-8'))
        json_obj.seek(0)  # rewind so the upload reads from the start of the buffer

        batch_file = client.files.create(
            file=("batch_tasks.jsonl", json_obj),
            purpose="batch"
        )
        
        # Create batch job
        batch_job = client.batches.create(
            input_file_id=batch_file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )
        
        # Update status in Supabase
        supabase.table("batch_processing_details").update({
            "batch_job_status": True,
            "completed_at": datetime.utcnow().isoformat()
        }).match({"batch_job_id": batch_job_id}).execute()
        
        logger.info(f"Batch job {batch_job_id} processed successfully")
        
    except Exception as e:
        logger.error(f"Error processing batch job {batch_job_id}: {str(e)}")
        # Update status with error
        supabase.table("batch_processing_details").update({
            "batch_job_status": False,
            "error": str(e),
            "completed_at": datetime.utcnow().isoformat()
        }).eq("batch_job_id", batch_job_id).execute()
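

# The endpoint above only submits the batch; nothing in this module retrieves
# results. Below is a minimal sketch of how results could be fetched once the
# batch finishes. It assumes the OpenAI batch id (batch_job.id) was kept
# somewhere accessible; the helper name and that storage detail are
# illustrative assumptions, not part of the original code.
def fetch_batch_results(openai_batch_id: str) -> List[Dict[str, Any]]:
    """Return parsed JSONL results of a completed OpenAI batch, or [] if not done."""
    batch = client.batches.retrieve(openai_batch_id)
    if batch.status != "completed":
        logger.info(f"Batch {openai_batch_id} not finished yet (status={batch.status})")
        return []
    # Each line of the output file is one JSON result, keyed by our custom_id
    output = client.files.content(batch.output_file_id)
    return [json.loads(line) for line in output.text.splitlines() if line.strip()]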