Skip to content

Commit

Permalink
move reaper thread to DCP extension
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger committed Oct 18, 2024
1 parent e4878a7 commit b2f1389
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ dependencies {
testImplementation(project(":core:common:lib:json-ld-lib"))
testImplementation(project(":extensions:common:json-ld"))
testImplementation(libs.nimbus.jwt)
testImplementation(libs.awaitility)
}

Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.eclipse.edc.security.signature.jws2020.Jws2020SignatureSuite;
import org.eclipse.edc.spi.agent.ParticipantAgentService;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
Expand All @@ -66,6 +67,9 @@
import java.net.URISyntaxException;
import java.time.Clock;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static org.eclipse.edc.iam.verifiablecredentials.spi.VcConstants.STATUSLIST_2021_URL;
import static org.eclipse.edc.spi.constants.CoreConstants.JSON_LD;
Expand All @@ -83,6 +87,9 @@ public class IdentityAndTrustExtension implements ServiceExtension {

public static final String JSON_2020_SIGNATURE_SUITE = "JsonWebSignature2020";

public static final long DEFAULT_CLEANUP_PERIOD_SECONDS = 60;
@Setting(value = "The period of the JTI entry reaper thread in seconds", defaultValue = DEFAULT_CLEANUP_PERIOD_SECONDS + "")
public static final String CLEANUP_PERIOD = "edc.sql.store.jti.cleanup.period";

@Inject
private SecureTokenService secureTokenService;
Expand Down Expand Up @@ -132,9 +139,12 @@ public class IdentityAndTrustExtension implements ServiceExtension {

@Inject
private JtiValidationStore jtiValidationStore;

@Inject
private ExecutorInstrumentation executorInstrumentation;
private PresentationVerifier presentationVerifier;
private CredentialServiceClient credentialServiceClient;
private long reaperThreadPeriod;
private ScheduledFuture<?> jtiEntryReaperThread;

@Override
public void initialize(ServiceExtensionContext context) {
Expand All @@ -153,6 +163,8 @@ public void initialize(ServiceExtensionContext context) {
// TODO move in a separated extension?
signatureSuiteRegistry.register(JSON_2020_SIGNATURE_SUITE, new Jws2020SignatureSuite(typeManager.getMapper(JSON_LD)));

reaperThreadPeriod = context.getSetting(CLEANUP_PERIOD, DEFAULT_CLEANUP_PERIOD_SECONDS);

try {
jsonLd.registerCachedDocument(STATUSLIST_2021_URL, getClass().getClassLoader().getResource("statuslist2021.json").toURI());
} catch (URISyntaxException e) {
Expand All @@ -167,6 +179,17 @@ public void initialize(ServiceExtensionContext context) {
revocationServiceRegistry.addService(BitstringStatusListStatus.TYPE, new BitstringStatusListRevocationService(typeManager.getMapper(), validity));
}

@Override
public void start() {
jtiEntryReaperThread = executorInstrumentation.instrument(Executors.newSingleThreadScheduledExecutor(), "JTI Validation Entry Reaper Thread")
.scheduleAtFixedRate(jtiValidationStore::deleteExpired, reaperThreadPeriod, reaperThreadPeriod, TimeUnit.SECONDS);
}

@Override
public void shutdown() {
jtiEntryReaperThread.cancel(true);
}

@Provider
public IdentityService createIdentityService(ServiceExtensionContext context) {
var credentialServiceUrlResolver = new DidCredentialServiceUrlResolver(didResolverRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@
import org.eclipse.edc.iam.identitytrust.spi.SecureTokenService;
import org.eclipse.edc.json.JacksonTypeManager;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.jwt.validation.jti.JtiValidationStore;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.configuration.Config;
import org.eclipse.edc.spi.types.TypeManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.iam.identitytrust.core.IdentityAndTrustExtension.CLEANUP_PERIOD;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.atLeastOnce;
Expand All @@ -36,10 +43,14 @@
@ExtendWith(DependencyInjectionExtension.class)
class IdentityAndTrustExtensionTest {

private final JtiValidationStore storeMock = mock();

@BeforeEach
void setUp(ServiceExtensionContext context) {
context.registerService(SecureTokenService.class, mock());
context.registerService(TypeManager.class, new JacksonTypeManager());
context.registerService(JtiValidationStore.class, storeMock);
context.registerService(ExecutorInstrumentation.class, ExecutorInstrumentation.noop());
}

@Test
Expand All @@ -53,4 +64,16 @@ void verifyCorrectService(IdentityAndTrustExtension extension, ServiceExtensionC
assertThat(is).isInstanceOf(IdentityAndTrustService.class);
verify(configMock, atLeastOnce()).getString(eq(IdentityAndTrustExtension.CONNECTOR_DID_PROPERTY), isNull());
}

@Test
void assertReaperThreadRunning(IdentityAndTrustExtension extension, ServiceExtensionContext context) {
when(context.getSetting(eq(CLEANUP_PERIOD), anyLong())).thenReturn(1L);

extension.initialize(context);
extension.start();

await().atLeast(Duration.ofSeconds(1)) // that's the initial delay
.untilAsserted(() -> verify(storeMock, atLeastOnce()).deleteExpired());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
Expand All @@ -31,20 +30,14 @@
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.eclipse.edc.transaction.spi.TransactionContext;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Provides({ JtiValidationStore.class })
@Extension(value = "SQL JTI Validation store")
public class SqlJtiValidationStoreExtension implements ServiceExtension {


@Setting(value = "The datasource to be used", defaultValue = DataSourceRegistry.DEFAULT_DATASOURCE)
public static final String DATASOURCE_NAME = "edc.sql.store.jti.datasource";
public static final long DEFAULT_CLEANUP_PERIOD_SECONDS = 60;
@Setting(value = "The period of the JTI entry reaper thread in seconds", defaultValue = DEFAULT_CLEANUP_PERIOD_SECONDS + "")
public static final String CLEANUP_PERIOD = "edc.sql.store.jti.cleanup.period";


@Inject
private DataSourceRegistry dataSourceRegistry;
Expand All @@ -63,36 +56,19 @@ public class SqlJtiValidationStoreExtension implements ServiceExtension {

@Inject
private SqlSchemaBootstrapper sqlSchemaBootstrapper;
@Inject
private ExecutorInstrumentation executorInstrumentation;
private SqlJtiValidationStore sqlStore;
private long reaperThreadPeriod;
private ScheduledFuture<?> reaperThread;


@Override
public void initialize(ServiceExtensionContext context) {
var dataSourceName = context.getConfig().getString(DATASOURCE_NAME, DataSourceRegistry.DEFAULT_DATASOURCE);

reaperThreadPeriod = context.getSetting(CLEANUP_PERIOD, DEFAULT_CLEANUP_PERIOD_SECONDS);

sqlStore = new SqlJtiValidationStore(dataSourceRegistry, dataSourceName, transactionContext, typeManager.getMapper(),
var sqlStore = new SqlJtiValidationStore(dataSourceRegistry, dataSourceName, transactionContext, typeManager.getMapper(),
getStatementImpl(), queryExecutor, context.getMonitor());

context.registerService(JtiValidationStore.class, sqlStore);
sqlSchemaBootstrapper.addStatementFromResource(dataSourceName, "jti-validation-schema.sql");
}

@Override
public void start() {
reaperThread = executorInstrumentation.instrument(Executors.newSingleThreadScheduledExecutor(), "SQL JTI Validation Reaper Thread")
.scheduleAtFixedRate(sqlStore::deleteExpired, reaperThreadPeriod, reaperThreadPeriod, TimeUnit.SECONDS);
}

@Override
public void shutdown() {
reaperThread.cancel(true);
}

private JtiValidationStoreStatements getStatementImpl() {
return statements == null ? new PostgresDialectStatements() : statements;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,4 @@ void shouldInitializeTheStore(SqlJtiValidationStoreExtension extension, ServiceE

verify(config).getString(eq(DATASOURCE_NAME), any());
}

@Test
void shouldStartReaperThread_withDefaultConfig(SqlJtiValidationStoreExtension extension, ServiceExtensionContext context) {
var config = mock(Config.class);
when(context.getConfig()).thenReturn(config);
when(config.getLong(SqlJtiValidationStoreExtension.CLEANUP_PERIOD, any())).thenReturn(1L);

extension.initialize(context);
extension.start();
}
}

0 comments on commit b2f1389

Please sign in to comment.