Skip to content

Commit

Permalink
Allow resetting migration state map (#18321)
Browse files Browse the repository at this point in the history
  • Loading branch information
todvora authored Feb 20, 2024
1 parent ba70caf commit d623aae
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public interface MigrationActions {

void startDataNodes();

void resetMigration();

boolean dataNodeStartupFinished();

void setStateMachineContext(MigrationStateMachineContext context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineContext;
import org.graylog.plugins.views.storage.migration.state.persistence.DatanodeMigrationConfiguration;
import org.graylog.security.certutil.CaService;
import org.graylog.security.certutil.ca.exceptions.KeyStoreStorageException;
import org.graylog2.bootstrap.preflight.PreflightConfigResult;
Expand Down Expand Up @@ -75,11 +74,6 @@ public MigrationActionsImpl(final ClusterConfigService clusterConfigService, Nod
this.preflightConfigService = preflightConfigService;
}

@Override
public void resetMigration() {
clusterConfigService.remove(DatanodeMigrationConfiguration.class);
}


@Override
public boolean runDirectoryCompatibilityCheck() {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,6 @@ public interface MigrationStateMachine {
MigrationStateMachineContext getContext();

String serialize();

void reset();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

import com.github.oxo42.stateless4j.StateMachine;
import com.github.oxo42.stateless4j.StateMachineConfig;
import com.github.oxo42.stateless4j.delegates.Trace;
import com.github.oxo42.stateless4j.delegates.Action1;
import com.github.oxo42.stateless4j.delegates.Func;
import com.google.common.annotations.VisibleForTesting;
import org.graylog.plugins.views.storage.migration.state.actions.MigrationActions;
import org.graylog.plugins.views.storage.migration.state.persistence.DatanodeMigrationConfiguration;
Expand Down Expand Up @@ -84,7 +85,7 @@ private static StateMachineConfig<MigrationState, MigrationStep> configureStates
// we now have enough information in the context to start the remote reindex migration. This will move us to the
// next state that will be active as long as the migration is running and will provide status information to the FE
config.configure(MigrationState.MIGRATE_EXISTING_DATA) // this state and screen has to request username, password and url of the old cluster
.permit(MigrationStep.START_REMOTE_REINDEX_MIGRATION, MigrationState.REMOTE_REINDEX_RUNNING, migrationActions::startRemoteReindex);
.permit(MigrationStep.START_REMOTE_REINDEX_MIGRATION, MigrationState.REMOTE_REINDEX_RUNNING, migrationActions::startRemoteReindex);

// the state machine will stay in this state till the migration is finished or fails. It should provide
// current migration status every time we trigger MigrationStep.REQUEST_MIGRATION_STATUS.
Expand Down Expand Up @@ -129,17 +130,18 @@ static StateMachine<MigrationState, MigrationStep> buildWithTestState(MigrationS
return new StateMachine<>(state, config);
}

private static StateMachine<MigrationState, MigrationStep> fromState(MigrationState state, Trace<MigrationState, MigrationStep> persistence, MigrationActions migrationActions) {
final StateMachineConfig<MigrationState, MigrationStep> config = configureStates(migrationActions);
final StateMachine<MigrationState, MigrationStep> stateMachine = new StateMachine<>(state, config);
stateMachine.fireInitialTransition(); // asserts that onEntry will be performed when loading persisted state
stateMachine.setTrace(persistence);
return stateMachine;
}

public static StateMachine<MigrationState, MigrationStep> buildFromPersistedState(DatanodeMigrationPersistence persistenceService, MigrationActions migrationActions) {
final MigrationState state = persistenceService.getConfiguration().map(DatanodeMigrationConfiguration::currentState).orElse(MigrationState.NEW);
final Trace<MigrationState, MigrationStep> persitanceTrace = new DatanodeMigrationPersistanceTrace(persistenceService);
return fromState(state, persitanceTrace, migrationActions);
public static StateMachine<MigrationState, MigrationStep> buildFromPersistedState(DatanodeMigrationPersistence persistence, MigrationActions migrationActions) {
final StateMachineConfig<MigrationState, MigrationStep> statesConfiguration = configureStates(migrationActions);

// state accessor and mutator
final Func<MigrationState> readStateFunction = () -> persistence.getConfiguration().map(DatanodeMigrationConfiguration::currentState).orElse(MigrationState.NEW);
final Action1<MigrationState> writeStateFunction = (currentState) -> persistence.saveConfiguration(new DatanodeMigrationConfiguration(currentState));

return new StateMachine<>(
readStateFunction.call(), // initial state obtained from the DB
readStateFunction,
writeStateFunction,
statesConfiguration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.github.oxo42.stateless4j.StateMachine;
import org.graylog.plugins.views.storage.migration.state.actions.MigrationActions;
import org.graylog.plugins.views.storage.migration.state.persistence.DatanodeMigrationConfiguration;
import org.graylog.plugins.views.storage.migration.state.persistence.DatanodeMigrationPersistence;
import org.graylog.plugins.views.storage.migration.state.rest.CurrentStateInformation;

Expand Down Expand Up @@ -84,4 +85,17 @@ public String serialize() {
throw new RuntimeException("Failed to serialize state map", e);
}
}


/**
* The state machine is configured to obtain from and persist state via the underlying persistence service. If we
* change the state in the persistence, it will be automatically changed in the state machine as well.
*
* @see MigrationStateMachineBuilder#buildFromPersistedState(DatanodeMigrationPersistence, MigrationActions)
*/
@Override
public void reset() {
persistenceService.saveStateMachineContext(new MigrationStateMachineContext());
persistenceService.saveConfiguration(new DatanodeMigrationConfiguration(MigrationState.NEW));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import jakarta.inject.Inject;
import jakarta.validation.constraints.NotNull;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
Expand Down Expand Up @@ -84,4 +85,14 @@ public String serialize() {
// you can use https://dreampuf.github.io/GraphvizOnline/ to visualize the result
return stateMachine.serialize();
}

@DELETE
@Path("/state")
@NoAuditEvent("No Audit Event needed") // TODO: do we need audit log here?
@RequiresPermissions(RestPermissions.DATANODE_MIGRATION)
@ApiOperation(value = "Reset the whole migration to the first step, start over")
public CurrentStateInformation resetState() {
stateMachine.reset();
return new CurrentStateInformation(stateMachine.getState(), stateMachine.nextSteps());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,22 @@ public boolean directoryCompatibilityCheckOk() {
.containsEntry("foo", "bar");
}

@Test
void testReset() {
final InMemoryStateMachinePersistence persistence = new InMemoryStateMachinePersistence();
final MigrationStateMachineProvider provider = new MigrationStateMachineProvider(persistence, new MigrationActionsAdapter());
final MigrationStateMachine sm = provider.get();
sm.trigger(MigrationStep.SELECT_MIGRATION, Collections.emptyMap());
sm.trigger(MigrationStep.SKIP_DIRECTORY_COMPATIBILITY_CHECK, Collections.emptyMap());

Assertions.assertThat(sm.getState()).isEqualTo(MigrationState.CA_CREATION_PAGE);

sm.reset();

Assertions.assertThat(sm.getState()).isEqualTo(MigrationState.NEW);

}

@Test
void testSerialization() {
final MigrationStateMachine migrationStateMachine = new MigrationStateMachineProvider(new InMemoryStateMachinePersistence(), new MigrationActionsAdapter()).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ public MigrationActionsAdapter() {
this.context = new MigrationStateMachineContext();
}

@Override
public void resetMigration() {

}

@Override
public boolean dataNodeStartupFinished() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,33 @@
import jakarta.ws.rs.core.HttpHeaders;
import jakarta.ws.rs.core.Response;
import org.apache.commons.lang3.tuple.Pair;
import org.assertj.core.api.Assertions;
import org.graylog.plugins.views.storage.migration.state.InMemoryStateMachinePersistence;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationActionsAdapter;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationState;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachine;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineBuilder;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineContext;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineImpl;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStep;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.when;

public class MigrationStateResourceTest {

@Test
public void authenticationTokenSetToStateMachineContext() {
final CurrentStateInformation state = new CurrentStateInformation(MigrationState.NEW, List.of(MigrationStep.SELECT_MIGRATION));
final MigrationStateMachine stateMachine = mockStateMachine(state);
final MigrationStateMachine stateMachine = createStateMachine();

final String expectedAuthToken = "MyAuthorization";
final MigrationStateResource resource = new MigrationStateResource(
new MigrationStateResource(
stateMachine,
mockHttpHeaders(Pair.of(HttpHeaders.AUTHORIZATION, expectedAuthToken))
);
Expand All @@ -58,62 +55,55 @@ public void authenticationTokenSetToStateMachineContext() {

@Test
public void requestReturnsSuccessfulResult() {
CurrentStateInformation state = new CurrentStateInformation(MigrationState.NEW, List.of(MigrationStep.SELECT_MIGRATION));
final MigrationStateResource resource = new MigrationStateResource(mockStateMachine(state), mockHttpHeaders());
final MigrationStateResource resource = new MigrationStateResource(createStateMachine(), mockHttpHeaders());

try (Response response = resource.trigger(new MigrationStepRequest(MigrationStep.SELECT_MIGRATION, Map.of()))) {
assertThat(response.getStatus()).isEqualTo(200);
assertThat(response.getEntity()).isEqualTo(state);
assertThat(response.getEntity())
.isInstanceOf(CurrentStateInformation.class)
.extracting(e -> (CurrentStateInformation) e)
.satisfies(entity -> assertThat(entity.hasErrors()).isFalse());
}
}

@Test
public void requestReturns500OnError() {
final CurrentStateInformation expectedState = new CurrentStateInformation(MigrationState.NEW, List.of(MigrationStep.SELECT_MIGRATION), "Error", null);
final MigrationStateResource resource = new MigrationStateResource(mockStateMachine(expectedState), mockHttpHeaders());

try (Response response = resource.trigger(new MigrationStepRequest(MigrationStep.SELECT_MIGRATION, Map.of()))) {
final MigrationStateResource resource = new MigrationStateResource(createStateMachine(), mockHttpHeaders());
// trigger a step that's not allowed in this state. That should cause and propagate an error
try (Response response = resource.trigger(new MigrationStepRequest(MigrationStep.CONFIRM_OLD_CLUSTER_STOPPED, Map.of()))) {
assertThat(response.getStatus()).isEqualTo(500);
assertThat(response.getEntity()).isEqualTo(expectedState);
final Object entity = response.getEntity();
assertThat(entity)
.isInstanceOf(CurrentStateInformation.class)
.extracting(e -> (CurrentStateInformation) e)
.extracting(CurrentStateInformation::errorMessage)
.isEqualTo("No valid leaving transitions are permitted from state 'NEW' for trigger 'CONFIRM_OLD_CLUSTER_STOPPED'. Consider ignoring the trigger.");
}
}

@Test
void testReset() {
final MigrationStateResource resource = new MigrationStateResource(createStateMachine(), mockHttpHeaders());
resource.trigger(new MigrationStepRequest(MigrationStep.SELECT_MIGRATION, Map.of()));
assertThat(resource.status().state()).isEqualTo(MigrationState.MIGRATION_WELCOME_PAGE);
resource.resetState();
assertThat(resource.status().state()).isEqualTo(MigrationState.NEW);
}

@SafeVarargs
private HttpHeaders mockHttpHeaders(Pair<String, String>... headers) {
final HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
Arrays.stream(headers).forEach(pair -> when(httpHeaders.getRequestHeader(pair.getKey())).thenReturn(List.of(pair.getValue())));
return httpHeaders;
}

private MigrationStateMachine mockStateMachine(CurrentStateInformation state) {

final MigrationStateMachineContext context = new MigrationStateMachineContext();

return new MigrationStateMachine() {
@Override
public CurrentStateInformation trigger(MigrationStep step, Map<String, Object> args) {
return state;
}

@Override
public MigrationState getState() {
return state.state();
}

@Override
public List<MigrationStep> nextSteps() {
return state.nextSteps();
}

@Override
public MigrationStateMachineContext getContext() {
return context;
}

@Override
public String serialize() {
throw new UnsupportedOperationException("not implemented");
}
};
private MigrationStateMachine createStateMachine() {
final InMemoryStateMachinePersistence persistence = new InMemoryStateMachinePersistence();
final MigrationActionsAdapter actions = new MigrationActionsAdapter();
return new MigrationStateMachineImpl(
MigrationStateMachineBuilder.buildFromPersistedState(persistence, actions),
actions,
persistence
);
}
}

0 comments on commit d623aae

Please sign in to comment.