|
|
|
|
|
|
|
using CSV, DataFrames, JSON3 |
|
|
|
"""
## read_json(file_path::String)

- Parse the JSON document stored at `file_path` with JSON3 and return the parsed value
"""
function read_json(file_path::String)
    # Read the whole file up front so the handle is closed before parsing,
    # instead of leaving an `open`ed IO stream for the GC to clean up.
    json_data = JSON3.read(read(file_path, String))
    return json_data
end
|
|
|
"""
## ostreacultura_bq_auth(key_file = "ostreacultura-credentials.json")

- Activate a Google Cloud service account using the credentials file `key_file`,
  via `gcloud auth activate-service-account`
- A relative `key_file` is resolved against the current working directory
- If the file does not exist, a warning is logged and gcloud is not invoked

Arguments:
- key_file: Path to the service-account key file (defaults to
  "ostreacultura-credentials.json")

Returns:
- The `Process` returned by `run` when gcloud was invoked, otherwise `nothing`
"""
function ostreacultura_bq_auth(key_file::AbstractString="ostreacultura-credentials.json")
    if !isfile(key_file)
        # Log instead of printing to stdout so callers can filter or capture it.
        @warn "Credentials file not found" key_file
        return nothing
    end
    # Command interpolation passes the path as a single argument, even with spaces.
    return run(`gcloud auth activate-service-account --key-file=$key_file`)
end
|
|
|
"""
## julia_to_bq_type(julia_type::Type)

- Map a Julia (column element) type to a BigQuery column type

Arguments:
- julia_type: The Julia type to map. `Missing` is stripped first, so a nullable
  column type such as `Union{Missing, Int64}` maps like `Int64`.

Returns:
- The corresponding BigQuery type as a string:
  - `Bool` -> "BOOLEAN"
  - any `Integer` subtype -> "INTEGER"
  - any `AbstractFloat` subtype -> "FLOAT"
  - arrays -> the type of their elements ("BOOLEAN", "INTEGER", or "FLOAT64" for
    floats); array-ness itself is not encoded in the returned string
  - anything else (strings, all-missing types, mixed unions, other types) -> "STRING"
"""
function julia_to_bq_type(julia_type::Type)
    # Nullable DataFrame columns have eltype Union{Missing, T}; classify by T.
    T = nonmissingtype(julia_type)
    # A type that is only `Missing` carries no information, and Union{} is a
    # subtype of every type, so it must be handled before any `<:` checks.
    T === Union{} && return "STRING"

    if T <: AbstractArray
        E = nonmissingtype(eltype(T))
        E === Union{} && return "STRING"
        E <: Bool && return "BOOLEAN"
        E <: Integer && return "INTEGER"
        E <: AbstractFloat && return "FLOAT64"
        return "STRING"
    end

    # Bool <: Integer in Julia, so it has to be checked before Integer.
    T <: Bool && return "BOOLEAN"
    T <: Integer && return "INTEGER"
    T <: AbstractFloat && return "FLOAT"
    return "STRING"
end
|
|
|
"""
## create_bq_schema(df::DataFrame)

- Create a BigQuery schema from a DataFrame

Arguments:
- df: The DataFrame to create the schema from

Returns:
- The schema as a JSON string in BigQuery format: one `{"name", "type", "mode"}`
  object per column
- Array columns get mode "REPEATED" and are typed by their elements ("BOOLEAN",
  "INTEGER", "STRING", or "FLOAT64" for anything else)
- All other columns get mode "NULLABLE"; columns that may contain `missing` are typed
  by their non-missing type, and all-missing or mixed-type columns become "STRING"

Example:
df = DataFrame(text = ["Alice", "Bob"], embed = [rand(3), rand(3)])
create_bq_schema(df)
"""
function create_bq_schema(df::DataFrame)
    schema = Dict{String,String}[]
    for col in names(df)
        # Classify by the non-missing part of the eltype, so nullable columns
        # (e.g. Union{Missing, Int64}) get their real type instead of erroring.
        T = nonmissingtype(eltype(df[!, col]))
        if T === Union{}
            # Every value is `missing`: no type information, fall back to STRING.
            push!(schema, Dict("name" => col, "type" => "STRING", "mode" => "NULLABLE"))
        elseif T <: AbstractArray
            push!(schema,
                  Dict("name" => col, "type" => _bq_array_elem_type(T), "mode" => "REPEATED"))
        else
            # Only concrete DataTypes are handed to `julia_to_bq_type`; a mixed
            # Union column (e.g. Union{Int64, String}) is stored as STRING.
            bqtype = T isa DataType ? julia_to_bq_type(T) : "STRING"
            push!(schema, Dict("name" => col, "type" => bqtype, "mode" => "NULLABLE"))
        end
    end
    return JSON3.write(schema)
end

# BigQuery type for the elements of an array (REPEATED) column; defaults to FLOAT64,
# which matches float vectors such as the `embed` column in the example above.
function _bq_array_elem_type(T::Type)
    E = nonmissingtype(eltype(T))
    E === Union{} && return "FLOAT64"
    E <: Bool && return "BOOLEAN"          # Bool <: Integer, so test it first
    E <: Integer && return "INTEGER"
    E <: AbstractString && return "STRING"
    return "FLOAT64"
