Spaces:
Runtime error
Runtime error
| import torch | |
| import numpy as np | |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification | |
| import nltk | |
| import torch.nn.functional as F | |
| import nltk | |
| from scipy.special import softmax | |
| import yaml | |
| from utils import * | |
| import joblib | |
| from optimum.bettertransformer import BetterTransformer | |
| import gc | |
| from cleantext import clean | |
| import gradio as gr | |
| from tqdm.auto import tqdm | |
| from transformers import pipeline | |
| from transformers import AutoModelForSequenceClassification, AutoTokenizer | |
| import nltk | |
| from nltk.tokenize import sent_tokenize | |
| from optimum.pipelines import pipeline | |
# ---------------------------------------------------------------------------
# Runtime configuration
# ---------------------------------------------------------------------------
# Model paths, label maps and token sizes are all read from config.yaml.
with open("config.yaml", "r") as file:
    params = yaml.safe_load(file)

# NLTK resources are downloaded at import time.
nltk.download("punkt")
nltk.download("stopwords")

# Two device settings coexist: `device_needed` follows CUDA availability and
# is used by the mini proxy models, while `device` is pinned to the CPU and is
# used by the full-size classifiers.
if torch.cuda.is_available():
    device_needed = "cuda"
else:
    device_needed = "cpu"
device = "cpu"

# Values pulled out of the configuration mapping.
text_bc_model_path = params["TEXT_BC_MODEL_PATH"]
text_mc_model_path = params["TEXT_MC_MODEL_PATH"]
text_quillbot_model_path = params["TEXT_QUILLBOT_MODEL_PATH"]
text_1on1_models = params["TEXT_1ON1_MODEL"]
quillbot_labels = params["QUILLBOT_LABELS"]
mc_label_map = params["MC_OUTPUT_LABELS"]
text_1on1_label_map = params["1ON1_OUTPUT_LABELS"]
mc_token_size = int(params["MC_TOKEN_SIZE"])
bc_token_size = int(params["BC_TOKEN_SIZE"])
bias_checker_model_name = params["BIAS_CHECKER_MODEL_PATH"]
bias_corrector_model_name = params["BIAS_CORRECTOR_MODEL_PATH"]

# ---------------------------------------------------------------------------
# Full-size sequence classifiers (placed on `device`)
# ---------------------------------------------------------------------------
text_bc_tokenizer = AutoTokenizer.from_pretrained(text_bc_model_path)
text_bc_model = AutoModelForSequenceClassification.from_pretrained(text_bc_model_path).to(device)

text_mc_tokenizer = AutoTokenizer.from_pretrained(text_mc_model_path)
text_mc_model = AutoModelForSequenceClassification.from_pretrained(text_mc_model_path).to(device)

quillbot_tokenizer = AutoTokenizer.from_pretrained(text_quillbot_model_path)
quillbot_model = AutoModelForSequenceClassification.from_pretrained(text_quillbot_model_path).to(device)

# One tokenizer/model pair per 1-on-1 checkpoint. Both registries are keyed by
# the entry of `mc_label_map` that is paired (by position) with the checkpoint.
tokenizers_1on1 = {}
models_1on1 = {}
for model_name, model in zip(mc_label_map, text_1on1_models):
    tokenizers_1on1[model_name] = AutoTokenizer.from_pretrained(model)
    models_1on1[model_name] = AutoModelForSequenceClassification.from_pretrained(model).to(device)

# ---------------------------------------------------------------------------
# Proxy models for explainability (placed on `device_needed`)
# ---------------------------------------------------------------------------
mini_bc_model_name = "polygraf-ai/bc-model-bert-mini"
bc_tokenizer_mini = AutoTokenizer.from_pretrained(mini_bc_model_name)
bc_model_mini = AutoModelForSequenceClassification.from_pretrained(mini_bc_model_name).to(device_needed)

