From f15109e1541dee3e9b2b2a7ea5c6ea3f2fdff381 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Fri, 18 Oct 2024 14:32:57 +0800 Subject: [PATCH] [FLINK-36541][pipeline-connector][paimon] pass checkpointId to StoreSinkWrite#prepareCommit correctly. --- .../connectors/paimon/sink/v2/PaimonSink.java | 27 +++++++- .../paimon/sink/v2/PaimonWriter.java | 44 +++++++++++-- .../paimon/sink/v2/PaimonWriterState.java | 34 ++++++++++ .../sink/v2/PaimonWriterStateSerializer.java | 62 +++++++++++++++++++ 4 files changed, 161 insertions(+), 6 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java index 61824ec44a..bf4841cf6d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSink.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Committer; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; @@ -32,10 +33,15 @@ import org.apache.paimon.options.Options; import org.apache.paimon.table.sink.CommitMessageSerializer; +import java.io.IOException; +import java.util.Collection; + /** * A {@link Sink} for Paimon. Maintain this package until Paimon has it own sinkV2 implementation. */ -public class PaimonSink implements WithPreCommitTopology { +public class PaimonSink + implements WithPreCommitTopology, + StatefulSink { // provided a default commit user. public static final String DEFAULT_COMMIT_USER = "admin"; @@ -60,10 +66,27 @@ public PaimonSink( } @Override - public PaimonWriter createWriter(InitContext context) { + public StatefulSinkWriter restoreWriter( + InitContext context, Collection recoveredState) throws IOException { + PaimonWriterState paimonWriterState = recoveredState.iterator().next(); + return new PaimonWriter<>( + catalogOptions, + context.metricGroup(), + commitUser, + serializer, + paimonWriterState.getCheckpointId()); + } + + @Override + public PaimonWriter createWriter(InitContext context) throws IOException { return new PaimonWriter<>(catalogOptions, context.metricGroup(), commitUser, serializer); } + @Override + public SimpleVersionedSerializer getWriterStateSerializer() { + return new PaimonWriterStateSerializer(); + } + @Override public Committer createCommitter() { return new PaimonCommitter(catalogOptions, commitUser); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index 87229f36c0..64ba551458 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -18,10 +18,12 @@ package org.apache.flink.cdc.connectors.paimon.sink.v2; import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; @@ -39,6 +41,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,7 +51,8 @@ /** A {@link Sink} to write {@link DataChangeEvent} to Paimon storage. */ public class PaimonWriter - implements TwoPhaseCommittingSink.PrecommittingSinkWriter { + implements TwoPhaseCommittingSink.PrecommittingSinkWriter, + StatefulSink.StatefulSinkWriter { // use `static` because Catalog is unSerializable. private static Catalog catalog; @@ -68,6 +72,9 @@ public class PaimonWriter private final MetricGroup metricGroup; private final List committables; + /** A workaround variable to pass to {@link StoreSinkWrite#prepareCommit(boolean, long)} */ + private long checkpointId; + public PaimonWriter( Options catalogOptions, MetricGroup metricGroup, @@ -85,6 +92,28 @@ public PaimonWriter( new ExecutorThreadFactory( Thread.currentThread().getName() + "-CdcMultiWrite-Compaction")); this.serializer = serializer; + this.checkpointId = CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1; + } + + public PaimonWriter( + Options catalogOptions, + MetricGroup metricGroup, + String commitUser, + PaimonRecordSerializer serializer, + long checkpointId) { + catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + this.metricGroup = metricGroup; + this.commitUser = commitUser; + this.tables = new HashMap<>(); + this.writes = new HashMap<>(); + this.committables = new ArrayList<>(); + this.ioManager = new IOManagerAsync(); + this.compactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + "-CdcMultiWrite-Compaction")); + this.serializer = serializer; + this.checkpointId = checkpointId; } @Override @@ -166,10 +195,11 @@ public void flush(boolean endOfInput) throws IOException { Identifier key = entry.getKey(); StoreSinkWrite write = entry.getValue(); boolean waitCompaction = false; - // checkpointId will be updated correctly by PreCommitOperator. - long checkpointId = 1L; committables.addAll( - write.prepareCommit(waitCompaction, checkpointId).stream() + // Execution order: flush(boolean endOfInput) => prepareCommit() => + // snapshotState(long checkpointId). So here we set it to snapshotId+1 to avoid + // prepareCommit the same checkpointId + write.prepareCommit(waitCompaction, checkpointId + 1).stream() .map( committable -> MultiTableCommittable.fromCommittable(key, committable)) @@ -186,4 +216,10 @@ public void close() throws Exception { compactExecutor.shutdownNow(); } } + + @Override + public List snapshotState(long checkpointId) throws IOException { + this.checkpointId = checkpointId; + return Collections.singletonList(new PaimonWriterState(checkpointId)); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java new file mode 100644 index 0000000000..f2306e09c0 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterState.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import java.io.Serializable; + +/** State for {@link PaimonWriter}. */ +public class PaimonWriterState implements Serializable { + + private final long checkpointId; + + public PaimonWriterState(long checkpointId) { + this.checkpointId = checkpointId; + } + + public long getCheckpointId() { + return checkpointId; + } +} diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java new file mode 100644 index 0000000000..0689129250 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriterStateSerializer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.paimon.sink.v2; + +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.table.api.TableException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** A serializer for the {@link PaimonWriterState}. */ +public class PaimonWriterStateSerializer implements SimpleVersionedSerializer { + + private static final int currentVersion = 0; + + @Override + public int getVersion() { + return currentVersion; + } + + @Override + public byte[] serialize(PaimonWriterState paimonWriterState) throws IOException { + try (ByteArrayOutputStream bao = new ByteArrayOutputStream(256)) { + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(bao); + view.writeLong(paimonWriterState.getCheckpointId()); + return bao.toByteArray(); + } + } + + @Override + public PaimonWriterState deserialize(int version, byte[] serialized) throws IOException { + if (version == 0) { + try (ByteArrayInputStream bis = new ByteArrayInputStream(serialized)) { + DataInputViewStreamWrapper view = new DataInputViewStreamWrapper(bis); + long check = view.readLong(); + return new PaimonWriterState(check); + } + } + throw new TableException( + String.format( + "Can't serialized data with version %d because the max support version is %d.", + version, currentVersion)); + } +}