Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Regression in cohere-10m force merge latency after switching to NativeEngines990KnnVectorsWriter #2134

Open
shatejas opened this issue Sep 20, 2024 · 10 comments
Assignees
Labels
bug Something isn't working

Comments

@shatejas
Copy link
Collaborator

What is the bug?

After the switch to NativeEngines990KnnVectorsWriter we saw force merge latencies increased approximately by 20% in nightly runs

The increase has been consistent

How can one reproduce the bug?
Running a benchmark for force-merge against 2.17 vs 2.16 with cohere 10m dataset takes 2000seconds (~30 mins) more

What is the expected behavior?
It should take the same time or less

What is your host/environment?
Nightlies dashboard

Do you have any screenshots?
NA

Do you have any additional context?
NA

@shatejas shatejas added bug Something isn't working untriaged labels Sep 20, 2024
@shatejas
Copy link
Collaborator Author

shatejas commented Sep 20, 2024

To reproduce easily cohere 1m dataset was used for benchmarking for the below table
 

  number of index segments force merge time (minutes) force merge segments
2.17 code 75 15.68263 3
#2133 code 92 12.51252 3
2.16 code (minor code change to mimic it) 88 15.68209 3

Estimated bottlenecks

KNNVectorValues Creation

KNNVectorValues are created 3 times currently, we cannot reuse the same object as there is no way we could reset the iterator and putting effort into logic for resetting the iterator might not result in latency improvements

  1. Computing totalLiveDocs
  2. Training for quantization
  3. Building index

Currently we are creating KNNVectorValues when quantization is not needed. Exp 2 in the above table shows some improvement in force merge time

TotalLiveDocs computes

There is a linear time complexity to compute total live docs. TotalLiveDocs value is currently needed to

  1. Mean calculations during quantization training
  2. Memory allocation computations while building graph for HNSW

Flush case

For flush we can avoid this calculation as there are no deleted docs involved and we can rely on KNNVectorValues or vectors in the field to give us the right result for totalLiveDocs

Merge case

Merge involves removing deleted docs, While merging the segments the deleted docs aren’t considered. To do that current code path is using APIs in MergedVectorValues to have an iterator that can iterate while skipping the deleted docs. The APIs here does not give an iterator which considered deleted docs in its size count. As a result even KNNVectorValues cannot return the right result as it relies on the iterator provided by the MergedVectorValues to compute total live docs

@navneet1v
Copy link
Collaborator

@shatejas one way to avoid the linear complexity for totalLives does when there are no deleted docs is we can write our custom FloatVectorValues merger. We already have something like this in BinaryDocValues: ref: https://github.com/opensearch-project/k-NN/blob/main/src/main/java/org/opensearch/knn/index/codec/KNN80Codec/KNN80DocValuesReader.java#L38-L64

If we do this, we can remove the complexity for total live docs. We can also add some clone interfaces on those merged values which will help us remove complexity from code.

@jmazanec15
Copy link
Member

@shatejas The table doesnt show repro for 2.17 vs 2.16 right?

@shatejas
Copy link
Collaborator Author

shatejas commented Sep 23, 2024

@jmazanec15 So first row is 2.17 but on main branch as the code path is the same. To mimic 2.16 code path this change was made while running the bench mark

@jmazanec15
Copy link
Member

@shatejas but isnt 2.17 time same as 2.16 - so can we not repro it with the setup?

@shatejas
Copy link
Collaborator Author

shatejas commented Sep 23, 2024

@shatejas but isnt 2.17 time same as 2.16 - so can we not repro it with the setup?

Not exactly same, the number of segments being merged is ~15% higher for 2.16 compared to 2.17 so there is some difference

@navneet1v
Copy link
Collaborator

I did an initial deep-dive into the issue and ran some benchmarks. Here is the deep-dive results. I used https://github.com/opensearch-project/opensearch-cluster-cdk to setup all the cluster. This setup mimics the configuration for 10M workload testing in nightlies. Ref: https://opensearch.org/benchmarks/

Cluster Configuration

Key Value
DataNode Count 3
DataNode Type r6g.4xlarge
Dataset cohere10m
Dimension 768
primaries 6
replicas 1

Attempt 1

I added some logs around the code to know where the time is going in the merge in this attempt. Also during the merge I started doing hot_threads on the node to see where the threads are busy. Here is the output.

curl localhost:9200/_nodes/hot_threads