mini_humanizer_model_name = "polygraf-ai/quillbot-detector-bert-mini-9K"
humanizer_tokenizer_mini = AutoTokenizer.from_pretrained(mini_humanizer_model_name)
humanizer_model_mini = AutoModelForSequenceClassification.from_pretrained(mini_humanizer_model_name).to(device_needed)

# Convert the loaded classifiers with BetterTransformer.
bc_model_mini = BetterTransformer.transform(bc_model_mini)
humanizer_model_mini = BetterTransformer.transform(humanizer_model_mini)
text_bc_model = BetterTransformer.transform(text_bc_model)
text_mc_model = BetterTransformer.transform(text_mc_model)
quillbot_model = BetterTransformer.transform(quillbot_model)

# ---------------------------------------------------------------------------
# Bias checker / corrector pipelines
# ---------------------------------------------------------------------------
# NOTE(review): `bias_model_checker` and `tokenizer` are loaded here, but the
# pipeline below is built from the model *name*, not from these objects --
# confirm whether they are used elsewhere.
bias_model_checker = AutoModelForSequenceClassification.from_pretrained(bias_checker_model_name)
tokenizer = AutoTokenizer.from_pretrained(bias_checker_model_name)
bias_model_checker = BetterTransformer.transform(bias_model_checker, keep_original_model=False)
bias_checker = pipeline(
    "text-classification",
    model=bias_checker_model_name,
    tokenizer=bias_checker_model_name,
)
gc.collect()
bias_corrector = pipeline(
    "text2text-generation",
    model=bias_corrector_model_name,
    accelerator="ort",
)

# Model score calibration: pre-fitted regressor loaded from disk.
iso_reg = joblib.load("isotonic_regression_model.joblib")
def split_text(text: str) -> list:
    """Split *text* into sentences and wrap each one in its own list.

    Returns:
        A list of one-element lists, one per sentence found by
        ``sent_tokenize``, in document order.
    """
    batches = []
    for single_sentence in sent_tokenize(text):
        batches.append([single_sentence])
    return batches
def correct_text(text: str, bias_checker, bias_corrector, separator: str = " ") -> tuple:
    """Run every sentence of *text* through the checker and rewrite where needed.

    A sentence is left untouched only when the checker's first result carries
    the label ``"LABEL_1"`` and its score is not below 0.9. Every other
    sentence is sent to *bias_corrector* and replaced by the generated text.

    Args:
        text: Input text; it is split into sentences with ``split_text``.
        bias_checker: Callable returning a list of ``{"label", "score"}`` dicts.
        bias_corrector: Callable returning a list of ``{"generated_text"}`` dicts.
        separator: String used to join the processed sentences.

    Returns:
        ``(corrected_text, corrections)`` where *corrections* is a list of
        ``(original, rewritten)`` pairs for the sentences that were rewritten.
    """
    batches = split_text(text)
    output_parts = []
    change_log = []

    for batch in tqdm(batches, total=len(batches), desc="correcting text.."):
        raw_text = " ".join(batch)
        verdict = bias_checker(raw_text)[0]

        # Equivalent to the longer "not LABEL_1, or LABEL_1 with low score".
        needs_rewrite = verdict["label"] != "LABEL_1" or verdict["score"] < 0.9
        if not needs_rewrite:
            output_parts.append(raw_text)
            continue

        rewritten = bias_corrector(raw_text)[0]["generated_text"]
        output_parts.append(rewritten)
        change_log.append((raw_text, rewritten))

    return separator.join(output_parts), change_log
def update(text: str):
    """Clean *text*, run the bias correction pass and format the change list.

    Returns:
        ``(corrected_text, corrections_display)``; the second item lists each
        rewritten sentence as an ``Original:`` / ``Corrected:`` pair, with a
        blank line between pairs.
    """
    cleaned = clean(text, lower=False)
    corrected_text, corrections = correct_text(cleaned, bias_checker, bias_corrector)

    entries = []
    for orig, corr in corrections:
        entries.append(f"Original: {orig}\nCorrected: {corr}")

    return corrected_text, "\n\n".join(entries)
