//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package inverted

import (
	"encoding/json"
	"math"
	"os"
	"sync"

	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)
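
// MAX_BUCKETS is the number of length buckets tracked per property; an extra
// -1 bucket exists for compatibility with the old binary format (see below).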
var MAX_BUCKETS = 64

type ShardMetaData struct {
	BucketedData map[string]map[int]int // per-property entry counts, keyed by bucket id
	SumData      map[string]int         // per-property sum of all tracked lengths
	CountData    map[string]int         // per-property number of tracked values
	ObjectCount  int                    // total number of objects in the shard
}

type JsonShardMetaData struct {
	path string
	data *ShardMetaData // only this part is saved in the file
	sync.Mutex
	UnlimitedBuckets bool // set to true for precise (unbucketed) length tracking
	logger           logrus.FieldLogger
}

// JsonShardMetaData replaces the old PropertyLengthTracker. It fixes a bug and
// provides a simpler, easier-to-maintain implementation. The format is
// future-proofed: new data can be added to the file without breaking old
// versions of Weaviate.
//
//   - We need to know the mean length of all properties for BM25 calculations.
//   - The prop length tracker is an approximate tracker that uses buckets and
//     simply counts the entries in each bucket.
//   - There is a precise global counter for the sum of all lengths and a
//     precise global counter for the number of entries.
//   - It only exists for string/text (and their array forms), because these
//     are the only prop types that can be used with BM25.
//   - Going forward, it should probably always exist when indexSearchable is
//     set on a text prop.
//
// Property lengths are put into one of 64 buckets. The value of a bucket is
// given by the formula:
//
//	float32(4 * math.Pow(1.25, float64(bucket)-3.5))
//
// which as implemented gives bucket values of 0, 1, 2, 3, 4, 5, 6, 8, 10, 13,
// 17, 21, 26, 33, 41, 52, 65, 81, 101, 127, 158, 198, 248, 310, 387, 484, 606,
// 757, 947, 1183, 1479, 1849, 2312, 2890, 3612, 4515, 5644, 7055, 8819, 11024,
// 13780, 17226, 21532, 26915, 33644, 42055, 52569, 65712, 82140, 102675,
// 128344, 160430, 200537, 250671, 313339, 391674, 489593, 611991, 764989,
// 956237, 1195296, 1494120, 1867651, 2334564.
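//
// For example, a raw length of 100 maps to bucket 18 via bucketFromValue below
// (int(math.Log(100.0/4.0)/math.Log(1.25)+4) == 18), whose representative
// value in the list above is 101.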
//
// These buckets are then recorded to disk. The original implementation was a
// binary format in which all the data was tracked using manual pointer
// arithmetic. The new version tracks the statistics in a Go map and marshals
// that map to JSON before writing it to disk. There is no measurable speed
// difference between the two implementations while importing data; however,
// the new version appears to slow queries by about 15% (while improving recall
// by ~25%).
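//
// On disk the JSON looks roughly like this (a sketch: the property name
// "title" and the counts are invented for illustration):
//
//	{"BucketedData":{"title":{"3":7}},"SumData":{"title":21},"CountData":{"title":7},"ObjectCount":7}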
//
// The new tracker is exactly compatible with the old format to enable
// migration, which is why there is a -1 bucket. Altering the number of buckets
// or their values will break compatibility.
//
// Set UnlimitedBuckets to true for precise length tracking.
//
// Note that some of the code in this file is forced by the need to be
// backwards-compatible with the old format. Once we are confident that all
// users have migrated to the new format, we can remove the old-format code and
// simplify this file.
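//
// A minimal usage sketch (the file path and property name are hypothetical):
//
//	tracker, err := NewJsonShardMetaData("proplengths.json", logger)
//	if err != nil {
//		return err
//	}
//	tracker.TrackProperty("description", 42)
//	mean, _ := tracker.PropertyMean("description")
//	tracker.Flush(false)
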
// NewJsonShardMetaData creates a new tracker and loads the data from the given
// path. If the file is in the old format, it will be converted to the new
// format.
func NewJsonShardMetaData(path string, logger logrus.FieldLogger) (t *JsonShardMetaData, err error) {
	// Recover and return an empty tracker on panic
	defer func() {
		if r := recover(); r != nil {
			// Use the logger argument here: t may be nil or only partially
			// initialized when the panic fired.
			logger.Printf("Recovered from panic in NewJsonShardMetaData, original error: %v", r)
			t = &JsonShardMetaData{
				data:             &ShardMetaData{make(map[string]map[int]int), make(map[string]int), make(map[string]int), 0},
				path:             path,
				UnlimitedBuckets: false,
				logger:           logger,
			}
			err = errors.Errorf("recovered from panic in NewJsonShardMetaData, original error: %v", r)
		}
	}()
t = &JsonShardMetaData{
data: &ShardMetaData{make(map[string]map[int]int), make(map[string]int), make(map[string]int), 0},
path: path,
UnlimitedBuckets: false,
logger: logger,
}
	// Read the file into memory
	bytes, err := os.ReadFile(path)
	if err != nil {
		if os.IsNotExist(err) { // file doesn't exist, probably a new class (or a recount): return an empty tracker
			logger.Printf("WARNING: prop len tracker file %s does not exist, creating new tracker", path)
			t.Flush(false)
			return t, nil
		}
		return nil, errors.Wrap(err, "read property length tracker file: "+path)
	}
	if len(bytes) == 0 {
		return nil, errors.Errorf("failed sanity check: prop len tracker file %s is empty. Delete the file and set environment variable RECOUNT_PROPERTIES_AT_STARTUP to true", path)
	}
	// We don't have data file versioning, so we try to parse the file as JSON.
	// If the parse fails, it is probably an old-format file, so we call the
	// old-format loader and copy everything across.
if err = json.Unmarshal(bytes, &t.data); err != nil {
// It's probably the old format file, load the old format and convert it to the new format
plt, err := NewPropertyLengthTracker(path)
if err != nil {
return nil, errors.Wrap(err, "convert old property length tracker")
}
propertyNames := plt.PropertyNames()
data := &ShardMetaData{make(map[string]map[int]int), make(map[string]int), make(map[string]int), 0}
// Loop over every page and bucket in the old tracker and add it to the new tracker
for _, name := range propertyNames {
			data.BucketedData[name] = make(map[int]int, MAX_BUCKETS+1)
data.CountData[name] = 0
data.SumData[name] = 0
for i := 0; i <= MAX_BUCKETS; i++ {
fromBucket := i
if i == MAX_BUCKETS {
fromBucket = -1
}
count, err := plt.BucketCount(name, uint16(fromBucket))
if err != nil {
return nil, errors.Wrap(err, "convert old property length tracker")
}
data.BucketedData[name][fromBucket] = int(count)
				value := float32(0)
				if fromBucket != -1 {
					value = plt.valueFromBucket(uint16(fromBucket))
				}
data.SumData[name] = data.SumData[name] + int(value)*int(count)
data.CountData[name] = data.CountData[name] + int(count)
}
}
		t.data = data
		// Persist the converted data, then remove the old-format file
		t.Flush(true)
		plt.Close()
		plt.Drop()
		t.Flush(false)
}
t.path = path
// Make really sure we aren't going to crash on a nil pointer
if t.data == nil {
return nil, errors.Errorf("failed sanity check, prop len tracker file %s has nil data. Delete file and set environment variable RECOUNT_PROPERTIES_AT_STARTUP to true", path)
}
return t, nil
}
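
// Clear resets the tracker to an empty state.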
func (t *JsonShardMetaData) Clear() {
if t == nil {
return
}
t.Lock()
defer t.Unlock()
t.data = &ShardMetaData{make(map[string]map[int]int), make(map[string]int), make(map[string]int), 0}
}

// FileName returns the path of the tracker file on disk.
func (t *JsonShardMetaData) FileName() string {
if t == nil {
return ""
}
return t.path
}
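
// TrackObjects adjusts the shard's object count by delta.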
func (t *JsonShardMetaData) TrackObjects(delta int) error {
if t == nil {
return nil
}
t.Lock()
defer t.Unlock()
t.data.ObjectCount = t.data.ObjectCount + delta
return nil
}

// TrackProperty adds a new value to the tracker.
func (t *JsonShardMetaData) TrackProperty(propName string, value float32) error {
if t == nil {
return nil
}
t.Lock()
defer t.Unlock()
// Remove this check once we are confident that all users have migrated to the new format
if t.data == nil {
t.logger.Print("WARNING: t.data is nil in TrackProperty, initializing to empty tracker")
t.data = &ShardMetaData{make(map[string]map[int]int), make(map[string]int), make(map[string]int), 0}
}
t.data.SumData[propName] = t.data.SumData[propName] + int(value)
t.data.CountData[propName] = t.data.CountData[propName] + 1
bucketId := t.bucketFromValue(value)
if _, ok := t.data.BucketedData[propName]; ok {
t.data.BucketedData[propName][int(bucketId)] = t.data.BucketedData[propName][int(bucketId)] + 1
} else {
		t.data.BucketedData[propName] = make(map[int]int, MAX_BUCKETS+1)
t.data.BucketedData[propName][int(bucketId)] = 1
}
return nil
}

// UnTrackProperty removes a value from the tracker.
func (t *JsonShardMetaData) UnTrackProperty(propName string, value float32) error {
if t == nil {
return nil
}
t.Lock()
defer t.Unlock()
// Remove this check once we are confident that all users have migrated to the new format
if t.data == nil {
		t.logger.Print("WARNING: t.data is nil in UnTrackProperty, initializing to empty tracker")
t.data = &ShardMetaData{make(map[string]map[int]int), make(map[string]int), make(map[string]int), 0}
}
t.data.SumData[propName] = t.data.SumData[propName] - int(value)
t.data.CountData[propName] = t.data.CountData[propName] - 1
bucketId := t.bucketFromValue(value)
if _, ok := t.data.BucketedData[propName]; ok {
t.data.BucketedData[propName][int(bucketId)] = t.data.BucketedData[propName][int(bucketId)] - 1
} else {
return errors.New("property not found")
}
return nil
}

// bucketFromValue returns the bucket that the given value belongs to.
func (t *JsonShardMetaData) bucketFromValue(value float32) int {
if t == nil {
return 0
}
if t.UnlimitedBuckets {
return int(value)
}
if value <= 5.00 {
return int(value) - 1
}
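	// Invert the bucket spacing: boundaries sit at 4 * 1.25^(bucket-4), so
	// bucket = log(value/4) / log(1.25) + 4, truncated to an int.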
bucket := int(math.Log(float64(value)/4.0)/math.Log(1.25) + 4)
if bucket > MAX_BUCKETS-1 {
return MAX_BUCKETS
}
	return bucket
}

// PropertyMean returns the average length of the given property.
func (t *JsonShardMetaData) PropertyMean(propName string) (float32, error) {
if t == nil {
return 0, nil
}
t.Lock()
defer t.Unlock()
sum, ok := t.data.SumData[propName]
if !ok {
return 0, nil
}
	count, ok := t.data.CountData[propName]
	if !ok || count == 0 {
		return 0, nil // avoid dividing by zero when nothing is tracked
	}
return float32(sum) / float32(count), nil
}

// PropertyTally returns the total property length, the total count, and the
// mean length (sum / count) for the given property.
func (t *JsonShardMetaData) PropertyTally(propName string) (int, int, float64, error) {
if t == nil {
return 0, 0, 0, nil
}
t.Lock()
defer t.Unlock()
sum, ok := t.data.SumData[propName]
if !ok {
return 0, 0, 0, nil // Required to match the old prop tracker (for now)
}
	count, ok := t.data.CountData[propName]
	if !ok || count == 0 {
		return 0, 0, 0, nil // required to match the old prop tracker (for now); also avoids dividing by zero
	}
return sum, count, float64(sum) / float64(count), nil
}

// ObjectTally returns the number of objects stored in the shard.
func (t *JsonShardMetaData) ObjectTally() int {
if t == nil {
return 0
}
t.Lock()
defer t.Unlock()
return t.data.ObjectCount
}

// Flush writes the current state of the tracker to disk. If flushBackup is
// true, only the backup file is written.
func (t *JsonShardMetaData) Flush(flushBackup bool) error {
if t == nil {
return nil
}
	if !flushBackup { // write the backup file first
		if err := t.Flush(true); err != nil {
			return errors.Wrap(err, "flush backup file")
		}
	}
t.Lock()
defer t.Unlock()
bytes, err := json.Marshal(t.data)
if err != nil {
return err
}
filename := t.path
if flushBackup {
filename = t.path + ".bak"
}
// Do a write+rename to avoid corrupting the file if we crash while writing
tempfile := filename + ".tmp"
err = os.WriteFile(tempfile, bytes, 0o666)
if err != nil {
return err
}
err = os.Rename(tempfile, filename)
if err != nil {
return err
}
return nil
}

// Close flushes the tracker to disk and releases its in-memory data.
func (t *JsonShardMetaData) Close() error {
if t == nil {
return nil
}
if err := t.Flush(false); err != nil {
return errors.Wrap(err, "flush before closing")
}
t.Lock()
defer t.Unlock()
t.data.BucketedData = nil
return nil
}

// Drop removes the tracker's files (including the backup) from disk.
func (t *JsonShardMetaData) Drop() error {
if t == nil {
return nil
}
	if err := t.Close(); err != nil {
		return errors.Wrap(err, "close tracker before dropping")
	}
t.Lock()
defer t.Unlock()
t.data.BucketedData = nil
	if err := os.Remove(t.path); err != nil && !os.IsNotExist(err) {
		return errors.Wrap(err, "remove prop length tracker state from disk: "+t.path)
	}
	if err := os.Remove(t.path + ".bak"); err != nil && !os.IsNotExist(err) {
		return errors.Wrap(err, "remove prop length tracker state from disk: "+t.path+".bak")
	}
return nil
}