Spaces:
Sleeping
Sleeping
//                           _       _
// __      _____  __ ___   ___  __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
//  \ V  V /  __/ (_| |\ V /| | (_| | ||  __/
//   \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
//  Copyright © 2016 - 2024 Weaviate B.V. All rights reserved.
//
//  CONTACT: hello@weaviate.io
//
| package replica | |
| import ( | |
| "context" | |
| "errors" | |
| "fmt" | |
| "sort" | |
| "github.com/go-openapi/strfmt" | |
| "github.com/weaviate/weaviate/entities/additional" | |
| "github.com/weaviate/weaviate/entities/search" | |
| "github.com/weaviate/weaviate/entities/storobj" | |
| "github.com/weaviate/weaviate/usecases/objects" | |
| "golang.org/x/sync/errgroup" | |
| ) | |
var (
	// errConflictExistOrDeleted reports that an object exists on one replica
	// but is marked as deleted on another.
	//
	// Whether the deletion may be propagated depends on the order of operations:
	//   Created -> Deleted            => it is safe to propagate the deletion to all replicas
	//   Created -> Deleted -> Created => propagating the deletion would result in data loss
	errConflictExistOrDeleted = errors.New("conflict: object has been deleted on another replica")

	// errConflictObjectChanged reports that the source object changed since it
	// was last read and therefore cannot be used for the repair.
	errConflictObjectChanged = errors.New("source object changed during repair")
)
| // repairer tries to detect inconsistencies and repair objects when reading them from replicas | |
| type repairer struct { | |
| class string | |
| client finderClient // needed to commit and abort operation | |
| } | |
| // repairOne repairs a single object (used by Finder::GetOne) | |
| func (r *repairer) repairOne(ctx context.Context, | |
| shard string, | |
| id strfmt.UUID, | |
| votes []objTuple, st rState, | |
| contentIdx int, | |
| ) (_ *storobj.Object, err error) { | |
| var ( | |
| lastUTime int64 | |
| winnerIdx int | |
| cl = r.client | |
| ) | |
| for i, x := range votes { | |
| if x.o.Deleted { | |
| return nil, errConflictExistOrDeleted | |
| } | |
| if x.UTime > lastUTime { | |
| lastUTime = x.UTime | |
| winnerIdx = i | |
| } | |
| } | |
| // fetch most recent object | |
| updates := votes[contentIdx].o | |
| winner := votes[winnerIdx] | |
| if updates.UpdateTime() != lastUTime { | |
| updates, err = cl.FullRead(ctx, winner.sender, r.class, shard, id, | |
| search.SelectProperties{}, additional.Properties{}) | |
| if err != nil { | |
| return nil, fmt.Errorf("get most recent object from %s: %w", winner.sender, err) | |
| } | |
| if updates.UpdateTime() != lastUTime { | |
| return nil, fmt.Errorf("fetch new state from %s: %w, %v", winner.sender, errConflictObjectChanged, err) | |
| } | |
| } | |
| var gr errgroup.Group | |
| for _, vote := range votes { // repair | |
| if vote.UTime == lastUTime { | |
| continue | |
| } | |
| vote := vote | |
| gr.Go(func() error { | |
| ups := []*objects.VObject{{ | |
| LatestObject: &updates.Object.Object, | |
| Vector: updates.Object.Vector, | |
| StaleUpdateTime: vote.UTime, | |
| }} | |
| resp, err := cl.Overwrite(ctx, vote.sender, r.class, shard, ups) | |
| if err != nil { | |
| return fmt.Errorf("node %q could not repair object: %w", vote.sender, err) | |
| } | |
| if len(resp) > 0 && resp[0].Err != "" { | |
| return fmt.Errorf("overwrite %w %s: %s", errConflictObjectChanged, vote.sender, resp[0].Err) | |
| } | |
| return nil | |
| }) | |
| } | |
| return updates.Object, gr.Wait() | |
| } | |
// iTuple is a tuple of indices used to identify a unique object together with
// the most recent state known for it.
type iTuple struct {
	S       int   // sender's index
	O       int   // object's index
	T       int64 // last update time
	Deleted bool  // whether the object is reported as deleted
}
| // repairExist repairs a single object when checking for existence | |
| func (r *repairer) repairExist(ctx context.Context, | |
| shard string, | |
| id strfmt.UUID, | |
| votes []boolTuple, | |
| st rState, | |
| ) (_ bool, err error) { | |
| var ( | |
| lastUTime int64 | |
| winnerIdx int | |
| cl = r.client | |
| ) | |
| for i, x := range votes { | |
| if x.o.Deleted { | |
| return false, errConflictExistOrDeleted | |
| } | |
| if x.UTime > lastUTime { | |
| lastUTime = x.UTime | |
| winnerIdx = i | |
| } | |
| } | |
| // fetch most recent object | |
| winner := votes[winnerIdx] | |
| resp, err := cl.FullRead(ctx, winner.sender, r.class, shard, id, search.SelectProperties{}, additional.Properties{}) | |
| if err != nil { | |
| return false, fmt.Errorf("get most recent object from %s: %w", winner.sender, err) | |
| } | |
| if resp.UpdateTime() != lastUTime { | |
| return false, fmt.Errorf("fetch new state from %s: %w, %v", winner.sender, errConflictObjectChanged, err) | |
| } | |
| gr, ctx := errgroup.WithContext(ctx) | |
| for _, vote := range votes { // repair | |
| if vote.UTime == lastUTime { | |
| continue | |
| } | |
| vote := vote | |
| gr.Go(func() error { | |
| ups := []*objects.VObject{{ | |
| LatestObject: &resp.Object.Object, | |
| Vector: resp.Object.Vector, | |
| StaleUpdateTime: vote.UTime, | |
| }} | |
| resp, err := cl.Overwrite(ctx, vote.sender, r.class, shard, ups) | |
| if err != nil { | |
| return fmt.Errorf("node %q could not repair object: %w", vote.sender, err) | |
| } | |
| if len(resp) > 0 && resp[0].Err != "" { | |
| return fmt.Errorf("overwrite %w %s: %s", errConflictObjectChanged, vote.sender, resp[0].Err) | |
| } | |
| return nil | |
| }) | |
| } | |
| return !resp.Deleted, gr.Wait() | |
| } | |
| // repairAll repairs objects when reading them ((use in combination with Finder::GetAll) | |
| func (r *repairer) repairBatchPart(ctx context.Context, | |
| shard string, | |
| ids []strfmt.UUID, | |
| votes []vote, | |
| st rState, | |
| contentIdx int, | |
| ) ([]*storobj.Object, error) { | |
| var ( | |
| result = make([]*storobj.Object, len(ids)) // final result | |
| lastTimes = make([]iTuple, len(ids)) // most recent times | |
| ms = make([]iTuple, 0, len(ids)) // mismatches | |
| nDeletions = 0 | |
| cl = r.client | |
| nVotes = len(votes) | |
| // The input objects cannot be used for repair because | |
| // their attributes might have been filtered out | |
| reFetchSet = make(map[int]struct{}) | |
| ) | |
| // find most recent objects | |
| for i, x := range votes[contentIdx].FullData { | |
| lastTimes[i] = iTuple{S: contentIdx, O: i, T: x.UpdateTime(), Deleted: x.Deleted} | |
| votes[contentIdx].Count[i] = nVotes // reuse Count[] to check consistency | |
| } | |
| for i, vote := range votes { | |
| if i != contentIdx { | |
| for j, x := range vote.DigestData { | |
| deleted := lastTimes[j].Deleted || x.Deleted | |
| if curTime := lastTimes[j].T; x.UpdateTime > curTime { | |
| lastTimes[j] = iTuple{S: i, O: j, T: x.UpdateTime} | |
| delete(reFetchSet, j) // input object is not up to date | |
| } else if x.UpdateTime < curTime { | |
| reFetchSet[j] = struct{}{} // we need to fetch this object again | |
| } | |
| lastTimes[j].Deleted = deleted | |
| votes[i].Count[j] = nVotes | |
| } | |
| } | |
| } | |
| // find missing content (diff) | |
| for i, p := range votes[contentIdx].FullData { | |
| if lastTimes[i].Deleted { // conflict | |
| nDeletions++ | |
| result[i] = nil | |
| votes[contentIdx].Count[i] = 0 | |
| } else if _, ok := reFetchSet[i]; ok || (contentIdx != lastTimes[i].S) { | |
| ms = append(ms, lastTimes[i]) | |
| } else { | |
| result[i] = p.Object | |
| } | |
| } | |
| if len(ms) > 0 { // fetch most recent objects | |
| // partition by hostname | |
| sort.SliceStable(ms, func(i, j int) bool { return ms[i].S < ms[j].S }) | |
| partitions := make([]int, 0, len(votes)) | |
| pre := ms[0].S | |
| for i, y := range ms { | |
| if y.S != pre { | |
| partitions = append(partitions, i) | |
| pre = y.S | |
| } | |
| } | |
| partitions = append(partitions, len(ms)) | |
| // concurrent fetches | |
| gr, ctx := errgroup.WithContext(ctx) | |
| start := 0 | |
| for _, end := range partitions { // fetch diffs | |
| rid := ms[start].S | |
| receiver := votes[rid].Sender | |
| query := make([]strfmt.UUID, end-start) | |
| for j := 0; start < end; start++ { | |
| query[j] = ids[ms[start].O] | |
| j++ | |
| } | |
| start := start | |
| gr.Go(func() error { | |
| resp, err := cl.FullReads(ctx, receiver, r.class, shard, query) | |
| for i, n := 0, len(query); i < n; i++ { | |
| idx := ms[start-n+i].O | |
| if err != nil || lastTimes[idx].T != resp[i].UpdateTime() { | |
| votes[rid].Count[idx]-- | |
| } else { | |
| result[idx] = resp[i].Object | |
| } | |
| } | |
| return nil | |
| }) | |
| } | |
| if err := gr.Wait(); err != nil { | |
| return nil, err | |
| } | |
| } | |
| // concurrent repairs | |
| gr, ctx := errgroup.WithContext(ctx) | |
| for rid, vote := range votes { | |
| query := make([]*objects.VObject, 0, len(ids)/2) | |
| m := make(map[string]int, len(ids)/2) // | |
| for j, x := range lastTimes { | |
| cTime := vote.UpdateTimeAt(j) | |
| if x.T != cTime && !x.Deleted && result[j] != nil && vote.Count[j] == nVotes { | |
| obj := objects.VObject{ | |
| LatestObject: &result[j].Object, | |
| Vector: result[j].Vector, | |
| StaleUpdateTime: cTime, | |
| } | |
| query = append(query, &obj) | |
| m[string(result[j].ID())] = j | |
| } | |
| } | |
| if len(query) == 0 { | |
| continue | |
| } | |
| receiver := vote.Sender | |
| rid := rid | |
| gr.Go(func() error { | |
| rs, err := cl.Overwrite(ctx, receiver, r.class, shard, query) | |
| if err != nil { | |
| for _, idx := range m { | |
| votes[rid].Count[idx]-- | |
| } | |
| return nil | |
| } | |
| for _, r := range rs { | |
| if r.Err != "" { | |
| if idx, ok := m[r.ID]; ok { | |
| votes[rid].Count[idx]-- | |
| } | |
| } | |
| } | |
| return nil | |
| }) | |
| } | |
| return result, gr.Wait() | |
| } | |