# Hugging Face Spaces page header (status: Running) - viewer residue, not part of the program.
import concurrent.futures
import functools
import os
import random
import re
import time

import numpy as np
import pandas as pd
import streamlit as st
from huggingface_hub import InferenceClient
# FILES
# Per-iteration results file; run_benchmark deletes any existing copy when it starts.
iteration_output_file = "llm_benchmark_iteration_results.csv" # File to store iteration results, defined as global
# NOTE(review): presumably the aggregated results of a full run; it is not written
# anywhere in the code visible here - confirm against the rest of the file.
results_file = "llm_benchmark_results.csv" # all data
# GLOBAL PARAMETERS
# Seconds passed to time.sleep() after ranking/answering requests and after a
# failed question-generation attempt.
time_sleep = 0.2 # take time before making a new request
# Temperature used by the ranking helpers when asking a model for a 1-5 rank.
base_temp = 0.2 # base temperature for models
# QUESTION GLOBAL PARAMETERS
question_temp = 0.7 # question generation temperature
question_max_tokens = 256 # question generation max tokens
# A question is accepted only if its weighted average rank is >= this value.
# (The misspelled name is referenced elsewhere in the file, so it is kept as is.)
question_treshold = 3.5 # min average rank for questions to be accepted
# ...and only if every valid individual rank is strictly greater than this value.
reject_rank = 2 # all question ranks must be above
# ANSWER GLOBAL PARAMETERS
# Defaults used by get_answer_from_model.
answer_temp = 0.5 # base answering temperature
# get_answer_from_model switches to long_temp / long_max_tokens when the topic is
# math, coding, grammar or logics.
# NOTE(review): the earlier comments described these two values as the settings
# for creative questions/answers, which does not match that topic check - confirm
# which behaviour is intended.
long_temp = 1.0 # answering temperature for the "long" topics (see note above)
answer_max_tokens = 1048 # max tokens per answer
long_max_tokens = 2048 # max tokens per answer for the "long" topics (see note above)
# --- Difficulty probabilities ---
# Maps each difficulty label (the article + adjective that is spliced into the
# question prompt) to the weight used when run_benchmark draws a difficulty with
# random.choices(). The labels double as keys of the per-difficulty instruction
# tables, so they must not be renamed independently.
difficulty_probabilities = {
    "a very simple": 0.0,
    "a simple": 0.0,
    "a": 0.1,  # average
    "a difficult": 0.3,
    "a very difficult": 0.6,
}
def retry_api_request(max_retries=3, wait_time=10):
    """Build a decorator that retries a call when it raises.

    Args:
        max_retries: Number of extra attempts made after the first failure, so
            the wrapped function runs at most ``max_retries + 1`` times.
        wait_time: Seconds to sleep between two consecutive attempts.

    Returns:
        A decorator. The decorated function returns whatever the wrapped
        function returns on its first successful attempt, or None once every
        attempt has raised (the exceptions are printed, not re-raised).
    """
    def decorator(func):
        # Preserve the wrapped function's __name__/__doc__ so logs and
        # introspection still point at the real function.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(f"API error: {e}")
                    if attempt < max_retries:
                        print(f"Waiting for {wait_time} seconds before retrying... (Retry {attempt + 1}/{max_retries})")
                        time.sleep(wait_time)
                    else:
                        print("Max retries reached. Request failed.")
            return None
        return wrapper
    return decorator
# --- Single model request function for Hugging Face --- | |
def make_hf_request(model_name, messages, temperature, max_tokens, token=None):
    """
    Send request to a Hugging Face model using InferenceClient

    The chat messages are flattened into a single "User: ... / Assistant: ..."
    text prompt that always ends with an open "Assistant: " turn, and that
    prompt is sent through the text-generation endpoint.

    Args:
        model_name: Name of the Hugging Face model
        messages: Messages in the format [{"role": "user", "content": "..."}].
            Only "user" and "assistant" roles are used; messages with any
            other role are ignored.
        temperature: Temperature parameter for generation
        max_tokens: Maximum tokens to generate
        token: Hugging Face API token

    Returns:
        Generated text or None if request fails
    """
    speaker_labels = {"user": "User", "assistant": "Assistant"}
    turns = [
        f"{speaker_labels[msg['role']]}: {msg['content']}\n\n"
        for msg in messages
        if msg["role"] in speaker_labels
    ]
    # Every rendered turn ends with a blank line, so the prompt never already
    # ends with an open assistant turn: it is always appended here.
    prompt = "".join(turns) + "Assistant: "
    try:
        # The client is created inside the try block so that a failure while
        # setting it up is reported as a failed request (None) as well.
        client = InferenceClient(model=model_name, token=token)
        response = client.text_generation(
            prompt,
            max_new_tokens=max_tokens,
            temperature=temperature,
            do_sample=True,
        )
        return response
    except Exception as e:
        print(f"Hugging Face Inference API error: {e}")
        return None
