from typing import AnyStr, List

from haystack import component
import pandas as pd
from pandasql import sqldf

# Render DataFrames in full (no row/column truncation) when they are
# converted to strings for the LLM reply.
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

import sqlite3
import psycopg2
from pymongo import MongoClient
import pymongoarrow.monkey
import json
import pluck
from utils import TEMP_DIR
import ast


@component
class SQLiteQuery:
    """Runs SQL queries against the session's uploaded SQLite database."""

    def __init__(self, sql_database: str):
        self.connection = sqlite3.connect(sql_database, check_same_thread=False)

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, queries: List[str], session_hash):
        print("ATTEMPTING TO RUN SQLITE QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []
        for query in queries:
            result = pd.read_sql(query, self.connection)
            # Persist the full result set so downstream tools can reuse it.
            result.to_csv(f'{dir_path}/file_upload/query.csv', index=False)
            column_names = list(result.columns)
            results.append(f"{result}")
        self.connection.close()
        return {"results": results, "queries": queries, "csv_columns": column_names}


@component
class PostgreSQLQuery:
    """Runs SQL queries against a user-supplied PostgreSQL database."""

    def __init__(self, url: str, sql_port: int, sql_user: str, sql_pass: str, sql_db_name: str):
        self.connection = psycopg2.connect(
            database=sql_db_name,
            user=sql_user,
            password=sql_pass,
            host=url,
            port=sql_port
        )

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, queries: List[str], session_hash):
        print("ATTEMPTING TO RUN POSTGRESQL QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []
        for query in queries:
            print(query)
            result = pd.read_sql_query(query, self.connection)
            # Persist the full result set so downstream tools can reuse it.
            result.to_csv(f'{dir_path}/sql/query.csv', index=False)
            column_names = list(result.columns)
            results.append(f"{result}")
        self.connection.close()
        return {"results": results, "queries": queries, "csv_columns": column_names}


@component
class DocDBQuery:
    """Runs MongoDB aggregation pipelines and collects the results as a DataFrame."""

    def __init__(self, connection_string: str, doc_db_name: str):
        client = MongoClient(connection_string)
        self.client = client
        self.connection = client[doc_db_name]

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, aggregation_pipeline: str, db_collection, session_hash):
        # Adds DataFrame helpers such as aggregate_pandas_all to pymongo collections.
        pymongoarrow.monkey.patch_all()
        print("ATTEMPTING TO RUN MONGODB QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        print(aggregation_pipeline)

        # The pipeline arrives as a JSON-like string; strip spaces and rewrite
        # its booleans as Python literals so ast.literal_eval can parse it
        # into a list of pipeline stages.
        aggregation_pipeline = aggregation_pipeline.replace(" ", "")

        false_replace = [':false', ': false']
        false_value = ':False'
        true_replace = [':true', ': true']
        true_value = ':True'

        for replace in false_replace:
            aggregation_pipeline = aggregation_pipeline.replace(replace, false_value)
        for replace in true_replace:
            aggregation_pipeline = aggregation_pipeline.replace(replace, true_value)

        query_list = ast.literal_eval(aggregation_pipeline)

        print("QUERY List")
        print(query_list)
        print(db_collection)

        db = self.connection
        collection = db[db_collection]

        print(collection)
        docs = collection.aggregate_pandas_all(query_list)
        print("DATA FRAME COMPLETE")
        docs.to_csv(f'{dir_path}/doc_db/query.csv', index=False)
        column_names = list(docs.columns)
        print("CSV COMPLETE")
        results.append(f"{docs}")
        self.client.close()
        return {"results": results, "queries": aggregation_pipeline, "csv_columns": column_names}

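# Illustrative sketch of the normalisation performed in DocDBQuery.run above
# (the pipeline text is a hypothetical example, not from this project):
#   '[{"$match": {"in_stock": true}}, {"$limit": 5}]'
# is stripped of spaces and rewritten to
#   '[{"$match":{"in_stock":True}},{"$limit":5}]'
# which ast.literal_eval evaluates into the list of stage dicts passed to
# aggregate_pandas_all.
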
@component
class GraphQLQuery:
    """Runs GraphQL queries through pluck and collects the response as a DataFrame."""

    def __init__(self):
        self.connection = pluck

    @component.output_types(results=List[str], queries=List[str], csv_columns=List[str])
    def run(self, graphql_query, graphql_api_string, graphql_api_token, graphql_token_header, session_hash):
        print("ATTEMPTING TO RUN GRAPHQL QUERY")
        dir_path = TEMP_DIR / str(session_hash)
        results = []
        column_names = []

        headers = {"Content-Type": "application/json"}
        # Only attach an auth header when both the header name and token are provided.
        if graphql_token_header and graphql_api_token:
            headers[graphql_token_header] = graphql_api_token

        print(graphql_query)

        response = self.connection.execute(url=graphql_api_string, headers=headers, query=graphql_query, column_names="short")

        if response.errors:
            raise ValueError(response.errors)
        elif response.data:
            print("DATA FRAME COMPLETE")
            print(response)
            response_frame = response.frames['default']
            print("RESPONSE FRAME")

            response_frame.to_csv(f'{dir_path}/graphql/query.csv', index=False)
            column_names = list(response_frame.columns)
            print("CSV COMPLETE")
            results.append(f"{response_frame}")
        return {"results": results, "queries": graphql_query, "csv_columns": column_names}


def query_func(queries: List[str], session_hash, session_folder, args, **kwargs):
    try:
        print("QUERY")
        print(queries)
        if session_folder == "file_upload":
            dir_path = TEMP_DIR / str(session_hash)
            sql_query = SQLiteQuery(f'{dir_path}/file_upload/data_source.db')
            result = sql_query.run(queries, session_hash)
        elif session_folder == "sql":
            # args: (host, port, user, password, database name)
            sql_query = PostgreSQLQuery(args[0], args[1], args[2], args[3], args[4])
            result = sql_query.run(queries, session_hash)
        elif session_folder == 'doc_db':
            # args: (connection string, database name); the collection name arrives via kwargs
            doc_db_query = DocDBQuery(args[0], args[1])
            result = doc_db_query.run(queries, kwargs['db_collection'], session_hash)
        elif session_folder == 'graphql':
            # args: (API URL, API token, token header name)
            graphql_object = GraphQLQuery()
            result = graphql_object.run(queries, args[0], args[1], args[2], session_hash)
        print("RESULT")
        print(result["csv_columns"])
        # If the rendered result is too long for the LLM, point it at the CSV instead.
        if len(result["results"][0]) > 1000:
            print("QUERY TOO LARGE")
            return {"reply": f"""query result too large to be processed by llm, the query results are in our query.csv file.
            The column names of this query.csv file are: {result["csv_columns"]}.
            If you need to display the results directly, perhaps use the table_generation_func function."""}
        else:
            return {"reply": result["results"][0]}

    except Exception as e:
        reply = f"""There was an error running the {session_folder} Query = {queries}
        The error is {e},
        You should probably try again.
        """
        print(reply)
        return {"reply": reply}


def graphql_schema_query(graphql_type: AnyStr, session_hash, **kwargs):
    dir_path = TEMP_DIR / str(session_hash)
    try:
        with open(f'{dir_path}/graphql/schema.json', 'r') as file:
            data = json.load(file)

        # Pull the type definitions matching the requested name from the
        # introspected schema.
        types_list = data["types"]
        result = list(filter(lambda item: item["name"] == graphql_type, types_list))

        print("SCHEMA RESULT")
        print(graphql_type)
        print(str(result))

        return {"reply": str(result)}

    except Exception as e:
        reply = f"""There was an error querying our schema.json file with the type:{graphql_type}
        The error is {e},
        You should probably try again.
        """
        print(reply)
        return {"reply": reply}


def graphql_csv_query(csv_query: AnyStr, session_hash, **kwargs):
    dir_path = TEMP_DIR / str(session_hash)
    try:
        # pandasql resolves the table name "query" in csv_query from the local
        # DataFrame variable of the same name.
        query = pd.read_csv(f'{dir_path}/graphql/query.csv')
        print("GRAPHQL CSV QUERY")
        print(csv_query)
        queried_df = sqldf(csv_query, locals())
        print(queried_df)
        column_names = list(queried_df.columns)
        queried_df.to_csv(f'{dir_path}/graphql/query.csv', index=False)

        if len(queried_df) > 1000:
            print("CSV QUERY TOO LARGE")
            return {"reply": f"""The new query results are in our query.csv file.
            The column names of this query.csv file are: {column_names}.
            If you need to display the results directly, perhaps use the table_generation_func function."""}
        else:
            return {"reply": str(queried_df)}

    except Exception as e:
        reply = f"""There was an error querying our query.csv file with the query:{csv_query}
        The error is {e},
        You should probably try again.
        """
        print(reply)
        return {"reply": reply}
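

# ---------------------------------------------------------------------------
# Minimal usage sketch. Everything below is illustrative: the connection
# details, session hash, and query text are hypothetical placeholders, and a
# live PostgreSQL server plus the session's TEMP_DIR folders would be needed
# for a real run. query_func catches its own exceptions and returns the error
# text in "reply", so a failed connection surfaces as a reply rather than a crash.
if __name__ == "__main__":
    example_queries = ["SELECT category, COUNT(*) AS n FROM products GROUP BY category;"]
    reply = query_func(
        example_queries,
        session_hash="example-session",
        session_folder="sql",
        args=("localhost", 5432, "postgres", "password", "example_db"),
    )
    print(reply["reply"])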