File size: 3,062 Bytes
d9764fe
 
84481d5
d9764fe
 
 
 
bcd9850
 
d9764fe
 
 
bcd9850
 
d9764fe
 
 
84481d5
d9764fe
 
 
 
 
 
 
bcd9850
 
 
 
 
 
 
 
 
 
 
 
 
 
84481d5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bcd9850
d9764fe
 
 
 
 
 
 
84481d5
d9764fe
 
2bf48c4
 
d9764fe
84481d5
d9764fe
 
2bf48c4
06a4ed9
 
2bf48c4
 
 
 
d9764fe
84481d5
d9764fe
 
84481d5
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import json
import os
from concurrent.futures import ThreadPoolExecutor
from typing import Any, List

import requests
from google.protobuf.json_format import MessageToDict
from kafka import KafkaConsumer

from get_gpt_answer import GetGPTAnswer
from predict_custom_model import predict_custom_trained_model


def get_gpt_responses(data: dict[str, Any], gpt_helper: GetGPTAnswer) -> dict[str, Any]:
    """Attach the helper's two generated answers for a question to its record.

    Args:
        data: Record that must contain a ``"question"`` key. It is mutated in
            place: ``"gpt35_answer"`` and ``"gpt4_answer"`` are set on it.
        gpt_helper: Object whose ``generate_gpt35_answer`` and
            ``generate_gpt4_answer`` methods are each called with the question.

    Returns:
        The same ``data`` object that was passed in, with both answers added.

    Raises:
        KeyError: If ``data`` has no ``"question"`` key.
    """
    question = data["question"]
    # Assigned one at a time so a failure in the second call still leaves the
    # first answer on the record, exactly as before.
    data["gpt35_answer"] = gpt_helper.generate_gpt35_answer(question)
    data["gpt4_answer"] = gpt_helper.generate_gpt4_answer(question)
    return data


def process_batch(
    batch: List[dict[str, Any]],
    batch_size: int,
    gpt_helper: GetGPTAnswer,
) -> List[dict[str, Any]]:
    """Enrich a batch with GPT answers, then run it through the custom model.

    Each record is passed to ``get_gpt_responses`` on a thread pool, the
    enriched records are sent to ``predict_custom_trained_model``, and every
    returned prediction is converted to a plain dict.

    Args:
        batch: Records to process; each is handed to ``get_gpt_responses``.
        batch_size: Maximum number of worker threads used for the GPT calls.
        gpt_helper: Helper forwarded unchanged to ``get_gpt_responses``.

    Returns:
        One dict per prediction, holding only the fields whose values could
        be converted with ``MessageToDict``. An empty batch yields ``[]``
        without contacting the GPT helper or the prediction endpoint.

    Raises:
        Exception: Whatever the first failing GPT call (in batch order) or the
            prediction call raised; nothing is caught here.
    """
    if not batch:
        # Nothing to enrich or predict: skip the pool and the remote call.
        return []

    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        futures = [
            executor.submit(get_gpt_responses, data, gpt_helper)
            for data in batch
        ]
        # Collect in submission order so instances line up with the input.
        instances = [future.result() for future in futures]

    print("Batch ready with gpt responses", instances)

    predictions = predict_custom_trained_model(
        instances=instances,
        project=os.environ.get("PROJECT_ID"),
        endpoint_id=os.environ.get("ENDPOINT_ID"),
    )

    converted = []
    for prediction in predictions:
        result_dict = {}
        # NOTE(review): relies on the private ``_pb`` attribute of each
        # prediction (presumably a proto-plus map wrapper) - confirm against
        # predict_custom_trained_model's return type.
        for key, value in prediction._pb.items():
            # Ensure that 'value' is a protobuf message
            if hasattr(value, 'DESCRIPTOR'):
                result_dict[key] = MessageToDict(value)
            else:
                # Non-message values are reported and left out of the result.
                print(f"Item {key} is not a convertible protobuf message.")
        converted.append(result_dict)

    return converted