::: {ip-10-0-5-186.us-west-2.compute.internal}{jF730SGJT_q8mGGvXTZNRg}{McRJZa8aRGOOF-Plbsstjw}{10.0.5.186}{10.0.5.186:9300}{di}{shard_indexing_pressure_enabled=true}
   Hot threads at 2024-09-29T03:42:59.212Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

    1.5% (7.3ms out of 500ms) cpu usage by thread 'opensearch[ip-10-0-5-186.us-west-2.compute.internal][[target_index][2]: Lucene Merge Thread #81]'
     10/10 snapshots sharing following 19 elements
       app//org.apache.lucene.store.BufferedChecksumIndexInput.readBytes(BufferedChecksumIndexInput.java:46)
       app//org.apache.lucene.store.DataInput.readBytes(DataInput.java:73)
       app//org.apache.lucene.store.ChecksumIndexInput.skipByReading(ChecksumIndexInput.java:79)
       app//org.apache.lucene.store.ChecksumIndexInput.seek(ChecksumIndexInput.java:64)
       app//org.apache.lucene.codecs.CodecUtil.checksumEntireFile(CodecUtil.java:619)
       app//org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsReader.checkIntegrity(Lucene99FlatVectorsReader.java:228)
       org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsReader.checkIntegrity(NativeEngines990KnnVectorsReader.java:61)
       app//org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat$FieldsReader.checkIntegrity(PerFieldKnnVectorsFormat.java:248)
       app//org.apache.lucene.codecs.KnnVectorsWriter.merge(KnnVectorsWriter.java:89)
       app//org.apache.lucene.index.SegmentMerger.mergeVectorValues(SegmentMerger.java:257)
       app//org.apache.lucene.index.SegmentMerger$$Lambda/0x000000c801e2abe0.merge(Unknown Source)
       app//org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:300)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:151)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:5293)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4761)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6582)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:660)
       app//org.opensearch.index.engine.OpenSearchConcurrentMergeScheduler.doMerge(OpenSearchConcurrentMergeScheduler.java:120)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:721)

::: {ip-10-0-4-149.us-west-2.compute.internal}{elFwVckQQoGUpl_IhGsGDQ}{TkXjIyiVTL6dHwyv8k8dRg}{10.0.4.149}{10.0.4.149:9300}{m}{shard_indexing_pressure_enabled=true}
   Hot threads at 2024-09-29T03:42:59.212Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:


-------------------------------------------------------------
When read metrics is flattened


[ec2-user@ip-10-0-5-186 opensearch]$ curl localhost:9200/_nodes/hot_threads
::: {ip-10-0-5-186.us-west-2.compute.internal}{jF730SGJT_q8mGGvXTZNRg}{McRJZa8aRGOOF-Plbsstjw}{10.0.5.186}{10.0.5.186:9300}{di}{shard_indexing_pressure_enabled=true}
   Hot threads at 2024-09-29T03:43:27.483Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

    1.2% (5.8ms out of 500ms) cpu usage by thread 'opensearch[ip-10-0-5-186.us-west-2.compute.internal][[target_index][2]: Lucene Merge Thread #81]'
     10/10 snapshots sharing following 19 elements
       app//org.apache.lucene.store.BufferedChecksumIndexInput.readBytes(BufferedChecksumIndexInput.java:46)
       app//org.apache.lucene.store.DataInput.readBytes(DataInput.java:73)
       app//org.apache.lucene.store.ChecksumIndexInput.skipByReading(ChecksumIndexInput.java:79)
       app//org.apache.lucene.store.ChecksumIndexInput.seek(ChecksumIndexInput.java:64)
       app//org.apache.lucene.codecs.CodecUtil.checksumEntireFile(CodecUtil.java:619)
       app//org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsReader.checkIntegrity(Lucene99FlatVectorsReader.java:228)
       org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsReader.checkIntegrity(NativeEngines990KnnVectorsReader.java:61)
       app//org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat$FieldsReader.checkIntegrity(PerFieldKnnVectorsFormat.java:248)
       app//org.apache.lucene.codecs.KnnVectorsWriter.merge(KnnVectorsWriter.java:89)
       app//org.apache.lucene.index.SegmentMerger.mergeVectorValues(SegmentMerger.java:257)
       app//org.apache.lucene.index.SegmentMerger$$Lambda/0x000000c801e2abe0.merge(Unknown Source)
       app//org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:300)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:151)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:5293)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4761)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6582)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:660)
       app//org.opensearch.index.engine.OpenSearchConcurrentMergeScheduler.doMerge(OpenSearchConcurrentMergeScheduler.java:120)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:721)

::: {ip-10-0-4-149.us-west-2.compute.internal}{elFwVckQQoGUpl_IhGsGDQ}{TkXjIyiVTL6dHwyv8k8dRg}{10.0.4.149}{10.0.4.149:9300}{m}{shard_indexing_pressure_enabled=true}
   Hot threads at 2024-09-29T03:43:27.483Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

From logs I was able to see the logs which I added.

[2024-09-29T03:45:41,930][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge]: Reading and writing merged FlatVector Took : 4547 ms
[2024-09-29T03:45:41,930][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge] Reading live docs 1666071 took with merging vector values : 0 ms

Hot threads when CPU starts to flatten

curl localhost:9200/_nodes/hot_threads

