Spaces:
Running
Running
import pandas as pd | |
from groq import Groq, RateLimitError | |
import instructor | |
from pydantic import BaseModel | |
import os | |
# Ensure GROQ_API_KEY is set in your environment variables | |
api_key = os.getenv('GROQ_API_KEY') | |
if not api_key: | |
raise ValueError("GROQ_API_KEY environment variable not set.") | |
# Create single patched Groq client with instructor for structured output | |
# Using Mode.JSON for structured output based on Pydantic models | |
client = instructor.from_groq(Groq(api_key=api_key), mode=instructor.Mode.JSON) | |
# Pydantic model for summarization output | |
class SummaryOutput(BaseModel): | |
summary: str | |
# Pydantic model for classification output | |
class ClassificationOutput(BaseModel): | |
category: str | |
# Define model names (as per your original code) | |
PRIMARY_SUMMARIZER_MODEL = "deepseek-r1-distill-llama-70b" | |
FALLBACK_SUMMARIZER_MODEL = "llama-3.3-70b-versatile" | |
CLASSIFICATION_MODEL = "meta-llama/llama-4-maverick-17b-128e-instruct" # Or your preferred classification model | |
# Define the standard list of categories, including "None" | |
CLASSIFICATION_LABELS = [ | |
"Company Culture and Values", | |
"Employee Stories and Spotlights", | |
"Work-Life Balance, Flexibility, and Well-being", | |
"Diversity, Equity, and Inclusion (DEI)", | |
"Professional Development and Growth Opportunities", | |
"Mission, Vision, and Social Responsibility", | |
"None" # Represents no applicable category or cases where classification isn't possible | |
] | |
def summarize_post(text: str) -> str | None: | |
""" | |
Summarizes the given post text using a primary model with a fallback. | |
Returns the summary string or None if summarization fails or input is invalid. | |
""" | |
# Check for NaN, None, or empty/whitespace-only string | |
if pd.isna(text) or text is None or not str(text).strip(): | |
print("Summarizer: Input text is empty or None. Returning None.") | |
return None | |
# Truncate text to a reasonable length to avoid token overflow and reduce costs | |
processed_text = str(text)[:500] | |
prompt = f""" | |
Summarize the following LinkedIn post in 5 to 10 words. | |
Only return the summary inside a JSON field called 'summary'. | |
Post Text: | |
\"\"\"{processed_text}\"\"\" | |
""" | |
try: | |
# Attempt with primary model | |
print(f"Attempting summarization with primary model: {PRIMARY_SUMMARIZER_MODEL}") | |
response = client.chat.completions.create( | |
model=PRIMARY_SUMMARIZER_MODEL, | |
response_model=SummaryOutput, | |
messages=[ | |
{"role": "system", "content": "You are a precise summarizer. Only return a JSON object with a 'summary' string."}, | |
{"role": "user", "content": prompt} | |
], | |
temperature=0.3 | |
) | |
return response.summary | |
except RateLimitError: | |
print(f"Rate limit hit for primary summarizer model: {PRIMARY_SUMMARIZER_MODEL}. Trying fallback: {FALLBACK_SUMMARIZER_MODEL}") | |
try: | |
# Attempt with fallback model | |
response = client.chat.completions.create( | |
model=FALLBACK_SUMMARIZER_MODEL, | |
response_model=SummaryOutput, | |
messages=[ | |
{"role": "system", "content": "You are a precise summarizer. Only return a JSON object with a 'summary' string."}, | |
{"role": "user", "content": prompt} | |
], | |
temperature=0.3 | |
) | |
print(f"Summarization successful with fallback model: {FALLBACK_SUMMARIZER_MODEL}") | |
return response.summary | |
except RateLimitError as rle_fallback: | |
print(f"Rate limit hit for fallback summarizer model ({FALLBACK_SUMMARIZER_MODEL}): {rle_fallback}. Summarization failed.") | |
return None | |
except Exception as e_fallback: | |
print(f"Error during summarization with fallback model ({FALLBACK_SUMMARIZER_MODEL}): {e_fallback}") | |
return None | |
except Exception as e_primary: | |
print(f"Error during summarization with primary model ({PRIMARY_SUMMARIZER_MODEL}): {e_primary}") | |
# Consider if fallback should be attempted for other errors too, or just return None | |
return None | |
def classify_post(summary: str | None, labels: list[str]) -> str: | |
""" | |
Classifies the post summary into one of the provided labels. | |
Ensures the returned category is one of the labels, defaulting to "None". | |
""" | |
# If the summary is None (e.g., from a failed summarization or empty input), | |
# or if the summary is an empty string after stripping, classify as "None". | |
if pd.isna(summary) or summary is None or not str(summary).strip(): | |
print("Classifier: Input summary is empty or None. Returning 'None' category.") | |
return "None" # Return the string "None" to match the label | |
# Join labels for the prompt to ensure the LLM knows the exact expected strings | |
labels_string = "', '".join(labels) | |
prompt = f""" | |
Post Summary: "{summary}" | |
Available Categories: | |
'{labels_string}' | |
Task: Choose the single most relevant category from the list above that applies to this summary. | |
Return ONLY ONE category string in a structured JSON format under the field 'category'. | |
The category MUST be one of the following: '{labels_string}'. | |
If no specific category applies, or if you are unsure, return "None". | |
""" | |
try: | |
system_message = ( | |
f"You are a very strict classifier. Your ONLY job is to return a JSON object " | |
f"with a 'category' field. The value of 'category' MUST be one of these " | |
f"exact strings: '{labels_string}'." | |
) | |
result = client.chat.completions.create( | |
model=CLASSIFICATION_MODEL, | |
response_model=ClassificationOutput, | |
messages=[ | |
{"role": "system", "content": system_message}, | |
{"role": "user", "content": prompt} | |
], | |
temperature=0 # Temperature 0 for deterministic classification | |
) | |
returned_category = result.category | |
# Validate the output against the provided labels | |
if returned_category not in labels: | |
print(f"Warning: Classifier returned '{returned_category}', which is not in the predefined labels. Forcing to 'None'. Summary: '{summary}'") | |
return "None" # Force to "None" if the LLM returns an unexpected category | |
return returned_category | |
except Exception as e: | |
print(f"Classification error: {e}. Summary: '{summary}'. Defaulting to 'None' category.") | |
return "None" # Default to "None" on any exception during classification | |
def summarize_and_classify_post(text: str | None, labels: list[str]) -> dict: | |
""" | |
Summarizes and then classifies a single post text. | |
Handles cases where text is None or summarization fails. | |
""" | |
summary = summarize_post(text) # This can return None | |
# If summarization didn't produce a result (e.g. empty input, error), | |
# or if the summary itself is effectively empty, the category is "None". | |
if summary is None or not summary.strip(): | |
category = "None" | |
else: | |
# If we have a valid summary, try to classify it. | |
# classify_post is designed to return one of the labels or "None". | |
category = classify_post(summary, labels) | |
return { | |
"summary": summary, # This can be None | |
"category": category # This will be one of the labels or "None" | |
} | |
def batch_summarize_and_classify(posts_data: list[dict]) -> list[dict]: | |
""" | |
Processes a batch of posts, performing summarization and classification for each. | |
Expects posts_data to be a list of dictionaries, each with at least 'id' and 'text' keys. | |
Returns a list of dictionaries, each with 'id', 'summary', and 'category'. | |
""" | |
results = [] | |
if not posts_data: | |
print("Input 'posts_data' is empty. Returning empty results.") | |
return results | |
for i, post_item in enumerate(posts_data): | |
if not isinstance(post_item, dict): | |
print(f"Warning: Item at index {i} is not a dictionary. Skipping.") | |
continue | |
post_id = post_item.get("id") | |
text_to_process = post_item.get("text") # This text is passed to summarize_and_classify_post | |
print(f"\nProcessing Post ID: {post_id if post_id else 'N/A (ID missing)'}, Text: '{str(text_to_process)[:50]}...'") | |
# summarize_and_classify_post will handle None/empty text internally | |
# and ensure category is "None" in such cases. | |
summary_and_category_result = summarize_and_classify_post(text_to_process, CLASSIFICATION_LABELS) | |
results.append({ | |
"id": post_id, # Include the ID for mapping back to original data | |
"summary": summary_and_category_result["summary"], | |
"category": summary_and_category_result["category"] # This is now validated | |
}) | |
print(f"Result for Post ID {post_id}: Summary='{summary_and_category_result['summary']}', Category='{summary_and_category_result['category']}'") | |
return results |