Spaces:
Sleeping
Sleeping
| from src.use_llm import main_generate, get_embeddings | |
| from src.questions_queries import * | |
| import time | |
| import uuid | |
| import chromadb | |
| import spacy | |
| import numpy as np | |
| import os | |
| from src.sparql_query_wikibase import wikibase_properties_id, classes_wikibase | |
| #os.environ["TOKENIZERS_PARALLELISM"] = "false" | |
| import spacy | |
def get_nlp():
    """Return the spaCy ``en_core_web_sm`` pipeline, downloading it on demand.

    ``spacy.load`` raises ``OSError`` when the model package is not
    installed; in that case the model is fetched through ``spacy.cli`` and
    loaded again.
    """
    model_name = "en_core_web_sm"
    try:
        pipeline = spacy.load(model_name)
    except OSError:
        # Imported lazily: only needed on the first run on a fresh machine.
        from spacy.cli import download

        download(model_name)
        pipeline = spacy.load(model_name)
    return pipeline


# Shared pipeline instance, created once at import time.
nlp = get_nlp()
# Curated (question, SPARQL query) example pairs used for retrieval and as
# few-shot examples. The names come from `src.questions_queries`.
questions_queries_all = [
    {"question": education_most_popular_question, "query": education_most_popular_query},
    {"question": how_many_designers_per_fashion_house_question, "query": how_many_designers_per_fashion_house_query},
    {"question": how_many_directors_per_fashion_house_question, "query": how_many_directors_per_fashion_house_query},
    {"question": designers_multiple_houses_question, "query": designers_multiple_houses_query},
    # Fixed: this entry used `award_question` for both fields, so the stored
    # "query" was a natural-language sentence instead of SPARQL.
    # NOTE(review): assumes `award_query` is defined in src.questions_queries
    # like every other *_question / *_query pair -- confirm.
    {"question": award_question, "query": award_query},
    {"question": fashion_houses_with_collections_question, "query": fashion_houses_with_collections_query},
    {"question": popular_year_inception_question, "query": popular_year_inception_query},
    {"question": longest_serving_director_question, "query": longest_serving_director_query},
    {"question": houses_most_collections_question, "query": houses_most_collections_query},
    {"question": collections_sustainability_theme_question, "query": collections_sustainability_theme_query},
    {"question": collections_jeans_question, "query": collections_jeans_query},
    {"question": creative_directors_school_question, "query": creative_directors_school_query},
    {"question": fashion_houses_thematic_collection_question, "query": fashion_houses_thematic_collection_query},
    {"question": fashion_house_directors_question, "query": fashion_house_directors_query},
    {"question": designer_fashion_house_directors_question, "query": designer_fashion_house_directors_query},
    {"question": country_designer_question, "query": country_designer_query},
    {"question": designer_order_fashion_collection_question, "query": designer_order_fashion_collection_query},
    {"question": designer_fashion_director_question2, "query": designer_fashion_director_query2},
    {"question": year_designers_birth_question, "query": year_designers_birth_query},
]
# Extend the curated examples with logged question/query pairs that executed
# successfully and received "good" feedback.
if os.path.exists("web_app/query_log.json"):
    # Imported explicitly: this file never imports pandas itself, so `pd`
    # would otherwise only exist if the star import above happens to leak it.
    import pandas as pd

    other_pairs = pd.read_json("web_app/query_log.json")
    # An empty or partially written log has no such columns; skip it rather
    # than crash the module import with a KeyError.
    if {"status", "feedback", "question", "query"}.issubset(other_pairs.columns):
        other_pairs_success = other_pairs[(other_pairs["status"] == "Success") & (other_pairs["feedback"] == "good")]
        other_pairs_success = other_pairs_success[["question", "query"]]
        questions_queries_all = questions_queries_all + other_pairs_success.to_dict(orient='records')
| #print only the questions | |
| #print([q["question"] for q in questions_queries_all]) | |
def mask_entities(text, nlp):
    """Return *text* with every detected entity replaced by ``[ENTITY]``.

    *nlp* is called on *text*; each item of the result's ``ents`` contributes
    its ``text``, and every occurrence of that surface form is replaced.
    """
    result = text
    for surface in (entity.text for entity in nlp(text).ents):
        result = result.replace(surface, "[ENTITY]")
    return result