::: {ip-10-0-5-186.us-west-2.compute.internal}{jF730SGJT_q8mGGvXTZNRg}{McRJZa8aRGOOF-Plbsstjw}{10.0.5.186}{10.0.5.186:9300}{di}{shard_indexing_pressure_enabled=true}
   Hot threads at 2024-09-29T03:57:28.666Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

   100.3% (501.4ms out of 500ms) cpu usage by thread 'opensearch[ip-10-0-5-186.us-west-2.compute.internal][[target_index][2]: Lucene Merge Thread #81]'
     10/10 snapshots sharing following 22 elements
       org.opensearch.knn.jni.FaissService.insertToIndex(Native Method)
       org.opensearch.knn.jni.JNIService.insertToIndex(JNIService.java:81)
       org.opensearch.knn.index.codec.nativeindex.MemOptimizedNativeIndexBuildStrategy.lambda$buildAndWriteIndex$1(MemOptimizedNativeIndexBuildStrategy.java:91)
       org.opensearch.knn.index.codec.nativeindex.MemOptimizedNativeIndexBuildStrategy$$Lambda/0x000000c801e0a218.run(Unknown Source)
       [email protected]/java.security.AccessController.executePrivileged(AccessController.java:778)
       [email protected]/java.security.AccessController.doPrivileged(AccessController.java:319)
       org.opensearch.knn.index.codec.nativeindex.MemOptimizedNativeIndexBuildStrategy.buildAndWriteIndex(MemOptimizedNativeIndexBuildStrategy.java:90)
       org.opensearch.knn.index.codec.nativeindex.NativeIndexWriter.buildAndWriteIndex(NativeIndexWriter.java:154)
       org.opensearch.knn.index.codec.nativeindex.NativeIndexWriter.mergeIndex(NativeIndexWriter.java:130)
       org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsWriter.mergeOneField(NativeEngines990KnnVectorsWriter.java:130)
       app//org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat$FieldsWriter.mergeOneField(PerFieldKnnVectorsFormat.java:121)
       app//org.apache.lucene.codecs.KnnVectorsWriter.merge(KnnVectorsWriter.java:99)
       app//org.apache.lucene.index.SegmentMerger.mergeVectorValues(SegmentMerger.java:257)
       app//org.apache.lucene.index.SegmentMerger$$Lambda/0x000000c801e2abe0.merge(Unknown Source)
       app//org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:300)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:151)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:5293)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4761)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6582)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:660)
       app//org.opensearch.index.engine.OpenSearchConcurrentMergeScheduler.doMerge(OpenSearchConcurrentMergeScheduler.java:120)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:721)

::: {ip-10-0-4-149.us-west-2.compute.internal}{elFwVckQQoGUpl_IhGsGDQ}{TkXjIyiVTL6dHwyv8k8dRg}{10.0.4.149}{10.0.4.149:9300}{m}{shard_indexing_pressure_enabled=true}
   Hot threads at 2024-09-29T03:57:28.667Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

Logs when force merge on 1 shard is done

[2024-09-29T03:45:41,930][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge]: Reading and writing merged FlatVector Took : 4547 ms
[2024-09-29T03:45:41,930][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge] Reading live docs 1666071 took with merging vector values : 0 ms
[2024-09-29T04:51:14,274][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge] Graph Build took 3932343 ms for vector field [target_field] for docs: 1666071
[2024-09-29T05:06:33,243][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge]: Reading and writing merged FlatVector Took : 4441 ms
[2024-09-29T05:06:33,244][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge] Reading live docs 1667528 took with merging vector values : 0 ms

OSB results

Metric Task Value Unit
Cumulative indexing time of primary shards 240.428 min
Min cumulative indexing time across primary shards 0.000133333 min
Median cumulative indexing time across primary shards 39.9339 min
Max cumulative indexing time across primary shards 41.2076 min
Cumulative indexing throttle time of primary shards 0 min
Min cumulative indexing throttle time across primary shards 0 min
Median cumulative indexing throttle time across primary shards 0 min
Max cumulative indexing throttle time across primary shards 0 min
Cumulative merge time of primary shards 1504.64 min
Cumulative merge count of primary shards 471
Min cumulative merge time across primary shards 0 min
Median cumulative merge time across primary shards 245.639 min
Max cumulative merge time across primary shards 263.798 min
Cumulative merge throttle time of primary shards 339.274 min
Min cumulative merge throttle time across primary shards 0 min
Median cumulative merge throttle time across primary shards 59.5843 min
Max cumulative merge throttle time across primary shards 62.9678 min
Cumulative refresh time of primary shards 8.43917 min
Cumulative refresh count of primary shards 570
Min cumulative refresh time across primary shards 0.000333333 min
Median cumulative refresh time across primary shards 1.37143 min
Max cumulative refresh time across primary shards 1.55967 min
Cumulative flush time of primary shards 128.082 min
Cumulative flush count of primary shards 370
Min cumulative flush time across primary shards 0 min
Median cumulative flush time across primary shards 21.4153 min
Max cumulative flush time across primary shards 21.9047 min
Total Young Gen GC time 0.125 s
Total Young Gen GC count 11
Total Old Gen GC time 0 s
Total Old Gen GC count 0
Store size 341.19 GB
Translog size 1.19954e-06 GB
Heap used for segments 0 MB
Heap used for doc values 0 MB
Heap used for terms 0 MB
Heap used for norms 0 MB
Heap used for points 0 MB
Heap used for stored fields 0 MB
Segment count 7
Min Throughput force-merge-segments 0 ops/s
Mean Throughput force-merge-segments 0 ops/s
Median Throughput force-merge-segments 0 ops/s
Max Throughput force-merge-segments 0 ops/s
100th percentile latency force-merge-segments 1.88385e+07 ms
100th percentile service time force-merge-segments 1.88385e+07 ms
error rate force-merge-segments 0 %