def split_text_allow_complete_sentences_nltk(
    text,
    max_length=256,
    tolerance=30,
    min_last_segment_length=100,
    type_det="bc",
):
    """Split *text* into segments made of whole sentences within a token budget.

    Sentences are packed greedily into segments whose token count stays within
    ``max_length + tolerance``. If the final segment is shorter than
    *min_last_segment_length* tokens, it is merged into the previous segment
    when the merged text still fits the budget.

    Args:
        text: Text to split.
        max_length: Kept for backward compatibility. The effective limit is
            always taken from ``bc_token_size`` or ``mc_token_size`` according
            to *type_det*, so this argument is overridden.
        tolerance: Extra tokens allowed on top of the limit.
        min_last_segment_length: Token count below which the last segment is a
            candidate for merging into the previous one.
        type_det: ``"bc"`` or ``"mc"``; selects the tokenizer and token limit.

    Returns:
        A list of strings, each obtained by encoding a segment (with special
        tokens, truncated to the budget) and decoding it again.

    Raises:
        ValueError: If *type_det* is neither ``"bc"`` nor ``"mc"``.
    """
    if type_det == "bc":
        tokenizer = text_bc_tokenizer
        max_length = bc_token_size
    elif type_det == "mc":
        tokenizer = text_mc_tokenizer
        max_length = mc_token_size
    else:
        # Previously this fell through and failed later with an
        # UnboundLocalError on `tokenizer`.
        raise ValueError(f"Invalid type_det: {type_det!r}")

    token_budget = max_length + tolerance

    def encode(sentence_group, truncate=True):
        """Encode the joined sentences, optionally truncated to the budget."""
        joined = " ".join(sentence_group)
        if truncate:
            return tokenizer.encode(
                joined,
                add_special_tokens=True,
                max_length=token_budget,
                truncation=True,
            )
        return tokenizer.encode(joined, add_special_tokens=True, truncation=False)

    # Greedy packing. Each entry is (sentences, encoded_length).
    segments = []
    current_segment = []
    current_length = 0
    for sentence in nltk.sent_tokenize(text):
        sentence_length = len(tokenizer.tokenize(sentence))
        # The "- 2" presumably reserves room for the special tokens added at
        # encoding time -- TODO confirm.
        if current_length + sentence_length <= token_budget - 2:
            current_segment.append(sentence)
            current_length += sentence_length
            continue
        if current_segment:
            segments.append((current_segment, len(encode(current_segment))))
        current_segment = [sentence]
        current_length = sentence_length
    if current_segment:
        segments.append((current_segment, len(encode(current_segment))))

    # Fold a short trailing segment into its predecessor, but only when the
    # merged text really fits. The length is measured WITHOUT truncation: a
    # truncated encoding can never exceed the budget, which would make this
    # check always pass and silently cut off the end of the merged text.
    final_segments = list(segments)
    if len(final_segments) > 1:
        last_seg, last_length = final_segments[-1]
        if last_length < min_last_segment_length:
            prev_seg, _ = final_segments[-2]
            merged = prev_seg + last_seg
            merged_length = len(encode(merged, truncate=False))
            if merged_length <= token_budget:
                final_segments[-2:] = [(merged, merged_length)]

    # Round-trip through the tokenizer so callers receive the text exactly as
    # it will be seen after truncation.
    return [tokenizer.decode(encode(seg)) for seg, _ in final_segments]
