Skip to content

Commit

Permalink
Enhance Point In Time support with APIs to list active point-in-time …
Browse files Browse the repository at this point in the history
…searches

Signed-off-by: Andriy Redko <[email protected]>
  • Loading branch information
reta committed Jun 21, 2024
1 parent fef7c4d commit faaed4b
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.data.core.OpenSearchOperations;
import org.opensearch.index.query.MoreLikeThisQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.BulkByScrollResponse;
Expand Down Expand Up @@ -81,7 +82,7 @@
* OpenSearchRestTemplate
* @since 0.1
*/
public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate {
public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations {

private static final Log LOGGER = LogFactory.getLog(OpenSearchRestTemplate.class);

Expand Down Expand Up @@ -485,6 +486,13 @@ public Boolean closePointInTime(String pit) {
return false;
}

@Override
public List<PitInfo> listPointInTime() {
return execute(client -> client.getAllPits(RequestOptions.DEFAULT))
.getPitInfos().stream().map(pit -> new PitInfo(pit.getPitId(), pit.getCreationTime(), null))
.toList();
}

public SearchResponse suggest(SuggestBuilder suggestion, IndexCoordinates index) {
SearchRequest searchRequest = requestFactory.searchRequest(suggestion, index);
return execute(client -> client.search(searchRequest, RequestOptions.DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.client.opensearch.core.pit.DeletePitRequest;
import org.opensearch.client.opensearch.core.search.SearchResult;
import org.opensearch.client.transport.Version;
import org.opensearch.data.core.OpenSearchOperations;
import org.springframework.data.elasticsearch.BulkFailureException;
import org.springframework.data.elasticsearch.client.UnsupportedBackendOperation;
import org.springframework.data.elasticsearch.core.AbstractElasticsearchTemplate;
Expand Down Expand Up @@ -76,7 +77,7 @@
* @author Haibo Liu
* @since 4.4
*/
public class OpenSearchTemplate extends AbstractElasticsearchTemplate {
public class OpenSearchTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations {

private static final Log LOGGER = LogFactory.getLog(OpenSearchTemplate.class);

Expand Down Expand Up @@ -633,6 +634,14 @@ public Boolean closePointInTime(String pit) {
return !response.pits().isEmpty();
}

@Override
public List<PitInfo> listPointInTime() {
return execute(client -> client.listAllPit()).pits()
.stream()
.map(pit -> new PitInfo(pit.pitId(), pit.creationTime(), pit.keepAlive() == null ? null : Duration.ofMillis(pit.keepAlive())))
.toList();
}

// endregion

// region script methods
Expand Down Expand Up @@ -737,5 +746,4 @@ protected List<IndexedObjectInformation> checkForBulkOperationFailure(BulkRespon

}
// endregion

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.opensearch.data.core;

import java.time.Duration;
import java.util.List;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;

/**
* The extension over {@link ElasticsearchOperations} with OpenSearch specific operations.
*/
public interface OpenSearchOperations extends ElasticsearchOperations {
/**
* Return all active point in time searches
* @return all active point in time searches
*/
List<PitInfo> listPointInTime();

/**
* Describes the point in time entry
*
* @param id the point in time id
* @param keepAlive the new keep alive value to be sent with the query
*/
record PitInfo(String id, long creationTime, Duration keepAlive) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -40,6 +40,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -52,6 +54,7 @@
import org.junit.jupiter.api.Test;
import org.opensearch.data.client.EnabledIfOpenSearchVersion;
import org.opensearch.data.client.orhlc.NativeSearchQueryBuilder;
import org.opensearch.data.core.OpenSearchOperations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
Expand Down Expand Up @@ -130,7 +133,7 @@ public abstract class ElasticsearchIntegrationTests {
private static final String MULTI_INDEX_2_NAME = MULTI_INDEX_PREFIX + "-2";
private static final String MULTI_INDEX_3_NAME = MULTI_INDEX_PREFIX + "-3";

@Autowired protected ElasticsearchOperations operations;
@Autowired protected OpenSearchOperations operations;
private IndexOperations indexOperations;

@Autowired protected IndexNameProvider indexNameProvider;
Expand Down Expand Up @@ -2747,16 +2750,23 @@ public void testPointInTimeKeepAliveExpired() throws InterruptedException {
SearchHits<SampleEntity> results = operations.search(query,SampleEntity.class);
assertThat(results.getSearchHits().size()).isEqualTo(2);

// There may be a better way to do it, but Opensearch by default waits for up-to a minute to clear expired pits
Thread.sleep(120000);
final Query searchAfterQuery = getBuilderWithMatchAllQuery() //
.withSort(Sort.by(Sort.Order.desc("message"))) //
.withPointInTime(qpit)
.withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage())))
.build();
assertThatExceptionOfType(ResourceNotFoundException.class).isThrownBy(
()-> operations.search(searchAfterQuery, SampleEntity.class)
);

final long started = System.nanoTime();
while ((System.nanoTime() - started) < TimeUnit.SECONDS.toNanos(120)) {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
if (operations.listPointInTime().isEmpty()) {
break;
}
}

assertThatExceptionOfType(ResourceNotFoundException.class)
.isThrownBy(()-> operations.search(searchAfterQuery,SampleEntity.class));

Boolean pitResult = operations.closePointInTime(pit);
Assertions.assertTrue(pitResult);
}
Expand Down

0 comments on commit faaed4b

Please sign in to comment.