Hypothesis

  1. The time to merge and write the flat vectors is 4547 ms for like 10M/6 = 1.6M docs
  2. Time to calculate live docs with all @shatejas changes: 0ms
[2024-09-29T03:45:41,930][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge]: Reading and writing merged FlatVector Took : 4547 ms
[2024-09-29T03:45:41,930][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-186.us-west-2.compute.internal] [Merge] Reading live docs 1666071 took with merging vector values : 0 ms

Screenshot 2024-10-15 at 6 07 03 PM

  1. The flat line before the start of the force merge(CPU going to 11%), is happening because of the IOs. We can see that green line is quite high. So we are reading something there. Need to add more logs to find out what is happening there. Next set of experiments will focus on who is doing these IOs.

Attempt 2

This time I used the same cluster and did hot threads call quite aggressively, and I was able to find out that there is a checksum validation being called before every merge starts. Here is the dump.

::: {ip-10-0-3-184.us-west-2.compute.internal}{XqmqM2bqQO-tbB7_QWXjUQ}{XUh8XNv9TJioeHaiyWZVtw}{10.0.3.184}{10.0.3.184:9300}{di}{shard_indexing_pressure_enabled=true}
   Hot threads at 2024-09-29T06:47:38.573Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

    3.2% (15.8ms out of 500ms) cpu usage by thread 'opensearch[ip-10-0-3-184.us-west-2.compute.internal][[target_index][2]: Lucene Merge Thread #0]'     10/10 snapshots sharing following 17 elements
       app//org.apache.lucene.store.BufferedChecksumIndexInput.readBytes(BufferedChecksumIndexInput.java:46)
       app//org.apache.lucene.store.DataInput.readBytes(DataInput.java:73)
       app//org.apache.lucene.store.ChecksumIndexInput.skipByReading(ChecksumIndexInput.java:79)       app//org.apache.lucene.store.ChecksumIndexInput.seek(ChecksumIndexInput.java:64)       app//org.apache.lucene.codecs.CodecUtil.checksumEntireFile(CodecUtil.java:619)
       app//org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsReader.checkIntegrity(Lucene90CompressingStoredFieldsReader.java:725)
       app//org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsWriter.merge(Lucene90CompressingStoredFieldsWriter.java:609)
       app//org.apache.lucene.index.SegmentMerger.mergeFields(SegmentMerger.java:236)
       app//org.apache.lucene.index.SegmentMerger$$Lambda/0x0000000801e55e50.merge(Unknown Source)
       app//org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:275)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:112)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:5293)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4761)       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6582)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:660)
       app//org.opensearch.index.engine.OpenSearchConcurrentMergeScheduler.doMerge(OpenSearchConcurrentMergeScheduler.java:120)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:721)

# Checking integrity

