news_verification / src /application /text /search_detection.py
pmkhanh7890's picture
update algorithm
7e6ffb4
raw
history blame
13.5 kB
import string
import warnings
from difflib import SequenceMatcher
import nltk
import numpy as np
import pandas as pd
import torch
from sentence_transformers import (
SentenceTransformer,
util,
)
from src.application.text.helper import extract_equal_text
from src.application.text.preprocessing import split_into_paragraphs
from src.application.text.search import (
generate_search_phrases,
search_by_google,
)
from src.application.url_reader import URLReader
# Silence FutureWarning messages for the whole process.
warnings.simplefilter(action="ignore", category=FutureWarning)

# Download the NLTK resources at import time (quiet=True suppresses the
# download progress output).
nltk.download("punkt", quiet=True)
nltk.download("punkt_tab", quiet=True)
nltk.download("stopwords", quiet=True)

# Load the sentence-embedding model once at import time and move it to the
# GPU when CUDA is available, otherwise keep it on the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
PARAPHASE_MODEL = SentenceTransformer("paraphrase-MiniLM-L6-v2")
PARAPHASE_MODEL.to(DEVICE)

# Cosine-similarity cut-offs read by determine_label():
#   similarity >= PARAPHRASE_THRESHOLD_HUMAN   -> label "HUMAN"
#   similarity >= PARAPHRASE_THRESHOLD_MACHINE -> label "MACHINE"
#   anything lower                             -> not a paraphrase
PARAPHRASE_THRESHOLD_HUMAN = 0.963
PARAPHRASE_THRESHOLD_MACHINE = 0.8

# NOTE(review): the four constants below are not referenced in the visible
# part of this module - presumably they are imported elsewhere; confirm
# before removing.
PARAPHRASE_THRESHOLD = 0.8
MIN_SAME_SENTENCE_LEN = 6
MIN_PHRASE_SENTENCE_LEN = 10
MIN_RATIO_PARAPHRASE_NUM = 0.5

# Pages whose "title + newline + text" is longer than this many characters
# are skipped by the search functions below.
MAX_CHAR_SIZE = 30000
def detect_text_by_relative_search(
    input_text,
    index,
    is_support_opposite=False,
):
    """
    Searches the web for the source of ``input_text[index]`` and, when a
    source page is found, extends the match over the following entries.

    Search phrases are generated from ``input_text[index]`` and sent to
    Google. For each phrase, at most the first three result URLs are
    considered; only URLs containing "bbc.com" that were not visited before
    are read. The first readable page decides the outcome: the function
    returns as soon as one page has been compared, whether or not it matched.

    Args:
        input_text: Indexable sequence of text pieces (e.g. paragraphs).
        index: Position in ``input_text`` of the piece to look up.
        is_support_opposite: Unused; kept for backward compatibility.
    Returns:
        A tuple ``(is_paraphrased, url, aligned_sentences, images, index)``:
        - is_paraphrased: True if the compared page contains a paraphrase.
        - url: URL of the compared page, or None if no page was compared.
        - aligned_sentences: The dict returned by ``check_paraphrase`` for
          the first piece; when following pieces also match the same page,
          their "input"/"source" are appended with "<br>" and "similarity"
          is averaged pairwise. An empty list if no page was compared.
        - images: ``content.images`` of the compared page, or an empty list.
        - index: Position of the first piece NOT covered by the match
          (``len(input_text)`` if every remaining piece matched); unchanged
          when nothing matched.
    """
    checked_urls = set()
    searched_phrases = generate_search_phrases(input_text[index])

    for candidate in searched_phrases:
        search_results = search_by_google(candidate)
        urls = [item["link"] for item in search_results.get("items", [])]

        for url in urls[:3]:
            if url in checked_urls:  # visited url
                continue
            if "bbc.com" not in url:
                continue

            checked_urls.add(url)
            print(f"\t\tChecking URL: {url}")

            content = URLReader(url)
            if content.is_extracted is not True:
                continue
            if content.title is None or content.text is None:
                print("\t\t\t↑↑↑ Title or text not found")
                continue

            page_text = content.title + "\n" + content.text
            if len(page_text) > MAX_CHAR_SIZE:
                print(f"\t\t\t↑↑↑ More than {MAX_CHAR_SIZE} characters")
                continue
            print(f"\t\t\t↑↑↑ Title: {content.title}")

            aligned_first_sentences = check_paraphrase(
                input_text[index],
                page_text,
                url,
            )
            if not aligned_first_sentences:
                # check_paraphrase returns {} when the page yields no
                # paragraphs; there is nothing to compare, try the next URL.
                continue

            # check_paraphrase stores the verdict under the "paraphrase" key.
            is_paraphrased = aligned_first_sentences.get("paraphrase", False)
            if is_paraphrased is not True:
                # NOTE(review): the search stops at the first page that was
                # actually compared, even when it does not match.
                return (
                    False,
                    url,
                    aligned_first_sentences,
                    content.images,
                    index,
                )

            # The first piece matched: keep consuming the following pieces
            # for as long as they are paraphrases of the same page.
            sub_paraphrase = True
            while sub_paraphrase is True:
                index += 1
                print(f"----search {index} < {len(input_text)}----")
                if index >= len(input_text):
                    print(f"input_text_last: {input_text[-1]}")
                    break
                print(f"input_text: {input_text[index]}")
                sub_sentences = check_paraphrase(
                    input_text[index],
                    page_text,
                    url,
                )
                sub_paraphrase = sub_sentences.get("paraphrase", False)
                print(f"sub_paraphrase: {sub_paraphrase}")
                print(f"sub_sentences: {sub_sentences}")
                if sub_paraphrase is True:
                    aligned_first_sentences["input"] += (
                        "<br>" + sub_sentences["input"]
                    )
                    aligned_first_sentences["source"] += (
                        "<br>" + sub_sentences["source"]
                    )
                    # Pairwise running average of the similarity scores.
                    aligned_first_sentences["similarity"] += sub_sentences[
                        "similarity"
                    ]
                    aligned_first_sentences["similarity"] /= 2

            print(f"paraphrase: {is_paraphrased}")
            print(f"aligned_first_sentences: {aligned_first_sentences}")
            return (
                is_paraphrased,
                url,
                aligned_first_sentences,
                content.images,
                index,
            )

    # No usable page was found for any search phrase.
    return False, None, [], [], index
