# virtual-data-analyst/functions/query_functions.py
from typing import AnyStr, List

import ast
import json
import sqlite3

import pandas as pd
import pluck
import psycopg2
import pymongoarrow.monkey
from haystack import component
from pandasql import sqldf
from pymongo import MongoClient

from utils import TEMP_DIR

# Print full DataFrames so query results are not truncated when rendered as strings.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)


@component
class SQLiteQuery:
    """Runs SQL queries against a local SQLite database built from an uploaded file."""

    def __init__(self, sql_database: str):
        self.connection = sqlite3.connect(sql_database, check_same_thread=False)

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, queries: List[str], session_hash):
        print("ATTEMPTING TO RUN SQLITE QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []
        for query in queries:
            result = pd.read_sql(query, self.connection)
            # Persist the full result set so oversized outputs can be read back from disk.
            result.to_csv(f'{dir_path}/file_upload/query.csv', index=False)
            column_names = list(result.columns)
            results.append(f"{result}")
        self.connection.close()
        return {"results": results, "queries": queries, "csv_columns": column_names}


@component
class PostgreSQLQuery:
    """Runs SQL queries against a PostgreSQL database over a psycopg2 connection."""

    def __init__(self, url: str, sql_port: int, sql_user: str, sql_pass: str, sql_db_name: str):
        self.connection = psycopg2.connect(
            database=sql_db_name,
            user=sql_user,
            password=sql_pass,
            host=url,       # e.g., "localhost" or an IP address
            port=sql_port   # default is 5432
        )

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, queries: List[str], session_hash):
        print("ATTEMPTING TO RUN POSTGRESQL QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []
        for query in queries:
            print(query)
            result = pd.read_sql_query(query, self.connection)
            # Persist the full result set so oversized outputs can be read back from disk.
            result.to_csv(f'{dir_path}/sql/query.csv', index=False)
            column_names = list(result.columns)
            results.append(f"{result}")
        self.connection.close()
        return {"results": results, "queries": queries, "csv_columns": column_names}


@component
class DocDBQuery:
    """Runs aggregation pipelines against a MongoDB (document) database."""

    def __init__(self, connection_string: str, doc_db_name: str):
        client = MongoClient(connection_string)
        self.client = client
        self.connection = client[doc_db_name]

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, aggregation_pipeline: str, db_collection, session_hash):
        # Patch pymongo so collections gain aggregate_pandas_all() and related helpers.
        pymongoarrow.monkey.patch_all()
        print("ATTEMPTING TO RUN MONGODB QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        print(aggregation_pipeline)
        # The pipeline arrives as a JSON-style string; normalise its booleans to
        # Python literals so ast.literal_eval can parse it.
        aggregation_pipeline = aggregation_pipeline.replace(" ", "")
        false_replace = [':false', ': false']
        false_value = ':False'
        true_replace = [':true', ': true']
        true_value = ':True'
        for replace in false_replace:
            aggregation_pipeline = aggregation_pipeline.replace(replace, false_value)
        for replace in true_replace:
            aggregation_pipeline = aggregation_pipeline.replace(replace, true_value)
        query_list = ast.literal_eval(aggregation_pipeline)
        print("QUERY List")
        print(query_list)
        print(db_collection)
        db = self.connection
        collection = db[db_collection]
        print(collection)
        docs = collection.aggregate_pandas_all(query_list)
        print("DATA FRAME COMPLETE")
        docs.to_csv(f'{dir_path}/doc_db/query.csv', index=False)
        column_names = list(docs.columns)
        print("CSV COMPLETE")
        results.append(f"{docs}")
        self.client.close()
        return {"results": results, "queries": aggregation_pipeline, "csv_columns": column_names}


@component
class GraphQLQuery:
    """Runs GraphQL queries through the pluck library, which returns pandas DataFrames."""

    def __init__(self):
        self.connection = pluck

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, graphql_query, graphql_api_string, graphql_api_token, graphql_token_header, session_hash):
        print("ATTEMPTING TO RUN GRAPHQL QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []
        headers = {"Content-Type": "application/json"}
        if graphql_token_header and graphql_api_token:
            headers[graphql_token_header] = graphql_api_token
        print(graphql_query)
        response = self.connection.execute(url=graphql_api_string, headers=headers, query=graphql_query, column_names="short")
        if response.errors:
            raise ValueError(response.errors)
        elif response.data:
            print("DATA FRAME COMPLETE")
            print(response)
            response_frame = response.frames['default']
            print("RESPONSE FRAME")
            #print(response_frame)
            response_frame.to_csv(f'{dir_path}/graphql/query.csv', index=False)
            column_names = list(response_frame.columns)
            print("CSV COMPLETE")
            results.append(f"{response_frame}")
        return {"results": results, "queries": graphql_query, "csv_columns": column_names}


def query_func(queries: List[str], session_hash, session_folder, args, **kwargs):
    try:
        print("QUERY")
        print(queries)
        # Dispatch to the right backend for this session's data source.
        if session_folder == "file_upload":
            dir_path = TEMP_DIR / str(session_hash)
            sql_query = SQLiteQuery(f'{dir_path}/file_upload/data_source.db')
            result = sql_query.run(queries, session_hash)
        elif session_folder == "sql":
            sql_query = PostgreSQLQuery(args[0], args[1], args[2], args[3], args[4])
            result = sql_query.run(queries, session_hash)
        elif session_folder == 'doc_db':
            doc_db_query = DocDBQuery(args[0], args[1])
            result = doc_db_query.run(queries, kwargs['db_collection'], session_hash)
        elif session_folder == 'graphql':
            graphql_object = GraphQLQuery()
            result = graphql_object.run(queries, args[0], args[1], args[2], session_hash)
        print("RESULT")
        if len(result["results"][0]) > 1000:
            print("QUERY TOO LARGE")
            return {"reply": f"""The query result is too large to be processed by the LLM; the results are in our query.csv file.
            The column names of this query.csv file are: {result["csv_columns"]}.
            If you need to display the results directly, perhaps use the table_generation_func function."""}
        else:
            return {"reply": result["results"][0]}
    except Exception as e:
        reply = f"""There was an error running the {session_folder} query = {queries}
        The error is {e}.
        You should probably try again.
        """
        print(reply)
        return {"reply": reply}


def graphql_schema_query(graphql_type: AnyStr, session_hash, **kwargs):
    dir_path = TEMP_DIR / str(session_hash)
    try:
        # Look up a single type definition in the saved GraphQL schema introspection.
        with open(f'{dir_path}/graphql/schema.json', 'r') as file:
            data = json.load(file)
        types_list = data["types"]
        result = list(filter(lambda item: item["name"] == graphql_type, types_list))
        print("SCHEMA RESULT")
        print(graphql_type)
        print(str(result))
        return {"reply": str(result)}
    except Exception as e:
        reply = f"""There was an error querying our schema.json file with the type: {graphql_type}
        The error is {e}.
        You should probably try again.
        """
        print(reply)
        return {"reply": reply}


def graphql_csv_query(csv_query: AnyStr, session_hash, **kwargs):
    dir_path = TEMP_DIR / str(session_hash)
    try:
        # Load the last GraphQL result and query it with SQL via pandasql;
        # the DataFrame is exposed to sqldf under the local name `query`.
        query = pd.read_csv(f'{dir_path}/graphql/query.csv')
        query.Name = 'query'
        print("GRAPHQL CSV QUERY")
        print(csv_query)
        queried_df = sqldf(csv_query, locals())
        print(queried_df)
        column_names = list(queried_df.columns)
        queried_df.to_csv(f'{dir_path}/graphql/query.csv', index=False)
        if len(queried_df) > 1000:
            print("CSV QUERY TOO LARGE")
            return {"reply": f"""The new query results are in our query.csv file.
            The column names of this query.csv file are: {column_names}.
            If you need to display the results directly, perhaps use the table_generation_func function."""}
        else:
            return {"reply": str(queried_df)}
    except Exception as e:
        reply = f"""There was an error querying our query.csv file with the query: {csv_query}
        The error is {e}.
        You should probably try again.
        """
        print(reply)
        return {"reply": reply}