[ec2-user@ip-10-0-3-184 opensearch]$ curl localhost:9200/_nodes/ip-10-0-3-184.us-west-2.compute.internal/hot_threads
::: {ip-10-0-3-184.us-west-2.compute.internal}{XqmqM2bqQO-tbB7_QWXjUQ}{XUh8XNv9TJioeHaiyWZVtw}{10.0.3.184}{10.0.3.184:9300}{di}{shard_indexing_pressure_enabled=true}
   Hot threads at 2024-09-29T06:49:12.630Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

    0.2% (829.4micros out of 500ms) cpu usage by thread 'opensearch[ip-10-0-3-184.us-west-2.compute.internal][[target_index][2]: Lucene Merge Thread #0]'
     10/10 snapshots sharing following 19 elements
       app//org.apache.lucene.store.BufferedChecksumIndexInput.readBytes(BufferedChecksumIndexInput.java:46)
       app//org.apache.lucene.store.DataInput.readBytes(DataInput.java:73)
       app//org.apache.lucene.store.ChecksumIndexInput.skipByReading(ChecksumIndexInput.java:79)
       app//org.apache.lucene.store.ChecksumIndexInput.seek(ChecksumIndexInput.java:64)
       app//org.apache.lucene.codecs.CodecUtil.checksumEntireFile(CodecUtil.java:619)
       app//org.apache.lucene.codecs.lucene99.Lucene99FlatVectorsReader.checkIntegrity(Lucene99FlatVectorsReader.java:228)
       org.opensearch.knn.index.codec.KNN990Codec.NativeEngines990KnnVectorsReader.checkIntegrity(NativeEngines990KnnVectorsReader.java:61)
       app//org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat$FieldsReader.checkIntegrity(PerFieldKnnVectorsFormat.java:248)
       app//org.apache.lucene.codecs.KnnVectorsWriter.merge(KnnVectorsWriter.java:89)
       app//org.apache.lucene.index.SegmentMerger.mergeVectorValues(SegmentMerger.java:257)
       app//org.apache.lucene.index.SegmentMerger$$Lambda/0x0000000801f92cf0.merge(Unknown Source)
       app//org.apache.lucene.index.SegmentMerger.mergeWithLogging(SegmentMerger.java:300)
       app//org.apache.lucene.index.SegmentMerger.merge(SegmentMerger.java:151)
       app//org.apache.lucene.index.IndexWriter.mergeMiddle(IndexWriter.java:5293)
       app//org.apache.lucene.index.IndexWriter.merge(IndexWriter.java:4761)
       app//org.apache.lucene.index.IndexWriter$IndexWriterMergeSource.merge(IndexWriter.java:6582)
       app//org.apache.lucene.index.ConcurrentMergeScheduler.doMerge(ConcurrentMergeScheduler.java:660)
       app//org.opensearch.index.engine.OpenSearchConcurrentMergeScheduler.doMerge(OpenSearchConcurrentMergeScheduler.java:120)
       app//org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread.run(ConcurrentMergeScheduler.java:721)

Observation

  1. Since the checksum needs to read the whole file sequentially it is very evident that Reads will increase. Need to see why this was not happening older version of Opensearch.

Will do some code deep-dive for this.

@navneet1v
Copy link
Collaborator

navneet1v commented Oct 16, 2024

Continuing my deep-dive from above, what I found is file containing flat vectors is opened via IOContext as RANDOM vs dvd files are getting opened as READ. This could be a potential reason for high latency. A simple check to confirm this is switching the IOContext from Random to READONCE which can help.

Ref this to know about RANDOM and READ IOContext of Lucene which is mapped as madvise arguments which is used to adivse Operating system on how to mmap a file.

MADV_RANDOM
Expect page references in random order. (Hence, read ahead may be less useful than normally.)
MADV_SEQUENTIAL
Expect page references in sequential order. (Hence, pages in the given range can be aggressively read ahead, and may be freed soon after they are accessed.)

Attempt 3

Cluster Configuration

Key Value
DataNode Count 3
DataNode Type r6g.4xlarge
Dataset cohere10m
Dimension 768
primaries 6
replicas 1

Now to test that RANDOM is the culprit here I build a fresh version of Lucene, opensearch and k-NN from source by making my changes. Below are the steps I followed:
Building Lucene and Tar

  1. Checked out lucene branch_9_11 and then reverted to the release commit of lucene. : https://github.com/apache/lucene/releases/tag/releases%2Flucene%2F9.11.1
  2. Added my changes to lucene and pushed to my forked repo of lucene. navneet1v/lucene@cd02e6f
  3. Checked out lucene in the docker where I am building Opensearch and k-NN.
    1. ran below command on lucene to create the lucene version in local maven repo.

./gradlew mavenToLocal -Dversion.release=9.11.1

  1. Ensured that -Drepos.mavenLocal=true is added in here: https://github.com/opensearch-project/opensearch-build/blob/main/scripts/components/OpenSearch/build.sh at right places(aka any line where you see ./gradlew) to ensure that build is happening with local repo of lucene.
  2. Now run the builds to generate the tar, I used https://github.com/opensearch-project/opensearch-build repo.
  3. for this test the tar : https://github.com/navneet1v/k-NN/releases/tag/merge-time-fix-lucene-iocontext

Once the tar was build and deployed I ran the indexing and force merge and got these logs.

[2024-10-01T07:30:11,555][INFO ][o.o.i.r.RecoverySourceHandler] [ip-10-0-5-179.us-west-2.compute.internal] [target_index][2][recover to ip-10-0-4-25.us-west-2.compute.internal] finalizing recovery took [43.7ms]
[2024-10-01T07:49:39,343][INFO ][o.a.l.c.p.PerFieldKnnVectorsFormat$FieldsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge]: Total Time for doing Integrity check is : 838 ms
[2024-10-01T07:50:14,340][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge]: Reading and writing merged FlatVector Took : 34991 ms
[2024-10-01T07:50:14,352][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge] Reading live docs 1665540 took with merging vector values : 6 ms

OSB results

