Skip to content

Commit

Permalink
Changing interface template for SearchRequestOperationsListener
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Sep 18, 2023
1 parent e7cb97b commit 110ca71
Show file tree
Hide file tree
Showing 13 changed files with 475 additions and 1,280 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand Down Expand Up @@ -188,11 +189,23 @@ public void testSearchWithWRRShardRouting() throws IOException {

for (NodeStats stat : nodeStats.getNodes()) {
SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal();
if (searchStats.getRequestStatsLongHolder().queryMetric > 0) {
assertThat(searchStats.getRequestStatsLongHolder().queryTotal, greaterThan(0L));
assertThat(searchStats.getRequestStatsLongHolder().fetchMetric, greaterThan(0L));
assertThat(searchStats.getRequestStatsLongHolder().fetchTotal, greaterThan(0L));
assertThat(searchStats.getRequestStatsLongHolder().expandSearchTotal, greaterThan(0L));
if (searchStats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.QUERY.getName()) > 0) {
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.QUERY.getName()).longValue(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.FETCH.getName()).longValue(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.FETCH.getName()).longValue(),
greaterThan(0L)
);
assertThat(
searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.EXPAND.getName()).longValue(),
greaterThan(0L)
);
coordNumber += 1;
}
Assert.assertTrue(searchStats.getQueryCount() > 0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.GroupShardsIterator;
Expand Down Expand Up @@ -175,11 +176,23 @@ public void testSimpleStats() throws Exception {

for (NodeStats stat : nodeStats.getNodes()) {
Stats total = stat.getIndices().getSearch().getTotal();
if (total.getRequestStatsLongHolder().queryMetric > 0) {
assertEquals(total.getRequestStatsLongHolder().queryTotal, iters);
assertThat(total.getRequestStatsLongHolder().fetchMetric, greaterThan(0L));
assertEquals(total.getRequestStatsLongHolder().fetchTotal, iters);
assertEquals(total.getRequestStatsLongHolder().expandSearchTotal, iters);
if (total.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.QUERY.getName()) > 0) {
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.QUERY.getName()).intValue()
);
assertThat(
total.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.FETCH.getName()),
greaterThan(0L)
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.FETCH.getName()).intValue()
);
assertEquals(
iters,
total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.EXPAND.getName()).intValue()
);
numOfCoordinators += 1;
}
if (nodeIdsWithIndex.contains(stat.getNode().getId())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,11 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -124,9 +122,6 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten

private SearchRequestOperationsListener searchRequestOperationsListener;
private List<SearchRequestOperationsListener> searchListenersList;
Map<String, Runnable> searchPhaseStartTrackingMap;
Map<String, Runnable> searchPhaseEndTrackingMap;
Map<String, Runnable> searchPhaseFailureTrackingMap;

AbstractSearchAsyncAction(
String name,
Expand Down Expand Up @@ -185,118 +180,9 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
if (!CollectionUtils.isEmpty(searchListenersList)) {
this.searchListenersList = searchListenersList;
this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger);
instantiateSearchPhaseStartMap();
instantiateSearchPhaseEndMap();
instantiateSearchPhaseFailMap();
}
}

private void instantiateSearchPhaseStartMap() {
Map<String, Runnable> searchPhaseStartTrackingMapModifiable = new HashMap<String, Runnable>();
searchPhaseStartTrackingMapModifiable.put(
SearchPhaseName.DFS_PRE_QUERY.getName(),
() -> searchRequestOperationsListener.onDFSPreQueryPhaseStart(this)
);
searchPhaseStartTrackingMapModifiable.put(
SearchPhaseName.CAN_MATCH.getName(),
() -> searchRequestOperationsListener.onCanMatchPhaseStart(this)
);
searchPhaseStartTrackingMapModifiable.put(
SearchPhaseName.DFS_QUERY.getName(),
() -> searchRequestOperationsListener.onQueryPhaseStart(this)
);
searchPhaseStartTrackingMapModifiable.put(
SearchPhaseName.QUERY.getName(),
() -> searchRequestOperationsListener.onQueryPhaseStart(this)
);
searchPhaseStartTrackingMapModifiable.put(
SearchPhaseName.FETCH.getName(),
() -> searchRequestOperationsListener.onFetchPhaseStart(this)
);
searchPhaseStartTrackingMapModifiable.put(
SearchPhaseName.EXPAND.getName(),
() -> searchRequestOperationsListener.onExpandSearchPhaseStart(this)
);
searchPhaseStartTrackingMap = Collections.unmodifiableMap(searchPhaseStartTrackingMapModifiable);
}

