"""Summarize and classify LinkedIn posts with Groq-hosted LLMs.

The module builds one instructor-patched Groq client at import time and
exposes helpers that (1) summarize a post, (2) classify the summary into one
of a fixed set of labels, and (3) run both steps over a batch of posts.
"""

import os

import instructor
import pandas as pd
from groq import Groq, RateLimitError
from pydantic import BaseModel

# Ensure GROQ_API_KEY is set in your environment variables
api_key = os.getenv('GROQ_API_KEY')
if not api_key:
    raise ValueError("GROQ_API_KEY environment variable not set.")

# Create single patched Groq client with instructor for structured output.
# Mode.JSON makes instructor parse the reply into the given Pydantic model.
client = instructor.from_groq(Groq(api_key=api_key), mode=instructor.Mode.JSON)


class SummaryOutput(BaseModel):
    """Structured response expected from the summarizer models."""

    # Short summary text produced by the model.
    summary: str


class ClassificationOutput(BaseModel):
    """Structured response expected from the classification model."""

    # Category string chosen by the model (validated later against labels).
    category: str


# Model names
PRIMARY_SUMMARIZER_MODEL = "deepseek-r1-distill-llama-70b"
FALLBACK_SUMMARIZER_MODEL = "llama-3.3-70b-versatile"
CLASSIFICATION_MODEL = "meta-llama/llama-4-maverick-17b-128e-instruct"  # Or your preferred classification model

# Standard list of categories, including "None"
CLASSIFICATION_LABELS = [
    "Company Culture and Values",
    "Employee Stories and Spotlights",
    "Work-Life Balance, Flexibility, and Well-being",
    "Diversity, Equity, and Inclusion (DEI)",
    "Professional Development and Growth Opportunities",
    "Mission, Vision, and Social Responsibility",
    "None",  # Represents no applicable category or cases where classification isn't possible
]

# Posts are truncated to this many characters before being sent to the
# summarizer, to avoid token overflow and reduce costs.
MAX_POST_CHARS = 500

# System prompt shared by the primary and fallback summarizer requests.
_SUMMARIZER_SYSTEM_PROMPT = (
    "You are a precise summarizer. Only return a JSON object with a 'summary' string."
)


def _is_blank(value: object) -> bool:
    """Return True if *value* is None, a missing-value scalar, or blank text.

    ``pd.isna`` returns an array for list-like input, whose truth value is
    ambiguous, so it is only consulted for scalars.
    """
    if value is None:
        return True
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return True
    return not str(value).strip()


def _request_summary(model: str, prompt: str) -> str:
    """Ask *model* for a summary of *prompt* and return the summary string.

    Any exception raised by the client (including ``RateLimitError``)
    propagates to the caller.
    """
    response = client.chat.completions.create(
        model=model,
        response_model=SummaryOutput,
        messages=[
            {"role": "system", "content": _SUMMARIZER_SYSTEM_PROMPT},
            {"role": "user", "content": prompt},
        ],
        temperature=0.3,
    )
    return response.summary


def _match_label(candidate: str, labels: list[str]) -> str | None:
    """Map *candidate* onto one of *labels*, or return None if none matches.

    An exact match wins. Otherwise surrounding whitespace/quotes and letter
    case are ignored, so answers such as ``" none "`` or ``"'None'"`` still
    resolve to the canonical label text.
    """
    if candidate in labels:
        return candidate
    wanted = candidate.strip().strip("'\"").strip().casefold()
    for label in labels:
        if label.strip().casefold() == wanted:
            return label
    return None


def summarize_post(text: str | None) -> str | None:
    """Summarize a post with the primary model, falling back on rate limits.

    Args:
        text: Raw post text. None, NaN, and empty/whitespace-only values are
            treated as "nothing to summarize".

    Returns:
        The summary string, or None if the input is blank or every attempt
        failed. The fallback model is tried only when the primary model
        raises ``RateLimitError``; any other primary error returns None.
    """
    if _is_blank(text):
        print("Summarizer: Input text is empty or None. Returning None.")
        return None

    processed_text = str(text)[:MAX_POST_CHARS]
    prompt = f"""
    Summarize the following LinkedIn post in 5 to 10 words.
    Only return the summary inside a JSON field called 'summary'.

    Post Text:
    \"\"\"{processed_text}\"\"\"
    """

    print(f"Attempting summarization with primary model: {PRIMARY_SUMMARIZER_MODEL}")
    try:
        return _request_summary(PRIMARY_SUMMARIZER_MODEL, prompt)
    except RateLimitError:
        print(f"Rate limit hit for primary summarizer model: {PRIMARY_SUMMARIZER_MODEL}. Trying fallback: {FALLBACK_SUMMARIZER_MODEL}")
    except Exception as e_primary:
        # Deliberate best-effort: a failed summary must not abort a batch.
        print(f"Error during summarization with primary model ({PRIMARY_SUMMARIZER_MODEL}): {e_primary}")
        return None

    # Only reached when the primary model was rate limited.
    try:
        summary = _request_summary(FALLBACK_SUMMARIZER_MODEL, prompt)
    except RateLimitError as rle_fallback:
        print(f"Rate limit hit for fallback summarizer model ({FALLBACK_SUMMARIZER_MODEL}): {rle_fallback}. Summarization failed.")
        return None
    except Exception as e_fallback:
        print(f"Error during summarization with fallback model ({FALLBACK_SUMMARIZER_MODEL}): {e_fallback}")
        return None

    print(f"Summarization successful with fallback model: {FALLBACK_SUMMARIZER_MODEL}")
    return summary


