Skip to content

Commit

Permalink
javafmt
Browse files Browse the repository at this point in the history
  • Loading branch information
RustedBones committed Aug 12, 2024
1 parent 2825c6d commit 063cf26
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,14 @@ public void processElement(
PaneInfo pane,
OutputReceiver<KV<Input, TryWrapper>> out) {
inputCount++;
flush(r -> {
final KV<Input, TryWrapper> io = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(io, ts, ws, p);
});
flush(
r -> {
final KV<Input, TryWrapper> io = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(io, ts, ws, p);
});
final Cache<String, Output> cache = getResourceCache();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
Expand All @@ -45,7 +44,8 @@ public abstract class BaseAsyncDoFn<Input, Output, Resource, Future>
public abstract Future processElement(Input input);

private final ConcurrentMap<UUID, Future> futures = new ConcurrentHashMap<>();
private final ConcurrentLinkedQueue<Pair<UUID, ValueInSingleWindow<Output>>> results = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Pair<UUID, ValueInSingleWindow<Output>>> results =
new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();

@StartBundle
Expand Down Expand Up @@ -76,10 +76,7 @@ public void finishBundle(FinishBundleContext context) {
// TODO: remove in 0.15.0
@Deprecated
public void processElement(
Input input,
Instant timestamp,
OutputReceiver<Output> out,
BoundedWindow window) {
Input input, Instant timestamp, OutputReceiver<Output> out, BoundedWindow window) {
processElement(input, timestamp, window, null, out);
}

Expand All @@ -90,13 +87,14 @@ public void processElement(
BoundedWindow window,
PaneInfo pane,
OutputReceiver<Output> out) {
flush(r -> {
final Output o = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(o, ts, ws, p);
});
flush(
r -> {
final Output o = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(o, ts, ws, p);
});

try {
final UUID uuid = UUID.randomUUID();
Expand All @@ -108,7 +106,8 @@ public void processElement(
}
}

private Future handleOutput(Future future, UUID key, Instant timestamp, BoundedWindow window, PaneInfo pane) {
private Future handleOutput(
Future future, UUID key, Instant timestamp, BoundedWindow window, PaneInfo pane) {
return addCallback(
future,
output -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public abstract class BaseAsyncLookupDoFn<Input, Output, Client, Future, TryWrap
private final Semaphore semaphore;
private final ConcurrentMap<UUID, Future> futures = new ConcurrentHashMap<>();
private final ConcurrentMap<Input, Future> inFlightRequests = new ConcurrentHashMap<>();
private final ConcurrentLinkedQueue<Pair<UUID, ValueInSingleWindow<KV<Input, TryWrapper>>>> results = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Pair<UUID, ValueInSingleWindow<KV<Input, TryWrapper>>>>
results = new ConcurrentLinkedQueue<>();
private long inputCount;
private long outputCount;

Expand Down Expand Up @@ -179,13 +180,14 @@ public void processElement(
PaneInfo pane,
OutputReceiver<KV<Input, TryWrapper>> out) {
inputCount++;
flush(r -> {
final KV<Input, TryWrapper> io = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(io, ts, ws, p);
});
flush(
r -> {
final KV<Input, TryWrapper> io = r.getValue();
final Instant ts = r.getTimestamp();
final Collection<BoundedWindow> ws = Collections.singleton(r.getWindow());
final PaneInfo p = r.getPane();
out.outputWindowedValue(io, ts, ws, p);
});
final Client client = getResourceClient();
final Cache<Input, Output> cache = getResourceCache();

Expand Down Expand Up @@ -246,16 +248,24 @@ public void finishBundle(FinishBundleContext context) {
outputCount);
}

private Future handleOutput(Future future, Input input, UUID key, Instant timestamp, BoundedWindow window, PaneInfo pane) {
private Future handleOutput(
Future future,
Input input,
UUID key,
Instant timestamp,
BoundedWindow window,
PaneInfo pane) {
return addCallback(
future,
output -> {
final ValueInSingleWindow<KV<Input, TryWrapper>> result = ValueInSingleWindow.of(KV.of(input, success(output)), timestamp, window, pane);
final ValueInSingleWindow<KV<Input, TryWrapper>> result =
ValueInSingleWindow.of(KV.of(input, success(output)), timestamp, window, pane);
results.add(Pair.of(key, result));
return null;
},
throwable -> {
final ValueInSingleWindow<KV<Input, TryWrapper>> result = ValueInSingleWindow.of(KV.of(input, failure(throwable)), timestamp, window, pane);
final ValueInSingleWindow<KV<Input, TryWrapper>> result =
ValueInSingleWindow.of(KV.of(input, failure(throwable)), timestamp, window, pane);
results.add(Pair.of(key, result));
return null;
});
Expand Down

0 comments on commit 063cf26

Please sign in to comment.