private void instantiateSearchPhaseEndMap() {
Map<String, Runnable> searchPhaseEndTrackingMapModifiable = new HashMap<String, Runnable>();
searchPhaseEndTrackingMapModifiable.put(
SearchPhaseName.DFS_PRE_QUERY.getName(),
() -> searchRequestOperationsListener.onDFSPreQueryPhaseEnd(
this,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())
)
);
searchPhaseEndTrackingMapModifiable.put(
SearchPhaseName.CAN_MATCH.getName(),
() -> searchRequestOperationsListener.onCanMatchPhaseEnd(
this,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())
)
);
searchPhaseEndTrackingMapModifiable.put(
SearchPhaseName.DFS_QUERY.getName(),
() -> searchRequestOperationsListener.onQueryPhaseEnd(
this,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())
)
);
searchPhaseEndTrackingMapModifiable.put(
SearchPhaseName.QUERY.getName(),
() -> searchRequestOperationsListener.onQueryPhaseEnd(
this,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())
)
);
searchPhaseEndTrackingMapModifiable.put(
SearchPhaseName.FETCH.getName(),
() -> searchRequestOperationsListener.onFetchPhaseEnd(
this,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())
)
);
searchPhaseEndTrackingMapModifiable.put(
SearchPhaseName.EXPAND.getName(),
() -> searchRequestOperationsListener.onExpandSearchPhaseEnd(
this,
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.getCurrentPhase().getStartTime())
)
);
searchPhaseEndTrackingMap = Collections.unmodifiableMap(searchPhaseEndTrackingMapModifiable);
}

private void instantiateSearchPhaseFailMap() {
Map<String, Runnable> searchPhaseFailureTrackingMapModifiable = new HashMap<String, Runnable>();
searchPhaseFailureTrackingMapModifiable.put(
SearchPhaseName.DFS_PRE_QUERY.getName(),
() -> searchRequestOperationsListener.onDFSPreQueryPhaseFailure(this)
);
searchPhaseFailureTrackingMapModifiable.put(
SearchPhaseName.CAN_MATCH.getName(),
() -> searchRequestOperationsListener.onCanMatchPhaseFailure(this)
);
searchPhaseFailureTrackingMapModifiable.put(
SearchPhaseName.DFS_QUERY.getName(),
() -> searchRequestOperationsListener.onQueryPhaseFailure(this)
);
searchPhaseFailureTrackingMapModifiable.put(
SearchPhaseName.QUERY.getName(),
() -> searchRequestOperationsListener.onQueryPhaseFailure(this)
);
searchPhaseFailureTrackingMapModifiable.put(
SearchPhaseName.FETCH.getName(),
() -> searchRequestOperationsListener.onFetchPhaseFailure(this)
);
searchPhaseFailureTrackingMapModifiable.put(
SearchPhaseName.EXPAND.getName(),
() -> searchRequestOperationsListener.onExpandSearchPhaseFailure(this)
);
searchPhaseFailureTrackingMap = Collections.unmodifiableMap(searchPhaseFailureTrackingMapModifiable);

}

