from utils import cosineSim, googleSearch, getSentences, parallel_scrap, matchingScore
import gradio as gr
from urllib.request import urlopen, Request
from googleapiclient.discovery import build
import requests
import httpx
import re
from bs4 import BeautifulSoup
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import GPT2LMHeadModel, GPT2TokenizerFast
import asyncio
from scipy.special import softmax
from evaluate import load
from datetime import date
import nltk
import spacy
import subprocess
import torch
import fitz
import plotly.graph_objects as go
from unidecode import unidecode

nltk.download('punkt')

tokenizer = AutoTokenizer.from_pretrained('google-bert/bert-base-uncased')

from writing_analysis import (
    normalize,
    preprocess_text1,
    preprocess_text2,
    vocabulary_richness_ttr,
    calculate_gunning_fog,
    calculate_average_sentence_length,
    calculate_average_word_length,
    calculate_syntactic_tree_depth,
    calculate_perplexity,
)

np.set_printoptions(suppress=True)
def plagiarism_check(
    plag_option,
    input,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
):
    # Only the last key is active; the earlier assignments were dead code and are kept as spares.
    # api_key = "AIzaSyCLyCCpOPLZWuptuPAPSg8cUIZhdEMVf6g"
    # api_key = "AIzaSyCS1WQDMl1IMjaXtwSd_2rA195-Yc4psQE"
    # api_key = "AIzaSyCB61O70B8AC3l5Kk3KMoLb6DN37B7nqIk"
    api_key = "AIzaSyCg1IbevcTAXAPYeYreps6wYWDbU0Kz8tg"
    # api_key = "AIzaSyBrx_pgb6A64wPFQXSGQRgGtukoxVV_0Fk"
    cse_id = "851813e81162b4ed4"

    sentences = getSentences(input)
    urlCount = {}
    ScoreArray = []
    urlList = []
    date_from = build_date(year_from, month_from, day_from)
    date_to = build_date(year_to, month_to, day_to)
    sort_date = f"date:r:{date_from}:{date_to}"
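    # Note: "date:r:<start>:<end>" with YYYYMMDD endpoints is the date-restrict syntax of
    # the Google Custom Search "sort" parameter; googleSearch() below is assumed to pass
    # sort_date through to the API unchanged.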
    # get list of URLs to check
    urlCount, ScoreArray = googleSearch(
        plag_option,
        sentences,
        urlCount,
        ScoreArray,
        urlList,
        sort_date,
        domains_to_skip,
        api_key,
        cse_id,
    )
    print("Number of URLs: ", len(urlCount))
    # print("Old Score Array:\n")
    # print2D(ScoreArray)

    # Scrape URLs in list
    formatted_tokens = []
    soups = asyncio.run(parallel_scrap(urlList))
    print(len(soups))
    print(
        "Successful scraping: "
        + str(len([x for x in soups if x is not None]))
        + " out of "
        + str(len(urlList))
    )
    # Populate matching scores for scraped pages
    for i, soup in enumerate(soups):
        print(f"Analyzing {i+1} of {len(soups)} soups........................")
        if soup:
            page_content = soup.text
            for j, sent in enumerate(sentences):
                score = matchingScore(sent, page_content)
                ScoreArray[i][j] = score
    # ScoreArray = asyncio.run(parallel_analyze_2(soups, sentences, ScoreArray))
    # print("New Score Array:\n")
    # print2D(ScoreArray)

    # Gradio formatting section
    sentencePlag = [False] * len(sentences)
    sentenceToMaxURL = [-1] * len(sentences)
    for j in range(len(sentences)):
        if j > 0:
            maxScore = ScoreArray[sentenceToMaxURL[j - 1]][j]
            sentenceToMaxURL[j] = sentenceToMaxURL[j - 1]
        else:
            maxScore = -1
        for i in range(len(ScoreArray)):
            margin = (
                0.1
                if (j > 0 and sentenceToMaxURL[j] == sentenceToMaxURL[j - 1])
                else 0
            )
            if ScoreArray[i][j] - maxScore > margin:
                maxScore = ScoreArray[i][j]
                sentenceToMaxURL[j] = i
        if maxScore > 0.5:
            sentencePlag[j] = True
    if (
        (len(sentences) > 1)
        and (sentenceToMaxURL[1] != sentenceToMaxURL[0])
        and (
            ScoreArray[sentenceToMaxURL[0]][0]
            - ScoreArray[sentenceToMaxURL[1]][0]
            < 0.1
        )
    ):
        sentenceToMaxURL[0] = sentenceToMaxURL[1]
    index = np.unique(sentenceToMaxURL)
    urlMap = {}
    for count, i in enumerate(index):
        urlMap[i] = count + 1
    for i, sent in enumerate(sentences):
        formatted_tokens.append(
            (sent, "[" + str(urlMap[sentenceToMaxURL[i]]) + "]")
        )
    formatted_tokens.append(("\n", None))
    formatted_tokens.append(("\n", None))
    formatted_tokens.append(("\n", None))
    urlScore = {}
    for url in index:
        s = [
            ScoreArray[url][sen]
            for sen in range(len(sentences))
            if sentenceToMaxURL[sen] == url
        ]
        urlScore[url] = sum(s) / len(s)
    for ind in index:
        formatted_tokens.append(
            (
                urlList[ind] + " --- Matching Score: " + f"{str(round(urlScore[ind] * 100, 2))}%",
                "[" + str(urlMap[ind]) + "]",
            )
        )
        formatted_tokens.append(("\n", None))
    print(f"Formatted Tokens: {formatted_tokens}")
    return formatted_tokens
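# Illustrative example (values are made up) of the output consumed by gr.HighlightedText
# below: plagiarism_check() returns a list of (text, label) pairs such as
#   [("The quick brown fox ...", "[1]"), ("\n", None),
#    ("https://example.com --- Matching Score: 87.5%", "[1]"), ("\n", None)]
# where the bracketed label ties each sentence to the URL it matched best.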
| """ | |
| AI DETECTION SECTION | |
| """ | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| text_bc_model_path = "polygraf-ai/v3-bert-3-2m-trun-bc-lighter-spec" | |
| text_bc_tokenizer = AutoTokenizer.from_pretrained(text_bc_model_path) | |
| text_bc_model = AutoModelForSequenceClassification.from_pretrained(text_bc_model_path).to(device) | |
| text_mc_model_path = "polygraf-ai/text-detect-mc-bert-base-uncased-v1-bert-429k-256" | |
| text_mc_tokenizer = AutoTokenizer.from_pretrained(text_mc_model_path) | |
| text_mc_model = AutoModelForSequenceClassification.from_pretrained(text_mc_model_path).to(device) | |
| def remove_accents(input_str): | |
| # nfkd_form = unicodedata.normalize('NFKD', input_str) | |
| # return "".join([char for char in nfkd_form if not unicodedata.combining(char)]) | |
| text_no_accents = unidecode(input_str) | |
| return text_no_accents | |
| def remove_special_characters(text): | |
| text = remove_accents(text) | |
| pattern = r'[^\w\s\d.,!?\'"()-;]+' | |
| text = re.sub(pattern, '', text) | |
| return text | |
| def update_character_count(text): | |
| return f"{len(text)} characters" | |
def split_text_allow_complete_sentences_nltk(text, max_length=256, tolerance=10, min_last_segment_length=120):
    sentences = nltk.sent_tokenize(text)
    segments = []
    current_segment = []
    current_length = 0
    for sentence in sentences:
        tokens = tokenizer.tokenize(sentence)
        sentence_length = len(tokens)
        if current_length + sentence_length <= max_length + tolerance - 2:
            current_segment.append(sentence)
            current_length += sentence_length
        else:
            if current_segment:
                encoded_segment = tokenizer.encode(' '.join(current_segment), add_special_tokens=True, max_length=max_length+tolerance, truncation=True)
                segments.append((current_segment, len(encoded_segment)))
            current_segment = [sentence]
            current_length = sentence_length
    if current_segment:
        encoded_segment = tokenizer.encode(' '.join(current_segment), add_special_tokens=True, max_length=max_length+tolerance, truncation=True)
        segments.append((current_segment, len(encoded_segment)))
    final_segments = []
    for i, (seg, length) in enumerate(segments):
        if i == len(segments) - 1:
            if length < min_last_segment_length and len(final_segments) > 0:
                prev_seg, prev_length = final_segments[-1]
                combined_encoded = tokenizer.encode(' '.join(prev_seg + seg), add_special_tokens=True, max_length=max_length+tolerance, truncation=True)
                if len(combined_encoded) <= max_length + tolerance:
                    final_segments[-1] = (prev_seg + seg, len(combined_encoded))
                else:
                    final_segments.append((seg, length))
            else:
                final_segments.append((seg, length))
        else:
            final_segments.append((seg, length))
    decoded_segments = []
    encoded_segments = []
    for seg, _ in final_segments:
        encoded_segment = tokenizer.encode(' '.join(seg), add_special_tokens=True, max_length=max_length+tolerance, truncation=True)
        decoded_segment = tokenizer.decode(encoded_segment)
        decoded_segments.append(decoded_segment)
    return decoded_segments
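# Behavior sketch of the splitter above (assumes the BERT tokenizer loaded at the top of
# this file): whole sentences are packed greedily into chunks of at most
# max_length + tolerance BERT tokens, and an undersized final chunk is merged into the
# previous one when the combined chunk still fits. Note that tokenizer.decode() keeps the
# [CLS]/[SEP] markers in the returned strings.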
def predict_bc(model, tokenizer, text):
    tokens = tokenizer(
        text, padding='max_length', truncation=True, max_length=256, return_tensors="pt"
    ).to(device)["input_ids"]
    output = model(tokens)
    output_norm = softmax(output.logits.detach().cpu().numpy(), 1)[0]
    print("BC Score: ", output_norm)
    return output_norm


def predict_mc(model, tokenizer, text):
    tokens = tokenizer(
        text, padding='max_length', truncation=True, return_tensors="pt", max_length=256
    ).to(device)["input_ids"]
    output = model(tokens)
    output_norm = softmax(output.logits.detach().cpu().numpy(), 1)[0]
    print("MC Score: ", output_norm)
    return output_norm
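# Reading of the two heads, inferred from how the scores are consumed in ai_generated_test
# below (not from the model cards): predict_bc is the binary human-vs-AI classifier with
# softmax output indexed [HUMAN, AI], while predict_mc is the multi-class source-model
# classifier whose averaged probabilities, after the first bucket is folded in, are mapped
# onto label_map = ["OpenAI GPT", "CLAUDE", "BARD", "LLAMA 2"].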
def ai_generated_test(ai_option, input):
    bc_scores = []
    mc_scores = []
    segments = split_text_allow_complete_sentences_nltk(input)
    samples_len = len(segments)
    for i in range(samples_len):
        cleaned_text = remove_special_characters(segments[i])
        bc_score = predict_bc(text_bc_model, text_bc_tokenizer, cleaned_text)
        mc_score = predict_mc(text_mc_model, text_mc_tokenizer, cleaned_text)
        bc_scores.append(bc_score)
        mc_scores.append(mc_score)
    bc_scores_array = np.array(bc_scores)
    mc_scores_array = np.array(mc_scores)
    average_bc_scores = np.mean(bc_scores_array, axis=0)
    average_mc_scores = np.mean(mc_scores_array, axis=0)
    bc_score_list = average_bc_scores.tolist()
    mc_score_list = average_mc_scores.tolist()
    # Temporary: fold the first multi-class bucket into the second, then drop it
    mc_score_list[1] = mc_score_list[0] + mc_score_list[1]
    mc_score_list = mc_score_list[1:]
    # Use the segment-averaged scores rather than the last segment's scores
    bc_score = {"AI": bc_score_list[1], "HUMAN": bc_score_list[0]}
    mc_score = {}
    label_map = ["OpenAI GPT", "CLAUDE", "BARD", "LLAMA 2"]
    for score, label in zip(mc_score_list, label_map):
        mc_score[label.upper()] = score
    sum_prob = 1 - bc_score["HUMAN"]
    for key, value in mc_score.items():
        mc_score[key] = value * sum_prob
    if ai_option == "Human vs AI":
        mc_score = {}
    if sum_prob < 0.01:
        mc_score = {}
    return bc_score, mc_score
# COMBINED
def main(
    ai_option,
    plag_option,
    input,
    # models,
    year_from,
    month_from,
    day_from,
    year_to,
    month_to,
    day_to,
    domains_to_skip,
):
    formatted_tokens = plagiarism_check(
        plag_option,
        input,
        year_from,
        month_from,
        day_from,
        year_to,
        month_to,
        day_to,
        domains_to_skip,
    )
    depth_analysis_plot = depth_analysis(input)
    bc_score, mc_score = ai_generated_test(ai_option, input)
    return (
        bc_score,
        mc_score,
        formatted_tokens,
        depth_analysis_plot,
    )
def build_date(year, month, day):
    return f"{year}{months[month]}{day}"
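# For example, build_date("2024", "March", "05") -> "20240305", which plagiarism_check
# embeds in its sort_date string. The function does not zero-pad the day itself; it relies
# on the UI defaults below supplying "01"-style two-digit day strings.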
def len_validator(text):
    min_tokens = 128
    length = len(tokenizer.tokenize(text))
    if length < min_tokens:
        return f"Warning! Input length is {length}. Please input a text that is greater than {min_tokens} tokens long. Recommended length {min_tokens*2} tokens."
    else:
        return "Input length is satisfied."
def extract_text_from_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    text = ""
    for page in doc:
        text += page.get_text()
    return text
# DEPTH ANALYSIS
print("loading depth analysis")
nltk.download('stopwords')
nltk.download('punkt')
command = ['python3', '-m', 'spacy', 'download', 'en_core_web_sm']
# Execute the command
subprocess.run(command)
nlp = spacy.load("en_core_web_sm")

# for perplexity
model_id = "gpt2"
gpt2_model = GPT2LMHeadModel.from_pretrained(model_id).to(device)
gpt2_tokenizer = GPT2TokenizerFast.from_pretrained(model_id)
def depth_analysis(input_text):
    # vocabulary richness
    processed_words = preprocess_text1(input_text)
    ttr_value = vocabulary_richness_ttr(processed_words)
    # readability
    gunning_fog = calculate_gunning_fog(input_text)
    gunning_fog_norm = normalize(gunning_fog, min_value=0, max_value=20)
    # average sentence length and average word length
    words, sentences = preprocess_text2(input_text)
    average_sentence_length = calculate_average_sentence_length(sentences)
    average_word_length = calculate_average_word_length(words)
    average_sentence_length_norm = normalize(average_sentence_length, min_value=0, max_value=40)
    average_word_length_norm = normalize(average_word_length, min_value=0, max_value=8)
    # syntactic tree depth
    average_tree_depth = calculate_syntactic_tree_depth(nlp, input_text)
    average_tree_depth_norm = normalize(average_tree_depth, min_value=0, max_value=10)
    # perplexity
    perplexity = calculate_perplexity(input_text, gpt2_model, gpt2_tokenizer, device)
    perplexity_norm = normalize(perplexity, min_value=0, max_value=30)

    features = {
        "readability": gunning_fog_norm,
        "syntactic tree depth": average_tree_depth_norm,
        "vocabulary richness": ttr_value,
        "perplexity": perplexity_norm,
        "average sentence length": average_sentence_length_norm,
        "average word length": average_word_length_norm,
    }
    print(features)

    fig = go.Figure()
    fig.add_trace(go.Scatterpolar(
        r=list(features.values()),
        theta=list(features.keys()),
        fill='toself',
        name='Radar Plot'
    ))
    fig.update_layout(
        polar=dict(
            radialaxis=dict(
                visible=True,
                range=[0, 100],
            )),
        showlegend=False,
        # autosize=False,
        # width=600,
        # height=600,
        margin=dict(
            l=10,
            r=20,
            b=10,
            t=10,
            # pad=100
        ),
    )
    return fig
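# The radar axis above is fixed to range=[0, 100], so normalize() and
# vocabulary_richness_ttr() from writing_analysis are assumed to return values already
# scaled to 0-100.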
# START OF GRADIO
title = "Copyright Checker"

months = {
    "January": "01",
    "February": "02",
    "March": "03",
    "April": "04",
    "May": "05",
    "June": "06",
    "July": "07",
    "August": "08",
    "September": "09",
    "October": "10",
    "November": "11",
    "December": "12",
}
with gr.Blocks() as demo:
    today = date.today()
    # dd/mm/YY
    d1 = today.strftime("%d/%B/%Y")
    d1 = d1.split("/")
    model_list = ["OpenAI GPT", "CLAUDE", "BARD", "LLAMA2"]
    domain_list = ["com", "org", "net", "int", "edu", "gov", "mil"]
    gr.Markdown(
        """
        # Copyright Checker
        """
    )
    with gr.Row():
        input_text = gr.Textbox(label="Input text", lines=6, placeholder="")
        file_input = gr.File(label="Upload PDF")
        file_input.change(fn=extract_text_from_pdf, inputs=file_input, outputs=input_text)
    char_count = gr.Textbox(label="Minimum Character Limit Check")
    input_text.change(fn=len_validator, inputs=input_text, outputs=char_count)
    with gr.Row():
        with gr.Column():
            ai_option = gr.Radio(["Human vs AI", "Human vs AI Source Models"], label="Choose an option please.")
        with gr.Column():
            plag_option = gr.Radio(["Standard", "Advanced"], label="Choose an option please.")

    with gr.Row():
        with gr.Column():
            only_ai_btn = gr.Button("AI Check")
        with gr.Column():
            only_plagiarism_btn = gr.Button("Plagiarism Check")
    with gr.Row():
        depth_analysis_btn = gr.Button("Detailed Writing Analysis")
    with gr.Row():
        full_check_btn = gr.Button("Full Check")

    gr.Markdown(
        """
        ## Output
        """
    )

    # models = gr.Dropdown(
    #     model_list,
    #     value=model_list,
    #     multiselect=True,
    #     label="Models to test against",
    # )

    with gr.Row():
        with gr.Column():
            bcLabel = gr.Label(label="Source")
        with gr.Column():
            mcLabel = gr.Label(label="Creator")
    with gr.Group():
        with gr.Row():
            month_from = gr.Dropdown(
                choices=list(months.keys()),
                label="From Month",
                value="January",
                interactive=True,
            )
            day_from = gr.Textbox(label="From Day", value="01")
            year_from = gr.Textbox(label="From Year", value="2000")
            # from_date_button = gr.Button("Submit")
        with gr.Row():
            month_to = gr.Dropdown(
                choices=list(months.keys()),
                label="To Month",
                value=d1[1],
                interactive=True,
            )
            day_to = gr.Textbox(label="To Day", value=d1[0])
            year_to = gr.Textbox(label="To Year", value=d1[2])
            # to_date_button = gr.Button("Submit")
        with gr.Row():
            domains_to_skip = gr.Dropdown(
                domain_list,
                multiselect=True,
                label="Domains To Skip",
            )
    with gr.Row():
        with gr.Column():
            sentenceBreakdown = gr.HighlightedText(
                label="Plagiarism Sentence Breakdown",
                combine_adjacent=True,
                color_map={
                    "[1]": "red",
                    "[2]": "orange",
                    "[3]": "yellow",
                    "[4]": "green",
                },
            )

    with gr.Row():
        with gr.Column():
            writing_analysis_plot = gr.Plot(
                label="Writing Analysis Plot"
            )
    full_check_btn.click(
        fn=main,
        inputs=[
            ai_option,
            plag_option,
            input_text,
            # models,
            year_from,
            month_from,
            day_from,
            year_to,
            month_to,
            day_to,
            domains_to_skip,
        ],
        outputs=[
            bcLabel,
            mcLabel,
            sentenceBreakdown,
            writing_analysis_plot,
        ],
        api_name="main",
    )

    only_ai_btn.click(
        fn=ai_generated_test,
        inputs=[ai_option, input_text],
        outputs=[
            bcLabel,
            mcLabel,
        ],
        api_name="ai_check",
    )

    only_plagiarism_btn.click(
        fn=plagiarism_check,
        inputs=[
            plag_option,
            input_text,
            year_from,
            month_from,
            day_from,
            year_to,
            month_to,
            day_to,
            domains_to_skip,
        ],
        outputs=[
            sentenceBreakdown,
        ],
        api_name="plagiarism_check",
    )

    depth_analysis_btn.click(
        fn=depth_analysis,
        inputs=[input_text],
        outputs=[writing_analysis_plot],
        api_name="depth_analysis",
    )

date_from = ""
date_to = ""

demo.launch(share=True, server_name="0.0.0.0", auth=("polygraf-admin", "test@aisd"))