# --- Prompting Functions --- | |
def generate_question_prompt(topic, difficulty):
    """Assemble a randomized prompt asking a model to write a question.

    The prompt is built from one randomly chosen line of each of these groups,
    in this order: a base instruction, a difficulty instruction, a
    topic-specific instruction (when the topic is known) and, for topics that
    are not math/logics/grammar/coding/creative writing, an example question
    form. A fixed closing instruction is always appended.

    Args:
        topic: Topic name, e.g. "math", "history", "creative writing". Unknown
            topics only get the generic instructions and trigger a printed
            warning.
        difficulty: One of the difficulty labels "a very simple", "a simple",
            "a", "a difficult", "a very difficult".

    Returns:
        The prompt text.

    Raises:
        KeyError: If ``difficulty`` is not one of the labels listed above.
    """
    # 1. Base Instructions with Difficulty and Topic Clarity
    base_instructions = [
        f"Generate {difficulty} question on the following topic: {topic}.",
        f"Formulate {difficulty} question regarding the following topic: {topic}.",
        f"Create {difficulty} question about the following topic: {topic}.",
        f"Compose {difficulty} question on the following topic: {topic}.",
        f"Develop {difficulty} question that explores the following topic: {topic}.",
    ]
    # 2. Difficulty Options and Instructions
    difficulty_instructions = {
        "a very simple": [
            "The question should test basic, widely known facts.",
            "It should be answerable with common knowledge.",
            "Focus on simple recall and recognition.",
            "The answer is immediately obvious to someone with basic knowledge.",
        ],
        "a simple": [
            "The question should require recall of specific information.",
            "It should test knowledge of fundamental concepts.",
            "The answer can be found in introductory materials.",
            "No complex reasoning or deep analysis is needed.",
        ],
        "a": [  # "average" difficulty
            "The question should be moderately challenging.",
            "It should require some basic reasoning or inference.",
            "The answer may require connecting two or three pieces of information.",
            "It should test understanding beyond simple memorization.",
        ],
        "a difficult": [
            "The question should require analytical thinking and application of knowledge.",
            "It should go beyond simple facts and require interpretation.",
            "The answer may involve multiple steps or perspectives.",
            "It should test deeper comprehension and problem-solving skills.",
        ],
        "a very difficult": [
            "The question should require expert-level knowledge and critical analysis.",
            "It should involve complex reasoning and nuanced understanding.",
            "The answer may require synthesis of information from various sources.",
            "It should be challenging even for someone knowledgeable in the field.",
        ],
    }
    difficulty_instructions_creative_writing = {
        "a very simple": [
            "The task should be very easy to complete, requiring minimal creativity or effort.",
            "Focus on simple, straightforward writing.",
        ],
        "a simple": [
            "The task should require some imagination, but remain relatively easy.",
            "Focus on basic storytelling or poetic elements.",
        ],
        "a": [
            "The task should be moderately challenging, requiring a good balance of creativity and execution.",
            "Explore more complex ideas or writing styles.",
        ],
        "a difficult": [
            "The task should be quite challenging, pushing the boundaries of creativity and writing skill.",
            "Incorporate complex themes, metaphors, or unusual narrative structures.",
        ],
        "a very difficult": [
            "The task should be extremely challenging, requiring a high level of originality and mastery of language.",
            "Experiment with unconventional forms, complex symbolism, or profound philosophical concepts.",
        ],
    }
    # --- Topic-Specific Instructions ---
    topic_instructions = {
        "math": [
            "The question should be a mathematical problem.",
            "It should involve calculations or mathematical reasoning.",
            "Formulate a math word problem.",
            "Create a mathematical problem related to a specific field of math study",
        ],
        "logics": [
            "The question should be a logic puzzle or riddle.",
            "It should require deductive or inductive reasoning.",
            "Formulate a logical reasoning problem.",
            "Create a logic puzzle that requires careful analysis.",
        ],
        "history": [
            "The question should relate to a specific historical event, period, or figure.",
            "It should require analyzing historical causes and consequences.",
            "Formulate a question about historical interpretation or analysis.",
            "Create a question that requires understanding of historical context.",
        ],
        "current news": [
            "The question should pertain to a recent, significant news event.",
            "It should require understanding of current affairs.",
            "Formulate a question about the implications of a current news event.",
            "Create a question that requires analysis of a recent development.",
        ],
        "general culture": [
            "The question should relate to general knowledge and cultural awareness.",
            "It should test understanding of common cultural references.",
            "Formulate a question about a well-known cultural phenomenon.",
            "Create a general knowledge question.",
        ],
        "science": [
            "Generate a question regarding a scientific concept.",
            "It should test the comprehension of a scientific fact or principle.",
            "Form a question that assesses knowledge in a scientific domain.",
        ],
        "technology": [
            "Generate a question regarding a technological concept.",
            "It should test the comprehension of a technological fact or principle.",
            "Form a question that assesses knowledge in a technological domain.",
        ],
        "grammar": [
            "Generate a question regarding a grammatical or linguistic concept.",
            "It should test the comprehension of a grammatical or linguistic fact or principle.",
            "Form a question that assesses knowledge in a grammatical or linguistic domain.",
            "Create a question testing the understanding of grammar and linguistic rules.",
        ],
        # Every entry below ends with a comma: two of them used to be missing,
        # which silently glued neighbouring instructions into one string.
        "coding": [
            "Generate a question about a coding concept or algorithm. Suggest also one or more programming languages to address the question.",
            "The question should test understanding of programming principles. If required, suggest also one or more programming languages to address the question.",
            "Formulate a coding problem or question. You may want to suggest also one or more programming languages to address the question.",
            "Create a question that requires knowledge of programming logic. If needed, suggest also one or more programming languages to address the question.",
            "The question should be related to software development or computer science. If required, suggest also one or more programming languages to address the question.",
            "The question should be about Python programming.",
            "Formulate a coding problem solvable in Java.",
            "Create a question related to JavaScript concepts.",
            "The question should involve algorithm design. Suggest also one or more programming languages to address the question.",
            "Formulate a question about data structures. Suggest also one or more programming languages to address the question.",
            "Create a question testing debugging skills.",
            "The question should assess code optimization techniques.",
        ],
        "creative writing": [
            "Write a short story (under 3000 characters) that begins with the sentence: 'The old lighthouse keeper saw a light that wasn't his own.'",
            "Compose a poem (under 3000 characters) in the style of haiku, about the feeling of a summer rain.",
            "Write a short story (under 3000 characters), no more than five sentences, about a robot who discovers the meaning of friendship.",
            "Create a humorous anecdote (under 3000 characters) about a cat and a laser pointer.",
            "Write a short story (under 3000 characters) that ends with the phrase: '...and that's how the world changed forever.'",
            "Compose a free verse poem (under 3000 characters) about the loneliness of space travel.",
            "Write a short, poignant story (under 3000 characters) about a lost object found again.",
            "Tell a joke (under 3000 characters) about a programmer and a bug.",
            "Respond to the philosophical question (under 3000 characters): 'If a tree falls in a forest and no one is around to hear it, does it make a sound?' in a creative and thought-provoking way.",
            "Write a very short story (under 3000 characters) about a talking animal.",
            "Imagine you are a grain of sand. Describe your life (under 3000 characters).",
            "Write a short story (under 3000 characters) set in a world where colors don't exist.",
            "Write a poem (under 3000 characters) about the feeling of nostalgia.",
            "Create a short, funny dialogue (under 3000 characters) between two inanimate objects.",
            "Write a flash fiction piece (under 3000 characters) inspired by a random word (e.g., 'serendipity', 'obfuscate', 'ephemeral').",
            "Respond to the following prompt (under 3000 characters) with a creative story: 'You wake up one morning to find you can fly.'",
            "Compose a short story(under 3000 characters), inspired by a piece of classical music",
            "Tell a joke (under 3000 characters) based on a pun.",
            "Write a short description (under 3000 characters) of a dream you had.",
            "Craft a short, suspenseful story (under 3000 characters) that begins: 'The phone rang, but the screen was blank...'",
        ],
    }
    # Creative writing uses its own difficulty wording: its table overrides
    # the generic entry of every difficulty label.
    if topic == "creative writing":
        difficulty_instructions.update(difficulty_instructions_creative_writing)
    # 4. Guiding Sentence for Question Types
    question_type_intro = "As an example for you, it could be in the form of:"
    question_types = [
        "a comparison question (asking to compare and contrast...).",
        "an analysis question (asking to analyze the relationship between...).",
        "an explanation question (asking to explain the causes of...).",
        "a discussion question (asking to discuss the implications of...).",
        "a significance question (asking about the significance of...).",
        "a cause-and-effect question (like 'How does ... affect ...?').",
        "a difference question (like 'What are the key differences between ... and ...?').",
        "a hypothetical question (like 'What would be the consequences of ...?').",  # Counterfactual
        "a scenario-based question (like 'Develop a scenario where...').",  # Scenario based
        "a pros and cons question (like 'Provide arguments for and against...').",  # Pros and cons
    ]
    # --- Combine Prompts using Random Choices ---
    prompt = random.choice(base_instructions) + "\n"
    prompt += random.choice(difficulty_instructions[difficulty]) + "\n"
    # Add topic-specific instruction, handling cases where topic might not be defined.
    if topic in topic_instructions:
        prompt += random.choice(topic_instructions[topic]) + "\n"
    else:
        print(f"Warning: No topic_instructions defined for topic '{topic}'")
    # 5. Conditional Question Types (not for math, logics, grammar, coding, creative writing)
    if topic not in ["math", "logics", "grammar", "coding", "creative writing"]:
        prompt += question_type_intro + "\n"
        prompt += random.choice(question_types)
    prompt += "\n\nIn generating your question, do not show your internal thought process. Make sure to provide as an output only the final complete and consistent formulation of your question\n"
    return prompt
