Skip to content

Commit

Permalink
chore(merge): release-10.2.0 into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
bonita-ci committed Sep 6, 2024
2 parents 2ce5a5e + 0537f5d commit 874714c
Show file tree
Hide file tree
Showing 12 changed files with 496 additions and 418 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.*;

import lombok.extern.slf4j.Slf4j;
import org.bonitasoft.engine.exception.BonitaRuntimeException;
Expand All @@ -44,6 +41,11 @@ public class SpringBeanAccessor {
private static final String HAZELCAST_CONFIG_FILENAME = "hazelcast.xml";

static final BonitaHomeServer BONITA_HOME_SERVER = BonitaHomeServer.getInstance();

private static final String WORK_CORE_POOL_SIZE = "bonita.tenant.work.corePoolSize";
private static final String WORK_MAX_POOL_SIZE = "bonita.tenant.work.maximumPoolSize";
private static final String WORK_KEEP_ALIVE_IN_SECONDS = "bonita.tenant.work.keepAliveTimeSeconds";

private BonitaSpringContext context;

private boolean contextFinishedInitialized = false;
Expand Down Expand Up @@ -108,6 +110,24 @@ private void configureContext(BonitaSpringContext context) throws IOException {
propertySources.addAfter(StandardEnvironment.SYSTEM_ENVIRONMENT_PROPERTY_SOURCE_NAME,
new PropertiesPropertySource("contextProperties", getProperties()));
}
warnDeprecatedProperties(propertySources);
}

protected void warnDeprecatedProperties(MutablePropertySources propertySources) {
warnIfPropertyIsDeprecated(propertySources, WORK_CORE_POOL_SIZE);
warnIfPropertyIsDeprecated(propertySources, WORK_MAX_POOL_SIZE);
warnIfPropertyIsDeprecated(propertySources, WORK_KEEP_ALIVE_IN_SECONDS);
}

private void warnIfPropertyIsDeprecated(MutablePropertySources propertySources, String property) {
propertySources.stream()
.filter(ps -> ps.containsProperty(property))
.map(ps -> ps.getProperty(property))
.filter(Objects::nonNull)
.findFirst()
.ifPresent(value -> log.warn(
"{} property is not supported in community edition anymore. It will be ignored.",
property));
}

protected BonitaSpringContext createContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@ bonita.tenant.connector.warnWhenLongerThanMillis=10000
# Work service
# Time to wait in seconds for all work to terminate when the service is paused or stopped
bonita.tenant.work.terminationTimeout=30
bonita.tenant.work.corePoolSize=10
bonita.tenant.work.maximumPoolSize=10
bonita.tenant.work.keepAliveTimeSeconds=60
bonita.tenant.work.queueCapacity=500000

# Add a delay on work when the transaction that registers the work has multiple XA Resources
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
**/
package org.bonitasoft.engine.work;

