[FLINK-36541][pipeline-connector][paimon] pass checkpointId to StoreSinkWrite#prepareCommit correctly.

lvyanquan committed Oct 18, 2024
1 parent a1781f4 commit f15109e
Showing 4 changed files with 161 additions and 6 deletions.
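In short: PaimonWriter previously hard-coded checkpointId = 1L when calling StoreSinkWrite#prepareCommit from flush(). With this change the writer tracks the last snapshotted checkpointId as operator state (the new PaimonWriterState below) and restores it after failover. A minimal sketch of that flow, reusing only the PaimonWriterState class added in this commit; the hypothetical CheckpointIdFlowSketch class simulates the runtime hand-off rather than calling real Flink API:

import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterState;

public class CheckpointIdFlowSketch {
    public static void main(String[] args) {
        // snapshotState(41) stores the id of the checkpoint being taken ...
        PaimonWriterState state = new PaimonWriterState(41L);

        // ... and after recovery, PaimonSink#restoreWriter hands it to the new
        // PaimonWriter, whose next flush() calls prepareCommit(false, 41 + 1)
        // instead of the previously hard-coded prepareCommit(false, 1L).
        long nextPrepareCommitId = state.getCheckpointId() + 1;
        System.out.println(nextPrepareCommitId); // prints 42
    }
}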
PaimonSink.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
@@ -32,10 +33,15 @@
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.CommitMessageSerializer;

import java.io.IOException;
import java.util.Collection;

/**
* A {@link Sink} for Paimon. Maintain this package until Paimon has its own SinkV2 implementation.
*/
public class PaimonSink<InputT> implements WithPreCommitTopology<InputT, MultiTableCommittable> {
public class PaimonSink<InputT>
implements WithPreCommitTopology<InputT, MultiTableCommittable>,
StatefulSink<InputT, PaimonWriterState> {

// Provides a default commit user.
public static final String DEFAULT_COMMIT_USER = "admin";
Expand All @@ -60,10 +66,27 @@ public PaimonSink(
}

@Override
public PaimonWriter<InputT> createWriter(InitContext context) {
public StatefulSinkWriter<InputT, PaimonWriterState> restoreWriter(
InitContext context, Collection<PaimonWriterState> recoveredState) throws IOException {
// Each writer snapshots exactly one state object; after rescaling a subtask
// may receive several, in which case the first one is used.
PaimonWriterState paimonWriterState = recoveredState.iterator().next();
return new PaimonWriter<>(
catalogOptions,
context.metricGroup(),
commitUser,
serializer,
paimonWriterState.getCheckpointId());
}

@Override
public PaimonWriter<InputT> createWriter(InitContext context) throws IOException {
return new PaimonWriter<>(catalogOptions, context.metricGroup(), commitUser, serializer);
}

@Override
public SimpleVersionedSerializer<PaimonWriterState> getWriterStateSerializer() {
return new PaimonWriterStateSerializer();
}

@Override
public Committer<MultiTableCommittable> createCommitter() {
return new PaimonCommitter(catalogOptions, commitUser);
PaimonWriter.java
@@ -18,10 +18,12 @@
package org.apache.flink.cdc.connectors.paimon.sink.v2;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;

@@ -39,6 +41,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,7 +51,8 @@

/** A {@link Sink} to write {@link DataChangeEvent} to Paimon storage. */
public class PaimonWriter<InputT>
implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, MultiTableCommittable> {
implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, MultiTableCommittable>,
StatefulSink.StatefulSinkWriter<InputT, PaimonWriterState> {

// Use `static` because Catalog is not serializable.
private static Catalog catalog;
@@ -68,6 +72,9 @@ public class PaimonWriter<InputT>
private final MetricGroup metricGroup;
private final List<MultiTableCommittable> committables;

/** A workaround variable for passing the latest checkpointId to {@link StoreSinkWrite#prepareCommit(boolean, long)}. */
private long checkpointId;

public PaimonWriter(
Options catalogOptions,
MetricGroup metricGroup,
@@ -85,6 +92,28 @@ public PaimonWriter(
new ExecutorThreadFactory(
Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
this.serializer = serializer;
this.checkpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;
}

public PaimonWriter(
Options catalogOptions,
MetricGroup metricGroup,
String commitUser,
PaimonRecordSerializer<InputT> serializer,
long checkpointId) {
catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
this.metricGroup = metricGroup;
this.commitUser = commitUser;
this.tables = new HashMap<>();
this.writes = new HashMap<>();
this.committables = new ArrayList<>();
this.ioManager = new IOManagerAsync();
this.compactExecutor =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory(
Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
this.serializer = serializer;
this.checkpointId = checkpointId;
}

@Override
@@ -166,10 +195,11 @@ public void flush(boolean endOfInput) throws IOException {
Identifier key = entry.getKey();
StoreSinkWrite write = entry.getValue();
boolean waitCompaction = false;
// checkpointId will be updated correctly by PreCommitOperator.
long checkpointId = 1L;
committables.addAll(
write.prepareCommit(waitCompaction, checkpointId).stream()
// Execution order: flush(boolean endOfInput) => prepareCommit() =>
// snapshotState(long checkpointId), so pass checkpointId + 1 here to avoid
// calling prepareCommit twice with the same checkpointId.
write.prepareCommit(waitCompaction, checkpointId + 1).stream()
.map(
committable ->
MultiTableCommittable.fromCommittable(key, committable))
@@ -186,4 +216,10 @@ public void close() throws Exception {
compactExecutor.shutdownNow();
}
}

@Override
public List<PaimonWriterState> snapshotState(long checkpointId) throws IOException {
this.checkpointId = checkpointId;
return Collections.singletonList(new PaimonWriterState(checkpointId));
}
}
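For context on the checkpointId + 1 above: Flink invokes flush(endOfInput), then prepareCommit(), then snapshotState(checkpointId), so at flush time the stored id still belongs to the previous checkpoint. A standalone sketch of the resulting id sequence, assuming INITIAL_CHECKPOINT_ID is 1 as in Flink's CheckpointIDCounter; the PrepareCommitIdSketch class is illustrative only and involves no real sink:

import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;

public class PrepareCommitIdSketch {
    public static void main(String[] args) {
        // Fresh start: one below the first checkpoint id, i.e. 1 - 1 = 0.
        long checkpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1;

        for (long nextCheckpoint = 1; nextCheckpoint <= 3; nextCheckpoint++) {
            // flush() runs before snapshotState(), so it must label the commit
            // with the checkpoint that is about to complete: checkpointId + 1.
            long idPassedToPrepareCommit = checkpointId + 1;
            System.out.println("prepareCommit gets " + idPassedToPrepareCommit); // 1, 2, 3
            // snapshotState(nextCheckpoint) then records the id for recovery.
            checkpointId = nextCheckpoint;
        }
    }
}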
PaimonWriterState.java
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.paimon.sink.v2;

import java.io.Serializable;

/** State for {@link PaimonWriter}. */
public class PaimonWriterState implements Serializable {

private final long checkpointId;

public PaimonWriterState(long checkpointId) {
this.checkpointId = checkpointId;
}

public long getCheckpointId() {
return checkpointId;
}
}
PaimonWriterStateSerializer.java
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.paimon.sink.v2;

import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.api.TableException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

/** A serializer for the {@link PaimonWriterState}. */
public class PaimonWriterStateSerializer implements SimpleVersionedSerializer<PaimonWriterState> {

private static final int CURRENT_VERSION = 0;

@Override
public int getVersion() {
return CURRENT_VERSION;
}

@Override
public byte[] serialize(PaimonWriterState paimonWriterState) throws IOException {
try (ByteArrayOutputStream bao = new ByteArrayOutputStream(256)) {
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(bao);
view.writeLong(paimonWriterState.getCheckpointId());
return bao.toByteArray();
}
}

@Override
public PaimonWriterState deserialize(int version, byte[] serialized) throws IOException {
if (version == 0) {
try (ByteArrayInputStream bis = new ByteArrayInputStream(serialized)) {
DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(bis);
long checkpointId = view.readLong();
return new PaimonWriterState(checkpointId);
}
}
throw new TableException(
        String.format(
                "Can't deserialize data with version %d because the max supported version is %d.",
                version, CURRENT_VERSION));
}
}
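A quick round-trip check for the serializer above, using Flink's SimpleVersionedSerialization helper (which writes and then reads back the version number for us); the class name PaimonWriterStateRoundTrip and the 42L value are arbitrary:

import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterState;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterStateSerializer;
import org.apache.flink.core.io.SimpleVersionedSerialization;

public class PaimonWriterStateRoundTrip {
    public static void main(String[] args) throws Exception {
        PaimonWriterStateSerializer serializer = new PaimonWriterStateSerializer();
        PaimonWriterState state = new PaimonWriterState(42L);

        // Serialize with the version prepended, then read it back.
        byte[] bytes = SimpleVersionedSerialization.writeVersionAndSerialize(serializer, state);
        PaimonWriterState restored =
                SimpleVersionedSerialization.readVersionAndDeSerialize(serializer, bytes);

        System.out.println(restored.getCheckpointId()); // prints 42
    }
}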