def find_paragraph_source(text, text_index, sentences_df):
    """
    Searches the web for the source of ``text[text_index]`` and records the
    result in ``sentences_df``.

    Search phrases are generated from ``text[text_index]`` and sent to
    Google. For each phrase, at most the first three result URLs are
    considered; only unvisited URLs containing "bbc.com" are read. The first
    page that can be compared decides the outcome.

    When the page matches, the row ``text_index`` is filled from the
    alignment, and every other row whose "url" is still None is also checked
    against the same page.

    Args:
        text: Indexable sequence of text pieces (e.g. paragraphs).
        text_index: Position in ``text`` (and row label in ``sentences_df``)
            of the piece to look up.
        sentences_df: DataFrame holding one row per piece; it is modified in
            place. Assumes its index labels are the positions in ``text``
            (that is how ``text_index`` is used with ``.loc``) - TODO confirm
            against the caller.
    Returns:
        A tuple ``(sentences_df, images)`` where ``images`` is
        ``content.images`` of the matching page, or an empty list when no
        page matched.
    """
    checked_urls = set()
    searched_phrases = generate_search_phrases(text[text_index])
    print(f"text[text_index]: {text[text_index]}")
    print(f"searched_phrases: {searched_phrases}")

    for candidate in searched_phrases:
        search_results = search_by_google(candidate)
        urls = [item["link"] for item in search_results.get("items", [])]

        for url in urls[:3]:
            if url in checked_urls:  # visited url
                continue
            if "bbc.com" not in url:
                continue

            checked_urls.add(url)
            print(f"\t\tChecking URL: {url}")

            content = URLReader(url)
            if content.is_extracted is not True:
                continue
            if content.title is None or content.text is None:
                print("\t\t\t↑↑↑ Title or text not found")
                continue

            page_text = content.title + "\n" + content.text
            if len(page_text) > MAX_CHAR_SIZE:
                print(f"\t\t\t↑↑↑ More than {MAX_CHAR_SIZE} characters")
                continue
            print(f"\t\t\t↑↑↑ Title: {content.title}")

            aligned_sentence = check_paraphrase(
                text[text_index],
                page_text,
                url,
            )
            if not aligned_sentence:
                # check_paraphrase returns {} when the page yields no
                # paragraphs; there is nothing to compare, try the next URL.
                continue

            if aligned_sentence["paraphrase"] is False:
                print(f'sentence_1: {sentences_df.loc[text_index, "input"]}')
                print(f'sentence_2: {aligned_sentence["input"]}')
                sentences_df.loc[text_index, "input"] = aligned_sentence[
                    "input"
                ]
                sentences_df.loc[text_index, "paraphrase"] = aligned_sentence[
                    "paraphrase"
                ]
                return sentences_df, []

            # assign values
            columns = [
                "input",
                "source",
                "label",
                "similarity",
                "paraphrase",
                "url",
            ]
            for c in columns:
                if c in sentences_df.columns:
                    sentences_df.loc[text_index, c] = aligned_sentence[c]
            print(f"sen: {sentences_df}")

            # Try to attribute every still-unmatched row to the same page.
            # Iterate over the row labels: iterating the DataFrame itself
            # would yield its column labels, not its rows.
            for row_index in sentences_df.index:
                print(f"{row_index}")
                if sentences_df.loc[row_index, "url"] is not None:
                    continue

                # find content in new url
                row_alignment = check_paraphrase(
                    text[row_index],
                    page_text,
                    url,
                )
                # "url" is None when the row is not a paraphrase of the
                # page; the key is absent when the alignment is empty.
                if row_alignment.get("url") is None:
                    continue

                columns = ["input", "source", "label", "similarity", "url"]
                for c in columns:
                    if c in sentences_df.columns:
                        sentences_df.loc[row_index, c] = row_alignment[c]
            return sentences_df, content.images

    # No usable page was found for any search phrase.
    return sentences_df, []