def send_results_back(
    full_results: List[dict[str, Any]],
    job_application_id: str,
    timeout: float = 30.0,
) -> None:
    """PATCH the evaluation results for a job application to the backend.

    Args:
        full_results: Evaluation payload, sent under the ``"evaluation"`` key.
            The consumer in this file passes a list of per-item result dicts.
        job_application_id: Sent under the ``"job_application_id"`` key.
        timeout: Seconds to wait for the backend before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            caller on an unresponsive backend.

    Raises:
        requests.RequestException: Propagated unchanged on connection failure
            or timeout; this function does not catch it.
    """
    url = "https://ta-2-sistem-cerdas-be-vi2jkj4riq-et.a.run.app/api/anti-cheat/update"
    headers = {
        "Content-Type": "application/json",
        "x-api-key": os.environ.get("X-API-KEY"),
    }
    body = {
        "job_application_id": job_application_id,
        "evaluation": full_results,
    }

    response = requests.patch(url, json=body, headers=headers, timeout=timeout)
    # The status is only reported, not enforced: non-2xx responses do not raise.
    print(f"Data sent with status code {response.status_code}")


def _parse_job_message(raw_value):
    """Decode one raw Kafka payload into ``(job_application_id, data)``.

    Raises:
        ValueError: If the payload is missing, is not UTF-8 encoded JSON
            (``UnicodeDecodeError`` and ``json.JSONDecodeError`` are both
            ``ValueError`` subclasses), is not a JSON object, lacks
            ``"data"`` or ``"job_application_id"``, or ``"data"`` is not a list.
    """
    if raw_value is None:
        raise ValueError("message has no payload")

    payload = json.loads(raw_value.decode("utf-8"))
    if not isinstance(payload, dict):
        raise ValueError("payload is not a JSON object")

    missing = [
        key for key in ("data", "job_application_id") if key not in payload
    ]
    if missing:
        raise ValueError(f"missing required field(s): {', '.join(missing)}")
    if not isinstance(payload["data"], list):
        raise ValueError('"data" is not a list')

    return payload["job_application_id"], payload["data"]


def consume_messages():
    """Consume jobs from the ``ai-detector`` topic and report their results.

    For every message, the ``"data"`` list is processed in slices of
    ``BATCH_SIZE`` through ``process_batch`` and the combined results are
    passed to ``send_results_back`` with the message's
    ``"job_application_id"``. Malformed messages are logged and skipped.
    Both required fields are validated before any processing starts, so a
    bad message costs no GPT or prediction calls.

    The Kafka bootstrap server is read from the ``KAFKA_IP`` environment
    variable. Errors raised while processing or sending a well-formed message
    are not caught here and propagate to the caller.
    """
    consumer = KafkaConsumer(
        "ai-detector",
        bootstrap_servers=[os.environ.get("KAFKA_IP")],
        auto_offset_reset='earliest',
        client_id="ai-detector-1",
        group_id="ai-detector",
    )

    print("Successfully connected to Kafka at", os.environ.get("KAFKA_IP"))

    BATCH_SIZE = 5
    gpt_helper = GetGPTAnswer()

    for message in consumer:
        try:
            job_application_id, full_batch = _parse_job_message(message.value)
        except json.JSONDecodeError:
            # Must precede the ValueError handler: JSONDecodeError is a subclass.
            print("Failed to decode JSON from message:", message.value)
            print("Continuing...")
            continue
        except ValueError as exc:
            print(f"Skipping malformed message ({exc}):", message.value)
            print("Continuing...")
            continue

        full_results = []
        for start in range(0, len(full_batch), BATCH_SIZE):
            batch = full_batch[start:start + BATCH_SIZE]
            full_results.extend(process_batch(batch, BATCH_SIZE, gpt_helper))

        send_results_back(full_results, job_application_id)