# Spaces: Running  (web-page status banner captured alongside the source; not part of the program)
import os

import instructor
import pandas as pd
from groq import Groq, RateLimitError
from pydantic import BaseModel, Field
# The Groq API key is read from the GROQ_API_KEY environment variable.
# Surrounding whitespace (e.g. a trailing newline left by a secrets file) is
# stripped so that a blank or padded value is rejected here instead of being
# sent to the API.
api_key = (os.getenv("GROQ_API_KEY") or "").strip()
if not api_key:
    raise ValueError("GROQ_API_KEY environment variable not set.")

# Single shared Groq client, patched by instructor so that
# `client.chat.completions.create(..., response_model=...)` returns an instance
# of the given Pydantic model. Mode.JSON selects instructor's JSON output mode.
client = instructor.from_groq(Groq(api_key=api_key), mode=instructor.Mode.JSON)
class SummaryOutput(BaseModel):
    """Structured response expected from the summarization models."""

    # The description is part of the schema handed to the model, so it repeats
    # the length requirement stated in the summarization prompt.
    summary: str = Field(description="A 5 to 10 word summary of the post.")
class ClassificationOutput(BaseModel):
    """Structured response expected from the classification model."""

    # The allowed labels are supplied per call in the prompt, so the field is a
    # plain string here; the caller validates it against its label list.
    category: str = Field(
        description=(
            "Exactly one category label, copied verbatim from the list given "
            'in the prompt, or "None" when no label applies.'
        )
    )