| import re | |
| import spacy | |
def _substitute_entities(query, entity_mapping):
    """Replace every key of *entity_mapping* found in *query*, in one pass."""
    keys = [key for key in entity_mapping if key]
    if not keys:
        return query
    # Longest keys first so that "Coco Chanel" wins over "Chanel".
    alternatives = "|".join(re.escape(key) for key in sorted(keys, key=len, reverse=True))
    # Look-arounds instead of \b: they behave the same for keys that start and
    # end with word characters, and still match keys ending in punctuation.
    pattern = re.compile(
        rf'"(?P<quoted>{alternatives})"|(?<!\w)(?P<bare>{alternatives})(?!\w)'
    )

    def _swap(match):
        quoted = match.group("quoted")
        if quoted is not None:
            return f'"{entity_mapping[quoted]}"'  # keep the quotation marks
        return entity_mapping[match.group("bare")]

    return pattern.sub(_swap, query)


def replace_entity(original_question, to_do_question, query, nlp_model=None):
    """
    Replaces entities in the query using entities from to_do_question while preserving quotation marks.
    Handles multiple entity replacements and numerical entity replacements.

    Parameters:
    - original_question (str): Question the example query was written for.
    - to_do_question (str): New question whose entities should be used.
    - query (str): Example SPARQL query to adapt.
    - nlp_model (callable, optional): Entity recogniser returning an object
      with ``ents`` (items exposing ``text``). Defaults to the module-level
      ``nlp`` pipeline, so the model is no longer reloaded on every call.

    Returns:
    - str: The query with the original entities swapped for the new ones.

    Entities are paired positionally (extra entities on either side are
    ignored). When both entities of a pair contain the same number of digit
    groups, only the numbers are mapped; otherwise the whole entity text is.
    All substitutions are applied in a single pass, so a replacement is never
    itself replaced again (e.g. 1970->1980 together with 1980->1990).
    """
    model = nlp if nlp_model is None else nlp_model
    original_entities = [ent.text for ent in model(original_question).ents]
    to_do_entities = [ent.text for ent in model(to_do_question).ents]

    entity_mapping = {}
    for orig_ent, new_ent in zip(original_entities, to_do_entities):
        orig_numbers = re.findall(r"\d+", orig_ent)
        new_numbers = re.findall(r"\d+", new_ent)
        if orig_numbers and len(orig_numbers) == len(new_numbers):
            # Same count of numbers on both sides: swap number by number.
            entity_mapping.update(zip(orig_numbers, new_numbers))
        else:
            entity_mapping[orig_ent] = new_ent

    return _substitute_entities(query, entity_mapping)
def capitalize_sentences(sentences):
    """
    Return the sentences stripped of surrounding whitespace, each starting
    with an uppercase letter. Sentences that are empty after stripping are
    kept as empty strings.
    """
    def _upper_first(raw):
        trimmed = raw.strip()
        # Slicing keeps this safe for the empty string.
        return trimmed[:1].upper() + trimmed[1:]

    return [_upper_first(item) for item in sentences]
def similarity_question(question, questions_queries_dictionary, collection, n_results=5, threshold=0.15):
    """
    Retrieves the stored questions most similar to *question*, comparing
    entity-masked texts and skipping duplicate embeddings.

    Parameters:
    - question (str): The user's question.
    - questions_queries_dictionary (list[dict]): Items with "question" and "query" keys.
    - collection: Vector collection exposing ``add`` and ``query``.
    - n_results (int): Number of neighbours requested from the collection.
    - threshold (float): Distance below which a match counts as a paraphrase.

    Returns:
    - list[tuple]: ``(original_similar_question, similar_query, to_do_query)``
      per hit. ``to_do_query`` is the similar query with entities swapped for
      those of *question* when the hit is a paraphrase and both masked texts
      contain ``[ENTITY]``; otherwise ``None``.
    """
    original_documents = [pair["question"] for pair in questions_queries_dictionary]
    # Reuse the module-level spaCy pipeline instead of reloading the model on every call.
    masked_documents = [mask_entities(document, nlp) for document in original_documents]

    # Store each masked document once; identical embeddings are skipped.
    unique_embeddings = []
    for i, masked_document in enumerate(masked_documents):
        embedding = get_embeddings(masked_document)[0]
        is_duplicate = any(np.allclose(embedding, np.array(seen), atol=1e-6) for seen in unique_embeddings)
        if is_duplicate:
            continue
        unique_embeddings.append(embedding)
        collection.add(
            ids=[str(i)],
            embeddings=[embedding],  # list of lists
            documents=[masked_document]
        )

    # Fixed: the query embedding was computed from the last stored document
    # (`d`) rather than from the masked input question, so retrieval did not
    # depend on the question being asked.
    masked_question = mask_entities(question, nlp)
    query_embedding = get_embeddings(masked_question)[0]
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results
    )

    triples = []
    for masked_similar_question, distance in zip(results['documents'][0], results['distances'][0]):
        print(distance)
        paraphrase = distance < threshold
        # Map the masked hit back to its original question and query.
        index_similar_query = masked_documents.index(masked_similar_question)
        original_similar_question = original_documents[index_similar_query]
        similar_query = questions_queries_dictionary[index_similar_query]["query"]
        if paraphrase and "[ENTITY]" in masked_similar_question and "[ENTITY]" in masked_question:
            to_do_query = replace_entity(original_similar_question, question, similar_query)
        else:
            to_do_query = None
        triples.append((original_similar_question, similar_query, to_do_query))
    return triples