def classify_post(summary: str | None, labels: list[str]) -> str:
    """Classify a post summary into one of *labels*.

    Args:
        summary: Summary text; None/NaN/blank yields ``"None"`` without
            calling the model.
        labels: Allowed category strings presented to the model.

    Returns:
        One of *labels* (canonical spelling), or the string ``"None"`` when
        the summary is blank, the model answers with something that cannot
        be matched to a label, or the request raises.
    """
    if _is_blank(summary):
        print("Classifier: Input summary is empty or None. Returning 'None' category.")
        return "None"  # The string "None", to match the label

    # Join labels for the prompt so the LLM sees the exact expected strings.
    labels_string = "', '".join(labels)
    prompt = f"""
    Post Summary: "{summary}"

    Available Categories: '{labels_string}'

    Task: Choose the single most relevant category from the list above that applies to this summary.
    Return ONLY ONE category string in a structured JSON format under the field 'category'.
    The category MUST be one of the following: '{labels_string}'.
    If no specific category applies, or if you are unsure, return "None".
    """
    system_message = (
        f"You are a very strict classifier. Your ONLY job is to return a JSON object "
        f"with a 'category' field. The value of 'category' MUST be one of these "
        f"exact strings: '{labels_string}'."
    )

    try:
        result = client.chat.completions.create(
            model=CLASSIFICATION_MODEL,
            response_model=ClassificationOutput,
            messages=[
                {"role": "system", "content": system_message},
                {"role": "user", "content": prompt},
            ],
            temperature=0,  # Temperature 0 for deterministic classification
        )
        returned_category = result.category
    except Exception as e:
        # Deliberate best-effort: default to "None" on any request failure.
        print(f"Classification error: {e}. Summary: '{summary}'. Defaulting to 'None' category.")
        return "None"

    matched = _match_label(returned_category, labels)
    if matched is None:
        print(f"Warning: Classifier returned '{returned_category}', which is not in the predefined labels. Forcing to 'None'. Summary: '{summary}'")
        return "None"
    return matched


def summarize_and_classify_post(text: str | None, labels: list[str]) -> dict:
    """Summarize a single post and classify the resulting summary.

    Args:
        text: Raw post text (may be None or blank).
        labels: Allowed category strings passed to ``classify_post``.

    Returns:
        A dict with ``"summary"`` (str or None) and ``"category"`` (one of
        *labels* or ``"None"``). The classifier is skipped when no usable
        summary was produced.
    """
    summary = summarize_post(text)
    category = "None" if _is_blank(summary) else classify_post(summary, labels)
    return {
        "summary": summary,    # This can be None
        "category": category,  # One of the labels or "None"
    }


def batch_summarize_and_classify(
    posts_data: list[dict],
    labels: list[str] | None = None,
) -> list[dict]:
    """Summarize and classify every post in *posts_data*.

    Args:
        posts_data: List of dicts, each read via ``.get("id")`` and
            ``.get("text")``. Non-dict items are skipped with a warning.
        labels: Allowed categories; defaults to ``CLASSIFICATION_LABELS``.

    Returns:
        One dict per processed post with ``"id"``, ``"summary"`` and
        ``"category"`` keys, in input order.
    """
    results: list[dict] = []
    if not posts_data:
        print("Input 'posts_data' is empty. Returning empty results.")
        return results

    active_labels = CLASSIFICATION_LABELS if labels is None else labels

    for i, post_item in enumerate(posts_data):
        if not isinstance(post_item, dict):
            print(f"Warning: Item at index {i} is not a dictionary. Skipping.")
            continue

        post_id = post_item.get("id")
        text_to_process = post_item.get("text")

        # Compare against None so falsy-but-valid ids (0, "") are still shown.
        shown_id = post_id if post_id is not None else 'N/A (ID missing)'
        print(f"\nProcessing Post ID: {shown_id}, Text: '{str(text_to_process)[:50]}...'")

        # summarize_and_classify_post handles None/empty text internally
        # and yields category "None" in such cases.
        outcome = summarize_and_classify_post(text_to_process, active_labels)
        results.append({
            "id": post_id,  # Include the ID for mapping back to original data
            "summary": outcome["summary"],
            "category": outcome["category"],
        })
        print(f"Result for Post ID {post_id}: Summary='{outcome['summary']}', Category='{outcome['category']}'")

    return results