def longest_common_subsequence(arr1, arr2):
    """
    Returns the length of the longest run of consecutive elements that
    appears in both lists.

    Despite the name, the match must be contiguous (i.e. this is the
    longest common *substring* over list elements).

    Args:
        arr1: The first list.
        arr2: The second list.
    Returns:
        The length of the longest shared contiguous run.
        Returns 0 if either argument is not a list or is empty.
    """
    if not (isinstance(arr1, list) and isinstance(arr2, list)):
        return 0
    if not arr1 or not arr2:
        return 0

    best = 0
    # previous_row[col] is the length of the common run that ends at the
    # previous element of arr1 and at arr2[col - 1]; only one row of the
    # table is needed at a time.
    previous_row = [0] * (len(arr2) + 1)
    for left_item in arr1:
        current_row = [0]
        for col, right_item in enumerate(arr2, start=1):
            if left_item == right_item:
                run = previous_row[col - 1] + 1
            else:
                run = 0  # a mismatch breaks the run
            current_row.append(run)
            if run > best:
                best = run
        previous_row = current_row
    return best
def check_sentence(
    input_sentence,
    source_sentence,
    min_same_sentence_len,
    min_phrase_sentence_len,
    verbose=False,
):
    """
    Decides whether two sentences are similar, either because they are
    identical or because they share a long enough run of consecutive words.

    Both sentences are stripped of surrounding whitespace and split on
    whitespace before comparison.

    Args:
        input_sentence: The input sentence.
        source_sentence: The source sentence.
        min_same_sentence_len: Minimum word count for an identical pair to
            count as a match.
        min_phrase_sentence_len: Minimum length (in words) of the shared
            run of consecutive words to count as a match.
        verbose: If True, print debug information.
    Returns:
        True if the sentences are considered similar, False otherwise.
        Returns False if either argument is not a string or is blank.
    """
    both_strings = isinstance(input_sentence, str) and isinstance(
        source_sentence,
        str,
    )
    if not both_strings:
        return False

    cleaned_input = input_sentence.strip()
    cleaned_source = source_sentence.strip()
    if not (cleaned_input and cleaned_source):
        return False

    input_tokens = cleaned_input.split()
    source_tokens = cleaned_source.split()

    # Identical sentences only count when they are long enough.
    if (
        cleaned_input == cleaned_source
        and len(input_tokens) >= min_same_sentence_len
    ):
        if verbose:
            print("Exact match found.")
        return True

    # Otherwise fall back to the longest shared run of consecutive words.
    overlap = longest_common_subsequence(input_tokens, source_tokens)
    if verbose:
        print(f"Max overlap length: {overlap}")
    return bool(overlap >= min_phrase_sentence_len)