def similarity_question_no_masking(question, questions_queries_dictionary, collection, n_results=5, threshold=0.15):
    """
    Retrieves the stored questions closest to *question* without entity
    masking; documents whose embedding duplicates an earlier one are not stored.

    Returns a list of ``(original_question, query, to_do_query)`` tuples where
    ``to_do_query`` is the query itself when the distance is below
    *threshold*, otherwise ``None``.
    """
    known_questions = [pair["question"] for pair in questions_queries_dictionary]

    # Index every question whose embedding has not been seen yet.
    stored_vectors = {}
    for position, text in enumerate(known_questions):
        vector = get_embeddings(text)[0]
        already_stored = False
        for previous in stored_vectors.values():
            if np.allclose(vector, np.array(previous), atol=1e-6):
                already_stored = True
                break
        if already_stored:
            continue
        stored_vectors[str(position)] = vector
        collection.add(ids=[str(position)], embeddings=[vector], documents=[text])

    # Look up the nearest stored questions for the raw input question.
    question_vector = get_embeddings(question)[0]
    hits = collection.query(query_embeddings=[question_vector], n_results=n_results)

    triples = []
    for matched_text, distance in zip(hits['documents'][0], hits['distances'][0]):
        print(distance)
        source_index = known_questions.index(matched_text)
        matched_query = questions_queries_dictionary[source_index]["query"]
        if distance < threshold:
            to_do_query = matched_query
        else:
            to_do_query = None
        triples.append((known_questions[source_index], matched_query, to_do_query))
    return triples
def select_dict(dict, keys):
    """Return a new mapping with the entries of *dict* whose key is in *keys*.

    Keys missing from *dict* are skipped; the result follows the order of *keys*.
    """
    picked = {}
    for wanted in keys:
        if wanted in dict:
            picked[wanted] = dict[wanted]
    return picked