def answer_question_prompt(question):
    """Return the prompt that asks a model to answer ``question``.

    Args:
        question: Text of the question to be answered.

    Returns:
        The answering instructions followed by the question and an open
        "Answer:" line for the model to complete.
    """
    instructions = (
        "Answer the question below. Ensure your answer is clear and insightful, "
        "relevant to the topic discussed, logical and grammatically sound, and "
        "contains only correct information. In generating your answer, do not "
        "show your internal thought process. Provide only your final, complete, "
        "and supported answer."
    )
    return f"{instructions}\n\nQuestion: {question}\n\nAnswer:"
def rank_answer_prompt(question, answer, topic):
    """Build the prompt asking an evaluator model to rank an answer from 1 to 5.

    Args:
        question: The question that was answered.
        answer: The answer to be ranked.
        topic: Topic of the question; "creative writing" adds four extra
            evaluation criteria to the prompt.

    Returns:
        The prompt text, ending with an open "Rank:" line.
    """
    prompt = """You are an expert evaluator. Rank the following answer to the given question on a scale of 1 to 5, where:
1: Not good answer - unclear, irrelevant to the topic, poorly formulated, or with evidently incorrect statements. For creative writing, this also includes being unoriginal, unimaginative, or failing to adhere to the prompt's constraints (including the 3000-character limit).
2: Quite good answer - quite clear, reasonably adherent to the topic, reasonably well-formulated, with no incorrect statements. For creative writing, some originality and imagination are present, but it may be somewhat predictable or have minor flaws. Adheres to the 3000-character limit.
3: Good answer - clear, relevant to the topic, well-formulated, with correct statements. For creative writing, this includes demonstrating good originality, imagination, and adherence to the prompt, including the 3000-character limit.
4: Very good answer - very clear, very relevant to the topic, expertly formulated, with highly correct statements. For creative writing, shows strong originality, a compelling narrative or poetic voice, and excellent adherence to the prompt, including the 3000-character limit.
5: Exceptionally good answer - only applicable to exceptional answers that match all the criteria of the previous "4: Very good answer", but also bring additional unique insights, perfectly sound original arguments, or other exceptional unexpected contributions to the topic. For creative writing, this indicates a truly outstanding piece of writing with exceptional creativity, emotional resonance, and masterful execution, while adhering to the 3000-character limit.
Consider these criteria in your ranking:
- Clarity: Is the answer easy to understand? Is it ambiguous or confusing?
- Relevance: Is the answer relevant to the specified topic?
- Formulation: Is the answer well-structured and grammatically correct? Is it logically sound? Is it in a form that proves expert knowledge?
- Correctness: Are the statements in the answer correct? (this is extremely relevant for topics such as math, grammar, logics, coding, science, technology)
- Interest/Engagement: Is the answer likely to be engaging or thought-provoking? (minor consideration)
"""
    # Extra criteria that only make sense for creative writing tasks.
    if topic == "creative writing":
        prompt += """
- (For Creative Writing ONLY): Originality: Is the writing original and imaginative? Does it avoid clichés?
- (For Creative Writing ONLY): Emotional Resonance: Does the writing evoke emotion or connect with the reader on an emotional level?
- (For Creative Writing ONLY): Adherence to Prompt: Does the writing fully address the specific requirements of the creative writing prompt?
- (For Creative Writing ONLY): Character Limit: Does the writing adhere to the 3000-character limit?
"""
    prompt += f"""
Just return a single number (the rank from 1 to 5), do not add any other text.
Question: {question}
Answer: {answer}
Rank:"""
    return prompt
def rank_question_prompt(question, topic, difficulty):
    """Build the prompt asking an evaluator model to rank a question from 1 to 5.

    Args:
        question: The generated question to be ranked.
        topic: Topic the question was generated for; "creative writing" adds
            three extra evaluation criteria to the prompt.
        difficulty: Difficulty label used at generation time ("a very simple",
            "a simple", "a", "a difficult", "a very difficult"). It is shown
            to the evaluator as a plain adjective; a label that is not in this
            list is shown unchanged.

    Returns:
        The prompt text, ending with an open "Rank:" line.
    """
    difficulty_mapping_rank_prompt = {
        "a very simple": "very simple",
        "a simple": "simple",
        "a": "average",
        "a difficult": "difficult",
        "a very difficult": "very difficult",
    }
    # Fall back to the raw label rather than failing on an unexpected value.
    difficulty_for_prompt = difficulty_mapping_rank_prompt.get(difficulty, difficulty)
    prompt = f"""You are an expert evaluator of questions. Rank the quality of the following question on a scale of 1 to 5, where:
1: Very poor question - unclear, irrelevant to the topic, not appropriate for the difficulty level, or poorly formulated. For creative writing prompts, this also means the prompt is uninspired, lacks clear instructions, or sets an unreasonable character limit.
2: Poor question - somewhat unclear, loosely related to the topic, slightly inappropriate for the difficulty level, or with minor formulation issues. For creative writing, the prompt may be somewhat unimaginative or have minor clarity issues.
3: Good question - clear, relevant to the topic, generally appropriate for the difficulty level, and reasonably well-formulated. For creative writing, the prompt is clear, provides a reasonable starting point for creative work, and sets a clear 3000-character limit.
4: Very good question - clear, highly relevant to the topic, appropriate for the difficulty level, and well-formulated. For creative writing, the prompt is engaging, sparks imagination, and offers a good balance of direction and freedom, with a clear 3000-character limit.
5: Excellent question - exceptionally clear, insightful, highly relevant to the topic, perfectly matched to the difficulty level, and expertly formulated. For creative writing, the prompt is exceptionally creative, thought-provoking, and likely to inspire high-quality writing, with a clear 3000-character limit.
Consider these criteria in your ranking:
- Clarity: Is the question easy to understand? Is it ambiguous or confusing?
- Relevance: Is the question relevant to the specified topic ({topic})?
- Difficulty: Is the difficulty of the question appropriate for the indicated level ({difficulty_for_prompt})?
- Formulation: Is the question well-structured and grammatically correct? Is it logically sound?
- Interest/Engagement: Is the question likely to be engaging or thought-provoking? (minor consideration)
"""
    # Extra criteria that only make sense for creative writing prompts.
    if topic == "creative writing":
        prompt += """
- **(For Creative Writing ONLY): Creativity:** Does the prompt encourage original and imaginative responses?
- **(For Creative Writing ONLY): Clarity of Constraints:** Are the creative constraints (e.g., story, poem, joke) and the 3000-character limit clearly stated?
- **(For Creative Writing ONLY): Inspiration Potential:** Is the prompt likely to inspire high-quality, creative writing?
"""
    prompt += f"""
Just return a single number (the rank from 1 to 5), do not add any other text.
Question: {question}
Rank:"""
    return prompt
