Spaces:
Sleeping
Sleeping
File size: 7,143 Bytes
65506c8 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
import os
from PyPDF2 import PdfReader
import pandas as pd
from dotenv import load_dotenv
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import json
from datetime import datetime
from sklearn.decomposition import NMF
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import random
from joblib import Parallel, delayed
class TweetDatasetProcessor:
    """Build a tweet dataset from PDF text and run a GPT-2 model over it.

    The processor keeps the parsed tweets in ``self.tweets`` (a list of dicts
    with ``content``, ``timestamp``, ``mentions`` and ``hashtags`` keys, plus
    ``category`` once :meth:`categorize_tweets` has run) and offers
    clustering, topic extraction, personality analysis and tweet generation
    on top of that list.
    """

    # Prompts longer than this many tokens are truncated by the tokenizer.
    MAX_PROMPT_TOKENS = 512
    # Generation budgets, counted in *new* tokens (the prompt is not included).
    # 512 prompt tokens + 500 new tokens stays within a 1024-position context
    # (assumes the fine-tuned model keeps GPT-2's default limit - confirm).
    PROFILE_NEW_TOKENS = 500
    TWEET_NEW_TOKENS = 150
    # Only this many leading characters of the profile are put in the prompt.
    PROFILE_EXCERPT_CHARS = 400

    def __init__(self, model_path='path_to_finetuned_model', tokenizer_name='gpt2'):
        """Load environment variables, the tokenizer and the language model.

        Args:
            model_path: Name or directory passed to
                ``GPT2LMHeadModel.from_pretrained``. The default is the
                placeholder used by the original code and must be replaced
                with a real fine-tuned checkpoint.
            tokenizer_name: Name or directory passed to
                ``GPT2Tokenizer.from_pretrained``.
        """
        load_dotenv()
        self.tokenizer = GPT2Tokenizer.from_pretrained(tokenizer_name)
        self.model = GPT2LMHeadModel.from_pretrained(model_path)
        self.tweets = []
        self.personality_profile = ""
        self.vectorizer = TfidfVectorizer(stop_words='english')
        # Contents of tweets already used as generation context, so that
        # consecutive generate_tweet() calls do not reuse the same examples.
        self.used_tweets = set()

    @staticmethod
    def _tokens_with_prefix(text, prefix):
        """Return the whitespace-separated tokens of *text* starting with *prefix*."""
        return [word for word in text.split() if word.startswith(prefix)]

    @staticmethod
    def _process_line(line):
        """Convert one raw text line into a tweet record.

        Args:
            line: A single line of extracted text.

        Returns:
            ``None`` for blank lines and lines that start with ``http``;
            otherwise a dict with the stripped ``content``, a ``timestamp``,
            and the ``mentions`` / ``hashtags`` found in the line.
        """
        line = line.strip()
        if not line or line.startswith('http'):  # Skip empty lines and URLs
            return None
        return {
            'content': line,
            # NOTE: this is the (naive, local) processing time, not the time
            # the tweet was written; nothing is parsed from the line itself.
            'timestamp': datetime.now(),
            'mentions': TweetDatasetProcessor._tokens_with_prefix(line, '@'),
            'hashtags': TweetDatasetProcessor._tokens_with_prefix(line, '#'),
        }

    def extract_text_from_pdf(self, pdf_path):
        """Extract the text of every page of a PDF file.

        Args:
            pdf_path: Path (or file object) accepted by ``PdfReader``.

        Returns:
            The page texts concatenated in page order, with no separator.
        """
        reader = PdfReader(pdf_path)
        # Guard against extract_text() yielding None (e.g. a page without a
        # text layer in some PyPDF2 versions), which would break the join.
        return "".join(page.extract_text() or "" for page in reader.pages)

    def process_pdf_content(self, text, output_path='processed_tweets.csv'):
        """Parse extracted PDF text into tweets and save them as CSV.

        Args:
            text: Raw text, one candidate tweet per line.
            output_path: Where the CSV is written. Defaults to
                ``processed_tweets.csv`` in the current directory.

        Returns:
            A ``pandas.DataFrame`` with one row per tweet.

        Raises:
            ValueError: If *text* is blank, or if no line survives filtering.
        """
        if not text.strip():
            raise ValueError("The uploaded PDF appears to be empty.")
        # The per-line work is a strip and two splits; doing it in-process is
        # far cheaper than shipping each line to a worker pool, and keeps the
        # result order identical.
        parsed = (
            TweetDatasetProcessor._process_line(line)
            for line in text.split('\n')
        )
        self.tweets = [tweet for tweet in parsed if tweet]
        if not self.tweets:
            raise ValueError("No tweets were extracted from the PDF. Ensure the content is properly formatted.")
        # Save the processed tweets to a CSV
        df = pd.DataFrame(self.tweets)
        df.to_csv(output_path, index=False)
        return df

    def _extract_mentions(self, text):
        """Return the ``@``-prefixed tokens of *text*, in order of appearance."""
        return self._tokens_with_prefix(text, '@')

    def _extract_hashtags(self, text):
        """Return the ``#``-prefixed tokens of *text*, in order of appearance."""
        return self._tokens_with_prefix(text, '#')

    def categorize_tweets(self, n_clusters=5):
        """Cluster the tweets with KMeans over TF-IDF features.

        Each tweet dict gains a ``category`` key of the form
        ``"Category <label>"``.

        Args:
            n_clusters: Desired number of clusters. It is capped at the
                number of tweets, because KMeans cannot form more clusters
                than there are samples.

        Returns:
            A ``pandas.DataFrame`` of the tweets including the new column.

        Raises:
            ValueError: If there are no tweets to cluster.
        """
        all_tweets = [tweet['content'] for tweet in self.tweets]
        if not all_tweets:
            raise ValueError("No tweets available for clustering.")
        tfidf_matrix = self.vectorizer.fit_transform(all_tweets)
        effective_clusters = max(1, min(n_clusters, len(all_tweets)))
        kmeans = KMeans(n_clusters=effective_clusters, random_state=1)
        kmeans.fit(tfidf_matrix)
        for tweet, label in zip(self.tweets, kmeans.labels_):
            tweet['category'] = f"Category {label}"
        return pd.DataFrame(self.tweets)

    def _generate_text(self, prompt, max_new_tokens):
        """Run the model on *prompt* and return only the newly generated text.

        Args:
            prompt: Text fed to the model; truncated to MAX_PROMPT_TOKENS.
            max_new_tokens: Maximum number of tokens to generate after the
                prompt.

        Returns:
            The decoded continuation, without the echoed prompt and with
            surrounding whitespace removed.
        """
        # A single sequence needs no padding; requesting it would fail on a
        # GPT-2 tokenizer, which defines no pad token by default.
        encoded = self.tokenizer(
            prompt,
            return_tensors="pt",
            truncation=True,
            max_length=self.MAX_PROMPT_TOKENS,
        )
        prompt_length = encoded['input_ids'].shape[1]
        generated = self.model.generate(
            encoded['input_ids'],
            attention_mask=encoded['attention_mask'],
            max_new_tokens=max_new_tokens,
            # GPT-2 has no pad token; reuse EOS so generate() has one.
            pad_token_id=self.tokenizer.eos_token_id,
        )
        # A causal LM returns prompt + continuation; keep the continuation.
        continuation = generated[0][prompt_length:]
        return self.tokenizer.decode(continuation, skip_special_tokens=True).strip()

    def analyze_personality(self, max_tweets=50):
        """Generate a personality profile from the first *max_tweets* tweets.

        On success the profile is stored in ``self.personality_profile``.

        Args:
            max_tweets: How many tweets (from the start of the list) to put
                in the prompt.

        Returns:
            The generated profile, or a string starting with
            ``"Error during personality analysis: "`` if generation failed.

        Raises:
            ValueError: If there are no tweets to analyse.
        """
        if not self.tweets:
            raise ValueError("No tweets available for personality analysis.")
        sample = [tweet['content'] for tweet in self.tweets][:max_tweets]
        analysis_prompt = (
            "Perform a deep psychological analysis of the author based on these tweets:\n"
            "Core beliefs, emotional tendencies, cognitive patterns, etc.\n"
            "Tweets for analysis:\n"
            f"{json.dumps(sample, indent=2)}\n"
        )
        try:
            self.personality_profile = self._generate_text(
                analysis_prompt, self.PROFILE_NEW_TOKENS
            )
            return self.personality_profile
        except Exception as e:
            # Best-effort contract: callers receive a message, not a raise.
            return f"Error during personality analysis: {str(e)}"

    def analyze_topics(self, n_topics=None, n_top_words=None):
        """Extract topics from the tweets with NMF over TF-IDF features.

        Args:
            n_topics: Number of topics. When omitted it is derived from the
                dataset size: one topic per ten tweets, at least 1 and at
                most 5.
            n_top_words: Number of highest-weighted words used to describe
                each topic. When omitted it equals ``n_topics``, which is
                what earlier versions of this method always did.

        Returns:
            A list of unique topic strings (space-joined top words) in the
            order the topics were produced; ``[]`` when there are no tweets.
        """
        all_tweets = [tweet['content'] for tweet in self.tweets]
        if not all_tweets:
            return []
        if not n_topics:
            # Never ask NMF for zero components on small datasets.
            n_topics = max(1, min(5, len(all_tweets) // 10))
        if n_top_words is None:
            n_top_words = n_topics
        tfidf_matrix = self.vectorizer.fit_transform(all_tweets)
        nmf_model = NMF(n_components=n_topics, random_state=1)
        nmf_model.fit(tfidf_matrix)
        vocabulary = self.vectorizer.get_feature_names_out()
        topics = []
        for weights in nmf_model.components_:
            strongest = weights.argsort()[::-1][:n_top_words]
            topics.append(" ".join(vocabulary[i] for i in strongest))
        # De-duplicate while keeping a deterministic order.
        return list(dict.fromkeys(topics))

    def count_tokens(self, text):
        """Estimate the token count of *text* as its number of whitespace-separated words."""
        return len(text.split())

    def generate_tweet(self, context="", sample_size=3):
        """Generate a tweet from the profile, a context and sampled tweets.

        Example tweets are drawn at random from those not yet used as
        context; once fewer than *sample_size* unused tweets remain, the
        tracker is reset and every tweet becomes eligible again.

        Args:
            context: Optional topic or situation to steer the tweet.
            sample_size: Number of example tweets to include in the prompt;
                capped at the number of tweets available.

        Returns:
            The generated tweet, or a string starting with ``"Error"`` when
            there are no tweets or generation failed.
        """
        if not self.tweets:
            return "Error: No tweets available for generation."
        pool = [
            tweet for tweet in self.tweets
            if tweet['content'] not in self.used_tweets
        ]
        if len(pool) < sample_size:
            self.used_tweets.clear()  # Reset used tweets if all have been used
            pool = self.tweets
        # random.sample raises if asked for more items than the pool holds.
        sample_count = max(0, min(sample_size, len(pool)))
        sampled_contents = [
            tweet['content'] for tweet in random.sample(pool, sample_count)
        ]
        self.used_tweets.update(sampled_contents)
        # Truncate personality profile to avoid token overflow
        personality_profile_excerpt = self.personality_profile[:self.PROFILE_EXCERPT_CHARS]
        prompt = (
            "Based on this personality profile:\n"
            f"{personality_profile_excerpt}\n"
            "Current context or topic (if any):\n"
            f"{context}\n"
            "Tweets for context:\n"
            f"{', '.join(sampled_contents)}\n"
            "**Only generate the tweet. Do not include analysis, explanation, or any other content.**\n"
        )
        try:
            return self._generate_text(prompt, self.TWEET_NEW_TOKENS)
        except Exception as e:
            # Best-effort contract: callers receive a message, not a raise.
            return f"Error generating tweet: {str(e)}"
|