def predict_quillbot(text):
    """Score *text* with the quillbot detector model.

    The text is padded/truncated to 256 tokens, run through ``quillbot_model``
    and the logits are turned into probabilities with a softmax.

    Returns:
        ``{"Humanized": p1, "Original": p0}`` where ``p0`` / ``p1`` are the
        probabilities at output index 0 / 1 for the first item in the batch.
    """
    quillbot_model.eval()
    encoded = quillbot_tokenizer(
        text,
        padding="max_length",
        truncation=True,
        max_length=256,
        return_tensors="pt",
    ).to(device)

    with torch.no_grad():
        logits = quillbot_model(**encoded).logits

    probabilities = softmax(logits.detach().cpu().numpy(), 1)[0]
    humanized = probabilities[1].item()
    original = probabilities[0].item()
    return {"Humanized": humanized, "Original": original}
def predict_for_explainanility(text, model_type=None):
    """Return class probabilities from the mini proxy models.

    Args:
        text: A string or a list of strings to score. A bare string is treated
            as a batch of one.
        model_type: ``"quillbot"`` (humanizer proxy, 256 tokens, no cleaning)
            or ``"bc"`` (binary-classifier proxy, 512 tokens, inputs cleaned
            with ``remove_special_characters``).

    Returns:
        A NumPy array of softmax probabilities, one row per input text.

    Raises:
        ValueError: If *model_type* is not one of the supported values.
    """
    if model_type == "quillbot":
        cleaning = False
        max_length = 256
        model = humanizer_model_mini
        tokenizer = humanizer_tokenizer_mini
    elif model_type == "bc":
        cleaning = True
        max_length = 512
        model = bc_model_mini
        tokenizer = bc_tokenizer_mini
    else:
        raise ValueError("Invalid model type")

    # A bare string would otherwise be iterated character by character by the
    # cleaning step below.
    if isinstance(text, str):
        text = [text]

    with torch.no_grad():
        if cleaning:
            text = [remove_special_characters(t) for t in text]
        tokenized_text = tokenizer(
            text,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            max_length=max_length,
        ).to(device_needed)
        outputs = model(**tokenized_text)
        tensor_logits = outputs[0]
        # Explicit dim: relying on softmax's implicit dimension is deprecated.
        # For (batch, classes) logits, dim=-1 is what the implicit rule picked.
        probas = F.softmax(tensor_logits, dim=-1).detach().cpu().numpy()
    return probas
def predict_bc(model, tokenizer, text):
    """Run the binary classifier on *text* and return its class probabilities.

    Args:
        model: Sequence-classification model to evaluate.
        tokenizer: Tokenizer matching *model*. It is now actually used; the
            earlier code ignored it in favour of the global
            ``text_bc_tokenizer``.
        text: Text to score; padded/truncated to ``bc_token_size`` tokens.

    Returns:
        A 1-D NumPy array with the softmax probabilities of the first item in
        the batch.
    """
    with torch.no_grad():
        model.eval()
        tokens = tokenizer(
            text,
            padding="max_length",
            truncation=True,
            max_length=bc_token_size,
            return_tensors="pt",
        ).to(device)
        output = model(**tokens)
        output_norm = softmax(output.logits.detach().cpu().numpy(), 1)[0]
    return output_norm
def predict_mc(model, tokenizer, text):
    """Run the multi-class classifier on *text* and return its probabilities.

    Args:
        model: Sequence-classification model to evaluate.
        tokenizer: Tokenizer matching *model*. It is now actually used; the
            earlier code ignored it in favour of the global
            ``text_mc_tokenizer``.
        text: Text to score; padded/truncated to ``mc_token_size`` tokens.

    Returns:
        A 1-D NumPy array with the softmax probabilities of the first item in
        the batch.
    """
    with torch.no_grad():
        model.eval()
        tokens = tokenizer(
            text,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
            max_length=mc_token_size,
        ).to(device)
        output = model(**tokens)
        output_norm = softmax(output.logits.detach().cpu().numpy(), 1)[0]
    return output_norm