# --- Helper function to parse rank strings --- | |
def parse_rank_string(rank_str, ranking_model_id):
    """Extract a 1-5 rank from the raw text returned by a ranking model.

    The first run of digits in the text is taken as the rank, whatever
    non-digit characters precede it (so "Rank: 4" and "4/5" both give 4, and
    "4.5" gives 4).

    Args:
        rank_str: Raw model output. None or a non-string value is treated as
            an unparseable answer.
        ranking_model_id: Identifier of the model, used only in warnings.

    Returns:
        The rank as an int between 1 and 5, or None (after printing a
        warning) when no integer is found or it is outside that range.
    """
    # Guard against a missing response so re.search never sees a non-string.
    if not isinstance(rank_str, str):
        print(f"Warning: Model {ranking_model_id} returned non-numeric rank: '{rank_str}'. Rank set to None.")
        return None
    match = re.search(r'^\D*(\d+)', rank_str)  # first integer in the text
    if match is None:
        print(f"Warning: Model {ranking_model_id} returned non-numeric rank: '{rank_str}'. Rank set to None.")
        return None
    rank_val = int(match.group(1))  # cannot fail: the group only holds digits
    if not 1 <= rank_val <= 5:
        print(f"Warning: Model {ranking_model_id} returned rank outside of valid range [1-5]: {rank_val}. Rank set to None.")
        return None
    return rank_val
# --- Helper Function for Parallel Ranking --- | |
def get_rank_from_model(ranking_model_id, question, answer, consecutive_failures, failure_threshold, unresponsive_models, model_config, topic, token=None, timeout=60):
    """Ask one model to rank an answer; meant to be run in a worker thread.

    Args:
        ranking_model_id: Key of the ranking model in ``model_config``.
        question: The question that was answered.
        answer: The answer to be ranked.
        consecutive_failures: Accepted for signature compatibility; not used here.
        failure_threshold: Accepted for signature compatibility; not used here.
        unresponsive_models: Accepted for signature compatibility; not used here.
        model_config: Mapping of model id to its settings; the "name" entry is
            the model name sent to the API.
        topic: Topic of the question, forwarded to the ranking prompt.
        token: Hugging Face API token.
        timeout: Maximum accepted duration in seconds. It is only checked
            after the request has returned (the request itself is not
            interrupted): a rank that took longer is discarded.

    Returns:
        Tuple ``(ranking_model_id, rank)`` where rank is an int in 1-5, or
        None when the request failed, could not be parsed or was too slow.
    """
    start_time = time.time()
    rank = None  # stays None unless a valid rank is obtained in time
    rank_prompt = rank_answer_prompt(question, answer, topic)
    try:
        response = make_hf_request(model_config[ranking_model_id]["name"], [{"role": "user", "content": rank_prompt}], base_temp, 5, token=token)
        if response:
            # parse_rank_string prints its own warning and returns None on bad input.
            rank = parse_rank_string(response.strip(), ranking_model_id)
        else:
            print(f"Warning: Model {ranking_model_id} failed to provide rank. Rank set to None.")
    except Exception as e:
        duration = time.time() - start_time
        print(f"Warning: Model {ranking_model_id} ranking timed out or failed after {duration:.2f}s: {e}")
        rank = None
    duration = time.time() - start_time  # total duration of the ranking attempt
    if duration > timeout:
        print(f"Warning: Ranking by model {ranking_model_id} exceeded timeout of {timeout:.2f}s and took {duration:.2f}s.")
        rank = None  # a late rank is treated as a failure
    time.sleep(time_sleep)  # small delay to avoid overwhelming APIs even in parallel
    return ranking_model_id, rank
# --- Helper Function for Parallel Ranking of questions --- | |
def get_question_rank_from_model(ranking_model_id, question, topic, difficulty, consecutive_failures, failure_threshold, unresponsive_models, model_config, token=None, timeout=60):
    """Ask one model to rank a generated question; meant to be run in a worker thread.

    Args:
        ranking_model_id: Key of the ranking model in ``model_config``.
        question: The generated question to be ranked.
        topic: Topic the question was generated for.
        difficulty: Difficulty label the question was generated with.
        consecutive_failures: Accepted for signature compatibility; not used here.
        failure_threshold: Accepted for signature compatibility; not used here.
        unresponsive_models: Accepted for signature compatibility; not used here.
        model_config: Mapping of model id to its settings; the "name" entry is
            the model name sent to the API.
        token: Hugging Face API token.
        timeout: Maximum accepted duration in seconds. It is only checked
            after the request has returned (the request itself is not
            interrupted): a rank that took longer is discarded.

    Returns:
        Tuple ``(ranking_model_id, rank)`` where rank is an int in 1-5, or
        None when the request failed, could not be parsed or was too slow.
    """
    start_time = time.time()
    rank = None  # stays None unless a valid rank is obtained in time
    rank_prompt = rank_question_prompt(question, topic, difficulty)
    try:
        response = make_hf_request(model_config[ranking_model_id]["name"], [{"role": "user", "content": rank_prompt}], base_temp, 5, token=token)
        if response:
            # parse_rank_string prints its own warning and returns None on bad input.
            rank = parse_rank_string(response.strip(), ranking_model_id)
        else:
            print(f"Warning: Model {ranking_model_id} failed to provide rank for question. Rank set to None.")
    except Exception as e:
        duration = time.time() - start_time
        print(f"Warning: Model {ranking_model_id} ranking question timed out or failed after {duration:.2f}s: {e}")
        rank = None
    duration = time.time() - start_time  # total duration of the ranking attempt
    if duration > timeout:
        print(f"Warning: Ranking question by model {ranking_model_id} exceeded timeout of {timeout:.2f}s and took {duration:.2f}s.")
        rank = None  # a late rank is treated as a failure
    time.sleep(time_sleep)  # small delay to avoid overwhelming APIs even in parallel
    return ranking_model_id, rank