|                                                         Metric |                 Task |       Value |   Unit |                                                                                                   
|---------------------------------------------------------------:|---------------------:|------------:|-------:|                                                                                                   
|                     Cumulative indexing time of primary shards |                      | 0.000383333 |    min |                                                                                                   
|             Min cumulative indexing time across primary shards |                      |           0 |    min |                                                                                                   
|          Median cumulative indexing time across primary shards |                      |           0 |    min |                                                                                                   
|             Max cumulative indexing time across primary shards |                      | 0.000383333 |    min |                                                                                                   
|            Cumulative indexing throttle time of primary shards |                      |           0 |    min |                                                                                                   
|    Min cumulative indexing throttle time across primary shards |                      |           0 |    min |                                                                                                   
| Median cumulative indexing throttle time across primary shards |                      |           0 |    min | 
|                        Cumulative merge time of primary shards |                      |     427.972 |    min |
|                       Cumulative merge count of primary shards |                      |           6 |        |
|                Min cumulative merge time across primary shards |                      |           0 |    min |
|             Median cumulative merge time across primary shards |                      |     70.6227 |    min |
|                Max cumulative merge time across primary shards |                      |     73.9608 |    min |
|               Cumulative merge throttle time of primary shards |                      |           0 |    min |
|       Min cumulative merge throttle time across primary shards |                      |           0 |    min |
|    Median cumulative merge throttle time across primary shards |                      |           0 |    min |
|       Max cumulative merge throttle time across primary shards |                      |           0 |    min |
|                      Cumulative refresh time of primary shards |                      | 0.000883333 |    min |
|                     Cumulative refresh count of primary shards |                      |          66 |        |
|              Min cumulative refresh time across primary shards |                      | 6.66667e-05 |    min |
|           Median cumulative refresh time across primary shards |                      | 6.66667e-05 |    min |
|              Max cumulative refresh time across primary shards |                      | 0.000466667 |    min |
|                        Cumulative flush time of primary shards |                      |           0 |    min |
|                       Cumulative flush count of primary shards |                      |           6 |        |
|                Min cumulative flush time across primary shards |                      |           0 |    min |
|             Median cumulative flush time across primary shards |                      |           0 |    min |
|                Max cumulative flush time across primary shards |                      |           0 |    min |
|                                        Total Young Gen GC time |                      |       0.176 |      s |
|                                       Total Young Gen GC count |                      |          11 |        |
|                                          Total Old Gen GC time |                      |           0 |      s |
|                                         Total Old Gen GC count |                      |           0 |        |
|                                                     Store size |                      |      341.19 |     GB |
|                                                  Translog size |                      | 1.19954e-06 |     GB |
|                                         Heap used for segments |                      |           0 |     MB |
|                                       Heap used for doc values |                      |           0 |     MB |
|                                            Heap used for terms |                      |           0 |     MB |
|                                            Heap used for norms |                      |           0 |     MB |
|                                           Heap used for points |                      |           0 |     MB |
|                                    Heap used for stored fields |                      |           0 |     MB |
|                                                  Segment count |                      |           7 |        |
|                                                 Min Throughput | force-merge-segments |           0 |  ops/s |
|                                                Mean Throughput | force-merge-segments |           0 |  ops/s |
|                                              Median Throughput | force-merge-segments |           0 |  ops/s |
|                                                 Max Throughput | force-merge-segments |           0 |  ops/s |
|                                       100th percentile latency | force-merge-segments | 1.74575e+07 |     ms |
|                                  100th percentile service time | force-merge-segments | 1.74575e+07 |     ms |
|                                                     error rate | force-merge-segments |           0 |      % |

Conclusion

After making the IOContext to READ from Random there is no flat line for reads that is happening. To compare with older runs where we were taking like ~10min before for merging the flat vectors and doing the integrity checks the time has come down to ~ 2.5min.

Ref this to know about RANDOM and READ IOContext of Lucene which is mapped as madvise arguments which is used to adivse Operating system on how to mmap a file.

MADV_RANDOM
Expect page references in random order. (Hence, read ahead may be less useful than normally.)
MADV_SEQUENTIAL
Expect page references in sequential order. (Hence, pages in the given range can be aggressively read ahead, and may be freed soon after they are accessed.)

Code change I did: navneet1v/lucene@cd02e6f
Tar: https://github.com/navneet1v/k-NN/releases/tag/merge-time-fix-lucene-iocontext

Force merge started on 2024-10-01T07:48:00