end
|
|
|
"""
## dataframe_to_json(df::DataFrame, file_path::String)

- Convert a DataFrame to newline-delimited JSON (one JSON object per row, keyed by
  column name) and save it to a file, overwriting any existing file

Arguments:
- df: The DataFrame to convert
- file_path: The path where the JSON file should be saved
"""
function dataframe_to_json(df::DataFrame, file_path::String)
    cols = names(df)
    open(file_path, "w") do io
        for row in eachrow(df)
            # Serialize with JSON3, the JSON package this file imports
            # (`JSON` itself is not loaded, so `JSON.print` would fail).
            JSON3.write(io, Dict(col => row[col] for col in cols))
            println(io)
        end
    end
    return nothing
end
|
|
|
"""
# Function to send a DataFrame to a BigQuery table

## send_to_bq_table(df::DataFrame, dataset_name::String, table_name::String)

- Send a DataFrame to a BigQuery table with `bq load`, which will append if the table
  already exists
- The rows are staged in a temporary newline-delimited JSON file and the schema (from
  `create_bq_schema`) in a temporary JSON file. Both are removed afterwards, even if
  the upload fails.

Arguments:
- df: The DataFrame to upload
- dataset_name: The BigQuery dataset name
- table_name: The BigQuery table name

Returns:
- `nothing`

# Example usage

df = DataFrame(text = ["Alice", "Bob"], embed = [rand(3), rand(3)])
send_to_bq_table(df, "climate_truth", "embtest")

# Upload a DataFrame

using CSV, DataFrames
import OstreaCultura as OC
tdat = CSV.read("data/climate_test.csv", DataFrame)
emb = OC.multi_embeddings(tdat)
"""
function send_to_bq_table(df::DataFrame, dataset_name::String, table_name::String)
    json_file_path = tempname() * ".json"
    schema_file_path = tempname() * ".json"
    target = "$dataset_name.$table_name"
    try
        write(schema_file_path, create_bq_schema(df))
        dataframe_to_json(df, json_file_path)
        run(`bq load --source_format=NEWLINE_DELIMITED_JSON $target $json_file_path $schema_file_path`)
    finally
        # Always clean up the staging files; `force=true` tolerates files that were
        # never created because an earlier step threw.
        rm(json_file_path; force=true)
        rm(schema_file_path; force=true)
    end
    return nothing
end
|
|
|
"""
## bq(query::String; max_rows = 100)

- Run a BigQuery standard-SQL query with the `bq` CLI and return the result as a
  DataFrame
- The CLI's CSV output is read from memory, so no temporary file is left behind
- Throws if `bq` exits with a non-zero status

Arguments:
- query: The SQL query text
- max_rows: Maximum number of result rows `bq query` returns (passed as
  `--max_rows`); 100 is also the CLI's own default, so raise it for larger results

Example: bq("SELECT * FROM ostreacultura.climate_truth.training LIMIT 10")
"""
function bq(query::String; max_rows::Integer=100)
    cmd = `bq query --use_legacy_sql=false --format=csv --max_rows=$max_rows $query`
    # `read(cmd)` captures stdout and throws ProcessFailedException on failure,
    # just like `run`, without needing a temp file that must be deleted.
    csv_bytes = read(cmd)
    return CSV.read(IOBuffer(csv_bytes), DataFrame)
end
|
|
|
|
|
"""
## avg_embeddings(table::String, group::String, embedname::String)

- Build (but do not run) a BigQuery standard-SQL query that averages the array column
  `embedname` element-wise within each value of the column `group`
- Each result row holds the group value and `averaged_array`, whose i-th entry is the
  mean of the i-th entries of that group's arrays
- Pass the returned string to `bq` to execute it
- NOTE: the arguments are spliced into the SQL verbatim (identifiers cannot be bound as
  query parameters); only pass trusted table and column names

Example:
avg_embeddings("ostreacultura.climate_truth.embtest", "text", "embed")
"""
function avg_embeddings(table::String, group::String, embedname::String)
    # Unnest every array with each element's offset, average per (group, offset),
    # then reassemble the means in offset order. Concatenating the arrays first
    # would give every element a unique offset, so nothing would be averaged.
    query = """
        SELECT
            $group,
            ARRAY_AGG(avg_value ORDER BY pos) AS averaged_array
        FROM (
            SELECT $group, pos, AVG(value) AS avg_value
            FROM $table, UNNEST($embedname) AS value WITH OFFSET pos
            GROUP BY $group, pos
        )
        GROUP BY $group
        """
    return query
end
|
|
|
"""
## bq_csv(query::String, path::String; max_rows = 100)

- Run a BigQuery standard-SQL query with the `bq` CLI and SAVE its CSV output to `path`
  (the file is overwritten)
- Throws if `bq` exits with a non-zero status

Arguments:
- query: The SQL query text
- path: Destination CSV file
- max_rows: Maximum number of result rows `bq query` returns (passed as
  `--max_rows`); 100 is also the CLI's own default, so raise it for larger results

Returns:
- The `Process` returned by `run`

Example:
bq_csv("SELECT * FROM ostreacultura.climate_truth.training LIMIT 10", "data/test.csv")
"""
function bq_csv(query::String, path::String; max_rows::Integer=100)
    cmd = `bq query --use_legacy_sql=false --format=csv --max_rows=$max_rows $query`
    # Redirect only stdout to the file; bq's progress messages on stderr still
    # reach the terminal.
    return run(pipeline(cmd; stdout=path))
end
|
|
|
|