KevinStephenson
Adding in weaviate code
b110593
raw
history blame
7.61 kB
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
// Package main implements performance tracking examples.
package main
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"net"
"net/http"
"os"
"os/exec"
"time"
"github.com/pkg/errors"
"github.com/weaviate/weaviate/entities/models"
)
// batch groups a list of objects under a single "Objects" field.
// NOTE(review): it is not referenced in this part of the file; presumably it
// is the payload for batch imports issued by the benchmark code — confirm
// against its users.
type batch struct {
// Objects holds the objects that belong to this batch.
Objects []*models.Object
}
// benchmarkResult is the content of the benchmark results file: it maps a
// full benchmark name to the runtimes of that benchmark's individual steps,
// keyed by step name. main reports these runtimes as milliseconds.
type benchmarkResult map[string]map[string]int64
// main runs the benchmark selected on the command line against the Weaviate
// instance at http://localhost:8080/v1/, stores the measured runtimes in
// benchmark_results.json (keeping the entries of other benchmarks) and prints
// a comparison with the previously stored runtimes.
//
// Flags: "name" selects the benchmark (only "SIFT" is handled),
// "numberEntries" and "numBatches" are passed on to the benchmark, and "fail"
// is the tolerated regression in percent (a negative value disables the
// check).
//
// The process panics on benchmark or file errors and exits with status 1 when
// the regression check fails.
func main() {
	var benchmarkName string
	var numBatches, failPercentage, maxEntries int

	flag.StringVar(&benchmarkName, "name", "SIFT", "Which benchmark should be run. Currently only SIFT is available.")
	flag.IntVar(&maxEntries, "numberEntries", 100000, "Maximum number of entries read from the dataset")
	flag.IntVar(&numBatches, "numBatches", 1, "With how many parallel batches objects should be added")
	flag.IntVar(&failPercentage, "fail", -1, "Fail if regression is larger")
	flag.Parse()

	t := &http.Transport{
		Proxy: http.ProxyFromEnvironment,
		DialContext: (&net.Dialer{
			Timeout:   30 * time.Second,
			KeepAlive: 120 * time.Second,
		}).DialContext,
		MaxIdleConnsPerHost:   100,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
	}
	c := &http.Client{Transport: t}
	url := "http://localhost:8080/v1/"

	alreadyRunning := startWeaviate(c, url)

	var newRuntime map[string]int64
	var err error
	switch benchmarkName {
	case "SIFT":
		newRuntime, err = benchmarkSift(c, url, maxEntries, numBatches)
	default:
		panic("Unknown benchmark " + benchmarkName)
	}

	// A failed benchmark run is followed by removing the schema classes.
	if err != nil {
		clearExistingObjects(c, url)
	}

	// Only shut down an instance that was started by this program.
	if !alreadyRunning {
		// Best effort: a failed shutdown must not discard the benchmark
		// results, so it is reported instead of aborting.
		if downErr := tearDownWeaviate(); downErr != nil {
			fmt.Fprintf(os.Stderr, "Could not shut down weaviate: %v\n", downErr)
		}
	}

	if err != nil {
		panic(errors.Wrap(err, "Error occurred during benchmarking"))
	}

	fullBenchmarkName := benchmarkName + "-" + fmt.Sprint(maxEntries) + "_Entries-" + fmt.Sprint(numBatches) + "_Batch(es)"

	// Write results to file, keeping existing entries
	oldBenchmarkRunTimes := readCurrentBenchmarkResults()
	oldRuntime := oldBenchmarkRunTimes[fullBenchmarkName]
	oldBenchmarkRunTimes[fullBenchmarkName] = newRuntime
	benchmarkJSON, err := json.MarshalIndent(oldBenchmarkRunTimes, "", "\t")
	if err != nil {
		panic(errors.Wrap(err, "Could not marshal benchmark results"))
	}
	if err := os.WriteFile("benchmark_results.json", benchmarkJSON, 0o666); err != nil {
		panic(err)
	}

	totalNewRuntime := int64(0)
	for _, runtime := range newRuntime {
		totalNewRuntime += runtime
	}
	totalOldRuntime := int64(0)
	for _, runtime := range oldRuntime {
		totalOldRuntime += runtime
	}

	// The change is expressed relative to the old runtime, which is the same
	// base the regression check below uses. Without an old entry there is
	// nothing to compare against (and the division would be by zero).
	changeLine := "No previous result available for comparison.\n"
	if totalOldRuntime > 0 {
		changeLine = "This is a change of " +
			fmt.Sprintf("%.2f", 100*float64(totalNewRuntime-totalOldRuntime)/float64(totalOldRuntime)) +
			"%.\n"
	}

	fmt.Fprint(
		os.Stdout,
		"Runtime for benchmark "+fullBenchmarkName+
			": old total runtime: "+fmt.Sprint(totalOldRuntime)+"ms, new total runtime:"+fmt.Sprint(totalNewRuntime)+"ms.\n"+
			changeLine+
			"Please update the benchmark results if necessary.\n\n",
	)
	fmt.Fprint(os.Stdout, "Runtime for individual steps:.\n")
	for name, stepRuntime := range newRuntime {
		fmt.Fprint(os.Stdout, "Runtime for "+name+" is "+fmt.Sprint(stepRuntime)+"ms.\n")
	}

	// Return with error code if runtime regressed and corresponding flag was set
	if failPercentage >= 0 &&
		totalOldRuntime > 0 && // don't report regression if no old entry exists
		float64(totalOldRuntime)*(1.0+0.01*float64(failPercentage)) < float64(totalNewRuntime) {
		fmt.Fprint(
			os.Stderr, "Failed due to performance regressions.\n",
		)
		os.Exit(1)
	}
}
// clearExistingObjects removes every class of the schema that is currently
// present in the Weaviate instance.
//
// url is the API base including the trailing slash; "schema" is appended to
// it. If fetching the schema does not answer with status 200 nothing is
// deleted. The function panics when a request fails, when the schema response
// cannot be decoded, or when deleting a class does not answer with status 200.
func clearExistingObjects(c *http.Client, url string) {
	checkSchemaRequest := createRequest(url+"schema", "GET", nil)
	checkSchemaResponseCode, body, _, err := performRequest(c, checkSchemaRequest)
	if err != nil {
		panic(errors.Wrap(err, "perform request"))
	}
	if checkSchemaResponseCode != 200 {
		return
	}

	var dump models.Schema
	if err := json.Unmarshal(body, &dump); err != nil {
		panic(errors.Wrap(err, "Could not unmarshal read response"))
	}

	// Classes are deleted one by one through the schema endpoint.
	for _, classObj := range dump.Classes {
		requestDelete := createRequest(url+"schema/"+classObj.Class, "DELETE", nil)
		responseDeleteCode, _, _, err := performRequest(c, requestDelete)
		if err != nil {
			panic(errors.Wrap(err, "Could not delete schema"))
		}
		if responseDeleteCode != 200 {
			panic(fmt.Sprintf("Could not delete schema, code: %v", responseDeleteCode))
		}
	}
}
// command executes app with the given arguments from the directory two levels
// above the current working directory. The child's stdout and stderr are
// connected to those of this process.
//
// With waitForCompletion set the call blocks until the command has finished
// and returns its result; otherwise the command is only started and the error
// of starting it (if any) is returned.
func command(app string, arguments []string, waitForCompletion bool) error {
	workDir, err := os.Getwd()
	if err != nil {
		return err
	}

	proc := exec.Command(app, arguments...)
	proc.Dir = workDir + "/../../"
	proc.Stdout, proc.Stderr = os.Stdout, os.Stderr

	if !waitForCompletion {
		return proc.Start()
	}
	return proc.Run()
}
// readCurrentBenchmarkResults loads the stored results from
// benchmark_results.json in the current working directory.
//
// If the file cannot be opened an empty result set is returned. The returned
// map is never nil, so callers can add entries to it directly. The function
// panics if the file can be opened but its content cannot be decoded.
func readCurrentBenchmarkResults() benchmarkResult {
	benchmarkFile, err := os.Open("benchmark_results.json")
	if err != nil {
		fmt.Print("No benchmark file present.")
		return make(benchmarkResult)
	}
	defer benchmarkFile.Close()

	var result benchmarkResult
	if err := json.NewDecoder(benchmarkFile).Decode(&result); err != nil {
		panic(fmt.Sprintf("Could not parse existing benchmark file: %v", err))
	}
	if result == nil {
		// A file that only contains JSON null decodes without error into a
		// nil map; assigning into that map would panic in the caller.
		result = make(benchmarkResult)
	}
	return result
}
// tearDownWeaviate announces the shutdown on stdout and then runs
// "docker-compose down --remove-orphans" through command, waiting for it to
// complete. The error of that command is returned.
func tearDownWeaviate() error {
	fmt.Print("Shutting down weaviate.\n")
	return command("docker-compose", []string{"down", "--remove-orphans"}, true)
}
// startWeaviate makes sure a Weaviate instance is available and reports
// whether one was already running.
//
// The readiness endpoint below url is queried first. If it answers with status
// 200, true is returned and nothing is started. Otherwise the CI server script
// is executed (waiting for completion) to rebuild and start Weaviate, so that
// the current state of the code is benchmarked, and false is returned. The
// function panics if that script fails.
func startWeaviate(c *http.Client, url string) bool {
	readyRequest := createRequest(url+".well-known/ready", "GET", nil)
	if status, _, _, err := performRequest(c, readyRequest); err == nil && status == 200 {
		fmt.Print("Weaviate instance already running.\n")
		return true
	}

	fmt.Print("(Re-) build and start weaviate.\n")
	startErr := command("./tools/test/run_ci_server.sh", []string{}, true)
	if startErr != nil {
		panic(errors.Wrap(startErr, "Command to (re-) build and start weaviate failed"))
	}
	return false
}
// createRequest builds an HTTP request for url using the given method.
//
// A non-nil payload is JSON-encoded and used as the request body; with a nil
// payload the request has no body. The Content-Type and Accept headers are
// both set to application/json. The function panics if the payload cannot be
// marshalled or the request cannot be created.
func createRequest(url string, method string, payload interface{}) *http.Request {
	// Stays a nil interface unless there is a payload to send.
	var body io.Reader
	if payload != nil {
		encoded, marshalErr := json.Marshal(payload)
		if marshalErr != nil {
			panic(errors.Wrap(marshalErr, "Could not marshal request"))
		}
		body = bytes.NewBuffer(encoded)
	}

	req, reqErr := http.NewRequest(method, url, body)
	if reqErr != nil {
		panic(errors.Wrap(reqErr, "Could not create request"))
	}

	for _, header := range []string{"Content-Type", "Accept"} {
		req.Header.Add(header, "application/json")
	}
	return req
}
// performRequest sends request with client c and reads the complete response
// body.
//
// It returns the response status code, the body, the time in milliseconds
// that the call to c.Do took (reading the body is not included) and an error.
// If sending the request or reading the body fails, the status code is 0 and
// the body is nil; the measured time is returned in either case.
func performRequest(c *http.Client, request *http.Request) (int, []byte, int64, error) {
	began := time.Now()
	resp, doErr := c.Do(request)
	elapsedMs := time.Since(began).Milliseconds()
	if doErr != nil {
		return 0, nil, elapsedMs, doErr
	}
	defer resp.Body.Close()

	payload, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return 0, nil, elapsedMs, readErr
	}
	return resp.StatusCode, payload, elapsedMs, nil
}