def prompt_template(to_do_question,triples_examples,wikibase_properties_id,how_many_examples = 1, ):
    """Build the plain-text few-shot prompt for translating a question to SPARQL.

    The prompt lists selected ontology properties and classes, then up to
    *how_many_examples* (question, query) examples taken from the first two
    fields of *triples_examples*, and ends with *to_do_question*. The example
    questions are also printed.
    """
    questions = [example[0] for example in triples_examples][:how_many_examples]
    print("EXAMPLE QUESTION(s): ",questions)

    # Ontology excerpts shown to the model.
    classes_wikibase_selection = select_dict(classes_wikibase, ["fashion house", "fashion designer"])
    general_properties = select_dict(wikibase_properties_id, ["instance of", "reference URL", "start time", "end time", "occupation title", "point in time", "official website"])
    general_properties["rdfs:label"] = "rdfs:label"
    designer_properties = select_dict(wikibase_properties_id, ["employer", "educated at", "work location", "award received", "date of birth", "date of death", "place of birth", "country of citizenship", "occupation", "sex or gender"])
    fashion_house_properties = select_dict(wikibase_properties_id, ["inception","headquarters location", "parent organization", "founded by","owned by", "industry", "country", "total revenue", "designer employed", "fashion collection", "description of fashion collection","image of fashion collection"])
    fashion_collection_properties = select_dict(wikibase_properties_id, ["fashion show category", "fashion show location", "fashion season"])
    qualifier_properties = select_dict(wikibase_properties_id, ["start time", "end time", "occupation title", "point in time","description of fashion collection","image of fashion collection"])

    sections = [f"""You are an expert in translating natural language questions into SPARQL queries for FashionDB - a knwoledge graph about Fashion.
I provide you with the ontology of FashionDB. The properties are stored in a dictionary as property_label: property_id. The classes are stored in a dictionary as class_label: class_id.
General Properties: {general_properties}, Fashion Designer Properties: {designer_properties}, Fashion House Properties: {fashion_house_properties}, Fashion Collection Properties: {fashion_collection_properties}.
In particular the following properties are always qualifiers thus their prefix is always pq: {qualifier_properties}.
Classes: {classes_wikibase_selection}.
Remember to use the entities presented in Natural language question to translate , when generating the corresponding SPARQL query.
I provide you with example."""]

    # One section per selected example, in retrieval order.
    for position in range(len(questions)):
        example_question = triples_examples[position][0]
        example_query = triples_examples[position][1]
        sections.append(f""" Example question: {example_question}
Corresponding SPARQL query:{example_query} """)

    sections.append(f""" Question to translate to SPARQL: {to_do_question}
Remember that the use case is FASHION: if there is a mispelling of a fashion designer or house, you can adjust it according to your knowledge of fashion. Example: "balenciaho" should be "Balenciaga".
Your generated corresponding SPARQL query: """)
    return "".join(sections)
def prompt_template_gemma2(to_do_question, triples_examples, wikibase_properties_id, how_many_examples=1):
    """Build the Markdown-formatted few-shot prompt used with the "gemma2" model.

    Parameters:
    - to_do_question: Question to translate; inserted under "Your Task".
    - triples_examples: Sequence whose items hold an example question at
      index 0 and its SPARQL query at index 1.
    - wikibase_properties_id: Mapping of property label to property id, from
      which the ontology excerpts are selected.
    - how_many_examples (int): Maximum number of examples to include.

    Returns:
    - str: The prompt, ending with an opened ```sparql fence.

    Side effect: prints the selected example questions.
    """
    # NOTE(review): the whitespace inside the string literals below is
    # reproduced as visible in the source listing; confirm against the repo.
    questions = [triples_examples[i][0] for i in range(len(triples_examples))][:how_many_examples]
    print("EXAMPLE QUESTION(s): ",questions)
    # Ontology excerpts; select_dict silently drops labels that are missing.
    classes_wikibase_selection = select_dict(classes_wikibase, ["fashion house", "fashion designer"])
    general_properties = select_dict(wikibase_properties_id, ["instance of", "reference URL", "start time", "end time", "occupation title", "point in time", "official website"])
    general_properties["rdfs:label"] = "rdfs:label"
    designer_properties = select_dict(wikibase_properties_id, ["employer", "educated at", "work location", "award received", "date of birth", "date of death", "place of birth", "country of citizenship", "occupation", "sex or gender"])
    fashion_house_properties = select_dict(wikibase_properties_id, ["inception", "headquarters location", "parent organization", "founded by", "owned by", "industry", "country", "total revenue", "designer employed", "fashion collection", "description of fashion collection", "image of fashion collection"])
    fashion_collection_properties = select_dict(wikibase_properties_id, ["fashion show category", "fashion show location", "fashion season"])
    qualifier_properties = select_dict(wikibase_properties_id, ["start time", "end time", "occupation title", "point in time", "description of fashion collection", "image of fashion collection"])
    prompt = f"""
You are an expert in translating natural language fashion-related questions into **SPARQL queries** for **FashionDB**, a knowledge graph about fashion.
---
## **FashionDB Ontology**
- **Classes**: {classes_wikibase_selection}
- **General Properties**: {general_properties}
- **Fashion Designer Properties**: {designer_properties}
- **Fashion House Properties**: {fashion_house_properties}
- **Fashion Collection Properties**: {fashion_collection_properties}
- **Qualifier Properties** (always prefixed with `pq:`): {qualifier_properties}
---
## **Instructions**
- **Fix misspellings** of fashion brands and designers before generating the query.
- Example: "Guxci" → **"Gucci"**, "Balenciaho" → **"Balenciaga"**.
- If a brand or designer **isn't recognized**, **make a reasonable correction** based on common fashion knowledge.
- Handle **abstract or conceptual fashion questions**, such as:
- "Which fashion houses have had the most influence in the 20th century?"
- "What are the key design trends in haute couture from the 1990s?"
- **Always return a valid SPARQL query** using the provided ontology.
---
## **Example(s)**
"""
    # Append one numbered example per selected question.
    for i in range(len(questions)):
        prompt += f"""
**Example {i+1}**
- **Question**: {triples_examples[i][0]}
- **SPARQL Query**:
```sparql
{triples_examples[i][1]}
```
"""
    # The fence is left open so the model continues with the query itself.
    prompt += f"""
---
## **Your Task**
**Question**: {to_do_question}
**SPARQL Query:**
```sparql
"""
    return prompt