# --- Helper Function for Parallel Answering --- | |
def get_answer_from_model(model_id, question, consecutive_failures, failure_threshold, unresponsive_models, model_config, topic, token=None, timeout=60):
    """Ask one model to answer a question; meant to be run in a worker thread.

    Args:
        model_id: Key of the answering model in ``model_config``.
        question: The question to answer.
        consecutive_failures: Accepted for signature compatibility; not used here.
        failure_threshold: Accepted for signature compatibility; not used here.
        unresponsive_models: Accepted for signature compatibility; not used here.
        model_config: Mapping of model id to its settings; the "name" entry is
            the model name sent to the API.
        topic: Topic of the question; selects the temperature/token budget.
        token: Hugging Face API token.
        timeout: Accepted for signature compatibility; not used here.

    Returns:
        Tuple ``(answer, duration)``. ``answer`` is the stripped model output,
        "Error answering" when the model returned nothing, or
        "Error answering - Timeout" when the request raised. ``duration`` is
        the elapsed time in seconds.
    """
    start_time = time.time()
    answer_prompt = answer_question_prompt(question)
    answer = "Error answering"  # kept when the model returns an empty response
    temp = answer_temp
    max_tok = answer_max_tokens
    # NOTE(review): the module constants describe long_temp / long_max_tokens
    # as the settings for creative questions, yet they are applied to these
    # four topics and not to "creative writing" - confirm the intent.
    if topic in ("math", "coding", "grammar", "logics"):
        temp = long_temp
        max_tok = long_max_tokens
    try:
        response = make_hf_request(model_config[model_id]["name"], [{"role": "user", "content": answer_prompt}], temp, max_tok, token=token)
        if response:
            answer = response.strip()
    except Exception as e:
        duration = time.time() - start_time
        print(f"Warning: Model {model_id} answering timed out or failed after {duration:.2f}s: {e}")
        answer = "Error answering - Timeout"
        return answer, duration  # failure path: no extra delay
    time.sleep(time_sleep)  # small delay between requests
    duration = time.time() - start_time  # includes the delay above
    print(f"Answer generation by \"{model_id}\": {duration:.2f}s")
    return answer, duration
