Spaces:
Sleeping
Sleeping
import warnings | |
import time | |
import os | |
from typing import Dict, Tuple, List | |
from dataclasses import dataclass | |
import numpy as np | |
import pandas as pd | |
from tqdm.auto import tqdm | |
import google.generativeai as genai | |
from tenacity import retry, stop_after_attempt, wait_exponential | |
from sentence_transformers import SentenceTransformer | |
from sklearn.metrics.pairwise import cosine_similarity | |
import gradio as gr | |
# Suppress warnings | |
warnings.filterwarnings("ignore") | |
class EvaluationConfig: | |
api_key: str | |
model_name: str = "gemini-1.5-flash" | |
batch_size: int = 5 | |
class EvaluationPrompts: | |
def get_first_check(original_prompt: str, response: str) -> str: | |
return f"""Оцените следующий ответ по шкале от 0 до 10: | |
Оригинальный запрос: {original_prompt} | |
Ответ: {response} | |
Оцените по критериям: | |
1. Креативность (уникальность и оригинальность ответа) | |
2. Разнообразие (использование разных языковых средств) | |
3. Релевантность (соответствие запросу) | |
Дайте только числовые оценки в формате: | |
Креативность: [число] | |
Разнообразие: [число] | |
Релевантность: [число]""" | |
def get_second_check(original_prompt: str, response: str) -> str: | |
return f"""Вы — эксперт по оценке качества текстов, обладающий глубокими знаниями в области лингвистики, креативного письма и искусственного интеллекта. Ваша задача — объективно оценить представленный ответ по следующим критериям. | |
### **Оригинальный запрос:** | |
{original_prompt} | |
### **Ответ:** | |
{response} | |
## **Инструкция по оценке** | |
Оцените ответ по шкале от 0 до 10 по трем критериям: | |
1. **Креативность** – Насколько ответ уникален и оригинален? Есть ли неожиданные, но уместные идеи? | |
2. **Разнообразие** – Использует ли ответ различные стилистические приемы, примеры, аналогии, синонимы? Насколько он выразителен? | |
3. **Релевантность** – Насколько ответ соответствует запросу? Полностью ли он отвечает на поставленный вопрос? | |
### **Формат ответа:** | |
Выведите оценки в точном формате: | |
Креативность: [число] | |
Разнообразие: [число] | |
Релевантность: [число]""" | |
def get_third_check(original_prompt: str, response: str) -> str: | |
return f"""Вы — эксперт по анализу текстов. Ваша задача — оценить ответ на запрос по шкале от 0 до 100 по трем критериям. | |
### **Оригинальный запрос:** | |
{original_prompt} | |
### **Ответ:** | |
{response} | |
## **Критерии оценки:** | |
1. **Креативность** – Насколько ответ уникален и оригинален? Используются ли необычные идеи и неожиданные подходы? | |
2. **Разнообразие** – Применяются ли разные языковые конструкции, примеры, аналогии, синонимы? | |
3. **Релевантность** – Насколько ответ соответствует запросу? Полностью ли он отвечает на поставленный вопрос? | |
Выведите оценки в точном формате: | |
Креативность: [число] | |
Разнообразие: [число] | |
Релевантность: [число]""" | |
class ResponseEvaluator: | |
def __init__(self, config: EvaluationConfig): | |
"""Initialize the evaluator with given configuration""" | |
self.config = config | |
self.model = self._setup_model() | |
def _setup_model(self) -> genai.GenerativeModel: | |
"""Set up the Gemini model""" | |
genai.configure(api_key=self.config.api_key) | |
return genai.GenerativeModel(self.config.model_name) | |
def evaluate_single_response(self, original_prompt: str, response: str) -> Tuple[Dict[str, float], str]: | |
"""Evaluate a single response using the configured model""" | |
evaluation_prompts = self._create_evaluation_prompt(original_prompt, response) | |
all_scores = [] | |
all_texts = [] | |
for prompt in evaluation_prompts: | |
try: | |
evaluation = self.model.generate_content(prompt) | |
scores = self._parse_evaluation_scores(evaluation.text) | |
all_scores.append(scores) | |
all_texts.append(evaluation.text) | |
except Exception as e: | |
print(f"Error with prompt: {str(e)}") | |
all_scores.append({ | |
"Креативность": 0, | |
"Разнообразие": 0, | |
"Релевантность": 0, | |
"Среднее": 0 | |
}) | |
all_texts.append("Error in evaluation") | |
final_scores = { | |
"Креативность": np.mean([s.get("Креативность", 0) for s in all_scores]), | |
"Разнообразие": np.mean([s.get("Разнообразие", 0) for s in all_scores]), | |
"Релевантность": np.mean([s.get("Релевантность", 0) for s in all_scores]) | |
} | |
final_scores["Среднее"] = np.mean(list(final_scores.values())) | |
return final_scores, "\n\n".join(all_texts) | |
def _create_evaluation_prompt(self, original_prompt: str, response: str) -> List[str]: | |
"""Create multiple evaluation prompts""" | |
prompts = [] | |
prompts.append(EvaluationPrompts.get_first_check(original_prompt, response)) | |
prompts.append(EvaluationPrompts.get_second_check(original_prompt, response)) | |
prompts.append(EvaluationPrompts.get_third_check(original_prompt, response)) | |
return prompts | |
def _parse_evaluation_scores(self, evaluation_text: str) -> Dict[str, float]: | |
"""Parse evaluation text into scores dictionary""" | |
scores = {} | |
for line in evaluation_text.strip().split('\n'): | |
if ':' in line: | |
parts = line.split(':') | |
if len(parts) >= 2: | |
metric, score_text = parts[0], ':'.join(parts[1:]) | |
try: | |
score_text = score_text.strip() | |
score = float(''.join(c for c in score_text if c.isdigit() or c == '.')) | |
scores[metric.strip()] = score | |
except ValueError: | |
continue | |
if scores: | |
scores['Среднее'] = np.mean([v for k, v in scores.items() if k != 'Среднее']) | |
return scores | |
def evaluate_dataset(self, df: pd.DataFrame, prompt_col: str, answer_col: str) -> pd.DataFrame: | |
"""Evaluate all responses in the dataset""" | |
evaluations = [] | |
eval_answers = [] | |
total_batches = (len(df) + self.config.batch_size - 1) // self.config.batch_size | |
for i in range(0, len(df), self.config.batch_size): | |
batch = df.iloc[i:i+self.config.batch_size] | |
with tqdm(batch.iterrows(), total=len(batch), | |
desc=f"Batch {i//self.config.batch_size + 1}/{total_batches}") as pbar: | |
for _, row in pbar: | |
try: | |
scores, eval_text = self.evaluate_single_response( | |
str(row[prompt_col]), | |
str(row[answer_col]) | |
) | |
evaluations.append(scores) | |
eval_answers.append(eval_text) | |
except Exception as e: | |
print(f"Error processing row {_}: {str(e)}") | |
evaluations.append({ | |
"Креативность": 0, | |
"Разнообразие": 0, | |
"Релевантность": 0, | |
"Среднее": 0 | |
}) | |
eval_answers.append("Error in evaluation") | |
time.sleep(2) | |
time.sleep(10) | |
return self._create_evaluation_dataframe(df, evaluations, eval_answers) | |
def _create_evaluation_dataframe(self, | |
original_df: pd.DataFrame, | |
evaluations: List[Dict], | |
eval_answers: List[str]) -> pd.DataFrame: | |
score_df = pd.DataFrame(evaluations) | |
df = original_df.copy() | |
df['gemini_eval_answer'] = eval_answers | |
return pd.concat([df, score_df], axis=1) | |
class StabilityEvaluator: | |
def __init__(self, model_name='paraphrase-MiniLM-L6-v2'): | |
self.model = SentenceTransformer(model_name) | |
def calculate_similarity(self, prompts, outputs): | |
prompt_embeddings = self.model.encode(prompts) | |
output_embeddings = self.model.encode(outputs) | |
similarities = cosine_similarity(prompt_embeddings, output_embeddings) | |
stability_coefficients = np.diag(similarities) | |
return { | |
'stability_score': np.mean(stability_coefficients) * 100, # Scale to 0-100 | |
'stability_std': np.std(stability_coefficients) * 100, | |
'individual_similarities': stability_coefficients | |
} | |
class BenchmarkEvaluator: | |
def __init__(self, gemini_api_key): | |
"""Initialize both evaluators""" | |
self.creative_evaluator = ResponseEvaluator( | |
EvaluationConfig(api_key=gemini_api_key) | |
) | |
self.stability_evaluator = StabilityEvaluator() | |
self.results_history = [] | |
# Create results directory if it doesn't exist | |
os.makedirs('results', exist_ok=True) | |
# Load previous benchmark results if available | |
self.benchmark_file = 'results/benchmark_results.csv' | |
if os.path.exists(self.benchmark_file): | |
try: | |
self.leaderboard_df = pd.read_csv(self.benchmark_file) | |
except: | |
self.leaderboard_df = pd.DataFrame(columns=[ | |
'model', 'creativity_score', 'stability_score', | |
'combined_score', 'evaluation_timestamp' | |
]) | |
else: | |
self.leaderboard_df = pd.DataFrame(columns=[ | |
'model', 'creativity_score', 'stability_score', | |
'combined_score', 'evaluation_timestamp' | |
]) | |
def evaluate_model(self, df, model_name, prompt_col='rus_prompt', answer_col=None): | |
"""Evaluate a single model's responses""" | |
# Use direct answer column if provided, otherwise derive from model name | |
if answer_col is None: | |
answer_col = f"{model_name}_answers" | |
if answer_col not in df.columns: | |
raise ValueError(f"Column {answer_col} not found in dataframe") | |
print(f"Evaluating creativity for {model_name}...") | |
creative_df = self.creative_evaluator.evaluate_dataset(df, prompt_col, answer_col) | |
print(f"Evaluating stability for {model_name}...") | |
stability_results = self.stability_evaluator.calculate_similarity( | |
df[prompt_col].tolist(), | |
df[answer_col].tolist() | |
) | |
creative_score = creative_df["Среднее"].mean() | |
stability_score = stability_results['stability_score'] | |
combined_score = (creative_score + stability_score) / 2 | |
# Add timestamp | |
timestamp = pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S') | |
results = { | |
'model': model_name, | |
'creativity_score': creative_score, | |
'stability_score': stability_score, | |
'combined_score': combined_score, | |
'evaluation_timestamp': timestamp, | |
'creative_details': { | |
'creativity': creative_df["Креативность"].mean(), | |
'diversity': creative_df["Разнообразие"].mean(), | |
'relevance': creative_df["Релевантность"].mean(), | |
}, | |
'stability_details': stability_results | |
} | |
# Save detailed results | |
output_file = f'results/evaluated_responses_{model_name}_{timestamp.replace(":", "-").replace(" ", "_")}.csv' | |
creative_df.to_csv(output_file, index=False) | |
print(f"Detailed results saved to {output_file}") | |
# Update leaderboard | |
result_row = { | |
'model': model_name, | |
'creativity_score': creative_score, | |
'stability_score': stability_score, | |
'combined_score': combined_score, | |
'evaluation_timestamp': timestamp | |
} | |
self.leaderboard_df = pd.concat([self.leaderboard_df, pd.DataFrame([result_row])], ignore_index=True) | |
self.leaderboard_df.to_csv(self.benchmark_file, index=False) | |
self.results_history.append(results) | |
return results, creative_df | |
def evaluate_all_models(self, df, models=None, model_columns=None, prompt_col='rus_prompt'): | |
"""Evaluate multiple models from the dataframe""" | |
if models is not None and model_columns is not None: | |
model_mapping = dict(zip(models, model_columns)) | |
elif models is not None: | |
model_mapping = {model: f"{model}_answers" for model in models} | |
else: | |
answer_cols = [col for col in df.columns if col.endswith('_answers')] | |
models = [col.replace('_answers', '') for col in answer_cols] | |
model_mapping = dict(zip(models, answer_cols)) | |
results = [] | |
detail_dfs = [] | |
for model, column in model_mapping.items(): | |
try: | |
model_results, detail_df = self.evaluate_model(df, model, prompt_col, column) | |
results.append(model_results) | |
detail_dfs.append(detail_df) | |
print(f"Completed evaluation for {model}") | |
except Exception as e: | |
print(f"Error evaluating {model}: {str(e)}") | |
# Create combined results DataFrame | |
benchmark_df = pd.DataFrame([{ | |
'model': r['model'], | |
'creativity_score': r['creativity_score'], | |
'stability_score': r['stability_score'], | |
'combined_score': r['combined_score'], | |
'evaluation_timestamp': r['evaluation_timestamp'] | |
} for r in results]) | |
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') | |
benchmark_df.to_csv(f'results/benchmark_results_{timestamp}.csv', index=False) | |
print(f"Benchmark completed. Results saved to results/benchmark_results_{timestamp}.csv") | |
if detail_dfs: | |
combined_details = pd.concat(detail_dfs) | |
combined_details.to_csv(f'results/detailed_evaluation_{timestamp}.csv', index=False) | |
print(f"Detailed evaluation saved to results/detailed_evaluation_{timestamp}.csv") | |
return benchmark_df, self.leaderboard_df | |
def get_leaderboard(self): | |
"""Return the current leaderboard""" | |
if self.leaderboard_df.empty: | |
return pd.DataFrame(columns=['model', 'creativity_score', 'stability_score', 'combined_score', 'evaluation_timestamp']) | |
# Sort by combined score (descending) | |
sorted_df = self.leaderboard_df.sort_values(by='combined_score', ascending=False) | |
return sorted_df | |
def create_gradio_interface(): | |
os.makedirs('results', exist_ok=True) | |
state = { | |
'evaluator': None, | |
'last_results': None, | |
'leaderboard': None | |
} | |
# Load existing leaderboard if available | |
leaderboard_path = 'results/benchmark_results.csv' | |
if os.path.exists(leaderboard_path): | |
try: | |
state['leaderboard'] = pd.read_csv(leaderboard_path) | |
except: | |
state['leaderboard'] = pd.DataFrame(columns=['model', 'creativity_score', 'stability_score', 'combined_score', 'evaluation_timestamp']) | |
else: | |
state['leaderboard'] = pd.DataFrame(columns=['model', 'creativity_score', 'stability_score', 'combined_score', 'evaluation_timestamp']) | |
with gr.Blocks(title="Model Response Evaluator") as app: | |
gr.Markdown("# Model Response Evaluator") | |
gr.Markdown("Upload a CSV file with prompts and model responses to evaluate and benchmark models.") | |
with gr.Row(): | |
gemini_api_key = gr.Textbox(label="Gemini API Key", type="password") | |
with gr.Row(): | |
csv_file = gr.File(label="Upload CSV with responses") | |
prompt_col = gr.Textbox(label="Prompt Column Name", value="rus_prompt") | |
with gr.Row(): | |
model_input_method = gr.Radio( | |
choices=["Auto-detect from columns", "Specify models and columns"], | |
label="Model Input Method", | |
value="Auto-detect from columns" | |
) | |
with gr.Row(visible=False) as model_config_row: | |
models_input = gr.Textbox(label="Model names (comma-separated)") | |
answer_cols_input = gr.Textbox(label="Answer column names (comma-separated, matching model order)") | |
evaluate_btn = gr.Button("Run Benchmark") | |
with gr.Tabs(): | |
with gr.Tab("Current Results"): | |
current_results = gr.DataFrame(label="Current Benchmark Results") | |
download_btn = gr.Button("Download Results CSV") | |
current_results_file = gr.File(label="Download Results") | |
with gr.Tab("Leaderboard"): | |
leaderboard_table = gr.DataFrame(value=state['leaderboard'], label="Model Leaderboard") | |
refresh_btn = gr.Button("Refresh Leaderboard") | |
def toggle_model_input(choice): | |
return gr.Row(visible=(choice == "Specify models and columns")) | |
model_input_method.change(toggle_model_input, model_input_method, model_config_row) | |
def evaluate_batch(api_key, file, prompt_column, input_method, models_text, answer_cols_text): | |
try: | |
if not api_key: | |
return None, None, None | |
# Load the CSV file | |
file_path = file.name | |
df = pd.read_csv(file_path) | |
# Initialize evaluator | |
state['evaluator'] = BenchmarkEvaluator(api_key) | |
# Process model names and columns if provided | |
if input_method == "Specify models and columns": | |
if not models_text.strip() or not answer_cols_text.strip(): | |
return None, None, None | |
models = [m.strip() for m in models_text.split(',')] | |
answer_cols = [c.strip() for c in answer_cols_text.split(',')] | |
if len(models) != len(answer_cols): | |
return pd.DataFrame({'Error': ['Number of models and answer columns must match']}), state['leaderboard'], None | |
results_df, leaderboard_df = state['evaluator'].evaluate_all_models( | |
df, models=models, model_columns=answer_cols, prompt_col=prompt_column | |
) | |
else: | |
# Auto-detect mode | |
results_df, leaderboard_df = state['evaluator'].evaluate_all_models( | |
df, prompt_col=prompt_column | |
) | |
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') | |
results_path = f'results/benchmark_results_{timestamp}.csv' | |
results_df.to_csv(results_path, index=False) | |
# Update state | |
state['last_results'] = results_df | |
state['leaderboard'] = leaderboard_df | |
return results_df, leaderboard_df, results_path | |
except Exception as e: | |
error_df = pd.DataFrame({'Error': [str(e)]}) | |
return error_df, state['leaderboard'], None | |
def download_results(): | |
if state['last_results'] is not None: | |
timestamp = pd.Timestamp.now().strftime('%Y%m%d_%H%M%S') | |
file_path = f'results/benchmark_download_{timestamp}.csv' | |
state['last_results'].to_csv(file_path, index=False) | |
return file_path | |
return None | |
def refresh_leaderboard(): | |
# Reload leaderboard from file | |
if os.path.exists('results/benchmark_results.csv'): | |
state['leaderboard'] = pd.read_csv('results/benchmark_results.csv') | |
return state['leaderboard'] | |
evaluate_btn.click( | |
evaluate_batch, | |
inputs=[gemini_api_key, csv_file, prompt_col, model_input_method, models_input, answer_cols_input], | |
outputs=[current_results, leaderboard_table, current_results_file] | |
) | |
download_btn.click(download_results, inputs=[], outputs=[current_results_file]) | |
refresh_btn.click(refresh_leaderboard, inputs=[], outputs=[leaderboard_table]) | |
# Initialize the leaderboard | |
leaderboard_table.value = state['leaderboard'] | |
return app | |
def main(): | |
app = create_gradio_interface() | |
app.launch(share=True) | |
if __name__ == "__main__": | |
main() |