from typing import List

import ast
import json
import sqlite3

import pandas as pd
import pluck
import psycopg2
import pymongoarrow.monkey
from haystack import component
from pandasql import sqldf
from pymongo import MongoClient

from utils import TEMP_DIR

# Render full DataFrames when they are converted to strings for replies.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)


@component
class SQLiteQuery:

    def __init__(self, sql_database: str):
        self.connection = sqlite3.connect(sql_database, check_same_thread=False)

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, queries: List[str], session_hash):
        print("ATTEMPTING TO RUN SQLITE QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []
        for query in queries:
            result = pd.read_sql(query, self.connection)
            result.to_csv(f'{dir_path}/file_upload/query.csv', index=False)
            column_names = list(result.columns)
            results.append(f"{result}")
        self.connection.close()
        return {"results": results, "queries": queries, "csv_columns": column_names}


@component
class PostgreSQLQuery:

    def __init__(self, url: str, sql_port: int, sql_user: str, sql_pass: str, sql_db_name: str):
        self.connection = psycopg2.connect(
            database=sql_db_name,
            user=sql_user,
            password=sql_pass,
            host=url,       # e.g., "localhost" or an IP address
            port=sql_port   # default is 5432
        )

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, queries: List[str], session_hash):
        print("ATTEMPTING TO RUN POSTGRESQL QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []
        for query in queries:
            print(query)
            result = pd.read_sql_query(query, self.connection)
            result.to_csv(f'{dir_path}/sql/query.csv', index=False)
            column_names = list(result.columns)
            results.append(f"{result}")
        self.connection.close()
        return {"results": results, "queries": queries, "csv_columns": column_names}


@component
class DocDBQuery:

    def __init__(self, connection_string: str, doc_db_name: str):
        client = MongoClient(connection_string)
        self.client = client
        self.connection = client[doc_db_name]

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, aggregation_pipeline: str, db_collection, session_hash):
        # Patch pymongo so collections gain aggregate_pandas_all().
        pymongoarrow.monkey.patch_all()
        print("ATTEMPTING TO RUN MONGODB QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []

        print(aggregation_pipeline)
        # The pipeline arrives as JSON-like text; strip whitespace and convert
        # JSON booleans to Python literals so ast.literal_eval can parse it.
        aggregation_pipeline = aggregation_pipeline.replace(" ", "")
        false_replace = [':false', ': false']
        false_value = ':False'
        true_replace = [':true', ': true']
        true_value = ':True'
        for replace in false_replace:
            aggregation_pipeline = aggregation_pipeline.replace(replace, false_value)
        for replace in true_replace:
            aggregation_pipeline = aggregation_pipeline.replace(replace, true_value)

        query_list = ast.literal_eval(aggregation_pipeline)
        print("QUERY List")
        print(query_list)
        print(db_collection)

        db = self.connection
        collection = db[db_collection]
        print(collection)

        docs = collection.aggregate_pandas_all(query_list)
        print("DATA FRAME COMPLETE")
        docs.to_csv(f'{dir_path}/doc_db/query.csv', index=False)
        column_names = list(docs.columns)
        print("CSV COMPLETE")
        results.append(f"{docs}")

        self.client.close()
        return {"results": results, "queries": aggregation_pipeline, "csv_columns": column_names}


@component
class GraphQLQuery:

    def __init__(self):
        # pluck executes GraphQL queries and returns results as DataFrames.
        self.connection = pluck

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, graphql_query, graphql_api_string, graphql_api_token, graphql_token_header, session_hash):
        print("ATTEMPTING TO RUN GRAPHQL QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []

        headers = {"Content-Type": "application/json"}
        if graphql_token_header and graphql_api_token:
            headers[graphql_token_header] = graphql_api_token
        print(graphql_query)

        response = self.connection.execute(url=graphql_api_string, headers=headers,
                                           query=graphql_query, column_names="short")
        if response.errors:
            raise ValueError(response.errors)
        elif response.data:
            print("DATA FRAME COMPLETE")
            print(response)
            response_frame = response.frames['default']
            print("RESPONSE FRAME")
            #print(response_frame)
            response_frame.to_csv(f'{dir_path}/graphql/query.csv', index=False)
            column_names = list(response_frame.columns)
            print("CSV COMPLETE")
            results.append(f"{response_frame}")

        return {"results": results, "queries": graphql_query, "csv_columns": column_names}


def query_func(queries: List[str], session_hash, session_folder, args, **kwargs):
    try:
        print("QUERY")
        print(queries)

        if session_folder == "file_upload":
            dir_path = TEMP_DIR / str(session_hash)
            sql_query = SQLiteQuery(f'{dir_path}/file_upload/data_source.db')
            result = sql_query.run(queries, session_hash)
        elif session_folder == "sql":
            sql_query = PostgreSQLQuery(args[0], args[1], args[2], args[3], args[4])
            result = sql_query.run(queries, session_hash)
        elif session_folder == 'doc_db':
            doc_db_query = DocDBQuery(args[0], args[1])
            result = doc_db_query.run(queries, kwargs['db_collection'], session_hash)
        elif session_folder == 'graphql':
            graphql_object = GraphQLQuery()
            result = graphql_object.run(queries, args[0], args[1], args[2], session_hash)

        print("RESULT")
        print(result["csv_columns"])

        if len(result["results"][0]) > 1000:
            print("QUERY TOO LARGE")
            return {"reply": f"""The query result is too large to be processed by the LLM;
                  the query results are in our query.csv file.
                  The column names of this query.csv file are: {result["csv_columns"]}.
                  If you need to display the results directly, perhaps use the table_generation_func function."""}
        else:
            return {"reply": result["results"][0]}

    except Exception as e:
        reply = f"""There was an error running the {session_folder} query: {queries}.
                The error is {e}.
                You should probably try again.
                """
        print(reply)
        return {"reply": reply}


def graphql_schema_query(graphql_type: str, session_hash, **kwargs):
    dir_path = TEMP_DIR / str(session_hash)
    try:
        with open(f'{dir_path}/graphql/schema.json', 'r') as file:
            data = json.load(file)

        types_list = data["types"]
        result = list(filter(lambda item: item["name"] == graphql_type, types_list))
        print("SCHEMA RESULT")
        print(graphql_type)
        print(str(result))
        return {"reply": str(result)}

    except Exception as e:
        reply = f"""There was an error querying our schema.json file with the type: {graphql_type}.
                The error is {e}.
                You should probably try again.
                """
        print(reply)
        return {"reply": reply}


def graphql_csv_query(csv_query: str, session_hash, **kwargs):
    dir_path = TEMP_DIR / str(session_hash)
    try:
        # The SQL in csv_query is expected to reference a table named "query",
        # which pandasql resolves to this local DataFrame.
        query = pd.read_csv(f'{dir_path}/graphql/query.csv')
        query.Name = 'query'

        print("GRAPHQL CSV QUERY")
        print(csv_query)

        queried_df = sqldf(csv_query, locals())
        print(queried_df)
        column_names = list(queried_df.columns)
        queried_df.to_csv(f'{dir_path}/graphql/query.csv', index=False)

        if len(queried_df) > 1000:
            print("CSV QUERY TOO LARGE")
            return {"reply": f"""The new query results are in our query.csv file.
                  The column names of this query.csv file are: {column_names}.
                  If you need to display the results directly, perhaps use the table_generation_func function."""}
        else:
            return {"reply": str(queried_df)}

    except Exception as e:
        reply = f"""There was an error querying our query.csv file with the query: {csv_query}.
                The error is {e}.
                You should probably try again.
                """
        print(reply)
        return {"reply": reply}
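

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module): how query_func might
# be invoked for a SQLite-backed "file_upload" session. The session hash and
# the table name in the SQL below are hypothetical placeholders; it is assumed
# that TEMP_DIR/<session_hash>/file_upload/ already contains data_source.db,
# which is what SQLiteQuery expects.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    example_session_hash = "demo-session"  # hypothetical session identifier

    reply = query_func(
        ["SELECT COUNT(*) AS row_count FROM data_source"],  # hypothetical table name
        example_session_hash,
        "file_upload",
        args=(),
    )
    print(reply["reply"])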