yuxiqian commented on code in PR #3499:
URL: https://github.com/apache/flink-cdc/pull/3499#discussion_r1713116670


##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/DataSink.java:
##########
@@ -38,7 +38,8 @@ public interface DataSink {
      * Get the {@code HashFunctionProvider<DataChangeEvent>} for calculating hash value if you need
      * to partition by data change event before Sink.
      */
-    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
+    default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(

Review Comment:
   Since this is a `@PublicEvolving` interface, should we provide an overloaded fallback to keep backwards compatibility?
   
   ```java
   default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider() {
       return new DefaultDataChangeEventHashFunctionProvider();
   }

   default HashFunctionProvider<DataChangeEvent> getDataChangeEventHashFunctionProvider(int parallelism) {
       return getDataChangeEventHashFunctionProvider(); // fallback to nullary version if it isn't overridden
   }
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunction.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.data.RecordData;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.function.HashFunction;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.flink.sink.RowAssignerChannelComputer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.io.Serializable;
+import java.time.ZoneId;
+import java.util.List;
+
+/**
+ * A {@link HashFunction} implementation for {@link PaimonDataSink}. Shuffle {@link DataChangeEvent}
+ * by hash of PrimaryKey.
+ */
+public class PaimonHashFunction implements HashFunction<DataChangeEvent>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<RecordData.FieldGetter> fieldGetters;
+
+    private final RowAssignerChannelComputer channelComputer;
+
+    public PaimonHashFunction(
+            Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
+        Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(options);
+        FileStoreTable table;
+        try {
+            table = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId.toString()));

Review Comment:
   `TableId` is nullable and might cause an NPE. Maybe assert it's not null too, and throw a more descriptive exception like "invalid tableId"?
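   For illustration, a minimal sketch of that guard, assuming Flink's `org.apache.flink.util.Preconditions` is acceptable here (the message wording is just an example):

   ```java
   public PaimonHashFunction(
           Options options, TableId tableId, Schema schema, ZoneId zoneId, int parallelism) {
       // Fail fast with a descriptive message instead of an NPE deep inside catalog.getTable(...).
       Preconditions.checkNotNull(tableId, "Invalid tableId: tableId must not be null");
       // ... existing catalog/table lookup stays unchanged ...
   }
   ```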



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java:
##########
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.OperatorIDGenerator;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.TableSchemaInfo;
+import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.index.BucketAssigner;
+import org.apache.paimon.index.HashBucketAssigner;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.RowKeyExtractor;
+import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.utils.MathUtils;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/** Assign bucket for every given {@link DataChangeEvent}. */
+public class BucketAssignOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event> {
+
+    public static final String COMMIT_USER = "admin";

Review Comment:
   Should we use the commit user configured by `PaimonDataSinkOptions.COMMIT_USER` instead of hard-coding one?
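   A rough sketch of that wiring (here `config` stands for the sink factory's `Configuration`, and the extra constructor argument is hypothetical):

   ```java
   // Read the configured commit user instead of relying on a hard-coded constant.
   String commitUser = config.get(PaimonDataSinkOptions.COMMIT_USER);
   // Hypothetical: thread it through to the operator that currently uses COMMIT_USER.
   BucketAssignOperator operator =
           new BucketAssignOperator(catalogOptions, schemaOperatorUid, zoneId, commitUser);
   ```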



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventKeySelector.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cdc.common.event.Event;
+
+/** {@link KeySelector} for {@link BucketWrapperChangeEvent}. */
+public class BucketWrapperEventKeySelector implements KeySelector<Event, Integer> {
+    @Override
+    public Integer getKey(Event event) {
+        return ((BucketWrapper) event).getBucket();
+    }
+}

Review Comment:
   Will this be reused somehow? Maybe we can just use a lambda expression instead of creating independent classes.
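   If it is only needed at a single call site, an inline lambda would do, e.g.:

   ```java
   // KeySelector is a functional interface, so this is equivalent to the dedicated class.
   KeySelector<Event, Integer> keySelector = event -> ((BucketWrapper) event).getBucket();
   ```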



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapper.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+/** Wrapper class with bucket. */
+public interface BucketWrapper {
+
+    int getBucket();

Review Comment:
   Need JavaDocs for interface methods.
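   For example, something along these lines:

   ```java
   /** Returns the bucket id that this event has been assigned to. */
   int getBucket();
   ```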



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java:
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink;
+
+import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.paimon.options.Options;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/** Tests for {@link PaimonHashFunction}. */
+public class PaimonHashFunctionTest {
+
+    @TempDir public static Path temporaryFolder;
+
+    private Catalog catalog;
+
+    private Options catalogOptions;
+
+    private static final String TEST_DATABASE = "test";
+
+    @BeforeEach
+    public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
+        catalogOptions = new Options();
+        String warehouse =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+        catalogOptions.setString("warehouse", warehouse);
+        catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+        catalog.createDatabase(TEST_DATABASE, true);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        catalog.dropDatabase(TEST_DATABASE, true, true);
+        catalog.close();
+    }
+
+    @Test
+    public void testHashCodeForFixedBucketTable() {
+        TableId tableId = TableId.tableId(TEST_DATABASE, "test_table");
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put("bucket", "10");
+        MetadataApplier metadataApplier =
+                new PaimonMetadataApplier(catalogOptions, tableOptions, new HashMap<>());
+        Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("col1", DataTypes.STRING().notNull())
+                        .physicalColumn("col2", DataTypes.STRING())
+                        .physicalColumn("pt", DataTypes.STRING())
+                        .primaryKey("col1", "pt")
+                        .partitionKey("pt")
+                        .build();
+        CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema);
+        metadataApplier.applySchemaChange(createTableEvent);
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]));
+        PaimonHashFunction hashFunction =
+                new PaimonHashFunction(catalogOptions, tableId, schema, ZoneId.systemDefault(), 4);
+        DataChangeEvent dataChangeEvent1 =
+                DataChangeEvent.insertEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("2024")
+                                }));
+        int key1 = hashFunction.hashcode(dataChangeEvent1);
+
+        DataChangeEvent dataChangeEvent2 =
+                DataChangeEvent.updateEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("2024")
+                                }),
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("2024")
+                                }));
+        int key2 = hashFunction.hashcode(dataChangeEvent2);
+
+        DataChangeEvent dataChangeEvent3 =
+                DataChangeEvent.deleteEvent(
+                        tableId,
+                        generator.generate(
+                                new Object[] {
+                                    BinaryStringData.fromString("1"),
+                                    BinaryStringData.fromString("2"),
+                                    BinaryStringData.fromString("2024")
+                                }));
+        int key3 = hashFunction.hashcode(dataChangeEvent3);
+
+        Assertions.assertEquals(key1, key2, key3);

Review Comment:
   Minor: I remember that Muhammet is working on migrating from JUnit assertions to AssertJ-style ones. Maybe we should follow this guideline in newly added test cases?
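   For reference, an AssertJ-style equivalent (using `org.assertj.core.api.Assertions`) would look roughly like:

   ```java
   Assertions.assertThat(key1).isEqualTo(key2);
   Assertions.assertThat(key2).isEqualTo(key3);
   ```

   As a side note, the current three-`int` call likely resolves to the `assertEquals(float, float, float delta)` overload, so `key3` would be treated as a tolerance rather than asserted; splitting it into two assertions is clearer either way.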



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventPartitioner.java:
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.common.functions.Partitioner;
+
+/** {@link Partitioner} for {@link BucketWrapperChangeEvent}. */
+public class BucketWrapperEventPartitioner implements Partitioner<Integer> {
+    @Override
+    public int partition(Integer bucket, int numPartitions) {
+        return bucket % numPartitions;

Review Comment:
   Ditto



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventTypeInfo.java:
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEvent;
+
+/** A {@link TypeInformation} for {@link PaimonEvent}. */
+public class BucketWrapperEventTypeInfo extends TypeInformation<Event> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 0;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<Event> getTypeClass() {
+        return Event.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<Event> createSerializer(ExecutionConfig config) {
+        return BucketWrapperEventSerializer.INSTANCE;
+    }
+
+    @Override
+    public String toString() {
+        return "Event";

Review Comment:
   Could this be more specific?
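   For instance:

   ```java
   @Override
   public String toString() {
       // Name the wrapper type rather than the generic element type.
       return "BucketWrapperEventTypeInfo";
   }
   ```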



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2;
+
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventKeySelector;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventPartitioner;
+import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.apache.paimon.flink.sink.MultiTableCommittable;
+import org.apache.paimon.flink.sink.MultiTableCommittableSerializer;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.sink.CommitMessageSerializer;
+
+import java.time.ZoneId;
+
+/** A {@link PaimonSink} to process {@link Event}. */
+public class PaimonEventSink extends PaimonSink<Event> implements WithPreWriteTopology<Event> {
+
+    public final String schemaOperatorUid;
+
+    public final ZoneId zoneId;
+
+    public PaimonEventSink(
+            Options catalogOptions,
+            String commitUser,
+            PaimonRecordSerializer<Event> serializer,
+            String schemaOperatorUid,
+            ZoneId zoneId) {
+        super(catalogOptions, commitUser, serializer);
+        this.schemaOperatorUid = schemaOperatorUid;
+        this.zoneId = zoneId;
+    }
+
+    @Override
+    public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
+        // Shuffle by key hash => Assign bucket => Shuffle by bucket.
+        return dataStream
+                .transform(
+                        "BucketAssign",
+                        new BucketWrapperEventTypeInfo(),
+                        new BucketAssignOperator(catalogOptions, schemaOperatorUid, zoneId))
+                .name("Assign Bucket")
+                .partitionCustom(
+                        new BucketWrapperEventPartitioner(), new BucketWrapperEventKeySelector());

Review Comment:
   These interfaces are marked as `@FunctionalInterface`, so they should be safe to replace with lambda expressions.
   
   ```suggestion
                   .partitionCustom(
                           (bucket, numPartitions) -> bucket % numPartitions,
                           event -> ((BucketWrapper) event).getBucket());
   ```



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.ChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.util.Objects;
+
+/** A wrapper class for {@link ChangeEvent} to attach bucket id. */
+public class BucketWrapperChangeEvent implements ChangeEvent, BucketWrapper {

Review Comment:
   Should this be `Serializable`?
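   If `ChangeEvent` does not already extend `Serializable`, that would be something like:

   ```java
   public class BucketWrapperChangeEvent implements ChangeEvent, BucketWrapper, Serializable {
   ```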



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/PaimonHashFunctionTest.java:
##########


Review Comment:
   Wondering if we could add E2E test cases for Paimon, since this test case seems a little insufficient.



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperFlushEvent.java:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;
+
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.TableId;
+
+import java.util.Objects;
+
+/** A wrapper class for {@link FlushEvent} to attach bucket id. */
+public class BucketWrapperFlushEvent extends FlushEvent implements BucketWrapper {
+
+    private final int bucket;
+
+    public BucketWrapperFlushEvent(int bucket, TableId tableId) {
+        super(tableId);
+        this.bucket = bucket;
+    }
+
+    @Override
+    public int getBucket() {
+        return bucket;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        if (!super.equals(o)) {
+            return false;
+        }
+        BucketWrapperFlushEvent that = (BucketWrapperFlushEvent) o;
+        return bucket == that.bucket;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(super.hashCode(), bucket);
+    }
+
+    @Override
+    public String toString() {
+        return "BucketWrapperFlushEvent{" + "bucket=" + bucket + '}';

Review Comment:
   It would be helpful to print `tableId` in the String representation, too.
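   For example, assuming `FlushEvent` exposes the table id via `getTableId()`:

   ```java
   @Override
   public String toString() {
       return "BucketWrapperFlushEvent{tableId=" + getTableId() + ", bucket=" + bucket + '}';
   }
   ```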



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