def predict_mc_scores(input):
    """Return per-label scores for *input*, scaled by the non-human probability.

    The text is segmented once per detector, every segment is scored, and the
    per-segment probabilities are averaged. Each multi-class probability is
    then multiplied by ``1 - P(HUMAN)`` from the binary classifier (index 0 of
    its output).

    NOTE: this file later defines ``predict_mc_scores(input, models)``; that
    later definition rebinds the name, so this single-argument version is
    shadowed once the module has finished loading.

    Returns:
        A dict mapping upper-cased labels from ``mc_label_map`` to scaled
        scores, or an empty dict when ``1 - P(HUMAN)`` is below 0.01.
    """
    # Binary classifier: segment once (earlier code segmented twice just to
    # obtain the segment count).
    bc_scores = [
        predict_bc(text_bc_model, text_bc_tokenizer, remove_special_characters(segment))
        for segment in split_text_allow_complete_sentences_nltk(input, type_det="bc")
    ]
    bc_score_list = np.mean(np.array(bc_scores), axis=0).tolist()
    bc_score = {"AI": bc_score_list[1], "HUMAN": bc_score_list[0]}

    # Multi-class classifier, same scheme.
    mc_scores = [
        predict_mc(text_mc_model, text_mc_tokenizer, remove_special_characters(segment))
        for segment in split_text_allow_complete_sentences_nltk(input, type_det="mc")
    ]
    mc_score_list = np.mean(np.array(mc_scores), axis=0).tolist()

    sum_prob = 1 - bc_score["HUMAN"]
    mc_score = {
        label.upper(): score * sum_prob
        for score, label in zip(mc_score_list, mc_label_map)
    }
    if sum_prob < 0.01:
        mc_score = {}
    return mc_score
def predict_bc_scores(input):
    """Return calibrated AI / HUMAN scores for *input*.

    The text is segmented, each cleaned segment is scored with the binary
    classifier, and the per-segment probabilities are averaged. The averaged
    value at index 1 (reported as "AI") is passed through ``iso_reg`` and the
    HUMAN score is its complement.

    Returns:
        ``{"AI": ai_score, "HUMAN": 1 - ai_score}``.
    """
    bc_scores = []
    # Segment once; earlier code ran the segmentation twice just to obtain
    # the number of segments.
    for segment in split_text_allow_complete_sentences_nltk(input, type_det="bc"):
        cleaned_text_bc = remove_special_characters(segment)
        bc_scores.append(
            predict_bc(text_bc_model, text_bc_tokenizer, cleaned_text_bc)
        )

    bc_score_list = np.mean(np.array(bc_scores), axis=0).tolist()
    print(
        f"Original BC scores: AI: {bc_score_list[1]}, HUMAN: {bc_score_list[0]}"
    )

    # isotonic regression calibration
    ai_score = iso_reg.predict([bc_score_list[1]])[0]
    human_score = 1 - ai_score
    bc_score = {"AI": ai_score, "HUMAN": human_score}
    print(f"Calibration BC scores: AI: {ai_score}, HUMAN: {human_score}")
    # Only the last cleaned segment is echoed here, not the whole input.
    print(f"Input Text: {cleaned_text_bc}")
    return bc_score
def predict_1on1(model, tokenizer, text):
    """Score *text* with a single 1-on-1 classifier.

    The text is padded/truncated to ``mc_token_size`` tokens and moved to
    ``device`` before the forward pass.

    Returns:
        A 1-D NumPy array with the softmax probabilities of the first item in
        the batch.
    """
    model.eval()
    encoded = tokenizer(
        text,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
        max_length=mc_token_size,
    ).to(device)

    with torch.no_grad():
        logits = model(**encoded).logits

    probabilities = softmax(logits.detach().cpu().numpy(), 1)
    return probabilities[0]
