Spaces:
Sleeping
Sleeping
""" | |
This file contains the code to watermark given sentences using PECCAVI.
""" | |
import os | |
import sys | |
import time | |
import random | |
import torch | |
from utils.paraphraser import Paraphraser | |
from utils.entailment import EntailmentAnalyzer | |
from utils.sampling import SamplingProcessor | |
# from tokenizer import tokenize_sentence, tokenize_sentences | |
from utils.non_melting_point import NgramProcessor | |
from utils.masking_methods import MaskingProcessor | |
from tqdm import tqdm # add this import at the top if not already present | |
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) | |
from renderers.highlighter import highlight_common_words,reparaphrased_sentences_html | |
from renderers.tree import generate_subplot1, generate_subplot2 | |
from renderers.plot_3d import gen_three_D_plot | |
# from metrics.detectability import SentenceDetectabilityCalculator | |
# from metrics.distortion import SentenceDistortionCalculator | |
# from metrics.euclidean_distance import SentenceEuclideanDistanceCalculator | |
from transformers import pipeline, AutoTokenizer, AutoModelForMaskedLM | |
from transformers import BertTokenizer, BertForMaskedLM | |
from pathlib import Path | |
from utils.config import load_config | |
import logging | |
# The project root is two directory levels above this file; the YAML
# configuration is looked up under <project_root>/utils/.
project_root = Path(__file__).parent.parent
config_path = project_root.joinpath("utils", "config.yaml")