@Override
public void addReleasable(Releasable releasable) {
releasables.add(releasable);
Expand Down Expand Up @@ -544,35 +430,28 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
clusterState.version()
);
}
onPhaseEnd(this);
onPhaseEnd();
executePhase(nextPhase);
}
}

private void onPhaseEnd(SearchPhaseContext searchPhaseContext) {
if (!(searchListenersList == null) && !searchListenersList.isEmpty()) {
if (searchPhaseContext.getCurrentPhase() != null
&& searchPhaseEndTrackingMap.containsKey(searchPhaseContext.getCurrentPhase().getName())) {
searchPhaseEndTrackingMap.get(searchPhaseContext.getCurrentPhase().getName()).run();
}
private void onPhaseEnd() {
if (!CollectionUtils.isEmpty(searchListenersList)) {
searchRequestOperationsListener.onPhaseEnd(this);
}
}

private void onPhaseStart(SearchPhase phase, SearchPhaseContext searchPhaseContext) {
private void onPhaseStart(SearchPhase phase) {
setCurrentPhase(phase);
phase.setStartTimeInNanos(System.nanoTime());
if (!CollectionUtils.isEmpty(searchListenersList)) {
if (searchPhaseContext.getCurrentPhase() != null
&& searchPhaseStartTrackingMap.containsKey(searchPhaseContext.getCurrentPhase().getName())) {
searchPhaseStartTrackingMap.get(searchPhaseContext.getCurrentPhase().getName()).run();
}
searchRequestOperationsListener.onPhaseStart(this);
}
}

private void executePhase(SearchPhase phase) {
try {
onPhaseStart(phase, this);
phase.run();
onPhaseStart(phase);
phase.recordAndRun();
} catch (Exception e) {
if (logger.isDebugEnabled()) {
logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
Expand Down Expand Up @@ -825,16 +704,14 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
}
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
onPhaseEnd(this);
onPhaseEnd();
setCurrentPhase(null);
}

@Override
public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) {
if (!CollectionUtils.isEmpty(searchListenersList)) {
if (this.getCurrentPhase() != null && searchPhaseFailureTrackingMap.containsKey(this.getCurrentPhase().getName())) {
searchPhaseFailureTrackingMap.get(this.getCurrentPhase().getName()).run();
}
searchRequestOperationsListener.onPhaseFailure(this);
}
raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,19 @@
*/
abstract class SearchPhase implements CheckedRunnable<IOException> {
private final String name;
private long startTime;
private long startTimeInNanos;

protected SearchPhase(String name) {
this.name = Objects.requireNonNull(name, "name must not be null");
}

public void setStartTimeInNanos(long startTime) {
this.startTime = startTime;
public long getStartTimeInNanos() {
return startTimeInNanos;
}

public long getStartTime() {
return startTime;
public void recordAndRun() throws IOException {
this.startTimeInNanos = System.nanoTime();
run();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.action.search;

import java.util.HashMap;
import java.util.Map;

/**
* Enum for different Search Phases in OpenSearch
* @opensearch.internal
Expand All @@ -20,6 +23,14 @@ public enum SearchPhaseName {
EXPAND("expand"),
CAN_MATCH("can_match");

private static final Map<String, SearchPhaseName> STRING_TO_ENUM = new HashMap<>();

static {
for (SearchPhaseName searchPhaseName : values()) {
STRING_TO_ENUM.put(searchPhaseName.getName(), searchPhaseName);
}
}

private final String name;

SearchPhaseName(final String name) {
Expand All @@ -29,4 +40,8 @@ public enum SearchPhaseName {
public String getName() {
return name;
}

public static SearchPhaseName getSearchPhaseName(String value) {
return STRING_TO_ENUM.get(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
List<SearchRequestOperationsListener> searchListenersList
) {
super(
"query",
SearchPhaseName.QUERY.getName(),
logger,
searchTransportService,
nodeIdToConnection,
Expand Down
Loading

0 comments on commit 110ca71

Please sign in to comment.