def predict_1on1_combined(input):
    """Score *input* with every registered 1-on-1 model.

    The registries ``models_1on1`` / ``tokenizers_1on1`` are keyed by the
    names taken from ``mc_label_map`` when they are populated, so this
    iterates over those keys. Earlier code looked the registries up with the
    entries of ``text_1on1_models``, which only works when those entries equal
    the registry keys.

    Returns:
        A list with the probability at output index 1 from each model, in
        registry (insertion) order.
    """
    return [
        predict_1on1(models_1on1[name], tokenizers_1on1[name], input)[1]
        for name in models_1on1
    ]
def predict_1on1_single(input, model):
    """Score *input* with the 1-on-1 model registered under the key *model*.

    Returns:
        The probability at output index 1 from ``predict_1on1``.
    """
    classifier = models_1on1[model]
    matching_tokenizer = tokenizers_1on1[model]
    probabilities = predict_1on1(classifier, matching_tokenizer, input)
    return probabilities[1]
def predict_mc_scores(input, models):
    """Return source-model scores for *input*, restricted to *models*.

    Steps:
      1. Score the text with the binary classifier (averaged over segments)
         and calibrate the value at index 1 ("AI") with ``iso_reg``.
      2. With more than one entry in *models*: score with the multi-class
         classifier, keep only the requested labels and renormalise them to
         sum to 1.
         With exactly one entry: score with that model's 1-on-1 classifier and
         report it alongside an ``"OTHER"`` entry holding the complement.
      3. Multiply every score by the calibrated AI probability.

    Args:
        input: Text to analyse.
        models: Sequence of model names to report on.

    Returns:
        A dict of scaled scores. It is empty when *models* is empty or when
        the calibrated AI probability is below 0.01.
    """
    if len(models) == 0:
        return {}
    print(f"Models to Test: {models}")

    # BC SCORE -- segment once (earlier code segmented twice per detector
    # just to obtain the segment count).
    bc_scores = [
        predict_bc(text_bc_model, text_bc_tokenizer, remove_special_characters(segment))
        for segment in split_text_allow_complete_sentences_nltk(input, type_det="bc")
    ]
    bc_score_list = np.mean(np.array(bc_scores), axis=0).tolist()
    print(
        f"Original BC scores: AI: {bc_score_list[1]}, HUMAN: {bc_score_list[0]}"
    )

    # isotonic regression calibration
    ai_score = iso_reg.predict([bc_score_list[1]])[0]
    human_score = 1 - ai_score
    bc_score = {"AI": ai_score, "HUMAN": human_score}
    print(f"Calibration BC scores: AI: {ai_score}, HUMAN: {human_score}")
    sum_prob = 1 - bc_score["HUMAN"]

    segments_mc = split_text_allow_complete_sentences_nltk(input, type_det="mc")

    # MC SCORE
    if len(models) > 1:
        print("Starting MC")
        mc_scores = [
            predict_mc(text_mc_model, text_mc_tokenizer, remove_special_characters(segment))
            for segment in segments_mc
        ]
        mc_score_list = np.mean(np.array(mc_scores), axis=0).tolist()
        by_label = {
            label.upper(): score
            for score, label in zip(mc_score_list, mc_label_map)
        }
        # Keep the caller's spelling of each key; match labels case-insensitively.
        mc_score = {
            key: by_label[key.upper()]
            for key in models
            if key.upper() in by_label
        }
        total = sum(mc_score.values())
        # Normalise the selected labels to sum to 1, then scale by P(AI).
        mc_score = {
            key: value / total * sum_prob for key, value in mc_score.items()
        }
        print("MC Score:", mc_score)
    else:
        print("Starting 1on1")
        mc_scores = [
            predict_1on1_single(remove_special_characters(segment), models[0])
            for segment in segments_mc
        ]
        average_mc_scores = np.mean(np.array(mc_scores), axis=0)
        print(average_mc_scores)
        mc_score_list = average_mc_scores.tolist()
        mc_score = {
            models[0].upper(): mc_score_list * sum_prob,
            "OTHER": (1 - mc_score_list) * sum_prob,
        }

    if sum_prob < 0.01:
        mc_score = {}
    return mc_score