def prompt_template_gpt4o_mini(to_do_question, triples_examples, wikibase_properties_id, how_many_examples=1):
    """Build the Markdown-formatted few-shot prompt variant named for GPT-4o mini.

    Parameters:
    - to_do_question: Question to translate; inserted in the final section.
    - triples_examples: Sequence whose items hold an example question at
      index 0 and its SPARQL query at index 1.
    - wikibase_properties_id: Mapping of property label to property id, from
      which the ontology excerpts are selected.
    - how_many_examples (int): Maximum number of examples to include.

    Returns:
    - str: The prompt, ending with an opened ```sparql fence.

    Unlike the other prompt builders in this file, this one prints nothing.
    """
    # NOTE(review): the whitespace inside the string literals below is
    # reproduced as visible in the source listing; confirm against the repo.
    questions = [triples_examples[i][0] for i in range(len(triples_examples))][:how_many_examples]
    # Ontology excerpts; select_dict silently drops labels that are missing.
    classes_wikibase_selection = select_dict(classes_wikibase, ["fashion house", "fashion designer"])
    general_properties = select_dict(wikibase_properties_id, ["instance of", "reference URL", "start time", "end time", "occupation title", "point in time", "official website"])
    general_properties["rdfs:label"] = "rdfs:label"
    designer_properties = select_dict(wikibase_properties_id, ["employer", "educated at", "work location", "award received", "date of birth", "date of death", "place of birth", "country of citizenship", "occupation", "sex or gender"])
    fashion_house_properties = select_dict(wikibase_properties_id, ["inception", "headquarters location", "parent organization", "founded by", "owned by", "industry", "country", "total revenue", "designer employed", "fashion collection", "description of fashion collection", "image of fashion collection"])
    fashion_collection_properties = select_dict(wikibase_properties_id, ["fashion show category", "fashion show location", "fashion season"])
    qualifier_properties = select_dict(wikibase_properties_id, ["start time", "end time", "occupation title", "point in time", "description of fashion collection", "image of fashion collection"])
    prompt = f"""
You are a **SPARQL expert** specializing in **FashionDB**, a knowledge graph about fashion.
### **Your Task**
- Translate the given **natural language question** into a **valid SPARQL query**.
- **Fix spelling mistakes** of fashion brands and designers.
- Example: "Guxci" → "Gucci", "Balenciaho" → "Balenciaga".
- If a brand or designer isn't recognized, **guess the correct name** based on fashion industry knowledge.
- Support **abstract fashion questions**, such as:
- "How did Dior's designs evolve over the decades?"
- "Which fashion houses had the biggest impact on 21st-century streetwear?"
- Your **SPARQL query must use the correct ontology**.
---
### **FashionDB Ontology**
- **Classes**: {classes_wikibase_selection}
- **General Properties**: {general_properties}
- **Fashion Designer Properties**: {designer_properties}
- **Fashion House Properties**: {fashion_house_properties}
- **Fashion Collection Properties**: {fashion_collection_properties}
- **Qualifier Properties (always prefixed with `pq:`)**: {qualifier_properties}
---
### **Example(s)**
"""
    # Append one numbered example per selected question.
    for i in range(len(questions)):
        prompt += f"""
**Example {i+1}**
- **Question**: {triples_examples[i][0]}
- **SPARQL Query**:
```sparql
{triples_examples[i][1]}
```
"""
    # The fence is left open so the model continues with the query itself.
    prompt += f"""
---
### **Now Translate This Question**
**Question**: {to_do_question}
**SPARQL Query:**
```sparql
"""
    return prompt
