Skip to content

Commit

Permalink
[proxima-beam] update dependencies and loop timer
Browse files Browse the repository at this point in the history
  • Loading branch information
je-ik committed Jun 27, 2022
1 parent 9926367 commit b8a8694
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,9 @@ public TypeDescriptor<Pair<K, V>> getOutputTypeDescriptor() {
@VisibleForTesting
static class ReduceValueStateByKey<K, V, S, O> extends DoFn<KV<K, V>, Pair<K, O>> {

private static final Instant MAX_ACCEPTABLE_STAMP =
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(300));

static <K, V, S, O> ReduceValueStateByKey<K, V, S, O> of(
Closure<S> initialState,
Closure<S> stateUpdate,
Expand Down Expand Up @@ -1458,16 +1461,19 @@ public void processElement(

@OnTimer("earlyTimer")
public void onTimer(
OnTimerContext context,
@Timestamp Instant ts,
@StateId("value") ValueState<Pair<K, S>> valueState,
@TimerId("earlyTimer") Timer earlyTimer) {
@TimerId("earlyTimer") Timer earlyTimer,
OutputReceiver<Pair<K, O>> collector) {

Pair<K, S> current = Objects.requireNonNull(valueState.read());
O outputElem = output.call(current.getSecond(), null);
if (outputElem != null) {
context.output(Pair.of(current.getFirst(), outputElem));
collector.output(Pair.of(current.getFirst(), outputElem));
}
if (ts.isBefore(MAX_ACCEPTABLE_STAMP)) {
earlyTimer.offset(earlyEmitting).setRelative();
}
earlyTimer.offset(earlyEmitting).setRelative();
}

@SuppressWarnings("unchecked")
Expand Down
18 changes: 18 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -657,12 +657,30 @@
<version>${grpc.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
<version>${grpc.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
<version>${grpc.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-census</artifactId>
<version>${grpc.version}</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
Expand Down

0 comments on commit b8a8694

Please sign in to comment.