# --- Core Logic --- | |
def run_benchmark(hf_models, topics, difficulties, t, model_config, token=None): | |
results = { | |
"model_name": [], | |
"topic": [], | |
"question_prompt": [], | |
"question": [], | |
"answer": [], | |
"answer_generation_duration": [], | |
"average_rank": [], | |
"ranks":[], | |
"question_rank_average": [], | |
"question_ranks": [], | |
"question_rank_duration": [] | |
} | |
cumulative_model_ranks = {} # To store cumulative ranks for each model | |
# Check if iteration output file exists and remove it if it does to start fresh | |
if os.path.exists(iteration_output_file): | |
os.remove(iteration_output_file) | |
consecutive_failures = {} # Track failures per model ID | |
failure_threshold = 5 | |
unresponsive_models = set() | |
# Updated model lists with more informative labels | |
active_models = hf_models.copy() | |
model_weights = {} | |
for model_id in active_models: | |
cumulative_model_ranks[model_id] = [] | |
consecutive_failures[model_id] = 0 | |
model_weights[model_id] = 1.0 / len(active_models) # Initial equal weights | |
difficulty_choices = list(difficulty_probabilities.keys()) | |
probability_values = list(difficulty_probabilities.values()) | |
# --- Difficulty mapping for output labels --- | |
difficulty_mapping = { | |
"a very simple": "1", | |
"a simple": "2", | |
"a": "3", | |
"a difficult": "4", | |
"a very difficult": "5" | |
} | |
s_t = 0 #count succesful iterations | |
for iteration in range(t): # Added iteration counter | |
if len(active_models) < 2: | |
print("Fewer than 2 active models remaining. Exiting benchmark.") | |
break | |
topic = random.choice(topics) | |
# --- Select difficulty with probabilities --- | |
difficulty = random.choices(difficulty_choices, weights=probability_values, k=1)[0] # Weighted random choice | |
print(f"--- Iteration {s_t + 1}/{t}: {difficulty} question ({difficulty_mapping[difficulty]}) on {topic} ---") # Print iteration number | |
st.write(f"--- Iteration {s_t + 1}/{t}: {difficulty} question ({difficulty_mapping[difficulty]}) on {topic} ---") # Print iteration number | |
# --- Question Generation --- | |
question = None | |
question_prompt = generate_question_prompt(topic, difficulty) | |
question_accepted = False # Flag to track if question is accepted | |
question_ranks_all = [] | |
question_avg_rank = np.nan | |
question_ranking_duration_total = 0 | |
cumulative_avg_rank = {} # To store cumulative average ranks for each model | |
max_attempts = 3 * len(active_models) | |
for attempt in range(max_attempts): | |
# --- Filter for question generation roles ("answer" or "both") --- | |
question_gen_candidates = [ | |
model_id for model_id in active_models | |
if model_config[model_id].get("role", "both") in ["answer", "both"] | |
] | |
if not question_gen_candidates: # No suitable models left | |
print("No models available for question generation with 'answer' or 'both' role. Skipping iteration.") | |
continue # Skip to next iteration | |
question_generator_model_id = random.choice(question_gen_candidates) | |
# --- Question Generation --- | |
response = make_hf_request(model_config[question_generator_model_id]["name"], | |
[{"role": "user", "content": question_prompt}], | |
question_temp, | |
question_max_tokens, | |
token=token) | |
if response: | |
question = response.strip() | |
consecutive_failures[question_generator_model_id] = 0 # Reset on success | |
break | |
else: | |
print(f"Skipping due to request failure for model {question_generator_model_id}.") | |
consecutive_failures[question_generator_model_id] += 1 | |
if consecutive_failures[question_generator_model_id] >= failure_threshold: | |
print(f"Model {question_generator_model_id} is unresponsive (question gen). Removing from active models.") | |
if question_generator_model_id in active_models: | |
active_models.remove(question_generator_model_id) | |
unresponsive_models.add(question_generator_model_id) | |
time.sleep(time_sleep) | |
if question is None: | |
print(f"Failed to generate a question after {max_attempts} attempts. Skipping this round.") | |
continue | |
# --- Parallel Question Ranking --- | |
question_ranks = {} | |
question_ranking_futures = [] | |
question_ranking_start_time = time.time() | |
with concurrent.futures.ThreadPoolExecutor(max_workers=len(active_models) or 1) as executor: | |
for ranking_model_id in active_models: | |
# --- Filter for ranking roles ("rank" or "both") --- | |
if model_config[ranking_model_id].get("role", "both") in ["rank", "both"]: | |
future = executor.submit( | |
get_question_rank_from_model, | |
ranking_model_id, | |
question, | |
topic, | |
difficulty, | |
consecutive_failures, | |
failure_threshold, | |
unresponsive_models, | |
model_config, | |
token, | |
timeout=60 | |
) | |
question_ranking_futures.append(future) | |
for future in concurrent.futures.as_completed(question_ranking_futures): # Collect ranks as they become available | |
ranking_model_id, rank = future.result() # Get model_id and rank | |
question_ranks[ranking_model_id] = rank # Store rank with model_id as key | |
question_ranking_end_time = time.time() | |
question_ranking_duration_total = question_ranking_end_time - question_ranking_start_time | |
# Filter out None values (failed ranks) and calculate weighted average | |
valid_question_ranks_values = [r for r in question_ranks.values() if r is not None] # Get rank values | |
question_avg_rank = np.nan # Default to NaN | |
if valid_question_ranks_values: | |
# Create a list of weights corresponding to the valid ranks | |
weights_for_valid_question_ranks = [model_weights[model_id] | |
for model_id, rank in question_ranks.items() | |
if rank is not None] | |
#check that the length is correct | |
if len(weights_for_valid_question_ranks) != len(valid_question_ranks_values): | |
print("Warning: Mismatch length of weights and valid question ranks") | |
print(f'weights_for_valid_question_ranks {weights_for_valid_question_ranks}') | |
print(f'valid_question_ranks_values: {valid_question_ranks_values}') | |
question_avg_rank = np.average(valid_question_ranks_values, weights=weights_for_valid_question_ranks) | |
min_question_rank = min(valid_question_ranks_values) if valid_question_ranks_values else 0 # To avoid error if no valid rank | |
if question_avg_rank >= question_treshold and all(rank > reject_rank for rank in valid_question_ranks_values): # Question acceptance criteria | |
question_accepted = True | |
print(f"Question accepted. Avg Question Rank: {question_avg_rank:.2f}, Min Rank: {min_question_rank}, Ranks: {[question_ranks[m] for m in active_models if m in question_ranks]}") | |
st.write(f"Question accepted. Avg Question Rank: {question_avg_rank:.2f}, Min Rank: {min_question_rank}, Ranks: {[question_ranks[m] for m in active_models if m in question_ranks]}") | |
s_t += 1 | |
else: | |
question_accepted = False | |
print(f"Question rejected. Avg Question Rank: {question_avg_rank:.2f}, Min Rank: {min_question_rank}, Ranks: {[question_ranks[m] for m in active_models if m in question_ranks]}") | |
st.write(f"Question rejected. Avg Question Rank: {question_avg_rank:.2f}, Min Rank: {min_question_rank}, Ranks: {[question_ranks[m] for m in active_models if m in question_ranks]}") | |
if not question_accepted: | |
print("Generated question was not accepted. Regenerating question.") | |
continue | |
if len(active_models) < 2: | |
print("Fewer than 2 active models remaining. Exiting benchmark.") | |
break | |
# --- Parallel Answer Generation --- | |
answers = {} | |
answer_futures = [] | |
answer_durations = {} | |
with concurrent.futures.ThreadPoolExecutor(max_workers=len(active_models)) as executor: | |
for model_id in active_models: | |
# --- Filter for answer generation roles ("answer" or "both") --- | |
if model_config[model_id].get("role", "both") in ["answer", "both"]: | |
try: | |
future = executor.submit( | |
get_answer_from_model, | |
model_id, | |
question, | |
consecutive_failures, | |
failure_threshold, | |
unresponsive_models, | |
model_config, | |
topic, | |
token, | |
timeout=60 | |
) | |
answer_futures.append(future) | |
except TimeoutError as e: | |
print(f"Answer generation for model {model_id} timed out: {e}") | |
answer = "I am struggling to answer this question" # Treat timeout as error | |
duration = 120 # You can set a default duration or handle it differently if needed | |
answers[model_id] = answer # Store error answer | |
answer_durations[model_id] = duration # Store default duration | |
for future in concurrent.futures.as_completed(answer_futures): | |
model_id = active_models[answer_futures.index(future)] # Get model_id based on future index (order is preserved) | |
answer, duration = future.result() # Get both answer and duration - unpack the returned tuple | |
st.write(f"Answer generation by \"{model_id}\": {duration:.2f}s") # Print answer generation duration separately as requested - as requested | |
answers[model_id] = answer | |
answer_durations[model_id] = duration # Store duration - store the duration in the answer_durations dictionary | |
# --- Ranking Process --- | |
# Prepare to write to file (open in append mode outside the model loop but inside iteration loop) | |
iteration_results_file_opened = open(iteration_output_file, 'a') | |
if iteration == 0: # Write header only for the first iteration | |
iteration_results_file_opened.write("Iteration, Topic, Difficulty, Question Rank, QR Duration, Model,Cumulative Avg Rank,Iteration Avg Rank,Ranks,Ranking Duration (sec)\n") # Added Ranking Duration to header | |
for model_id in active_models: | |
answer = answers.get(model_id, "Error answering") # Safely retrieve pre-generated answer, default to error | |
duration = answer_durations.get(model_id, 0.0) # Safely retrieve duration, default to 0.0 | |
if answer == "Error answering": # Handle answer generation errors | |
consecutive_failures[model_id] += 1 | |
if consecutive_failures[model_id] >= failure_threshold: | |
print(f"Model {model_id} is consistently failing to answer. Removing from active models.") | |
if model_id in active_models: # double check before removing, might have been removed in another thread | |
active_models.remove(model_id) | |
unresponsive_models.add(model_id) | |
continue # Skip ranking if answer generation failed for this model | |
if len(active_models) < 2: # Re-check active models before ranking | |
print("Fewer than 2 active models remaining. Exiting benchmark.") | |
break | |
ranks = {} | |
ranking_futures = [] | |
ranking_start_time = time.time() | |
with concurrent.futures.ThreadPoolExecutor(max_workers=len(active_models) or 1) as executor: | |
for ranking_model_id in active_models: | |
# --- Filter for ranking roles ("rank" or "both") --- | |
if model_config[ranking_model_id].get("role", "both") in ["rank", "both"]: | |
future = executor.submit( | |
get_rank_from_model, | |
ranking_model_id, | |
question, | |
answer, | |
consecutive_failures, | |
failure_threshold, | |
unresponsive_models, | |
model_config, | |
topic, | |
token, | |
timeout=60 | |
) | |
ranking_futures.append(future) | |
for future in concurrent.futures.as_completed(ranking_futures): # Collect ranks as they become available | |
ranking_model_id, rank = future.result() # Get model_id and rank | |
ranks[ranking_model_id] = rank # Store rank with model_id as key | |
ranking_end_time = time.time() # Record end time of ranking | |
ranking_duration = ranking_end_time - ranking_start_time # Calculate duration | |
# Filter out None values (failed ranks) and calculate weighted average | |
valid_ranks_values = [r for r in ranks.values() if r is not None] # Get rank values | |
average_rank = np.nan # Default to NaN | |
if valid_ranks_values: | |
#Create a list of weights corresponding to the valid ranks | |
weights_for_valid_ranks = [model_weights[model_id] | |
for model_id, rank in ranks.items() | |
if rank is not None] | |
if len(weights_for_valid_ranks) != len(valid_ranks_values): | |
print("Warning: Mismatch length of weights and valid answer ranks") | |
print(f'weights_for_valid_ranks {weights_for_valid_ranks}') | |
print(f'valid_ranks_values: {valid_ranks_values}') | |
average_rank = np.average(valid_ranks_values, weights=weights_for_valid_ranks) | |
results["model_name"].append(model_id) | |
results["topic"].append(topic) | |
results["question_prompt"].append(question_prompt) | |
results["question"].append(question) | |
results["answer"].append(answer) | |
results["answer_generation_duration"].append(duration) | |
results["average_rank"].append(average_rank) | |
results["ranks"].append([ranks.get(m, None) for m in active_models]) # Store raw ranks including Nones, ensure order, use .get() for safety | |
results["question_rank_average"].append(question_avg_rank) # Store question rank average | |
results["question_ranks"].append([question_ranks.get(m, None) for m in active_models]) # Store question ranks, use .get() for safety | |
results["question_rank_duration"].append(question_ranking_duration_total) # Store question ranking duration | |
cumulative_model_ranks[model_id].append(average_rank) # Append current iteration's average rank | |
# Safely calculate cumulative average rank - this is where the KeyError was happening | |
cumulative_avg_rank[model_id] = np.nanmean(cumulative_model_ranks.get(model_id, [])) if cumulative_model_ranks.get(model_id, []) else np.nan | |
# --- Print and store iteration results IMMEDIATELY after ranking for this model --- | |
ranks_str = "[" + ", ".join(map(str, [ranks.get(m, None) for m in active_models])) + "]" if ranks else "[]" # Format ranks for CSV, ensure order, use .get() for safety | |
print(f"{topic}, {difficulty_mapping[difficulty]}, {model_id}, Avg Rank: {cumulative_avg_rank.get(model_id, np.nan):.2f}, Avg Rank for run: {average_rank:.2f}, Ranks: {ranks_str}, {ranking_duration:.2f} s") | |
st.write(f"{topic}, {difficulty_mapping[difficulty]}, {model_id}, Avg Rank: {cumulative_avg_rank.get(model_id, np.nan):.2f}, Avg Rank for run: {average_rank:.2f}, Ranks: {ranks_str}, {ranking_duration:.2f} s") | |
# Write iteration results to file (append mode) - write for each model right after ranking | |
iteration_results_file_opened.write(f"{iteration+1},{topic}, {difficulty_mapping[difficulty]},{question_avg_rank:.2f},{question_ranking_duration_total:.2f},{model_id},{cumulative_avg_rank.get(model_id, np.nan):.2f},{average_rank:.2f},{ranks_str},{ranking_duration:.2f}\n") | |
# Update model weights based on cumulative average ranks, handling NaNs | |
temp_weights = {} | |
total_valid_rank = 0 # Keep track of the sum of valid (non-NaN) ranks | |
for m_id in active_models: | |
# Safe access with .get() to handle missing keys | |
if m_id in cumulative_avg_rank and not np.isnan(cumulative_avg_rank.get(m_id, np.nan)): | |
temp_weights[m_id] = cumulative_avg_rank.get(m_id, 0) | |
total_valid_rank += cumulative_avg_rank.get(m_id, 0) | |
else: # if cumulative is empty or NaN, keep original | |
temp_weights[m_id] = model_weights.get(m_id, 1.0 / len(active_models)) | |
# Normalize the weights so they sum to 1, handling cases where total_valid_rank might be zero | |
if total_valid_rank > 0: | |
for m_id in temp_weights: | |
model_weights[m_id] = temp_weights[m_id] / total_valid_rank | |
else: | |
# If total_valid_rank is 0 (all models have NaN ranks), revert to equal weights | |
for m_id in active_models: | |
model_weights[m_id] = 1.0 / len(active_models) | |
iteration_results_file_opened.close() | |
print(f"Unresponsive models during this run: {unresponsive_models}") | |
return results, cumulative_avg_rank, s_t | |
def check_model_availability(models, token):
    """Probe each model with a tiny generation request and report the outcome.

    Args:
        models: Iterable of Hugging Face model identifiers to probe.
        token: Hugging Face API token passed to ``InferenceClient``.

    Returns:
        A dict keyed by model name. A successful probe maps to
        ``{"available": True, "response": <reply, cut to 50 chars + "...">}``;
        a failed probe maps to ``{"available": False, "error": <str(exc)>}``.
    """
    # (status code, lowercase phrase, user-facing explanation), checked in order.
    failure_hints = (
        ("401", "unauthorized", "Authentication error. Check your API token."),
        ("404", "not found", "Model not found. It may not exist or you may not have access."),
        ("429", "rate limit", "Rate limit exceeded. Try again later."),
    )

    report = {}
    for model_name in models:
        st.write(f"Testing availability of {model_name}...")
        try:
            # A short prompt and a 10-token budget keep the probe cheap.
            probe = InferenceClient(model=model_name, token=token)
            reply = probe.text_generation(
                "Hello, are you available?",
                max_new_tokens=10,
                temperature=0.7,
                do_sample=True,
            )
            if len(reply) > 50:
                preview = reply[:50] + "..."
            else:
                preview = reply
            report[model_name] = {"available": True, "response": preview}
            st.success(f"✅ {model_name} is available")
        except Exception as exc:
            reason = str(exc)
            report[model_name] = {"available": False, "error": reason}
            lowered = reason.lower()
            for code, phrase, hint in failure_hints:
                if code in reason or phrase in lowered:
                    st.error(f"❌ {model_name}: {hint}")
                    break
            else:
                # No known pattern matched: surface the raw error text.
                st.error(f"❌ {model_name}: Unknown error: {reason}")
        # Pause between probes so consecutive requests are spaced out.
        time.sleep(1)
    return report