| #validate | |
def replace_last_occurrence(s, pattern, replacement):
    """Return *s* with the last occurrence of *pattern* replaced by *replacement*.

    When *pattern* does not occur, *s* is returned unchanged (the previous
    version fell through and returned ``None``).
    """
    pos = s.rfind(pattern)  # index of the last occurrence, or -1
    if pos == -1:
        return s
    return s[:pos] + replacement + s[pos + len(pattern):]
def validation_query(sparql_query):
    """Normalise a generated SPARQL query before it is executed.

    Steps, in order:
    1. Drop a leading ``sparql`` tag.
    2. Strip trailing spaces/newlines, then one trailing ``.``.
    3. Decode backslash escape sequences (e.g. a literal ``\\n``).
    4. Rewrite ``wdt`` to ``wbt``.
    5. If the query has no ``SERVICE`` clause, insert the label service
       before the last ``}``.

    Parameters:
    - sparql_query (str): Raw query text.

    Returns:
    - str: The cleaned query; an empty string when nothing is left.
    """
    if sparql_query.startswith("sparql"):
        sparql_query = sparql_query[6:]
    # rstrip cannot fail on empty or whitespace-only input, unlike indexing [-1].
    sparql_query = sparql_query.rstrip("\n ")
    if sparql_query.endswith("."):
        sparql_query = sparql_query[:-1]
    if not sparql_query:
        return sparql_query
    try:
        # latin-1 + backslashreplace keeps non-ASCII characters intact
        # ("Hermès" used to come back as mojibake) while still decoding
        # backslash escapes.
        sparql_query = sparql_query.encode("latin-1", "backslashreplace").decode("unicode_escape")
    except UnicodeDecodeError:
        # Malformed escape (e.g. a trailing backslash): keep the text as is.
        pass
    sparql_query = sparql_query.replace("wdt", "wbt")
    if "SERVICE" not in sparql_query:
        patched = replace_last_occurrence(sparql_query, "}", "SERVICE wikibase:label { bd:serviceParam wikibase:language 'en'. } \n }")
        # Guard against a None result when the query contains no "}".
        if patched is not None:
            sparql_query = patched
    return sparql_query
def safe_get_results(query, max_retries=3, retry_delay=2):
    """
    Safely executes a SPARQL query, retrying on errors.
    Parameters:
    - query (str): The SPARQL query to execute.
    - max_retries (int): Number of attempts before giving up.
    - retry_delay (float): Seconds to wait between attempts.
    Returns:
    - DataFrame: Query results, or an empty DataFrame if every attempt fails.
    """
    # NOTE(review): `get_results_to_df`, `requests` and `pd` are not imported
    # explicitly in this file; presumably they arrive through the star import
    # from src.questions_queries -- confirm.
    for attempt in range(max_retries):
        try:
            return get_results_to_df(query)  # Attempt to execute the query
        except requests.exceptions.HTTPError as e:
            print(f"Attempt {attempt + 1}: Query failed with HTTPError {e}")
        except Exception as e:
            print(f"Attempt {attempt + 1}: Unexpected error {e}")
        # Wait only when another attempt follows; sleeping after the final
        # failure just delayed the fallback.
        if attempt + 1 < max_retries:
            time.sleep(retry_delay)
    print("All attempts failed. Returning empty DataFrame.")
    return pd.DataFrame()  # Return empty DataFrame if all retries fail
def correction_question_prompt(to_do_question):
    """Return the prompt asking a model to fix misspelled fashion names in *to_do_question*."""
    # The template has a single placeholder, filled in with the question.
    template = """
You are an expert in **fashion brand and designer names**.
Your task is to **correct misspellings** in the given question while keeping its original meaning.
If you recognize a fashion-related name that is misspelled, **fix it**.
If nothing is wrong, generate the Question to Correct.
Don't generate **.
### **Examples**
- "Who founded Guxci?" → "Who founded Gucci?"
- "What is balenciaho famous for?" → "What is Balenciaga famous for?"
- "Who is the head designer of gucxi?" → "Who is the head designer of Gucci?"
### **Question to Correct**
{question}
### **Corrected Version**
"""
    return template.format(question=to_do_question)
def initialize_collection():
    """Return a fresh, empty ChromaDB collection named "docs".

    Any existing "docs" collection on the client is deleted first, so every
    call starts from an empty index.
    """
    client = chromadb.Client()
    try:
        client.delete_collection(name="docs")
    except Exception:
        # Best effort: on the first call there is nothing to delete. The
        # exception type raised for a missing collection is not pinned here,
        # so `Exception` is caught -- but no longer KeyboardInterrupt or
        # SystemExit, which the bare `except:` also swallowed.
        pass
    return client.create_collection(name="docs")