# Only WARNING and above are emitted, to keep the console output readable.
logging.basicConfig(
    level=logging.WARNING,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
class Watermarker:
    """Drive the watermarking pipeline for a single prompt.

    The stages are meant to be called in order; each one stores its output on
    the instance so that the next stage can read it:

    1. ``Paraphrase``      -- paraphrase the prompt, run the entailment
       analysis and collect the n-grams returned by the n-gram processor.
    2. ``Masking``         -- run every masking strategy over every sentence.
    3. ``Sampling``        -- run every sampling strategy over the masked
       sentences of every masking strategy.
    4. ``re_paraphrasing`` -- paraphrase every sampled sentence again.
    """

    def __init__(self, config):
        """Load the masked-LM model and build the helper processors.

        Args:
            config: Mapping with at least the ``'Paraphrase'`` and
                ``'Entailment'`` sections, which are handed to the
                paraphraser and the entailment analyzer respectively.
        """
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        tqdm.write(f"[Watermarker] Initializing on device: {self.device}")

        # State filled in by Paraphrase().
        self.user_prompt = None
        self.paraphrased_sentences = None
        self.analyzed_paraphrased_sentences = None
        self.selected_sentences = None
        self.discarded_sentences = None
        self.selected_sentences_list = None
        self.discarded_sentences_list = None
        self.full_list = None  # selected + discarded sentences; None until Paraphrase() succeeds
        self.updated_sentences = None
        self.common_grams = None
        self.indexed_ngrams = None
        self.common_grams_position = None

        # Attributes kept for code outside this class that may read them;
        # nothing in this class assigns them after construction.
        self.masked_sentences = None
        self.masked_words = None
        self.masked_logits = None
        self.sampled_sentences = None
        self.reparaphrased_sentences = None
        self.distortion_list = None
        self.detectability_list = None
        self.euclidean_dist_list = None

        self.masking_strategies = ['random', 'pseudorandom', 'entropy']
        self.sampling_strategies = ['inverse_transform', 'exponential_minimum', 'temperature', 'greedy']

        # Per-stage results, keyed by strategy name.
        self.masking_results = dict()
        self.sampling_results = dict()
        self.reparaphrasing_results = dict()

        # The masked-LM model is moved to the GPU when one is available.
        model_name = "bert-large-cased-whole-word-masking"
        self.tokenizer = BertTokenizer.from_pretrained(model_name)
        self.model = BertForMaskedLM.from_pretrained(model_name).to(self.device)

        self.paraphraser = Paraphraser(self.config['Paraphrase'])
        self.entailment_analyzer = EntailmentAnalyzer(self.config['Entailment'])
        self.ngram_processor = NgramProcessor()
        self.masker = MaskingProcessor(self.tokenizer, self.model)
        self.sampler = SamplingProcessor(self.tokenizer)

    def Paraphrase(self, prompt: str, threshold: float = 0.7):
        """Paraphrase ``prompt`` and prepare the sentence list for masking.

        The paraphrases are split into selected and discarded sentences by
        the entailment analyzer; both groups end up in ``self.full_list``.
        The n-gram processor is then run over those sentences plus the
        prompt, and its output is stored in ``self.common_grams`` and
        ``self.indexed_ngrams``.

        Args:
            prompt: The sentence to be watermarked.
            threshold: Value forwarded to the entailment analyzer.

        Returns:
            A short timing message on success, or ``None`` when the
            paraphraser returned ``None`` (an error is printed in that case
            and the later stages must not be run).
        """
        start_time = time.time()
        self.user_prompt = prompt
        self.paraphrased_sentences = self.paraphraser.paraphrase(self.user_prompt)
        if self.paraphrased_sentences is None:
            print("Error in generating paraphrases", "Error: Could not complete step")
            return None

        self.updated_sentences = []
        self.common_grams_position = {}  # Optional: also save order data if needed later

        (
            self.analyzed_paraphrased_sentences,
            self.selected_sentences,
            self.discarded_sentences,
        ) = self.entailment_analyzer.analyze_entailment(
            self.user_prompt, self.paraphrased_sentences, threshold
        )

        self.selected_sentences_list = list(self.selected_sentences)
        self.discarded_sentences_list = list(self.discarded_sentences)
        self.full_list = self.selected_sentences_list + self.discarded_sentences_list

        # The prompt takes part in the n-gram search only. A temporary list is
        # passed instead of appending the prompt to full_list and removing it
        # afterwards: list.remove drops the FIRST equal entry, which would
        # reorder full_list whenever a paraphrase is identical to the prompt.
        self.common_grams, self.indexed_ngrams = self.ngram_processor.find_filtered_ngrams(
            self.full_list + [self.user_prompt]
        )
        print(f"Common grams: {self.common_grams}")

        execution_time = time.time() - start_time
        return f"Step 1 completed in {execution_time:.2f} seconds"

    def _missing_masking_strategies(self, sentence):
        """Return the masking strategies that have no stored result for ``sentence``."""
        return [
            strategy
            for strategy in self.masking_strategies
            if sentence not in self.masking_results[strategy]
        ]

    def Masking(self):
        """Apply every masking strategy to every sentence and verify completeness.

        All strategies are first run over the whole of ``self.full_list``.
        Any sentence/strategy pair without a result is then retried on its
        own, and a final pass reports pairs that are still missing. New
        results are merged into ``self.masking_results``, so entries from
        earlier calls are kept unless they are overwritten.

        Returns:
            ``self.masking_results``: ``{masking_strategy: {sentence: result}}``.

        Raises:
            RuntimeError: If ``Paraphrase`` has not produced a sentence list.
        """
        if self.full_list is None:
            raise RuntimeError("Paraphrase() must complete successfully before Masking() is called")

        tqdm.write("[Watermarker] Starting Masking process.")

        # Make sure every strategy has a results dictionary.
        for strategy in self.masking_strategies:
            self.masking_results.setdefault(strategy, {})

        # First pass: apply all strategies to all sentences.
        for strategy in self.masking_strategies:
            tqdm.write(f"[Watermarker] Processing masking strategy: {strategy}")
            results = self.masker.process_sentences(self.full_list, self.common_grams, strategy)
            self.masking_results[strategy].update(results)

        # Verification step: identify missing strategy-sentence combinations.
        missing_combinations = {}
        for sentence in self.full_list:
            missing_strategies = self._missing_masking_strategies(sentence)
            for strategy in missing_strategies:
                tqdm.write(f"[WARNING] Missing masking result for strategy '{strategy}' on sentence: {sentence[:50]}...")
            if missing_strategies:
                missing_combinations[sentence] = missing_strategies

        # Retry only the missing combinations, one sentence at a time.
        for sentence, strategies in missing_combinations.items():
            tqdm.write(f"[Watermarker] Sentence missing {len(strategies)} strategies: {strategies}")
            for strategy in strategies:
                tqdm.write(f"[Watermarker] Applying missing strategy '{strategy}' to sentence: {sentence[:30]}...")
                try:
                    result = self.masker.process_sentences([sentence], self.common_grams, strategy)
                except Exception as e:
                    # Best effort: a failed retry is reported, not fatal; the
                    # final verification below lists what is still missing.
                    tqdm.write(f"[ERROR] Exception while applying strategy '{strategy}': {e}")
                    continue
                if sentence in result:
                    self.masking_results[strategy][sentence] = result[sentence]
                    tqdm.write(f"[Watermarker] Successfully applied missing strategy '{strategy}'")
                else:
                    tqdm.write(f"[ERROR] Failed to apply strategy '{strategy}' to sentence")

        # Final verification.
        remaining_missing = False
        for sentence in self.full_list:
            for strategy in self._missing_masking_strategies(sentence):
                tqdm.write(f"[ERROR] Still missing result for strategy '{strategy}' on sentence: {sentence[:50]}...")
                remaining_missing = True

        if remaining_missing:
            tqdm.write("[WARNING] Some masking combinations are still missing after fix attempts")
        else:
            tqdm.write("[INFO] All sentences now have all masking strategies applied")
        tqdm.write("[Watermarker] Masking process completed.")
        return self.masking_results

    def Sampling(self, temperature: float = 1.0):
        """Run every sampling strategy over the results of every masking strategy.

        Args:
            temperature: Value passed to the sampler as ``temperature`` for
                every sampling strategy. Defaults to ``1.0``.

        Returns:
            ``self.sampling_results``, keyed first by sampling strategy and
            then by masking strategy. Each inner value is whatever the
            sampler returns for that masking strategy's results; the
            per-sentence keys shown below are the ones the code in this file
            reads from it::

                {
                    "inverse_transform":        # sampling strategy
                    {
                        "random":               # masking strategy
                        {
                            "Original sentence 1":
                            {
                                "masked_sentence": "Masked version of sentence 1",
                                "sampled_sentence": "Sampled version of sentence 1"
                            },
                            # ... additional original sentences
                        },
                        "pseudorandom": {...},
                        "entropy": {...},
                    },
                    "exponential_minimum": {...},
                    "temperature": {...},
                    "greedy": {...},
                }

        Raises:
            RuntimeError: If ``Masking`` has not produced results for every
                masking strategy.
        """
        not_masked = [
            mask_strategy
            for mask_strategy in self.masking_strategies
            if mask_strategy not in self.masking_results
        ]
        if not_masked:
            raise RuntimeError(
                f"Masking() must be run before Sampling(); no masking results for: {not_masked}"
            )

        tqdm.write("[Watermarker] Starting Sampling process.")
        for strategy in self.sampling_strategies:
            tqdm.write(f"[Watermarker] Processing sampling strategy: {strategy}")
            self.sampling_results[strategy] = {}
            for mask_strategy in self.masking_strategies:
                results = self.sampler.process_masked_sentences(
                    self.masking_results[mask_strategy],
                    sampling_technique=strategy,
                    temperature=temperature,
                )
                self.sampling_results[strategy][mask_strategy] = results
        tqdm.write("[Watermarker] Sampling process completed.")
        return self.sampling_results

    def re_paraphrasing(self, num_return_sequences: int = 10, num_beams: int = 10):
        """Paraphrase every sampled sentence stored in ``self.sampling_results``.

        Args:
            num_return_sequences: Forwarded to the paraphraser. Defaults to 10.
            num_beams: Forwarded to the paraphraser. Defaults to 10.

        Returns:
            ``self.reparaphrasing_results``, rebuilt on every call:
            ``{sampling_strategy: {masking_strategy: {original_sentence: entry}}}``
            where ``entry`` holds ``"masking_strategy"``, ``"sampling_strategy"``,
            ``"sampled_sentence"`` and ``"re_paraphrased_sentences"``. The last
            one is an empty list when there is no sampled sentence or the
            paraphraser returned ``None``.
        """
        tqdm.write("[Watermarker] Starting re-paraphrasing process.")
        self.reparaphrasing_results = {}
        for sampling_strategy, mask_dict in tqdm(self.sampling_results.items(), desc="Sampling Strategies", leave=True):
            per_sampling = self.reparaphrasing_results[sampling_strategy] = {}
            for mask_strategy, sentences_data in tqdm(mask_dict.items(), desc="Masking Strategies", leave=False):
                per_mask = per_sampling[mask_strategy] = {}
                for original_sentence, result in tqdm(sentences_data.items(), desc="Sentences", leave=False):
                    sampled_sentence = result.get("sampled_sentence", None)
                    new_paraphrases = None
                    if sampled_sentence:
                        new_paraphrases = self.paraphraser.paraphrase(
                            sampled_sentence,
                            num_return_sequences=num_return_sequences,
                            num_beams=num_beams,
                        )
                    # Paraphrase() treats a None return from the paraphraser as
                    # a failure; store an empty list here so consumers can
                    # always slice or iterate the value.
                    if new_paraphrases is None:
                        new_paraphrases = []
                    per_mask[original_sentence] = {
                        "masking_strategy": mask_strategy,
                        "sampling_strategy": sampling_strategy,
                        "sampled_sentence": sampled_sentence,
                        "re_paraphrased_sentences": new_paraphrases,
                    }
        tqdm.write("[Watermarker] Re-paraphrasing process completed.")
        return self.reparaphrasing_results

    def calculate_distortion(self):
        """Placeholder: no distortion metric is computed; always returns ``None``."""
        return None
def format_results(common_grams, selected_sentences, masking_results,
                   sampling_results, reparaphrasing_results):
    """Render the pipeline outputs as the lines of the plain-text report.

    Args:
        common_grams: Mapping of n-gram to positions (may be empty or ``None``).
        selected_sentences: Iterable of selected sentences (may be empty or ``None``).
        masking_results: ``{masking_strategy: {sentence: result}}``; only the
            ``"masked_sentence"`` key of each result is printed (no logits).
        sampling_results: ``{sampling_strategy: {masking_strategy: {sentence: result}}}``;
            the ``"masked_sentence"`` and ``"sampled_sentence"`` keys are printed.
        reparaphrasing_results: Same nesting; the ``"sampled_sentence"`` key and
            the first three ``"re_paraphrased_sentences"`` are printed.

    Returns:
        A list of strings, each already terminated with a newline, suitable
        for ``file.writelines``.
    """
    lines = ["========== WATERMARKING RESULTS ==========\n\n"]

    # --- Common n-grams ---
    lines.append("==> Common N-grams:\n")
    if common_grams:
        lines.extend(f" {ngram}: {positions}\n" for ngram, positions in common_grams.items())
    else:
        lines.append(" No common n-grams found.\n")

    # --- Selected sentences ---
    lines.append("\n==> Selected Sentences:\n")
    if selected_sentences:
        lines.extend(f" {sentence}\n" for sentence in selected_sentences)
    else:
        lines.append(" No selected sentences available.\n")

    # --- Masking results (without logits) ---
    lines.append("\n==> Masking Results:\n")
    for masking_strategy, results_dict in masking_results.items():
        lines.append(f"\n-- Masking Strategy: {masking_strategy} --\n")
        for original_sentence, data in results_dict.items():
            masked_sentence = data.get("masked_sentence", "")
            lines += [
                "Original:\n",
                f" {original_sentence}\n",
                "Masked:\n",
                f" {masked_sentence}\n",
                "-----\n",
            ]

    # --- Sampling results ---
    lines.append("\n==> Sampling Results:\n")
    for sampling_strategy, mask_strategy_dict in sampling_results.items():
        lines.append(f"\n-- Sampling Strategy: {sampling_strategy} --\n")
        for mask_strategy, sentences in mask_strategy_dict.items():
            lines.append(f"\n Masking Strategy: {mask_strategy}\n")
            for original_sentence, res in sentences.items():
                masked_sentence = res.get("masked_sentence", "")
                sampled_sentence = res.get("sampled_sentence", "")
                lines += [
                    " Original:\n",
                    f" {original_sentence}\n",
                    " Masked:\n",
                    f" {masked_sentence}\n",
                    " Sampled:\n",
                    f" {sampled_sentence}\n",
                    " -----\n",
                ]

    # --- Re-paraphrasing results ---
    lines.append("\n==> Re-paraphrasing Results:\n")
    for sampling_strategy, mask_dict in reparaphrasing_results.items():
        lines.append(f"\n-- Sampling Strategy: {sampling_strategy} --\n")
        for mask_strategy, orig_sentence_dict in mask_dict.items():
            lines.append(f"\n Masking Strategy: {mask_strategy}\n")
            for original_sentence, data in orig_sentence_dict.items():
                sampled_sentence = data.get("sampled_sentence", "")
                # The key may be present with a None value, which cannot be
                # sliced; treat that like "no paraphrases".
                re_paraphrases = data.get("re_paraphrased_sentences") or []
                lines += [
                    " Original:\n",
                    f" {original_sentence}\n",
                    " Sampled:\n",
                    f" {sampled_sentence}\n",
                    " Re-paraphrased (first 3 examples):\n",
                ]
                # Only the first 3 re-paraphrases are shown, for brevity.
                lines.extend(
                    f" {idx}. {rp}\n" for idx, rp in enumerate(re_paraphrases[:3], start=1)
                )
                lines.append(" -----\n")

    return lines


def main():
    """Run the whole pipeline on a sample sentence and write the report to disk.

    The report is written to ``watermarking_results.txt`` in the current
    working directory. The process exits with an error message if no
    paraphrases could be generated, because the later stages have nothing
    to work on in that case.
    """
    config = load_config(config_path)['PECCAVI_TEXT']
    watermarker = Watermarker(config)
    logger.info("Starting main Watermarker process.")

    print("==> Paraphrasing:")
    watermarker.Paraphrase("The quick brown fox jumps over small cat the lazy dog everyday again and again.")
    # Paraphrase() leaves paraphrased_sentences as None when generation fails.
    if watermarker.paraphrased_sentences is None:
        raise SystemExit("Paraphrasing failed; no results were written.")
    logger.info("Paraphrasing completed.")

    masking_results = watermarker.Masking()
    sampling_results = watermarker.Sampling()
    reparaphrasing_results = watermarker.re_paraphrasing()

    results_str = format_results(
        watermarker.common_grams,
        watermarker.selected_sentences,
        masking_results,
        sampling_results,
        reparaphrasing_results,
    )

    # Write all results to the output file.
    output_file = "watermarking_results.txt"
    logger.info("Writing results to output file.")
    with open(output_file, "w", encoding="utf-8") as f:
        f.writelines(results_str)
    print("\nResults have been written to", output_file)


if __name__ == "__main__":
    main()