This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0acdfe9519 [INLONG-10195][Sort] Hudi connector support audit ID (#10231)
0acdfe9519 is described below

commit 0acdfe9519a18048171b3ed8e0dcd71f0527b0b8
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed May 22 09:47:12 2024 +0800

    [INLONG-10195][Sort] Hudi connector support audit ID (#10231)
---
 .../sort-flink-v1.15/sort-connectors/hudi/pom.xml  |   1 -
 .../inlong/sort/hudi/sink/StreamWriteFunction.java | 495 +++++++++++++++++++++
 .../inlong/sort/hudi/sink/StreamWriteOperator.java |  43 ++
 .../sort/hudi/sink/append/AppendWriteFunction.java | 156 +++++++
 .../sort/hudi/sink/append/AppendWriteOperator.java |  44 ++
 .../sink/bucket/BucketStreamWriteFunction.java     | 185 ++++++++
 .../sink/bucket/BucketStreamWriteOperator.java     |  40 ++
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    | 229 ++++++++++
 .../hudi/sink/bulk/BulkInsertWriteOperator.java    |  55 +++
 .../inlong/sort/hudi/sink/utils/Pipelines.java     | 469 +++++++++++++++++++
 .../inlong/sort/hudi/table/HoodieTableFactory.java | 355 +++++++++++++++
 .../inlong/sort/hudi/table/HoodieTableSink.java    | 150 +++++++
 .../hudi/table/sink/HudiTableInlongFactory.java    |  55 ---
 .../org.apache.flink.table.factories.Factory       |   2 +-
 licenses/inlong-sort-connectors/LICENSE            |  13 +
 15 files changed, 2235 insertions(+), 57 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
index ca3dc60e6d..075b6da594 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
@@ -79,7 +79,6 @@
                         <configuration>
                             <artifactSet>
                                 <includes>
-                                    <include>org.apache.hudi:*</include>
                                     <include>org.apache.hive:hive-exec</include>
                                     <include>org.apache.hadoop:*</include>
                                     <include>com.fasterxml.woodstox:*</include>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
new file mode 100644
index 0000000000..33551f6de5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ObjectSizeCalculator;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.table.action.commit.FlinkWriteHelper;
+import org.apache.hudi.util.StreamerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Random;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Sink function to write the data to the underlying filesystem.
+ *
+ * <p><h2>Work Flow</h2>
+ *
+ * <p>The function first buffers the data as a batch of {@link HoodieRecord}s.
+ * It flushes (writes) the record batch when the batch size exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE},
+ * the total buffer size exceeds the configured size {@link FlinkOptions#WRITE_TASK_MAX_SIZE},
+ * or a Flink checkpoint starts. After a batch has been written successfully,
+ * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write.
+ *
+ * <p><h2>The Semantics</h2>
+ *
+ * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator
+ * starts a new instant on the timeline when a checkpoint triggers; the coordinator checkpoints always
+ * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists.
+ *
+ * <p>The function's processing thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer, until
+ * the current checkpoint succeeds and the coordinator starts a new instant. Any error during metadata committing triggers a job failure;
+ * when the job recovers from a failure, the write function re-sends the write metadata to the coordinator to check whether it
+ * can be re-committed. Thus, if an unexpected error happens while committing an instant, the coordinator retries the commit when the job
+ * recovers.
+ *
+ * <p><h2>Fault Tolerance</h2>
+ *
+ * <p>The operator coordinator checks and commits the last instant, then starts a new one after a checkpoint finishes successfully.
+ * It rolls back any inflight instant before it starts a new instant, which means one hoodie instant only spans one checkpoint;
+ * the write function blocks data buffer flushing for the configured checkpoint timeout
+ * before it throws an exception, and any checkpoint failure eventually triggers a job failure.
+ *
+ * <p>Note: The function task requires the input stream to be shuffled by the file IDs.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class);
+
+    /**
+     * Write buffer as buckets for a checkpoint. The key is bucket ID.
+     */
+    private transient Map<String, DataBucket> buckets;
+
+    private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction;
+
+    /**
+     * Total size tracer.
+     */
+    private transient TotalSizeTracer tracer;
+    private final MetricOption metricOption;
+    private SinkMetricData sinkMetricData;
+
+    /**
+     * Constructs a StreamWriteFunction.
+     *
+     * @param config The config options
+     */
+    public StreamWriteFunction(Configuration config, MetricOption metricOption) {
+        super(config);
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+        this.tracer = new TotalSizeTracer(this.config);
+        initBuffer();
+        initWriteFunction();
+        if (metricOption != null) {
+            this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void snapshotState() {
+        // Based on the fact that the coordinator starts the checkpoint first,
+        // it would check the validity.
+        // wait for the buffer data flush out and request a new instant
+        flushRemaining(false);
+    }
+
+    @Override
+    public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out)
+            throws Exception {
+        bufferRecord((HoodieRecord<?>) value);
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeWithEstimate(value);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (this.writeClient != null) {
+            this.writeClient.close();
+        }
+    }
+
+    /**
+     * End input action for batch source.
+     */
+    public void endInput() {
+        super.endInput();
+        flushRemaining(true);
+        this.writeClient.cleanHandles();
+        this.writeStatuses.clear();
+    }
+
+    // -------------------------------------------------------------------------
+    // Utilities
+    // -------------------------------------------------------------------------
+
+    private void initBuffer() {
+        this.buckets = new LinkedHashMap<>();
+    }
+
+    private void initWriteFunction() {
+        final String writeOperation = this.config.get(FlinkOptions.OPERATION);
+        switch (WriteOperationType.fromValue(writeOperation)) {
+            case INSERT:
+                this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime);
+                break;
+            case UPSERT:
+                this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime);
+                break;
+            case INSERT_OVERWRITE:
+                this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime);
+                break;
+            case INSERT_OVERWRITE_TABLE:
+                this.writeFunction =
+                        (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime);
+                break;
+            default:
+                throw new RuntimeException("Unsupported write operation : " + writeOperation);
+        }
+    }
+
+    /**
+     * Represents a data item in the buffer; this is needed to reduce the
+     * memory footprint.
+     *
+     * <p>A {@link HoodieRecord} is first transformed into a {@link DataItem}
+     * for buffering, then transformed back into a {@link HoodieRecord} before flushing.
+     */
+    private static class DataItem {
+
+        private final String key; // record key
+        private final String instant; // 'U' or 'I'
+        private final HoodieRecordPayload<?> data; // record payload
+        private final HoodieOperation operation; // operation
+
+        private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) {
+            this.key = key;
+            this.instant = instant;
+            this.data = data;
+            this.operation = operation;
+        }
+
+        public static DataItem fromHoodieRecord(HoodieRecord<?> record) {
+            return new DataItem(
+                    record.getRecordKey(),
+                    record.getCurrentLocation().getInstantTime(),
+                    ((HoodieAvroRecord) record).getData(),
+                    record.getOperation());
+        }
+
+        public HoodieRecord<?> toHoodieRecord(String partitionPath) {
+            HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath);
+            HoodieRecord<?> record = new HoodieAvroRecord<>(hoodieKey, data, operation);
+            HoodieRecordLocation loc = new HoodieRecordLocation(instant, null);
+            record.setCurrentLocation(loc);
+            return record;
+        }
+    }
+
+    /**
+     * Data bucket.
+     */
+    private static class DataBucket {
+
+        private final List<DataItem> records;
+        private final BufferSizeDetector detector;
+        private final String partitionPath;
+        private final String fileID;
+
+        private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) {
+            this.records = new ArrayList<>();
+            this.detector = new BufferSizeDetector(batchSize);
+            this.partitionPath = hoodieRecord.getPartitionPath();
+            this.fileID = hoodieRecord.getCurrentLocation().getFileId();
+        }
+
+        /**
+         * Prepare the write data buffer: patch up all the records with correct partition path.
+         */
+        public List<HoodieRecord> writeBuffer() {
+            // rewrite all the records with new record key
+            return records.stream()
+                    .map(record -> record.toHoodieRecord(partitionPath))
+                    .collect(Collectors.toList());
+        }
+
+        /**
+         * Sets up before flush: patch up the first record with correct partition path and fileID.
+         *
+         * <p>Note: the method may modify the given records {@code records}.
+         */
+        public void preWrite(List<HoodieRecord> records) {
+            // rewrite the first record with expected fileID
+            HoodieRecord<?> first = records.get(0);
+            HoodieRecord<?> record =
+                    new HoodieAvroRecord<>(first.getKey(), (HoodieRecordPayload) first.getData(), first.getOperation());
+            HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID);
+            record.setCurrentLocation(newLoc);
+
+            records.set(0, record);
+        }
+
+        public void reset() {
+            this.records.clear();
+            this.detector.reset();
+        }
+    }
+
+    /**
+     * Tool to detect whether to flush out the existing buffer.
+     * Samples records with a 1% probability to estimate the record size.
+     */
+    private static class BufferSizeDetector {
+
+        private final Random random = new Random(47);
+        private static final int DENOMINATOR = 100;
+
+        private final double batchSizeBytes;
+
+        private long lastRecordSize = -1L;
+        private long totalSize = 0L;
+
+        BufferSizeDetector(double batchSizeMb) {
+            this.batchSizeBytes = batchSizeMb * 1024 * 1024;
+        }
+
+        boolean detect(Object record) {
+            if (lastRecordSize == -1 || sampling()) {
+                lastRecordSize = ObjectSizeCalculator.getObjectSize(record);
+            }
+            totalSize += lastRecordSize;
+            return totalSize > this.batchSizeBytes;
+        }
+
+        boolean sampling() {
+            // 0.01 sampling percentage
+            return random.nextInt(DENOMINATOR) == 1;
+        }
+
+        void reset() {
+            this.lastRecordSize = -1L;
+            this.totalSize = 0L;
+        }
+    }
+
+    /**
+     * Tool to trace the total buffer size. It computes the maximum buffer size;
+     * if the current buffer size is greater than the maximum buffer size, the data bucket
+     * flush triggers.
+     */
+    private static class TotalSizeTracer {
+
+        private long bufferSize = 0L;
+        private final double maxBufferSize;
+
+        TotalSizeTracer(Configuration conf) {
+            long mergeReaderMem = 100; // constant 100MB
+            long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
+            this.maxBufferSize =
+                    (conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024;
+            final String errMsg = String.format(
+                    "'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
+                    FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
+            ValidationUtils.checkState(this.maxBufferSize > 0, errMsg);
+        }
+
+        /**
+         * Trace the given record size {@code recordSize}.
+         *
+         * @param recordSize The record size
+         * @return true if the buffer size exceeds the maximum buffer size
+         */
+        boolean trace(long recordSize) {
+            this.bufferSize += recordSize;
+            return this.bufferSize > this.maxBufferSize;
+        }
+
+        void countDown(long size) {
+            this.bufferSize -= size;
+        }
+
+        public void reset() {
+            this.bufferSize = 0;
+        }
+    }
+
+    /**
+     * Returns the bucket ID with the given value {@code value}.
+     */
+    private String getBucketID(HoodieRecord<?> record) {
+        final String fileId = record.getCurrentLocation().getFileId();
+        return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId);
+    }
+
+    /**
+     * Buffers the given record.
+     *
+     * <p>Flush the data bucket first if the bucket records size is greater than
+     * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}.
+     *
+     * <p>Flush the max size data bucket if the total buffer size exceeds the configured
+     * threshold {@link FlinkOptions#WRITE_TASK_MAX_SIZE}.
+     *
+     * @param value HoodieRecord
+     */
+    protected void bufferRecord(HoodieRecord<?> value) {
+        final String bucketID = getBucketID(value);
+
+        DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
+                k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
+        final DataItem item = DataItem.fromHoodieRecord(value);
+
+        bucket.records.add(item);
+
+        boolean flushBucket = bucket.detector.detect(item);
+        boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
+        if (flushBucket) {
+            if (flushBucket(bucket)) {
+                this.tracer.countDown(bucket.detector.totalSize);
+                bucket.reset();
+            }
+        } else if (flushBuffer) {
+            // find the max size bucket and flush it out
+            DataBucket bucketToFlush = this.buckets.values().stream()
+                    .max(Comparator.comparingLong(b -> b.detector.totalSize))
+                    .orElseThrow(NoSuchElementException::new);
+            if (flushBucket(bucketToFlush)) {
+                this.tracer.countDown(bucketToFlush.detector.totalSize);
+                bucketToFlush.reset();
+            } else {
+                LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!",
+                        this.tracer.maxBufferSize);
+            }
+        }
+    }
+
+    private boolean hasData() {
+        return this.buckets.size() > 0
+                && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
+    }
+
+    @SuppressWarnings("unchecked, rawtypes")
+    private boolean flushBucket(DataBucket bucket) {
+        String instant = instantToWrite(true);
+
+        if (instant == null) {
+            // in case there are empty checkpoints that have no input data
+            LOG.info("No inflight instant when flushing data, skip.");
+            return false;
+        }
+
+        List<HoodieRecord> records = bucket.writeBuffer();
+        ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records");
+        if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
+            records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1);
+        }
+        bucket.preWrite(records);
+        final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant));
+        records.clear();
+        final WriteMetadataEvent event = WriteMetadataEvent.builder()
+                .taskID(taskID)
+                .instantTime(instant) // the write instant may shift but the event still uses the currentInstant.
+                .writeStatus(writeStatus)
+                .lastBatch(false)
+                .endInput(false)
+                .build();
+
+        this.eventGateway.sendEventToCoordinator(event);
+        writeStatuses.addAll(writeStatus);
+        return true;
+    }
+
+    @SuppressWarnings("unchecked, rawtypes")
+    private void flushRemaining(boolean endInput) {
+        this.currentInstant = instantToWrite(hasData());
+        if (this.currentInstant == null) {
+            // in case there are empty checkpoints that have no input data
+            throw new HoodieException("No inflight instant when flushing data!");
+        }
+        final List<WriteStatus> writeStatus;
+        if (buckets.size() > 0) {
+            writeStatus = new ArrayList<>();
+            this.buckets.values()
+                    // The records are partitioned by the bucket ID and each batch sent to
+                    // the writer belongs to one bucket.
+                    .forEach(bucket -> {
+                        List<HoodieRecord> records = bucket.writeBuffer();
+                        if (records.size() > 0) {
+                            if (config.getBoolean(FlinkOptions.PRE_COMBINE)) {
+                                records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null,
+                                        -1);
+                            }
+                            bucket.preWrite(records);
+                            writeStatus.addAll(writeFunction.apply(records, currentInstant));
+                            records.clear();
+                            bucket.reset();
+                        }
+                    });
+        } else {
+            LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant);
+            writeStatus = Collections.emptyList();
+        }
+        final WriteMetadataEvent event = WriteMetadataEvent.builder()
+                .taskID(taskID)
+                .instantTime(currentInstant)
+                .writeStatus(writeStatus)
+                .lastBatch(true)
+                .endInput(endInput)
+                .build();
+
+        this.eventGateway.sendEventToCoordinator(event);
+        this.buckets.clear();
+        this.tracer.reset();
+        this.writeClient.cleanHandles();
+        this.writeStatuses.addAll(writeStatus);
+        // blocks flushing until the coordinator starts a new instant
+        this.confirming = true;
+    }
+}
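
For readers new to the write path: the flushing behaviour described in the StreamWriteFunction Javadoc above boils down to a two-level size check, one per data bucket and one per task. A minimal, standalone Java sketch of that policy follows (not part of the patch; the class name and the constants standing in for WRITE_BATCH_SIZE and WRITE_TASK_MAX_SIZE are illustrative):

    // Sketch of the two-level flush policy: flush a single bucket when it
    // exceeds the batch size, or flush the largest bucket when the task-wide
    // buffer exceeds its limit. All names here are illustrative.
    import java.util.HashMap;
    import java.util.Map;

    public class FlushPolicySketch {

        static final long BATCH_SIZE_BYTES = 64L * 1024 * 1024;     // stands in for WRITE_BATCH_SIZE
        static final long TASK_MAX_SIZE_BYTES = 512L * 1024 * 1024; // stands in for WRITE_TASK_MAX_SIZE

        final Map<String, Long> bucketSizes = new HashMap<>(); // bucketId -> buffered bytes
        long totalBufferedBytes = 0L;

        /** Buffers one record of the given estimated size and applies the flush rules. */
        void buffer(String bucketId, long recordSizeBytes) {
            long bucketSize = bucketSizes.merge(bucketId, recordSizeBytes, Long::sum);
            totalBufferedBytes += recordSizeBytes;

            if (bucketSize > BATCH_SIZE_BYTES) {
                // rule 1: this bucket alone exceeds the batch size
                flush(bucketId);
            } else if (totalBufferedBytes > TASK_MAX_SIZE_BYTES) {
                // rule 2: free memory by flushing the largest bucket
                String largest = bucketSizes.entrySet().stream()
                        .max(Map.Entry.comparingByValue())
                        .map(Map.Entry::getKey)
                        .orElseThrow(IllegalStateException::new);
                flush(largest);
            }
        }

        void flush(String bucketId) {
            long released = bucketSizes.remove(bucketId);
            totalBufferedBytes -= released;
            System.out.println("flushed bucket " + bucketId + " (" + released + " bytes)");
        }
    }
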
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java
new file mode 100644
index 0000000000..a25caf9b35
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+/**
+ * Operator for {@link StreamSink}.
+ *
+ * @param <I> The input type
+ * <p>
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {
+
+    public StreamWriteOperator(Configuration conf, MetricOption metricOption) {
+        super(new StreamWriteFunction<>(conf, metricOption));
+    }
+
+    public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, MetricOption metricOption) {
+        return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf, metricOption));
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java
new file mode 100644
index 0000000000..817e727f62
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.append;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Sink function to write the data to the underlying filesystem.
+ *
+ * <p>The function writes base files directly for each checkpoint,
+ * the file may roll over when its size hits the configured threshold.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunction.class);
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Helper class for log mode.
+     */
+    private transient BulkInsertWriterHelper writerHelper;
+
+    /**
+     * Table row type.
+     */
+    private final RowType rowType;
+
+    private final MetricOption metricOption;
+
+    private SinkMetricData sinkMetricData;
+
+    /**
+     * Constructs an AppendWriteFunction.
+     *
+     * @param config The config options
+     */
+    public AppendWriteFunction(Configuration config, RowType rowType, MetricOption metricOption) {
+        super(config);
+        this.rowType = rowType;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+        if (metricOption != null) {
+            this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void snapshotState() {
+        // Based on the fact that the coordinator starts the checkpoint first,
+        // it would check the validity.
+        // wait for the buffer data flush out and request a new instant
+        flushData(false);
+    }
+
+    @Override
+    public void processElement(I value, Context ctx, Collector<Object> out) throws Exception {
+        if (this.writerHelper == null) {
+            initWriterHelper();
+        }
+        this.writerHelper.write((RowData) value);
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeWithEstimate(value);
+        }
+    }
+
+    /**
+     * End input action for batch source.
+     */
+    public void endInput() {
+        super.endInput();
+        flushData(true);
+        this.writeStatuses.clear();
+    }
+
+    // -------------------------------------------------------------------------
+    // Utilities
+    // -------------------------------------------------------------------------
+    private void initWriterHelper() {
+        final String instant = instantToWrite(true);
+        if (instant == null) {
+            // in case there are empty checkpoints that have no input data
+            throw new HoodieException("No inflight instant when flushing data!");
+        }
+        this.writerHelper =
+                new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(),
+                        instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(),
+                        getRuntimeContext().getAttemptNumber(),
+                        this.rowType);
+    }
+
+    private void flushData(boolean endInput) {
+        final List<WriteStatus> writeStatus;
+        if (this.writerHelper != null) {
+            writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+            this.currentInstant = this.writerHelper.getInstantTime();
+        } else {
+            writeStatus = Collections.emptyList();
+            this.currentInstant = instantToWrite(false);
+            LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, this.currentInstant);
+        }
+        final WriteMetadataEvent event = WriteMetadataEvent.builder()
+                .taskID(taskID)
+                .instantTime(this.currentInstant)
+                .writeStatus(writeStatus)
+                .lastBatch(true)
+                .endInput(endInput)
+                .build();
+        this.eventGateway.sendEventToCoordinator(event);
+        // nullify the write helper for next ckp
+        this.writerHelper = null;
+        this.writeStatuses.addAll(writeStatus);
+        // blocks flushing until the coordinator starts a new instant
+        this.confirming = true;
+    }
+}
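
The audit-relevant change in both StreamWriteFunction and AppendWriteFunction is the same small pattern: build a SinkMetricData from the MetricOption in open() and report each element with invokeWithEstimate(). A minimal sketch of that wiring in isolation (not part of the patch; the class name and the RowData element type are assumptions, and the actual writer logic is elided):

    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.SinkMetricData;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.util.Collector;

    // Illustrative stand-in for the patched write functions above.
    public class AuditedWriteFunctionSketch extends ProcessFunction<RowData, Object> {

        private static final long serialVersionUID = 1L;

        private final MetricOption metricOption;   // null means auditing is disabled
        private transient SinkMetricData sinkMetricData;

        public AuditedWriteFunctionSketch(MetricOption metricOption) {
            this.metricOption = metricOption;
        }

        @Override
        public void open(Configuration parameters) {
            if (metricOption != null) {
                // same construction as in the patched functions above
                this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
            }
        }

        @Override
        public void processElement(RowData value, Context ctx, Collector<Object> out) {
            // ... hand the record to the actual writer here ...
            if (sinkMetricData != null) {
                sinkMetricData.invokeWithEstimate(value); // records count/size for the audit ID
            }
        }
    }
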
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java
new file mode 100644
index 0000000000..41fa03e793
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.append;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+/**
+ * Operator for {@link AppendWriteFunction}.
+ *
+ * @param <I> The input type
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class AppendWriteOperator<I> extends AbstractWriteOperator<I> {
+
+    public AppendWriteOperator(Configuration conf, RowType rowType, MetricOption metricOption) {
+        super(new AppendWriteFunction<>(conf, rowType, metricOption));
+    }
+
+    public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType,
+            MetricOption metricOption) {
+        return WriteOperatorFactory.instance(conf, new AppendWriteOperator<>(conf, rowType, metricOption));
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java
new file mode 100644
index 0000000000..3bafbb806f
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.bucket;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.sink.StreamWriteFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A stream write function with bucket hash index.
+ *
+ * <p>The task holds a fresh local index: {(partition + bucket number) &rarr; fileId} mapping, this index
+ * is used for deciding whether the incoming record is an UPDATE or an INSERT.
+ * The index is local because different partition paths have separate items in the index.
+ *
+ * @param <I> the input type
+ */
+public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);
+
+    private int parallelism;
+
+    private int bucketNum;
+
+    private String indexKeyFields;
+
+    /**
+     * BucketID to file group mapping in each partition.
+     * Map(partition -> Map(bucketId, fileID)).
+     */
+    private Map<String, Map<Integer, String>> bucketIndex;
+
+    /**
+     * Incremental bucket index of the current checkpoint interval,
+     * it is needed because the bucket type('I' or 'U') should be decided based on the committed files view,
+     * all the records in one bucket should have the same bucket type.
+     */
+    private Set<String> incBucketIndex;
+    private final MetricOption metricOption;
+    private SinkMetricData sinkMetricData;
+
+    /**
+     * Constructs a BucketStreamWriteFunction.
+     *
+     * @param config The config options
+     */
+    public BucketStreamWriteFunction(Configuration config, MetricOption metricOption) {
+        super(config);
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws IOException {
+        super.open(parameters);
+        this.bucketNum = config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+        this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
+        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+        this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
+        this.bucketIndex = new HashMap<>();
+        this.incBucketIndex = new HashSet<>();
+        if (metricOption != null) {
+            this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        super.initializeState(context);
+    }
+
+    @Override
+    public void snapshotState() {
+        super.snapshotState();
+        this.incBucketIndex.clear();
+    }
+
+    @Override
+    public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector)
+            throws Exception {
+        HoodieRecord<?> record = (HoodieRecord<?>) i;
+        final HoodieKey hoodieKey = record.getKey();
+        final String partition = hoodieKey.getPartitionPath();
+        final HoodieRecordLocation location;
+
+        bootstrapIndexIfNeed(partition);
+        Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
+        final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
+        final String bucketId = partition + "/" + bucketNum;
+
+        if (incBucketIndex.contains(bucketId)) {
+            location = new HoodieRecordLocation("I", 
bucketToFileId.get(bucketNum));
+        } else if (bucketToFileId.containsKey(bucketNum)) {
+            location = new HoodieRecordLocation("U", 
bucketToFileId.get(bucketNum));
+        } else {
+            String newFileId = 
BucketIdentifier.newBucketFileIdPrefix(bucketNum);
+            location = new HoodieRecordLocation("I", newFileId);
+            bucketToFileId.put(bucketNum, newFileId);
+            incBucketIndex.add(bucketId);
+        }
+        record.unseal();
+        record.setCurrentLocation(location);
+        record.seal();
+        bufferRecord(record);
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeWithEstimate(record);
+        }
+    }
+
+    /**
+     * Determine whether the current fileID belongs to the current task.
+     * (partition + curBucket) % numPartitions == this taskID belongs to this task.
+     */
+    public boolean isBucketToLoad(int bucketNumber, String partition) {
+        final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism;
+        int globalIndex = partitionIndex + bucketNumber;
+        return BucketIdentifier.mod(globalIndex, parallelism) == taskID;
+    }
+
+    /**
+     * Get partition_bucket -> fileID mapping from the existing hudi table.
+     * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
+     */
+    private void bootstrapIndexIfNeed(String partition) {
+        if (bucketIndex.containsKey(partition)) {
+            return;
+        }
+        LOG.info(String.format("Loading Hoodie Table %s, with path %s", 
this.metaClient.getTableConfig().getTableName(),
+                this.metaClient.getBasePath() + "/" + partition));
+
+        // Load existing fileID belongs to this task
+        Map<Integer, String> bucketToFileIDMap = new HashMap<>();
+        this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
+            String fileId = fileSlice.getFileId();
+            int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
+            if (isBucketToLoad(bucketNumber, partition)) {
+                LOG.info(String.format("Should load this partition bucket %s 
with fileId %s", bucketNumber, fileId));
+                // Validate that one bucketId has only ONE fileId
+                if (bucketToFileIDMap.containsKey(bucketNumber)) {
+                    throw new RuntimeException(String.format("Duplicate fileId 
%s from bucket %s of partition %s found "
+                            + "during the BucketStreamWriteFunction index 
bootstrap.", fileId, bucketNumber,
+                            partition));
+                } else {
+                    LOG.info(String.format("Adding fileId %s to the bucket %s 
of partition %s.", fileId, bucketNumber,
+                            partition));
+                    bucketToFileIDMap.put(bucketNumber, fileId);
+                }
+            }
+        });
+        bucketIndex.put(partition, bucketToFileIDMap);
+    }
+}
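
The bucket-index bookkeeping in BucketStreamWriteFunction#processElement can be summarised independently of the Hudi types: a record is an INSERT ('I') when its bucket first appears in the current checkpoint interval or has no file group yet, and an UPDATE ('U') when the bucket already maps to a committed file group. A small sketch under those rules (not part of the patch; names and the newFileId() helper are hypothetical):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.UUID;

    // Illustrative model of the local bucket index; not tied to Hudi classes.
    public class BucketIndexSketch {

        // partition -> (bucket number -> file group id)
        private final Map<String, Map<Integer, String>> bucketIndex = new HashMap<>();
        // bucket ids ("partition/bucketNum") first seen in the current checkpoint interval
        private final Set<String> incBucketIndex = new HashSet<>();

        /** Returns "I:<fileId>" or "U:<fileId>" for the given partition and bucket number. */
        public String locate(String partition, int bucketNum) {
            Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
            String bucketId = partition + "/" + bucketNum;

            if (incBucketIndex.contains(bucketId)) {
                return "I:" + bucketToFileId.get(bucketNum);   // new this checkpoint, stays an INSERT
            } else if (bucketToFileId.containsKey(bucketNum)) {
                return "U:" + bucketToFileId.get(bucketNum);   // existing file group, UPDATE
            } else {
                String newFileId = newFileId(bucketNum);       // no file group yet, create one
                bucketToFileId.put(bucketNum, newFileId);
                incBucketIndex.add(bucketId);
                return "I:" + newFileId;
            }
        }

        /** Called at checkpoint boundaries, mirroring snapshotState() above. */
        public void onCheckpoint() {
            incBucketIndex.clear();
        }

        private static String newFileId(int bucketNum) {
            return String.format("%08d-%s", bucketNum, UUID.randomUUID());
        }
    }
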
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java
new file mode 100644
index 0000000000..40b1a29423
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.bucket;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+/**
+ * Operator for {@link BucketStreamWriteFunction}.
+ *
+ * @param <I> The input type
+ */
+public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
+
+    public BucketStreamWriteOperator(Configuration conf, MetricOption metricOption) {
+        super(new BucketStreamWriteFunction<>(conf, metricOption));
+    }
+
+    public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, MetricOption metricOption) {
+        return WriteOperatorFactory.instance(conf, new BucketStreamWriteOperator<>(conf, metricOption));
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java
new file mode 100644
index 0000000000..44c8966e30
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.bulk;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.SinkMetricData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.Collector;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.WriterHelpers;
+import org.apache.hudi.sink.common.AbstractWriteFunction;
+import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.meta.CkpMetadata;
+import org.apache.hudi.sink.utils.TimeWait;
+import org.apache.hudi.util.FlinkWriteClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Sink function to write the data to the underlying filesystem.
+ *
+ * <p>The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}.
+ *
+ * <p>Note: The function task requires the input stream to be shuffled by partition path.
+ *
+ * @param <I> Type of the input record
+ * @see StreamWriteOperatorCoordinator
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class BulkInsertWriteFunction<I>
+        extends
+            AbstractWriteFunction<I> {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);
+
+    /**
+     * Helper class for bulk insert mode.
+     */
+    private transient BulkInsertWriterHelper writerHelper;
+
+    /**
+     * Config options.
+     */
+    private final Configuration config;
+
+    /**
+     * Table row type.
+     */
+    private final RowType rowType;
+
+    /**
+     * Id of current subtask.
+     */
+    private int taskID;
+
+    /**
+     * Write Client.
+     */
+    private transient HoodieFlinkWriteClient writeClient;
+
+    /**
+     * The initial inflight instant when start up.
+     */
+    private volatile String initInstant;
+
+    /**
+     * Gateway to send operator events to the operator coordinator.
+     */
+    private transient OperatorEventGateway eventGateway;
+
+    /**
+     * Checkpoint metadata.
+     */
+    private CkpMetadata ckpMetadata;
+
+    private final MetricOption metricOption;
+
+    private SinkMetricData sinkMetricData;
+    /**
+     * Constructs a BulkInsertWriteFunction.
+     *
+     * @param config The config options
+     */
+    public BulkInsertWriteFunction(Configuration config, RowType rowType, MetricOption metricOption) {
+        this.config = config;
+        this.rowType = rowType;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public void open(Configuration parameters) {
+        this.taskID = getRuntimeContext().getIndexOfThisSubtask();
+        this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext());
+        this.ckpMetadata = CkpMetadata.getInstance(config);
+        this.initInstant = lastPendingInstant();
+        if (metricOption != null) {
+            this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup());
+        }
+        sendBootstrapEvent();
+        initWriterHelper();
+    }
+
+    @Override
+    public void processElement(I value, Context ctx, Collector<Object> out) throws IOException {
+        this.writerHelper.write((RowData) value);
+        if (sinkMetricData != null) {
+            sinkMetricData.invokeWithEstimate(value);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (this.writeClient != null) {
+            this.writeClient.close();
+        }
+    }
+
+    /**
+     * End input action for batch source.
+     */
+    public void endInput() {
+        final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID);
+
+        final WriteMetadataEvent event = WriteMetadataEvent.builder()
+                .taskID(taskID)
+                .instantTime(this.writerHelper.getInstantTime())
+                .writeStatus(writeStatus)
+                .lastBatch(true)
+                .endInput(true)
+                .build();
+        this.eventGateway.sendEventToCoordinator(event);
+    }
+
+    @Override
+    public void handleOperatorEvent(OperatorEvent event) {
+        // no operation
+    }
+
+    // -------------------------------------------------------------------------
+    // Getter/Setter
+    // -------------------------------------------------------------------------
+
+    public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
+        this.eventGateway = operatorEventGateway;
+    }
+
+    // -------------------------------------------------------------------------
+    // Utilities
+    // -------------------------------------------------------------------------
+
+    private void initWriterHelper() {
+        String instant = instantToWrite();
+        this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(),
+                this.writeClient.getConfig(),
+                instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(),
+                getRuntimeContext().getAttemptNumber(),
+                this.rowType);
+    }
+
+    private void sendBootstrapEvent() {
+        WriteMetadataEvent event = WriteMetadataEvent.builder()
+                .taskID(taskID)
+                .writeStatus(Collections.emptyList())
+                .instantTime("")
+                .bootstrap(true)
+                .build();
+        this.eventGateway.sendEventToCoordinator(event);
+        LOG.info("Send bootstrap write metadata event to coordinator, 
task[{}].", taskID);
+    }
+
+    /**
+     * Returns the last pending instant time.
+     */
+    protected String lastPendingInstant() {
+        return this.ckpMetadata.lastPendingInstant();
+    }
+
+    private String instantToWrite() {
+        String instant = lastPendingInstant();
+        // if exactly-once semantics turns on,
+        // waits for the checkpoint notification until the checkpoint timeout threshold hits.
+        TimeWait timeWait = TimeWait.builder()
+                .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
+                .action("instant initialize")
+                .build();
+        while (instant == null || instant.equals(this.initInstant)) {
+            // wait condition:
+            // 1. there is no inflight instant
+            // 2. the inflight instant does not change
+            // sleep for a while
+            timeWait.waitFor();
+            // refresh the inflight instant
+            instant = lastPendingInstant();
+        }
+        return instant;
+    }
+}
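
The instantToWrite() loop above simply polls the checkpoint metadata until a new inflight instant appears (one different from the instant observed at startup) or the commit-ack timeout elapses. A simplified standard-library sketch of that wait (not part of the patch; the Supplier, the timeout handling, and the sleep interval are illustrative):

    import java.util.Objects;
    import java.util.function.Supplier;

    public class InstantWaitSketch {

        /**
         * @param lastPendingInstant supplies the latest pending instant (may return null)
         * @param initInstant        the pending instant observed when the task started
         * @param timeoutMs          how long to wait before failing
         */
        public static String waitForNewInstant(Supplier<String> lastPendingInstant,
                String initInstant, long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            String instant = lastPendingInstant.get();
            // wait while there is no inflight instant, or the inflight instant has not changed
            while (instant == null || Objects.equals(instant, initInstant)) {
                if (System.currentTimeMillis() > deadline) {
                    throw new IllegalStateException("Timed out waiting for a new inflight instant");
                }
                Thread.sleep(100L);          // back off, then refresh the inflight instant
                instant = lastPendingInstant.get();
            }
            return instant;
        }
    }
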
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java
new file mode 100644
index 0000000000..55d2290666
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.bulk;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.sink.common.AbstractWriteOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+
+/**
+ * Operator for bulk insert mode sink.
+ *
+ * @param <I> The input type
+ *
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class BulkInsertWriteOperator<I>
+        extends
+            AbstractWriteOperator<I>
+        implements
+            BoundedOneInput {
+
+    public BulkInsertWriteOperator(Configuration conf, RowType rowType, MetricOption metricOption) {
+        super(new BulkInsertWriteFunction<>(conf, rowType, metricOption));
+    }
+
+    @Override
+    public void handleOperatorEvent(OperatorEvent event) {
+        // no operation
+    }
+
+    public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType,
+            MetricOption metricOption) {
+        return WriteOperatorFactory.instance(conf, new BulkInsertWriteOperator<>(conf, rowType, metricOption));
+    }
+}
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java
new file mode 100644
index 0000000000..ea92e9361e
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.sink.utils;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.hudi.sink.StreamWriteOperator;
+import org.apache.inlong.sort.hudi.sink.append.AppendWriteOperator;
+import org.apache.inlong.sort.hudi.sink.bucket.BucketStreamWriteOperator;
+import org.apache.inlong.sort.hudi.sink.bulk.BulkInsertWriteOperator;
+
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.CleanFunction;
+import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
+import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
+import org.apache.hudi.sink.bulk.RowDataKeyGen;
+import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
+import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
+import org.apache.hudi.sink.clustering.ClusteringCommitSink;
+import org.apache.hudi.sink.clustering.ClusteringOperator;
+import org.apache.hudi.sink.clustering.ClusteringPlanEvent;
+import org.apache.hudi.sink.clustering.ClusteringPlanOperator;
+import org.apache.hudi.sink.common.WriteOperatorFactory;
+import org.apache.hudi.sink.compact.CompactOperator;
+import org.apache.hudi.sink.compact.CompactionCommitEvent;
+import org.apache.hudi.sink.compact.CompactionCommitSink;
+import org.apache.hudi.sink.compact.CompactionPlanEvent;
+import org.apache.hudi.sink.compact.CompactionPlanOperator;
+import org.apache.hudi.sink.partitioner.BucketAssignFunction;
+import org.apache.hudi.sink.partitioner.BucketIndexPartitioner;
+import org.apache.hudi.sink.transform.RowDataToHoodieFunctions;
+import org.apache.hudi.table.format.FilePathUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities to generate all kinds of sub-pipelines.
+ * <p>
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class Pipelines {
+
+    /**
+     * Bulk insert the input dataset at once.
+     *
+     * <p>By default, the input dataset is shuffled by the partition path first, then
+     * sorted by the partition path before being passed on to the write function.
+     * The whole pipeline looks like the following:
+     *
+     * <pre>
+     *      | input1 | ===\     /=== |sorter| === | task1 | (p1, p2)
+     *                   shuffle
+     *      | input2 | ===/     \=== |sorter| === | task2 | (p3, p4)
+     *
+     *      Note: both input1's and input2's datasets come from partitions: p1, p2, p3, p4
+     * </pre>
+     *
+     * <p>The write task switches to a new file handle each time it receives a record
+     * from a different partition path; the shuffle and sort reduce the number of small files.
+     *
+     * <p>The bulk insert should be run in batch execution mode.
+     *
+     * @param conf       The configuration
+     * @param rowType    The input row type
+     * @param dataStream The input data stream
+     * @return the bulk insert data stream sink
+     */
+    public static DataStreamSink<Object> bulkInsert(Configuration conf, 
RowType rowType, DataStream<RowData> dataStream,
+            MetricOption metricOption) {
+        WriteOperatorFactory<RowData> operatorFactory = 
BulkInsertWriteOperator.getFactory(conf, rowType, metricOption);
+        if (OptionsResolver.isBucketIndexType(conf)) {
+            String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+            int numBuckets = 
conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+
+            BucketIndexPartitioner<HoodieKey> partitioner = new 
BucketIndexPartitioner<>(numBuckets, indexKeys);
+            RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
+            RowType rowTypeWithFileId = 
BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
+            InternalTypeInfo<RowData> typeInfo = 
InternalTypeInfo.of(rowTypeWithFileId);
+
+            Map<String, String> bucketIdToFileId = new HashMap<>();
+            dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
+                    .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record,
+                            indexKeys, numBuckets), typeInfo)
+                    .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+            if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
+                SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
+                dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
+                        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
+                ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+                        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+            }
+            return dataStream
+                    .transform(opName("bucket_bulk_insert", conf), 
TypeInformation.of(Object.class), operatorFactory)
+                    .uid(opUID("bucket_bulk_insert", conf))
+                    .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+                    .addSink(DummySink.INSTANCE)
+                    .name("dummy");
+        }
+
+        final String[] partitionFields = 
FilePathUtils.extractPartitionKeys(conf);
+        if (partitionFields.length > 0) {
+            RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, 
rowType);
+            if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) 
{
+
+                // shuffle by partition keys
+                // use #partitionCustom instead of #keyBy to avoid duplicate 
sort operations,
+                // see BatchExecutionUtils#applyBatchExecutionSettings for 
details.
+                Partitioner<String> partitioner =
+                        (key, channels) -> 
KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
+                                
KeyGroupRangeAssignment.computeDefaultMaxParallelism(
+                                        
conf.getInteger(FlinkOptions.WRITE_TASKS)),
+                                channels);
+                dataStream = dataStream.partitionCustom(partitioner, 
rowDataKeyGen::getPartitionPath);
+            }
+            if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
+                SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, 
partitionFields);
+                // sort by partition keys
+                dataStream = dataStream
+                        .transform("partition_key_sorter",
+                                InternalTypeInfo.of(rowType),
+                                sortOperatorGen.createSortOperator())
+                        
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+                
ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
+                        conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 
1024L * 1024L);
+            }
+        }
+        return dataStream
+                .transform(opName("hoodie_bulk_insert_write", conf),
+                        TypeInformation.of(Object.class),
+                        operatorFactory)
+                // follow the parallelism of upstream operators to avoid 
shuffle
+                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
+                .addSink(DummySink.INSTANCE)
+                .name("dummy");
+    }
+
+    /**
+     * Insert the dataset in append mode (no upsert or deduplication).
+     *
+     * <p>The input dataset would be rebalanced among the write tasks:
+     *
+     * <pre>
+     *      | input1 | ===\     /=== | task1 | (p1, p2, p3, p4)
+     *                   shuffle
+     *      | input2 | ===/     \=== | task2 | (p1, p2, p3, p4)
+     *
+     *      Note: both input1's and input2's datasets come from partitions: p1, p2, p3, p4
+     * </pre>
+     *
+     * <p>The write task switches to a new file handle each time it receives a record
+     * from a different partition path, so there may be many small files.
+     *
+     * @param conf       The configuration
+     * @param rowType    The input row type
+     * @param dataStream The input data stream
+     * @param bounded    Whether the input stream is bounded
+     * @return the appending data stream sink
+     */
+    public static DataStream<Object> append(
+            Configuration conf,
+            RowType rowType,
+            DataStream<RowData> dataStream,
+            boolean bounded,
+            MetricOption metricOption) {
+        if (!bounded) {
+            // In principle, the config should be immutable, but the 
boundedness
+            // is only visible when creating the sink pipeline.
+            conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false);
+        }
+        WriteOperatorFactory<RowData> operatorFactory = 
AppendWriteOperator.getFactory(conf, rowType, metricOption);
+
+        return dataStream
+                .transform(opName("hoodie_append_write", conf), 
TypeInformation.of(Object.class), operatorFactory)
+                .uid(opUID("hoodie_stream_write", conf))
+                .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+    }
+
+    /**
+     * Constructs the bootstrap pipeline for streaming execution.
+     * The bootstrap operator loads the existing data index (primary key to file id mapping),
+     * then sends the indexing data set to the subsequent operator (usually the bucket assign operator).
+     */
+    public static DataStream<HoodieRecord> bootstrap(
+            Configuration conf,
+            RowType rowType,
+            DataStream<RowData> dataStream) {
+        return bootstrap(conf, rowType, dataStream, false, false);
+    }
+
+    /**
+     * Constructs the bootstrap pipeline.
+     * The bootstrap operator loads the existing data index (primary key to file id mapping),
+     * then sends the indexing data set to the subsequent operator (usually the bucket assign operator).
+     *
+     * @param conf       The configuration
+     * @param rowType    The row type
+     * @param dataStream The data stream
+     * @param bounded    Whether the source is bounded
+     * @param overwrite  Whether it is insert overwrite
+     */
+    public static DataStream<HoodieRecord> bootstrap(
+            Configuration conf,
+            RowType rowType,
+            DataStream<RowData> dataStream,
+            boolean bounded,
+            boolean overwrite) {
+        final boolean globalIndex = 
conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED);
+        if (overwrite || OptionsResolver.isBucketIndexType(conf)) {
+            return rowDataToHoodieRecord(conf, rowType, dataStream);
+        } else if (bounded && !globalIndex && 
OptionsResolver.isPartitionedTable(conf)) {
+            return boundedBootstrap(conf, rowType, dataStream);
+        } else {
+            return streamBootstrap(conf, rowType, dataStream, bounded);
+        }
+    }
+
+    private static DataStream<HoodieRecord> streamBootstrap(
+            Configuration conf,
+            RowType rowType,
+            DataStream<RowData> dataStream,
+            boolean bounded) {
+        DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, 
rowType, dataStream);
+
+        if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) {
+            dataStream1 = dataStream1
+                    .transform(
+                            "index_bootstrap",
+                            TypeInformation.of(HoodieRecord.class),
+                            new BootstrapOperator<>(conf))
+                    .setParallelism(
+                            
conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism()))
+                    .uid(opUID("index_bootstrap", conf));
+        }
+
+        return dataStream1;
+    }
+
+    /**
+     * Constructs bootstrap pipeline for batch execution mode.
+     * The indexing data set is loaded before the actual data write
+     * in order to support batch UPSERT.
+     */
+    private static DataStream<HoodieRecord> boundedBootstrap(
+            Configuration conf,
+            RowType rowType,
+            DataStream<RowData> dataStream) {
+        final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, 
rowType);
+        // shuffle by partition keys
+        dataStream = dataStream
+                .keyBy(rowDataKeyGen::getPartitionPath);
+
+        return rowDataToHoodieRecord(conf, rowType, dataStream)
+                .transform(
+                        "batch_index_bootstrap",
+                        TypeInformation.of(HoodieRecord.class),
+                        new BatchBootstrapOperator<>(conf))
+                .setParallelism(
+                        
conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism()))
+                .uid(opUID("batch_index_bootstrap", conf));
+    }
+
+    /**
+     * Transforms the row data to hoodie records.
+     */
+    public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration 
conf, RowType rowType,
+            DataStream<RowData> dataStream) {
+        return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), 
TypeInformation.of(HoodieRecord.class))
+                
.setParallelism(dataStream.getParallelism()).name("row_data_to_hoodie_record");
+    }
+
+    /**
+     * The streaming write pipeline.
+     *
+     * <p>The input dataset is shuffled by the primary key first, then
+     * shuffled by the file group ID before being passed on to the write function.
+     * The whole pipeline looks like the following:
+     *
+     * <pre>
+     *      | input1 | ===\     /=== | bucket assigner | ===\     /=== | task1 |
+     *                   shuffle(by PK)                    shuffle(by bucket ID)
+     *      | input2 | ===/     \=== | bucket assigner | ===/     \=== | task2 |
+     *
+     *      Note: a file group must be handled by exactly one write task to avoid write conflicts.
+     * </pre>
+     *
+     * <p>The bucket assigner assigns the inputs to suitable file groups; the write task
+     * caches and flushes the data set to disk.
+     *
+     * @param conf       The configuration
+     * @param dataStream The input data stream
+     * @return the stream write data stream pipeline
+     */
+    public static DataStream<Object> hoodieStreamWrite(Configuration conf, 
DataStream<HoodieRecord> dataStream,
+            MetricOption metricOption) {
+        if (OptionsResolver.isBucketIndexType(conf)) {
+            WriteOperatorFactory<HoodieRecord> operatorFactory =
+                    BucketStreamWriteOperator.getFactory(conf, metricOption);
+            int bucketNum = 
conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+            String indexKeyFields = 
conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+            BucketIndexPartitioner<HoodieKey> partitioner = new 
BucketIndexPartitioner<>(bucketNum, indexKeyFields);
+            return dataStream.partitionCustom(partitioner, 
HoodieRecord::getKey)
+                    .transform(opName("bucket_write", conf), 
TypeInformation.of(Object.class), operatorFactory)
+                    .uid(opUID("bucket_write", conf))
+                    .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+        } else {
+            WriteOperatorFactory<HoodieRecord> operatorFactory = 
StreamWriteOperator.getFactory(conf, metricOption);
+            return dataStream
+                    // Key-by record key, to avoid multiple subtasks write to 
a bucket at the same time
+                    .keyBy(HoodieRecord::getRecordKey)
+                    .transform(
+                            "bucket_assigner",
+                            TypeInformation.of(HoodieRecord.class),
+                            new KeyedProcessOperator<>(new 
BucketAssignFunction<>(conf)))
+                    .uid(opUID("bucket_assigner", conf))
+                    
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
+                    // shuffle by fileId(bucket id)
+                    .keyBy(record -> record.getCurrentLocation().getFileId())
+                    .transform(opName("stream_write", conf), 
TypeInformation.of(Object.class), operatorFactory)
+                    .uid(opUID("stream_write", conf))
+                    .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
+        }
+    }
+
+    /**
+     * The compaction tasks pipeline.
+     *
+     * <p>The compaction plan operator monitors new compaction plans on the timeline,
+     * then distributes the sub-plans to the compaction tasks. Each compaction task
+     * hands its metadata over to the commit task, which commits the compaction transaction.
+     * The whole pipeline looks like the following:
+     *
+     * <pre>
+     *                                     /=== | task1 | ===\
+     *      | plan generation | ===> hash                      | commit |
+     *                                     \=== | task2 | ===/
+     *
+     *      Note: both the compaction plan generation task and the commit task are singletons.
+     * </pre>
+     *
+     * @param conf       The configuration
+     * @param dataStream The input data stream
+     * @return the compaction pipeline
+     */
+    public static DataStreamSink<CompactionCommitEvent> compact(Configuration 
conf, DataStream<Object> dataStream) {
+        return dataStream.transform("compact_plan_generate",
+                TypeInformation.of(CompactionPlanEvent.class),
+                new CompactionPlanOperator(conf))
+                .setParallelism(1) // plan generate must be singleton
+                // make the distribution strategy deterministic to avoid 
concurrent modifications
+                // on the same bucket files
+                .keyBy(plan -> 
plan.getOperation().getFileGroupId().getFileId())
+                .transform("compact_task",
+                        TypeInformation.of(CompactionCommitEvent.class),
+                        new CompactOperator(conf))
+                .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+                .addSink(new CompactionCommitSink(conf))
+                .name("compact_commit")
+                .setParallelism(1); // compaction commit should be singleton
+    }
+
+    /**
+     * The clustering tasks pipeline.
+     *
+     * <p>The clustering plan operator monitors new clustering plans on the timeline,
+     * then distributes the sub-plans to the clustering tasks. Each clustering task
+     * hands its metadata over to the commit task, which commits the clustering transaction.
+     * The whole pipeline looks like the following:
+     *
+     * <pre>
+     *                                     /=== | task1 | ===\
+     *      | plan generation | ===> hash                      | commit |
+     *                                     \=== | task2 | ===/
+     *
+     *      Note: both the clustering plan generation task and the commit task are singletons.
+     * </pre>
+     *
+     * @param conf       The configuration
+     * @param rowType    The input row type
+     * @param dataStream The input data stream
+     * @return the clustering pipeline
+     */
+    public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration 
conf, RowType rowType,
+            DataStream<Object> dataStream) {
+        DataStream<ClusteringCommitEvent> clusteringStream = 
dataStream.transform("cluster_plan_generate",
+                TypeInformation.of(ClusteringPlanEvent.class),
+                new ClusteringPlanOperator(conf))
+                .setParallelism(1) // plan generate must be singleton
+                .keyBy(plan ->
+                // make the distribution strategy deterministic to avoid 
concurrent modifications
+                // on the same bucket files
+                plan.getClusteringGroupInfo().getOperations()
+                        
.stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
+                .transform("clustering_task",
+                        TypeInformation.of(ClusteringCommitEvent.class),
+                        new ClusteringOperator(conf, rowType))
+                
.setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
+        if (OptionsResolver.sortClusteringEnabled(conf)) {
+            
ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
+                    conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 
1024L);
+        }
+        return clusteringStream.addSink(new ClusteringCommitSink(conf))
+                .name("clustering_commit")
+                .setParallelism(1); // clustering commit should be singleton
+    }
+
+    public static DataStreamSink<Object> clean(Configuration conf, 
DataStream<Object> dataStream) {
+        return dataStream.addSink(new CleanFunction<>(conf))
+                .setParallelism(1)
+                .name("clean_commits");
+    }
+
+    public static DataStreamSink<Object> dummySink(DataStream<Object> 
dataStream) {
+        return dataStream.addSink(DummySink.INSTANCE)
+                .setParallelism(1)
+                .name("dummy");
+    }
+
+    public static String opName(String operatorN, Configuration conf) {
+        return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME);
+    }
+
+    public static String opUID(String operatorN, Configuration conf) {
+        return "uid_" + operatorN + "_" + 
conf.getString(FlinkOptions.TABLE_NAME);
+    }
+
+    /**
+     * Dummy sink that does nothing.
+     */
+    public static class DummySink implements SinkFunction<Object> {
+
+        private static final long serialVersionUID = 1L;
+        public static DummySink INSTANCE = new DummySink();
+    }
+}
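
    Taken together, these utilities compose the whole streaming sink. A minimal
    sketch of the intended composition for the upsert path (essentially what
    HoodieTableSink#getSinkRuntimeProvider does in the next file; conf, rowType,
    rowDataStream, metricOption and bounded are assumed to be provided by the
    caller):

        // Sketch of the streaming upsert composition: bootstrap the index,
        // run the stream write with the MetricOption, then attach either the
        // async compaction pipeline or the clean pipeline.
        static DataStreamSink<?> sketchStreamWritePipeline(Configuration conf, RowType rowType,
                DataStream<RowData> rowDataStream, MetricOption metricOption, boolean bounded) {
            DataStream<HoodieRecord> hoodieRecords =
                    Pipelines.bootstrap(conf, rowType, rowDataStream, bounded, false);
            DataStream<Object> pipeline =
                    Pipelines.hoodieStreamWrite(conf, hoodieRecords, metricOption);
            if (OptionsResolver.needsAsyncCompaction(conf)) {
                return Pipelines.compact(conf, pipeline);   // plan generation -> compact -> commit
            }
            return Pipelines.clean(conf, pipeline);         // only clean old commits
        }
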
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java
new file mode 100644
index 0000000000..ac153ef729
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.constraints.UniqueConstraint;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.factories.DynamicTableSinkFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.DataTypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
+
+/**
+ * Hoodie data source/sink factory.
+ * <p>
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class HoodieTableFactory implements DynamicTableSinkFactory {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HoodieTableFactory.class);
+
+    public static final String FACTORY_ID = "hudi-inlong";
+
+    @Override
+    public DynamicTableSink createDynamicTableSink(Context context) {
+        Configuration conf = 
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
+        
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
+                "Option [path] should not be empty.");
+        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
+        sanityCheck(conf, schema);
+        setupConfOptions(conf, context.getObjectIdentifier(), 
context.getCatalogTable(), schema);
+
+        String inlongMetric = conf.getOptional(INLONG_METRIC).orElse(null);
+        String auditHostAndPorts = conf.get(INLONG_AUDIT);
+        String auditKeys = conf.get(AUDIT_KEYS);
+
+        MetricOption metricOption = MetricOption.builder()
+                .withInlongLabels(inlongMetric)
+                .withAuditAddress(auditHostAndPorts)
+                .withAuditKeys(auditKeys)
+                .build();
+        return new HoodieTableSink(conf, schema, metricOption);
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return FACTORY_ID;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        return Collections.singleton(FlinkOptions.PATH);
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        Set<ConfigOption<?>> configOptions = FlinkOptions.optionalOptions();
+        configOptions.add(AUDIT_KEYS);
+        configOptions.add(INLONG_METRIC);
+        configOptions.add(INLONG_AUDIT);
+        return configOptions;
+    }
+
+    // 
-------------------------------------------------------------------------
+    // Utilities
+    // 
-------------------------------------------------------------------------
+
+    /**
+     * The sanity check.
+     *
+     * @param conf   The table options
+     * @param schema The table schema
+     */
+    private void sanityCheck(Configuration conf, ResolvedSchema schema) {
+        List<String> fields = schema.getColumnNames();
+
+        // validate record key in pk absence.
+        if (!schema.getPrimaryKey().isPresent()) {
+            String[] recordKeys = 
conf.get(FlinkOptions.RECORD_KEY_FIELD).split(",");
+            if (recordKeys.length == 1
+                    && 
FlinkOptions.RECORD_KEY_FIELD.defaultValue().equals(recordKeys[0])
+                    && !fields.contains(recordKeys[0])) {
+                throw new HoodieValidationException("Primary key definition is 
required, use either PRIMARY KEY syntax "
+                        + "or option '" + FlinkOptions.RECORD_KEY_FIELD.key() 
+ "' to specify.");
+            }
+
+            Arrays.stream(recordKeys)
+                    .filter(field -> !fields.contains(field))
+                    .findAny()
+                    .ifPresent(f -> {
+                        throw new HoodieValidationException("Field '" + f + "' 
specified in option "
+                                + "'" + FlinkOptions.RECORD_KEY_FIELD.key() + 
"' does not exist in the table schema.");
+                    });
+        }
+
+        // validate pre_combine key
+        String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD);
+        if (!fields.contains(preCombineField)) {
+            if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) {
+                throw new HoodieValidationException("Option '" + 
FlinkOptions.PRECOMBINE_FIELD.key()
+                        + "' is required for payload class: " + 
DefaultHoodieRecordPayload.class.getName());
+            }
+            if 
(preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) {
+                conf.setString(FlinkOptions.PRECOMBINE_FIELD, 
FlinkOptions.NO_PRE_COMBINE);
+            } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) {
+                throw new HoodieValidationException("Field " + preCombineField 
+ " does not exist in the table schema."
+                        + "Please check '" + 
FlinkOptions.PRECOMBINE_FIELD.key() + "' option.");
+            }
+        }
+    }
+
+    /**
+     * Sets up the config options based on the table definition, e.g. the table name and primary key.
+     *
+     * @param conf      The configuration to set up
+     * @param tablePath The table path
+     * @param table     The catalog table
+     * @param schema    The physical schema
+     */
+    private static void setupConfOptions(
+            Configuration conf,
+            ObjectIdentifier tablePath,
+            CatalogTable table,
+            ResolvedSchema schema) {
+        // table name
+        conf.setString(FlinkOptions.TABLE_NAME.key(), 
tablePath.getObjectName());
+        // hoodie key about options
+        setupHoodieKeyOptions(conf, table);
+        // compaction options
+        setupCompactionOptions(conf);
+        // hive options
+        setupHiveOptions(conf, tablePath);
+        // read options
+        setupReadOptions(conf);
+        // write options
+        setupWriteOptions(conf);
+        // infer avro schema from physical DDL schema
+        inferAvroSchema(conf, 
schema.toPhysicalRowDataType().notNull().getLogicalType());
+    }
+
+    /**
+     * Sets up the hoodie key options (e.g. record key and partition key) from 
the table definition.
+     */
+    private static void setupHoodieKeyOptions(Configuration conf, CatalogTable 
table) {
+        List<String> pkColumns = table.getSchema().getPrimaryKey()
+                
.map(UniqueConstraint::getColumns).orElse(Collections.emptyList());
+        if (pkColumns.size() > 0) {
+            // the PRIMARY KEY syntax always has higher priority than option 
FlinkOptions#RECORD_KEY_FIELD
+            String recordKey = String.join(",", pkColumns);
+            conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey);
+        }
+        List<String> partitionKeys = table.getPartitionKeys();
+        if (partitionKeys.size() > 0) {
+            // the PARTITIONED BY syntax always has higher priority than 
option FlinkOptions#PARTITION_PATH_FIELD
+            conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", 
partitionKeys));
+        }
+        // set index key for bucket index if not defined
+        if 
(conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name()))
 {
+            if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) {
+                conf.setString(FlinkOptions.INDEX_KEY_FIELD, 
conf.getString(FlinkOptions.RECORD_KEY_FIELD));
+            } else {
+                Set<String> recordKeySet =
+                        
Arrays.stream(conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","))
+                                .collect(Collectors.toSet());
+                Set<String> indexKeySet =
+                        
Arrays.stream(conf.getString(FlinkOptions.INDEX_KEY_FIELD).split(","))
+                                .collect(Collectors.toSet());
+                if (!recordKeySet.containsAll(indexKeySet)) {
+                    throw new HoodieValidationException(
+                            FlinkOptions.INDEX_KEY_FIELD + " should be a 
subset of or equal to the recordKey fields");
+                }
+            }
+        }
+
+        // tweak the key gen class if possible
+        final String[] partitions = 
conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(",");
+        final String[] pks = 
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+        if (partitions.length == 1) {
+            final String partitionField = partitions[0];
+            if (partitionField.isEmpty()) {
+                conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, 
NonpartitionedAvroKeyGenerator.class.getName());
+                LOG.info("Table option [{}] is reset to {} because this is a 
non-partitioned table",
+                        FlinkOptions.KEYGEN_CLASS_NAME.key(), 
NonpartitionedAvroKeyGenerator.class.getName());
+                return;
+            }
+            DataType partitionFieldType = 
table.getSchema().getFieldDataType(partitionField)
+                    .orElseThrow(() -> new HoodieValidationException("Field " 
+ partitionField + " does not exist"));
+            if (pks.length <= 1 && 
DataTypeUtils.isDatetimeType(partitionFieldType)) {
+                // timestamp based key gen only supports simple primary key
+                setupTimestampKeygenOptions(conf, partitionFieldType);
+                return;
+            }
+        }
+        boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
+        if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, 
FlinkOptions.KEYGEN_CLASS_NAME)) {
+            conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, 
ComplexAvroKeyGenerator.class.getName());
+            LOG.info("Table option [{}] is reset to {} because record key or 
partition path has two or more fields",
+                    FlinkOptions.KEYGEN_CLASS_NAME.key(), 
ComplexAvroKeyGenerator.class.getName());
+        }
+    }
+
+    /**
+     * Sets up the keygen options when the partition path is datetime type.
+     *
+     * <p>The UTC timezone is used as default.
+     */
+    public static void setupTimestampKeygenOptions(Configuration conf, 
DataType fieldType) {
+        if (conf.contains(FlinkOptions.KEYGEN_CLASS_NAME)) {
+            // the keygen clazz has been set up explicitly, skipping
+            return;
+        }
+
+        conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, 
TimestampBasedAvroKeyGenerator.class.getName());
+        LOG.info("Table option [{}] is reset to {} because datetime 
partitioning turns on",
+                FlinkOptions.KEYGEN_CLASS_NAME.key(), 
TimestampBasedAvroKeyGenerator.class.getName());
+        if (DataTypeUtils.isTimestampType(fieldType)) {
+            int precision = 
DataTypeUtils.precision(fieldType.getLogicalType());
+            if (precision == 0) {
+                // seconds
+                
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
+                        
TimestampBasedAvroKeyGenerator.TimestampType.UNIX_TIMESTAMP.name());
+            } else if (precision == 3) {
+                // milliseconds
+                
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
+                        
TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name());
+            }
+            String outputPartitionFormat =
+                    
conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR);
+            
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, 
outputPartitionFormat);
+        } else {
+            
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP,
+                    
TimestampBasedAvroKeyGenerator.TimestampType.SCALAR.name());
+            conf.setString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, 
TimeUnit.DAYS.toString());
+
+            String outputPartitionFormat =
+                    
conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY);
+            
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, 
outputPartitionFormat);
+            // the option is actually useless, it only works for validation
+            
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP,
+                    FlinkOptions.PARTITION_FORMAT_DAY);
+        }
+        
conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP,
 "UTC");
+    }
+
+    /**
+     * Sets up the compaction options from the table definition.
+     */
+    private static void setupCompactionOptions(Configuration conf) {
+        int commitsToRetain = 
conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS);
+        int minCommitsToKeep = 
conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS);
+        if (commitsToRetain >= minCommitsToKeep) {
+            LOG.info("Table option [{}] is reset to {} to be greater than 
{}={},\n"
+                    + "to avoid risk of missing data from few instants in 
incremental pull",
+                    FlinkOptions.ARCHIVE_MIN_COMMITS.key(), commitsToRetain + 
10,
+                    FlinkOptions.CLEAN_RETAIN_COMMITS.key(), commitsToRetain);
+            conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain 
+ 10);
+            conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain 
+ 20);
+        }
+    }
+
+    /**
+     * Sets up the hive options from the table definition.
+     */
+    private static void setupHiveOptions(Configuration conf, ObjectIdentifier 
tablePath) {
+        if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) {
+            conf.setString(FlinkOptions.HIVE_SYNC_DB, 
tablePath.getDatabaseName());
+        }
+        if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) {
+            conf.setString(FlinkOptions.HIVE_SYNC_TABLE, 
tablePath.getObjectName());
+        }
+    }
+
+    /**
+     * Sets up the read options from the table definition.
+     */
+    private static void setupReadOptions(Configuration conf) {
+        if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING)
+                && 
(conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent()
+                        || 
conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent())) {
+            conf.setString(FlinkOptions.QUERY_TYPE, 
FlinkOptions.QUERY_TYPE_INCREMENTAL);
+        }
+    }
+
+    /**
+     * Sets up the write options from the table definition.
+     */
+    private static void setupWriteOptions(Configuration conf) {
+        if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION)
+                && OptionsResolver.isCowTable(conf)) {
+            conf.setBoolean(FlinkOptions.PRE_COMBINE, true);
+        }
+    }
+
+    /**
+     * Infers the deserialization Avro schema from the table schema (e.g. the DDL)
+     * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
+     * {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified.
+     *
+     * @param conf    The configuration
+     * @param rowType The specified table row type
+     */
+    private static void inferAvroSchema(Configuration conf, LogicalType 
rowType) {
+        if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()
+                && 
!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
+            String inferredSchema = 
AvroSchemaConverter.convertToSchema(rowType).toString();
+            conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
+        }
+    }
+}
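
    Because the factory now exposes AUDIT_KEYS, INLONG_METRIC and INLONG_AUDIT as
    optional options, audit reporting can be switched on purely through the table
    DDL. A hedged example issued through the Flink Table API (the option key
    strings and all values, including the audit proxy address and audited metric
    ids, are assumptions for illustration and are not taken from this commit):

        // Sketch: creating an InLong Hudi sink table with audit reporting enabled.
        // Option keys/values are assumed; 'connector' matches FACTORY_ID above.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(
                "CREATE TABLE hudi_sink (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING,\n"
                        + "  ts TIMESTAMP(3),\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'hudi-inlong',\n"
                        + "  'path' = 'hdfs:///warehouse/hudi_sink',\n"
                        + "  'inlong.metric.labels' = 'groupId=g1&streamId=s1&nodeId=n1',\n"
                        + "  'metrics.audit.proxy.hosts' = 'audit-proxy:10081',\n"
                        + "  'metrics.audit.key' = '7,8'\n"
                        + ")");
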
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java
new file mode 100644
index 0000000000..173befd988
--- /dev/null
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.hudi.table;
+
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.hudi.sink.utils.Pipelines;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
+import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsInference;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.util.ChangelogModes;
+
+import java.util.Map;
+
+/**
+ * Hoodie table sink.
+ * <p>
+ * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3
+ */
+public class HoodieTableSink implements DynamicTableSink, 
SupportsPartitioning, SupportsOverwrite {
+
+    private final Configuration conf;
+    private final ResolvedSchema schema;
+    private boolean overwrite = false;
+    private final MetricOption metricOption;
+
+    public HoodieTableSink(Configuration conf, ResolvedSchema schema, 
MetricOption metricOption) {
+        this.conf = conf;
+        this.schema = schema;
+        this.metricOption = metricOption;
+    }
+
+    public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean 
overwrite, MetricOption metricOption) {
+        this.conf = conf;
+        this.schema = schema;
+        this.overwrite = overwrite;
+        this.metricOption = metricOption;
+    }
+
+    @Override
+    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+        return (DataStreamSinkProviderAdapter) dataStream -> {
+
+            // setup configuration
+            long ckpTimeout = dataStream.getExecutionEnvironment()
+                    .getCheckpointConfig().getCheckpointTimeout();
+            conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
+            // set up default parallelism
+            OptionsInference.setupSinkTasks(conf, 
dataStream.getExecutionConfig().getParallelism());
+
+            RowType rowType = (RowType) 
schema.toSinkRowDataType().notNull().getLogicalType();
+
+            // bulk_insert mode
+            final String writeOperation = 
this.conf.get(FlinkOptions.OPERATION);
+            if (WriteOperationType.fromValue(writeOperation) == 
WriteOperationType.BULK_INSERT) {
+                return Pipelines.bulkInsert(conf, rowType, dataStream, 
metricOption);
+            }
+
+            // Append mode
+            if (OptionsResolver.isAppendMode(conf)) {
+                DataStream<Object> pipeline =
+                        Pipelines.append(conf, rowType, dataStream, 
context.isBounded(), metricOption);
+                if (OptionsResolver.needsAsyncClustering(conf)) {
+                    return Pipelines.cluster(conf, rowType, pipeline);
+                } else {
+                    return Pipelines.dummySink(pipeline);
+                }
+            }
+
+            DataStream<Object> pipeline;
+            // bootstrap
+            final DataStream<HoodieRecord> hoodieRecordDataStream =
+                    Pipelines.bootstrap(conf, rowType, dataStream, 
context.isBounded(), overwrite);
+            // write pipeline
+            pipeline = Pipelines.hoodieStreamWrite(conf, 
hoodieRecordDataStream, metricOption);
+            // compaction
+            if (OptionsResolver.needsAsyncCompaction(conf)) {
+                // use synchronous compaction for bounded source.
+                if (context.isBounded()) {
+                    conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, 
false);
+                }
+                return Pipelines.compact(conf, pipeline);
+            } else {
+                return Pipelines.clean(conf, pipeline);
+            }
+        };
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
+        if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) {
+            return ChangelogModes.FULL;
+        } else {
+            return ChangelogModes.UPSERT;
+        }
+    }
+
+    @Override
+    public DynamicTableSink copy() {
+        return new HoodieTableSink(this.conf, this.schema, this.overwrite, 
this.metricOption);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "HoodieTableSink";
+    }
+
+    @Override
+    public void applyStaticPartition(Map<String, String> partitions) {
+        // #applyOverwrite should have been invoked.
+        if (this.overwrite && partitions.size() > 0) {
+            this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT_OVERWRITE.value());
+        }
+    }
+
+    @Override
+    public void applyOverwrite(boolean overwrite) {
+        this.overwrite = overwrite;
+        // set up the operation as INSERT_OVERWRITE_TABLE first,
+        // if there are explicit partitions, #applyStaticPartition would 
overwrite the option.
+        this.conf.setString(FlinkOptions.OPERATION, 
WriteOperationType.INSERT_OVERWRITE_TABLE.value());
+    }
+}
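
    Since the sink implements SupportsOverwrite and SupportsPartitioning, an
    INSERT OVERWRITE statement first goes through applyOverwrite (setting the
    operation to insert_overwrite_table) and, when static partitions are given,
    applyStaticPartition narrows it to insert_overwrite. A hedged illustration
    against the hypothetical tables from the DDL sketch above (the partitioned
    table and the source table are assumed to exist):

        // Sketch: how overwrite semantics reach this sink from SQL.
        // Whole-table overwrite: applyOverwrite(true) -> operation 'insert_overwrite_table'.
        tableEnv.executeSql("INSERT OVERWRITE hudi_sink SELECT id, name, ts FROM source_table");
        // Static-partition overwrite: applyStaticPartition(...) narrows it to 'insert_overwrite'.
        tableEnv.executeSql(
                "INSERT OVERWRITE partitioned_hudi_sink PARTITION (dt = '2024-05-22') "
                        + "SELECT id, name, ts FROM source_table");
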
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java
deleted file mode 100644
index d2985ac5d4..0000000000
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.hudi.table.sink;
-
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.table.connector.sink.DynamicTableSink;
-import org.apache.hudi.table.HoodieTableFactory;
-
-import java.util.Set;
-
-import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
-import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
-
-public class HudiTableInlongFactory extends HoodieTableFactory {
-
-    public static final String SORT_CONNECTOR_IDENTIFY_HUDI = "hudi-inlong";
-
-    public HudiTableInlongFactory() {
-        super();
-    }
-
-    @Override
-    public String factoryIdentifier() {
-        return SORT_CONNECTOR_IDENTIFY_HUDI;
-    }
-
-    @Override
-    public DynamicTableSink createDynamicTableSink(Context context) {
-        return super.createDynamicTableSink(context);
-    }
-
-    @Override
-    public Set<ConfigOption<?>> optionalOptions() {
-        Set<ConfigOption<?>> configOptions = super.optionalOptions();
-        configOptions.add(INLONG_METRIC);
-        configOptions.add(INLONG_AUDIT);
-        return configOptions;
-    }
-
-}
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 709a02e60c..d8a0e68a77 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.inlong.sort.hudi.table.sink.HudiTableInlongFactory
+org.apache.inlong.sort.hudi.table.HoodieTableFactory
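
    With this change the 'hudi-inlong' identifier resolves to the new
    org.apache.inlong.sort.hudi.table.HoodieTableFactory via Flink's SPI. A quick
    way to verify the registration (sketch only; the class-loader choice is an
    assumption):

        // Sketch: verifying that the SPI entry resolves 'hudi-inlong' to the new factory.
        DynamicTableSinkFactory factory = FactoryUtil.discoverFactory(
                Thread.currentThread().getContextClassLoader(),
                DynamicTableSinkFactory.class,
                "hudi-inlong");
        System.out.println(factory.getClass().getName());
        // expected: org.apache.inlong.sort.hudi.table.HoodieTableFactory
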
diff --git a/licenses/inlong-sort-connectors/LICENSE 
b/licenses/inlong-sort-connectors/LICENSE
index e3a995cc65..7be72aace2 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -880,6 +880,19 @@ License : 
https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 Source  : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the 
software have been modified.)
 License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE
 
+1.3.26 
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java
+       
inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HudiTableInlongFactory.java
+Source  : org.apache.hudi:hudi-flink1.15-bundle:0.12.3 (Please note that the 
software have been modified.)
+License : https://github.com/apache/hudi/blob/master/LICENSE
+
 
 
 =======================================================================
