DataNode: Start/Stop Message Processing (#18126)
* refactoring of the cluster control convenience methods so we can reuse them for the datanode migrations

* fixed missing license headers

* fixing configuration parameters

* fixing return types

* refactoring

* fixed missing bracket

* fixed missing bracket

* fixed constructor parameters

* use state machine extended context to hold remote processing auth token

* removed obsolete method

* waiting for buffers to drain

* added missing imports

* switch to new NodeService

* fix test and add test for authorization header

---------

Co-authored-by: Tomas Dvorak <[email protected]>
Co-authored-by: Matthias Oesterheld <[email protected]>
Co-authored-by: Matthias Oesterheld <[email protected]>
4 people authored Feb 14, 2024
1 parent 714eda6 commit 062d880
Showing 12 changed files with 443 additions and 11 deletions.
@@ -29,25 +29,35 @@
import org.graylog2.cluster.preflight.DataNodeProvisioningService;
import org.graylog2.plugin.certificates.RenewalPolicy;
import org.graylog2.plugin.cluster.ClusterConfigService;
import org.graylog2.system.processing.control.ClusterProcessingControl;
import org.graylog2.system.processing.control.ClusterProcessingControlFactory;
import org.graylog2.system.processing.control.RemoteProcessingControlResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

@Singleton
public class MigrationActionsImpl implements MigrationActions {
private static final Logger LOG = LoggerFactory.getLogger(MigrationActionsImpl.class);

private final ClusterConfigService clusterConfigService;
private final ClusterProcessingControlFactory clusterProcessingControlFactory;
private final NodeService<DataNodeDto> nodeService;
private final CaService caService;
private final DataNodeProvisioningService dataNodeProvisioningService;

private MigrationStateMachineContext stateMachineContext;

@Inject
public MigrationActionsImpl(final ClusterConfigService clusterConfigService, NodeService<DataNodeDto> nodeService,
final CaService caService, DataNodeProvisioningService dataNodeProvisioningService,
final ClusterProcessingControlFactory clusterProcessingControlFactory) {
this.clusterConfigService = clusterConfigService;
this.nodeService = nodeService;
this.caService = caService;
this.dataNodeProvisioningService = dataNodeProvisioningService;
this.clusterProcessingControlFactory = clusterProcessingControlFactory;
}

@Override
@@ -97,12 +107,22 @@ public void reindexOldData() {

@Override
public void stopMessageProcessing() {

final String authToken = (String)stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY);
final ClusterProcessingControl<RemoteProcessingControlResource> control = clusterProcessingControlFactory.create(authToken);
LOG.info("Attempting to pause processing on all nodes...");
control.pauseProcessing();
LOG.info("Done pausing processing on all nodes.");
LOG.info("Waiting for output buffer to drain on all nodes...");
control.waitForEmptyBuffers();
LOG.info("Done waiting for output buffer to drain on all nodes.");
}

@Override
public void startMessageProcessing() {

final String authToken = (String)stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY);
final ClusterProcessingControl<RemoteProcessingControlResource> control = clusterProcessingControlFactory.create(authToken);
LOG.info("Resuming message processing.");
control.resumeGraylogMessageProcessing();
}

@Override
@@ -135,7 +155,6 @@ public boolean provisioningFinished() {
return nodeService.allActive().values().stream().allMatch(node -> node.getDataNodeStatus() == DataNodeStatus.AVAILABLE);
}

@Override
public void setStateMachineContext(MigrationStateMachineContext context) {
this.stateMachineContext = context;
}
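
Taken together, these changes let a migration step pause the whole cluster, wait for the output buffers to drain, do its work, and resume. A minimal driver sketch (hypothetical wiring; stateMachine and migrationActions stand in for the injected instances, and the context is assumed to already hold the caller's auth token under AUTH_TOKEN_KEY):

// Sketch only: how a migration step would drive the new actions.
final MigrationStateMachineContext context = stateMachine.getContext();
migrationActions.setStateMachineContext(context);
migrationActions.stopMessageProcessing();  // pauses all nodes, then waits for buffers to drain
// ... perform the data node migration work ...
migrationActions.startMessageProcessing(); // resumes processing on all nodes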
@@ -26,6 +26,8 @@
@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.PROTECTED_AND_PUBLIC)
public class MigrationStateMachineContext {

public static final String AUTH_TOKEN_KEY = "authToken";

@JsonProperty
protected MigrationStep currentStep;
@JsonProperty
@@ -26,14 +26,19 @@
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachine;
import org.graylog2.audit.jersey.NoAuditEvent;
import org.graylog2.shared.rest.resources.ProxiedResource;
import org.graylog2.shared.security.RestPermissions;

import static org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineContext.AUTH_TOKEN_KEY;

@Path("/migration")
@RequiresAuthentication
@Consumes(MediaType.APPLICATION_JSON)
@@ -44,8 +49,10 @@ public class MigrationStateResource {
private final MigrationStateMachine stateMachine;

@Inject
public MigrationStateResource(MigrationStateMachine stateMachine,
@Context HttpHeaders httpHeaders) {
this.stateMachine = stateMachine;
this.stateMachine.getContext().addExtendedState(AUTH_TOKEN_KEY, ProxiedResource.authenticationToken(httpHeaders));
}

@POST
@@ -77,7 +84,7 @@ public CurrentStateInformation status() {
@Produces(MediaType.TEXT_PLAIN)
@ApiOperation(value = "Serialize", notes = "Serialize migration graph as graphviz source")
public String serialize() {
// you can use https://dreampuf.github.io/GraphvizOnline/ to visualize the result
return stateMachine.serialize();
}
}
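
The auth token round-trip ties these two files together: the resource captures the proxied Authorization header at construction time, and the migration actions replay it when calling other nodes. A reduced sketch of the two sides, using only names introduced in this commit:

// Write side (MigrationStateResource constructor): store the caller's token.
stateMachine.getContext().addExtendedState(AUTH_TOKEN_KEY, ProxiedResource.authenticationToken(httpHeaders));
// Read side (MigrationActionsImpl): fetch it back and build the cluster control.
final String authToken = (String) stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY);
final ClusterProcessingControl<RemoteProcessingControlResource> control = clusterProcessingControlFactory.create(authToken);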
18 changes: 18 additions & 0 deletions graylog2-server/src/main/java/org/graylog2/Configuration.java
@@ -247,6 +247,24 @@ public class Configuration extends CaConfiguration {
@Parameter(value = "field_value_suggestion_mode", required = true, converter = FieldValueSuggestionModeConverter.class)
private FieldValueSuggestionMode fieldValueSuggestionMode = FieldValueSuggestionMode.ON;

public static final String INSTALL_HTTP_CONNECTION_TIMEOUT = "install_http_connection_timeout";
public static final String INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL = "install_output_buffer_drain_interval";
public static final String INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES = "install_output_buffer_max_retries";

private static final int DEFAULT_INSTALL_RETRIES = 150;
private static final Duration DEFAULT_INSTALL_SECONDS = Duration.seconds(2);

@Parameter(value = INSTALL_HTTP_CONNECTION_TIMEOUT, validators = PositiveDurationValidator.class)
private Duration installHttpConnectionTimeout = Duration.seconds(10L);

@Parameter(value = INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL, validators = PositiveDurationValidator.class)
private Duration installOutputBufferDrainingInterval = DEFAULT_INSTALL_SECONDS;

// The maximum number of times to check if buffers have drained during Illuminate restarts on all
// nodes before giving up
@Parameter(value = INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES, validators = PositiveIntegerValidator.class)
private int installOutputBufferDrainingMaxRetries = DEFAULT_INSTALL_RETRIES;

public boolean maintainsStreamAwareFieldTypes() {
return streamAwareFieldTypes;
}
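
Operators can tune the new behavior through graylog.conf; an illustrative excerpt showing the defaults added in this change (all three settings are optional):

# Connection timeout for the per-node processing control calls.
install_http_connection_timeout = 10s
# How long to wait between output buffer checks, and how many checks to make before giving up.
install_output_buffer_drain_interval = 2s
install_output_buffer_max_retries = 150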
@@ -0,0 +1,221 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.system.processing.control;

import com.github.joschi.jadconfig.util.Duration;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
import com.github.rholder.retry.RetryListener;
import com.github.rholder.retry.Retryer;
import com.github.rholder.retry.RetryerBuilder;
import com.github.rholder.retry.StopStrategies;
import com.github.rholder.retry.WaitStrategies;
import org.graylog2.cluster.Node;
import org.graylog2.cluster.nodes.NodeService;
import org.graylog2.cluster.nodes.ServerNodeDto;
import org.graylog2.rest.RemoteInterfaceProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import retrofit2.Call;
import retrofit2.Response;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.graylog2.Configuration.INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL;
import static org.graylog2.Configuration.INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES;
import static org.graylog2.shared.utilities.StringUtils.f;

public class ClusterProcessingControl<F extends RemoteProcessingControlResource> {
private final Logger LOG = LoggerFactory.getLogger(ClusterProcessingControl.class);

private static final String OUTPUT_RATE_METRIC_NAME = "org.graylog2.throughput.output.1-sec-rate";

protected final String authorizationToken;
protected final RemoteInterfaceProvider remoteInterfaceProvider;
protected final NodeService<ServerNodeDto> nodeService;
protected final Duration connectionTimeout;
private final Duration bufferDrainInterval;
private final int maxBufferDrainRetries;

public ClusterProcessingControl(String authorizationToken, RemoteInterfaceProvider remoteInterfaceProvider, NodeService<ServerNodeDto> nodeService, Duration connectionTimeout, Duration bufferDrainInterval, int maxBufferDrainRetries) {
this.authorizationToken = authorizationToken;
this.remoteInterfaceProvider = remoteInterfaceProvider;
this.nodeService = nodeService;
this.connectionTimeout = connectionTimeout;
this.bufferDrainInterval = bufferDrainInterval;
this.maxBufferDrainRetries = maxBufferDrainRetries;
}

public void pauseProcessing() {
runOnAllActiveNodes("pause processing", RemoteProcessingControlResource::pauseProcessing, true);
}

protected <R> Map<String, R> runOnAllActiveNodes(
String operationName,
Function<F, Call<R>> callRemoteResource,
boolean stopOnFirstException
) {
final Map<String, R> result = new HashMap<>();
final List<ClusterProcessingControlException> exceptions = new ArrayList<>();
printNodeDebugInfo();
nodeService.allActive().entrySet().forEach(entry -> {
final Node nodeValue = entry.getValue();
try {
LOG.info("Attempting to call '{}' on node [{}].", operationName, nodeValue.getNodeId());
final Response<R> response = getResponse(callRemoteResource, entry);
if (!response.isSuccessful()) {
final String message = f("Unable to call '%s' on node [%s] code [%s] body [%s]",
operationName, nodeValue.getNodeId(),
response.code(), response.body());
LOG.error("Unable to call '{}' on node [{}] code [{}] body [{}].",
operationName, nodeValue.getNodeId(),
response.code(), response.body());
throw new ClusterProcessingControlException(message);
}
result.put(entry.getKey(), response.body());
LOG.info("Successfully called '{}' on node [{}].", operationName, nodeValue.getNodeId());
} catch (Exception e) {
if (e instanceof ClusterProcessingControlException) {
exceptions.add((ClusterProcessingControlException) e);
} else {
final String message = f("Unable to call '%s' on node [%s]", operationName, nodeValue.getNodeId());
LOG.error(message, e);
exceptions.add(new ClusterProcessingControlException(message, e));
}

if (stopOnFirstException) {
throw exceptions.get(0);
}
}
});

if (!exceptions.isEmpty()) {
throw exceptions.get(0);
}

return result;
}

protected <R> Response<R> getResponse(Function<F, Call<R>> callRemoteResource, Map.Entry<String, ServerNodeDto> entry) throws IOException {
var remoteProcessingControlResource = remoteInterfaceProvider.get(entry.getValue(),
this.authorizationToken, RemoteProcessingControlResource.class,
java.time.Duration.ofSeconds(connectionTimeout.toSeconds()));
return callRemoteResource.apply((F) remoteProcessingControlResource).execute();
}

public void waitForEmptyBuffers() throws OutputBufferDrainFailureException {
printNodeDebugInfo();
final Retryer<NodeOperationResult> retryer = RetryerBuilder.<NodeOperationResult>newBuilder()
.retryIfResult(value -> !value.success)
.withWaitStrategy(WaitStrategies.fixedWait(bufferDrainInterval.toSeconds(), TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(maxBufferDrainRetries))
.withRetryListener(new RetryListener() {
@Override
public <V> void onRetry(Attempt<V> attempt) {
if (attempt.getAttemptNumber() > 1) {
LOG.info("Checking again for empty output buffers (attempt #{}).", attempt.getAttemptNumber());
}
}
})
.build();
try {
retryer.call(() -> {
final Map<String, Double> nodeOutputRateMap = runOnAllActiveNodes("fetching output rate metric value",
res -> res.getMetric(OUTPUT_RATE_METRIC_NAME), true)
.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> (Double) entry.getValue().get("value")));
final boolean allZero = new HashSet<>(nodeOutputRateMap.values()).stream()
.allMatch(this::isOutputRateCloseToZero);
final Set<String> nonZeroNodes = nodeOutputRateMap
.entrySet()
.stream()
.filter(e -> !isOutputRateCloseToZero(e.getValue()))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
if (allZero) {
LOG.info("Output buffer is now empty on all nodes.");
} else {
LOG.info("Output rate has not yet reached zero on nodes [{}].", nonZeroNodes);
}
return new NodeOperationResult(allZero, nonZeroNodes);
});
} catch (RetryException e) {
final String message = f("The [%s] rate failed to reach zero on all nodes after [%s] retries at [%s]-second intervals. Giving up. " +
"This is configurable with the [%s] and [%s] configuration properties.", OUTPUT_RATE_METRIC_NAME,
maxBufferDrainRetries, bufferDrainInterval.toSeconds(), INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL,
INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES);
LOG.error(message);
throw new OutputBufferDrainFailureException(bufferDrainInterval.toSeconds(), maxBufferDrainRetries,
tryGetExceptionNodes(e));
} catch (Exception e) {
throw new ClusterProcessingControlException("Failed to request node output rate on all nodes.", e);
}
}

/**
* Try to retrieve the nodes that have a non-zero output rate from the RetryException.
* This should succeed with the current implementation.
*/
protected static Set<String> tryGetExceptionNodes(RetryException e) {
try {
return ((NodeOperationResult) e.getLastFailedAttempt().get()).nonZeroOutputRateNodeIds();
} catch (ExecutionException ex) {
return Collections.emptySet();
}
}

public record NodeOperationResult(boolean success, Set<String> nonZeroOutputRateNodeIds) {
}

/**
* The output rate is the number of messages per second that are being written to OpenSearch (usually a
* whole number followed by some meaningless decimals - e.g. 100.01 messages/second).
* A value < 1 is effectively zero. The rate might become very small, but not zero in some cases,
* so this method accounts for that condition.
*/
protected boolean isOutputRateCloseToZero(double outputRate) {
return outputRate < 0.0001;
}

public void resumeGraylogMessageProcessing() {
LOG.info("Attempting to resume processing on all nodes...");
runOnAllActiveNodes("resume processing", RemoteProcessingControlResource::resumeProcessing, false);
LOG.info("Done resuming processing on all nodes.");
}

protected void printNodeDebugInfo() {
if (LOG.isDebugEnabled()) {
LOG.debug("The Graylog cluster contains the following nodes:");
nodeService.allActive().entrySet().forEach((entry) -> {
final Node node = entry.getValue();
LOG.debug("Node ID [{}] Transport Address [{}] Last Seen [{}]", node.getNodeId(), node.getTransportAddress(), node.getLastSeen());
});
}
}
}
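
As used by MigrationActionsImpl earlier in this commit, the intended call sequence is pause, drain, work, resume. A condensed sketch, assuming an injected ClusterProcessingControlFactory (which this commit adds) has produced the control:

final ClusterProcessingControl<RemoteProcessingControlResource> control = clusterProcessingControlFactory.create(authToken);
control.pauseProcessing();     // fails fast: pause runs with stopOnFirstException = true
control.waitForEmptyBuffers(); // polls the 1-sec output rate metric until it is ~0 on every node
// ... do the migration work while nothing is being written to the indexer ...
control.resumeGraylogMessageProcessing(); // best effort: tries every node even if one call fails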
@@ -0,0 +1,28 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.system.processing.control;

public class ClusterProcessingControlException extends RuntimeException {

public ClusterProcessingControlException(String message) {
super(message);
}

public ClusterProcessingControlException(String message, Throwable cause) {
super(message, cause);
}
}
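
Callers that drive the drain themselves may want to tell a timeout apart from a hard control failure. A hedged handling sketch (control is obtained from the factory as above; the hierarchy between the two exception types is not shown in this commit, so both are caught explicitly):

try {
control.pauseProcessing();
control.waitForEmptyBuffers();
} catch (OutputBufferDrainFailureException e) {
// Buffers never drained within install_output_buffer_max_retries checks;
// processing is still paused, so decide here whether to resume before rethrowing.
control.resumeGraylogMessageProcessing();
throw e;
} catch (ClusterProcessingControlException e) {
// A pause or metric call failed outright on some node.
throw e;
}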