[2024-10-01T07:49:39,343][INFO ][o.a.l.c.p.PerFieldKnnVectorsFormat$FieldsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge]: Total Time for doing Integrity check is : 838
[2024-10-01T07:50:14,340][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge]: Reading and writing merged FlatVector Took : 34991 ms
[2024-10-01T07:50:14,352][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge] Reading live docs 1665540 took with merging vector values : 6 ms
[2024-10-01T09:01:11,686][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge] Graph Build took 4257331 ms for vector field [target_field] for docs: 1665540
[2024-10-01T09:01:12,483][INFO ][o.a.l.c.l.Lucene99FlatVectorsReader] [ip-10-0-5-179.us-west-2.compute.internal] Opening vec file with IOContext has : IOContext [context=READ, mergeInfo=null, flushInfo=null, readOnce=false, load=false, randomAccess=false]
[2024-10-01T09:05:03,774][INFO ][o.a.l.c.p.PerFieldKnnVectorsFormat$FieldsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge]: Total Time for doing Integrity check is : 127443
[2024-10-01T09:05:08,091][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge]: Reading and writing merged FlatVector Took : 4315 ms
[2024-10-01T09:05:08,091][INFO ][o.o.k.i.c.K.NativeEngines990KnnVectorsWriter] [ip-10-0-5-179.us-west-2.compute.internal] [Merge] Reading live docs 1667998 took with merging vector values : 0 ms

The metrics for the node.

Screenshot 2024-10-15 at 6 25 52 PM

So finally total merge time with final attempt 3 has come down from 5.23hrs(deep-dive 1) to ~4.8hrs(deep-dive3) which has a diff of > 20mins which was actual degradation.

Open Questions

  1. How does the change of IOContext to RANDOM impacts Lucene Engine?

  2. What solution we should move towards now for fixing this regression?

@shatejas
Copy link
Collaborator Author

shatejas commented Oct 17, 2024

Thanks for the deep dive @navneet1v

How does the change of IOContext to RANDOM impacts Lucene Engine?

Based on the code deep dive, IOContext is used to advise the kernel about the access pattern of the data for memory-mapped files. This advise is helpful so kernel can optimize disk reads. IOContext translates into madvise system call which is used to advise the kernel (ref1 - lucene 9.11, ref2 - lucene 10). The default behavior of the kernel (without any madvise call) seems to preload pages in a non-aggressive way. This preloading of data is asynchronous.

Lucene engine depends on FlatVectorFormat to do its HNSW searches, where it seeks to the position of the vector based on the closest entry point. This access pattern is not sequential and opening the indexInput with IOContext.RANDOM gives a random access pattern advise to the kernel, as a response kernel does not preload data into the physical memory. This makes sense as it not only saves CPU resources since its not spending extra cycles on preloading data but also is seen to improve search latency.

Merges are different compared to searches. While merging data, all segments and its pages are accessed sequentially from the disk. In this case its beneficial to preload pages of the segments since it reduces runtime reads from the disk, which decreases the total time taken to do the merge. Since the codec reader instance used by lucene is the same as that of search, the access pattern advise given to the kernel remains random and is not changed before merges. This in turn slows down the merges since there are more number of runtime reads.

To see the behavior of lucene engine with IOContext.RANDOM vs using a preloading an experiment was done. Experiment uses cohere-10m dataset which has 10 million vectors, each with 768 dimensions. The opensearch cluster had 3 nodes, with a total of 6 shards and 1 replica. Each node had 4 vCPUs, 32Gb memory of which 16Gb was being used for Java heap.

Baseline - with IOContext.RANDOM

The total force merge time was ~9hr 30mins with read operations of ~120k/min before merge or each shard. the graph shows CPU utilization (blue line) vs read operations (Orange line). Note that the initial bump in CPU is indexing
Screenshot 2024-10-07 at 9 49 23 AM

Preload .vec and .vex files

To enforce preloading, Opensearch has an index setting called store.preload which was leveraged to have a preload behavior. .vec files store the flat vectors and .vex files store the lucenes HNSW graphs

The total force merge time was ~8hrs 54 mins with read operations of ~60k/min before merge of each shard
Screenshot 2024-10-07 at 9 51 30 AM

So it seems that with the change of IOContext.RANDOM, while search performance improved it degraded the total force merge time

@shatejas
Copy link
Collaborator Author

shatejas commented Oct 17, 2024

What solution we should move towards now for fixing this regression?

In an attempt to fix the regression few solutions were explored

1. Preloading .vec and .vex files [Ruled out]

Preloading data into physical memory for mmap directories helps with reducing latencies since it reduces read operations at runtime. To enable preloads a default cluster setting were overriden which tells the opensearch process to preload .vec and .vex on startup.

Experiment with FAISS engine showed that the preload approach decreased the total force merge time by ~20mins. The CPU and memory for FAISS experiment was changed to 16 CPU and 128GB (with 50% allocated to JVM) per node. This was done to be able to compare it to nightly runs

FAISS Baseline - with IOContext.RANDOM

Total force merge time: ~5hrs 10mins
Read operations before the start of the merge: 115K/min
Screenshot 2024-10-03 at 3 53 46 PM

Preloading .vec and .vex files

Total force merge time: ~4hrs 55mins
Read operations before the start of the merge: 60K/min
Screenshot 2024-10-03 at 4 02 51 PM

