Skip to content

Commit

Permalink
[AMORO-3239] Fix stack overflow caused by reading too many partitions…
Browse files Browse the repository at this point in the history
… in the filter (#3240)

* [AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter

* [AMORO-3239] Add the "ignore-filter-partition-count" parameter

* move parameter "optimizer.ignore-filter-partition-count" to "self-optimizing.skip-filter-partition-count"

* move parameter "self-optimizing.skip-filter-partition-count" to "refresh-tables.max-pending-partition-count"
  • Loading branch information
7hong authored Oct 16, 2024
1 parent 8954349 commit cc29688
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ public class AmoroManagementConf {
.defaultValue(60000L)
.withDescription("Interval for refreshing table metadata.");

/**
 * Maximum number of partitions kept as "pending" when a table runtime is refreshed.
 *
 * <p>Config key: {@code refresh-tables.max-pending-partition-count}; integer, default 100.
 *
 * <p>The value is handed to {@code TableRuntimeRefreshExecutor}, which forwards it to
 * {@code OptimizingEvaluator}; the evaluator applies it as a stream {@code limit(...)} on the
 * partitions that need optimizing before they are collected into the pending plan map.
 *
 * <p>NOTE(review): per the commit description this cap exists to avoid a stack overflow when
 * too many partitions end up in the partition filter; which partitions are dropped once the
 * cap is reached is not specified here — confirm against the evaluator if ordering matters.
 */
public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
ConfigOptions.key("refresh-tables.max-pending-partition-count")
.intType()
.defaultValue(100)
.withDescription("Filters will not be used beyond that number of partitions");

public static final ConfigOption<Long> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,18 @@ public class OptimizingEvaluator {
protected final MixedTable mixedTable;
protected final TableRuntime tableRuntime;
protected final TableSnapshot currentSnapshot;
protected final int maxPendingPartitions;
protected boolean isInitialized = false;

protected Map<String, PartitionEvaluator> needOptimizingPlanMap = Maps.newHashMap();
protected Map<String, PartitionEvaluator> partitionPlanMap = Maps.newHashMap();

public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
public OptimizingEvaluator(
TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
this.tableRuntime = tableRuntime;
this.mixedTable = table;
this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
this.maxPendingPartitions = maxPendingPartitions;
}

public TableRuntime getTableRuntime() {
Expand Down Expand Up @@ -137,6 +140,7 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
needOptimizingPlanMap.putAll(
partitionPlanMap.entrySet().stream()
.filter(entry -> entry.getValue().isNecessary())
.limit(maxPendingPartitions)
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public OptimizingPlanner(
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
super(tableRuntime, table);
super(tableRuntime, table, Integer.MAX_VALUE);
this.partitionFilter =
tableRuntime.getPendingInput() == null
? Expressions.alwaysTrue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void setup(TableManager tableManager, Configurations conf) {
new TableRuntimeRefreshExecutor(
tableManager,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL));
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends BaseTableExecutor {

// Refresh interval in milliseconds (configured default: 60000, i.e. 1 minute)
private final long interval;
private final int maxPendingPartitions;

public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize, long interval) {
public TableRuntimeRefreshExecutor(
TableManager tableRuntimes, int poolSize, long interval, int maxPendingPartitions) {
super(tableRuntimes, poolSize);
this.interval = interval;
this.maxPendingPartitions = maxPendingPartitions;
}

@Override
Expand All @@ -48,7 +51,8 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {

private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable table) {
if (tableRuntime.isOptimizingEnabled() && !tableRuntime.getOptimizingStatus().isProcessing()) {
OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime, table);
OptimizingEvaluator evaluator =
new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
if (evaluator.isNecessary()) {
OptimizingEvaluator.PendingInput pendingInput = evaluator.getOptimizingPendingInput();
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ protected void reboot() throws InterruptedException {
private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {

public TableRuntimeRefresher() {
super(tableService(), 1, Integer.MAX_VALUE);
super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
}

void refreshPending() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void appendData(UnkeyedTable table, int id) {

void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testFragmentFiles() {
}

protected OptimizingEvaluator buildOptimizingEvaluator() {
return new OptimizingEvaluator(getTableRuntime(), getMixedTable());
return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
}

protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private void appendPosDelete(UnkeyedTable table) {

void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
Expand Down
1 change: 1 addition & 0 deletions dist/src/main/amoro-bin/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ ams:
refresh-tables:
thread-count: 10
interval: 60000 # 1min
max-pending-partition-count: 100 # default 100

self-optimizing:
commit-thread-count: 10
Expand Down

0 comments on commit cc29688

Please sign in to comment.