# ---------------------------------------------------------------------------
# Streamlit UI: page header and sidebar configuration.
# Widgets are created in the same order as they should appear on the page.
# ---------------------------------------------------------------------------
st.title("AutoBench 1.0 Demo")
st.write(
    "A Collective-Model-As-Judge system that will generate a customizable LLM benchmark. "
    "AutoBench 1.0 Demo is just a simple trial version for educational purposes that "
    "implements the Hugging Face's Inference API. "
    "Please refer to the AutoBench 1.0 repository for any advanced use. \n\n"
    "Chose the models you want to evaluate (at least 2). "
    "The models will rank each other against the selected topics. "
    "But, first, check if models are available (this will depend on your Hugging Face account. "
    "Premium is strongly recommended to avoid unresponsive models). "
    "Consult the README file for more details."
)

# Sidebar: everything the user can configure lives here.
st.sidebar.header("Configuration")

# API token, entered as a masked password field.
hf_token = st.sidebar.text_input(
    "Hugging Face API Token",
    type="password",
    help="Your Hugging Face API token to access models",
)

# Models offered for benchmarking; extend this list to offer more.
st.sidebar.subheader("Models")
available_models = [
    "meta-llama/Llama-3.3-70B-Instruct",
    "meta-llama/Llama-3.1-70B-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.2",
    "mistralai/Mixtral-8x7B-Instruct-v0.1",
    "Qwen/Qwen2.5-72B-Instruct",
    "Qwen/QwQ-32B-Preview",
]
selected_models = st.sidebar.multiselect(
    "Select models to benchmark",
    available_models,
    default=available_models[:3],  # pre-select the first three entries
)

