From 64d7c2e1dd3f082930b99313453825568f39625f Mon Sep 17 00:00:00 2001 From: sergiyvamz <75754709+sergiyvamz@users.noreply.github.com> Date: Wed, 20 Dec 2023 13:05:56 -0800 Subject: [PATCH] introduces Enhanced Host Monitoring v.2 plugin (#513) performance tests fixes and docs Co-authored-by: sergiyv-bitquill --- README.md | 20 + build.gradle.kts | 7 +- .../mysql/cj/conf/PropertyDefinitions.java | 2 +- .../plugins/efm2/DefaultMonitorService.java | 174 +++++++ .../cj/jdbc/ha/plugins/efm2/IMonitor.java | 42 ++ .../ha/plugins/efm2/IMonitorInitializer.java | 49 ++ .../jdbc/ha/plugins/efm2/IMonitorService.java | 62 +++ .../cj/jdbc/ha/plugins/efm2/Monitor.java | 427 ++++++++++++++++++ .../efm2/MonitorConnectionContext.java | 87 ++++ .../efm2/NodeMonitoringConnectionPlugin.java | 312 +++++++++++++ ...NodeMonitoringConnectionPluginFactory.java | 52 +++ .../jdbc/ha/util/SlidingExpirationCache.java | 292 ++++++++++++ ...idingExpirationCacheWithCleanupThread.java | 93 ++++ ...ysqlPerformanceForEfm2IntegrationTest.java | 72 +++ ...AuroraMysqlPerformanceIntegrationTest.java | 19 +- 15 files changed, 1698 insertions(+), 12 deletions(-) create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/DefaultMonitorService.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitor.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitorInitializer.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitorService.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/Monitor.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/MonitorConnectionContext.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/NodeMonitoringConnectionPlugin.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/NodeMonitoringConnectionPluginFactory.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/util/SlidingExpirationCache.java create mode 100644 src/main/user-impl/java/com/mysql/cj/jdbc/ha/util/SlidingExpirationCacheWithCleanupThread.java create mode 100644 src/test/java/testsuite/integration/container/AuroraMysqlPerformanceForEfm2IntegrationTest.java diff --git a/README.md b/README.md index 768ee5465..801df5c80 100644 --- a/README.md +++ b/README.md @@ -529,6 +529,26 @@ You can include additional monitoring configurations by adding the prefix `monit > > It is suggested to turn off Enhanced Failure Monitoring plugin, or to avoid using RDS Proxy endpoints when the plugin is active. +## **Experimental** Enhanced Failure Monitoring Plugin v2 + +> [!WARNING] +> This plugin is experimental and users should test the plugin before using it in production environment. + +Enhanced Failure Monitoring Plugin v2 is an alternative implementation of enhanced failure monitoring, and it is functionally equal to the Enhanced Failure Monitoring Plugin described above. Both plugins share the same set of [configuration parameters](#enhanced-failure-monitoring-parameters). Enhanced Failure Monitoring Plugin v2 plugin is designed to be a drop-in replacement for the Enhanced Failure Monitoring Plugin. + +> [!NOTE] +> Since these two plugins are separate plugins, users may decide to use them together with a single connection. While this should not have any negative side effects, it is not recommended. It is recommended to use either the Enhanced Failure Monitoring Plugin, or the Enhanced Failure Monitoring Plugin v2 where it's needed. + +In order to use Enhanced Failure Monitoring Plugin v2, users should add `software.aws.rds.jdbc.mysql.shading.com.mysql.cj.jdbc.ha.plugins.efm2.NodeMonitoringConnectionPluginFactory` to `connectionPluginFactories`. + +Enhanced Failure Monitoring Plugin v2 is designed to address [some of the issues](https://github.com/awslabs/aws-mysql-jdbc/issues/412) that have been reported by multiple users. The following changes have been made: +- Used weak pointers to ease garbage collection +- Split monitoring logic into two separate threads to increase overall monitoring stability +- Reviewed locks for monitoring context +- Reviewed and redesigned stopping of idle monitoring threads +- Reviewed and simplified monitoring logic + + ## AWS Secrets Manager Plugin The AWS JDBC Driver for MySQL supports usage of database credentials stored in the [AWS Secrets Manager](https://aws.amazon.com/secrets-manager/) through the AWS Secrets Manager Plugin. This plugin is optional and can be enabled with the `connectionPluginFactories` parameter as seen in the [connection plugin manager parameters table](#connection-plugin-manager-parameters). When a user creates a new connection with this plugin enabled, the plugin will retrieve the secret and the connection will be created using those credentials. diff --git a/build.gradle.kts b/build.gradle.kts index feaaf0f70..6ff5f9256 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -136,7 +136,7 @@ tasks.shadowJar { exclude("instrumentation/**") exclude("demo/**") exclude("documentation/**") - exclude("customplugins/**"); + exclude("customplugins/**") includeEmptyDirs = false } @@ -447,7 +447,10 @@ tasks.register("in-container-aurora") { } tasks.register("in-container-aurora-performance") { - filter.includeTestsMatching("testsuite.integration.container.AuroraMysqlPerformanceIntegrationTest") + filter.includeTestsMatching( + "testsuite.integration.container.AuroraMysqlPerformanceIntegrationTest") + filter.includeTestsMatching( + "testsuite.integration.container.AuroraMysqlPerformanceForEfm2IntegrationTest") } // Run all tests excluding integration tests diff --git a/src/main/core-api/java/com/mysql/cj/conf/PropertyDefinitions.java b/src/main/core-api/java/com/mysql/cj/conf/PropertyDefinitions.java index 68cf410a7..5389bd326 100644 --- a/src/main/core-api/java/com/mysql/cj/conf/PropertyDefinitions.java +++ b/src/main/core-api/java/com/mysql/cj/conf/PropertyDefinitions.java @@ -716,7 +716,7 @@ public enum DatabaseTerm { Messages.getString("ConnectionProperties.failureDetectionCount"), "0.4.0", CATEGORY_HA, Integer.MAX_VALUE, 0, Integer.MAX_VALUE), - new IntegerPropertyDefinition(PropertyKey.monitorDisposalTime, 60_000, RUNTIME_MODIFIABLE, + new IntegerPropertyDefinition(PropertyKey.monitorDisposalTime, 900_000, RUNTIME_MODIFIABLE, // 15 min Messages.getString("ConnectionProperties.monitorDisposalTime"), "0.4.0", CATEGORY_HA, Integer.MAX_VALUE, 0, Integer.MAX_VALUE), diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/DefaultMonitorService.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/DefaultMonitorService.java new file mode 100644 index 000000000..9bc42161d --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/DefaultMonitorService.java @@ -0,0 +1,174 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.plugins.efm2; + +import com.mysql.cj.conf.HostInfo; +import com.mysql.cj.conf.PropertyKey; +import com.mysql.cj.conf.PropertySet; +import com.mysql.cj.jdbc.JdbcConnection; +import com.mysql.cj.jdbc.ha.plugins.BasicConnectionProvider; +import com.mysql.cj.jdbc.ha.util.SlidingExpirationCacheWithCleanupThread; +import com.mysql.cj.log.Log; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * This class handles the creation and clean up of monitoring threads to servers with one + * or more active connections. + */ +public class DefaultMonitorService implements IMonitorService { + protected static final long CACHE_CLEANUP_NANO = TimeUnit.MINUTES.toNanos(1); + + protected static final Executor ABORT_EXECUTOR = Executors.newSingleThreadExecutor(); + + protected static final SlidingExpirationCacheWithCleanupThread monitors = + new SlidingExpirationCacheWithCleanupThread<>( + IMonitor::canDispose, + (monitor) -> { + try { + monitor.close(); + } catch (Exception ex) { + // ignore + } + }, + CACHE_CLEANUP_NANO); + + protected final Log logger; + protected final IMonitorInitializer monitorInitializer; + + public DefaultMonitorService(Log logger) { + this( + (hostInfo, + propertySet, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount) -> + new Monitor( + new BasicConnectionProvider(), + hostInfo, + propertySet, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount, + logger), + logger + ); + } + + DefaultMonitorService(IMonitorInitializer monitorInitializer, Log logger) { + this.monitorInitializer = monitorInitializer; + this.logger = logger; + } + + @Override + public MonitorConnectionContext startMonitoring( + JdbcConnection connectionToAbort, + HostInfo hostInfo, + PropertySet propertySet, + int failureDetectionTimeMillis, + int failureDetectionIntervalMillis, + int failureDetectionCount) { + + final IMonitor monitor = this.getMonitor( + hostInfo, + propertySet, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount); + + final MonitorConnectionContext context = new MonitorConnectionContext(connectionToAbort); + monitor.startMonitoring(context); + + return context; + } + + @Override + public void stopMonitoring( + @NonNull final MonitorConnectionContext context, + @NonNull Connection connectionToAbort) { + + if (context.shouldAbort()) { + context.setInactive(); + try { + connectionToAbort.abort(ABORT_EXECUTOR); + connectionToAbort.close(); + } catch (final SQLException sqlEx) { + // ignore + if (logger.isTraceEnabled()) { + logger.logTrace( + String.format( + "[efm2.DefaultMonitorService.stopMonitoring]: Exception during aborting connection: %s", + sqlEx.getMessage())); + } + } + } else { + context.setInactive(); + } + } + + @Override + public void releaseResources() { + // do nothing + } + + protected IMonitor getMonitor( + HostInfo hostInfo, + PropertySet propertySet, + final int failureDetectionTimeMillis, + final int failureDetectionIntervalMillis, + final int failureDetectionCount) { + + final String monitorKey = String.format("%d:%d:%d:%s", + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount, + hostInfo.getHostPortPair()); + + final long cacheExpirationNano = TimeUnit.MILLISECONDS.toNanos( + propertySet.getIntegerProperty(PropertyKey.monitorDisposalTime).getValue()); + + return monitors.computeIfAbsent( + monitorKey, + (key) -> monitorInitializer.createMonitor( + hostInfo, + propertySet, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount), + cacheExpirationNano); + } +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitor.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitor.java new file mode 100644 index 000000000..fa5452666 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitor.java @@ -0,0 +1,42 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.plugins.efm2; + +/** + * Interface for monitors. This class uses background threads to monitor servers with one + * or more connections for more efficient failure detection during method execution. + */ +public interface IMonitor extends AutoCloseable, Runnable { + void startMonitoring(MonitorConnectionContext context); + + boolean canDispose(); +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitorInitializer.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitorInitializer.java new file mode 100644 index 000000000..c92f41047 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitorInitializer.java @@ -0,0 +1,49 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.plugins.efm2; + +import com.mysql.cj.conf.HostInfo; +import com.mysql.cj.conf.PropertySet; +import com.mysql.cj.log.Log; + +/** + * Interface for initialize a new {@link Monitor}. + */ +@FunctionalInterface +public interface IMonitorInitializer { + IMonitor createMonitor( + HostInfo hostInfo, + PropertySet propertySet, + final int failureDetectionTimeMillis, + final int failureDetectionIntervalMillis, + final int failureDetectionCount); +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitorService.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitorService.java new file mode 100644 index 000000000..3fed24f31 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/IMonitorService.java @@ -0,0 +1,62 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.plugins.efm2; + +import com.mysql.cj.conf.HostInfo; +import com.mysql.cj.conf.PropertySet; +import com.mysql.cj.jdbc.JdbcConnection; + +import java.sql.Connection; + +/** + * Interface for monitor services. This class implements ways to start and stop monitoring + * servers when connections are created. + */ +public interface IMonitorService { + MonitorConnectionContext startMonitoring( + JdbcConnection connectionToAbort, + HostInfo hostInfo, + PropertySet propertySet, + int failureDetectionTimeMillis, + int failureDetectionIntervalMillis, + int failureDetectionCount); + + /** + * Stop monitoring for a connection represented by the given {@link MonitorConnectionContext}. + * Removes the context from the {@link IMonitor}. + * + * @param context The {@link MonitorConnectionContext} representing a connection. + */ + void stopMonitoring(MonitorConnectionContext context, Connection connectionToAbort); + + void releaseResources(); +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/Monitor.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/Monitor.java new file mode 100644 index 000000000..c2b6662d2 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/Monitor.java @@ -0,0 +1,427 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.plugins.efm2; + +import com.mysql.cj.conf.HostInfo; +import com.mysql.cj.conf.PropertyKey; +import com.mysql.cj.conf.PropertySet; +import com.mysql.cj.jdbc.ha.plugins.IConnectionProvider; +import com.mysql.cj.log.Log; + +import java.lang.ref.WeakReference; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.checkerframework.checker.nullness.qual.NonNull; + +/** + * This class uses a background thread to monitor a particular server with one or more + * active {@link Connection}. + */ +public class Monitor implements IMonitor { + + protected static final String MONITORING_PROPERTY_PREFIX = "monitoring-"; + protected static final long THREAD_SLEEP_NANO = TimeUnit.MILLISECONDS.toNanos(1000); + protected static final Executor ABORT_EXECUTOR = Executors.newSingleThreadExecutor(); + + private final Queue> activeContexts = new ConcurrentLinkedQueue<>(); + private final HashMap>> newContexts = new HashMap<>(); + + private final AtomicBoolean stopped = new AtomicBoolean(false); + private Connection monitoringConn = null; + private final ExecutorService threadPool = Executors.newFixedThreadPool(2, runnableTarget -> { + final Thread monitoringThread = new Thread(runnableTarget); + monitoringThread.setDaemon(true); + return monitoringThread; + }); + + private final long failureDetectionTimeNano; + private final long failureDetectionIntervalNano; + private final int failureDetectionCount; + + private long invalidNodeStartTimeNano; + private long failureCount; + private boolean nodeUnhealthy = false; + + private final IConnectionProvider connectionProvider; + private final Log logger; + private final PropertySet propertySet; + private final HostInfo hostInfo; + private final String defaultTimeoutMillis; + + + /** + * Store the monitoring configuration for a connection. + * + * @param connectionProvider A provider for creating new connections. + * @param hostInfo The {@link HostInfo} of the server this {@link Monitor} instance is + * monitoring. + * @param propertySet The {@link PropertySet} containing additional monitoring configuration. + * @param failureDetectionTimeMillis Grace period after which node monitoring starts. + * @param failureDetectionIntervalMillis Interval between each failed connection check. + * @param failureDetectionCount Number of failed connection checks before considering + * database node as unhealthy. + * @param logger A {@link Log} implementation. + */ + public Monitor( + final IConnectionProvider connectionProvider, + final HostInfo hostInfo, + final PropertySet propertySet, + final int failureDetectionTimeMillis, + final int failureDetectionIntervalMillis, + final int failureDetectionCount, + final Log logger) { + + this.connectionProvider = connectionProvider; + this.hostInfo = hostInfo; + this.propertySet = propertySet; + this.logger = logger; + this.failureDetectionTimeNano = TimeUnit.MILLISECONDS.toNanos(failureDetectionTimeMillis); + this.failureDetectionIntervalNano = TimeUnit.MILLISECONDS.toNanos(failureDetectionIntervalMillis); + this.failureDetectionCount = failureDetectionCount; + this.defaultTimeoutMillis = String.valueOf(failureDetectionIntervalMillis); + + this.threadPool.submit(this::newContextRun); // task to handle new contexts + this.threadPool.submit(this); // task to handle active monitoring contexts + this.threadPool.shutdown(); // No more tasks are accepted by pool. + } + + @Override + public boolean canDispose() { + return this.activeContexts.isEmpty() && this.newContexts.isEmpty(); + } + + @Override + public void close() throws Exception { + this.stopped.set(true); + + // Waiting for 30s gives a thread enough time to exit monitoring loop and close database connection. + if (!this.threadPool.awaitTermination(30, TimeUnit.SECONDS)) { + this.threadPool.shutdownNow(); + } + if (this.logger.isTraceEnabled()) { + this.logger.logTrace( + String.format( + "[efm2.Monitor.close]: Stopped monitoring thread for node '%s'.", + this.hostInfo.getHostPortPair())); + } + } + + @Override + public void startMonitoring(final MonitorConnectionContext context) { + if (this.stopped.get()) { + this.logger.logWarn( + String.format( + "[efm2.Monitor.startMonitoring]: Monitoring was already stopped for node %s.", + this.hostInfo.getHostPortPair())); + } + + final long currentTimeNano = this.getCurrentTimeNano(); + long startMonitoringTimeNano = this.truncateNanoToSeconds( + currentTimeNano + this.failureDetectionTimeNano); + + Queue> queue = + this.newContexts.computeIfAbsent( + startMonitoringTimeNano, + (key) -> new ConcurrentLinkedQueue<>()); + queue.add(new WeakReference<>(context)); + } + + private long truncateNanoToSeconds(final long timeNano) { + return TimeUnit.SECONDS.toNanos(TimeUnit.NANOSECONDS.toSeconds(timeNano)); + } + + public void clearContexts() { + this.newContexts.clear(); + this.activeContexts.clear(); + } + + // This method helps to organize unit tests. + long getCurrentTimeNano() { + return System.nanoTime(); + } + + public void newContextRun() { + + try { + while (!this.stopped.get()) { + + final long currentTimeNano = this.getCurrentTimeNano(); + + final ArrayList processedKeys = new ArrayList<>(); + this.newContexts.entrySet().stream() + // Get entries with key (that is a time in nanos) less or equal than current time. + .filter(entry -> entry.getKey() < currentTimeNano) + .forEach(entry -> { + final Queue> queue = entry.getValue(); + processedKeys.add(entry.getKey()); + // Each value of found entry is a queue of monitoring contexts awaiting active monitoring. + // Add all contexts to an active monitoring contexts queue. + // Ignore disposed contexts. + WeakReference contextWeakRef; + while ((contextWeakRef = queue.poll()) != null) { + MonitorConnectionContext context = contextWeakRef.get(); + if (context != null && context.isActive()) { + this.activeContexts.add(contextWeakRef); + } + } + }); + processedKeys.forEach(this.newContexts::remove); + + TimeUnit.SECONDS.sleep(1); + } + } catch (final InterruptedException intEx) { + // do nothing; just exit the thread + } catch (final Exception ex) { + // this should not be reached; log and exit thread + if (this.logger.isTraceEnabled()) { + this.logger.logTrace( + String.format( + "[efm2.Monitor.newContextRun]: Stopping monitoring after unhandled exception was thrown in monitoring thread for node %s.", + this.hostInfo.getHostPortPair()), + ex); // We want to print full trace stack of the exception. + } + } + } + + @Override + public void run() { + + try { + while (!this.stopped.get()) { + + if (this.activeContexts.isEmpty()) { + TimeUnit.NANOSECONDS.sleep(THREAD_SLEEP_NANO); + continue; + } + + final long statusCheckStartTimeNano = this.getCurrentTimeNano(); + final boolean isValid = this.checkConnectionStatus(); + final long statusCheckEndTimeNano = this.getCurrentTimeNano(); + + this.updateNodeHealthStatus(isValid, statusCheckStartTimeNano, statusCheckEndTimeNano); + + final List> tmpActiveContexts = new ArrayList<>(); + WeakReference monitorContextWeakRef; + + while ((monitorContextWeakRef = this.activeContexts.poll()) != null) { + if (this.stopped.get()) { + break; + } + + MonitorConnectionContext monitorContext = monitorContextWeakRef.get(); + if (monitorContext == null) { + continue; + } + + if (this.nodeUnhealthy) { + // Kill connection. + monitorContext.setNodeUnhealthy(true); + final Connection connectionToAbort = monitorContext.getConnection(); + monitorContext.setInactive(); + if (connectionToAbort != null) { + this.abortConnection(connectionToAbort); + } + } else if (monitorContext.isActive()) { + tmpActiveContexts.add(monitorContextWeakRef); + } + } + + // activeContexts is empty now and tmpActiveContexts contains all yet active contexts + // Add active contexts back to the queue. + this.activeContexts.addAll(tmpActiveContexts); + + long delayNano = this.failureDetectionIntervalNano - (statusCheckEndTimeNano - statusCheckStartTimeNano); + if (delayNano < THREAD_SLEEP_NANO) { + delayNano = THREAD_SLEEP_NANO; + } + TimeUnit.NANOSECONDS.sleep(delayNano); + } + } catch (final InterruptedException intEx) { + // do nothing + } catch (final Exception ex) { + // this should not be reached; log and exit thread + if (this.logger.isTraceEnabled()) { + this.logger.logTrace( + String.format( + "[efm2.Monitor.run]: Stopping monitoring after unhandled exception was thrown in monitoring thread for node %s.", + this.hostInfo.getHostPortPair()), + ex); // We want to print full trace stack of the exception. + } + } finally { + this.stopped.set(true); + if (this.monitoringConn != null) { + try { + this.monitoringConn.close(); + } catch (final SQLException ex) { + // ignore + } + } + } + } + + /** + * Check the status of the monitored server by establishing a connection and sending a ping. + * + * @return True, if the server is still alive. + */ + boolean checkConnectionStatus() { + try { + if (this.monitoringConn == null || this.monitoringConn.isClosed()) { + // open a new connection + Map monitoringConnProperties = new HashMap<>(); + + // Default values for connect and socket timeout + monitoringConnProperties.put(PropertyKey.connectTimeout.getKeyName(), defaultTimeoutMillis); + monitoringConnProperties.put(PropertyKey.socketTimeout.getKeyName(), defaultTimeoutMillis); + + Properties originalProperties = this.propertySet.exposeAsProperties(); + if (originalProperties != null) { + originalProperties.stringPropertyNames().stream() + .filter(p -> p.startsWith(MONITORING_PROPERTY_PREFIX)) + .forEach(p -> monitoringConnProperties.put( + p.substring(MONITORING_PROPERTY_PREFIX.length()), + originalProperties.getProperty(p))); + } + + + if (this.logger.isTraceEnabled()) { + this.logger.logTrace( + "[efm2.Monitor.checkConnectionStatus]: Opening a monitoring connection to " + + this.hostInfo.getHostPortPair()); + } + + this.monitoringConn = this.connectionProvider.connect(copy( + this.hostInfo, + monitoringConnProperties)); + + if (this.logger.isTraceEnabled()) { + this.logger.logTrace( + "[efm2.Monitor.checkConnectionStatus]: Opened monitoring connection: " + + this.monitoringConn); + } + return true; + } + + final boolean isValid = this.monitoringConn.isValid( + (int) TimeUnit.NANOSECONDS.toSeconds(this.failureDetectionIntervalNano)); + return isValid; + + } catch (final SQLException sqlEx) { + return false; + } + } + + private void updateNodeHealthStatus( + final boolean connectionValid, + final long statusCheckStartNano, + final long statusCheckEndNano) { + + if (!connectionValid) { + this.failureCount++; + + if (this.invalidNodeStartTimeNano == 0) { + this.invalidNodeStartTimeNano = statusCheckStartNano; + } + + final long invalidNodeDurationNano = statusCheckEndNano - this.invalidNodeStartTimeNano; + final long maxInvalidNodeDurationNano = + this.failureDetectionIntervalNano * Math.max(0, this.failureDetectionCount); + + if (invalidNodeDurationNano >= maxInvalidNodeDurationNano) { + if (this.logger.isTraceEnabled()) { + this.logger.logTrace(String.format( + "[efm2.Monitor.updateNodeHealthStatus]: Host %s is *dead*.", + this.hostInfo.getHostPortPair())); + } + this.nodeUnhealthy = true; + return; + } + + if (this.logger.isTraceEnabled()) { + this.logger.logTrace(String.format( + "[efm2.Monitor.updateNodeHealthStatus]: Host %s is not *responding* %d.", + this.hostInfo.getHostPortPair(), + this.failureCount)); + } + return; + } + + if (this.failureCount > 0) { + // Node is back alive + if (this.logger.isTraceEnabled()) { + this.logger.logTrace(String.format( + "[efm2.Monitor.updateNodeHealthStatus]: Host %s is *alive*.", + this.hostInfo.getHostPortPair())); + } + } + + this.failureCount = 0; + this.invalidNodeStartTimeNano = 0; + this.nodeUnhealthy = false; + } + + private void abortConnection(final @NonNull Connection connectionToAbort) { + try { + connectionToAbort.abort(ABORT_EXECUTOR); + connectionToAbort.close(); + } catch (final SQLException sqlEx) { + // ignore + if (this.logger.isTraceEnabled()) { + this.logger.logTrace(String.format( + "[efm2.Monitor.abortConnection]: Exception during aborting connection: %s", + sqlEx.getMessage())); + } + } + } + + private HostInfo copy(HostInfo src, Map props) { + return new HostInfo( + null, + src.getHost(), + src.getPort(), + src.getUser(), + src.getPassword(), + props); + } +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/MonitorConnectionContext.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/MonitorConnectionContext.java new file mode 100644 index 000000000..33b4c1989 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/MonitorConnectionContext.java @@ -0,0 +1,87 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.plugins.efm2; + +import com.mysql.cj.jdbc.JdbcConnection; +import com.mysql.cj.log.Log; + +import java.lang.ref.WeakReference; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Monitoring context for each connection. This contains each connection's criteria for whether a server should be + * considered unhealthy. The context is shared between the main thread and the monitor thread. + */ +public class MonitorConnectionContext { + private final AtomicReference> connectionToAbortRef; + private final AtomicBoolean nodeUnhealthy = new AtomicBoolean(false); + + /** + * Constructor. + * + * @param connectionToAbort A reference to the connection associated with this context that will be aborted. + */ + public MonitorConnectionContext(final Connection connectionToAbort) { + this.connectionToAbortRef = new AtomicReference<>(new WeakReference<>(connectionToAbort)); + } + + public boolean isNodeUnhealthy() { + return this.nodeUnhealthy.get(); + } + + void setNodeUnhealthy(final boolean nodeUnhealthy) { + this.nodeUnhealthy.set(nodeUnhealthy); + } + + public boolean shouldAbort() { + return this.nodeUnhealthy.get() && this.connectionToAbortRef.get() != null; + } + + public void setInactive() { + this.connectionToAbortRef.set(null); + } + + public Connection getConnection() { + WeakReference copy = this.connectionToAbortRef.get(); + return copy == null ? null : copy.get(); + } + + public boolean isActive() { + WeakReference copy = this.connectionToAbortRef.get(); + return copy != null && copy.get() != null; + } +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/NodeMonitoringConnectionPlugin.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/NodeMonitoringConnectionPlugin.java new file mode 100644 index 000000000..d576187b0 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/NodeMonitoringConnectionPlugin.java @@ -0,0 +1,312 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.plugins.efm2; + +import com.mysql.cj.conf.ConnectionUrl; +import com.mysql.cj.conf.HostInfo; +import com.mysql.cj.conf.PropertyKey; +import com.mysql.cj.conf.PropertySet; +import com.mysql.cj.exceptions.CJCommunicationsException; +import com.mysql.cj.jdbc.JdbcConnection; +import com.mysql.cj.jdbc.ha.ConnectionProxy; +import com.mysql.cj.jdbc.ha.plugins.IConnectionPlugin; +import com.mysql.cj.jdbc.ha.plugins.ICurrentConnectionProvider; +import com.mysql.cj.jdbc.ha.plugins.NullArgumentMessage; +import com.mysql.cj.log.Log; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +/** + * Monitor the server while the connection is executing methods for more sophisticated + * failure detection. + */ +public class NodeMonitoringConnectionPlugin implements IConnectionPlugin { + + private static final String RETRIEVE_HOST_PORT_SQL = + "SELECT CONCAT(@@hostname, ':', @@port)"; + private static final Set SKIP_MONITORING_METHODS = new HashSet<>(Arrays.asList( + "close", + "next", + "abort", + "closeOnCompletion", + "getName", + "getVendor", + "getVendorTypeNumber", + "getBaseTypeName", + "getBaseType", + "getBinaryStream", + "getBytes", + "getArray", + "getBigDecimal", + "getSubString", + "getCharacterStream", + "getAsciiStream", + "getURL", + "getUserName", + "getDatabaseProductName", + "getParameterCount", + "getPrecision", + "getScale", + "getParameterType", + "getParameterTypeName", + "getParameterClassName", + "getConnection", + "getFetchDirection", + "getFetchSize", + "getColumnCount", + "getColumnDisplaySize", + "getColumnLabel", + "getColumnName", + "getSchemaName", + "getSQLTypeName", + "getSavepointId", + "getSavepointName", + "getMaxFieldSize", + "getMaxRows", + "getQueryTimeout", + "getAttributes", + "getString", + "getTime", + "getTimestamp", + "getType", + "getUnicodeStream", + "getWarnings", + "getBinaryStream", + "getBlob", + "getBoolean", + "getByte", + "getBytes", + "getClob", + "getConcurrency", + "getDate", + "getDouble", + "getFloat", + "getHoldability", + "getInt", + "getLong", + "getMetaData", + "getNCharacterStream", + "getNClob", + "getNString", + "getObject", + "getRef", + "getRow", + "getRowId", + "getSQLXML", + "getShort", + "getStatement")); + + protected IConnectionPlugin nextPlugin; + protected Log logger; + protected PropertySet propertySet; + private final Supplier monitorServiceSupplier; + private final ICurrentConnectionProvider currentConnectionProvider; + private IMonitorService monitorService; + + /** + * Initialize the node monitoring plugin. + * + * @param currentConnectionProvider A provider allowing the plugin to retrieve the + * current active connection and its connection settings. + * @param propertySet The property set used to initialize the active connection. + * @param nextPlugin The next connection plugin in the chain. + * @param logger An implementation of {@link Log}. + */ + public NodeMonitoringConnectionPlugin( + ICurrentConnectionProvider currentConnectionProvider, + PropertySet propertySet, + IConnectionPlugin nextPlugin, + Log logger) { + this( + currentConnectionProvider, + propertySet, + nextPlugin, + logger, + () -> new DefaultMonitorService(logger)); + } + + NodeMonitoringConnectionPlugin( + ICurrentConnectionProvider currentConnectionProvider, + PropertySet propertySet, + IConnectionPlugin nextPlugin, + Log logger, + Supplier monitorServiceSupplier) { + assertArgumentIsNotNull(currentConnectionProvider, "currentConnectionProvider"); + assertArgumentIsNotNull(propertySet, "propertySet"); + assertArgumentIsNotNull(nextPlugin, "nextPlugin"); + assertArgumentIsNotNull(logger, "logger"); + + this.currentConnectionProvider = currentConnectionProvider; + this.propertySet = propertySet; + this.logger = logger; + this.nextPlugin = nextPlugin; + this.monitorServiceSupplier = monitorServiceSupplier; + } + + /** + * Executes the given SQL function with {@link Monitor} if connection monitoring is enabled. + * Otherwise, executes the SQL function directly. + * + * @param methodInvokeOn Class of an object that method to monitor to be invoked on. + * @param methodName Name of the method to monitor. + * @param executeSqlFunc {@link Callable} SQL function. + * @param args Arguments used to execute the given method. + * @return Results of the {@link Callable} SQL function. + * @throws Exception if an error occurs. + */ + @Override + public Object execute( + Class methodInvokeOn, + String methodName, + Callable executeSqlFunc, Object[] args) throws Exception { + + // update config settings since they may change + final boolean isEnabled = this.propertySet + .getBooleanProperty(PropertyKey.failureDetectionEnabled) + .getValue(); + + if (!isEnabled || !this.doesNeedMonitoring(methodInvokeOn, methodName)) { + // do direct call + return this.nextPlugin.execute(methodInvokeOn, methodName, executeSqlFunc, args); + } + // ... otherwise, use a separate thread to execute method + + final int failureDetectionTimeMillis = this.propertySet + .getIntegerProperty(PropertyKey.failureDetectionTime) + .getValue(); + final int failureDetectionIntervalMillis = this.propertySet + .getIntegerProperty(PropertyKey.failureDetectionInterval) + .getValue(); + final int failureDetectionCount = this.propertySet + .getIntegerProperty(PropertyKey.failureDetectionCount) + .getValue(); + + initMonitorService(); + + Object result; + MonitorConnectionContext monitorContext = null; + + try { + if (this.logger.isTraceEnabled()) { + this.logger.logTrace(String.format( + "[efm2.NodeMonitoringConnectionPlugin.execute]: method=%s.%s, monitoring is activated", + methodInvokeOn.getName(), + methodName)); + } + + monitorContext = this.monitorService.startMonitoring( + this.currentConnectionProvider.getCurrentConnection(), //abort current connection if needed + this.currentConnectionProvider.getCurrentHostInfo(), + this.propertySet, + failureDetectionTimeMillis, + failureDetectionIntervalMillis, + failureDetectionCount); + + result = this.nextPlugin.execute(methodInvokeOn, methodName, executeSqlFunc, args); + + } finally { + if (monitorContext != null) { + this.monitorService.stopMonitoring(monitorContext, this.currentConnectionProvider.getCurrentConnection()); + } + + if (this.logger.isTraceEnabled()) { + this.logger.logTrace(String.format( + "[efm2.NodeMonitoringConnectionPlugin.execute]: method=%s.%s, monitoring is deactivated", + methodInvokeOn.getName(), + methodName)); + } + } + + return result; + } + + /** + * Checks whether the JDBC method passed to this connection plugin requires monitoring. + * + * @param methodInvokeOn The class of the JDBC method. + * @param methodName Name of the JDBC method. + * @return true if the method requires monitoring; false otherwise. + */ + protected boolean doesNeedMonitoring(Class methodInvokeOn, String methodName) { + return !SKIP_MONITORING_METHODS.contains(methodName); + } + + private void initMonitorService() { + if (this.monitorService == null) { + this.monitorService = this.monitorServiceSupplier.get(); + } + } + + @Override + public void transactionBegun() { + this.nextPlugin.transactionBegun(); + } + + @Override + public void transactionCompleted() { + this.nextPlugin.transactionCompleted(); + } + + @Override + public void openInitialConnection(ConnectionUrl connectionUrl) throws SQLException { + this.nextPlugin.openInitialConnection(connectionUrl); + } + + /** + * Call this plugin's monitor service to release all resources associated with this + * plugin. + */ + @Override + public void releaseResources() { + if (this.monitorService != null) { + this.monitorService.releaseResources(); + } + + this.monitorService = null; + this.nextPlugin.releaseResources(); + } + + private void assertArgumentIsNotNull(Object param, String paramName) { + if (param == null) { + throw new IllegalArgumentException(NullArgumentMessage.getMessage(paramName)); + } + } +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/NodeMonitoringConnectionPluginFactory.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/NodeMonitoringConnectionPluginFactory.java new file mode 100644 index 000000000..3fbf2682f --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/plugins/efm2/NodeMonitoringConnectionPluginFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.plugins.efm2; + +import com.mysql.cj.conf.PropertySet; +import com.mysql.cj.jdbc.ha.plugins.IConnectionPlugin; +import com.mysql.cj.jdbc.ha.plugins.IConnectionPluginFactory; +import com.mysql.cj.jdbc.ha.plugins.ICurrentConnectionProvider; +import com.mysql.cj.log.Log; + +/** + * Class initializing a {@link NodeMonitoringConnectionPlugin}. + */ +public class NodeMonitoringConnectionPluginFactory implements IConnectionPluginFactory { + @Override + public IConnectionPlugin getInstance( + ICurrentConnectionProvider currentConnectionProvider, + PropertySet propertySet, + IConnectionPlugin nextPlugin, + Log logger) { + return new NodeMonitoringConnectionPlugin(currentConnectionProvider, propertySet, nextPlugin, logger); + } +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/util/SlidingExpirationCache.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/util/SlidingExpirationCache.java new file mode 100644 index 000000000..35f82303a --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/util/SlidingExpirationCache.java @@ -0,0 +1,292 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.util; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; + +public class SlidingExpirationCache { + + protected final Map cache = new ConcurrentHashMap<>(); + protected long cleanupIntervalNanos = TimeUnit.MINUTES.toNanos(10); + protected final AtomicLong cleanupTimeNanos = new AtomicLong(System.nanoTime() + cleanupIntervalNanos); + protected final ShouldDisposeFunc shouldDisposeFunc; + protected final ItemDisposalFunc itemDisposalFunc; + + /** + * A cache that periodically cleans up expired entries. Fetching an expired entry marks that entry + * as not-expired and renews its expiration time. + */ + public SlidingExpirationCache() { + this.shouldDisposeFunc = null; + this.itemDisposalFunc = null; + } + + /** + * A cache that periodically cleans up expired entries. Fetching an expired entry marks that entry + * as not-expired and renews its expiration time. + * + * @param shouldDisposeFunc a function defining the conditions under which an expired entry should + * be cleaned up when we hit the cleanup time + * @param itemDisposalFunc a function that will be called on any item that meets the cleanup + * criteria at cleanup time. The criteria for cleanup is that the item + * is both expired and marked for cleanup via a call to + * shouldDisposeFunc. + */ + public SlidingExpirationCache( + final ShouldDisposeFunc shouldDisposeFunc, + final ItemDisposalFunc itemDisposalFunc) { + this.shouldDisposeFunc = shouldDisposeFunc; + this.itemDisposalFunc = itemDisposalFunc; + } + + public SlidingExpirationCache( + final ShouldDisposeFunc shouldDisposeFunc, + final ItemDisposalFunc itemDisposalFunc, + final long cleanupIntervalNanos) { + this.shouldDisposeFunc = shouldDisposeFunc; + this.itemDisposalFunc = itemDisposalFunc; + this.cleanupIntervalNanos = cleanupIntervalNanos; + } + + /** + * In addition to performing the logic defined by {@link Map#computeIfAbsent}, cleans up expired + * entries if we have hit cleanup time. If an expired entry is requested and we have not hit + * cleanup time or {@link ShouldDisposeFunc} indicated the entry should not be closed, the entry + * will be marked as non-expired. + * + * @param key the key with which the specified value is to be associated + * @param mappingFunction the function to compute a value + * @param itemExpirationNano the expiration time of the new or renewed entry + * @return the current (existing or computed) value associated with the specified key, or null if + * the computed value is null + */ + public V computeIfAbsent( + final K key, + Function mappingFunction, + final long itemExpirationNano) { + + cleanUp(); + final CacheItem cacheItem = cache.computeIfAbsent( + key, + k -> new CacheItem( + mappingFunction.apply(k), + System.nanoTime() + itemExpirationNano)); + return cacheItem.withExtendExpiration(itemExpirationNano).item; + } + + public V get(final K key, final long itemExpirationNano) { + cleanUp(); + final CacheItem cacheItem = cache.get(key); + return cacheItem == null ? null : cacheItem.withExtendExpiration(itemExpirationNano).item; + } + + /** + * Cleanup expired entries if we have hit the cleanup time, then remove and dispose the value + * associated with the given key. + * + * @param key the key associated with the value to be removed/disposed + */ + public void remove(final K key) { + removeAndDispose(key); + cleanUp(); + } + + protected void removeAndDispose(K key) { + final CacheItem cacheItem = cache.remove(key); + if (cacheItem != null && itemDisposalFunc != null) { + itemDisposalFunc.dispose(cacheItem.item); + } + } + + protected void removeIfExpired(K key) { + final CacheItem cacheItem = cache.get(key); + if (cacheItem == null || cacheItem.shouldCleanup()) { + removeAndDispose(key); + } + } + + /** + * Remove and dispose of all entries in the cache. + */ + public void clear() { + for (K key : cache.keySet()) { + removeAndDispose(key); + } + cache.clear(); + } + + /** + * Get a map copy of all entries in the cache, including expired entries. + * + * @return a map copy of all entries in the cache, including expired entries + */ + public Map getEntries() { + final Map entries = new HashMap<>(); + for (final Map.Entry entry : this.cache.entrySet()) { + entries.put(entry.getKey(), entry.getValue().item); + } + return entries; + } + + /** + * Get the current size of the cache, including expired entries. + * + * @return the current size of the cache, including expired entries. + */ + public int size() { + return this.cache.size(); + } + + protected void cleanUp() { + if (this.cleanupTimeNanos.get() > System.nanoTime()) { + return; + } + + this.cleanupTimeNanos.set(System.nanoTime() + cleanupIntervalNanos); + cache.forEach((key, value) -> removeIfExpired(key)); + } + + /** + * Set the cleanup interval for the cache. At cleanup time, expired entries marked for cleanup via + * {@link ShouldDisposeFunc} (if defined) are disposed. + * + * @param cleanupIntervalNanos the time interval defining when we should clean up expired + * entries marked for cleanup, in nanoseconds + */ + public void setCleanupIntervalNanos(long cleanupIntervalNanos) { + this.cleanupIntervalNanos = cleanupIntervalNanos; + this.cleanupTimeNanos.set(System.nanoTime() + cleanupIntervalNanos); + } + + /** + * An optional function defining the conditions under which an expired entry should be cleaned up + * at cleanup time. + * + * @param the type of object being analyzed for disposal + */ + public interface ShouldDisposeFunc { + boolean shouldDispose(V item); + } + + /** + * An optional function defining extra cleanup steps to take when a cache item is cleaned up. + * + * @param the type of object being disposed + */ + public interface ItemDisposalFunc { + void dispose(V item); + } + + // For testing purposes only + Map getCache() { + return cache; + } + + class CacheItem { + private final V item; + private long expirationTimeNano; + + /** + * CacheItem constructor. + * + * @param item the item value + * @param expirationTimeNano the amount of time before a CacheItem should be marked as expired. + */ + public CacheItem(final V item, final long expirationTimeNano) { + this.item = item; + this.expirationTimeNano = expirationTimeNano; + } + + /** + * Determines if a cache item should be cleaned up. An item should be cleaned up if it has past + * its expiration time and {@link ShouldDisposeFunc} (if defined) indicates that it should be + * cleaned up. + * + * @return true if the cache item should be cleaned up at cleanup time. Otherwise, returns + * false. + */ + boolean shouldCleanup() { + if (shouldDisposeFunc != null) { + return System.nanoTime() > expirationTimeNano && shouldDisposeFunc.shouldDispose(this.item); + } + return System.nanoTime() > expirationTimeNano; + } + + /** + * Renew a cache item's expiration time and return the value. + * + * @param itemExpirationNano the new expiration duration for the item + * @return the item value + */ + public CacheItem withExtendExpiration(final long itemExpirationNano) { + this.expirationTimeNano = System.nanoTime() + itemExpirationNano; + return this; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((item == null) ? 0 : item.hashCode()); + return result; + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final CacheItem other = (CacheItem) obj; + if (item == null) { + return other.item == null; + } else { + return item.equals(other.item); + } + } + + @Override + public String toString() { + return "CacheItem [item=" + item + ", expirationTime=" + expirationTimeNano + "]"; + } + } +} diff --git a/src/main/user-impl/java/com/mysql/cj/jdbc/ha/util/SlidingExpirationCacheWithCleanupThread.java b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/util/SlidingExpirationCacheWithCleanupThread.java new file mode 100644 index 000000000..ff2733c21 --- /dev/null +++ b/src/main/user-impl/java/com/mysql/cj/jdbc/ha/util/SlidingExpirationCacheWithCleanupThread.java @@ -0,0 +1,93 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package com.mysql.cj.jdbc.ha.util; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.logging.Logger; + +public class SlidingExpirationCacheWithCleanupThread extends SlidingExpirationCache { + + private static final Logger LOGGER = + Logger.getLogger(SlidingExpirationCacheWithCleanupThread.class.getName()); + + protected static final ExecutorService cleanupThreadPool = Executors.newFixedThreadPool(1, runnableTarget -> { + final Thread monitoringThread = new Thread(runnableTarget); + monitoringThread.setDaemon(true); + return monitoringThread; + }); + + public SlidingExpirationCacheWithCleanupThread() { + super(); + this.initCleanupThread(); + } + + public SlidingExpirationCacheWithCleanupThread( + final ShouldDisposeFunc shouldDisposeFunc, + final ItemDisposalFunc itemDisposalFunc) { + super(shouldDisposeFunc, itemDisposalFunc); + this.initCleanupThread(); + } + + public SlidingExpirationCacheWithCleanupThread( + final ShouldDisposeFunc shouldDisposeFunc, + final ItemDisposalFunc itemDisposalFunc, + final long cleanupIntervalNanos) { + super(shouldDisposeFunc, itemDisposalFunc, cleanupIntervalNanos); + this.initCleanupThread(); + } + + protected void initCleanupThread() { + cleanupThreadPool.submit(() -> { + while (true) { + TimeUnit.NANOSECONDS.sleep(this.cleanupIntervalNanos); + + LOGGER.finest("Cleaning up..."); + this.cleanupTimeNanos.set(System.nanoTime() + cleanupIntervalNanos); + cache.forEach((key, value) -> { + try { + removeIfExpired(key); + } catch (Exception ex) { + // ignore + } + }); + } + }); + cleanupThreadPool.shutdown(); + } + + @Override + protected void cleanUp() { + // Intentionally do nothing. Cleanup thread does the job. + } +} diff --git a/src/test/java/testsuite/integration/container/AuroraMysqlPerformanceForEfm2IntegrationTest.java b/src/test/java/testsuite/integration/container/AuroraMysqlPerformanceForEfm2IntegrationTest.java new file mode 100644 index 000000000..7ec644a90 --- /dev/null +++ b/src/test/java/testsuite/integration/container/AuroraMysqlPerformanceForEfm2IntegrationTest.java @@ -0,0 +1,72 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License, version 2.0 + * (GPLv2), as published by the Free Software Foundation, with the + * following additional permissions: + * + * This program is distributed with certain software that is licensed + * under separate terms, as designated in a particular file or component + * or in the license documentation. Without limiting your rights under + * the GPLv2, the authors of this program hereby grant you an additional + * permission to link the program and your derivative works with the + * separately licensed software that they have included with the program. + * + * Without limiting the foregoing grant of rights under the GPLv2 and + * additional permission as to separately licensed software, this + * program is also subject to the Universal FOSS Exception, version 1.0, + * a copy of which can be found along with its FAQ at + * http://oss.oracle.com/licenses/universal-foss-exception. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + * See the GNU General Public License, version 2.0, for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see + * http://www.gnu.org/licenses/gpl-2.0.html. + */ + +package testsuite.integration.container; + +import com.mysql.cj.conf.PropertyKey; +import com.mysql.cj.jdbc.ha.plugins.failover.FailoverConnectionPluginFactory; +import java.io.IOException; +import java.sql.SQLException; +import java.util.Properties; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; + +public class AuroraMysqlPerformanceForEfm2IntegrationTest extends AuroraMysqlPerformanceIntegrationTest { + + @BeforeAll + public static void setUp() throws IOException, SQLException { + AuroraMysqlPerformanceIntegrationTest.setUp(); + } + + @AfterAll + public static void cleanUp() throws IOException { + doWritePerfDataToFile("./build/reports/tests/EnhancedMonitoring_efm2.xlsx", enhancedFailureMonitoringPerfDataList); + doWritePerfDataToFile("./build/reports/tests/FailoverWithEnhancedMonitoring_efm2.xlsx", failoverWithEfmPerfDataList); + doWritePerfDataToFile("./build/reports/tests/FailoverWithSocketTimeout_efm2.xlsx", failoverWithSocketTimeoutPerfDataList); + } + + @Override + protected Properties initDefaultPropsNoTimeouts() { + final Properties props = new Properties(); + props.setProperty(PropertyKey.USER.getKeyName(), TEST_USERNAME); + props.setProperty(PropertyKey.PASSWORD.getKeyName(), TEST_PASSWORD); + props.setProperty(PropertyKey.tcpKeepAlive.getKeyName(), Boolean.FALSE.toString()); + props.setProperty(PropertyKey.connectionPluginFactories.getKeyName(), + FailoverConnectionPluginFactory.class.getName() + + "," + + com.mysql.cj.jdbc.ha.plugins.efm2.NodeMonitoringConnectionPluginFactory.class.getName()); + + // Uncomment the following line to ease debugging + //props.setProperty(PropertyKey.logger.getKeyName(), "com.mysql.cj.log.StandardLogger"); + + return props; + } +} diff --git a/src/test/java/testsuite/integration/container/AuroraMysqlPerformanceIntegrationTest.java b/src/test/java/testsuite/integration/container/AuroraMysqlPerformanceIntegrationTest.java index b6a478207..c25a8fc0b 100644 --- a/src/test/java/testsuite/integration/container/AuroraMysqlPerformanceIntegrationTest.java +++ b/src/test/java/testsuite/integration/container/AuroraMysqlPerformanceIntegrationTest.java @@ -60,11 +60,11 @@ public class AuroraMysqlPerformanceIntegrationTest extends AuroraMysqlIntegrationBaseTest { - private static final int REPEAT_TIMES = 5; - private static final int FAILOVER_TIMEOUT_MS = 40000; - private static final List enhancedFailureMonitoringPerfDataList = new ArrayList<>(); - private static final List failoverWithEfmPerfDataList = new ArrayList<>(); - private static final List failoverWithSocketTimeoutPerfDataList = new ArrayList<>(); + protected static final int REPEAT_TIMES = 5; + protected static final int FAILOVER_TIMEOUT_MS = 40000; + protected static final List enhancedFailureMonitoringPerfDataList = new ArrayList<>(); + protected static final List failoverWithEfmPerfDataList = new ArrayList<>(); + protected static final List failoverWithSocketTimeoutPerfDataList = new ArrayList<>(); @BeforeAll public static void setUp() throws IOException, SQLException { @@ -73,12 +73,12 @@ public static void setUp() throws IOException, SQLException { @AfterAll public static void cleanUp() throws IOException { - doWritePerfDataToFile("./build/reports/tests/FailureDetectionResults_EnhancedMonitoring.xlsx", enhancedFailureMonitoringPerfDataList); - doWritePerfDataToFile("./build/reports/tests/FailoverPerformanceResults_EnhancedMonitoring.xlsx", failoverWithEfmPerfDataList); - doWritePerfDataToFile("./build/reports/tests/FailoverPerformanceResults_SocketTimeout.xlsx", failoverWithSocketTimeoutPerfDataList); + doWritePerfDataToFile("./build/reports/tests/EnhancedMonitoring.xlsx", enhancedFailureMonitoringPerfDataList); + doWritePerfDataToFile("./build/reports/tests/FailoverWithEnhancedMonitoring.xlsx", failoverWithEfmPerfDataList); + doWritePerfDataToFile("./build/reports/tests/FailoverWithSocketTimeout.xlsx", failoverWithSocketTimeoutPerfDataList); } - private static void doWritePerfDataToFile(String fileName, List dataList) throws IOException { + protected static void doWritePerfDataToFile(String fileName, List dataList) throws IOException { if (dataList.isEmpty()) { return; } @@ -108,6 +108,7 @@ private static void doWritePerfDataToFile(String fileName, List