Why was the solution ruled out?

While this solution showed improvements in total force merge time compared to IOContext.RANDOM. It degraded the search latency by 50%

These are Lucene latencies from experiment mentioned in #2134 (comment)

p50 p90 p99
Baseline 231.8 ms 611 ms 1965 ms
Preload 570.7 ms 1553.5 ms 2376.5 ms

Why did the latency increase?

Preloading not only loads data into the physical memory but also loads data into CPU caches. The code is structured in such a way that it ignores the explicit madvise() call if preloads are enabled (ref). So when the the reader is opened during search the random access madvise() call is ignored and the data is preloaded.

As the reader seeks to different pages, CPU always seemed to have more cache misses compared to random madvise call, Preload seems to be evicting useful data from CPU cache to make space for preload data. This is just a hypothesis and I was not able to find the evidence for this

2. Opening a new IndexInput by overriding getMergeInstance()

Before the merge starts, lucene creates a merge state. MergeState holds all the references to the readers needed during merge process. While creating merge state, it calls getMergeInstance() for each reader. The solution leverages this method to open a new IndexInput with IOContext as READ_ONCE . This ensures that the kernel aggressively loads pages and discards them due to a corresponding madvise(SEQUENTIAL) call as a result of read once IOContext.

pseudo code for Lucene99FlatVectorsReader

@Override
  public FlatVectorsReader getMergeInstance() {
    try {
      return new Lucene99FlatVectorsReader(this, IOContext.READONCE);
    } catch (IOException e) {
      // Throwing for testing purposes, we can return existing instance once the testing is done
      throw new RuntimeException(e);
    }
  }
  private Lucene99FlatVectorsReader(
      final Lucene99FlatVectorsReader flatVectorsReader, final IOContext ioContext)
      throws IOException {
    super(flatVectorsReader.getFlatVectorScorer());
    this.segmentReadState = flatVectorsReader.segmentReadState;
    boolean success = false;
    try {
      this.vectorData =
          openDataInput(
              flatVectorsReader.segmentReadState,
              readMetadata(flatVectorsReader.segmentReadState),
              Lucene99FlatVectorsFormat.VECTOR_DATA_EXTENSION,
              Lucene99FlatVectorsFormat.VECTOR_DATA_CODEC_NAME,
              ioContext);
      success = true;
    } finally {
      if (success == false) {
        IOUtils.closeWhileHandlingException(this);
      }
    }
  }

With the change in Lucene99FlatVectorsReader and the corresponding change in knn plugin code total force merge time for FAISS engine was ~4hr 46mins

Screenshot 2024-10-12 at 12 14 43 PM

why is the solution not favored?

Based on the javadoc its better to clone an IndexInput then open a new one. while experiments did not reveal any issues with opening a new IndexInput, for multi threaded use clone is more favorable according to documentation

3. Advice kernel with a sequential access pattern before the start of the merge

This bypasses opening a new IndexInput. Instead it clones the existing IndexInput and calls madvise(SEQUENTIAL) for files needed to be merged. To do so we need access to each MemorySegment used. These are only accessible inside of IndexInput.

The solution relies on IndexInput#prefetch method introduced in Lucene 10. The method takes in an offset and length as parameters, currently this method calls madvise with WILL_NEED flag telling the kernel that the some bytes will be needed for the current segment.

We leverage this method to create prefetchSequential() in index input. prefetchSequential will iterate through all the MemorySegments and put a madvise(SEQUENTIAL) call for each of them using prefetch . prefetchSequential can then be used by getMergeInstance() used during merging.

Code snippets

MemorySegmentIndexInput

  public void prefetchSequential() throws IOException {
    if (NATIVE_ACCESS.isEmpty()) {
      return;
    }
    long offset = 0;
    for (MemorySegment seg : segments) {
      prefetch(offset, seg.byteSize(), ReadAdvice.SEQUENTIAL);
      offset += seg.byteSize();
    }
  }
  @Override
  public void prefetch(long offset, long length) throws IOException {
    prefetch(offset, length, ReadAdvice.WILL_NEED);
  }

Lucene99FlatVectorsReader.java

  private Lucene99FlatVectorsReader(final Lucene99FlatVectorsReader reader) throws IOException {
    super(reader.getFlatVectorScorer());
    this.fields = reader.fields;
    this.vectorData = reader.vectorData.clone();
    this.vectorData.seek(0);
    this.vectorData.prefetchSequential();
  }

public FlatVectorsReader getMergeInstance() {
    try {
      return new Lucene99FlatVectorsReader(this);
    } catch (IOException exception) {
      throw new RuntimeException(exception);
    }

Benchmarks showed the total merge time was approximately 4hrs 40mins for FAISS engine

Screenshot 2024-10-14 at 10 33 50 AM

@navneet1v reached out to Lucene community to discuss the feasibility of the solution. Here is the github issues apache/lucene#13920

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants