Spaces:
Running
Running
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| import os | |
| from transformers import pipeline | |
| import numpy as np | |
| import torch | |
| import re | |
| from werkzeug.utils import secure_filename | |
| import uuid | |
| import platform | |
# Set Transformers Cache Directory
# NOTE(review): `transformers` is already imported above; the library appears
# to read TRANSFORMERS_CACHE at import time, so confirm that setting it here is
# still honoured (otherwise this block has to run before that import).
if platform.system() == "Windows":
    print("Windows detected. Assigning cache directory to Transformers in AppData\\Local.")
    # LOCALAPPDATA may be unset (services, stripped-down environments). Fall
    # back to the home directory instead of letting os.path.join(None, ...)
    # raise a TypeError while the module is being imported.
    local_app_data = os.getenv('LOCALAPPDATA') or os.path.expanduser('~')
    transformers_cache_directory = os.path.join(local_app_data, 'transformers_cache')
else:
    print("Non-Windows system detected. Assigning cache directory to /tmp/transformers_cache.")
    transformers_cache_directory = '/tmp/transformers_cache'

# Ensure the directory exists
if os.path.exists(transformers_cache_directory):
    print(f"Directory '{transformers_cache_directory}' already exists.")
else:
    try:
        # exist_ok=True tolerates another process creating it in the meantime.
        os.makedirs(transformers_cache_directory, exist_ok=True)
        print(f"Directory '{transformers_cache_directory}' created successfully.")
    except OSError as e:
        # Best effort: report the failure and carry on with start-up.
        print(f"Error creating directory '{transformers_cache_directory}': {e}")

# Set the TRANSFORMERS_CACHE environment variable
os.environ['TRANSFORMERS_CACHE'] = transformers_cache_directory
print(f"Environment variable TRANSFORMERS_CACHE set to '{transformers_cache_directory}'.")
class Config:
    """Settings object handed to ``app.config.from_object``."""

    # The second component is an absolute path, so on POSIX os.path.join
    # discards the module directory and this evaluates to '/tmp/uploads'.
    UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), '/tmp/uploads')
    # 16MB max file size
    MAX_CONTENT_LENGTH = 16 * 1024 ** 2
    CORS_HEADERS = 'Content-Type'
class DialogueSentimentAnalyzer:
    """Score a speaker-labelled transcript for sentiment and engagement.

    Two Hugging Face pipelines are used: a text-classification model whose
    score is reported as "engagement", and an SST-2 sentiment classifier
    whose score is nudged by a small keyword heuristic.
    """

    # Substrings (matched case-insensitively) that raise or lower the
    # sentiment score of an utterance.
    POSITIVE_PHRASES = (
        'thank you', 'thanks', 'appreciate', 'great', 'perfect',
        'looking forward', 'flexible', 'competitive',
    )
    NEGATIVE_PHRASES = (
        'concerned', 'worry', 'issue', 'problem', 'difficult',
        'unfortunately', 'sorry',
    )
    # Score adjustment applied per net keyword hit.
    PHRASE_WEIGHT = 0.1

    def __init__(self, model_name: str = "microsoft/DialogRPT-updown"):
        """Load both pipelines.

        Args:
            model_name: Model used for the engagement score. The original
                code ignored this argument and always loaded the default;
                it is now honoured.
        """
        # Pipelines take a CUDA device index, or -1 for CPU.
        self.device = 0 if torch.cuda.is_available() else -1
        self.dialogue_model = pipeline(
            'text-classification',
            model=model_name,
            device=self.device,
        )
        self.sentiment_model = pipeline(
            'sentiment-analysis',
            model="distilbert-base-uncased-finetuned-sst-2-english",
            device=self.device,
        )
        # Token budget passed to the pipelines on every call.
        self.max_length = 512

    def _classify(self, model, text):
        """Return the top prediction of *model* for *text*.

        Input is truncated to ``self.max_length`` tokens so that a long
        utterance cannot overflow the model's maximum sequence length.
        """
        return model(text, truncation=True, max_length=self.max_length)[0]

    def parse_dialogue(self, text: str):
        """Split *text* into ``{'speaker', 'text'}`` utterances.

        A line with a colon after at least one character starts a new
        utterance; the part before the first colon is the speaker. Other
        non-blank lines are appended to the current utterance, and lines
        seen before any speaker line are dropped.

        Returns:
            A list of dicts in transcript order (possibly empty).
        """
        dialogue = []
        current_speaker = None
        current_text = []

        def flush():
            # Emit the utterance collected so far, if there is one.
            if current_speaker and current_text:
                dialogue.append({'speaker': current_speaker, 'text': ' '.join(current_text)})

        for raw_line in text.strip().split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            label, colon, remainder = line.partition(':')
            if colon and label:
                # New speaker line: close the previous utterance first.
                flush()
                current_speaker = label
                current_text = [remainder.strip()]
            elif current_speaker:
                current_text.append(line)
        flush()
        return dialogue

    def analyze_utterance(self, utterance):
        """Score one utterance.

        Args:
            utterance: Dict with ``'speaker'`` and ``'text'`` keys.

        Returns:
            Dict with the speaker, the text, the adjusted ``sentiment_score``
            (the NEGATIVE label is mapped to ``1 - score``), the raw
            ``engagement_score`` and the two keyword hit counts.
        """
        text = utterance['text']
        dialogue_score = self._classify(self.dialogue_model, text)
        sentiment = self._classify(self.sentiment_model, text)

        text_lower = text.lower()
        positive_count = sum(1 for phrase in self.POSITIVE_PHRASES if phrase in text_lower)
        negative_count = sum(1 for phrase in self.NEGATIVE_PHRASES if phrase in text_lower)

        # Put both labels on one scale where higher means more positive.
        sentiment_score = float(sentiment['score'])
        if sentiment['label'] == 'NEGATIVE':
            sentiment_score = 1 - sentiment_score

        final_score = sentiment_score
        if positive_count > negative_count:
            boost = self.PHRASE_WEIGHT * (positive_count - negative_count)
            final_score = min(1.0, final_score + boost)
        elif negative_count > positive_count:
            penalty = self.PHRASE_WEIGHT * (negative_count - positive_count)
            final_score = max(0.0, final_score - penalty)

        return {
            'speaker': utterance['speaker'],
            'text': text,
            'sentiment_score': final_score,
            'engagement_score': float(dialogue_score['score']),
            'positive_phrases': positive_count,
            'negative_phrases': negative_count,
        }

    def analyze_dialogue(self, text: str):
        """Analyze a whole transcript.

        Returns:
            A list of ``{'label', 'score'}`` dicts: overall sentiment,
            confidence (one minus the standard deviation of the sentiment
            scores, floored at 0), mean engagement, then one average
            sentiment entry per speaker in order of first appearance.

        Raises:
            ValueError: If no ``Speaker: text`` utterance is found. The
                averages would otherwise be NaN, which is not valid JSON.
        """
        dialogue = self.parse_dialogue(text)
        if not dialogue:
            raise ValueError("No 'Speaker: text' utterances found in transcript")

        utterance_results = [self.analyze_utterance(utterance) for utterance in dialogue]
        sentiment_scores = [r['sentiment_score'] for r in utterance_results]
        engagement_scores = [r['engagement_score'] for r in utterance_results]

        overall_sentiment = np.mean(sentiment_scores)
        overall_engagement = np.mean(engagement_scores)
        sentiment_spread = np.std(sentiment_scores)
        confidence = max(0.0, 1.0 - sentiment_spread)

        # Group scores per speaker; dicts keep first-appearance order.
        speaker_sentiments = {}
        for result in utterance_results:
            speaker_sentiments.setdefault(result['speaker'], []).append(result['sentiment_score'])

        summary = [
            {'label': 'Overall Sentiment', 'score': float(overall_sentiment)},
            {'label': 'Confidence', 'score': float(confidence)},
            {'label': 'Engagement', 'score': float(overall_engagement)},
        ]
        summary.extend(
            {'label': f'{speaker} Sentiment', 'score': float(np.mean(scores))}
            for speaker, scores in speaker_sentiments.items()
        )
        return summary
