Skip to content

Commit

Permalink
Code review with Nate
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Jul 9, 2024
1 parent 68d1a56 commit 036dd6d
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@
/**
* Base implementation of OperationInitializer that delegates to a pool of threads.
*/
class OperationInitializationThreadPoolBase implements OperationInitializer {
public class OperationInitializationThreadPoolBase implements OperationInitializer {

private final ThreadLocal<Boolean> isInitializationThread = ThreadLocal.withInitial(() -> false);

private final ThreadPoolExecutor executorService;

private final int numThreads;

OperationInitializationThreadPoolBase(ThreadInitializationFactory factory, int numThreads, String threadGroupName) {
protected OperationInitializationThreadPoolBase(final ThreadInitializationFactory factory, final int numThreads,
final String threadGroupName) {
this.numThreads = numThreads;
final ThreadGroup threadGroup = new ThreadGroup(threadGroupName);
final ThreadFactory threadFactory = new NamingThreadFactory(
Expand All @@ -54,7 +55,7 @@ public boolean canParallelize() {
}

@Override
public Future<?> submit(Runnable runnable) {
public Future<?> submit(final Runnable runnable) {
return executorService.submit(runnable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1539,14 +1539,14 @@ private static boolean snapshotAllTable(
nonEmptyColumnsIndices)) {
return false;
}
boolean canParallelize = false;
if (!nonEmptyColumnsIndices.isEmpty()) {
final ExecutionContext executionContext = ExecutionContext.getContext();
final boolean canParallelize =
ENABLE_PARALLEL_SNAPSHOT &&
executionContext.getOperationInitializer().canParallelize() &&
nonEmptyColumnsIndices.size() > 1 &&
(snapshot.rowsIncluded.size() > MINIMUM_PARALLEL_SNAPSHOT_ROWS ||
!allColumnSourcesInMemory(table, columnSources, nonEmptyColumnsIndices));
canParallelize = ENABLE_PARALLEL_SNAPSHOT &&
executionContext.getOperationInitializer().canParallelize() &&
nonEmptyColumnsIndices.size() > 1 &&
(snapshot.rowsIncluded.size() >= MINIMUM_PARALLEL_SNAPSHOT_ROWS ||
!allColumnSourcesInMemory(table, columnSources, nonEmptyColumnsIndices));
if (canParallelize) {
if (!snapshotColumnsParallel(columnSources, nonEmptyColumnsIndices, table, usePrev, executionContext,
snapshot)) {
Expand All @@ -1561,6 +1561,7 @@ private static boolean snapshotAllTable(
final LogEntry logEntry = log.debug().append(System.identityHashCode(logIdentityObject))
.append(": Snapshot candidate step=")
.append((usePrev ? -1 : 0) + LogicalClock.getStep(getConcurrentAttemptClockValue()))
.append(", canParallelize=").append(canParallelize)
.append(", rows=").append(snapshot.rowsIncluded).append("/").append(keysToSnapshot)
.append(", cols=");
if (columnsToSnapshot == null) {
Expand Down Expand Up @@ -1592,17 +1593,18 @@ private static boolean allColumnSourcesInMemory(
}

/**
* Recursively check if the column source is in-memory or redirected to an in-memory column source.
* Check if the column source is in-memory or redirected to an in-memory column source.
*/
private static boolean isColumnSourceInMemory(final ColumnSource<?> columnSource) {
if (columnSource instanceof InMemoryColumnSource) {
return true;
}
if (!(columnSource instanceof RedirectedColumnSource)) {
return false;
}
final ColumnSource<?> innerSource = ((RedirectedColumnSource<?>) columnSource).getInnerSource();
return isColumnSourceInMemory(innerSource);
private static boolean isColumnSourceInMemory(ColumnSource<?> columnSource) {
do {
if (columnSource instanceof InMemoryColumnSource) {
return ((InMemoryColumnSource) columnSource).isInMemory();
}
if (!(columnSource instanceof RedirectedColumnSource)) {
return false;
}
columnSource = ((RedirectedColumnSource<?>) columnSource).getInnerSource();
} while (true);
}

private static boolean snapshotColumnsParallel(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2707,31 +2707,32 @@ public void testUngroupingAgnostic() {
ColumnVectors.ofObject(t1, "Y", String.class).toArray());
}

private static class CustomOperationInitializationThreadPool extends EgressInitializationThreadPool {
private static final class CustomOperationInitializationThreadPool extends EgressInitializationThreadPool {
private final AtomicBoolean isUsed = new AtomicBoolean(false);

CustomOperationInitializationThreadPool() {
public CustomOperationInitializationThreadPool() {
super(ThreadInitializationFactory.NO_OP);
}

boolean isUsed() {
public boolean isUsed() {
return isUsed.get();
}

@Override
public Future<?> submit(Runnable runnable) {
public Future<?> submit(final Runnable runnable) {
isUsed.set(true);
return super.submit(runnable);
}

void reset() {
public void reset() {
isUsed.set(false);
}
}

public void testEmptyTableSnapshot() {
final Table emptyTableNoColumns = emptyTable(0);
final Table emptyTableWithColumns = emptyTable(0).update("X = i");
final Table emptyTableWithSingleColumn = emptyTable(0).update("X = i");
final Table emptyTableWithMultipleColumns = emptyTable(0).update("X = i", "Y = 2*i", "Z = 3*i");
final CustomOperationInitializationThreadPool customOperationInitilaizer =
new CustomOperationInitializationThreadPool();
try (SafeCloseable ignored =
Expand All @@ -2744,12 +2745,26 @@ public void testEmptyTableSnapshot() {
}

try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) emptyTableWithColumns)) {
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) emptyTableWithSingleColumn)) {
assertTrue(snap.rowsIncluded.isEmpty());
assertTrue(snap.addColumnData.length == 1);
assertTrue(snap.modColumnData.length == 1);
assertTrue(snap.addColumnData[0].data.isEmpty());
assertTrue(snap.modColumnData[0].data.isEmpty());
}

try (final BarrageMessage snap =
ConstructSnapshot.constructBackplaneSnapshot(this, (BaseTable<?>) emptyTableWithMultipleColumns)) {
assertTrue(snap.rowsIncluded.isEmpty());
assertTrue(snap.addColumnData.length == 3);
assertTrue(snap.modColumnData.length == 3);
assertTrue(snap.addColumnData[0].data.isEmpty());
assertTrue(snap.addColumnData[1].data.isEmpty());
assertTrue(snap.addColumnData[2].data.isEmpty());
assertTrue(snap.modColumnData[0].data.isEmpty());
assertTrue(snap.modColumnData[1].data.isEmpty());
assertTrue(snap.modColumnData[2].data.isEmpty());
}
}

// Verify that the custom operation initializer thread pool was not used because empty tables should not be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public FlightServiceGrpcImpl(
final SessionService.ErrorTransformer errorTransformer,
final TicketRouter ticketRouter,
final ArrowFlightUtil.DoExchangeMarshaller.Factory doExchangeFactory,
Map<String, AuthenticationRequestHandler> authRequestHandlers,
final Map<String, AuthenticationRequestHandler> authRequestHandlers,
@Named(OperationInitializer.EGRESS_NAME) final OperationInitializer operationInitializer) {
this.executorService = executorService;
this.streamGeneratorFactory = streamGeneratorFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ public void close() {
}
}

private final OperationInitializer operationInitializer;
private final UpdatePropagationJob updatePropagationJob;

/**
Expand Down Expand Up @@ -336,8 +335,7 @@ public BarrageMessageProducer(

this.propagationRowSet = RowSetFactory.empty();
this.updateIntervalMs = updateIntervalMs;
this.operationInitializer = operationInitializer;
this.updatePropagationJob = new UpdatePropagationJob();
this.updatePropagationJob = new UpdatePropagationJob(operationInitializer);
this.onGetSnapshot = onGetSnapshot;

this.parentTableSize = parent.size();
Expand Down Expand Up @@ -1009,7 +1007,7 @@ private class UpdatePropagationJob implements Runnable {
private final AtomicBoolean needsRun = new AtomicBoolean();
private final ExecutionContext executionContext;

UpdatePropagationJob() {
UpdatePropagationJob(final OperationInitializer operationInitializer) {
this.executionContext = ExecutionContext.newBuilder()
.setOperationInitializer(operationInitializer)
.markSystemic()
Expand Down

0 comments on commit 036dd6d

Please sign in to comment.