import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -38,8 +40,10 @@ public interface BonitaExecutorService {
*
* @param work
*/
void submit(WorkDescriptor work);
Future<?> submit(WorkDescriptor work);

boolean awaitTermination(long workTerminationTimeout, TimeUnit seconds) throws InterruptedException;

ThreadPoolExecutor getExecutor();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Copyright (C) 2024 Bonitasoft S.A.
* Bonitasoft, 32 rue Gustave Eiffel - 38000 Grenoble
* This library is free software; you can redistribute it and/or modify it under the terms
* of the GNU Lesser General Public License as published by the Free Software Foundation
* version 2.1 of the License.
* This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License along with this
* program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth
* Floor, Boston, MA 02110-1301, USA.
**/
package org.bonitasoft.engine.work;

import java.util.concurrent.ThreadPoolExecutor;

public interface BonitaThreadPoolExecutorFactory {

ThreadPoolExecutor create();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,7 @@

import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import io.micrometer.core.instrument.Counter;
Expand All @@ -34,86 +27,74 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Julien Reboul
* @author Baptiste Mesta
*/

public class BonitaThreadPoolExecutor extends ThreadPoolExecutor implements BonitaExecutorService {
public class DefaultBonitaExecutorService implements BonitaExecutorService {

private Logger log = LoggerFactory.getLogger(BonitaThreadPoolExecutor.class);
private static final Logger log = LoggerFactory.getLogger(DefaultBonitaExecutorService.class);
public static final String NUMBER_OF_WORKS_PENDING = "bonita.bpmengine.work.pending";
public static final String NUMBER_OF_WORKS_RUNNING = "bonita.bpmengine.work.running";
public static final String NUMBER_OF_WORKS_EXECUTED = "bonita.bpmengine.work.executed";
public static final String WORKS_UNIT = "works";

private final BlockingQueue<Runnable> workQueue;
private final WorkFactory workFactory;
private final EngineClock engineClock;
private final WorkExecutionCallback workExecutionCallback;
private WorkExecutionAuditor workExecutionAuditor;
private MeterRegistry meterRegistry;
private final WorkExecutionAuditor workExecutionAuditor;
private final MeterRegistry meterRegistry;

private final AtomicLong runningWorks = new AtomicLong();
private final Counter executedWorkCounter;
private final Gauge numberOfWorksPending;
private final Gauge numberOfWorksRunning;

public BonitaThreadPoolExecutor(final int corePoolSize,
final int maximumPoolSize,
final long keepAliveTime,
final TimeUnit unit,
final BlockingQueue<Runnable> workQueue,
final ThreadFactory threadFactory,
final RejectedExecutionHandler handler, WorkFactory workFactory, EngineClock engineClock,
WorkExecutionCallback workExecutionCallback,
WorkExecutionAuditor workExecutionAuditor, MeterRegistry meterRegistry, long tenantId) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.workQueue = workQueue;
private final ThreadPoolExecutor executor;

public DefaultBonitaExecutorService(final ThreadPoolExecutor executor,
final WorkFactory workFactory,
final EngineClock engineClock,
final WorkExecutionCallback workExecutionCallback,
final WorkExecutionAuditor workExecutionAuditor,
final MeterRegistry meterRegistry,
final long tenantId) {
this.executor = executor;
this.workFactory = workFactory;
this.engineClock = engineClock;
this.workExecutionCallback = workExecutionCallback;
this.workExecutionAuditor = workExecutionAuditor;
this.meterRegistry = meterRegistry;

Tags tags = Tags.of("tenant", String.valueOf(tenantId));
numberOfWorksPending = Gauge.builder(NUMBER_OF_WORKS_PENDING, workQueue, Collection::size)
.tags(tags).baseUnit("works").description("Works pending in the execution queue")
numberOfWorksPending = Gauge.builder(NUMBER_OF_WORKS_PENDING, executor.getQueue(), Collection::size)
.tags(tags).baseUnit(WORKS_UNIT).description("Works pending in the execution queue")
.register(meterRegistry);
numberOfWorksRunning = Gauge.builder(NUMBER_OF_WORKS_RUNNING, runningWorks, AtomicLong::get)
.tags(tags).baseUnit("works").description("Works currently executing")
.tags(tags).baseUnit(WORKS_UNIT).description("Works currently executing")
.register(meterRegistry);
executedWorkCounter = Counter.builder(NUMBER_OF_WORKS_EXECUTED)
.tags(tags).baseUnit("works").description("total works executed since last server start")
.tags(tags).baseUnit(WORKS_UNIT).description("total works executed since last server start")
.register(meterRegistry);
}

@Override
public void clearAllQueues() {
workQueue.clear();
public ThreadPoolExecutor getExecutor() {
return executor;
}

@Override
public Future<?> submit(final Runnable task) {
// only submit if not shutdown
if (!isShutdown()) {
execute(task);
}
return null;
public void clearAllQueues() {
executor.getQueue().clear();
}

@Override
public void shutdownAndEmptyQueue() {
super.shutdown();
log.info("Clearing queue of work, had {} elements", workQueue.size());
workQueue.clear();
executor.shutdown();
log.info("Clearing queue of work, had {} elements", executor.getQueue().size());
executor.getQueue().clear();
meterRegistry.remove(numberOfWorksPending);
meterRegistry.remove(numberOfWorksRunning);
meterRegistry.remove(executedWorkCounter);
}

@Override
public void submit(WorkDescriptor work) {
submit(() -> {
public Future<?> submit(WorkDescriptor work) {
return executor.submit(() -> {
if (isRequiringDelayedExecution(work)) {
// Future implementation should use a real delay e.g. using a ScheduledThreadPoolExecutor
// Will be executed later
Expand Down Expand Up @@ -152,6 +133,11 @@ public void submit(WorkDescriptor work) {
});
}

@Override
public boolean awaitTermination(long workTerminationTimeout, TimeUnit seconds) throws InterruptedException {
return executor.awaitTermination(workTerminationTimeout, seconds);
}

private boolean isRequiringDelayedExecution(WorkDescriptor work) {
return work.getExecutionThreshold() != null && work.getExecutionThreshold().isAfter(engineClock.now());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@
**/
package org.bonitasoft.engine.work;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import io.micrometer.core.instrument.MeterRegistry;
import org.bonitasoft.engine.commons.time.EngineClock;
Expand All @@ -45,83 +40,55 @@
@Component("bonitaExecutorServiceFactory")
public class DefaultBonitaExecutorServiceFactory implements BonitaExecutorServiceFactory {

private Logger logger = LoggerFactory.getLogger(DefaultBonitaExecutorServiceFactory.class);
private final int corePoolSize;
private final int queueCapacity;
private final int maximumPoolSize;
private final long keepAliveTimeSeconds;
private final EngineClock engineClock;
private final WorkFactory workFactory;
private static final String BONITA_WORK_EXECUTOR = "bonita-work-executor";
private final Logger logger = LoggerFactory.getLogger(DefaultBonitaExecutorServiceFactory.class);

private final long tenantId;
private final WorkExecutionAuditor workExecutionAuditor;
private final MeterRegistry meterRegistry;
private final ExecutorServiceMetricsProvider executorServiceMetricsProvider;
private final BonitaThreadPoolExecutorFactory bonitaThreadPoolExecutorFactory;
private final EngineClock engineClock;
private final WorkFactory workFactory;
private final WorkExecutionAuditor workExecutionAuditor;

public DefaultBonitaExecutorServiceFactory(
WorkFactory workFactory,
@Value("${tenantId}") long tenantId,
@Value("${bonita.tenant.work.corePoolSize}") int corePoolSize,
@Value("${bonita.tenant.work.queueCapacity}") int queueCapacity,
@Value("${bonita.tenant.work.maximumPoolSize}") int maximumPoolSize,
@Value("${bonita.tenant.work.keepAliveTimeSeconds}") long keepAliveTimeSeconds,
public DefaultBonitaExecutorServiceFactory(@Value("${tenantId}") long tenantId,
MeterRegistry meterRegistry,
EngineClock engineClock,
WorkFactory workFactory,
WorkExecutionAuditor workExecutionAuditor,
MeterRegistry meterRegistry,
ExecutorServiceMetricsProvider executorServiceMetricsProvider) {
this.workFactory = workFactory;
ExecutorServiceMetricsProvider executorServiceMetricsProvider,
BonitaThreadPoolExecutorFactory bonitaThreadPoolExecutorFactory) {
this.tenantId = tenantId;
this.corePoolSize = corePoolSize;
this.queueCapacity = queueCapacity;
this.maximumPoolSize = maximumPoolSize;
this.keepAliveTimeSeconds = keepAliveTimeSeconds;
this.engineClock = engineClock;
this.workExecutionAuditor = workExecutionAuditor;
this.meterRegistry = meterRegistry;
this.workFactory = workFactory;
this.workExecutionAuditor = workExecutionAuditor;
this.engineClock = engineClock;
this.executorServiceMetricsProvider = executorServiceMetricsProvider;
this.bonitaThreadPoolExecutorFactory = bonitaThreadPoolExecutorFactory;
}

@Override
public BonitaExecutorService createExecutorService(WorkExecutionCallback workExecutionCallback) {
final BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(queueCapacity);
final RejectedExecutionHandler handler = new QueueRejectedExecutionHandler();
final WorkerThreadFactory threadFactory = new WorkerThreadFactory("Bonita-Worker", tenantId, maximumPoolSize);

final BonitaThreadPoolExecutor bonitaThreadPoolExecutor = new BonitaThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTimeSeconds, TimeUnit.SECONDS,
workQueue, threadFactory, handler, workFactory, engineClock, workExecutionCallback,
workExecutionAuditor, meterRegistry, tenantId);
final ThreadPoolExecutor bonitaThreadPoolExecutor = bonitaThreadPoolExecutorFactory.create();
final BonitaExecutorService bonitaExecutorService = new DefaultBonitaExecutorService(bonitaThreadPoolExecutor,
workFactory,
engineClock,
workExecutionCallback,
workExecutionAuditor,
meterRegistry,
tenantId);
logger.info(
"Creating a new Thread pool to handle works: " + bonitaThreadPoolExecutor);
"Creating a new Thread pool to handle works: {}", bonitaThreadPoolExecutor);

//TODO this returns the timed executor service, this should be used instead of the BonitaExecutorService but we should change it everywhere
executorServiceMetricsProvider
.bindMetricsOnly(meterRegistry, bonitaThreadPoolExecutor, "bonita-work-executor", tenantId);
return bonitaThreadPoolExecutor;
.bindMetricsOnly(meterRegistry, bonitaThreadPoolExecutor, BONITA_WORK_EXECUTOR, tenantId);
return bonitaExecutorService;
}

@Override
public void unbind() {
executorServiceMetricsProvider.unbind(meterRegistry, "bonita-work-executor", tenantId);
}

private final class QueueRejectedExecutionHandler implements RejectedExecutionHandler {

public QueueRejectedExecutionHandler() {
}

@Override
public void rejectedExecution(final Runnable task, final ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
logger.info("Tried to run work " + task
+ " but the work service is shutdown. work will be restarted with the node");
} else {
throw new RejectedExecutionException(
"Unable to run the task "
+ task
+ "\n your work queue is full you might consider changing your configuration to scale more. See parameter 'queueCapacity' in bonita.home configuration files.");
}
}

executorServiceMetricsProvider.unbind(meterRegistry, BONITA_WORK_EXECUTOR, tenantId);
}

}
Loading

0 comments on commit 874714c

Please sign in to comment.