# Topics the questions are drawn from; all are pre-selected.
available_topics = [
    "math",
    "logics",
    "grammar",
    "coding",
    "history",
    "current news",
    "general culture",
    "science",
    "technology",
    "creative writing",
]
selected_topics = st.sidebar.multiselect(
    "Select topics",
    available_topics,
    default=available_topics,
)

# Iteration count: slider from 1 to 20, starting at 5.
num_iterations = st.sidebar.slider("Number of iterations", 1, 20, 5)

# Every selected model gets the "both" role.
model_config = {
    model_id: {"name": model_id, "role": "both"} for model_id in selected_models
}
# "Test Selected Models": probe the chosen models and summarise the outcome.
if st.sidebar.button("Test Selected Models"):
    if hf_token and selected_models:
        with st.spinner("Testing model availability..."):
            availability = check_model_availability(selected_models, hf_token)

        # One table row per probed model; "Details" carries the reply preview
        # on success and the error text on failure.
        availability_df = pd.DataFrame([
            {
                "Model": name,
                "Available": outcome["available"],
                "Status": "Available" if outcome["available"] else "Error",
                "Details": outcome.get(
                    "response" if outcome["available"] else "error", ""
                ),
            }
            for name, outcome in availability.items()
        ])
        st.dataframe(availability_df)

        # Tell the user whether enough models responded to run the benchmark.
        available_models = [
            name for name, outcome in availability.items() if outcome["available"]
        ]
        if len(available_models) < 2:
            st.error("You need at least 2 available models to run the benchmark")
        else:
            st.success(f"{len(available_models)} models are available for benchmarking")
    elif not hf_token:
        st.error("Please enter your Hugging Face API token")
    else:
        # Token is present, so the model selection must be empty.
        st.error("Please select at least one model")
# "Start Benchmark": validate the configuration, run the benchmark and render
# the results. When the button was not pressed on this rerun, fall back to
# showing the results stored in the session by a previous run.
if st.sidebar.button("Start Benchmark"):
    if not hf_token:
        st.error("Please enter your Hugging Face API token")
    elif len(selected_models) < 2:
        # The message asks for two models, so enforce two. The previous check
        # (`not selected_models`) let a single-model selection through.
        st.error("Please select at least two models")
    elif not selected_topics:
        st.error("Please select at least one topic")
    else:
        # Progress bar and status line shown while the benchmark runs.
        progress_bar = st.progress(0)
        status_text = st.empty()

        # Placeholders reserved for results display.
        results_container = st.container()
        with results_container:
            results_placeholder = st.empty()
            iterations_table = st.empty()

        # Make sure the session slot for results exists before the run.
        if 'results_df' not in st.session_state:
            st.session_state.results_df = pd.DataFrame()

        try:
            status_text.text("Benchmark running... For more detailed logs, check the container log (above, next to the \"running\" button).")

            # All five difficulty labels are passed to the benchmark.
            results, cumulative_avg_rank, total_successful = run_benchmark(
                selected_models, selected_topics,
                ["a very simple", "a simple", "a", "a difficult", "a very difficult"],
                num_iterations, model_config, hf_token
            )

            progress_bar.progress(100)
            st.subheader(f"Benchmark completed! {total_successful} successful iterations.")

            if total_successful > 0:
                results_df = pd.DataFrame(results)
                # Persist so the "Previous Results" branch can show it later.
                st.session_state['results_df'] = results_df

                # Per-model cumulative average rank; NaN is shown as 'N/A'.
                st.subheader("Model Rankings")
                ranking_df = pd.DataFrame({
                    "Model": list(cumulative_avg_rank.keys()),
                    "Average Rank": [
                        round(r, 2) if not np.isnan(r) else 'N/A'
                        for r in cumulative_avg_rank.values()
                    ],
                })
                st.dataframe(ranking_df)

                st.subheader("Detailed Results")
                st.dataframe(results_df)

                # Offer the detailed results as a CSV download.
                csv = results_df.to_csv(index=False)
                st.download_button(
                    label="Download Results CSV",
                    data=csv,
                    file_name="llm_benchmark_results.csv",
                    mime="text/csv",
                )
            else:
                st.warning("The benchmark did not complete any successful iterations.")
        except Exception as e:
            # Top-level UI boundary: report the failure instead of crashing the page.
            st.error(f"An error occurred: {e}")
            st.exception(e)
# Show previous results if available
elif 'results_df' in st.session_state and not st.session_state.results_df.empty:
    st.subheader("Previous Results")
    st.dataframe(st.session_state.results_df)