Bug fixes and Optimizations #267

Open · wants to merge 6 commits into master
10 changes: 4 additions & 6 deletions build.go
@@ -157,22 +157,20 @@ func persistStoredFieldValues(fieldID int,
     return curr, data, nil
 }
 
-func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32,
-    fieldsMap map[string]uint16, fieldsInv []string, numDocs uint64,
-    storedIndexOffset uint64, dictLocs []uint64,
-    sectionsIndexOffset uint64) (*SegmentBase, error) {
+func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64,
+    storedIndexOffset uint64, sectionsIndexOffset uint64) (*SegmentBase, error) {
     sb := &SegmentBase{
         mem:       mem,
         memCRC:    memCRC,
         chunkMode: chunkMode,
-        fieldsMap: fieldsMap,
+        fieldsMap: make(map[string]uint16),
         numDocs:   numDocs,
         storedIndexOffset:   storedIndexOffset,
         fieldsIndexOffset:   sectionsIndexOffset,
         sectionsIndexOffset: sectionsIndexOffset,
         fieldDvReaders: make([]map[uint16]*docValueReader, len(segmentSections)),
         docValueOffset: 0, // docValueOffsets identified automatically by the section
-        dictLocs:  dictLocs,
+        dictLocs:  make([]uint64, 0),
         fieldFSTs: make(map[uint16]*vellum.FST),
         vecIndexCache: newVectorIndexCache(),
     }
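Note on the InitSegmentBase change: fieldsMap, fieldsInv and dictLocs are no longer supplied by callers; the segment base starts with empty containers that the section loaders populate from the serialized fields index. A minimal sketch of the new call shape (hypothetical in-package helper; the buffer and offsets are placeholders, not a real serialized segment):

```go
// openFromBuffer is a hypothetical in-package helper illustrating the
// slimmed-down signature; mem and the offsets would come from a real
// segment file in practice.
func openFromBuffer(mem []byte, memCRC uint32, chunkMode uint32,
	numDocs, storedIndexOffset, sectionsIndexOffset uint64) (*SegmentBase, error) {
	// fieldsMap, fieldsInv and dictLocs are no longer parameters:
	// InitSegmentBase allocates them empty and the per-section loaders
	// fill them in while parsing the fields index.
	return InitSegmentBase(mem, memCRC, chunkMode, numDocs,
		storedIndexOffset, sectionsIndexOffset)
}
```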
15 changes: 14 additions & 1 deletion faiss_vector_cache.go
@@ -74,8 +74,8 @@ func (vc *vectorIndexCache) loadFromCache(fieldID uint16, loadDocVecIDMap bool,

     entry, ok := vc.cache[fieldID]
     if ok {
-        vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
         index, vecDocIDMap, docVecIDMap = entry.load()
+        vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
         if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
             vc.m.RUnlock()
             return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
@@ -120,6 +120,19 @@ func (vc *vectorIndexCache) createAndCacheLOCKED(fieldID uint16, mem []byte,
     index *faiss.IndexImpl, vecDocIDMap map[int64]uint32,
     docVecIDMap map[uint32][]int64, vecIDsToExclude []int64, err error) {
 
+    // Handle concurrent accesses (to avoid unnecessary work) by adding a
+    // check within the write lock here.
+    entry := vc.cache[fieldID]
+    if entry != nil {
+        index, vecDocIDMap, docVecIDMap = entry.load()
+        vecIDsToExclude = getVecIDsToExclude(vecDocIDMap, except)
+        if !loadDocVecIDMap || (loadDocVecIDMap && len(entry.docVecIDMap) > 0) {
+            return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
+        }
+        docVecIDMap = vc.addDocVecIDMapToCacheLOCKED(entry)
+        return index, vecDocIDMap, docVecIDMap, vecIDsToExclude, nil
+    }
+
     // if the cache doesn't have the entry, construct the vector to doc id map and
     // the vector index out of the mem bytes and update the cache under lock.
     pos := 0
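Two things happen in this file: the first hunk fixes an ordering bug (vecIDsToExclude was computed from vecDocIDMap before entry.load() had populated it), and the second adds a re-check under the write lock so concurrent callers that miss under the read lock don't rebuild the same index twice. A self-contained sketch of that double-checked pattern, using a generic hypothetical cache rather than this file's actual types:

```go
package main

import (
	"fmt"
	"sync"
)

// cache mirrors the locking discipline above: a cheap lookup under RLock,
// then a re-check under the write lock before doing the expensive build.
type cache struct {
	m       sync.RWMutex
	entries map[uint16]string
}

func (c *cache) get(key uint16, build func() string) string {
	c.m.RLock()
	if v, ok := c.entries[key]; ok {
		c.m.RUnlock()
		return v // fast path: hit under shared lock
	}
	c.m.RUnlock()

	c.m.Lock()
	defer c.m.Unlock()
	// Re-check: another goroutine may have built the entry between our
	// RUnlock and Lock; without this, the work below could run twice.
	if v, ok := c.entries[key]; ok {
		return v
	}
	v := build() // expensive construction happens at most once per key
	c.entries[key] = v
	return v
}

func main() {
	c := &cache{entries: make(map[uint16]string)}
	fmt.Println(c.get(7, func() string { return "built" }))
	fmt.Println(c.get(7, func() string { return "never runs" })) // cache hit
}
```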
18 changes: 8 additions & 10 deletions merge.go
@@ -73,7 +73,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat
     // wrap it for counting (tracking offsets)
     cr := NewCountHashWriterWithStatsReporter(br, s)
 
-    newDocNums, numDocs, storedIndexOffset, _, _, _, sectionsIndexOffset, err :=
+    newDocNums, numDocs, storedIndexOffset, _, _, sectionsIndexOffset, err :=
         MergeToWriter(segmentBases, drops, chunkMode, cr, closeCh)
     if err != nil {
         cleanup()
@@ -111,7 +111,7 @@ func mergeSegmentBases(segmentBases []*SegmentBase, drops []*roaring.Bitmap, pat

 func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
     chunkMode uint32, cr *CountHashWriter, closeCh chan struct{}) (
-    newDocNums [][]uint64, numDocs, storedIndexOffset uint64, dictLocs []uint64,
+    newDocNums [][]uint64, numDocs, storedIndexOffset uint64,
     fieldsInv []string, fieldsMap map[string]uint16, sectionsIndexOffset uint64,
     err error) {

@@ -122,7 +122,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
     numDocs = computeNewDocCount(segments, drops)
 
     if isClosed(closeCh) {
-        return nil, 0, 0, nil, nil, nil, 0, seg.ErrClosed
+        return nil, 0, 0, nil, nil, 0, seg.ErrClosed
     }
 
     // the merge opaque is especially important when it comes to tracking the file
@@ -140,7 +140,7 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,
     storedIndexOffset, newDocNums, err = mergeStoredAndRemap(segments, drops,
         fieldsMap, fieldsInv, fieldsSame, numDocs, cr, closeCh)
     if err != nil {
-        return nil, 0, 0, nil, nil, nil, 0, err
+        return nil, 0, 0, nil, nil, 0, err
     }
 
     // at this point, ask each section implementation to merge itself
@@ -149,21 +149,19 @@ func MergeToWriter(segments []*SegmentBase, drops []*roaring.Bitmap,

             err = x.Merge(mergeOpaque, segments, drops, fieldsInv, newDocNums, cr, closeCh)
             if err != nil {
-                return nil, 0, 0, nil, nil, nil, 0, err
+                return nil, 0, 0, nil, nil, 0, err
             }
         }
-    } else {
-        dictLocs = make([]uint64, len(fieldsInv))
     }
 
     // we can persist the fields section index now, this will point
     // to the various indexes (each in different section) available for a field.
-    sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, dictLocs, mergeOpaque)
+    sectionsIndexOffset, err = persistFieldsSection(fieldsInv, cr, mergeOpaque)
     if err != nil {
-        return nil, 0, 0, nil, nil, nil, 0, err
+        return nil, 0, 0, nil, nil, 0, err
     }
 
-    return newDocNums, numDocs, storedIndexOffset, dictLocs, fieldsInv, fieldsMap, sectionsIndexOffset, nil
+    return newDocNums, numDocs, storedIndexOffset, fieldsInv, fieldsMap, sectionsIndexOffset, nil
 }
 
 // mapFields takes the fieldsInv list and returns a map of fieldName
33 changes: 15 additions & 18 deletions new.go
@@ -66,14 +66,13 @@ func (*ZapPlugin) newWithChunkMode(results []index.Document,
     s.chunkMode = chunkMode
     s.w = NewCountHashWriter(&br)
 
-    storedIndexOffset, dictOffsets, sectionsIndexOffset, err := s.convert()
+    storedIndexOffset, sectionsIndexOffset, err := s.convert()
     if err != nil {
         return nil, uint64(0), err
     }
 
     sb, err := InitSegmentBase(br.Bytes(), s.w.Sum32(), chunkMode,
-        s.FieldsMap, s.FieldsInv, uint64(len(results)),
-        storedIndexOffset, dictOffsets, sectionsIndexOffset)
+        uint64(len(results)), storedIndexOffset, sectionsIndexOffset)
 
     // get the bytes written before the interim's reset() call
     // write it to the newly formed segment base.
@@ -125,8 +124,10 @@ func (s *interim) reset() (err error) {
     s.results = nil
     s.chunkMode = 0
     s.w = nil
-    s.FieldsMap = nil
-    s.FieldsInv = nil
+    for k := range s.FieldsMap {
+        delete(s.FieldsMap, k)
+    }
+    s.FieldsInv = s.FieldsInv[:0]
     s.metaBuf.Reset()
     s.tmp0 = s.tmp0[:0]
     s.tmp1 = s.tmp1[:0]
@@ -168,8 +169,10 @@ type interimLoc struct {
     arrayposs []uint64
 }
 
-func (s *interim) convert() (uint64, []uint64, uint64, error) {
-    s.FieldsMap = map[string]uint16{}
+func (s *interim) convert() (uint64, uint64, error) {
+    if s.FieldsMap == nil {
+        s.FieldsMap = map[string]uint16{}
+    }
 
     args := map[string]interface{}{
         "results": s.results,
@@ -209,17 +212,15 @@ func (s *interim) convert() (uint64, []uint64, uint64, error) {

     storedIndexOffset, err := s.writeStoredFields()
     if err != nil {
-        return 0, nil, 0, err
+        return 0, 0, err
     }
 
-    var dictOffsets []uint64
-
     // we can persist the various sections at this point.
     // the rule of thumb here is that each section must persist field wise.
     for _, x := range segmentSections {
         _, err = x.Persist(s.opaque, s.w)
         if err != nil {
-            return 0, nil, 0, err
+            return 0, 0, err
         }
     }

@@ -231,18 +232,14 @@
         }
     }
 
-    if len(s.results) == 0 {
-        dictOffsets = make([]uint64, len(s.FieldsInv))
-    }
-
     // we can persist a new fields section here
     // this new fields section will point to the various indexes available
-    sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, dictOffsets, s.opaque)
+    sectionsIndexOffset, err := persistFieldsSection(s.FieldsInv, s.w, s.opaque)
     if err != nil {
-        return 0, nil, 0, err
+        return 0, 0, err
     }
 
-    return storedIndexOffset, dictOffsets, sectionsIndexOffset, nil
+    return storedIndexOffset, sectionsIndexOffset, nil
 }
 
 func (s *interim) getOrDefineField(fieldName string) int {
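The reset()/convert() changes here are an allocation optimization: instead of nil-ing FieldsMap and FieldsInv (forcing a fresh map and slice every time a pooled interim is reused), reset() clears them in place and convert() allocates the map only on first use. A standalone sketch of the pattern (hypothetical builder type, not the real interim struct):

```go
package main

import "fmt"

type builder struct {
	fieldsMap map[string]uint16
	fieldsInv []string
}

// reset empties the containers but keeps their backing storage, so a
// reused builder pays no map/slice allocations on its next use.
// (Ranging over or deleting from a nil map is a safe no-op in Go.)
func (b *builder) reset() {
	for k := range b.fieldsMap {
		delete(b.fieldsMap, k)
	}
	b.fieldsInv = b.fieldsInv[:0]
}

func (b *builder) define(name string) uint16 {
	if b.fieldsMap == nil { // lazy first-use allocation, as convert() now does
		b.fieldsMap = map[string]uint16{}
	}
	if id, ok := b.fieldsMap[name]; ok {
		return id
	}
	b.fieldsInv = append(b.fieldsInv, name)
	id := uint16(len(b.fieldsInv)) // 1-based id, purely for this sketch
	b.fieldsMap[name] = id
	return id
}

func main() {
	var b builder
	fmt.Println(b.define("_id"), b.define("title")) // 1 2
	b.reset()
	fmt.Println(len(b.fieldsMap), len(b.fieldsInv)) // 0 0, capacity retained
}
```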
13 changes: 2 additions & 11 deletions segment.go
@@ -326,7 +326,7 @@ func (s *SegmentBase) loadFieldsNew() error {
     if seek > uint64(len(s.mem)) {
         // handling a buffer overflow case.
         // a rare case where the backing buffer is not large enough to be read directly via
-        // a pos+binary.MaxVarinLen64 seek. For eg, this can happen when there is only
+        // a pos+binary.MaxVarintLen64 seek. For eg, this can happen when there is only
         // one field to be indexed in the entire batch of data and while writing out
         // these fields metadata, you write 1 + 8 bytes whereas the MaxVarintLen64 = 10.
         seek = uint64(len(s.mem))
@@ -342,7 +342,7 @@
     // the following loop will be executed only once in the edge case pointed out above
     // since there is only field's offset store which occupies 8 bytes.
     // the pointer then seeks to a position preceding the sectionsIndexOffset, at
-    // which point the responbility of handling the out-of-bounds cases shifts to
+    // which point the responsibility of handling the out-of-bounds cases shifts to
     // the specific section's parsing logic.
     var fieldID uint64
     for fieldID < numFields {
@@ -867,15 +867,6 @@ func (s *SegmentBase) loadDvReaders() error {

     s.incrementBytesRead(read)
 
-    dataLoc, n := binary.Uvarint(s.mem[pos : pos+binary.MaxVarintLen64])
-    if n <= 0 {
-        return fmt.Errorf("loadDvReaders: failed to read the dataLoc "+
-            "offset for sectionID %v field %v", secID, s.fieldsInv[fieldID])
-    }
-    if secID == SectionInvertedTextIndex {
-        s.dictLocs = append(s.dictLocs, dataLoc)
-        s.incrementBytesRead(uint64(n))
-    }
     fieldDvReader, err := s.loadFieldDocValueReader(s.fieldsInv[fieldID], fieldLocStart, fieldLocEnd)
     if err != nil {
         return err
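The comment corrected in the first hunk describes clamping the slice end before binary.Uvarint when pos+binary.MaxVarintLen64 would run past the backing buffer. A small standalone sketch of that bounds check (illustrative helper, not the exact loadFieldsNew code):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// readUvarintAt decodes a uvarint at pos, clamping the window so a value
// encoded near the end of mem (fewer than binary.MaxVarintLen64 bytes
// remaining) doesn't trigger an out-of-range slice expression.
func readUvarintAt(mem []byte, pos uint64) (uint64, int, error) {
	seek := pos + binary.MaxVarintLen64
	if seek > uint64(len(mem)) {
		seek = uint64(len(mem)) // rare case: buffer shorter than a max-length varint
	}
	v, n := binary.Uvarint(mem[pos:seek])
	if n <= 0 {
		return 0, 0, fmt.Errorf("truncated or malformed uvarint at offset %d", pos)
	}
	return v, n, nil
}

func main() {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, 300)
	v, read, err := readUvarintAt(buf[:n], 0) // only 2 bytes back the value
	fmt.Println(v, read, err)                 // 300 2 <nil>
}
```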
2 changes: 1 addition & 1 deletion write.go
@@ -50,7 +50,7 @@ func writeRoaringWithLen(r *roaring.Bitmap, w io.Writer,
     return tw, nil
 }
 
-func persistFieldsSection(fieldsInv []string, w *CountHashWriter, dictLocs []uint64, opaque map[int]resetable) (uint64, error) {
+func persistFieldsSection(fieldsInv []string, w *CountHashWriter, opaque map[int]resetable) (uint64, error) {
     var rv uint64
     fieldsOffsets := make([]uint64, 0, len(fieldsInv))