def check_paraphrase(input_text, page_text, url):
    """
    Finds the paragraph of a web page that is most similar to the input
    text and decides whether it is a paraphrase.

    The page text is split into paragraphs; every paragraph containing
    ", external" is additionally added without that marker. The input text
    and all paragraphs are embedded with the sentence-embedding model and
    compared by cosine similarity.

    Args:
        input_text: The text to check for paraphrase.
        page_text: The text of the web page to compare with.
        url: The URL of the web page.
    Returns:
        An empty dict if the page text is empty or yields no paragraphs.
        Otherwise a dict with the keys:
        - "input": the input text.
        - "source": the best matching paragraph, or None if not paraphrased.
        - "similarity": the highest cosine similarity found.
        - "label": the label returned by ``determine_label``.
        - "paraphrase": True if the best match counts as a paraphrase.
        - "url": ``url``, or None if not paraphrased.
    """
    if not page_text:
        return {}

    page_paragraphs = split_into_paragraphs(page_text)
    if not page_paragraphs:
        return {}

    # Also offer each ", external"-tagged paragraph without the tag.
    page_paragraphs.extend(
        [
            paragraph.replace(", external", "")
            for paragraph in page_paragraphs
            if ", external" in paragraph
        ],
    )

    # Encode sentences into embeddings
    input_embedding = PARAPHASE_MODEL.encode(
        [input_text],
        convert_to_tensor=True,
        device=DEVICE,
    )
    page_embeddings = PARAPHASE_MODEL.encode(
        page_paragraphs,
        convert_to_tensor=True,
        device=DEVICE,
    )

    # Compute cosine similarity matrix (one row: the single input text)
    similarity_matrix = (
        util.cos_sim(input_embedding, page_embeddings).cpu().numpy()
    )
    scores = similarity_matrix[0]
    best_index = np.argmax(scores)
    best_score = scores[best_index]

    label, is_paraphrased = determine_label(best_score)
    print(f"is_paraphrased: {is_paraphrased}")

    if is_paraphrased is False:
        matched_url = None
        matched_paragraph = None
    else:
        matched_url = url
        matched_paragraph = page_paragraphs[best_index]

    return {
        "input": input_text,
        "source": matched_paragraph,
        "similarity": best_score,
        "label": label,
        "paraphrase": is_paraphrased,
        "url": matched_url,
    }
def similarity_ratio(a, b):
    """
    Computes how similar two strings are, using difflib's SequenceMatcher.

    Args:
        a: The first string.
        b: The second string.
    Returns:
        A float between 0.0 and 1.0 (1.0 means identical).
        Returns 0.0 if either argument is None or not a string.
    """
    # isinstance() already rejects None, so no separate None check is needed.
    if isinstance(a, str) and isinstance(b, str):
        return SequenceMatcher(None, a, b).ratio()
    return 0.0
def check_human(alligned_sentences):
    """
    Checks whether an alignment is a near-exact match.

    Args:
        alligned_sentences: An alignment dict with a "similarity" entry
            (may be empty or None).
    Returns:
        bool: True if the alignment is non-empty and its "similarity" is
        at least 0.99, False otherwise.
    """
    if alligned_sentences and alligned_sentences["similarity"] >= 0.99:
        return True
    return False
def determine_label(similarity):
    """
    Maps a similarity score to a label and a paraphrase verdict.

    Args:
        similarity: The similarity score to classify.
    Returns:
        A tuple ``(label, is_paraphrased)``:
        - ("HUMAN", True) if the score reaches PARAPHRASE_THRESHOLD_HUMAN,
        - ("MACHINE", True) if it reaches PARAPHRASE_THRESHOLD_MACHINE,
        - ("", False) otherwise.
    """
    # Checked in order, strictest threshold first.
    thresholds = (
        (PARAPHRASE_THRESHOLD_HUMAN, "HUMAN"),
        (PARAPHRASE_THRESHOLD_MACHINE, "MACHINE"),
    )
    for threshold, label in thresholds:
        if similarity >= threshold:
            return label, True
    return "", False
# Running this module as a script does nothing.
if __name__ == "__main__":
    pass