def main_generate_queries(to_do_question):
    """Answer *to_do_question*, falling back through several strategies.

    Order of attempts; each runs only if the previous one produced an empty
    result:
    1. Paraphrase shortcut: if the closest stored example yields an adapted
       query, run it; on an empty result, spell-correct the question with
       the model and adapt the example query again.
    2. Generate a query with the gemma2 prompt and one example.
    3. Spell-correct the question and generate again with two examples.
    4. Retrieve examples without entity masking and generate again.
    5. Ask the model to answer the question directly as text.

    Returns:
    - tuple: ``(records, sparql_query)`` where ``records`` is the result as
      a list of dicts, or ``(text_generated, sparql_query)`` when only the
      direct-answer fallback produced something.
    """
    # NOTE(review): indentation was reconstructed from the data flow of this
    # function; confirm the nesting against the repository version.
    # A fresh collection is built for every question.
    collection = initialize_collection()
    triples_examples = similarity_question(to_do_question, questions_queries_all, collection)
    # NOTE(review): triples_examples[0] assumes at least one retrieved
    # example -- confirm the collection can never come back empty.
    if triples_examples[0][2] is not None:
        print("it's a paraphrase :)")
        sparql_query = triples_examples[0][2]
        print(triples_examples[0][0])
        result_query = safe_get_results(sparql_query)
        if result_query.empty:
            # Retry the entity swap with a spell-corrected question.
            to_do_question = main_generate(correction_question_prompt(to_do_question), "gemma2", "You have to fix the mispellings of the Question to Correct")
            print(to_do_question)
            sparql_query = replace_entity(triples_examples[0][0], to_do_question, triples_examples[0][1])
            result_query = safe_get_results(sparql_query)
        print(sparql_query)
        if not result_query.empty:
            return result_query.to_dict(orient='records'), sparql_query
    # Generation with a single few-shot example.
    prompt = prompt_template_gemma2(to_do_question, triples_examples, wikibase_properties_id, how_many_examples=1)
    sparql_query = main_generate(prompt, "gemma2", "You are a natural language to SPARQL language translator. Do only generate the SPARQL query, nothing else.")
    sparql_query = validation_query(sparql_query)
    result_query = safe_get_results(sparql_query)
    print(sparql_query)
    if result_query.empty:
        # Spell-correct the question and retry with two examples.
        to_do_question = main_generate(correction_question_prompt(to_do_question), "gemma2", "You have to fix the mispellings of the Question to Correct")
        print(to_do_question)
        prompt = prompt_template_gemma2(to_do_question, triples_examples, wikibase_properties_id, how_many_examples=2)
        sparql_query = main_generate(prompt, "gemma2", "You are a natural language to SPARQL language translator. Do only generate the SPARQL query, nothing else.")
        sparql_query = validation_query(sparql_query)
        result_query = safe_get_results(sparql_query)
        if result_query.empty:
            # Retrieve examples again, this time without entity masking.
            new_collection = initialize_collection()
            triples_examples_no_masked = similarity_question_no_masking(to_do_question, questions_queries_all, new_collection)
            prompt = prompt_template_gemma2(to_do_question, triples_examples_no_masked, wikibase_properties_id, how_many_examples=2)
            sparql_query = main_generate(prompt, "gemma2", "You are a natural language to SPARQL language translator. Do only generate the SPARQL query, nothing else.")
            sparql_query = validation_query(sparql_query)
            result_query = safe_get_results(sparql_query)
            print(sparql_query)
            if result_query.empty:
                # Last resort: a free-text answer instead of query results.
                text_generated = main_generate(to_do_question, "gemma2", "You are an expert in fashion. Just provide the answer to the question.")
                return text_generated, sparql_query
    print(sparql_query)
    print(result_query)
    return result_query.to_dict(orient='records'), sparql_query
| # #main("What is the inception of Chanel?") | |
| # if __name__ == "__main__": | |
| # #main("Which fashion designers being creative directors were born in Italy?") | |
| # #main_generate_queries("Which fashion houses had collections with jeans in their descriptions and how many of the collections have jeans?") | |
| # main_generate_queries("Which designers were born in 1970?") | |