news_verification / src /application /content_detection.py
pmkhanh7890's picture
refactor code and fix bugs
b73a4fc
raw
history blame
31.9 kB
"""
Author: Khanh Phan
Date: 2024-12-04
"""
import pandas as pd
from src.application.config import MIN_RATIO_PARAPHRASE_NUM, PARAPHRASE_THRESHOLD, PARAPHRASE_THRESHOLD_MACHINE
from src.application.formatting import color_text, format_entity_count
from src.application.image.image_detection import (
detect_image_by_ai_model,
detect_image_by_reverse_search,
detect_image_from_news_image,
)
from src.application.text.entity import (
apply_highlight,
highlight_entities,
)
from src.application.text.helper import (
extract_equal_text,
postprocess_label,
split_into_paragraphs,
)
from src.application.text.model_detection import (
detect_text_by_ai_model,
predict_generation_model,
)
from src.application.text.search_detection import find_sentence_source
class NewsVerification:
def __init__(self):
"""
Initializes the NewsVerification object.
"""
self.news_text: str = ""
self.news_title: str = ""
self.news_content: str = ""
self.news_image: str = ""
self.text_prediction_label: list[str] = ["UNKNOWN"]
self.text_prediction_score: list[float] = [0.0]
self.image_prediction_label: list[str] = ["UNKNOWN"]
self.image_prediction_score: list[str] = [0.0]
self.image_referent_url: list[str] = []
self.news_prediction_label: str = ""
self.news_prediction_score: float = -1
# news' urls to find img
self.found_img_url: list[str] = []
# Analyzed results
self.aligned_sentences_df: pd.DataFrame = pd.DataFrame(
columns=[
"input",
"source",
"label",
"similarity",
"paraphrase",
"url",
# "entities",
],
)
self.grouped_url_df: pd.DataFrame = pd.DataFrame()
# For formatting ouput tables
self.ordinary_user_table: list = []
self.fact_checker_table: list = []
self.governor_table: list = []
def load_news(self, news_title: str, news_content: str, news_image: str):
"""
Loads news data into the object's attributes.
Args:
news_title (str): The title of the news article.
news_content (str): The content of the news article.
news_image (str): The url of image in news article.
"""
# Combine title and content for a full text representation.
# .strip() removes leading/trailing whitespace for cleaner text.
self.news_text = (news_title + "\n\n" + news_content).strip()
# if not isinstance(news_title, str) or not isinstance(
# news_content,
# str,
# ):
# raise TypeError("News title and content must be strings.")
# if not isinstance(news_image, str) or news_image is not None:
# Warning("News image must be a string.")
self.news_title = news_title
self.news_content = news_content
self.news_image = news_image
def group_by_url(self):
"""
Groups aligned sentences by URL
Then, concatenates text the 'input' and 'source' text for each group.
"""
def concat_text(series):
"""
Concatenates the elements of a pd.Series into a single string.
"""
return " ".join(
series.astype(str).tolist(),
) # Handle mixed data types and NaNs
# Group sentences by URL and concatenate 'input' and 'source' text.
self.grouped_url_df = (
self.aligned_sentences_df.groupby("url")
.agg(
{
"input": concat_text,
"source": concat_text,
},
)
.reset_index()
) # Reset index to make 'url' a regular column
# Add new columns for label and score
self.grouped_url_df["label"] = None
self.grouped_url_df["score"] = None
print(f"aligned_sentences_df:\n {self.aligned_sentences_df}")
def determine_text_origin_by_url(self):
"""
Determines the text origin for each URL group.
"""
for index, row in self.grouped_url_df.iterrows():
# Verify text origin using URL-based verification.
label, score = self.verify_text(row["url"])
# If URL-based verification returns 'UNKNOWN', use AI detection
if label == "UNKNOWN":
# Concatenate text from "input" column in sentence_df
text = " ".join(row["input"])
# Detect text origin using an AI model.
label, score = detect_text_by_ai_model(text)
self.grouped_url_df.at[index, "label"] = label
self.grouped_url_df.at[index, "score"] = score
def determine_text_origin(self):
"""
Determines the origin of the input text by analyzing
its sources and applying AI detection models.
This method groups sentences by their source URLs,
applies verification and AI detection, and then determines
an overall label and score for the input text.
"""
# Find the text URLs associated with the input text
self.find_text_source()
# Group sentences by URL and concatenate 'input' and 'source' text.
self.group_by_url()
# Determine the text origin for each URL group
self.determine_text_origin_by_url()
# Determine the overall label and score for the entire input text.
if not self.grouped_url_df.empty:
# Check for 'gpt-4o' labels in the grouped URLs.
machine_label = self.grouped_url_df[
self.grouped_url_df["label"].str.contains(
"gpt-4o",
case=False,
na=False,
)
]
if not machine_label.empty:
# If 'gpt-4o' labels are found, post-process and assign.
labels = machine_label["label"].tolist()
label = postprocess_label(labels)
# labels = " and ".join(machine_label["label"].tolist())
# label = remove_duplicate_words(label)
self.text_prediction_label[0] = label
self.text_prediction_score[0] = machine_label["score"].mean()
else:
# If no 'gpt-4o' labels, assign for 'HUMAN' labels.
machine_label = self.aligned_sentences_df[
self.aligned_sentences_df["label"] == "HUMAN"
]
self.text_prediction_label[0] = "HUMAN"
self.text_prediction_score[0] = machine_label["score"].mean()
else:
# If no found URLs, use AI detection on the entire input text.
print("No source found in the input text")
text = " ".join(self.aligned_sentences_df["input"].tolist())
# Detect text origin using an AI model.
label, score = detect_text_by_ai_model(text)
self.text_prediction_label[0] = label
self.text_prediction_score[0] = score
def find_text_source(self):
"""
Determines the origin of the given text based on paraphrasing
detection and human authorship analysis.
1. Splits the input news text into sentences,
2. Searches for sources for each sentence
3. Updates the aligned_sentences_df with the found sources.
"""
print("CHECK TEXT:")
print("\tFrom search engine:")
input_paragraphs = split_into_paragraphs(self.news_text)
# Initialize an empty DataFrame if it doesn't exist, otherwise extend it.
if not hasattr(self, 'aligned_sentences_df') or self.aligned_sentences_df is None:
self.aligned_sentences_df = pd.DataFrame(columns=[
"input",
"source",
"label",
"similarity",
"paraphrase",
"url",
"entities",
])
# Setup DataFrame for input_sentences
for _ in range(len(input_paragraphs)):
self.aligned_sentences_df = pd.concat(
[
self.aligned_sentences_df,
pd.DataFrame(
[
{
"input": None,
"source": None,
"label": None,
"similarity": None,
"paraphrase": None,
"url": None,
"entities": None,
},
],
),
],
ignore_index=True,
)
# Find a source for each sentence
for index, _ in enumerate(input_paragraphs):
similarity = self.aligned_sentences_df.loc[index, "similarity"]
if similarity is not None:
if similarity > PARAPHRASE_THRESHOLD_MACHINE:
continue
print(f"\n-------index = {index}-------")
print(f"current_text = {input_paragraphs[index]}\n")
self.aligned_sentences_df, img_urls = find_sentence_source(
input_paragraphs,
index,
self.aligned_sentences_df,
)
# Initialize found_img_url if it does not exist.
if not hasattr(self, 'found_img_url'):
self.found_img_url = []
self.found_img_url.extend(img_urls)
def verify_text(self, url):
"""
Verifies the text origin based on similarity scores and labels
associated with a given URL.
1. Filters sentences by URL and similarity score,
2. Determines if the text is likely generated by a machine or a human.
3. Calculates an average similarity score.
Args:
url (str): The URL to filter sentences by.
Returns:
tuple: A
- Label ("MACHINE", "HUMAN", or "UNKNOWN")
- Score
"""
label = "UNKNOWN"
score = 0
# calculate the average similarity when the similary score
# in each row of sentences_df is higher than 0.8
# Filter sentences by URL.
filtered_by_url = self.aligned_sentences_df[
self.aligned_sentences_df["url"] == url
]
# Filter sentences by similarity score (> PARAPHRASE_THRESHOLD).
filtered_by_similarity = filtered_by_url[
filtered_by_url["similarity"] > PARAPHRASE_THRESHOLD
]
# Check if a ratio of remaining filtering-sentences is more than 50%.
if len(filtered_by_similarity) / len(self.aligned_sentences_df) > MIN_RATIO_PARAPHRASE_NUM:
# check if "MACHINE" is in self.aligned_sentences_df["label"]:
contains_machine = (
filtered_by_similarity["label"]
.str.contains(
"MACHINE",
case=False,
na=False,
)
.any()
)
# TODO: integrate with determine_text_origin
if contains_machine:
# If "MACHINE" label is present, set label and calculate score.
machine_rows = filtered_by_similarity[
filtered_by_similarity["label"].str.contains(
"MACHINE",
case=False,
na=False,
)
]
generated_model, _ = predict_generation_model(self.news_text)
label = f"Partially generated by {generated_model}"
score = machine_rows["similarity"].mean()
else:
# If no "MACHINE" label, assign "HUMAN" label and calculate score.
label = "HUMAN"
human_rows = filtered_by_similarity[
filtered_by_similarity["label"].str.contains(
"HUMAN",
case=False,
na=False,
)
]
score = human_rows["similarity"].mean()
return label, score
def determine_image_origin(self):
"""
Determines the origin of the news image using various detection methods.
1. Matching against previously found image URLs.
2. Reverse image search.
3. AI-based image detection.
If none of these methods succeed, the image origin is marked as "UNKNOWN".
"""
print("CHECK IMAGE:")
# Handle the case where no image is provided.
if self.news_image is None:
self.image_prediction_label = "UNKNOWN"
self.image_prediction_score = 0.0
self.image_referent_url = None
return
# Attempt to match the image against previously found image URLs.
print("\tFrom found image URLs...")
matched_url, similarity = detect_image_from_news_image(
self.news_image,
self.found_img_url,
)
if matched_url is not None:
print(f"matched image: {matched_url}\nsimilarity: {similarity}\n")
self.image_prediction_label = "HUMAN"
self.image_prediction_score = similarity
self.image_referent_url = matched_url
return
# Attempt to find the image origin using reverse image search.
print("\tFrom reverse image search...")
matched_url, similarity = detect_image_by_reverse_search(
self.news_image,
)
if matched_url is not None:
print(f"matched image: {matched_url}\tScore: {similarity}%\n")
self.image_prediction_label = "HUMAN"
self.image_prediction_score = similarity
self.image_referent_url = matched_url
return
# Attempt to detect the image origin using an AI model.
print("\tFrom an AI model...")
detected_label, score = detect_image_by_ai_model(self.news_image)
if detected_label:
print(f"detected_label: {detected_label} ({score})")
self.image_prediction_label = detected_label
self.image_prediction_score = score
self.image_referent_url = None
return
# If all detection methods fail, mark the image origin as "UNKNOWN".
self.image_prediction_label = "UNKNOWN"
self.image_prediction_score = 50
self.image_referent_url = None
def determine_origin(self):
"""
Determine origins by analyzing the news text and image.
"""
if self.news_text != "":
self.determine_text_origin()
if self.news_image != "":
self.determine_image_origin()
# Handle entity recognition and processing.
self.handle_entities()
def generate_report(self) -> tuple[str, str, str]:
"""
Generates reports tailored for different user roles
(ordinary users, fact checkers, governors).
Returns:
tuple: A tuple containing three html-formatted reports:
- ordinary_user_table: Report for ordinary users.
- fact_checker_table: Report for fact checkers.
- governor_table: Report for governors.
"""
ordinary_user_table = self.create_ordinary_user_table()
fact_checker_table = self.create_fact_checker_table()
governor_table = self.create_governor_table()
return ordinary_user_table, fact_checker_table, governor_table
def handle_entities(self):
"""
Highlights and assigns entities with colors to aligned sentences
based on grouped URLs.
For each grouped URL:
1. Highlights entities in the input and source text
2. Then assigns these highlighted entities to the corresponding
sentences in the aligned sentences DataFrame.
"""
entities_with_colors = []
for index, row in self.grouped_url_df.iterrows():
# Get entity-words (in pair) with colors
entities_with_colors = highlight_entities(
row["input"],
row["source"],
)
# Assign the highlighted entities to the corresponding sentences
# in aligned_sentences_df.
for index, sentence in self.aligned_sentences_df.iterrows():
if sentence["url"] == row["url"]:
# Use .at to modify the DataFrame efficiently.
self.aligned_sentences_df.at[index, "entities"] = (
entities_with_colors
)
def get_text_urls(self) -> set:
"""
Returns a set of unique URLs referenced in the text analysis.
Returns:
set: A set containing the unique URLs referenced in the text.
"""
return set(self.text_referent_url)
def create_fact_checker_table(self):
rows = []
rows.append(self.format_image_fact_checker_row())
for _, row in self.aligned_sentences_df.iterrows():
if row["input"] is None:
continue
if row["source"] is None:
equal_idx_1 = equal_idx_2 = []
else: # Get index of equal phrases in input and source sentences
equal_idx_1, equal_idx_2 = extract_equal_text(
row["input"],
row["source"],
)
self.fact_checker_table.append(
[
row,
equal_idx_1,
equal_idx_2,
row["entities"],
row["url"],
],
)
previous_url = None
span_row = 1
for index, row in enumerate(self.fact_checker_table):
current_url = row[4]
last_url_row = False
# First row or URL change
if index == 0 or current_url != previous_url:
first_url_row = True
previous_url = current_url
# Increase counter "span_row" when the next url is the same
while (
index + span_row < len(self.fact_checker_table)
and self.fact_checker_table[index + span_row][4]
== current_url
):
span_row += 1
else:
first_url_row = False
span_row -= 1
if span_row == 1:
last_url_row = True
formatted_row = self.format_text_fact_checker_row(
row,
first_url_row,
last_url_row,
span_row,
)
rows.append(formatted_row)
table = "\n".join(rows)
return f"""
<h5>Comparison between input news and source news:</h5>
<table border="1" style="width:100%; text-align:left;">
<col style="width: 170px;">
<col style="width: 170px;">
<col style="width: 30px;">
<col style="width: 75px;">
<thead>
<tr>
<th>Input news</th>
<th>Source (URL in Originality)</th>
<th>Forensic</th>
<th>Originality</th>
</tr>
</thead>
<tbody>
{table}
</tbody>
</table>
<style>
"""
def format_text_fact_checker_row(
self,
row,
first_url_row=True,
last_url_row=True,
span_row=1,
):
entity_count = 0
if row[0]["input"] is None:
return ""
if row[0]["source"] is not None: # source is not empty
if row[3] is not None:
# highlight entities
input_sentence, highlight_idx_input = apply_highlight(
row[0]["input"],
row[3],
"input",
)
source_sentence, highlight_idx_source = apply_highlight(
row[0]["source"],
row[3],
"source",
)
else:
input_sentence = row[0]["input"]
source_sentence = row[0]["source"]
highlight_idx_input = []
highlight_idx_source = []
if row[3] is not None:
entity_count = len(row[3])
# Color overlapping words
input_sentence = color_text(
input_sentence,
row[1],
highlight_idx_input,
) # text, index of highlight words
source_sentence = color_text(
source_sentence,
row[2],
highlight_idx_source,
) # text, index of highlight words
# Replace _ to get correct formatting
# Original one having _ for correct word counting
input_sentence = input_sentence.replace(
"span_style",
"span style",
).replace("1px_4px", "1px 4px")
source_sentence = source_sentence.replace(
"span_style",
"span style",
).replace("1px_4px", "1px 4px")
else:
input_sentence = row[0]["input"]
source_sentence = row[0]["source"]
url = row[0]["url"]
# Displayed label and score by url
filterby_url = self.grouped_url_df[self.grouped_url_df["url"] == url]
if len(filterby_url) > 0:
label = filterby_url["label"].values[0]
score = filterby_url["score"].values[0]
else:
label = self.text_prediction_label[0]
score = self.text_prediction_score[0]
# Format displayed url
source_text_url = f"""<a href="{url}">{url}</a>"""
# Format displayed entity count
entity_count_text = format_entity_count(entity_count)
border_top = "border-top: 1px solid transparent;"
border_bottom = "border-bottom: 1px solid transparent;"
word_break = "word-break: break-all;"
if first_url_row is True:
# First & Last the group: no transparent
if last_url_row is True:
return f"""
<tr>
<td>{input_sentence}</td>
<td>{source_sentence}</td>
<td rowspan="{span_row}">{label}<br>
({score * 100:.2f}%)<br><br>
{entity_count_text}</td>
<td rowspan="{span_row}"; style="{word_break}";>{source_text_url}</td>
</tr>
"""
# First row of the group: transparent bottom border
return f"""
<tr>
<td style="{border_bottom}";>{input_sentence}</td>
<td style="{border_bottom}";>{source_sentence}</td>
<td rowspan="{span_row}">{label}<br>
({score * 100:.2f}%)<br><br>
{entity_count_text}</td>
<td rowspan="{span_row}"; style="{word_break}";>{source_text_url}</td>
</tr>
"""
else:
if last_url_row is True:
# NOT First row, Last row: transparent top border
return f"""
<tr>
<td style="{border_top}";>{input_sentence}</td>
<td style="{border_top}";>{source_sentence}</td>
</tr>
"""
else:
# NOT First & NOT Last row: transparent top & bottom borders
return f"""
<tr>
<td style="{border_top} {border_bottom}";>{input_sentence}</td>
<td style="{border_top} {border_bottom}";>{source_sentence}</td>
</tr>
"""
def format_image_fact_checker_row(self):
if (
self.image_referent_url is not None
or self.image_referent_url != ""
):
source_image = f"""<img src="{self.image_referent_url}" width="100" height="150">""" # noqa: E501
source_image_url = f"""<a href="{self.image_referent_url}">{self.image_referent_url}</a>""" # noqa: E501
else:
source_image = "Image not found"
source_image_url = ""
word_break = "word-break: break-all;"
return f"""
<tr>
<td>input image</td>
<td>{source_image}</td>
<td>{self.image_prediction_label}<br>({self.image_prediction_score:.2f}%)</td>
<td style="{word_break}";>{source_image_url}</td></tr>"""
def create_ordinary_user_table(self):
rows = []
rows.append(self.format_image_ordinary_user_row())
rows.append(self.format_text_ordinary_user_row())
table = "\n".join(rows)
return f"""
<h5>Comparison between input news and source news:</h5>
<table border="1" style="width:100%; text-align:left;">
<col style="width: 340px;">
<col style="width: 30px;">
<col style="width: 75px;">
<thead>
<tr>
<th>Input news</th>
<th>Forensic</th>
<th>Originality</th>
</tr>
</thead>
<tbody>
{table}
</tbody>
</table>
<style>
"""
def format_text_ordinary_user_row(self):
input_sentences = ""
source_text_urls = ""
urls = []
for _, row in self.aligned_sentences_df.iterrows():
if row["input"] is None:
continue
input_sentences += row["input"] + "<br><br>"
url = row["url"]
if url not in urls:
urls.append(url)
source_text_urls += f"""<a href="{url}">{url}</a><br>"""
word_break = "word-break: break-all;"
return f"""
<tr>
<td>{input_sentences}</td>
<td>{self.text_prediction_label[0]}<br>
({self.text_prediction_score[0] * 100:.2f}%)</td>
<td style="{word_break}";>{source_text_urls}</td>
</tr>
"""
def format_image_ordinary_user_row(self):
if (
self.image_referent_url is not None
or self.image_referent_url != ""
):
source_image_url = f"""<a href="{self.image_referent_url}">{self.image_referent_url}</a>""" # noqa: E501
else:
source_image_url = ""
word_break = "word-break: break-all;"
return f"""<tr><td>input image</td><td>{self.image_prediction_label}<br>({self.image_prediction_score:.2f}%)</td><td style="{word_break}";>{source_image_url}</td></tr>""" # noqa: E501
def create_governor_table(self):
rows = []
rows.append(self.format_image_governor_row())
for _, row in self.aligned_sentences_df.iterrows():
if row["input"] is None:
continue
if row["source"] is None:
equal_idx_1 = equal_idx_2 = []
else:
# Get index of equal phrases in input and source sentences
equal_idx_1, equal_idx_2 = extract_equal_text(
row["input"],
row["source"],
)
self.governor_table.append(
[
row,
equal_idx_1,
equal_idx_2,
row["entities"],
],
)
formatted_row = self.format_text_governor_row()
rows.append(formatted_row)
table = "\n".join(rows)
return f"""
<h5>Comparison between input news and source news:</h5>
<table border="1" style="width:100%; text-align:left;">
<col style="width: 170px;">
<col style="width: 170px;">
<col style="width: 30px;">
<col style="width: 75px;">
<thead>
<tr>
<th>Input news</th>
<th>Source (URL in Originality)</th>
<th>Forensic</th>
<th>Originality</th>
</tr>
</thead>
<tbody>
{table}
</tbody>
</table>
<style>
"""
def format_text_governor_row(self):
input_sentences = ""
source_sentences = ""
source_text_urls = ""
urls = []
sentence_count = 0
entity_count = [0, 0] # to get index of [-2]
for row in self.governor_table:
if row[0]["input"] is None:
continue
if row[0]["source"] is not None: # source is not empty
# highlight entities
input_sentence, highlight_idx_input = apply_highlight(
row[0]["input"],
row[3], # entities_with_colors
"input", # key
entity_count[
-2
], # since the last one is for current counting
)
source_sentence, highlight_idx_source = apply_highlight(
row[0]["source"],
row[3], # entities_with_colors
"source", # key
entity_count[
-2
], # since the last one is for current counting
)
# Color overlapping words
input_sentence = color_text(
input_sentence,
row[1],
highlight_idx_input,
) # text, index of highlight words
source_sentence = color_text(
source_sentence,
row[2],
highlight_idx_source,
) # text, index of highlight words
input_sentence = input_sentence.replace(
"span_style",
"span style",
).replace("1px_4px", "1px 4px")
source_sentence = source_sentence.replace(
"span_style",
"span style",
).replace("1px_4px", "1px 4px")
else:
if row[0]["source"] is None:
source_sentence = ""
else:
source_sentence = row[0]["source"]
input_sentence = row[0]["input"]
# convert score to HUMAN-based score:
input_sentences += input_sentence + "<br><br>"
source_sentences += source_sentence + "<br><br>"
url = row[0]["url"]
if url not in urls:
urls.append(url)
source_text_urls += f"""<a href="{url}">{url}</a><br><br>"""
sentence_count += 1
if row[3] is not None:
entity_count.append(len(row[3]))
entity_count_text = format_entity_count(sum(entity_count))
word_break = "word-break: break-all;"
return f"""
<tr>
<td>{input_sentences}</td>
<td>{source_sentences}</td>
<td>{self.text_prediction_label[0]}<br>
({self.text_prediction_score[0] * 100:.2f}%)<br><br>
{entity_count_text}</td>
<td style="{word_break}";>{source_text_urls}</td>
</tr>
"""
def format_image_governor_row(self):
if (
self.image_referent_url is not None
or self.image_referent_url != ""
):
source_image = f"""<img src="{self.image_referent_url}" width="100" height="150">""" # noqa: E501
source_image_url = f"""<a href="{self.image_referent_url}">{self.image_referent_url}</a>""" # noqa: E501
else:
source_image = "Image not found"
source_image_url = ""
word_break = "word-break: break-all;"
return f"""<tr><td>input image</td><td>{source_image}</td><td>{self.image_prediction_label}<br>({self.image_prediction_score:.2f}%)</td><td style="{word_break}";>{source_image_url}</td></tr>""" # noqa: E501