def save_uploaded_file(content, upload_folder):
    """Write *content* to a new, uniquely named ``.txt`` file.

    Args:
        content: Text to store; written as UTF-8.
        upload_folder: Directory in which the file is created.

    Returns:
        The path of the file that was written.
    """
    # A random hex name keeps separate uploads from overwriting each other.
    unique_name = secure_filename(f"{uuid.uuid4().hex}.txt")
    destination = os.path.join(upload_folder, unique_name)
    with open(destination, 'w', encoding='utf-8') as handle:
        handle.write(content)
    return destination
# Analyzer shared by every analyze_sentiment() call; created on first use.
_shared_analyzer = None


def _get_analyzer():
    """Return the shared DialogueSentimentAnalyzer, building it on first use."""
    global _shared_analyzer
    if _shared_analyzer is None:
        _shared_analyzer = DialogueSentimentAnalyzer()
    return _shared_analyzer


def analyze_sentiment(file_path: str):
    """Run dialogue sentiment analysis on the transcript stored at *file_path*.

    The analyzer is built once and reused, instead of reloading both models
    on every call.

    Args:
        file_path: Path of a UTF-8 text file containing the transcript.

    Returns:
        The list produced by ``DialogueSentimentAnalyzer.analyze_dialogue``.
        On any failure the error is printed and
        ``[{'label': 'Error', 'score': 0.5}]`` is returned instead.
    """
    try:
        # Read first so a missing or unreadable file fails before any model
        # is loaded.
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        return _get_analyzer().analyze_dialogue(text)
    except Exception as e:
        # Deliberate catch-all: callers always receive a list they can
        # serialise, never an exception.
        print(f"Error in sentiment analysis: {str(e)}")
        return [{'label': 'Error', 'score': 0.5}]
def create_app():
    """Build and configure the Flask application.

    Loads ``Config``, enables CORS, makes sure the upload directory exists
    and registers the transcript upload endpoint.

    Returns:
        The configured ``Flask`` application.
    """
    app = Flask(__name__)
    app.config.from_object(Config)
    # CORS was imported and CORS_HEADERS configured, but the extension was
    # never attached to the app.
    # NOTE(review): this allows every origin; restrict it if that is too broad.
    CORS(app)

    # Ensure the uploads directory exists
    os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)

    # The handler was previously defined but never registered, so the app
    # served no routes at all.
    # NOTE(review): the URL below is an assumption derived from the function
    # name; confirm it against whatever client posts the transcript.
    @app.route('/upload_transcript', methods=['POST'])
    def upload_transcript():
        """Accept a ``transcript`` form field and return its sentiment scores."""
        transcript = request.form.get('transcript')
        if not transcript:
            return jsonify({'error': 'No transcript received'}), 400

        file_path = None
        try:
            # A per-request UTF-8 file in the upload folder replaces the
            # fixed 'transcript.txt' in the working directory, which was
            # shared by concurrent requests and written with the platform
            # default encoding.
            file_path = save_uploaded_file(transcript, app.config['UPLOAD_FOLDER'])

            # Analyze sentiment
            sentiment_result = analyze_sentiment(file_path)
            return jsonify({'sentiment': sentiment_result}), 200
        except Exception as e:
            return jsonify({'error': str(e)}), 500
        finally:
            # Remove the temporary file even when the analysis failed.
            if file_path is not None:
                try:
                    os.remove(file_path)
                except OSError as cleanup_error:
                    print(f"Could not remove temporary file '{file_path}': {cleanup_error}")

    return app
if __name__ == '__main__':
    # Script entry point: build the app and serve it on all interfaces,
    # port 5000.
    app = create_app()
    app.run(port=5000, host="0.0.0.0")