Spaces:
Running
Running
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//

// Package main implements performance tracking examples.
package main | |
import ( | |
"bytes" | |
"encoding/json" | |
"flag" | |
"fmt" | |
"io" | |
"net" | |
"net/http" | |
"os" | |
"os/exec" | |
"time" | |
"github.com/pkg/errors" | |
"github.com/weaviate/weaviate/entities/models" | |
) | |
type batch struct { | |
Objects []*models.Object | |
} | |
type benchmarkResult map[string]map[string]int64 | |
func main() { | |
var benchmarkName string | |
var numBatches, failPercentage, maxEntries int | |
flag.StringVar(&benchmarkName, "name", "SIFT", "Which benchmark should be run. Currently only SIFT is available.") | |
flag.IntVar(&maxEntries, "numberEntries", 100000, "Maximum number of entries read from the dataset") | |
flag.IntVar(&numBatches, "numBatches", 1, "With how many parallel batches objects should be added") | |
flag.IntVar(&failPercentage, "fail", -1, "Fail if regression is larger") | |
flag.Parse() | |
t := &http.Transport{ | |
Proxy: http.ProxyFromEnvironment, | |
DialContext: (&net.Dialer{ | |
Timeout: 30 * time.Second, | |
KeepAlive: 120 * time.Second, | |
}).DialContext, | |
MaxIdleConnsPerHost: 100, | |
MaxIdleConns: 100, | |
IdleConnTimeout: 90 * time.Second, | |
TLSHandshakeTimeout: 10 * time.Second, | |
ExpectContinueTimeout: 1 * time.Second, | |
} | |
c := &http.Client{Transport: t} | |
url := "http://localhost:8080/v1/" | |
alreadyRunning := startWeaviate(c, url) | |
var newRuntime map[string]int64 | |
var err error | |
switch benchmarkName { | |
case "SIFT": | |
newRuntime, err = benchmarkSift(c, url, maxEntries, numBatches) | |
default: | |
panic("Unknown benchmark " + benchmarkName) | |
} | |
if err != nil { | |
clearExistingObjects(c, url) | |
} | |
if !alreadyRunning { | |
tearDownWeaviate() | |
} | |
if err != nil { | |
panic(errors.Wrap(err, "Error occurred during benchmarking")) | |
} | |
FullBenchmarkName := benchmarkName + "-" + fmt.Sprint(maxEntries) + "_Entries-" + fmt.Sprint(numBatches) + "_Batch(es)" | |
// Write results to file, keeping existing entries | |
oldBenchmarkRunTimes := readCurrentBenchmarkResults() | |
oldRuntime := oldBenchmarkRunTimes[FullBenchmarkName] | |
oldBenchmarkRunTimes[FullBenchmarkName] = newRuntime | |
benchmarkJSON, _ := json.MarshalIndent(oldBenchmarkRunTimes, "", "\t") | |
if err := os.WriteFile("benchmark_results.json", benchmarkJSON, 0o666); err != nil { | |
panic(err) | |
} | |
totalNewRuntime := int64(0) | |
for _, runtime := range newRuntime { | |
totalNewRuntime += runtime | |
} | |
totalOldRuntime := int64(0) | |
for _, runtime := range oldRuntime { | |
totalOldRuntime += runtime | |
} | |
fmt.Fprint( | |
os.Stdout, | |
"Runtime for benchmark "+FullBenchmarkName+ | |
": old total runtime: "+fmt.Sprint(totalOldRuntime)+"ms, new total runtime:"+fmt.Sprint(totalNewRuntime)+"ms.\n"+ | |
"This is a change of "+fmt.Sprintf("%.2f", 100*float32(totalNewRuntime-totalOldRuntime)/float32(totalNewRuntime))+"%.\n"+ | |
"Please update the benchmark results if necessary.\n\n", | |
) | |
fmt.Fprint(os.Stdout, "Runtime for individual steps:.\n") | |
for name, time := range newRuntime { | |
fmt.Fprint(os.Stdout, "Runtime for "+name+" is "+fmt.Sprint(time)+"ms.\n") | |
} | |
// Return with error code if runtime regressed and corresponding flag was set | |
if failPercentage >= 0 && | |
totalOldRuntime > 0 && // don't report regression if no old entry exists | |
float64(totalOldRuntime)*(1.0+0.01*float64(failPercentage)) < float64(totalNewRuntime) { | |
fmt.Fprint( | |
os.Stderr, "Failed due to performance regressions.\n", | |
) | |
os.Exit(1) | |
} | |
} | |
// If there is already a schema present, clear it out | |
func clearExistingObjects(c *http.Client, url string) { | |
checkSchemaRequest := createRequest(url+"schema", "GET", nil) | |
checkSchemaResponseCode, body, _, err := performRequest(c, checkSchemaRequest) | |
if err != nil { | |
panic(errors.Wrap(err, "perform request")) | |
} | |
if checkSchemaResponseCode != 200 { | |
return | |
} | |
var dump models.Schema | |
if err := json.Unmarshal(body, &dump); err != nil { | |
panic(errors.Wrap(err, "Could not unmarshal read response")) | |
} | |
for _, classObj := range dump.Classes { | |
requestDelete := createRequest(url+"schema/"+classObj.Class, "DELETE", nil) | |
responseDeleteCode, _, _, err := performRequest(c, requestDelete) | |
if err != nil { | |
panic(errors.Wrap(err, "Could delete schema")) | |
} | |
if responseDeleteCode != 200 { | |
panic(fmt.Sprintf("Could not delete schema, code: %v", responseDeleteCode)) | |
} | |
} | |
} | |
// command executes app with the given arguments, using the directory two
// levels above the current working directory as the working directory of the
// child process. Output of the child is forwarded to this process' stdout and
// stderr.
//
// With waitForCompletion set, the call blocks until the child has exited and
// returns its run error; otherwise the child is only started and the start
// error is returned. An error is also returned if the current working
// directory cannot be determined.
func command(app string, arguments []string, waitForCompletion bool) error {
	workDir, err := os.Getwd()
	if err != nil {
		return err
	}

	proc := exec.Command(app, arguments...)
	proc.Dir = workDir + "/../../"
	proc.Stdout, proc.Stderr = os.Stdout, os.Stderr

	if !waitForCompletion {
		return proc.Start()
	}
	return proc.Run()
}
func readCurrentBenchmarkResults() benchmarkResult { | |
benchmarkFile, err := os.Open("benchmark_results.json") | |
if err != nil { | |
fmt.Print("No benchmark file present.") | |
return make(benchmarkResult) | |
} | |
defer benchmarkFile.Close() | |
var result benchmarkResult | |
jsonParser := json.NewDecoder(benchmarkFile) | |
if err = jsonParser.Decode(&result); err != nil { | |
panic("Could not parse existing benchmark file.") | |
} | |
return result | |
} | |
func tearDownWeaviate() error { | |
fmt.Print("Shutting down weaviate.\n") | |
app := "docker-compose" | |
arguments := []string{ | |
"down", | |
"--remove-orphans", | |
} | |
return command(app, arguments, true) | |
} | |
// start weaviate in case it was not already started | |
// | |
// We want to benchmark the current state and therefore need to rebuild and then start a docker container | |
func startWeaviate(c *http.Client, url string) bool { | |
requestReady := createRequest(url+".well-known/ready", "GET", nil) | |
responseStartedCode, _, _, err := performRequest(c, requestReady) | |
alreadyRunning := err == nil && responseStartedCode == 200 | |
if alreadyRunning { | |
fmt.Print("Weaviate instance already running.\n") | |
return alreadyRunning | |
} | |
fmt.Print("(Re-) build and start weaviate.\n") | |
cmd := "./tools/test/run_ci_server.sh" | |
if err := command(cmd, []string{}, true); err != nil { | |
panic(errors.Wrap(err, "Command to (re-) build and start weaviate failed")) | |
} | |
return false | |
} | |
// createRequest creates requests | |
func createRequest(url string, method string, payload interface{}) *http.Request { | |
var body io.Reader = nil | |
if payload != nil { | |
jsonBody, err := json.Marshal(payload) | |
if err != nil { | |
panic(errors.Wrap(err, "Could not marshal request")) | |
} | |
body = bytes.NewBuffer(jsonBody) | |
} | |
request, err := http.NewRequest(method, url, body) | |
if err != nil { | |
panic(errors.Wrap(err, "Could not create request")) | |
} | |
request.Header.Add("Content-Type", "application/json") | |
request.Header.Add("Accept", "application/json") | |
return request | |
} | |
// performRequest sends request with client c and reads the complete response
// body.
//
// It returns the HTTP status code, the body, the time in milliseconds that the
// call to c.Do took (reading the body is not included) and an error. If
// sending the request or reading the body fails, the status code is 0, the
// body is nil and the error is set; the measured time is returned in every
// case.
func performRequest(c *http.Client, request *http.Request) (int, []byte, int64, error) {
	began := time.Now()
	resp, doErr := c.Do(request)
	elapsedMs := time.Since(began).Milliseconds()
	if doErr != nil {
		return 0, nil, elapsedMs, doErr
	}
	defer resp.Body.Close()

	payload, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return 0, nil, elapsedMs, readErr
	}
	return resp.StatusCode, payload, elapsedMs, nil
}