From 8ce7e8f1d2e794c7edb6ca076513b80890f4a9a6 Mon Sep 17 00:00:00 2001 From: "majin.nathan" Date: Wed, 16 Oct 2024 20:23:18 +0800 Subject: [PATCH] [AMORO-3265] Refactor SimpleFuture for process api --- .../amoro/server/AmoroServiceContainer.java | 2 +- .../server/DefaultOptimizingService.java | 10 +- .../amoro/server/RestCatalogService.java | 2 +- .../amoro/server/catalog/InternalCatalog.java | 6 +- .../server/catalog/InternalCatalogImpl.java | 2 +- .../amoro/server/catalog/ServerCatalog.java | 2 +- .../server/dashboard/DashboardServer.java | 4 +- .../server/dashboard/utils/CommonUtil.java | 2 +- .../server/manager/AbstractPluginManager.java | 4 +- .../server/optimizing/KeyedTableCommit.java | 2 +- .../server/optimizing/OptimizingQueue.java | 2 +- .../amoro/server/optimizing/TaskRuntime.java | 8 +- .../server/optimizing/UnKeyedTableCommit.java | 2 +- .../server/persistence/PersistentBase.java | 4 +- .../server/table/DefaultTableService.java | 10 +- .../amoro/server/utils/PreconditionUtils.java | 4 +- .../server/TestDefaultOptimizingService.java | 4 +- .../manager/TestAbstractPluginManager.java | 4 +- .../commit/TestMixIcebergCommit.java | 2 +- .../commit/TestUnKeyedTableCommit.java | 2 +- .../persistence/TestPersistentBase.java | 2 +- .../server/table/TestCatalogService.java | 6 +- .../server/table/TestDatabaseService.java | 6 +- .../server/table/TestTableRuntimeManager.java | 2 +- .../amoro/server/table/TestTableService.java | 6 +- .../exception/AlreadyExistsException.java | 2 +- .../exception/AmoroRuntimeException.java | 4 +- .../exception/BlockerConflictException.java | 2 +- .../amoro}/exception/ForbiddenException.java | 2 +- .../exception/IllegalMetadataException.java | 2 +- .../exception/IllegalTaskStateException.java | 14 +- .../exception/LoadingPluginException.java | 2 +- .../exception/ObjectNotExistsException.java | 2 +- .../exception/OptimizingClosedException.java | 2 +- .../exception/OptimizingCommitException.java | 2 +- .../exception/PersistenceException.java | 2 +- .../amoro}/exception/PluginAuthException.java | 2 +- .../exception/PluginRetryAuthException.java | 2 +- .../exception/SignatureCheckException.java | 2 +- .../exception/TaskNotFoundException.java | 2 +- .../exception/TaskRuntimeException.java | 2 +- .../amoro}/exception/UndefinedException.java | 2 +- .../apache/amoro/process/SimpleFuture.java | 103 ++++++-- .../apache/amoro/process/TableProcess.java | 2 +- .../amoro/process/TestSimpleFuture.java | 228 ++++++++++++++++++ 45 files changed, 390 insertions(+), 91 deletions(-) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/AlreadyExistsException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/AmoroRuntimeException.java (98%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/BlockerConflictException.java (95%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/ForbiddenException.java (95%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/IllegalMetadataException.java (95%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/IllegalTaskStateException.java (75%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/LoadingPluginException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/ObjectNotExistsException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/OptimizingClosedException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/OptimizingCommitException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/PersistenceException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/PluginAuthException.java (95%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/PluginRetryAuthException.java (95%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/SignatureCheckException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/TaskNotFoundException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/TaskRuntimeException.java (96%) rename {amoro-ams/src/main/java/org/apache/amoro/server => amoro-common/src/main/java/org/apache/amoro}/exception/UndefinedException.java (95%) create mode 100644 amoro-common/src/test/java/org/apache/amoro/process/TestSimpleFuture.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 467f45aac6..3d70c0acbf 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -26,12 +26,12 @@ import org.apache.amoro.api.OptimizingService; import org.apache.amoro.config.ConfigHelpers; import org.apache.amoro.config.Configurations; +import org.apache.amoro.exception.AmoroRuntimeException; import org.apache.amoro.server.dashboard.DashboardServer; import org.apache.amoro.server.dashboard.JavalinJsonMapper; import org.apache.amoro.server.dashboard.response.ErrorResponse; import org.apache.amoro.server.dashboard.utils.AmsUtil; import org.apache.amoro.server.dashboard.utils.CommonUtil; -import org.apache.amoro.server.exception.AmoroRuntimeException; import org.apache.amoro.server.manager.EventsManager; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java index 7069b5b498..5a9d0c052e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java @@ -29,14 +29,14 @@ import org.apache.amoro.api.OptimizingTaskResult; import org.apache.amoro.config.Configurations; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.exception.ForbiddenException; +import org.apache.amoro.exception.IllegalTaskStateException; +import org.apache.amoro.exception.ObjectNotExistsException; +import org.apache.amoro.exception.PluginRetryAuthException; +import org.apache.amoro.exception.TaskNotFoundException; import org.apache.amoro.properties.CatalogMetaProperties; import org.apache.amoro.resource.Resource; import org.apache.amoro.resource.ResourceGroup; -import org.apache.amoro.server.exception.ForbiddenException; -import org.apache.amoro.server.exception.IllegalTaskStateException; -import org.apache.amoro.server.exception.ObjectNotExistsException; -import org.apache.amoro.server.exception.PluginRetryAuthException; -import org.apache.amoro.server.exception.TaskNotFoundException; import org.apache.amoro.server.optimizing.OptimizingQueue; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.optimizing.TaskRuntime; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java index 5c37d6ca8a..a58bfb6795 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/RestCatalogService.java @@ -37,10 +37,10 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.events.IcebergReportEvent; +import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.properties.CatalogMetaProperties; import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.server.catalog.ServerCatalog; -import org.apache.amoro.server.exception.ObjectNotExistsException; import org.apache.amoro.server.manager.EventsManager; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.table.TableService; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java index 967f94f664..2c0a201818 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalog.java @@ -23,9 +23,9 @@ import org.apache.amoro.TableIDWithFormat; import org.apache.amoro.api.CatalogMeta; import org.apache.amoro.api.TableIdentifier; -import org.apache.amoro.server.exception.AlreadyExistsException; -import org.apache.amoro.server.exception.IllegalMetadataException; -import org.apache.amoro.server.exception.ObjectNotExistsException; +import org.apache.amoro.exception.AlreadyExistsException; +import org.apache.amoro.exception.IllegalMetadataException; +import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java index b812c61da2..70e52a60bd 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/InternalCatalogImpl.java @@ -27,12 +27,12 @@ import org.apache.amoro.TableFormat; import org.apache.amoro.api.CatalogMeta; import org.apache.amoro.config.Configurations; +import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.formats.iceberg.IcebergTable; import org.apache.amoro.io.AuthenticatedFileIO; import org.apache.amoro.mixed.InternalMixedIcebergCatalog; import org.apache.amoro.server.AmoroManagementConf; import org.apache.amoro.server.RestCatalogService; -import org.apache.amoro.server.exception.ObjectNotExistsException; import org.apache.amoro.server.table.TableMetadata; import org.apache.amoro.server.table.internal.InternalIcebergCreator; import org.apache.amoro.server.table.internal.InternalIcebergHandler; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java index 48fe6b951f..dfeede9c29 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/ServerCatalog.java @@ -21,7 +21,7 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.TableIDWithFormat; import org.apache.amoro.api.CatalogMeta; -import org.apache.amoro.server.exception.IllegalMetadataException; +import org.apache.amoro.exception.IllegalMetadataException; import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java index f858e8e810..364c9728d7 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/DashboardServer.java @@ -32,6 +32,8 @@ import io.javalin.http.staticfiles.Location; import io.javalin.http.staticfiles.StaticFileConfig; import org.apache.amoro.config.Configurations; +import org.apache.amoro.exception.ForbiddenException; +import org.apache.amoro.exception.SignatureCheckException; import org.apache.amoro.server.AmoroManagementConf; import org.apache.amoro.server.DefaultOptimizingService; import org.apache.amoro.server.RestCatalogService; @@ -48,8 +50,6 @@ import org.apache.amoro.server.dashboard.controller.VersionController; import org.apache.amoro.server.dashboard.response.ErrorResponse; import org.apache.amoro.server.dashboard.utils.ParamSignatureCalculator; -import org.apache.amoro.server.exception.ForbiddenException; -import org.apache.amoro.server.exception.SignatureCheckException; import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/CommonUtil.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/CommonUtil.java index 35162382f0..378f600872 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/CommonUtil.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/utils/CommonUtil.java @@ -19,7 +19,7 @@ package org.apache.amoro.server.dashboard.utils; import io.javalin.http.Context; -import org.apache.amoro.server.exception.SignatureCheckException; +import org.apache.amoro.exception.SignatureCheckException; import org.apache.commons.lang3.StringUtils; import org.apache.commons.net.telnet.TelnetClient; import org.slf4j.Logger; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java index 665d74d7ab..05820b7208 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/AbstractPluginManager.java @@ -19,9 +19,9 @@ package org.apache.amoro.server.manager; import org.apache.amoro.ActivePlugin; +import org.apache.amoro.exception.AlreadyExistsException; +import org.apache.amoro.exception.LoadingPluginException; import org.apache.amoro.server.Environments; -import org.apache.amoro.server.exception.AlreadyExistsException; -import org.apache.amoro.server.exception.LoadingPluginException; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java index 79e8cb521a..2312072bcc 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/KeyedTableCommit.java @@ -24,13 +24,13 @@ import org.apache.amoro.api.CommitMetaProducer; import org.apache.amoro.data.DataFileType; import org.apache.amoro.data.PrimaryKeyedFile; +import org.apache.amoro.exception.OptimizingCommitException; import org.apache.amoro.hive.utils.TableTypeUtil; import org.apache.amoro.op.OverwriteBaseFiles; import org.apache.amoro.op.SnapshotSummary; import org.apache.amoro.optimizing.RewriteFilesInput; import org.apache.amoro.optimizing.RewriteFilesOutput; import org.apache.amoro.server.AmoroServiceConstants; -import org.apache.amoro.server.exception.OptimizingCommitException; import org.apache.amoro.table.MixedTable; import org.apache.amoro.utils.ContentFiles; import org.apache.amoro.utils.MixedTableUtil; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java index 2dcfcaca86..96d6de74b3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java @@ -22,10 +22,10 @@ import org.apache.amoro.OptimizerProperties; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.api.OptimizingTaskId; +import org.apache.amoro.exception.OptimizingClosedException; import org.apache.amoro.optimizing.RewriteFilesInput; import org.apache.amoro.resource.ResourceGroup; import org.apache.amoro.server.AmoroServiceConstants; -import org.apache.amoro.server.exception.OptimizingClosedException; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.plan.OptimizingPlanner; import org.apache.amoro.server.persistence.PersistentBase; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java index eb3ecca228..0b5309ee96 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/TaskRuntime.java @@ -22,10 +22,10 @@ import org.apache.amoro.api.OptimizingTask; import org.apache.amoro.api.OptimizingTaskId; import org.apache.amoro.api.OptimizingTaskResult; +import org.apache.amoro.exception.IllegalTaskStateException; +import org.apache.amoro.exception.OptimizingClosedException; +import org.apache.amoro.exception.TaskRuntimeException; import org.apache.amoro.server.AmoroServiceConstants; -import org.apache.amoro.server.exception.IllegalTaskStateException; -import org.apache.amoro.server.exception.OptimizingClosedException; -import org.apache.amoro.server.exception.TaskRuntimeException; import org.apache.amoro.server.persistence.StatedPersistentBase; import org.apache.amoro.server.persistence.mapper.OptimizingMapper; import org.apache.amoro.server.resource.OptimizerThread; @@ -281,7 +281,7 @@ public void accept(Status targetStatus) { throw new OptimizingClosedException(taskId.getProcessId()); } if (!getNext().contains(targetStatus)) { - throw new IllegalTaskStateException(taskId, status, targetStatus); + throw new IllegalTaskStateException(taskId, status.name(), targetStatus.name()); } status = targetStatus; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java index 10b3802f58..54adce64fd 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/UnKeyedTableCommit.java @@ -23,6 +23,7 @@ import org.apache.amoro.api.CommitMetaProducer; import org.apache.amoro.data.FileNameRules; +import org.apache.amoro.exception.OptimizingCommitException; import org.apache.amoro.hive.HMSClientPool; import org.apache.amoro.hive.table.SupportHive; import org.apache.amoro.hive.utils.HivePartitionUtil; @@ -33,7 +34,6 @@ import org.apache.amoro.optimizing.RewriteFilesOutput; import org.apache.amoro.properties.HiveTableProperties; import org.apache.amoro.server.AmoroServiceConstants; -import org.apache.amoro.server.exception.OptimizingCommitException; import org.apache.amoro.server.utils.IcebergTableUtil; import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.UnkeyedTable; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java index 4a2d6fb26c..e3f822c385 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/PersistentBase.java @@ -18,8 +18,8 @@ package org.apache.amoro.server.persistence; -import org.apache.amoro.server.exception.AmoroRuntimeException; -import org.apache.amoro.server.exception.PersistenceException; +import org.apache.amoro.exception.AmoroRuntimeException; +import org.apache.amoro.exception.PersistenceException; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.ibatis.session.TransactionIsolationLevel; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index b868cb38c9..c5c613228e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -29,16 +29,16 @@ import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.config.Configurations; import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.exception.AlreadyExistsException; +import org.apache.amoro.exception.BlockerConflictException; +import org.apache.amoro.exception.IllegalMetadataException; +import org.apache.amoro.exception.ObjectNotExistsException; +import org.apache.amoro.exception.PersistenceException; import org.apache.amoro.server.AmoroManagementConf; import org.apache.amoro.server.catalog.CatalogBuilder; import org.apache.amoro.server.catalog.ExternalCatalog; import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.server.catalog.ServerCatalog; -import org.apache.amoro.server.exception.AlreadyExistsException; -import org.apache.amoro.server.exception.BlockerConflictException; -import org.apache.amoro.server.exception.IllegalMetadataException; -import org.apache.amoro.server.exception.ObjectNotExistsException; -import org.apache.amoro.server.exception.PersistenceException; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.persistence.StatedPersistentBase; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/utils/PreconditionUtils.java b/amoro-ams/src/main/java/org/apache/amoro/server/utils/PreconditionUtils.java index 9acebf80f2..7db3b41ba1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/utils/PreconditionUtils.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/utils/PreconditionUtils.java @@ -18,8 +18,8 @@ package org.apache.amoro.server.utils; -import org.apache.amoro.server.exception.AlreadyExistsException; -import org.apache.amoro.server.exception.ObjectNotExistsException; +import org.apache.amoro.exception.AlreadyExistsException; +import org.apache.amoro.exception.ObjectNotExistsException; public class PreconditionUtils { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java index 5a237700d7..fbb5b6d334 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java @@ -28,11 +28,11 @@ import org.apache.amoro.api.OptimizingTaskResult; import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.exception.IllegalTaskStateException; +import org.apache.amoro.exception.PluginRetryAuthException; import org.apache.amoro.io.MixedDataTestHelpers; import org.apache.amoro.optimizing.RewriteFilesOutput; import org.apache.amoro.optimizing.TableOptimizing; -import org.apache.amoro.server.exception.IllegalTaskStateException; -import org.apache.amoro.server.exception.PluginRetryAuthException; import org.apache.amoro.server.optimizing.OptimizingProcess; import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.optimizing.TaskRuntime; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestAbstractPluginManager.java b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestAbstractPluginManager.java index 79520890ed..19a4002e87 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestAbstractPluginManager.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestAbstractPluginManager.java @@ -20,8 +20,8 @@ import com.clearspring.analytics.util.Lists; import org.apache.amoro.ActivePlugin; -import org.apache.amoro.server.exception.AlreadyExistsException; -import org.apache.amoro.server.exception.LoadingPluginException; +import org.apache.amoro.exception.AlreadyExistsException; +import org.apache.amoro.exception.LoadingPluginException; import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.junit.jupiter.api.Assertions; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestMixIcebergCommit.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestMixIcebergCommit.java index 81ec3ca137..797503c83f 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestMixIcebergCommit.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestMixIcebergCommit.java @@ -25,12 +25,12 @@ import org.apache.amoro.catalog.CatalogTestHelper; import org.apache.amoro.data.DataFileType; import org.apache.amoro.data.DefaultKeyedFile; +import org.apache.amoro.exception.OptimizingCommitException; import org.apache.amoro.optimizing.RewriteFilesInput; import org.apache.amoro.optimizing.RewriteFilesOutput; import org.apache.amoro.scan.CombinedScanTask; import org.apache.amoro.scan.KeyedTableScanTask; import org.apache.amoro.scan.MixedFileScanTask; -import org.apache.amoro.server.exception.OptimizingCommitException; import org.apache.amoro.server.optimizing.KeyedTableCommit; import org.apache.amoro.server.optimizing.RewriteStageTask; import org.apache.amoro.server.optimizing.TaskRuntime; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestUnKeyedTableCommit.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestUnKeyedTableCommit.java index 61da4399cf..d479b05fd8 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestUnKeyedTableCommit.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/commit/TestUnKeyedTableCommit.java @@ -24,9 +24,9 @@ import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; import org.apache.amoro.catalog.TableTestBase; +import org.apache.amoro.exception.OptimizingCommitException; import org.apache.amoro.optimizing.RewriteFilesInput; import org.apache.amoro.optimizing.RewriteFilesOutput; -import org.apache.amoro.server.exception.OptimizingCommitException; import org.apache.amoro.server.optimizing.RewriteStageTask; import org.apache.amoro.server.optimizing.TaskRuntime; import org.apache.amoro.server.optimizing.UnKeyedTableCommit; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/persistence/TestPersistentBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/TestPersistentBase.java index b28e6f642c..9af985879b 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/persistence/TestPersistentBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/persistence/TestPersistentBase.java @@ -20,7 +20,7 @@ import static org.mockito.Mockito.never; -import org.apache.amoro.server.exception.UndefinedException; +import org.apache.amoro.exception.UndefinedException; import org.apache.ibatis.session.SqlSession; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java index 12ce5d953e..fd6a5def6d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestCatalogService.java @@ -22,13 +22,13 @@ import org.apache.amoro.TestedCatalogs; import org.apache.amoro.api.CatalogMeta; import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.exception.AlreadyExistsException; +import org.apache.amoro.exception.IllegalMetadataException; +import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.hive.TestHMS; import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.properties.CatalogMetaProperties; import org.apache.amoro.server.catalog.InternalCatalog; -import org.apache.amoro.server.exception.AlreadyExistsException; -import org.apache.amoro.server.exception.IllegalMetadataException; -import org.apache.amoro.server.exception.ObjectNotExistsException; import org.junit.Assert; import org.junit.Assume; import org.junit.ClassRule; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java index 759f388fbd..885db19b48 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDatabaseService.java @@ -26,10 +26,10 @@ import org.apache.amoro.TableTestHelper; import org.apache.amoro.TestedCatalogs; import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.exception.AlreadyExistsException; +import org.apache.amoro.exception.IllegalMetadataException; +import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.server.catalog.InternalCatalog; -import org.apache.amoro.server.exception.AlreadyExistsException; -import org.apache.amoro.server.exception.IllegalMetadataException; -import org.apache.amoro.server.exception.ObjectNotExistsException; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.junit.Assert; import org.junit.Assume; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java index b6b8ce1e77..f77996d363 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableRuntimeManager.java @@ -24,9 +24,9 @@ import org.apache.amoro.TableTestHelper; import org.apache.amoro.catalog.BasicCatalogTestHelper; import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; -import org.apache.amoro.server.exception.ObjectNotExistsException; import org.apache.amoro.table.MixedTable; import org.junit.Assert; import org.junit.Test; diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java index cbf8010c6e..a1e7a59a0e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableService.java @@ -31,14 +31,14 @@ import org.apache.amoro.api.Blocker; import org.apache.amoro.api.TableIdentifier; import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.exception.AlreadyExistsException; +import org.apache.amoro.exception.BlockerConflictException; +import org.apache.amoro.exception.ObjectNotExistsException; import org.apache.amoro.hive.catalog.HiveCatalogTestHelper; import org.apache.amoro.hive.catalog.HiveTableTestHelper; import org.apache.amoro.server.AmoroManagementConf; import org.apache.amoro.server.catalog.InternalCatalog; import org.apache.amoro.server.catalog.ServerCatalog; -import org.apache.amoro.server.exception.AlreadyExistsException; -import org.apache.amoro.server.exception.BlockerConflictException; -import org.apache.amoro.server.exception.ObjectNotExistsException; import org.apache.amoro.table.blocker.RenewableBlocker; import org.junit.Assert; import org.junit.Test; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/AlreadyExistsException.java b/amoro-common/src/main/java/org/apache/amoro/exception/AlreadyExistsException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/AlreadyExistsException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/AlreadyExistsException.java index 3a32c168b8..625363463a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/AlreadyExistsException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/AlreadyExistsException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; import org.apache.amoro.api.TableIdentifier; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/AmoroRuntimeException.java b/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java similarity index 98% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/AmoroRuntimeException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java index a013b55e25..555c238c10 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/AmoroRuntimeException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/AmoroRuntimeException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; import org.apache.amoro.ErrorCodes; import org.apache.amoro.ServerTableIdentifier; @@ -142,7 +142,7 @@ public static AmoroRuntimeException wrap( } } - private static AmoroRuntimeException wrap(Throwable throwable) { + public static AmoroRuntimeException wrap(Throwable throwable) { return wrap(throwable, UndefinedException::new); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/BlockerConflictException.java b/amoro-common/src/main/java/org/apache/amoro/exception/BlockerConflictException.java similarity index 95% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/BlockerConflictException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/BlockerConflictException.java index bc12499a96..34f5803ee9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/BlockerConflictException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/BlockerConflictException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class BlockerConflictException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/ForbiddenException.java b/amoro-common/src/main/java/org/apache/amoro/exception/ForbiddenException.java similarity index 95% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/ForbiddenException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/ForbiddenException.java index 420a90939f..5f3538ee7b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/ForbiddenException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/ForbiddenException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; /** forbiddenException */ public class ForbiddenException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalMetadataException.java b/amoro-common/src/main/java/org/apache/amoro/exception/IllegalMetadataException.java similarity index 95% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalMetadataException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/IllegalMetadataException.java index 040bd02112..37ef01ec3e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalMetadataException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/IllegalMetadataException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class IllegalMetadataException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalTaskStateException.java b/amoro-common/src/main/java/org/apache/amoro/exception/IllegalTaskStateException.java similarity index 75% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalTaskStateException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/IllegalTaskStateException.java index 26df696fa8..55ba288547 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/IllegalTaskStateException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/IllegalTaskStateException.java @@ -16,19 +16,17 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; import org.apache.amoro.api.OptimizingTaskId; -import org.apache.amoro.server.optimizing.TaskRuntime; public class IllegalTaskStateException extends AmoroRuntimeException { - private final TaskRuntime.Status preStatus; - private final TaskRuntime.Status targetStatus; + private final String preStatus; + private final String targetStatus; private final OptimizingTaskId taskId; - public IllegalTaskStateException( - OptimizingTaskId taskId, TaskRuntime.Status preStatus, TaskRuntime.Status targetStatus) { + public IllegalTaskStateException(OptimizingTaskId taskId, String preStatus, String targetStatus) { super( String.format("Illegal Task of %s status from %s to %s", taskId, preStatus, targetStatus)); this.taskId = taskId; @@ -36,11 +34,11 @@ public IllegalTaskStateException( this.targetStatus = targetStatus; } - public TaskRuntime.Status getPreStatus() { + public String getPreStatus() { return preStatus; } - public TaskRuntime.Status getTargetStatus() { + public String getTargetStatus() { return targetStatus; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/LoadingPluginException.java b/amoro-common/src/main/java/org/apache/amoro/exception/LoadingPluginException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/LoadingPluginException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/LoadingPluginException.java index 4940ff3b11..a60905b8af 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/LoadingPluginException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/LoadingPluginException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class LoadingPluginException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/ObjectNotExistsException.java b/amoro-common/src/main/java/org/apache/amoro/exception/ObjectNotExistsException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/ObjectNotExistsException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/ObjectNotExistsException.java index b9e38863b1..925c7f354d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/ObjectNotExistsException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/ObjectNotExistsException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.api.TableIdentifier; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingClosedException.java b/amoro-common/src/main/java/org/apache/amoro/exception/OptimizingClosedException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingClosedException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/OptimizingClosedException.java index 2940e1b9c8..a6fc8cd52a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingClosedException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/OptimizingClosedException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class OptimizingClosedException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingCommitException.java b/amoro-common/src/main/java/org/apache/amoro/exception/OptimizingCommitException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingCommitException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/OptimizingCommitException.java index 9dc13bc354..a48414ac15 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/OptimizingCommitException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/OptimizingCommitException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class OptimizingCommitException extends Exception { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PersistenceException.java b/amoro-common/src/main/java/org/apache/amoro/exception/PersistenceException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/PersistenceException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/PersistenceException.java index bdc0c6dfb2..b4be24d72b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PersistenceException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/PersistenceException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class PersistenceException extends AmoroRuntimeException { public PersistenceException(String message, Throwable cause) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginAuthException.java b/amoro-common/src/main/java/org/apache/amoro/exception/PluginAuthException.java similarity index 95% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginAuthException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/PluginAuthException.java index e772044ac1..f8d5de9407 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginAuthException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/PluginAuthException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class PluginAuthException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginRetryAuthException.java b/amoro-common/src/main/java/org/apache/amoro/exception/PluginRetryAuthException.java similarity index 95% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginRetryAuthException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/PluginRetryAuthException.java index 2730fe6dda..67de2e736f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/PluginRetryAuthException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/PluginRetryAuthException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class PluginRetryAuthException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/SignatureCheckException.java b/amoro-common/src/main/java/org/apache/amoro/exception/SignatureCheckException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/SignatureCheckException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/SignatureCheckException.java index 3dd53034ec..3576410185 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/SignatureCheckException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/SignatureCheckException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; /** SignatureCheckException */ public class SignatureCheckException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskNotFoundException.java b/amoro-common/src/main/java/org/apache/amoro/exception/TaskNotFoundException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskNotFoundException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/TaskNotFoundException.java index c3ea2598b2..87b37c5b72 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskNotFoundException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/TaskNotFoundException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; import org.apache.amoro.api.OptimizingTaskId; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskRuntimeException.java b/amoro-common/src/main/java/org/apache/amoro/exception/TaskRuntimeException.java similarity index 96% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskRuntimeException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/TaskRuntimeException.java index 6893881abe..282bc5788b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/TaskRuntimeException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/TaskRuntimeException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class TaskRuntimeException extends AmoroRuntimeException { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/exception/UndefinedException.java b/amoro-common/src/main/java/org/apache/amoro/exception/UndefinedException.java similarity index 95% rename from amoro-ams/src/main/java/org/apache/amoro/server/exception/UndefinedException.java rename to amoro-common/src/main/java/org/apache/amoro/exception/UndefinedException.java index ca722be353..0fb0d91875 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/exception/UndefinedException.java +++ b/amoro-common/src/main/java/org/apache/amoro/exception/UndefinedException.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.amoro.server.exception; +package org.apache.amoro.exception; public class UndefinedException extends AmoroRuntimeException { public UndefinedException(Throwable throwable) { diff --git a/amoro-common/src/main/java/org/apache/amoro/process/SimpleFuture.java b/amoro-common/src/main/java/org/apache/amoro/process/SimpleFuture.java index 3a02ea0974..5df4b7ee13 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/SimpleFuture.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/SimpleFuture.java @@ -18,42 +18,115 @@ package org.apache.amoro.process; -import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.exception.AmoroRuntimeException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; /** A simple wrapper of CompletableFuture for better code readability. */ public class SimpleFuture { - protected CompletableFuture future; + private final CompletableFuture completedFuture = new CompletableFuture<>(); + private final Map> callbackMap = new HashMap<>(); + private CompletableFuture triggerFuture; + private CompletableFuture callbackFuture; public SimpleFuture() { - future = new CompletableFuture<>(); + this(new CompletableFuture<>()); } - private SimpleFuture(CompletableFuture future) { - this.future = future; + protected SimpleFuture(CompletableFuture triggerFuture) { + this.triggerFuture = triggerFuture; + this.callbackFuture = triggerFuture; + whenCompleted(() -> {}); } public void whenCompleted(Runnable runnable) { - future = - future.whenComplete( + callbackFuture = + callbackFuture.whenComplete( (v, e) -> { - Preconditions.checkState(e == null); - runnable.run(); + + /** + * Only chain the callback if there is no exception. Exceptions would be threw in the + * complete() method, or join() method. + */ + if (e == null) { + runnable.run(); + if (callbackMap.get(runnable) == callbackFuture) { + completedFuture.complete(null); + } + } else { + throw AmoroRuntimeException.wrap(e); + } }); + callbackMap.put(runnable, callbackFuture); + } + + public boolean isDone() { + return completedFuture.isDone(); } - public SimpleFuture complete() { - future.complete(null); - return this; + public void reset() { + triggerFuture = new CompletableFuture<>(); + callbackFuture = triggerFuture; + whenCompleted(() -> {}); + for (Runnable runnable : callbackMap.keySet()) { + whenCompleted(runnable); + } } + /** + * This method will trigger all callback functions in the same thread and wait until all callback + * functions are completed. If throws exception, completedFuture is not done. + */ + public void complete() { + try { + if (triggerFuture.complete(null)) { + callbackFuture.get(); + } + } catch (Throwable throwable) { + throw normalize(throwable); + } + } + + /** Return until completedFuture is done, which means task or process is truly finished */ public void join() { - future.join(); + try { + completedFuture.join(); + } catch (Throwable throwable) { + throw normalize(throwable); + } + } + + private AmoroRuntimeException normalize(Throwable throwable) { + if (throwable instanceof ExecutionException && throwable.getCause() != null) { + return AmoroRuntimeException.wrap(throwable.getCause()); + } + return AmoroRuntimeException.wrap(throwable); + } + + public SimpleFuture or(SimpleFuture anotherFuture) { + return new SimpleFuture( + CompletableFuture.anyOf(completedFuture, anotherFuture.completedFuture)); + } + + public SimpleFuture and(SimpleFuture anotherFuture) { + return new SimpleFuture( + CompletableFuture.allOf(completedFuture, anotherFuture.completedFuture)); + } + + public static SimpleFuture allOf(List futures) { + return new SimpleFuture( + CompletableFuture.allOf( + futures.stream().map(f -> f.completedFuture).toArray(CompletableFuture[]::new))); } - public SimpleFuture anyOf(SimpleFuture another) { - return new SimpleFuture(CompletableFuture.anyOf(future, another.future)); + public static SimpleFuture anyOf(List futures) { + return new SimpleFuture( + CompletableFuture.anyOf( + futures.stream().map(f -> f.completedFuture).toArray(CompletableFuture[]::new))); } } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java index 2df63b7fcf..f9c32e3317 100644 --- a/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/TableProcess.java @@ -87,7 +87,7 @@ public ProcessStatus getStatus() { @Override public SimpleFuture getSubmitFuture() { - return submitFuture.anyOf(completeFuture); + return submitFuture.or(completeFuture); } @Override diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestSimpleFuture.java b/amoro-common/src/test/java/org/apache/amoro/process/TestSimpleFuture.java new file mode 100644 index 0000000000..52a4b19e06 --- /dev/null +++ b/amoro-common/src/test/java/org/apache/amoro/process/TestSimpleFuture.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.process; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; + +import org.apache.amoro.exception.AmoroRuntimeException; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class TestSimpleFuture { + + private SimpleFuture simpleFuture; + private int[] callbackNum = {-1, -1, -1, -1, -1}; + + @Before + public void setUp() { + simpleFuture = new SimpleFuture(); + long threadId = Thread.currentThread().getId(); + for (int i = 0; i < 5; i++) { + final int num = i; + simpleFuture.whenCompleted( + () -> { + // Trigger error if callbackNum[num] == 0 + if (callbackNum[num] == 0) { + System.out.println("current num: " + num); + throw new RuntimeException("Callback error"); + } + callbackNum[num] = num; + Assert.assertEquals( + "Callback should be run in the same thread", + threadId, + Thread.currentThread().getId()); + }); + } + } + + @Test + public void testComplete() { + simpleFuture.complete(); + + for (int i = 0; i < 5; i++) { + Assert.assertEquals("Current callback num: " + i, i, callbackNum[i]); + } + Assert.assertTrue("SimpleFuture should complete if callback no error", simpleFuture.isDone()); + } + + @Test + public void testCompleteTwiceButNotReset() { + simpleFuture.complete(); + // The second call will not trigger any callback + for (int i = 0; i < 5; i++) { + callbackNum[i] = -1; + } + simpleFuture.complete(); + for (int i = 0; i < 5; i++) { + Assert.assertEquals("Current callback num: " + i, -1, callbackNum[i]); + } + Assert.assertTrue("SimpleFuture should not complete if callback error", simpleFuture.isDone()); + } + + // Additional tests for edge cases and error conditions + @Test + public void testCallbackError() { + callbackNum[2] = 0; // Trigger error in callbackNum[2] + + try { + simpleFuture.complete(); + } catch (Throwable throwable) { + Assert.assertTrue("Should catch the error", throwable instanceof AmoroRuntimeException); + Assert.assertTrue("Should catch the error", throwable.getCause() instanceof RuntimeException); + Assert.assertEquals( + "Should catch the error", "Callback error", throwable.getCause().getMessage()); + } + for (int i = 0; i < 5; i++) { + if (i < 2) { + Assert.assertEquals("Current callback num: " + i, i, callbackNum[i]); + } else if (i == 2) { + Assert.assertEquals("Current callback num: " + i, 0, callbackNum[i]); + } else { + Assert.assertEquals("Current callback num: " + i, -1, callbackNum[i]); + } + } + Assert.assertFalse("SimpleFuture should not complete if callback error", simpleFuture.isDone()); + } + + @Test + public void testReset() { + callbackNum[2] = 0; // Trigger error in callbackNum[2] + try { + simpleFuture.complete(); + } catch (Throwable throwable) { + Assert.assertTrue("Should catch the error", throwable instanceof AmoroRuntimeException); + } + Assert.assertFalse("SimpleFuture should not complete if callback error", simpleFuture.isDone()); + + callbackNum = new int[] {-1, -1, -1, -1, -1}; // Trigger normal callback + simpleFuture.reset(); + simpleFuture.complete(); + for (int i = 0; i < 5; i++) { + Assert.assertEquals("Current callback num: " + i, i, callbackNum[i]); + } + Assert.assertTrue("SimpleFuture should not complete if callback error", simpleFuture.isDone()); + } + + @Test + public void testIsDone() { + simpleFuture.complete(); + Assert.assertTrue("Future should be completed", simpleFuture.isDone()); + } + + @Test(expected = AmoroRuntimeException.class) + public void testCompleteException() throws ExecutionException, InterruptedException { + CompletableFuture future = mock(CompletableFuture.class); + doReturn(true).when(future).complete(null); + doThrow(new RuntimeException()).when(future).get(); + SimpleFuture simpleFuture = new SimpleFuture(future); + + simpleFuture.complete(); + } + + @Test + public void testJoin() { + simpleFuture.complete(); + simpleFuture.join(); + Assert.assertTrue("Future should be completed", simpleFuture.isDone()); + } + + @Test + public void testJoinException() throws ExecutionException, InterruptedException { + CompletableFuture future = mock(CompletableFuture.class); + doThrow(new RuntimeException()).when(future).get(); + SimpleFuture simpleFuture = new SimpleFuture(future); + simpleFuture.reset(); + simpleFuture.complete(); + simpleFuture.join(); + Assert.assertTrue("Future should be completed", simpleFuture.isDone()); + } + + @Test + public void testOr() { + SimpleFuture anotherFuture = new SimpleFuture(); + SimpleFuture combinedFuture = simpleFuture.or(anotherFuture); + + simpleFuture.complete(); + Assert.assertTrue( + "Combined future should be completed when either future completes", + combinedFuture.isDone()); + } + + @Test + public void testAnd() { + SimpleFuture anotherFuture = new SimpleFuture(); + SimpleFuture combinedFuture = simpleFuture.and(anotherFuture); + + simpleFuture.complete(); + anotherFuture.complete(); + Assert.assertTrue( + "Combined future should be completed when both futures complete", combinedFuture.isDone()); + } + + @Test + public void testAllOf() { + List futures = + Arrays.asList(new SimpleFuture(), new SimpleFuture(), new SimpleFuture()); + SimpleFuture combinedFuture = SimpleFuture.allOf(futures); + + futures.forEach(SimpleFuture::complete); + Assert.assertTrue( + "Combined future should be completed when all futures complete", combinedFuture.isDone()); + } + + @Test + public void testAnyOf() { + List futures = Arrays.asList(new SimpleFuture(), new SimpleFuture()); + SimpleFuture combinedFuture = SimpleFuture.anyOf(futures); + + futures.get(0).complete(); + Assert.assertTrue( + "Combined future should be completed when any future completes", combinedFuture.isDone()); + } + + // Test for when the future is already completed before calling complete() + @Test + public void testCompleteAlreadyCompleted() { + simpleFuture.complete(); + try { + simpleFuture.complete(); + } catch (Throwable throwable) { + Assert.fail(throwable.getMessage()); + } + } + + // Test for when the future is already completed before calling join() + @Test + public void testJoinAlreadyCompleted() { + simpleFuture.complete(); + try { + simpleFuture.join(); + } catch (Throwable throwable) { + Assert.fail(throwable.getMessage()); + } + } +}