Spaces:
Sleeping
Sleeping
## Utility Functions | |
## Note: edit ~/.bigqueryrc to set global settings for bq command line tool | |
using CSV, DataFrames, JSON3 | |
function read_json(file_path::String) | |
json_data = JSON3.read(open(file_path, "r")) | |
return json_data | |
end | |
""" | |
## ostreacultura_bq_auth() | |
- Activate the service account using the credentials file | |
""" | |
function ostreacultura_bq_auth() | |
if isfile("ostreacultura-credentials.json") | |
run(`gcloud auth activate-service-account --key-file=ostreacultura-credentials.json`) | |
else | |
println("Credentials file not found") | |
end | |
end | |
""" | |
## julia_to_bq_type(julia_type::DataType) | |
- Map Julia types to BigQuery types | |
Arguments: | |
- julia_type: The Julia data type to map | |
Returns: | |
- The corresponding BigQuery type as a string | |
""" | |
function julia_to_bq_type(julia_type::DataType) | |
if julia_type == String | |
return "STRING" | |
elseif julia_type == Int64 | |
return "INTEGER" | |
elseif julia_type == Float64 | |
return "FLOAT" | |
elseif julia_type <: AbstractArray{Float64} | |
return "FLOAT64" | |
elseif julia_type <: AbstractArray{Int64} | |
return "INTEGER" | |
else | |
return "STRING" | |
end | |
end | |
""" | |
## create_bq_schema(df::DataFrame) | |
- Create a BigQuery schema from a DataFrame | |
Arguments: | |
- df: The DataFrame to create the schema from | |
Returns: | |
- The schema as a string in BigQuery format | |
Example: | |
df = DataFrame(text = ["Alice", "Bob"], embed = [rand(3), rand(3)]) | |
create_bq_schema(df) | |
""" | |
function create_bq_schema(df::DataFrame) | |
schema = [] | |
for col in names(df) | |
if eltype(df[!, col]) <: AbstractArray | |
push!(schema, Dict("name" => col, "type" => "FLOAT64", "mode" => "REPEATED")) | |
else | |
push!(schema, Dict("name" => col, "type" => julia_to_bq_type(eltype(df[!, col])), "mode" => "NULLABLE")) | |
end | |
end | |
return JSON3.write(schema) | |
end | |
""" | |
## dataframe_to_json(df::DataFrame, file_path::String) | |
- Convert a DataFrame to JSON format and save to a file | |
Arguments: | |
- df: The DataFrame to convert | |
- file_path: The path where the JSON file should be saved | |
""" | |
function dataframe_to_json(df::DataFrame, file_path::String) | |
open(file_path, "w") do io | |
for row in eachrow(df) | |
JSON.print(io, Dict(col => row[col] for col in names(df))) | |
write(io, "\n") | |
end | |
end | |
end | |
""" | |
# Function to send a DataFrame to a BigQuery table | |
## send_to_bq_table(df::DataFrame, dataset_name::String, table_name::String) | |
- Send a DataFrame to a BigQuery table, which will append if the table already exists | |
Arguments: | |
- df: The DataFrame to upload | |
- dataset_name: The BigQuery dataset name | |
- table_name: The BigQuery table name | |
# Example usage | |
df = DataFrame(text = ["Alice", "Bob"], embed = [rand(3), rand(3)]) | |
send_to_bq_table(df, "climate_truth", "embtest") | |
# Upload a DataFrame | |
using CSV, DataFrames | |
import OstreaCultura as OC | |
tdat = CSV.read("data/climate_test.csv", DataFrame) | |
emb = OC.multi_embeddings(tdat) | |
""" | |
function send_to_bq_table(df::DataFrame, dataset_name::String, table_name::String) | |
# Temporary JSON file | |
json_file_path = tempname() * ".json" | |
schema = create_bq_schema(df) | |
## Save schema to a file | |
schema_file_path = tempname() * ".json" | |
open(schema_file_path, "w") do io | |
write(io, schema) | |
end | |
# Save DataFrame to JSON | |
dataframe_to_json(df, json_file_path) | |
# Use bq command-line tool to load JSON to BigQuery table with specified schema | |
run(`bq load --source_format=NEWLINE_DELIMITED_JSON $dataset_name.$table_name $json_file_path $schema_file_path`) | |
# Clean up and remove the temporary JSON file after upload | |
rm(json_file_path) | |
rm(schema_file_path) | |
return nothing | |
end | |
""" | |
## bq(query::String) | |
- Run a BigQuery query and return the result as a DataFrame | |
Example: bq("SELECT * FROM ostreacultura.climate_truth.training LIMIT 10") | |
""" | |
function bq(query::String) | |
tname = tempname() | |
run(pipeline(`bq query --use_legacy_sql=false --format=csv $query`, tname)) | |
return CSV.read(tname, DataFrame) | |
end | |
""" | |
## Function to average embeddings over some group | |
example: | |
avg_embeddings("ostreacultura.climate_truth.embtest", "text", "embed") | |
""" | |
function avg_embeddings(table::String, group::String, embedname::String) | |
query = """ | |
SELECT | |
$group, | |
ARRAY( | |
SELECT AVG(value) | |
FROM UNNEST($embedname) AS value WITH OFFSET pos | |
GROUP BY pos | |
ORDER BY pos | |
) AS averaged_array | |
FROM ( | |
SELECT $group, ARRAY_CONCAT_AGG($embedname) AS $embedname | |
FROM $table | |
GROUP BY $group | |
) | |
""" | |
return query | |
end | |
""" | |
## SAVE results of query to a CSV file | |
Example: | |
bq_csv("SELECT * FROM ostreacultura.climate_truth.training LIMIT 10", "data/test.csv") | |
""" | |
function bq_csv(query::String, path::String) | |
run(pipeline(`bq query --use_legacy_sql=false --format=csv $query`, path)) | |
end | |