bearking58 committed
Commit b4f3263 · 1 Parent(s): ae51d62

feat: finalize pipeline

device_manager.py ADDED
@@ -0,0 +1,12 @@
+ import torch
+
+
+ class DeviceManager:
+     _instance = None
+
+     def __new__(cls):
+         if cls._instance is None:
+             cls._instance = super(DeviceManager, cls).__new__(cls)
+             cls._instance.device = torch.device(
+                 "cuda" if torch.cuda.is_available() else "cpu")
+         return cls._instance.device
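DeviceManager centralizes device selection for every model in the pipeline. Note the unusual design: `__new__` returns the cached `torch.device` itself rather than the manager instance (so `__init__` never runs on the returned object). A minimal sketch of how the other files in this commit consume it:

```python
import torch
from device_manager import DeviceManager

device = DeviceManager()          # evaluates to torch.device("cuda") or ("cpu")
assert DeviceManager() == device  # later calls reuse the cached device

x = torch.zeros(2, 2).to(device)  # usable anywhere a torch.device is expected
```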
gemma2b.py → gemma2b_dependencies.py RENAMED
@@ -3,19 +3,25 @@ import torch
  from torch.nn.functional import cosine_similarity
  from collections import Counter
  import numpy as np
+ from device_manager import DeviceManager


  class Gemma2BDependencies:
-     def __init__(self, question: str, answer: str):
-         self.question = question
-         self.answer = answer
-         self.tokenizer = AutoTokenizer.from_pretrained("google/gemma-2b")
-         self.model = AutoModelForCausalLM.from_pretrained("google/gemma-2b")
-         self.device = torch.device("cuda")
-         self.model.to(self.device)
-
-     def calculate_perplexity(self):
-         inputs = self.tokenizer(self.answer, return_tensors="pt",
+     _instance = None
+
+     def __new__(cls):
+         if cls._instance is None:
+             cls._instance = super(Gemma2BDependencies, cls).__new__(cls)
+             cls._instance.tokenizer = AutoTokenizer.from_pretrained(
+                 "google/gemma-2b")
+             cls._instance.model = AutoModelForCausalLM.from_pretrained(
+                 "google/gemma-2b")
+             cls._instance.device = DeviceManager()
+             cls._instance.model.to(cls._instance.device)
+         return cls._instance
+
+     def calculate_perplexity(self, text: str):
+         inputs = self.tokenizer(text, return_tensors="pt",
                                  truncation=True, max_length=1024)
          inputs = {k: v.to(self.device) for k, v in inputs.items()}

@@ -27,9 +33,9 @@ class Gemma2BDependencies:

          return perplexity.item()

-     def calculate_burstiness(self):
+     def calculate_burstiness(self, text: str):
          # Tokenize the text using GPT-2 tokenizer
-         tokens = self.tokenizer.tokenize(self.answer)
+         tokens = self.tokenizer.tokenize(text)

          # Count token frequencies
          frequency_counts = list(Counter(tokens).values())
@@ -42,8 +48,8 @@ class Gemma2BDependencies:
          vmr = variance / mean if mean > 0 else 0
          return vmr

-     def get_embedding(self):
-         inputs = self.tokenizer(self.text, return_tensors="pt",
+     def get_embedding(self, text: str):
+         inputs = self.tokenizer(text, return_tensors="pt",
                                  truncation=True, max_length=1024)
          inputs = {k: v.to(self.device) for k, v in inputs.items()}

@@ -55,8 +61,8 @@ class Gemma2BDependencies:
          embedding = torch.mean(last_hidden_states, dim=1)
          return embedding

-     def calculate_cosine_similarity(self):
-         embedding1 = self.get_embedding(self.question)
-         embedding2 = self.get_embedding(self.answer)
+     def calculate_cosine_similarity(self, question: str, answer: str):
+         embedding1 = self.get_embedding(question)
+         embedding2 = self.get_embedding(answer)
          # Ensure the embeddings are in the correct shape for cosine_similarity
          return cosine_similarity(embedding1, embedding2).item()
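With the rename, Gemma2BDependencies becomes a process-wide singleton: the Gemma-2B weights load once on first construction, and every metric now takes its text as an argument instead of reading constructor state. A hypothetical usage sketch (the question/answer strings are illustrative only):

```python
from gemma2b_dependencies import Gemma2BDependencies

deps = Gemma2BDependencies()  # first call loads google/gemma-2b once

question = "What is overfitting?"
answer = "Overfitting happens when a model memorizes its training data."

ppl = deps.calculate_perplexity(answer)   # float; lower = more predictable text
vmr = deps.calculate_burstiness(answer)   # variance-to-mean ratio of token counts
sim = deps.calculate_cosine_similarity(question, answer)  # scalar in [-1, 1]
```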
hypothesis.py CHANGED
@@ -5,23 +5,19 @@ import pandas as pd
  import numpy as np
  from collections import defaultdict, Counter
  from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
- from gemma2b import Gemma2BDependencies
+ from gemma2b_dependencies import Gemma2BDependencies


  class BaseModelHypothesis:
-     def __init__(self, question: str, answer: str):
+     def __init__(self):
          nltk.download('punkt')
          nltk.download('averaged_perceptron_tagger')

-         self.question = question
-         self.answer = answer
-
          self.analyzer = SentimentIntensityAnalyzer()
          self.lexicon_df = pd.read_csv(
              "https://storage.googleapis.com/ta-ai-detector/datasets/NRC-Emotion-Lexicon.csv")
          self.emotion_lexicon = self.process_emotion_lexicon()
-         self.gemma2bdependencies = Gemma2BDependencies(
-             self.question, self.answer)
+         self.gemma2bdependencies = Gemma2BDependencies()

          self.features_normalized_text_length = []
          self.features_not_normalized = []
@@ -39,30 +35,30 @@ class BaseModelHypothesis:
              emotion_lexicon[row["word"]].append(row["emotion"])
          return emotion_lexicon

-     def calculate_normalized_text_length_features(self):
+     def calculate_normalized_text_length_features(self, text: str) -> np.ndarray:
          self.features_normalized_text_length = self.extract_pos_features(
-             self.answer)
+             text)
          self.features_normalized_text_length = self.features_normalized_text_length + \
-             self.calculate_emotion_proportions(self.answer)
+             self.calculate_emotion_proportions(text)
          self.features_normalized_text_length.append(
-             self.measure_unique_word_ratio(self.answer))
+             self.measure_unique_word_ratio(text))

          return self.scaler_normalized_text_length.transform(np.array(self.features_normalized_text_length).astype(np.float32).reshape(1, -1))

-     def calculate_not_normalized_features(self):
+     def calculate_not_normalized_features(self, text: str) -> np.ndarray:
          self.features_not_normalized.append(
-             self.measure_sentiment_intensity(self.answer))
+             self.measure_sentiment_intensity(text))
          self.features_not_normalized = self.features_not_normalized + \
-             self.measure_readability(self.answer)
+             self.measure_readability(text)
          self.features_not_normalized.append(
-             self.gemma2bdependencies.calculate_perplexity(self.answer))
+             self.gemma2bdependencies.calculate_perplexity(text))
          self.features_not_normalized.append(
-             self.gemma2bdependencies.calculate_burstiness(self.answer))
+             self.gemma2bdependencies.calculate_burstiness(text))

          return self.scaler_not_normalized.transform(np.array(self.features_not_normalized).astype(np.float32).reshape(1, -1))

-     def extract_pos_features(self):
-         words = nltk.word_tokenize(self.answer)
+     def extract_pos_features(self, text: str):
+         words = nltk.word_tokenize(text)
          pos_tags = nltk.pos_tag(words)
          desired_tags = ["JJ", "VB", "RB", "PRP", "DT", "IN", "NN", "NNS"]
          pos_counts = defaultdict(int, {tag: 0 for tag in desired_tags})
@@ -76,19 +72,19 @@ class BaseModelHypothesis:

          return pos_ratios

-     def measure_sentiment_intensity(self):
-         sentiment = self.analyzer.polarity_scores(self.answer)
+     def measure_sentiment_intensity(self, text: str):
+         sentiment = self.analyzer.polarity_scores(text)
          return sentiment["compound"]

-     def measure_readability(self):
-         gunning_fog = textstat.gunning_fog(self.answer)
-         smog_index = textstat.smog_index(self.answer)
-         dale_chall_score = textstat.dale_chall_readability_score(self.answer)
+     def measure_readability(self, text: str):
+         gunning_fog = textstat.gunning_fog(text)
+         smog_index = textstat.smog_index(text)
+         dale_chall_score = textstat.dale_chall_readability_score(text)

          return [gunning_fog, smog_index, dale_chall_score]

-     def calculate_emotion_proportions(self):
-         tokens = nltk.word_tokenize(self.answer)
+     def calculate_emotion_proportions(self, text: str):
+         tokens = nltk.word_tokenize(text)

          total_tokens = len(tokens)

@@ -108,8 +104,8 @@ class BaseModelHypothesis:
              proportions["sadness"], proportions["disgust"], proportions["anticipation"], proportions["joy"], proportions["surprise"]
          ]

-     def measure_unique_word_ratio(self):
-         tokens = nltk.word_tokenize(self.answer)
+     def measure_unique_word_ratio(self, text: str):
+         tokens = nltk.word_tokenize(text)
          total_words = len(tokens)

          unique_words = len(Counter(tokens).keys())
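BaseModelHypothesis now takes the text to score per call, and each public method returns a scaled `(1, n_features)` array. One caveat: the feature lists are instance attributes that are appended to on each call, so the safe pattern (assumed below) is a fresh instance, or one call of each method, per text. A sketch, assuming the scalers are set up in the part of `__init__` outside these hunks:

```python
import numpy as np
from hypothesis import BaseModelHypothesis

hypothesis = BaseModelHypothesis()
answer = "Sample answer text to score."

normalized = hypothesis.calculate_normalized_text_length_features(answer)
not_normalized = hypothesis.calculate_not_normalized_features(answer)

# Both are (1, n) float32 arrays, ready to concatenate for the main model.
features = np.concatenate((normalized, not_normalized), axis=1)
```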
main_model.py ADDED
@@ -0,0 +1,81 @@
+ from device_manager import DeviceManager
+ from transformers import AlbertModel, AlbertTokenizerFast
+ import torch.nn as nn
+ import torch
+ import numpy as np
+
+
+ class AlbertCustomClassificationHead(nn.Module):
+     def __init__(self, albert_model, num_additional_features=25, dropout_rate=0.1):
+         super(AlbertCustomClassificationHead, self).__init__()
+         self.albert_model = albert_model
+         self.dropout = nn.Dropout(dropout_rate)
+         self.classifier = nn.Linear(1024 + num_additional_features, 1)
+
+     def forward(self, input_ids, attention_mask, additional_features, labels=None):
+         albert_output = self.albert_model(
+             input_ids=input_ids, attention_mask=attention_mask).pooler_output
+
+         combined_features = torch.cat(
+             [albert_output, additional_features], dim=1)
+
+         dropout_output = self.dropout(combined_features)
+
+         logits = self.classifier(dropout_output)
+
+         if labels is not None:
+             loss_fn = nn.BCEWithLogitsLoss()
+             labels = labels.unsqueeze(1)
+             loss = loss_fn(logits, labels.float())
+             return logits, loss
+         else:
+             return logits
+
+
+ class PredictMainModel:
+     _instance = None
+
+     def __new__(cls):
+         if cls._instance is None:
+             cls._instance = super(PredictMainModel, cls).__new__(cls)
+             cls._instance.initialize()
+         return cls._instance
+
+     def initialize(self):
+         self.model_name = "albert-large-v2"
+         self.tokenizer = AlbertTokenizerFast.from_pretrained(self.model_name)
+         self.albert_model = AlbertModel.from_pretrained(self.model_name)
+         self.device = DeviceManager()
+
+         self.model = AlbertCustomClassificationHead(
+             self.albert_model).to(self.device)
+         # TODO : CHANGE MODEL STATE DICT PATH
+         self.model.load_state_dict(torch.load("best_model_fold_4.pth", map_location=self.device))
+
+     def preprocess_input(self, text: str, additional_features: np.ndarray):
+         encoding = self.tokenizer.encode_plus(
+             text,
+             add_special_tokens=True,
+             max_length=512,
+             return_token_type_ids=False,
+             padding="max_length",
+             truncation=True,
+             return_attention_mask=True,
+             return_tensors="pt"
+         )
+
+         additional_features_tensor = torch.tensor(
+             additional_features, dtype=torch.float)
+
+         return {
+             "input_ids": encoding["input_ids"].to(self.device),
+             "attention_mask": encoding["attention_mask"].to(self.device),
+             "additional_features": additional_features_tensor.to(self.device)
+         }
+
+     def predict(self, text: str, additional_features: np.ndarray) -> float:
+         self.model.eval()
+         with torch.no_grad():
+             data = self.preprocess_input(text, additional_features)
+             logits = self.model(**data)
+             return torch.sigmoid(logits).cpu().numpy()[0][0]
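PredictMainModel wraps ALBERT-large plus the custom head behind the same singleton pattern as the other components. A hypothetical end-to-end call; the `(1, 25)` feature shape matches `num_additional_features=25`, and reading the output as P(AI-written) assumes label 1 means "AI", as in prediction.py's response mapping:

```python
import numpy as np
from main_model import PredictMainModel

model = PredictMainModel()  # singleton: tokenizer, ALBERT, and head load once

text = "Answer text to classify."
additional_features = np.zeros((1, 25), dtype=np.float32)  # placeholder values

probability = model.predict(text, additional_features)
print(f"P(AI-written) = {probability:.4f}")
```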
prediction.py CHANGED
@@ -1,37 +1,12 @@
  from fastapi import FastAPI
  from pydantic import BaseModel
  from hypothesis import BaseModelHypothesis
- from randomforest import RandomForestDependencies
+ from random_forest_dependencies import RandomForestDependencies
+ from random_forest_model import RandomForestModel
+ from main_model import PredictMainModel
  import torch.nn as nn
  import torch
-
-
- class AlbertCustomClassificationHead(nn.Module):
-     def __init__(self, albert_model, dropout_rate=0.1):
-         super(AlbertCustomClassificationHead, self).__init__()
-         self.albert_model = albert_model
-         self.dropout = nn.Dropout(dropout_rate)
-         self.classifier = nn.Linear(1024 + 25, 1)
-
-     def forward(self, input_ids, attention_mask, additional_features, labels=None):
-         albert_output = self.albert_model(
-             input_ids=input_ids, attention_mask=attention_mask).pooler_output
-
-         combined_features = torch.cat(
-             [albert_output, additional_features], dim=1)
-
-         dropout_output = self.dropout(combined_features)
-
-         logits = self.classifier(dropout_output)
-
-         if labels is not None:
-             loss_fn = nn.BCEWithLogitsLoss()
-             labels = labels.unsqueeze(1)
-             loss = loss_fn(logits, labels.float())
-             return logits, loss
-         else:
-             return logits
-
+ import numpy as np

  app = FastAPI()

@@ -60,4 +35,23 @@ async def predict(request: PredictRequest):
      features_not_normalized = hypothesis.calculate_not_normalized_features(
          answer)

-     return request_dict.get("backspace_count")
+     combined_additional_features = np.concatenate(
+         (features_normalized_text_length, features_not_normalized), axis=1)
+
+     main_model = PredictMainModel()
+     main_model_probability = main_model.predict(
+         answer, combined_additional_features)
+
+     random_forest_features = RandomForestDependencies()
+     secondary_model_features = random_forest_features.calculate_features(
+         question, answer, main_model_probability, backspace_count, typing_duration, letter_click_counts)
+
+     secondary_model = RandomForestModel()
+     secondary_model_prediction = secondary_model.predict(
+         secondary_model_features)
+
+     return {
+         "main_model_probability": main_model_probability,
+         "final_prediction": secondary_model_prediction,
+         "prediction_class": "AI" if secondary_model_prediction == 1 else "HUMAN"
+     }
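The endpoint now runs the full two-stage pipeline: ALBERT probability first, then the random-forest verdict over behavioral features. A hypothetical client call; the route path and field names are inferred from the handler (the `PredictRequest` model and route decorator sit outside this hunk):

```python
import requests

payload = {
    "question": "Explain polymorphism.",
    "answer": "Polymorphism lets one interface work with many types.",
    "backspace_count": 12,
    "typing_duration": 95,
    "letter_click_counts": {"a": 14, "e": 20, "p": 6},
}

response = requests.post("http://localhost:8000/predict", json=payload)
print(response.json())
# e.g. {"main_model_probability": 0.83, "final_prediction": 1, "prediction_class": "AI"}
```

One serialization caveat: `main_model_probability` is a numpy float32 and `secondary_model_prediction` a numpy integer, so the handler may need explicit `float()`/`int()` casts before returning JSON.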
randomforest.py → random_forest_dependencies.py RENAMED
@@ -1,32 +1,28 @@
- from gemma2b import Gemma2BDependencies
+ from gemma2b_dependencies import Gemma2BDependencies
  from collections import Counter


  class RandomForestDependencies:
-     def __init__(self, question: str, answer: str):
-         self.question = question
-         self.answer = answer
-
-         self.gemma2bdependencies = Gemma2BDependencies(
-             self.question, self.answer)
+     def __init__(self):
+         self.gemma2bdependencies = Gemma2BDependencies()
          self.random_forest_features = []

-     def calculate_features(self, probability: float, backspace_count: int, typing_duration: int, letter_click_counts: dict[str, int]):
+     def calculate_features(self, question: str, answer: str, probability: float, backspace_count: int, typing_duration: int, letter_click_counts: dict[str, int]):
          cosine_similarity = self.gemma2bdependencies.calculate_cosine_similarity(
-             self.question, self.answer)
-         backspace_count_normalized = backspace_count / len(self.answer)
-         typing_duration_normalized = typing_duration / len(self.answer)
+             question, answer)
+         backspace_count_normalized = backspace_count / len(answer)
+         typing_duration_normalized = typing_duration / len(answer)
          letter_discrepancy = self.calculate_letter_discrepancy(
-             self.answer, letter_click_counts)
+             answer, letter_click_counts)

          self.random_forest_features = [
              cosine_similarity, probability, backspace_count_normalized,
              typing_duration_normalized, letter_discrepancy
          ]

-     def calculate_letter_discrepancy(self, letter_click_counts: dict[str, int]):
+     def calculate_letter_discrepancy(self, text: str, letter_click_counts: dict[str, int]):
          # Calculate letter frequencies in the text
-         text_letter_counts = Counter(self.answer.lower())
+         text_letter_counts = Counter(text.lower())

          # Calculate the ratio of click counts to text counts for each letter, adjusting for letters not in text
          ratios = [letter_click_counts.get(letter, 0) / (text_letter_counts.get(letter, 0) + 1)
@@ -35,6 +31,6 @@ class RandomForestDependencies:

          # Average the ratios and normalize by the length of the text
          average_ratio = sum(ratios) / len(ratios)
          discrepancy_ratio_normalized = average_ratio / \
-             (len(self.answer) if len(self.answer) > 0 else 1)
+             (len(text) if len(text) > 0 else 1)

          return discrepancy_ratio_normalized
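RandomForestDependencies now receives the question/answer pair per call rather than at construction. Note that `calculate_features` stores its result on `self.random_forest_features` and returns `None`, while prediction.py assigns its return value; adding `return self.random_forest_features` would reconcile the two. A hypothetical usage sketch with illustrative inputs:

```python
from random_forest_dependencies import RandomForestDependencies

deps = RandomForestDependencies()
deps.calculate_features(
    question="Explain polymorphism.",
    answer="Polymorphism lets one interface work with many types.",
    probability=0.83,              # main-model output
    backspace_count=12,
    typing_duration=95,
    letter_click_counts={"a": 14, "e": 20, "p": 6},
)

# As written, results must be read off the instance:
features = deps.random_forest_features
# [cosine_similarity, probability, backspace_norm, typing_norm, letter_discrepancy]
```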
random_forest_model.py ADDED
@@ -0,0 +1,15 @@
+ import joblib
+ import numpy as np
+ from typing import List
+
+
+ class RandomForestModel:
+     def __init__(self):
+         self.scaler = joblib.load("rf_scaler.joblib")
+         self.model = joblib.load("random_forest.joblib")
+
+     def preprocess_input(self, secondary_model_features: List[float]) -> np.ndarray:
+         return self.scaler.transform(np.array(secondary_model_features).astype(np.float32).reshape(1, -1))
+
+     def predict(self, secondary_model_features: List[float]):
+         return self.model.predict(self.preprocess_input(secondary_model_features))[0]
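RandomForestModel is the secondary classifier: it re-scales the five behavioral features and emits the final 0/1 verdict. A minimal sketch, assuming `rf_scaler.joblib` and `random_forest.joblib` are present in the working directory and were fitted on features in the order produced by RandomForestDependencies:

```python
from random_forest_model import RandomForestModel

model = RandomForestModel()
features = [0.74, 0.83, 0.05, 0.40, 0.002]  # illustrative values only
label = model.predict(features)             # 0 or 1; 1 maps to "AI" in prediction.py
print(label)
```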