# Groq model identifiers.
# summarize_post tries the primary summarizer first and switches to the
# fallback only when the primary request is rate limited.
PRIMARY_SUMMARIZER_MODEL = "deepseek-r1-distill-llama-70b"
FALLBACK_SUMMARIZER_MODEL = "llama-3.3-70b-versatile"
# Model used by classify_post; replace with your preferred classification model.
CLASSIFICATION_MODEL = "meta-llama/llama-4-maverick-17b-128e-instruct"
# Standard category list used for classification.
# The final entry, the string "None", stands for "no applicable category" and is
# also what the classifier returns when classification is not possible.
CLASSIFICATION_LABELS = [
    "Company Culture and Values",
    "Employee Stories and Spotlights",
    "Work-Life Balance, Flexibility, and Well-being",
    "Diversity, Equity, and Inclusion (DEI)",
    "Professional Development and Growth Opportunities",
    "Mission, Vision, and Social Responsibility",
    "None",
]
| def summarize_post(text: str) -> str | None: | |
| """ | |
| Summarizes the given post text using a primary model with a fallback. | |
| Returns the summary string or None if summarization fails or input is invalid. | |
| """ | |
| # Check for NaN, None, or empty/whitespace-only string | |
| if pd.isna(text) or text is None or not str(text).strip(): | |
| print("Summarizer: Input text is empty or None. Returning None.") | |
| return None | |
| # Truncate text to a reasonable length to avoid token overflow and reduce costs | |
| processed_text = str(text)[:500] | |
| prompt = f""" | |
| Summarize the following LinkedIn post in 5 to 10 words. | |
| Only return the summary inside a JSON field called 'summary'. | |
| Post Text: | |
| \"\"\"{processed_text}\"\"\" | |
| """ | |
| try: | |
| # Attempt with primary model | |
| print(f"Attempting summarization with primary model: {PRIMARY_SUMMARIZER_MODEL}") | |
| response = client.chat.completions.create( | |
| model=PRIMARY_SUMMARIZER_MODEL, | |
| response_model=SummaryOutput, | |
| messages=[ | |
| {"role": "system", "content": "You are a precise summarizer. Only return a JSON object with a 'summary' string."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| temperature=0.3 | |
| ) | |
| return response.summary | |
| except RateLimitError: | |
| print(f"Rate limit hit for primary summarizer model: {PRIMARY_SUMMARIZER_MODEL}. Trying fallback: {FALLBACK_SUMMARIZER_MODEL}") | |
| try: | |
| # Attempt with fallback model | |
| response = client.chat.completions.create( | |
| model=FALLBACK_SUMMARIZER_MODEL, | |
| response_model=SummaryOutput, | |
| messages=[ | |
| {"role": "system", "content": "You are a precise summarizer. Only return a JSON object with a 'summary' string."}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| temperature=0.3 | |
| ) | |
| print(f"Summarization successful with fallback model: {FALLBACK_SUMMARIZER_MODEL}") | |
| return response.summary | |
| except RateLimitError as rle_fallback: | |
| print(f"Rate limit hit for fallback summarizer model ({FALLBACK_SUMMARIZER_MODEL}): {rle_fallback}. Summarization failed.") | |
| return None | |
| except Exception as e_fallback: | |
| print(f"Error during summarization with fallback model ({FALLBACK_SUMMARIZER_MODEL}): {e_fallback}") | |
| return None | |
| except Exception as e_primary: | |
| print(f"Error during summarization with primary model ({PRIMARY_SUMMARIZER_MODEL}): {e_primary}") | |
| # Consider if fallback should be attempted for other errors too, or just return None | |
| return None | |
| def classify_post(summary: str | None, labels: list[str]) -> str: | |
| """ | |
| Classifies the post summary into one of the provided labels. | |
| Ensures the returned category is one of the labels, defaulting to "None". | |
| """ | |
| # If the summary is None (e.g., from a failed summarization or empty input), | |
| # or if the summary is an empty string after stripping, classify as "None". | |
| if pd.isna(summary) or summary is None or not str(summary).strip(): | |
| print("Classifier: Input summary is empty or None. Returning 'None' category.") | |
| return "None" # Return the string "None" to match the label | |
| # Join labels for the prompt to ensure the LLM knows the exact expected strings | |
| labels_string = "', '".join(labels) | |
| prompt = f""" | |
| Post Summary: "{summary}" | |
| Available Categories: | |
| '{labels_string}' | |
| Task: Choose the single most relevant category from the list above that applies to this summary. | |
| Return ONLY ONE category string in a structured JSON format under the field 'category'. | |
| The category MUST be one of the following: '{labels_string}'. | |
| If no specific category applies, or if you are unsure, return "None". | |
| """ | |
| try: | |
| system_message = ( | |
| f"You are a very strict classifier. Your ONLY job is to return a JSON object " | |
| f"with a 'category' field. The value of 'category' MUST be one of these " | |
| f"exact strings: '{labels_string}'." | |
| ) | |
| result = client.chat.completions.create( | |
| model=CLASSIFICATION_MODEL, | |
| response_model=ClassificationOutput, | |
| messages=[ | |
| {"role": "system", "content": system_message}, | |
| {"role": "user", "content": prompt} | |
| ], | |
| temperature=0 # Temperature 0 for deterministic classification | |
| ) | |
| returned_category = result.category | |
| # Validate the output against the provided labels | |
| if returned_category not in labels: | |
| print(f"Warning: Classifier returned '{returned_category}', which is not in the predefined labels. Forcing to 'None'. Summary: '{summary}'") | |
| return "None" # Force to "None" if the LLM returns an unexpected category | |
| return returned_category | |
| except Exception as e: | |
| print(f"Classification error: {e}. Summary: '{summary}'. Defaulting to 'None' category.") | |
| return "None" # Default to "None" on any exception during classification | |
def summarize_and_classify_post(text: str | None, labels: list[str]) -> dict:
    """Summarize one post and classify the resulting summary.

    Args:
        text: Raw post text; may be ``None``.
        labels: Allowed category strings, forwarded to ``classify_post``.

    Returns:
        A dict with two keys: ``"summary"`` (the value returned by
        ``summarize_post``, possibly ``None``) and ``"category"`` (the value
        returned by ``classify_post``, or ``"None"`` when there is no usable
        summary).
    """
    summary = summarize_post(text)

    # The classifier is only called when there is real summary text; a missing
    # or whitespace-only summary maps directly to the "None" category.
    has_summary = summary is not None and bool(summary.strip())
    category = classify_post(summary, labels) if has_summary else "None"

    return {"summary": summary, "category": category}
| def batch_summarize_and_classify(posts_data: list[dict]) -> list[dict]: | |
| """ | |
| Processes a batch of posts, performing summarization and classification for each. | |
| Expects posts_data to be a list of dictionaries, each with at least 'id' and 'text' keys. | |
| Returns a list of dictionaries, each with 'id', 'summary', and 'category'. | |
| """ | |
| results = [] | |
| if not posts_data: | |
| print("Input 'posts_data' is empty. Returning empty results.") | |
| return results | |
| for i, post_item in enumerate(posts_data): | |
| if not isinstance(post_item, dict): | |
| print(f"Warning: Item at index {i} is not a dictionary. Skipping.") | |
| continue | |
| post_id = post_item.get("id") | |
| text_to_process = post_item.get("text") # This text is passed to summarize_and_classify_post | |
| print(f"\nProcessing Post ID: {post_id if post_id else 'N/A (ID missing)'}, Text: '{str(text_to_process)[:50]}...'") | |
| # summarize_and_classify_post will handle None/empty text internally | |
| # and ensure category is "None" in such cases. | |
| summary_and_category_result = summarize_and_classify_post(text_to_process, CLASSIFICATION_LABELS) | |
| results.append({ | |
| "id": post_id, # Include the ID for mapping back to original data | |
| "summary": summary_and_category_result["summary"], | |
| "category": summary_and_category_result["category"] # This is now validated | |
| }) | |
| print(f"Result for Post ID {post_id}: Summary='{summary_and_category_result['summary']}', Category='{summary_and_category_result['category']}'") | |
| return results |