From 706d4f5b9e1b89ffc8813d55bb45181cac13f5a5 Mon Sep 17 00:00:00 2001 From: Sagar Upadhyaya Date: Tue, 19 Sep 2023 15:33:09 -0700 Subject: [PATCH] Using generic enumMap for holding phase stats Signed-off-by: Sagar Upadhyaya --- .../search/SearchWeightedRoutingIT.java | 13 +- .../search/stats/SearchStatsIT.java | 16 +- .../admin/indices/stats/CommonStats.java | 9 - .../search/AbstractSearchAsyncAction.java | 24 +-- .../search/CanMatchPreFilterSearchPhase.java | 4 +- .../SearchDfsQueryThenFetchAsyncAction.java | 4 +- .../opensearch/action/search/SearchPhase.java | 2 +- .../action/search/SearchPhaseName.java | 17 +- .../SearchQueryThenFetchAsyncAction.java | 5 +- .../action/search/SearchRequestStats.java | 92 ++-------- .../action/search/TransportSearchAction.java | 21 ++- .../index/search/stats/SearchStats.java | 158 ++++++++---------- .../opensearch/indices/IndicesService.java | 10 +- .../opensearch/indices/NodeIndicesStats.java | 8 +- .../main/java/org/opensearch/node/Node.java | 7 +- .../cluster/node/stats/NodeStatsTests.java | 4 +- .../AbstractSearchAsyncActionTests.java | 57 +++---- .../SearchRequestOperationsListenerTests.java | 6 +- .../search/SearchRequestStatsTests.java | 96 ++--------- .../index/search/stats/SearchStatsTests.java | 47 ++++-- .../indices/NodeIndicesStatsTests.java | 4 +- 21 files changed, 229 insertions(+), 375 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java index e30ca49b735d7..5207dab83f1d9 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -189,21 +189,24 @@ public void testSearchWithWRRShardRouting() throws IOException { for (NodeStats stat : nodeStats.getNodes()) { SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); - if (searchStats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.QUERY.getName()) > 0) { + if (searchStats.getRequestStatsLongHolder() + .getRequestStatsHolder() + .get(SearchPhaseName.QUERY.getName()) + .getTimeInMillis() > 0) { assertThat( - searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.QUERY.getName()).longValue(), + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTotal(), greaterThan(0L) ); assertThat( - searchStats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.FETCH.getName()).longValue(), + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(), greaterThan(0L) ); assertThat( - searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.FETCH.getName()).longValue(), + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal(), greaterThan(0L) ); assertThat( - searchStats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.EXPAND.getName()).longValue(), + searchStats.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal(), greaterThan(0L) ); coordNumber += 1; diff --git a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java index 84715150365b7..253a8b2b14824 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/stats/SearchStatsIT.java @@ -199,22 +199,22 @@ public void testSimpleStats() throws Exception { for (NodeStats stat : nodeStats.getNodes()) { Stats total = stat.getIndices().getSearch().getTotal(); - if (total.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.QUERY.getName()) > 0) { - assertEquals( - iters, - total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.QUERY.getName()).intValue() - ); + if (total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.QUERY.getName()).getTimeInMillis() > 0) { assertThat( - total.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(SearchPhaseName.FETCH.getName()), + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTimeInMillis(), greaterThan(0L) ); assertEquals( iters, - total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.FETCH.getName()).intValue() + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal() + ); + assertEquals( + iters, + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.EXPAND.getName()).getTotal() ); assertEquals( iters, - total.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(SearchPhaseName.EXPAND.getName()).intValue() + total.getRequestStatsLongHolder().getRequestStatsHolder().get(SearchPhaseName.FETCH.getName()).getTotal() ); numOfCoordinators += 1; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java index a8e18b2bc4105..e4abaef4ddfa8 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java @@ -33,7 +33,6 @@ package org.opensearch.action.admin.indices.stats; import org.apache.lucene.store.AlreadyClosedException; -import org.opensearch.action.RequestStats; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -285,14 +284,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(recoveryStats); } - // We are adding request stats with a separate setter method since SearchStats was tightly coupled with Shard Search Stats, and all - // nodes won't share the same response in requestStats - public void addRequestStats(RequestStats requestStats) { - if (requestStats.getSearchRequestStats() != null && this.search != null) { - search.setSearchRequestStats(requestStats.getSearchRequestStats()); - } - } - public void add(CommonStats stats) { if (docs == null) { if (stats.getDocs() != null) { diff --git a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java index eda2586909d6d..1c0a1280ad550 100644 --- a/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/AbstractSearchAsyncAction.java @@ -50,7 +50,6 @@ import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ShardOperationFailedException; -import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.index.shard.ShardId; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; @@ -66,6 +65,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -120,8 +120,7 @@ abstract class AbstractSearchAsyncAction exten private final List releasables = new ArrayList<>(); - private SearchRequestOperationsListener searchRequestOperationsListener; - private List searchListenersList; + private Optional searchRequestOperationsListener; AbstractSearchAsyncAction( String name, @@ -141,7 +140,7 @@ abstract class AbstractSearchAsyncAction exten SearchPhaseResults resultConsumer, int maxConcurrentRequestsPerNode, SearchResponse.Clusters clusters, - List searchListenersList + SearchRequestOperationsListener searchRequestOperationsListener ) { super(name); final List toSkipIterators = new ArrayList<>(); @@ -177,10 +176,7 @@ abstract class AbstractSearchAsyncAction exten this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; - if (!CollectionUtils.isEmpty(searchListenersList)) { - this.searchListenersList = searchListenersList; - this.searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(this.searchListenersList, logger); - } + this.searchRequestOperationsListener = Optional.ofNullable(searchRequestOperationsListener); } @Override @@ -436,16 +432,12 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha } private void onPhaseEnd() { - if (!CollectionUtils.isEmpty(searchListenersList)) { - searchRequestOperationsListener.onPhaseEnd(this); - } + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseEnd(this); }); } private void onPhaseStart(SearchPhase phase) { setCurrentPhase(phase); - if (!CollectionUtils.isEmpty(searchListenersList)) { - searchRequestOperationsListener.onPhaseStart(this); - } + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> { searchRequestOperations.onPhaseStart(this); }); } private void executePhase(SearchPhase phase) { @@ -710,9 +702,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At @Override public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause) { - if (!CollectionUtils.isEmpty(searchListenersList)) { - searchRequestOperationsListener.onPhaseFailure(this); - } + this.searchRequestOperationsListener.ifPresent(searchRequestOperations -> searchRequestOperations.onPhaseFailure(this)); raisePhaseFailure(new SearchPhaseExecutionException(phase.getName(), msg, cause, buildShardFailures())); } diff --git a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java index 6452e7decd22b..ae481736ad0aa 100644 --- a/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/CanMatchPreFilterSearchPhase.java @@ -91,7 +91,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction, SearchPhase> phaseFactory, SearchResponse.Clusters clusters, - List searchListenersList + SearchRequestOperationsListener searchRequestOperationsListener ) { // We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests super( @@ -112,7 +112,7 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction searchListenersList + SearchRequestOperationsListener searchRequestOperationsListener ) { super( SearchPhaseName.DFS_PRE_QUERY.getName(), @@ -97,7 +97,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters, - searchListenersList + searchRequestOperationsListener ); this.queryPhaseResultConsumer = queryPhaseResultConsumer; this.searchPhaseController = searchPhaseController; diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhase.java b/server/src/main/java/org/opensearch/action/search/SearchPhase.java index 578bc9175d336..1c7b3c1f1563c 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhase.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhase.java @@ -42,7 +42,7 @@ * * @opensearch.internal */ -abstract class SearchPhase implements CheckedRunnable { +public abstract class SearchPhase implements CheckedRunnable { private final String name; private long startTimeInNanos; diff --git a/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java b/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java index 753ce96e0c647..4c0fe3ac06326 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java +++ b/server/src/main/java/org/opensearch/action/search/SearchPhaseName.java @@ -8,29 +8,18 @@ package org.opensearch.action.search; -import java.util.HashMap; -import java.util.Map; - /** * Enum for different Search Phases in OpenSearch * @opensearch.internal */ public enum SearchPhaseName { - DFS_PRE_QUERY("dfs"), + DFS_PRE_QUERY("dfs_pre_query"), QUERY("query"), FETCH("fetch"), DFS_QUERY("dfs_query"), EXPAND("expand"), CAN_MATCH("can_match"); - private static final Map STRING_TO_ENUM = new HashMap<>(); - - static { - for (SearchPhaseName searchPhaseName : values()) { - STRING_TO_ENUM.put(searchPhaseName.getName(), searchPhaseName); - } - } - private final String name; SearchPhaseName(final String name) { @@ -40,8 +29,4 @@ public enum SearchPhaseName { public String getName() { return name; } - - public static SearchPhaseName getSearchPhaseName(String value) { - return STRING_TO_ENUM.get(value); - } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java index 6e05c7b954c64..ca5ad087d3089 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/opensearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -45,7 +45,6 @@ import org.opensearch.search.query.QuerySearchResult; import org.opensearch.transport.Transport; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; @@ -83,7 +82,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction searchListenersList + SearchRequestOperationsListener searchRequestOperationsListener ) { super( SearchPhaseName.QUERY.getName(), @@ -103,7 +102,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction> phaseEndTracker = new ConcurrentHashMap<>(); - ConcurrentHashMap> phaseStartTracker = new ConcurrentHashMap<>(); - ConcurrentHashMap> phaseFailureTracker = new ConcurrentHashMap<>(); + Map phaseStatsMap = new EnumMap<>(SearchPhaseName.class); @Inject public SearchRequestStats() { - for (SearchPhaseName searchPhase : SearchPhaseName.values()) { - phaseEndTracker.put(searchPhase, searchPhaseContext -> { - if (SearchPhaseName.DFS_QUERY.equals(SearchPhaseName.getSearchPhaseName(searchPhaseContext.getCurrentPhase().getName()))) { - totalStats.queryCurrentMap.get(SearchPhaseName.QUERY).dec(); - totalStats.queryTotalMap.get(SearchPhaseName.QUERY).inc(); - totalStats.queryMetricMap.get(SearchPhaseName.QUERY) - .inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchPhaseContext.getCurrentPhase().getStartTimeInNanos())); - } else { - totalStats.queryCurrentMap.get(searchPhase).dec(); - totalStats.queryTotalMap.get(searchPhase).inc(); - totalStats.queryMetricMap.get(searchPhase) - .inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - searchPhaseContext.getCurrentPhase().getStartTimeInNanos())); - } - }); - phaseStartTracker.put(searchPhase, searchPhaseContext -> { - if (SearchPhaseName.DFS_QUERY.equals(SearchPhaseName.getSearchPhaseName(searchPhaseContext.getCurrentPhase().getName()))) { - totalStats.queryCurrentMap.get(SearchPhaseName.QUERY).inc(); - } else { - totalStats.queryCurrentMap.get(searchPhase).inc(); - } - }); - phaseFailureTracker.put(searchPhase, searchPhaseContext -> { - if (SearchPhaseName.DFS_QUERY.equals(SearchPhaseName.getSearchPhaseName(searchPhaseContext.getCurrentPhase().getName()))) { - totalStats.queryCurrentMap.get(SearchPhaseName.QUERY).dec(); - } else { - totalStats.queryCurrentMap.get(searchPhase).dec(); - } - }); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + phaseStatsMap.put(searchPhaseName, new StatsHolder()); } } public long getPhaseCurrent(SearchPhaseName searchPhaseName) { - return totalStats.queryCurrentMap.computeIfAbsent(searchPhaseName, searchPhase -> new CounterMetric()).count(); + return phaseStatsMap.get(searchPhaseName).current.count(); } public long getPhaseTotal(SearchPhaseName searchPhaseName) { - return totalStats.queryTotalMap.computeIfAbsent(searchPhaseName, searchPhase -> new CounterMetric()).count(); + return phaseStatsMap.get(searchPhaseName).total.count(); } public long getPhaseMetric(SearchPhaseName searchPhaseName) { - return totalStats.queryMetricMap.computeIfAbsent(searchPhaseName, searchPhase -> new MeanMetric()).sum(); + return phaseStatsMap.get(searchPhaseName).timing.sum(); } @Override public void onPhaseStart(SearchPhaseContext context) { - Optional.ofNullable(SearchPhaseName.getSearchPhaseName(context.getCurrentPhase().getName())).map(searchPhaseName -> { - phaseStartTracker.get(searchPhaseName).accept(context); - return null; - }); + phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.inc(); } @Override public void onPhaseEnd(SearchPhaseContext context) { - Optional.ofNullable(SearchPhaseName.getSearchPhaseName(context.getCurrentPhase().getName())).map(searchPhaseName -> { - phaseEndTracker.get(searchPhaseName).accept(context); - return null; - }); + StatsHolder phaseStats = phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()); + phaseStats.current.dec(); + phaseStats.total.inc(); + phaseStats.timing.inc(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos())); } @Override public void onPhaseFailure(SearchPhaseContext context) { - Optional.ofNullable(SearchPhaseName.getSearchPhaseName(context.getCurrentPhase().getName())).map(searchPhaseName -> { - phaseFailureTracker.get(searchPhaseName).accept(context); - return null; - }); + phaseStatsMap.get(context.getCurrentPhase().getSearchPhaseName()).current.dec(); } /** @@ -106,32 +70,8 @@ public void onPhaseFailure(SearchPhaseContext context) { */ public static final class StatsHolder { - - Map queryCurrentMap = new ConcurrentHashMap<>(); - Map queryTotalMap = new ConcurrentHashMap<>(); - Map queryMetricMap = new ConcurrentHashMap<>(); - - StatsHolder() { - for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - if (SearchPhaseName.DFS_QUERY.equals(searchPhaseName)) { - continue; - } - queryCurrentMap.put(searchPhaseName, new CounterMetric()); - queryTotalMap.put(searchPhaseName, new CounterMetric()); - queryMetricMap.put(searchPhaseName, new MeanMetric()); - } - } - - public Map getQueryCurrentMap() { - return queryCurrentMap; - } - - public Map getQueryTotalMap() { - return queryTotalMap; - } - - public Map getQueryMetricMap() { - return queryMetricMap; - } + CounterMetric current = new CounterMetric(); + CounterMetric total = new CounterMetric(); + MeanMetric timing = new MeanMetric(); } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 761709d8c5d60..cff1005beff27 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -67,6 +67,7 @@ import org.opensearch.core.common.breaker.CircuitBreaker; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.CircuitBreakerService; @@ -332,6 +333,12 @@ public void executeRequest( ActionListener listener ) { final List searchListenersList = createSearchListenerList(); + final SearchRequestOperationsListener searchRequestOperationsListener; + if (!CollectionUtils.isEmpty(searchListenersList)) { + searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } else { + searchRequestOperationsListener = null; + } executeRequest(task, searchRequest, new SearchAsyncActionProvider() { @Override public AbstractSearchAsyncAction asyncSearchAction( @@ -368,7 +375,7 @@ public AbstractSearchAsyncAction asyncSearchAction( new ArraySearchPhaseResults<>(shardsIts.size()), searchRequest.getMaxConcurrentShardRequests(), clusters, - searchListenersList + searchRequestOperationsListener ) { @Override protected void executePhaseOnShard( @@ -1149,6 +1156,12 @@ private AbstractSearchAsyncAction searchAsyncAction SearchResponse.Clusters clusters ) { final List searchListenersList = createSearchListenerList(); + final SearchRequestOperationsListener searchRequestOperationsListener; + if (!CollectionUtils.isEmpty(searchListenersList)) { + searchRequestOperationsListener = new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger); + } else { + searchRequestOperationsListener = null; + } if (preFilter) { return new CanMatchPreFilterSearchPhase( logger, @@ -1189,7 +1202,7 @@ public void run() { }; }, clusters, - searchListenersList + searchRequestOperationsListener ); } else { final QueryPhaseResultConsumer queryResultConsumer = searchPhaseController.newSearchPhaseResults( @@ -1220,7 +1233,7 @@ public void run() { clusterState, task, clusters, - searchListenersList + searchRequestOperationsListener ); break; case QUERY_THEN_FETCH: @@ -1241,7 +1254,7 @@ public void run() { clusterState, task, clusters, - searchListenersList + searchRequestOperationsListener ); break; default: diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index d760569526309..a2a936920e0d6 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -64,30 +64,60 @@ public class SearchStats implements Writeable, ToXContentFragment { * * @opensearch.internal */ + public static class PhaseStatsLongHolder implements Writeable { - public static class RequestStatsLongHolder { + long current; + long total; + long timeInMillis; + + public long getCurrent() { + return current; + } + + public long getTotal() { + return total; + } - Map searchPhaseCurrentMap = new HashMap<>(); - Map searchPhaseTotalMap = new HashMap<>(); - Map searchPhaseMetricMap = new HashMap<>(); + public long getTimeInMillis() { + return timeInMillis; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(current); + out.writeVLong(total); + out.writeVLong(timeInMillis); + } - public Map getSearchPhaseCurrentMap() { - return searchPhaseCurrentMap; + PhaseStatsLongHolder() { + this(0, 0, 0); } - public Map getSearchPhaseTotalMap() { - return searchPhaseTotalMap; + PhaseStatsLongHolder(long current, long total, long timeInMillis) { + this.current = current; + this.total = total; + this.timeInMillis = timeInMillis; } - public Map getSearchPhaseMetricMap() { - return searchPhaseMetricMap; + PhaseStatsLongHolder(StreamInput in) throws IOException { + this.current = in.readVLong(); + this.total = in.readVLong(); + this.timeInMillis = in.readVLong(); + } + + } + + public static class RequestStatsLongHolder { + + Map requestStatsHolder = new HashMap<>(); + + public Map getRequestStatsHolder() { + return requestStatsHolder; } RequestStatsLongHolder() { for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - searchPhaseCurrentMap.put(searchPhaseName.getName(), 0L); - searchPhaseTotalMap.put(searchPhaseName.getName(), 0L); - searchPhaseMetricMap.put(searchPhaseName.getName(), 0L); + requestStatsHolder.put(searchPhaseName.getName(), new PhaseStatsLongHolder()); } } } @@ -209,9 +239,7 @@ private Stats(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_3_0_0)) { this.requestStatsLongHolder = new RequestStatsLongHolder(); - requestStatsLongHolder.searchPhaseCurrentMap = in.readMap(StreamInput::readString, StreamInput::readVLong); - requestStatsLongHolder.searchPhaseTotalMap = in.readMap(StreamInput::readString, StreamInput::readVLong); - requestStatsLongHolder.searchPhaseMetricMap = in.readMap(StreamInput::readString, StreamInput::readVLong); + requestStatsLongHolder.requestStatsHolder = in.readMap(StreamInput::readString, PhaseStatsLongHolder::new); } if (in.getVersion().onOrAfter(Version.V_2_10_0)) { concurrentQueryCount = in.readVLong(); @@ -408,9 +436,11 @@ public void writeTo(StreamOutput out) throws IOException { if (requestStatsLongHolder == null) { requestStatsLongHolder = new RequestStatsLongHolder(); } - out.writeMap(requestStatsLongHolder.getSearchPhaseCurrentMap(), StreamOutput::writeString, StreamOutput::writeVLong); - out.writeMap(requestStatsLongHolder.getSearchPhaseTotalMap(), StreamOutput::writeString, StreamOutput::writeVLong); - out.writeMap(requestStatsLongHolder.getSearchPhaseMetricMap(), StreamOutput::writeString, StreamOutput::writeVLong); + out.writeMap( + requestStatsLongHolder.getRequestStatsHolder(), + StreamOutput::writeString, + (stream, stats) -> stats.writeTo(stream) + ); } if (out.getVersion().onOrAfter(Version.V_2_10_0)) { @@ -453,55 +483,17 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (requestStatsLongHolder != null) { builder.startObject(Fields.REQUEST); - builder.humanReadableField( - Fields.DFS_PREQUERY_TIME_IN_MILLIS, - Fields.QUERY_TIME, - new TimeValue(requestStatsLongHolder.searchPhaseMetricMap.get(SearchPhaseName.DFS_PRE_QUERY.getName())) - ); - builder.field( - Fields.DFS_PREQUERY_CURRENT, - requestStatsLongHolder.searchPhaseCurrentMap.get(SearchPhaseName.DFS_PRE_QUERY.getName()) - ); - builder.field( - Fields.DFS_PREQUERY_TOTAL, - requestStatsLongHolder.searchPhaseTotalMap.get(SearchPhaseName.DFS_PRE_QUERY.getName()) - ); - - builder.humanReadableField( - Fields.CANMATCH_TIME_IN_MILLIS, - Fields.QUERY_TIME, - new TimeValue(requestStatsLongHolder.searchPhaseMetricMap.get(SearchPhaseName.CAN_MATCH.getName())) - ); - builder.field( - Fields.CANMATCH_CURRENT, - requestStatsLongHolder.searchPhaseCurrentMap.get(SearchPhaseName.CAN_MATCH.getName()) - ); - builder.field(Fields.CANMATCH_TOTAL, requestStatsLongHolder.searchPhaseTotalMap.get(SearchPhaseName.CAN_MATCH.getName())); - - builder.humanReadableField( - Fields.QUERY_TIME_IN_MILLIS, - Fields.QUERY_TIME, - new TimeValue(requestStatsLongHolder.searchPhaseMetricMap.get(SearchPhaseName.QUERY.getName())) - ); - builder.field(Fields.QUERY_CURRENT, requestStatsLongHolder.searchPhaseCurrentMap.get(SearchPhaseName.QUERY.getName())); - builder.field(Fields.QUERY_TOTAL, requestStatsLongHolder.searchPhaseTotalMap.get(SearchPhaseName.QUERY.getName())); - - builder.humanReadableField( - Fields.FETCH_TIME_IN_MILLIS, - Fields.FETCH_TIME, - new TimeValue(requestStatsLongHolder.searchPhaseMetricMap.get(SearchPhaseName.FETCH.getName())) - ); - builder.field(Fields.FETCH_CURRENT, requestStatsLongHolder.searchPhaseCurrentMap.get(SearchPhaseName.FETCH.getName())); - builder.field(Fields.FETCH_TOTAL, requestStatsLongHolder.searchPhaseTotalMap.get(SearchPhaseName.FETCH.getName())); - - builder.humanReadableField( - Fields.EXPAND_TIME_IN_MILLIS, - Fields.FETCH_TIME, - new TimeValue(requestStatsLongHolder.searchPhaseMetricMap.get(SearchPhaseName.EXPAND.getName())) - ); - builder.field(Fields.EXPAND_CURRENT, requestStatsLongHolder.searchPhaseCurrentMap.get(SearchPhaseName.EXPAND.getName())); - builder.field(Fields.EXPAND_TOTAL, requestStatsLongHolder.searchPhaseTotalMap.get(SearchPhaseName.EXPAND.getName())); - + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + PhaseStatsLongHolder statsLongHolder = requestStatsLongHolder.requestStatsHolder.get(searchPhaseName.getName()); + if (statsLongHolder == null) { + continue; + } + builder.startObject(searchPhaseName.getName()); + builder.humanReadableField(Fields.TIME_IN_MILLIS, Fields.TIME, new TimeValue(statsLongHolder.timeInMillis)); + builder.field(Fields.CURRENT, statsLongHolder.current); + builder.field(Fields.TOTAL, statsLongHolder.total); + builder.endObject(); + } builder.endObject(); } return builder; @@ -525,17 +517,13 @@ public void setSearchRequestStats(SearchRequestStats searchRequestStats) { } for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - totalStats.requestStatsLongHolder.searchPhaseCurrentMap.put( + totalStats.requestStatsLongHolder.requestStatsHolder.put( searchPhaseName.getName(), - searchRequestStats.getPhaseCurrent(searchPhaseName) - ); - totalStats.requestStatsLongHolder.searchPhaseTotalMap.put( - searchPhaseName.getName(), - searchRequestStats.getPhaseTotal(searchPhaseName) - ); - totalStats.requestStatsLongHolder.searchPhaseMetricMap.put( - searchPhaseName.getName(), - searchRequestStats.getPhaseMetric(searchPhaseName) + new PhaseStatsLongHolder( + searchRequestStats.getPhaseCurrent(searchPhaseName), + searchRequestStats.getPhaseTotal(searchPhaseName), + searchRequestStats.getPhaseMetric(searchPhaseName) + ) ); } } @@ -656,15 +644,11 @@ static final class Fields { static final String SUGGEST_TIME_IN_MILLIS = "suggest_time_in_millis"; static final String SUGGEST_CURRENT = "suggest_current"; static final String REQUEST = "request"; - static final String DFS_PREQUERY_TIME_IN_MILLIS = "dfs_prequery_time_in_millis"; - static final String DFS_PREQUERY_CURRENT = "dfs_prequery_current"; - static final String DFS_PREQUERY_TOTAL = "dfs_prequery_total"; - static final String CANMATCH_TIME_IN_MILLIS = "canmatch_time_in_millis"; - static final String CANMATCH_CURRENT = "canmatch_current"; - static final String CANMATCH_TOTAL = "canmatch_total"; - static final String EXPAND_TIME_IN_MILLIS = "expand_time_in_millis"; - static final String EXPAND_CURRENT = "expand_current"; - static final String EXPAND_TOTAL = "expand_total"; + static final String TIME_IN_MILLIS = "time_in_millis"; + static final String TIME = "time"; + static final String CURRENT = "current"; + static final String TOTAL = "total"; + } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 7d1ce9f87650d..cf64b886ed523 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -42,12 +42,12 @@ import org.apache.lucene.util.RamUsageEstimator; import org.opensearch.OpenSearchException; import org.opensearch.ResourceAlreadyExistsException; -import org.opensearch.action.RequestStats; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.CommonStatsFlags.Flag; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.action.search.SearchType; import org.opensearch.client.Client; import org.opensearch.cluster.ClusterState; @@ -334,7 +334,7 @@ public class IndicesService extends AbstractLifecycleComponent private volatile TimeValue clusterRemoteTranslogBufferInterval; private final FileCacheCleaner fileCacheCleaner; - private final RequestStats requestStats; + private final SearchRequestStats searchRequestStats; @Override protected void doStart() { @@ -366,7 +366,7 @@ public IndicesService( IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, Supplier repositoriesServiceSupplier, FileCacheCleaner fileCacheCleaner, - RequestStats requestStats, + SearchRequestStats searchRequestStats, @Nullable RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory ) { this.settings = settings; @@ -457,7 +457,7 @@ protected void closeInternal() { clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool, remoteStoreStatsTrackerFactory); - this.requestStats = requestStats; + this.searchRequestStats = searchRequestStats; this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate); @@ -581,7 +581,7 @@ public NodeIndicesStats stats(CommonStatsFlags flags) { } } - return new NodeIndicesStats(commonStats, statsByShard(this, flags), requestStats); + return new NodeIndicesStats(commonStats, statsByShard(this, flags), searchRequestStats); } Map> statsByShard(final IndicesService indicesService, final CommonStatsFlags flags) { diff --git a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java index ccdbe9a5e4fdf..8a7aaba2726f4 100644 --- a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java @@ -32,10 +32,10 @@ package org.opensearch.indices; -import org.opensearch.action.RequestStats; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -92,7 +92,7 @@ public NodeIndicesStats(StreamInput in) throws IOException { } } - public NodeIndicesStats(CommonStats oldStats, Map> statsByShard, RequestStats requestStats) { + public NodeIndicesStats(CommonStats oldStats, Map> statsByShard, SearchRequestStats searchRequestStats) { // this.stats = stats; this.statsByShard = statsByShard; @@ -105,7 +105,9 @@ public NodeIndicesStats(CommonStats oldStats, Map> } } } - this.stats.addRequestStats(requestStats); + if (this.stats.search != null) { + this.stats.search.setSearchRequestStats(searchRequestStats); + } } @Nullable diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 681f21a038821..90fb339951d62 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -43,7 +43,6 @@ import org.opensearch.action.ActionModule; import org.opensearch.action.ActionModule.DynamicActionRegistry; import org.opensearch.action.ActionType; -import org.opensearch.action.RequestStats; import org.opensearch.action.admin.cluster.snapshots.status.TransportNodesSnapshotsStatus; import org.opensearch.action.search.SearchExecutionStatsCollector; import org.opensearch.action.search.SearchPhaseController; @@ -763,7 +762,7 @@ protected Node( threadPool ); - final RequestStats requestStats = new RequestStats(); + final SearchRequestStats searchRequestStats = new SearchRequestStats(); remoteStoreStatsTrackerFactory = new RemoteStoreStatsTrackerFactory(clusterService, settings); final IndicesService indicesService = new IndicesService( @@ -790,7 +789,7 @@ protected Node( remoteDirectoryFactory, repositoriesServiceReference::get, fileCacheCleaner, - requestStats, + searchRequestStats, remoteStoreStatsTrackerFactory ); @@ -1204,7 +1203,7 @@ protected Node( b.bind(SystemIndices.class).toInstance(systemIndices); b.bind(IdentityService.class).toInstance(identityService); b.bind(Tracer.class).toInstance(tracer); - b.bind(SearchRequestStats.class).toInstance(requestStats.getSearchRequestStats()); + b.bind(SearchRequestStats.class).toInstance(searchRequestStats); b.bind(RemoteClusterStateService.class).toProvider(() -> remoteClusterStateService); b.bind(PersistedStateRegistry.class).toInstance(persistedStateRegistry); }); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 75de5995fd5ee..e3f16463a5328 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -32,9 +32,9 @@ package org.opensearch.action.admin.cluster.node.stats; -import org.opensearch.action.RequestStats; import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.cluster.coordination.PendingClusterStateStats; import org.opensearch.cluster.coordination.PublishClusterStateStats; import org.opensearch.cluster.node.DiscoveryNode; @@ -802,7 +802,7 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { NodeIndicesStats indicesStats = null; if (remoteStoreStats) { - indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new RequestStats()); + indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>(), new SearchRequestStats()); RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats(); remoteSegmentStats.addUploadBytesStarted(10L); remoteSegmentStats.addUploadBytesSucceeded(10L); diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 58fe39e8cbf68..f628bb3201452 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -333,29 +333,29 @@ public void testOnPhaseFailureAndVerifyListeners() { final List requestOperationListeners = new ArrayList<>(List.of(testListener)); SearchQueryThenFetchAsyncAction action = createSearchQueryThenFetchAsyncAction(requestOperationListeners); action.start(); - assertEquals(1, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(action.getName()))); + assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName())); action.onPhaseFailure(new SearchPhase("test") { @Override public void run() { } }, "message", null); - assertEquals(0, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(action.getName()))); - assertEquals(0, testListener.getPhaseTotal(SearchPhaseName.getSearchPhaseName(action.getName()))); + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName())); SearchDfsQueryThenFetchAsyncAction searchDfsQueryThenFetchAsyncAction = createSearchDfsQueryThenFetchAsyncAction( requestOperationListeners ); searchDfsQueryThenFetchAsyncAction.start(); - assertEquals(1, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(searchDfsQueryThenFetchAsyncAction.getName()))); + assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); searchDfsQueryThenFetchAsyncAction.onPhaseFailure(new SearchPhase("test") { @Override public void run() { } }, "message", null); - assertEquals(0, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(action.getName()))); - assertEquals(0, testListener.getPhaseTotal(SearchPhaseName.getSearchPhaseName(action.getName()))); + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(action.getSearchPhaseName())); FetchSearchPhase fetchPhase = createFetchSearchPhase(); ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); @@ -363,15 +363,15 @@ public void run() { searchShardIterator.resetAndSkip(); action.skipShard(searchShardIterator); action.executeNextPhase(action, fetchPhase); - assertEquals(1, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(fetchPhase.getName()))); + assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); action.onPhaseFailure(new SearchPhase("test") { @Override public void run() { } }, "message", null); - assertEquals(0, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(fetchPhase.getName()))); - assertEquals(0, testListener.getPhaseTotal(SearchPhaseName.getSearchPhaseName(fetchPhase.getName()))); + assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName())); } public void testOnPhaseFailure() { @@ -601,7 +601,7 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx action.start(); // Verify queryPhase current metric - assertEquals(1, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(action.getName()))); + assertEquals(1, testListener.getPhaseCurrent(action.getSearchPhaseName())); TimeUnit.MILLISECONDS.sleep(delay); FetchSearchPhase fetchPhase = createFetchSearchPhase(); @@ -612,12 +612,12 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx action.executeNextPhase(action, fetchPhase); // Verify queryPhase total, current and latency metrics - assertEquals(0, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(action.getName()))); - assertThat(testListener.getPhaseMetric(SearchPhaseName.getSearchPhaseName(action.getName())), greaterThanOrEqualTo(delay)); - assertEquals(1, testListener.getPhaseTotal(SearchPhaseName.getSearchPhaseName(action.getName()))); + assertEquals(0, testListener.getPhaseCurrent(action.getSearchPhaseName())); + assertThat(testListener.getPhaseMetric(action.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(action.getSearchPhaseName())); // Verify fetchPhase current metric - assertEquals(1, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(fetchPhase.getName()))); + assertEquals(1, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); TimeUnit.MILLISECONDS.sleep(delay); ExpandSearchPhase expandPhase = createExpandSearchPhase(); @@ -625,18 +625,18 @@ public void testOnPhaseListenersWithQueryAndThenFetchType() throws InterruptedEx TimeUnit.MILLISECONDS.sleep(delay); // Verify fetchPhase total, current and latency metrics - assertThat(testListener.getPhaseMetric(SearchPhaseName.getSearchPhaseName(fetchPhase.getName())), greaterThanOrEqualTo(delay)); - assertEquals(1, testListener.getPhaseTotal(SearchPhaseName.getSearchPhaseName(fetchPhase.getName()))); - assertEquals(0, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(fetchPhase.getName()))); + assertThat(testListener.getPhaseMetric(fetchPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(fetchPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(fetchPhase.getSearchPhaseName())); - assertEquals(1, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(expandPhase.getName()))); + assertEquals(1, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName())); action.executeNextPhase(expandPhase, fetchPhase); action.sendSearchResponse(mock(InternalSearchResponse.class), mock(String.valueOf(QuerySearchResult.class))); - assertThat(testListener.getPhaseMetric(SearchPhaseName.getSearchPhaseName(expandPhase.getName())), greaterThanOrEqualTo(delay)); - assertEquals(1, testListener.getPhaseTotal(SearchPhaseName.getSearchPhaseName(expandPhase.getName()))); - assertEquals(0, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(expandPhase.getName()))); + assertThat(testListener.getPhaseMetric(expandPhase.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(expandPhase.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(expandPhase.getSearchPhaseName())); } public void testOnPhaseListenersWithDfsType() throws InterruptedException { @@ -650,7 +650,7 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException { FetchSearchPhase fetchPhase = createFetchSearchPhase(); searchDfsQueryThenFetchAsyncAction.start(); - assertEquals(1, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(searchDfsQueryThenFetchAsyncAction.getName()))); + assertEquals(1, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); TimeUnit.MILLISECONDS.sleep(delay); ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); @@ -659,12 +659,9 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException { searchDfsQueryThenFetchAsyncAction.skipShard(searchShardIterator); searchDfsQueryThenFetchAsyncAction.executeNextPhase(searchDfsQueryThenFetchAsyncAction, fetchPhase); - assertThat( - testListener.getPhaseMetric(SearchPhaseName.getSearchPhaseName(searchDfsQueryThenFetchAsyncAction.getName())), - greaterThanOrEqualTo(delay) - ); - assertEquals(1, testListener.getPhaseTotal(SearchPhaseName.getSearchPhaseName(searchDfsQueryThenFetchAsyncAction.getName()))); - assertEquals(0, testListener.getPhaseCurrent(SearchPhaseName.getSearchPhaseName(searchDfsQueryThenFetchAsyncAction.getName()))); + assertThat(testListener.getPhaseMetric(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName()), greaterThanOrEqualTo(delay)); + assertEquals(1, testListener.getPhaseTotal(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); + assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); } private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction( @@ -709,7 +706,7 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct null, task, SearchResponse.Clusters.EMPTY, - searchRequestOperationsListeners + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) ); } @@ -754,7 +751,7 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction( null, task, SearchResponse.Clusters.EMPTY, - searchRequestOperationsListeners + new SearchRequestOperationsListener.CompositeListener(searchRequestOperationsListeners, logger) ) { @Override ShardSearchFailure[] buildShardFailures() { diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java index e660d8dd07b0c..e9533c0fe941c 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestOperationsListenerTests.java @@ -35,17 +35,17 @@ public void testListenersAreExecuted() { @Override public void onPhaseStart(SearchPhaseContext context) { - searchPhaseStartMap.get(SearchPhaseName.getSearchPhaseName(context.getCurrentPhase().getName())).incrementAndGet(); + searchPhaseStartMap.get(context.getCurrentPhase().getSearchPhaseName()).incrementAndGet(); } @Override public void onPhaseEnd(SearchPhaseContext context) { - searchPhaseEndMap.get(SearchPhaseName.getSearchPhaseName(context.getCurrentPhase().getName())).incrementAndGet(); + searchPhaseEndMap.get(context.getCurrentPhase().getSearchPhaseName()).incrementAndGet(); } @Override public void onPhaseFailure(SearchPhaseContext context) { - searchPhaseFailureMap.get(SearchPhaseName.getSearchPhaseName(context.getCurrentPhase().getName())).incrementAndGet(); + searchPhaseFailureMap.get(context.getCurrentPhase().getSearchPhaseName()).incrementAndGet(); } }; diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java index 101e2f0a402e4..f24147a8194b4 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestStatsTests.java @@ -10,15 +10,11 @@ import org.opensearch.test.OpenSearchTestCase; -import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.mockito.Mockito.mock; @@ -32,9 +28,9 @@ public void testSearchRequestPhaseFailure() { when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - when(mockSearchPhase.getName()).thenReturn(searchPhaseName.getName()); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); testRequestStats.onPhaseStart(ctx); - assertEquals(getExpectedCount(1).apply(searchPhaseName).intValue(), testRequestStats.getPhaseCurrent(searchPhaseName)); + assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); testRequestStats.onPhaseFailure(ctx); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); } @@ -48,19 +44,16 @@ public void testSearchRequestStats() { when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - when(mockSearchPhase.getName()).thenReturn(searchPhaseName.getName()); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); long tookTimeInMillis = randomIntBetween(1, 10); testRequestStats.onPhaseStart(ctx); long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); - assertEquals(getExpectedCount(1).apply(searchPhaseName).intValue(), testRequestStats.getPhaseCurrent(searchPhaseName)); + assertEquals(1, testRequestStats.getPhaseCurrent(searchPhaseName)); testRequestStats.onPhaseEnd(ctx); assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); - assertEquals(getExpectedCount(1).apply(searchPhaseName).intValue(), testRequestStats.getPhaseTotal(searchPhaseName)); - assertThat( - testRequestStats.getPhaseMetric(searchPhaseName), - greaterThanOrEqualTo(getExpectedCount((int) tookTimeInMillis).apply(searchPhaseName).longValue()) - ); + assertEquals(1, testRequestStats.getPhaseTotal(searchPhaseName)); + assertThat(testRequestStats.getPhaseMetric(searchPhaseName), greaterThanOrEqualTo(tookTimeInMillis)); } } @@ -74,7 +67,7 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); - when(mockSearchPhase.getName()).thenReturn(searchPhaseName.getName()); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); @@ -86,16 +79,8 @@ public void testSearchRequestStatsOnPhaseStartConcurrently() throws InterruptedE } phaser.arriveAndAwaitAdvance(); countDownLatch.await(); - for (SearchPhaseName searchPhaseName : getSearchPhaseNames()) { - if (SearchPhaseName.QUERY.equals(searchPhaseName)) { - assertEquals(numTasks * 2, testRequestStats.getPhaseCurrent(searchPhaseName)); - continue; - } else { - assertEquals( - getExpectedCount(numTasks).apply(searchPhaseName).intValue(), - testRequestStats.getPhaseCurrent(searchPhaseName) - ); - } + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals(numTasks, testRequestStats.getPhaseCurrent(searchPhaseName)); } } @@ -110,7 +95,7 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); - when(mockSearchPhase.getName()).thenReturn(searchPhaseName.getName()); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); long tookTimeInMillis = randomIntBetween(1, 10); long startTime = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(tookTimeInMillis); when(mockSearchPhase.getStartTimeInNanos()).thenReturn(startTime); @@ -126,22 +111,12 @@ public void testSearchRequestStatsOnPhaseEndConcurrently() throws InterruptedExc } phaser.arriveAndAwaitAdvance(); countDownLatch.await(); - for (SearchPhaseName searchPhaseName : getSearchPhaseNames()) { - if (SearchPhaseName.QUERY.equals(searchPhaseName)) { - assertEquals(numTasks * 2, testRequestStats.getPhaseTotal(searchPhaseName)); - assertThat( - testRequestStats.getPhaseMetric(searchPhaseName), - greaterThanOrEqualTo(searchPhaseNameLongMap.get(searchPhaseName) * numTasks * 2) - ); - } else { - assertEquals(getExpectedCount(numTasks).apply(searchPhaseName).intValue(), testRequestStats.getPhaseTotal(searchPhaseName)); - assertThat( - testRequestStats.getPhaseMetric(searchPhaseName), - greaterThanOrEqualTo( - getExpectedCount((int) (searchPhaseNameLongMap.get(searchPhaseName) * numTasks)).apply(searchPhaseName).longValue() - ) - ); - } + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals(numTasks, testRequestStats.getPhaseTotal(searchPhaseName)); + assertThat( + testRequestStats.getPhaseMetric(searchPhaseName), + greaterThanOrEqualTo((searchPhaseNameLongMap.get(searchPhaseName) * numTasks)) + ); } } @@ -155,7 +130,7 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte SearchPhaseContext ctx = mock(SearchPhaseContext.class); SearchPhase mockSearchPhase = mock(SearchPhase.class); when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); - when(mockSearchPhase.getName()).thenReturn(searchPhaseName.getName()); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); for (int i = 0; i < numTasks; i++) { threads[i] = new Thread(() -> { phaser.arriveAndAwaitAdvance(); @@ -168,43 +143,8 @@ public void testSearchRequestStatsOnPhaseFailureConcurrently() throws Interrupte } phaser.arriveAndAwaitAdvance(); countDownLatch.await(); - for (SearchPhaseName searchPhaseName : getSearchPhaseNames()) { - assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); - } - } - - public void testSearchRequestStatsWithInvalidPhaseName() { - SearchRequestStats testRequestStats = new SearchRequestStats(); - Map stringStringMap = new HashMap<>(); - stringStringMap.computeIfAbsent(null, s -> { return "dummy"; }); - SearchPhaseContext ctx = mock(SearchPhaseContext.class); - SearchPhase mockSearchPhase = mock(SearchPhase.class); - when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); - when(mockSearchPhase.getName()).thenReturn("dummy"); - testRequestStats.onPhaseStart(ctx); - testRequestStats.onPhaseEnd(ctx); - testRequestStats.onPhaseFailure(ctx); - for (SearchPhaseName searchPhaseName : getSearchPhaseNames()) { + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { assertEquals(0, testRequestStats.getPhaseCurrent(searchPhaseName)); - assertEquals(0, testRequestStats.getPhaseTotal(searchPhaseName)); - assertEquals(0, testRequestStats.getPhaseMetric(searchPhaseName)); } } - - private List getSearchPhaseNames() { - List searchPhaseNames = new ArrayList<>(Arrays.asList(SearchPhaseName.values())); - searchPhaseNames.remove(SearchPhaseName.DFS_QUERY); - return searchPhaseNames; - } - - private Function getExpectedCount(int expected) { - Function currentCount = searchPhaseName -> { - if (SearchPhaseName.DFS_QUERY.equals(searchPhaseName)) { - return 0; - } else { - return expected; - } - }; - return currentCount; - } } diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index 0e29e7cf3f0bf..c27e4bf27327a 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -32,6 +32,8 @@ package org.opensearch.index.search.stats; +import org.opensearch.action.search.SearchPhase; +import org.opensearch.action.search.SearchPhaseContext; import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchRequestStats; import org.opensearch.index.search.stats.SearchStats.Stats; @@ -39,6 +41,11 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SearchStatsTests extends OpenSearchTestCase { @@ -70,17 +77,32 @@ public void testShardLevelSearchGroupStats() throws Exception { // Testing for request stats SearchRequestStats testRequestStats = new SearchRequestStats(); + SearchPhaseContext ctx = mock(SearchPhaseContext.class); for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - if (SearchPhaseName.DFS_QUERY.equals(searchPhaseName)) { - continue; + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(System.nanoTime() - TimeUnit.SECONDS.toNanos(paramValue)); + when(mockSearchPhase.getSearchPhaseName()).thenReturn(searchPhaseName); + for (int iterator = 0; iterator < paramValue; iterator++) { + testRequestStats.onPhaseStart(ctx); + testRequestStats.onPhaseEnd(ctx); } - testRequestStats.totalStats.getQueryCurrentMap().get(searchPhaseName).inc(paramValue); - testRequestStats.totalStats.getQueryTotalMap().get(searchPhaseName).inc(paramValue); - testRequestStats.totalStats.getQueryMetricMap().get(searchPhaseName).inc(paramValue); } searchStats1.setSearchRequestStats(testRequestStats); - assertRequestStats(searchStats1.getTotal(), paramValue); - + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + assertEquals( + 0, + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).current + ); + assertEquals( + paramValue, + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).total + ); + assertThat( + searchStats1.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).timeInMillis, + greaterThanOrEqualTo(paramValue) + ); + } } private static void assertStats(Stats stats, long equalTo) { @@ -105,15 +127,4 @@ private static void assertStats(Stats stats, long equalTo) { // avg_concurrency is not summed up across stats assertEquals(1, stats.getConcurrentAvgSliceCount(), 0); } - - private static void assertRequestStats(Stats stats, long equalTo) { - for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { - if (SearchPhaseName.DFS_QUERY.equals(searchPhaseName)) { - continue; - } - assertEquals(equalTo, stats.getRequestStatsLongHolder().getSearchPhaseCurrentMap().get(searchPhaseName.getName()).longValue()); - assertEquals(equalTo, stats.getRequestStatsLongHolder().getSearchPhaseTotalMap().get(searchPhaseName.getName()).longValue()); - assertEquals(equalTo, stats.getRequestStatsLongHolder().getSearchPhaseMetricMap().get(searchPhaseName.getName()).longValue()); - } - } } diff --git a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java index 958043b0bc1e7..6f36d22b7e17b 100644 --- a/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java +++ b/server/src/test/java/org/opensearch/indices/NodeIndicesStatsTests.java @@ -32,8 +32,8 @@ package org.opensearch.indices; -import org.opensearch.action.RequestStats; import org.opensearch.action.admin.indices.stats.CommonStats; +import org.opensearch.action.search.SearchRequestStats; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.test.OpenSearchTestCase; @@ -46,7 +46,7 @@ public class NodeIndicesStatsTests extends OpenSearchTestCase { public void testInvalidLevel() { CommonStats oldStats = new CommonStats(); - RequestStats requestStats = new RequestStats(); + SearchRequestStats requestStats = new SearchRequestStats(); final NodeIndicesStats stats = new NodeIndicesStats(oldStats, Collections.emptyMap(), requestStats); final String level = randomAlphaOfLength(16); final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level));