diff --git a/build.go b/build.go
index 9963573..a31831c 100644
--- a/build.go
+++ b/build.go
@@ -157,22 +157,20 @@ func persistStoredFieldValues(fieldID int,
 	return curr, data, nil
 }
 
-func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
-	fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
-	storedIndexOffset uint64, dictLocs []uint64,
-	sectionsIndexOffset uint64) (*SegmentBase, error) {
+func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64,
+	storedIndexOffset uint64, sectionsIndexOffset uint64) (*SegmentBase, error) {
 	sb := &SegmentBase{
 		mem:                 mem,
 		memCRC:              memCRC,
 		chunkMode:           chunkMode,
-		fieldsMap:           fieldsMap,
+		fieldsMap:           make(map[string]uint16),
 		numDocs:             numDocs,
 		storedIndexOffset:   storedIndexOffset,
 		fieldsIndexOffset:   sectionsIndexOffset,
 		sectionsIndexOffset: sectionsIndexOffset,
 		fieldDvReaders:      make([]map[uint16]*docValueReader, len(segmentSections)),
 		docValueOffset:      0, // docValueOffsets identified automatically by the section
-		dictLocs:            dictLocs,
+		dictLocs:            make([]uint64, 0),
 		fieldFSTs:           make(map[uint16]*vellum.FST),
 		vecIndexCache:       newVectorIndexCache(),
 	}
diff --git a/faiss_vector_cache.go b/faiss_vector_cache.go
index fc2b798..f4aa608 100644
--- a/faiss_vector_cache.go
+++ b/faiss_vector_cache.go
@@ -74,8 +74,8 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool,
 
 	entry, ok := vc.cache[fieldID]
 	if ok {
-		vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
 		index, vecDocIDMap, docVecIDMap = entry.load()
+		vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
 		if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
 			vc.m.RUnlock()
 			return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
@@ -120,6 +120,19 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
 	index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
 	docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {
 
+	// Handle concurrent accesses (to avoid unnecessary work) by adding a
+	// check within the write lock here.
+	entry := vc.cache[fieldID]
+	if entry != nil {
+		index, vecDocIDMap, docVecIDMap = entry.load()
+		vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
+		if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
+			return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
+		}
+		docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
+		return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
+	}
+
 	// if the cache doesn't have the entry, construct the vector to doc id map and
 	// the vector index out of the mem bytes and update the cache under lock.
 	pos := 0
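Two things happen in the faiss_vector_cache.go hunks above: loadFromCache now populates vecDocIDMap via entry.load() *before* computing vecIDsToExclude from it (previously the exclusion list was derived from the still-empty named return), and createAndCacheLOCKED re-checks the cache under the write lock so that two goroutines racing past the read-locked fast path don't both rebuild the same index. The latter is classic double-checked locking; below is a minimal, self-contained sketch of the pattern, using illustrative stand-in types rather than the actual zapx ones.

```go
package main

import (
	"fmt"
	"sync"
)

// entry stands in for the cached faiss index plus its id maps.
type entry struct{ value int }

type vectorCache struct {
	m       sync.RWMutex
	entries map[uint16]*entry
}

func newVectorCache() *vectorCache {
	return &vectorCache{entries: make(map[uint16]*entry)}
}

// get returns the cached entry for fieldID, building it at most once.
func (c *vectorCache) get(fieldID uint16, build func() *entry) *entry {
	// Fast path: shared lock, no construction.
	c.m.RLock()
	if e, ok := c.entries[fieldID]; ok {
		c.m.RUnlock()
		return e
	}
	c.m.RUnlock()

	// Slow path: exclusive lock, then re-check before building, since
	// another goroutine may have populated the entry while we waited.
	c.m.Lock()
	defer c.m.Unlock()
	if e, ok := c.entries[fieldID]; ok {
		return e
	}
	e := build() // expensive construction runs at most once per field
	c.entries[fieldID] = e
	return e
}

func main() {
	c := newVectorCache()
	e := c.get(7, func() *entry { return &entry{value: 42} })
	fmt.Println(e.value) // 42
}
```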
diff --git a/merge.go b/merge.go
index 490e9da..7d37dcd 100644
--- a/merge.go
+++ b/merge.go
@@ -73,7 +73,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
 	// wrap it for counting (tracking offsets)
 	cr := NewCountHashWriterWithStatsReporter(br, s)
 
-	newDocNums, numDocs, storedIndexOffset, _, _, _, sectionsIndexOffset, err :=
+	newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err :=
 		MergeToWriter(segmentBases, drops, chunkMode, cr, closeCh)
 	if err != nil {
 		cleanup()
@@ -111,7 +111,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
 func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 	chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
-	newDocNums [][]uint64, numDocs, storedIndexOffset uint64, dictLocs []uint64,
+	newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
 	fieldsInv []string, fieldsMap map[string]uint16,
 	sectionsIndexOffset uint64, err error) {
 
@@ -122,7 +122,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 	numDocs = computeNewDocCount(segments, drops)
 
 	if isClosed(closeCh) {
-		return nil, 0, 0, nil, nil, nil, 0, seg.ErrClosed
+		return nil, 0, 0, nil, nil, 0, seg.ErrClosed
 	}
 
 	// the merge opaque is especially important when it comes to tracking the file
@@ -140,7 +140,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 		storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
 			fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh)
 		if err != nil {
-			return nil, 0, 0, nil, nil, nil, 0, err
+			return nil, 0, 0, nil, nil, 0, err
 		}
 
 		// at this point, ask each section implementation to merge itself
@@ -149,21 +149,19 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
 			err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, cr, closeCh)
 			if err != nil {
-				return nil, 0, 0, nil, nil, nil, 0, err
+				return nil, 0, 0, nil, nil, 0, err
 			}
 		}
-	} else {
-		dictLocs = make([]uint64, len(fieldsInv))
 	}
 
 	// we can persist the fields section index now, this will point
 	// to the various indexes (each in different section) available for a field.
-	sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, dictLocs, mergeOpaque)
+	sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque)
 	if err != nil {
-		return nil, 0, 0, nil, nil, nil, 0, err
+		return nil, 0, 0, nil, nil, 0, err
 	}
 
-	return newDocNums, numDocs, storedIndexOffset, dictLocs, fieldsInv, fieldsMap, sectionsIndexOffset, nil
+	return newDocNums, numDocs, storedIndexOffset, fieldsInv, fieldsMap, sectionsIndexOffset, nil
 }
 
 // mapFields takes the fieldsInv list and returns a map of fieldName
diff --git a/new.go b/new.go
index 94079ea..f0d37c4 100644
--- a/new.go
+++ b/new.go
@@ -66,14 +66,13 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
 	s.chunkMode = chunkMode
 	s.w = NewCountHashWriter(&br)
 
-	storedIndexOffset, dictOffsets, sectionsIndexOffset, err := s.convert()
+	storedIndexOffset, sectionsIndexOffset, err := s.convert()
 	if err != nil {
 		return nil, uint64(0), err
 	}
 
 	sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
-		s.FieldsMap, s.FieldsInv, uint64(len(results)),
-		storedIndexOffset, dictOffsets, sectionsIndexOffset)
+		uint64(len(results)), storedIndexOffset, sectionsIndexOffset)
 
 	// get the bytes written before the interim's reset() call
 	// write it to the newly formed segment base.
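The reset() and convert() hunks that follow change the interim's lifecycle: instead of dropping FieldsMap and FieldsInv on reset and reallocating them on every convert, the map is cleared in place and the slice truncated, so pooled interim structs keep their allocations across batches. A standalone illustration of the idiom (on Go 1.21+ the delete loop could be written as clear(fieldsMap)):

```go
package main

import "fmt"

func main() {
	fieldsMap := map[string]uint16{"_id": 1, "body": 2}
	fieldsInv := []string{"_id", "body"}

	// Clear the map in place; the buckets it has already grown remain
	// allocated, so refilling it later avoids rehash/regrow work.
	for k := range fieldsMap {
		delete(fieldsMap, k)
	}

	// Truncate the slice; length drops to zero, capacity is retained.
	fieldsInv = fieldsInv[:0]

	fmt.Println(len(fieldsMap), len(fieldsInv), cap(fieldsInv)) // 0 0 2
}
```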
@@ -125,8 +124,10 @@ func (s *interim) reset() (err error) {
 	s.results = nil
 	s.chunkMode = 0
 	s.w = nil
-	s.FieldsMap = nil
-	s.FieldsInv = nil
+	for k := range s.FieldsMap {
+		delete(s.FieldsMap, k)
+	}
+	s.FieldsInv = s.FieldsInv[:0]
 	s.metaBuf.Reset()
 	s.tmp0 = s.tmp0[:0]
 	s.tmp1 = s.tmp1[:0]
@@ -168,8 +169,10 @@ type interimLoc struct {
 	arrayposs []uint64
 }
 
-func (s *interim) convert() (uint64, []uint64, uint64, error) {
-	s.FieldsMap = map[string]uint16{}
+func (s *interim) convert() (uint64, uint64, error) {
+	if s.FieldsMap == nil {
+		s.FieldsMap = map[string]uint16{}
+	}
 
 	args := map[string]interface{}{
 		"results": s.results,
@@ -209,17 +212,15 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
 	storedIndexOffset, err := s.writeStoredFields()
 	if err != nil {
-		return 0, nil, 0, err
+		return 0, 0, err
 	}
 
-	var dictOffsets []uint64
-
 	// we can persist the various sections at this point.
 	// the rule of thumb here is that each section must persist field wise.
 	for _, x := range segmentSections {
 		_, err = x.Persist(s.opaque, s.w)
 		if err != nil {
-			return 0, nil, 0, err
+			return 0, 0, err
 		}
 	}
 
@@ -231,18 +232,14 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {
 		}
 	}
 
-	if len(s.results) == 0 {
-		dictOffsets = make([]uint64, len(s.FieldsInv))
-	}
-
 	// we can persist a new fields section here
 	// this new fields section will point to the various indexes available
-	sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, dictOffsets, s.opaque)
+	sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque)
 	if err != nil {
-		return 0, nil, 0, err
+		return 0, 0, err
 	}
 
-	return storedIndexOffset, dictOffsets, sectionsIndexOffset, nil
+	return storedIndexOffset, sectionsIndexOffset, nil
 }
 
 func (s *interim) getOrDefineField(fieldName string) int {
diff --git a/segment.go b/segment.go
index 8dce085..8780ead 100644
--- a/segment.go
+++ b/segment.go
@@ -326,7 +326,7 @@ func (s *SegmentBase) loadFieldsNew() error {
 	if seek > uint64(len(s.mem)) {
 		// handling a buffer overflow case.
 		// a rare case where the backing buffer is not large enough to be read directly via
-		// a pos+binary.MaxVarinLen64 seek. For eg, this can happen when there is only
+		// a pos+binary.MaxVarintLen64 seek. For eg, this can happen when there is only
 		// one field to be indexed in the entire batch of data and while writing out
 		// these fields metadata, you write 1 + 8 bytes whereas the MaxVarintLen64 = 10.
 		seek = uint64(len(s.mem))
@@ -342,7 +342,7 @@ func (s *SegmentBase) loadFieldsNew() error {
 	// the following loop will be executed only once in the edge case pointed out above
 	// since there is only field's offset store which occupies 8 bytes.
 	// the pointer then seeks to a position preceding the sectionsIndexOffset, at
-	// which point the responbility of handling the out-of-bounds cases shifts to
+	// which point the responsibility of handling the out-of-bounds cases shifts to
 	// the specific section's parsing logic.
 	var fieldID uint64
 	for fieldID < numFields {
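The loadFieldsNew comments above describe a clamped seek: reading a uvarint via mem[pos : pos+binary.MaxVarintLen64] can overrun a buffer whose tail is shorter than the 10-byte maximum varint width, so the slice end is clamped to len(mem) first. A small sketch of that bounds handling (readUvarint here is an illustrative helper, not a zapx function):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// readUvarint decodes a uvarint at pos, clamping the slice end so a
// short tail (fewer than binary.MaxVarintLen64 = 10 bytes) is safe.
func readUvarint(mem []byte, pos uint64) (uint64, int) {
	end := pos + binary.MaxVarintLen64
	if end > uint64(len(mem)) {
		end = uint64(len(mem))
	}
	return binary.Uvarint(mem[pos:end])
}

func main() {
	buf := binary.AppendUvarint(nil, 300) // a 2-byte varint; len(buf) < 10
	v, n := readUvarint(buf, 0)
	fmt.Println(v, n) // 300 2
}
```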
@@ -867,15 +867,6 @@ func (s *SegmentBase) loadDvReaders() error {
 
 			s.incrementBytesRead(read)
 
-			dataLoc, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
-			if n <= 0 {
-				return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+
-					"offset for sectionID %v field %v", secID, s.fieldsInv[fieldID])
-			}
-			if secID == SectionInvertedTextIndex {
-				s.dictLocs = append(s.dictLocs, dataLoc)
-				s.incrementBytesRead(uint64(n))
-			}
 			fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd)
 			if err != nil {
 				return err
diff --git a/write.go b/write.go
index 1906a9b..7b2c99e 100644
--- a/write.go
+++ b/write.go
@@ -50,7 +50,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer,
 	return tw, nil
 }
 
-func persistFieldsSection(fieldsInv []string, w *CountHashWriter, dictLocs []uint64, opaque map[int]resetable) (uint64, error) {
+func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) {
 	var rv uint64
 	fieldsOffsets := make([]uint64, 0, len(fieldsInv))
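Net effect across the files: dictionary offsets for the inverted-text section are now owned entirely by the section implementations, so persistFieldsSection, convert(), MergeToWriter, and InitSegmentBase all lose their dictLocs/dictOffsets plumbing, loadDvReaders no longer re-derives dictLocs while scanning the section index, and the segment's fieldsMap/dictLocs start empty (presumably repopulated when loadFieldsNew parses the serialized fields section). A condensed view of the new call shapes, lifted from the hunks above; setup, variables such as br/cr/s, and error handling are assumed from the existing build and merge paths:

```go
// build path (new.go)
storedIndexOffset, sectionsIndexOffset, err := s.convert()
sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
	uint64(len(results)), storedIndexOffset, sectionsIndexOffset)

// merge path (merge.go)
newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err :=
	MergeToWriter(segmentBases, drops, chunkMode, cr, closeCh)
```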