This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 0acdfe9519 [INLONG-10195][Sort] Hudi connector support audit ID (#10231)
0acdfe9519 is described below

commit 0acdfe9519a18048171b3ed8e0dcd71f0527b0b8
Author: XiaoYou201 <58425449+xiaoyou...@users.noreply.github.com>
AuthorDate: Wed May 22 09:47:12 2024 +0800

    [INLONG-10195][Sort] Hudi connector support audit ID (#10231)
---
 .../sort-flink-v1.15/sort-connectors/hudi/pom.xml  |   1 -
 .../inlong/sort/hudi/sink/StreamWriteFunction.java | 495 +++++++++++++++++++++
 .../inlong/sort/hudi/sink/StreamWriteOperator.java |  43 ++
 .../sort/hudi/sink/append/AppendWriteFunction.java | 156 +++++++
 .../sort/hudi/sink/append/AppendWriteOperator.java |  44 ++
 .../sink/bucket/BucketStreamWriteFunction.java     | 185 ++++++++
 .../sink/bucket/BucketStreamWriteOperator.java     |  40 ++
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    | 229 ++++++++++
 .../hudi/sink/bulk/BulkInsertWriteOperator.java    |  55 +++
 .../inlong/sort/hudi/sink/utils/Pipelines.java     | 469 +++++++++++++++++++
 .../inlong/sort/hudi/table/HoodieTableFactory.java | 355 +++++++++++++++
 .../inlong/sort/hudi/table/HoodieTableSink.java    | 150 +++++++
 .../hudi/table/sink/HudiTableInlongFactory.java    |  55 ---
 .../org.apache.flink.table.factories.Factory       |   2 +-
 licenses/inlong-sort-connectors/LICENSE            |  13 +
 15 files changed, 2235 insertions(+), 57 deletions(-)

diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
index ca3dc60e6d..075b6da594 100644
--- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/pom.xml
@@ -79,7 +79,6 @@
                 <configuration>
                     <artifactSet>
                         <includes>
-                            <include>org.apache.hudi:*</include>
                             <include>org.apache.hive:hive-exec</include>
                             <include>org.apache.hadoop:*</include>
                             <include>com.fasterxml.woodstox:*</include>
diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
new file mode 100644
index 0000000000..33551f6de5
--- /dev/null
+++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.inlong.sort.hudi.sink; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SinkMetricData; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieOperation; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.ObjectSizeCalculator; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.common.AbstractStreamWriteFunction; +import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.table.action.commit.FlinkWriteHelper; +import org.apache.hudi.util.StreamerUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Random; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem. + * + * <p><h2>Work Flow</h2> + * + * <p>The function firstly buffers the data as a batch of {@link HoodieRecord}s, + * It flushes(write) the records batch when the batch size exceeds the configured size {@link FlinkOptions#WRITE_BATCH_SIZE} + * or the total buffer size exceeds the configured size {@link FlinkOptions#WRITE_TASK_MAX_SIZE} + * or a Flink checkpoint starts. After a batch has been written successfully, + * the function notifies its operator coordinator {@link StreamWriteOperatorCoordinator} to mark a successful write. + * + * <p><h2>The Semantics</h2> + * + * <p>The task implements exactly-once semantics by buffering the data between checkpoints. The operator coordinator + * starts a new instant on the timeline when a checkpoint triggers, the coordinator checkpoints always + * start before its operator, so when this function starts a checkpoint, a REQUESTED instant already exists. + * + * <p>The function process thread blocks data buffering after the checkpoint thread finishes flushing the existing data buffer until + * the current checkpoint succeed and the coordinator starts a new instant. Any error triggers the job failure during the metadata committing, + * when the job recovers from a failure, the write function re-send the write metadata to the coordinator to see if these metadata + * can re-commit, thus if unexpected error happens during the instant committing, the coordinator would retry to commit when the job + * recovers. + * + * <p><h2>Fault Tolerance</h2> + * + * <p>The operator coordinator checks and commits the last instant then starts a new one after a checkpoint finished successfully. 
+ * It rolls back any inflight instant before it starts a new instant, this means one hoodie instant only span one checkpoint, + * the write function blocks data buffer flushing for the configured checkpoint timeout + * before it throws exception, any checkpoint failure would finally trigger the job failure. + * + * <p>Note: The function task requires the input stream be shuffled by the file IDs. + * + * @param <I> Type of the input record + * @see StreamWriteOperatorCoordinator + * + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class StreamWriteFunction<I> extends AbstractStreamWriteFunction<I> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(StreamWriteFunction.class); + + /** + * Write buffer as buckets for a checkpoint. The key is bucket ID. + */ + private transient Map<String, DataBucket> buckets; + + private transient BiFunction<List<HoodieRecord>, String, List<WriteStatus>> writeFunction; + + /** + * Total size tracer. + */ + private transient TotalSizeTracer tracer; + private final MetricOption metricOption; + private SinkMetricData sinkMetricData; + + /** + * Constructs a StreamingSinkFunction. + * + * @param config The config options + */ + public StreamWriteFunction(Configuration config, MetricOption metricOption) { + super(config); + this.metricOption = metricOption; + } + + @Override + public void open(Configuration parameters) { + this.tracer = new TotalSizeTracer(this.config); + initBuffer(); + initWriteFunction(); + if (metricOption != null) { + this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } + } + + @Override + public void snapshotState() { + // Based on the fact that the coordinator starts the checkpoint first, + // it would check the validity. + // wait for the buffer data flush out and request a new instant + flushRemaining(false); + } + + @Override + public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) + throws Exception { + bufferRecord((HoodieRecord<?>) value); + if (sinkMetricData != null) { + sinkMetricData.invokeWithEstimate(value); + } + } + + @Override + public void close() { + if (this.writeClient != null) { + this.writeClient.close(); + } + } + + /** + * End input action for batch source. 
+ */ + public void endInput() { + super.endInput(); + flushRemaining(true); + this.writeClient.cleanHandles(); + this.writeStatuses.clear(); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void initBuffer() { + this.buckets = new LinkedHashMap<>(); + } + + private void initWriteFunction() { + final String writeOperation = this.config.get(FlinkOptions.OPERATION); + switch (WriteOperationType.fromValue(writeOperation)) { + case INSERT: + this.writeFunction = (records, instantTime) -> this.writeClient.insert(records, instantTime); + break; + case UPSERT: + this.writeFunction = (records, instantTime) -> this.writeClient.upsert(records, instantTime); + break; + case INSERT_OVERWRITE: + this.writeFunction = (records, instantTime) -> this.writeClient.insertOverwrite(records, instantTime); + break; + case INSERT_OVERWRITE_TABLE: + this.writeFunction = + (records, instantTime) -> this.writeClient.insertOverwriteTable(records, instantTime); + break; + default: + throw new RuntimeException("Unsupported write operation : " + writeOperation); + } + } + + /** + * Represents a data item in the buffer, this is needed to reduce the + * memory footprint. + * + * <p>A {@link HoodieRecord} was firstly transformed into a {@link DataItem} + * for buffering, it then transforms back to the {@link HoodieRecord} before flushing. + */ + private static class DataItem { + + private final String key; // record key + private final String instant; // 'U' or 'I' + private final HoodieRecordPayload<?> data; // record payload + private final HoodieOperation operation; // operation + + private DataItem(String key, String instant, HoodieRecordPayload<?> data, HoodieOperation operation) { + this.key = key; + this.instant = instant; + this.data = data; + this.operation = operation; + } + + public static DataItem fromHoodieRecord(HoodieRecord<?> record) { + return new DataItem( + record.getRecordKey(), + record.getCurrentLocation().getInstantTime(), + ((HoodieAvroRecord) record).getData(), + record.getOperation()); + } + + public HoodieRecord<?> toHoodieRecord(String partitionPath) { + HoodieKey hoodieKey = new HoodieKey(this.key, partitionPath); + HoodieRecord<?> record = new HoodieAvroRecord<>(hoodieKey, data, operation); + HoodieRecordLocation loc = new HoodieRecordLocation(instant, null); + record.setCurrentLocation(loc); + return record; + } + } + + /** + * Data bucket. + */ + private static class DataBucket { + + private final List<DataItem> records; + private final BufferSizeDetector detector; + private final String partitionPath; + private final String fileID; + + private DataBucket(Double batchSize, HoodieRecord<?> hoodieRecord) { + this.records = new ArrayList<>(); + this.detector = new BufferSizeDetector(batchSize); + this.partitionPath = hoodieRecord.getPartitionPath(); + this.fileID = hoodieRecord.getCurrentLocation().getFileId(); + } + + /** + * Prepare the write data buffer: patch up all the records with correct partition path. + */ + public List<HoodieRecord> writeBuffer() { + // rewrite all the records with new record key + return records.stream() + .map(record -> record.toHoodieRecord(partitionPath)) + .collect(Collectors.toList()); + } + + /** + * Sets up before flush: patch up the first record with correct partition path and fileID. + * + * <p>Note: the method may modify the given records {@code records}. 
+ */ + public void preWrite(List<HoodieRecord> records) { + // rewrite the first record with expected fileID + HoodieRecord<?> first = records.get(0); + HoodieRecord<?> record = + new HoodieAvroRecord<>(first.getKey(), (HoodieRecordPayload) first.getData(), first.getOperation()); + HoodieRecordLocation newLoc = new HoodieRecordLocation(first.getCurrentLocation().getInstantTime(), fileID); + record.setCurrentLocation(newLoc); + + records.set(0, record); + } + + public void reset() { + this.records.clear(); + this.detector.reset(); + } + } + + /** + * Tool to detect if to flush out the existing buffer. + * Sampling the record to compute the size with 0.01 percentage. + */ + private static class BufferSizeDetector { + + private final Random random = new Random(47); + private static final int DENOMINATOR = 100; + + private final double batchSizeBytes; + + private long lastRecordSize = -1L; + private long totalSize = 0L; + + BufferSizeDetector(double batchSizeMb) { + this.batchSizeBytes = batchSizeMb * 1024 * 1024; + } + + boolean detect(Object record) { + if (lastRecordSize == -1 || sampling()) { + lastRecordSize = ObjectSizeCalculator.getObjectSize(record); + } + totalSize += lastRecordSize; + return totalSize > this.batchSizeBytes; + } + + boolean sampling() { + // 0.01 sampling percentage + return random.nextInt(DENOMINATOR) == 1; + } + + void reset() { + this.lastRecordSize = -1L; + this.totalSize = 0L; + } + } + + /** + * Tool to trace the total buffer size. It computes the maximum buffer size, + * if current buffer size is greater than the maximum buffer size, the data bucket + * flush triggers. + */ + private static class TotalSizeTracer { + + private long bufferSize = 0L; + private final double maxBufferSize; + + TotalSizeTracer(Configuration conf) { + long mergeReaderMem = 100; // constant 100MB + long mergeMapMaxMem = conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY); + this.maxBufferSize = + (conf.getDouble(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024; + final String errMsg = String.format( + "'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)", + FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key()); + ValidationUtils.checkState(this.maxBufferSize > 0, errMsg); + } + + /** + * Trace the given record size {@code recordSize}. + * + * @param recordSize The record size + * @return true if the buffer size exceeds the maximum buffer size + */ + boolean trace(long recordSize) { + this.bufferSize += recordSize; + return this.bufferSize > this.maxBufferSize; + } + + void countDown(long size) { + this.bufferSize -= size; + } + + public void reset() { + this.bufferSize = 0; + } + } + + /** + * Returns the bucket ID with the given value {@code value}. + */ + private String getBucketID(HoodieRecord<?> record) { + final String fileId = record.getCurrentLocation().getFileId(); + return StreamerUtil.generateBucketKey(record.getPartitionPath(), fileId); + } + + /** + * Buffers the given record. + * + * <p>Flush the data bucket first if the bucket records size is greater than + * the configured value {@link FlinkOptions#WRITE_BATCH_SIZE}. + * + * <p>Flush the max size data bucket if the total buffer size exceeds the configured + * threshold {@link FlinkOptions#WRITE_TASK_MAX_SIZE}. 
+ * + * @param value HoodieRecord + */ + protected void bufferRecord(HoodieRecord<?> value) { + final String bucketID = getBucketID(value); + + DataBucket bucket = this.buckets.computeIfAbsent(bucketID, + k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value)); + final DataItem item = DataItem.fromHoodieRecord(value); + + bucket.records.add(item); + + boolean flushBucket = bucket.detector.detect(item); + boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize); + if (flushBucket) { + if (flushBucket(bucket)) { + this.tracer.countDown(bucket.detector.totalSize); + bucket.reset(); + } + } else if (flushBuffer) { + // find the max size bucket and flush it out + DataBucket bucketToFlush = this.buckets.values().stream() + .max(Comparator.comparingLong(b -> b.detector.totalSize)) + .orElseThrow(NoSuchElementException::new); + if (flushBucket(bucketToFlush)) { + this.tracer.countDown(bucketToFlush.detector.totalSize); + bucketToFlush.reset(); + } else { + LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", + this.tracer.maxBufferSize); + } + } + } + + private boolean hasData() { + return this.buckets.size() > 0 + && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0); + } + + @SuppressWarnings("unchecked, rawtypes") + private boolean flushBucket(DataBucket bucket) { + String instant = instantToWrite(true); + + if (instant == null) { + // in case there are empty checkpoints that has no input data + LOG.info("No inflight instant when flushing data, skip."); + return false; + } + + List<HoodieRecord> records = bucket.writeBuffer(); + ValidationUtils.checkState(records.size() > 0, "Data bucket to flush has no buffering records"); + if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, -1); + } + bucket.preWrite(records); + final List<WriteStatus> writeStatus = new ArrayList<>(writeFunction.apply(records, instant)); + records.clear(); + final WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .instantTime(instant) // the write instant may shift but the event still use the currentInstant. + .writeStatus(writeStatus) + .lastBatch(false) + .endInput(false) + .build(); + + this.eventGateway.sendEventToCoordinator(event); + writeStatuses.addAll(writeStatus); + return true; + } + + @SuppressWarnings("unchecked, rawtypes") + private void flushRemaining(boolean endInput) { + this.currentInstant = instantToWrite(hasData()); + if (this.currentInstant == null) { + // in case there are empty checkpoints that has no input data + throw new HoodieException("No inflight instant when flushing data!"); + } + final List<WriteStatus> writeStatus; + if (buckets.size() > 0) { + writeStatus = new ArrayList<>(); + this.buckets.values() + // The records are partitioned by the bucket ID and each batch sent to + // the writer belongs to one bucket. 
+ .forEach(bucket -> { + List<HoodieRecord> records = bucket.writeBuffer(); + if (records.size() > 0) { + if (config.getBoolean(FlinkOptions.PRE_COMBINE)) { + records = FlinkWriteHelper.newInstance().deduplicateRecords(records, (HoodieIndex) null, + -1); + } + bucket.preWrite(records); + writeStatus.addAll(writeFunction.apply(records, currentInstant)); + records.clear(); + bucket.reset(); + } + }); + } else { + LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, currentInstant); + writeStatus = Collections.emptyList(); + } + final WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .instantTime(currentInstant) + .writeStatus(writeStatus) + .lastBatch(true) + .endInput(endInput) + .build(); + + this.eventGateway.sendEventToCoordinator(event); + this.buckets.clear(); + this.tracer.reset(); + this.writeClient.cleanHandles(); + this.writeStatuses.addAll(writeStatus); + // blocks flushing until the coordinator starts a new instant + this.confirming = true; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java new file mode 100644 index 0000000000..a25caf9b35 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hudi.sink; + +import org.apache.inlong.sort.base.metric.MetricOption; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.operators.StreamSink; +import org.apache.hudi.sink.common.AbstractWriteOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; + +/** + * Operator for {@link StreamSink}. 
+ * + * @param <I> The input type + * <p> + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class StreamWriteOperator<I> extends AbstractWriteOperator<I> { + + public StreamWriteOperator(Configuration conf, MetricOption metricOption) { + super(new StreamWriteFunction<>(conf, metricOption)); + } + + public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, MetricOption metricOption) { + return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf, metricOption)); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java new file mode 100644 index 0000000000..817e727f62 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hudi.sink.append; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SinkMetricData; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; +import org.apache.hudi.sink.common.AbstractStreamWriteFunction; +import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.List; + +/** + * Sink function to write the data to the underneath filesystem. + * + * <p>The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. + * + * @param <I> Type of the input record + * @see StreamWriteOperatorCoordinator + * + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class AppendWriteFunction<I> extends AbstractStreamWriteFunction<I> { + + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunction.class); + + private static final long serialVersionUID = 1L; + + /** + * Helper class for log mode. + */ + private transient BulkInsertWriterHelper writerHelper; + + /** + * Table row type. 
+ */ + private final RowType rowType; + + private final MetricOption metricOption; + + private SinkMetricData sinkMetricData; + + /** + * Constructs an AppendWriteFunction. + * + * @param config The config options + */ + public AppendWriteFunction(Configuration config, RowType rowType, MetricOption metricOption) { + super(config); + this.rowType = rowType; + this.metricOption = metricOption; + } + + @Override + public void open(Configuration parameters) { + if (metricOption != null) { + this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } + } + + @Override + public void snapshotState() { + // Based on the fact that the coordinator starts the checkpoint first, + // it would check the validity. + // wait for the buffer data flush out and request a new instant + flushData(false); + } + + @Override + public void processElement(I value, Context ctx, Collector<Object> out) throws Exception { + if (this.writerHelper == null) { + initWriterHelper(); + } + this.writerHelper.write((RowData) value); + if (sinkMetricData != null) { + sinkMetricData.invokeWithEstimate(value); + } + } + + /** + * End input action for batch source. + */ + public void endInput() { + super.endInput(); + flushData(true); + this.writeStatuses.clear(); + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + private void initWriterHelper() { + final String instant = instantToWrite(true); + if (instant == null) { + // in case there are empty checkpoints that has no input data + throw new HoodieException("No inflight instant when flushing data!"); + } + this.writerHelper = + new BulkInsertWriterHelper(this.config, this.writeClient.getHoodieTable(), this.writeClient.getConfig(), + instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), + getRuntimeContext().getAttemptNumber(), + this.rowType); + } + + private void flushData(boolean endInput) { + final List<WriteStatus> writeStatus; + if (this.writerHelper != null) { + writeStatus = this.writerHelper.getWriteStatuses(this.taskID); + this.currentInstant = this.writerHelper.getInstantTime(); + } else { + writeStatus = Collections.emptyList(); + this.currentInstant = instantToWrite(false); + LOG.info("No data to write in subtask [{}] for instant [{}]", taskID, this.currentInstant); + } + final WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .instantTime(this.currentInstant) + .writeStatus(writeStatus) + .lastBatch(true) + .endInput(endInput) + .build(); + this.eventGateway.sendEventToCoordinator(event); + // nullify the write helper for next ckp + this.writerHelper = null; + this.writeStatuses.addAll(writeStatus); + // blocks flushing until the coordinator starts a new instant + this.confirming = true; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java new file mode 100644 index 0000000000..41fa03e793 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hudi.sink.append; + +import org.apache.inlong.sort.base.metric.MetricOption; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hudi.sink.common.AbstractWriteOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; + +/** + * Operator for {@link AppendWriteFunction}. + * + * @param <I> The input type + * + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class AppendWriteOperator<I> extends AbstractWriteOperator<I> { + + public AppendWriteOperator(Configuration conf, RowType rowType, MetricOption metricOption) { + super(new AppendWriteFunction<>(conf, rowType, metricOption)); + } + + public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType, + MetricOption metricOption) { + return WriteOperatorFactory.instance(conf, new AppendWriteOperator<>(conf, rowType, metricOption)); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java new file mode 100644 index 0000000000..3bafbb806f --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteFunction.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.hudi.sink.bucket; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SinkMetricData; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.index.bucket.BucketIdentifier; +import org.apache.hudi.sink.StreamWriteFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A stream write function with bucket hash index. + * + * <p>The task holds a fresh new local index: {(partition + bucket number) &rarr fileId} mapping, this index + * is used for deciding whether the incoming records in an UPDATE or INSERT. + * The index is local because different partition paths have separate items in the index. + * + * @param <I> the input type + */ +public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> { + + private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class); + + private int parallelism; + + private int bucketNum; + + private String indexKeyFields; + + /** + * BucketID to file group mapping in each partition. + * Map(partition -> Map(bucketId, fileID)). + */ + private Map<String, Map<Integer, String>> bucketIndex; + + /** + * Incremental bucket index of the current checkpoint interval, + * it is needed because the bucket type('I' or 'U') should be decided based on the committed files view, + * all the records in one bucket should have the same bucket type. + */ + private Set<String> incBucketIndex; + private final MetricOption metricOption; + private SinkMetricData sinkMetricData; + + /** + * Constructs a BucketStreamWriteFunction. 
+ * + * @param config The config options + */ + public BucketStreamWriteFunction(Configuration config, MetricOption metricOption) { + super(config); + this.metricOption = metricOption; + } + + @Override + public void open(Configuration parameters) throws IOException { + super.open(parameters); + this.bucketNum = config.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS); + this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD); + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks(); + this.bucketIndex = new HashMap<>(); + this.incBucketIndex = new HashSet<>(); + if (metricOption != null) { + this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } + } + + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + super.initializeState(context); + } + + @Override + public void snapshotState() { + super.snapshotState(); + this.incBucketIndex.clear(); + } + + @Override + public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) + throws Exception { + HoodieRecord<?> record = (HoodieRecord<?>) i; + final HoodieKey hoodieKey = record.getKey(); + final String partition = hoodieKey.getPartitionPath(); + final HoodieRecordLocation location; + + bootstrapIndexIfNeed(partition); + Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>()); + final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum); + final String bucketId = partition + "/" + bucketNum; + + if (incBucketIndex.contains(bucketId)) { + location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum)); + } else if (bucketToFileId.containsKey(bucketNum)) { + location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum)); + } else { + String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum); + location = new HoodieRecordLocation("I", newFileId); + bucketToFileId.put(bucketNum, newFileId); + incBucketIndex.add(bucketId); + } + record.unseal(); + record.setCurrentLocation(location); + record.seal(); + bufferRecord(record); + if (sinkMetricData != null) { + sinkMetricData.invokeWithEstimate(record); + } + } + + /** + * Determine whether the current fileID belongs to the current task. + * (partition + curBucket) % numPartitions == this taskID belongs to this task. + */ + public boolean isBucketToLoad(int bucketNumber, String partition) { + final int partitionIndex = (partition.hashCode() & Integer.MAX_VALUE) % parallelism; + int globalIndex = partitionIndex + bucketNumber; + return BucketIdentifier.mod(globalIndex, parallelism) == taskID; + } + + /** + * Get partition_bucket -> fileID mapping from the existing hudi table. + * This is a required operation for each restart to avoid having duplicate file ids for one bucket. 
+ */ + private void bootstrapIndexIfNeed(String partition) { + if (bucketIndex.containsKey(partition)) { + return; + } + LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(), + this.metaClient.getBasePath() + "/" + partition)); + + // Load existing fileID belongs to this task + Map<Integer, String> bucketToFileIDMap = new HashMap<>(); + this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> { + String fileId = fileSlice.getFileId(); + int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId); + if (isBucketToLoad(bucketNumber, partition)) { + LOG.info(String.format("Should load this partition bucket %s with fileId %s", bucketNumber, fileId)); + // Validate that one bucketId has only ONE fileId + if (bucketToFileIDMap.containsKey(bucketNumber)) { + throw new RuntimeException(String.format("Duplicate fileId %s from bucket %s of partition %s found " + + "during the BucketStreamWriteFunction index bootstrap.", fileId, bucketNumber, + partition)); + } else { + LOG.info(String.format("Adding fileId %s to the bucket %s of partition %s.", fileId, bucketNumber, + partition)); + bucketToFileIDMap.put(bucketNumber, fileId); + } + } + }); + bucketIndex.put(partition, bucketToFileIDMap); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java new file mode 100644 index 0000000000..40b1a29423 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bucket/BucketStreamWriteOperator.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hudi.sink.bucket; + +import org.apache.inlong.sort.base.metric.MetricOption; + +import org.apache.flink.configuration.Configuration; +import org.apache.hudi.sink.common.AbstractWriteOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; + +/** + * Operator for {@link BucketStreamWriteFunction}. 
+ * + * @param <I> The input type + */ +public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> { + + public BucketStreamWriteOperator(Configuration conf, MetricOption metricOption) { + super(new BucketStreamWriteFunction<>(conf, metricOption)); + } + + public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, MetricOption metricOption) { + return WriteOperatorFactory.instance(conf, new BucketStreamWriteOperator<>(conf, metricOption)); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java new file mode 100644 index 0000000000..44c8966e30 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hudi.sink.bulk; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.SinkMetricData; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; +import org.apache.hudi.sink.bulk.BulkInsertWriterHelper; +import org.apache.hudi.sink.bulk.WriterHelpers; +import org.apache.hudi.sink.common.AbstractWriteFunction; +import org.apache.hudi.sink.event.WriteMetadataEvent; +import org.apache.hudi.sink.meta.CkpMetadata; +import org.apache.hudi.sink.utils.TimeWait; +import org.apache.hudi.util.FlinkWriteClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Sink function to write the data to the underneath filesystem. + * + * <p>The function should only be used in operation type {@link WriteOperationType#BULK_INSERT}. + * + * <p>Note: The function task requires the input stream be shuffled by partition path. 
+ * + * @param <I> Type of the input record + * @see StreamWriteOperatorCoordinator + * + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class BulkInsertWriteFunction<I> + extends + AbstractWriteFunction<I> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class); + + /** + * Helper class for bulk insert mode. + */ + private transient BulkInsertWriterHelper writerHelper; + + /** + * Config options. + */ + private final Configuration config; + + /** + * Table row type. + */ + private final RowType rowType; + + /** + * Id of current subtask. + */ + private int taskID; + + /** + * Write Client. + */ + private transient HoodieFlinkWriteClient writeClient; + + /** + * The initial inflight instant when start up. + */ + private volatile String initInstant; + + /** + * Gateway to send operator events to the operator coordinator. + */ + private transient OperatorEventGateway eventGateway; + + /** + * Checkpoint metadata. + */ + private CkpMetadata ckpMetadata; + + private final MetricOption metricOption; + + private SinkMetricData sinkMetricData; + /** + * Constructs a StreamingSinkFunction. + * + * @param config The config options + */ + public BulkInsertWriteFunction(Configuration config, RowType rowType, MetricOption metricOption) { + this.config = config; + this.rowType = rowType; + this.metricOption = metricOption; + } + + @Override + public void open(Configuration parameters) { + this.taskID = getRuntimeContext().getIndexOfThisSubtask(); + this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext()); + this.ckpMetadata = CkpMetadata.getInstance(config); + this.initInstant = lastPendingInstant(); + if (metricOption != null) { + this.sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } + sendBootstrapEvent(); + initWriterHelper(); + } + + @Override + public void processElement(I value, Context ctx, Collector<Object> out) throws IOException { + this.writerHelper.write((RowData) value); + if (sinkMetricData != null) { + sinkMetricData.invokeWithEstimate(value); + } + } + + @Override + public void close() { + if (this.writeClient != null) { + this.writeClient.close(); + } + } + + /** + * End input action for batch source. 
+ */ + public void endInput() { + final List<WriteStatus> writeStatus = this.writerHelper.getWriteStatuses(this.taskID); + + final WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .instantTime(this.writerHelper.getInstantTime()) + .writeStatus(writeStatus) + .lastBatch(true) + .endInput(true) + .build(); + this.eventGateway.sendEventToCoordinator(event); + } + + @Override + public void handleOperatorEvent(OperatorEvent event) { + // no operation + } + + // ------------------------------------------------------------------------- + // Getter/Setter + // ------------------------------------------------------------------------- + + public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) { + this.eventGateway = operatorEventGateway; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + private void initWriterHelper() { + String instant = instantToWrite(); + this.writerHelper = WriterHelpers.getWriterHelper(this.config, this.writeClient.getHoodieTable(), + this.writeClient.getConfig(), + instant, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), + getRuntimeContext().getAttemptNumber(), + this.rowType); + } + + private void sendBootstrapEvent() { + WriteMetadataEvent event = WriteMetadataEvent.builder() + .taskID(taskID) + .writeStatus(Collections.emptyList()) + .instantTime("") + .bootstrap(true) + .build(); + this.eventGateway.sendEventToCoordinator(event); + LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID); + } + + /** + * Returns the last pending instant time. + */ + protected String lastPendingInstant() { + return this.ckpMetadata.lastPendingInstant(); + } + + private String instantToWrite() { + String instant = lastPendingInstant(); + // if exactly-once semantics turns on, + // waits for the checkpoint notification until the checkpoint timeout threshold hits. + TimeWait timeWait = TimeWait.builder() + .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT)) + .action("instant initialize") + .build(); + while (instant == null || instant.equals(this.initInstant)) { + // wait condition: + // 1. there is no inflight instant + // 2. the inflight instant does not change + // sleep for a while + timeWait.waitFor(); + // refresh the inflight instant + instant = lastPendingInstant(); + } + return instant; + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java new file mode 100644 index 0000000000..55d2290666 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hudi.sink.bulk; + +import org.apache.inlong.sort.base.metric.MetricOption; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hudi.sink.common.AbstractWriteOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; + +/** + * Operator for bulk insert mode sink. + * + * @param <I> The input type + * + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class BulkInsertWriteOperator<I> + extends + AbstractWriteOperator<I> + implements + BoundedOneInput { + + public BulkInsertWriteOperator(Configuration conf, RowType rowType, MetricOption metricOption) { + super(new BulkInsertWriteFunction<>(conf, rowType, metricOption)); + } + + @Override + public void handleOperatorEvent(OperatorEvent event) { + // no operation + } + + public static <I> WriteOperatorFactory<I> getFactory(Configuration conf, RowType rowType, + MetricOption metricOption) { + return WriteOperatorFactory.instance(conf, new BulkInsertWriteOperator<>(conf, rowType, metricOption)); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java new file mode 100644 index 0000000000..ea92e9361e --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.hudi.sink.utils; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.hudi.sink.StreamWriteOperator; +import org.apache.inlong.sort.hudi.sink.append.AppendWriteOperator; +import org.apache.inlong.sort.hudi.sink.bucket.BucketStreamWriteOperator; +import org.apache.inlong.sort.hudi.sink.bulk.BulkInsertWriteOperator; + +import org.apache.flink.api.common.functions.Partitioner; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; +import org.apache.flink.streaming.api.operators.KeyedProcessOperator; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hudi.common.model.ClusteringOperation; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.sink.CleanFunction; +import org.apache.hudi.sink.bootstrap.BootstrapOperator; +import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator; +import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper; +import org.apache.hudi.sink.bulk.RowDataKeyGen; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.clustering.ClusteringCommitEvent; +import org.apache.hudi.sink.clustering.ClusteringCommitSink; +import org.apache.hudi.sink.clustering.ClusteringOperator; +import org.apache.hudi.sink.clustering.ClusteringPlanEvent; +import org.apache.hudi.sink.clustering.ClusteringPlanOperator; +import org.apache.hudi.sink.common.WriteOperatorFactory; +import org.apache.hudi.sink.compact.CompactOperator; +import org.apache.hudi.sink.compact.CompactionCommitEvent; +import org.apache.hudi.sink.compact.CompactionCommitSink; +import org.apache.hudi.sink.compact.CompactionPlanEvent; +import org.apache.hudi.sink.compact.CompactionPlanOperator; +import org.apache.hudi.sink.partitioner.BucketAssignFunction; +import org.apache.hudi.sink.partitioner.BucketIndexPartitioner; +import org.apache.hudi.sink.transform.RowDataToHoodieFunctions; +import org.apache.hudi.table.format.FilePathUtils; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Utilities to generate all kinds of sub-pipelines. + * <p> + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class Pipelines { + + /** + * Bulk insert the input dataset at once. + * + * <p>By default, the input dataset would shuffle by the partition path first then + * sort by the partition path before passing around to the write function. 
+ * The whole pipeline looks like the following: + * + * <pre> + * | input1 | ===\ /=== |sorter| === | task1 | (p1, p2) + * shuffle + * | input2 | ===/ \=== |sorter| === | task2 | (p3, p4) + * + * Note: Both input1 and input2's dataset come from partitions: p1, p2, p3, p4 + * </pre> + * + * <p>The write task switches to new file handle each time it receives a record + * from the different partition path, the shuffle and sort would reduce small files. + * + * <p>The bulk insert should be run in batch execution mode. + * + * @param conf The configuration + * @param rowType The input row type + * @param dataStream The input data stream + * @return the bulk insert data stream sink + */ + public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream, + MetricOption metricOption) { + WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType, metricOption); + if (OptionsResolver.isBucketIndexType(conf)) { + String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD); + int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS); + + BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys); + RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType); + RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType); + InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId); + + Map<String, String> bucketIdToFileId = new HashMap<>(); + dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey) + .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, + indexKeys, numBuckets), typeInfo) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to + // avoid shuffle + if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) { + SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId); + dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator()) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to + // avoid shuffle + ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + } + return dataStream + .transform(opName("bucket_bulk_insert", conf), TypeInformation.of(Object.class), operatorFactory) + .uid(opUID("bucket_bulk_insert", conf)) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) + .addSink(DummySink.INSTANCE) + .name("dummy"); + } + + final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf); + if (partitionFields.length > 0) { + RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType); + if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT)) { + + // shuffle by partition keys + // use #partitionCustom instead of #keyBy to avoid duplicate sort operations, + // see BatchExecutionUtils#applyBatchExecutionSettings for details. 
+ Partitioner<String> partitioner = + (key, channels) -> KeyGroupRangeAssignment.assignKeyToParallelOperator(key, + KeyGroupRangeAssignment.computeDefaultMaxParallelism( + conf.getInteger(FlinkOptions.WRITE_TASKS)), + channels); + dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath); + } + if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) { + SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields); + // sort by partition keys + dataStream = dataStream + .transform("partition_key_sorter", + InternalTypeInfo.of(rowType), + sortOperatorGen.createSortOperator()) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + } + } + return dataStream + .transform(opName("hoodie_bulk_insert_write", conf), + TypeInformation.of(Object.class), + operatorFactory) + // follow the parallelism of upstream operators to avoid shuffle + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)) + .addSink(DummySink.INSTANCE) + .name("dummy"); + } + + /** + * Insert the dataset with append mode(no upsert or deduplication). + * + * <p>The input dataset would be rebalanced among the write tasks: + * + * <pre> + * | input1 | ===\ /=== | task1 | (p1, p2, p3, p4) + * shuffle + * | input2 | ===/ \=== | task2 | (p1, p2, p3, p4) + * + * Note: Both input1 and input2's dataset come from partitions: p1, p2, p3, p4 + * </pre> + * + * <p>The write task switches to new file handle each time it receives a record + * from the different partition path, so there may be many small files. + * + * @param conf The configuration + * @param rowType The input row type + * @param dataStream The input data stream + * @param bounded Whether the input stream is bounded + * @return the appending data stream sink + */ + public static DataStream<Object> append( + Configuration conf, + RowType rowType, + DataStream<RowData> dataStream, + boolean bounded, + MetricOption metricOption) { + if (!bounded) { + // In principle, the config should be immutable, but the boundedness + // is only visible when creating the sink pipeline. + conf.setBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT, false); + } + WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType, metricOption); + + return dataStream + .transform(opName("hoodie_append_write", conf), TypeInformation.of(Object.class), operatorFactory) + .uid(opUID("hoodie_stream_write", conf)) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + } + + /** + * Constructs bootstrap pipeline as streaming. + * The bootstrap operator loads the existing data index (primary key to file id mapping), + * then sends the indexing data set to subsequent operator(usually the bucket assign operator). + */ + public static DataStream<HoodieRecord> bootstrap( + Configuration conf, + RowType rowType, + DataStream<RowData> dataStream) { + return bootstrap(conf, rowType, dataStream, false, false); + } + + /** + * Constructs bootstrap pipeline. + * The bootstrap operator loads the existing data index (primary key to file id mapping), + * then send the indexing data set to subsequent operator(usually the bucket assign operator). 
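Editor's note (illustrative only, not part of this commit): a caller typically feeds the RowData stream through this bootstrap stage before the stream write. A minimal sketch, assuming conf (Flink Configuration), rowType and rowDataStream are in scope, mirroring how HoodieTableSink wires it further below:

    DataStream<HoodieRecord> hoodieRecords =
            Pipelines.bootstrap(conf, rowType, rowDataStream, /* bounded */ false, /* overwrite */ false);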
+ * + * @param conf The configuration + * @param rowType The row type + * @param dataStream The data stream + * @param bounded Whether the source is bounded + * @param overwrite Whether it is insert overwrite + */ + public static DataStream<HoodieRecord> bootstrap( + Configuration conf, + RowType rowType, + DataStream<RowData> dataStream, + boolean bounded, + boolean overwrite) { + final boolean globalIndex = conf.getBoolean(FlinkOptions.INDEX_GLOBAL_ENABLED); + if (overwrite || OptionsResolver.isBucketIndexType(conf)) { + return rowDataToHoodieRecord(conf, rowType, dataStream); + } else if (bounded && !globalIndex && OptionsResolver.isPartitionedTable(conf)) { + return boundedBootstrap(conf, rowType, dataStream); + } else { + return streamBootstrap(conf, rowType, dataStream, bounded); + } + } + + private static DataStream<HoodieRecord> streamBootstrap( + Configuration conf, + RowType rowType, + DataStream<RowData> dataStream, + boolean bounded) { + DataStream<HoodieRecord> dataStream1 = rowDataToHoodieRecord(conf, rowType, dataStream); + + if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED) || bounded) { + dataStream1 = dataStream1 + .transform( + "index_bootstrap", + TypeInformation.of(HoodieRecord.class), + new BootstrapOperator<>(conf)) + .setParallelism( + conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream1.getParallelism())) + .uid(opUID("index_bootstrap", conf)); + } + + return dataStream1; + } + + /** + * Constructs bootstrap pipeline for batch execution mode. + * The indexing data set is loaded before the actual data write + * in order to support batch UPSERT. + */ + private static DataStream<HoodieRecord> boundedBootstrap( + Configuration conf, + RowType rowType, + DataStream<RowData> dataStream) { + final RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType); + // shuffle by partition keys + dataStream = dataStream + .keyBy(rowDataKeyGen::getPartitionPath); + + return rowDataToHoodieRecord(conf, rowType, dataStream) + .transform( + "batch_index_bootstrap", + TypeInformation.of(HoodieRecord.class), + new BatchBootstrapOperator<>(conf)) + .setParallelism( + conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(dataStream.getParallelism())) + .uid(opUID("batch_index_bootstrap", conf)); + } + + /** + * Transforms the row data to hoodie records. + */ + public static DataStream<HoodieRecord> rowDataToHoodieRecord(Configuration conf, RowType rowType, + DataStream<RowData> dataStream) { + return dataStream.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class)) + .setParallelism(dataStream.getParallelism()).name("row_data_to_hoodie_record"); + } + + /** + * The streaming write pipeline. + * + * <p>The input dataset shuffles by the primary key first then + * shuffles by the file group ID before passing around to the write function. + * The whole pipeline looks like the following: + * + * <pre> + * | input1 | ===\ /=== | bucket assigner | ===\ /=== | task1 | + * shuffle(by PK) shuffle(by bucket ID) + * | input2 | ===/ \=== | bucket assigner | ===/ \=== | task2 | + * + * Note: a file group must be handled by one write task to avoid write conflict. + * </pre> + * + * <p>The bucket assigner assigns the inputs to suitable file groups, the write task caches + * and flushes the data set to disk. 
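Editor's note (illustrative only, not part of this commit): the stream-write stage is normally chained with compaction or cleaning, as HoodieTableSink does further below. A minimal sketch, assuming hoodieRecords and metricOption (which carries the InLong metric/audit settings) are in scope:

    DataStream<Object> written = Pipelines.hoodieStreamWrite(conf, hoodieRecords, metricOption);
    if (OptionsResolver.needsAsyncCompaction(conf)) {
        Pipelines.compact(conf, written);
    } else {
        Pipelines.clean(conf, written);
    }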
+ * + * @param conf The configuration + * @param dataStream The input data stream + * @return the stream write data stream pipeline + */ + public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream, + MetricOption metricOption) { + if (OptionsResolver.isBucketIndexType(conf)) { + WriteOperatorFactory<HoodieRecord> operatorFactory = + BucketStreamWriteOperator.getFactory(conf, metricOption); + int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS); + String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD); + BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields); + return dataStream.partitionCustom(partitioner, HoodieRecord::getKey) + .transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory) + .uid(opUID("bucket_write", conf)) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + } else { + WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf, metricOption); + return dataStream + // Key-by record key, to avoid multiple subtasks write to a bucket at the same time + .keyBy(HoodieRecord::getRecordKey) + .transform( + "bucket_assigner", + TypeInformation.of(HoodieRecord.class), + new KeyedProcessOperator<>(new BucketAssignFunction<>(conf))) + .uid(opUID("bucket_assigner", conf)) + .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS)) + // shuffle by fileId(bucket id) + .keyBy(record -> record.getCurrentLocation().getFileId()) + .transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory) + .uid(opUID("stream_write", conf)) + .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); + } + } + + /** + * The compaction tasks pipeline. + * + * <p>The compaction plan operator monitors the new compaction plan on the timeline + * then distributes the sub-plans to the compaction tasks. The compaction task then + * handle over the metadata to commit task for compaction transaction commit. + * The whole pipeline looks like the following: + * + * <pre> + * /=== | task1 | ===\ + * | plan generation | ===> hash | commit | + * \=== | task2 | ===/ + * + * Note: both the compaction plan generation task and commission task are singleton. + * </pre> + * + * @param conf The configuration + * @param dataStream The input data stream + * @return the compaction pipeline + */ + public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) { + return dataStream.transform("compact_plan_generate", + TypeInformation.of(CompactionPlanEvent.class), + new CompactionPlanOperator(conf)) + .setParallelism(1) // plan generate must be singleton + // make the distribution strategy deterministic to avoid concurrent modifications + // on the same bucket files + .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId()) + .transform("compact_task", + TypeInformation.of(CompactionCommitEvent.class), + new CompactOperator(conf)) + .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS)) + .addSink(new CompactionCommitSink(conf)) + .name("compact_commit") + .setParallelism(1); // compaction commit should be singleton + } + + /** + * The clustering tasks pipeline. + * + * <p>The clustering plan operator monitors the new clustering plan on the timeline + * then distributes the sub-plans to the clustering tasks. The clustering task then + * handle over the metadata to commit task for clustering transaction commit. 
+ * The whole pipeline looks like the following: + * + * <pre> + * /=== | task1 | ===\ + * | plan generation | ===> hash | commit | + * \=== | task2 | ===/ + * + * Note: both the clustering plan generation task and commission task are singleton. + * </pre> + * + * @param conf The configuration + * @param rowType The input row type + * @param dataStream The input data stream + * @return the clustering pipeline + */ + public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, + DataStream<Object> dataStream) { + DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate", + TypeInformation.of(ClusteringPlanEvent.class), + new ClusteringPlanOperator(conf)) + .setParallelism(1) // plan generate must be singleton + .keyBy(plan -> + // make the distribution strategy deterministic to avoid concurrent modifications + // on the same bucket files + plan.getClusteringGroupInfo().getOperations() + .stream().map(ClusteringOperation::getFileId).collect(Collectors.joining())) + .transform("clustering_task", + TypeInformation.of(ClusteringCommitEvent.class), + new ClusteringOperator(conf, rowType)) + .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS)); + if (OptionsResolver.sortClusteringEnabled(conf)) { + ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + } + return clusteringStream.addSink(new ClusteringCommitSink(conf)) + .name("clustering_commit") + .setParallelism(1); // compaction commit should be singleton + } + + public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) { + return dataStream.addSink(new CleanFunction<>(conf)) + .setParallelism(1) + .name("clean_commits"); + } + + public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) { + return dataStream.addSink(DummySink.INSTANCE) + .setParallelism(1) + .name("dummy"); + } + + public static String opName(String operatorN, Configuration conf) { + return operatorN + ": " + conf.getString(FlinkOptions.TABLE_NAME); + } + + public static String opUID(String operatorN, Configuration conf) { + return "uid_" + operatorN + "_" + conf.getString(FlinkOptions.TABLE_NAME); + } + + /** + * Dummy sink that does nothing. + */ + public static class DummySink implements SinkFunction<Object> { + + private static final long serialVersionUID = 1L; + public static DummySink INSTANCE = new DummySink(); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java new file mode 100644 index 0000000000..ac153ef729 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java @@ -0,0 +1,355 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.hudi.table; + +import org.apache.inlong.sort.base.metric.MetricOption; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.constraints.UniqueConstraint; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.factories.DynamicTableSinkFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.keygen.ComplexAvroKeyGenerator; +import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator; +import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator; +import org.apache.hudi.keygen.constant.KeyGeneratorOptions; +import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.DataTypeUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; +import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; + +/** + * Hoodie data source/sink factory. 
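Editor's note (illustrative only, not part of this commit): the factory below is discovered through its identifier "hudi-inlong". A minimal, hypothetical registration from Java, where the table name and path are placeholders; the InLong audit/metric options registered in optionalOptions() can be added to the WITH clause via their keys from org.apache.inlong.sort.base.Constants:

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.executeSql(
            "CREATE TABLE hudi_sink (id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED) "
                    + "WITH ('connector' = 'hudi-inlong', 'path' = 'file:///tmp/hudi_sink')");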
+ * <p> + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class HoodieTableFactory implements DynamicTableSinkFactory { + + private static final Logger LOG = LoggerFactory.getLogger(HoodieTableFactory.class); + + public static final String FACTORY_ID = "hudi-inlong"; + + @Override + public DynamicTableSink createDynamicTableSink(Context context) { + Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions()); + checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)), + "Option [path] should not be empty."); + ResolvedSchema schema = context.getCatalogTable().getResolvedSchema(); + sanityCheck(conf, schema); + setupConfOptions(conf, context.getObjectIdentifier(), context.getCatalogTable(), schema); + + String inlongMetric = conf.getOptional(INLONG_METRIC).orElse(null); + String auditHostAndPorts = conf.get(INLONG_AUDIT); + String auditKeys = conf.get(AUDIT_KEYS); + + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withAuditAddress(auditHostAndPorts) + .withAuditKeys(auditKeys) + .build(); + return new HoodieTableSink(conf, schema, metricOption); + } + + @Override + public String factoryIdentifier() { + return FACTORY_ID; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + return Collections.singleton(FlinkOptions.PATH); + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + Set<ConfigOption<?>> configOptions = FlinkOptions.optionalOptions(); + configOptions.add(AUDIT_KEYS); + configOptions.add(INLONG_METRIC); + configOptions.add(INLONG_AUDIT); + return configOptions; + } + + // ------------------------------------------------------------------------- + // Utilities + // ------------------------------------------------------------------------- + + /** + * The sanity check. + * + * @param conf The table options + * @param schema The table schema + */ + private void sanityCheck(Configuration conf, ResolvedSchema schema) { + List<String> fields = schema.getColumnNames(); + + // validate record key in pk absence. + if (!schema.getPrimaryKey().isPresent()) { + String[] recordKeys = conf.get(FlinkOptions.RECORD_KEY_FIELD).split(","); + if (recordKeys.length == 1 + && FlinkOptions.RECORD_KEY_FIELD.defaultValue().equals(recordKeys[0]) + && !fields.contains(recordKeys[0])) { + throw new HoodieValidationException("Primary key definition is required, use either PRIMARY KEY syntax " + + "or option '" + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify."); + } + + Arrays.stream(recordKeys) + .filter(field -> !fields.contains(field)) + .findAny() + .ifPresent(f -> { + throw new HoodieValidationException("Field '" + f + "' specified in option " + + "'" + FlinkOptions.RECORD_KEY_FIELD.key() + "' does not exist in the table schema."); + }); + } + + // validate pre_combine key + String preCombineField = conf.get(FlinkOptions.PRECOMBINE_FIELD); + if (!fields.contains(preCombineField)) { + if (OptionsResolver.isDefaultHoodieRecordPayloadClazz(conf)) { + throw new HoodieValidationException("Option '" + FlinkOptions.PRECOMBINE_FIELD.key() + + "' is required for payload class: " + DefaultHoodieRecordPayload.class.getName()); + } + if (preCombineField.equals(FlinkOptions.PRECOMBINE_FIELD.defaultValue())) { + conf.setString(FlinkOptions.PRECOMBINE_FIELD, FlinkOptions.NO_PRE_COMBINE); + } else if (!preCombineField.equals(FlinkOptions.NO_PRE_COMBINE)) { + throw new HoodieValidationException("Field " + preCombineField + " does not exist in the table schema." 
+ + "Please check '" + FlinkOptions.PRECOMBINE_FIELD.key() + "' option."); + } + } + } + + /** + * Sets up the config options based on the table definition, for e.g, the table name, primary key. + * + * @param conf The configuration to set up + * @param tablePath The table path + * @param table The catalog table + * @param schema The physical schema + */ + private static void setupConfOptions( + Configuration conf, + ObjectIdentifier tablePath, + CatalogTable table, + ResolvedSchema schema) { + // table name + conf.setString(FlinkOptions.TABLE_NAME.key(), tablePath.getObjectName()); + // hoodie key about options + setupHoodieKeyOptions(conf, table); + // compaction options + setupCompactionOptions(conf); + // hive options + setupHiveOptions(conf, tablePath); + // read options + setupReadOptions(conf); + // write options + setupWriteOptions(conf); + // infer avro schema from physical DDL schema + inferAvroSchema(conf, schema.toPhysicalRowDataType().notNull().getLogicalType()); + } + + /** + * Sets up the hoodie key options (e.g. record key and partition key) from the table definition. + */ + private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table) { + List<String> pkColumns = table.getSchema().getPrimaryKey() + .map(UniqueConstraint::getColumns).orElse(Collections.emptyList()); + if (pkColumns.size() > 0) { + // the PRIMARY KEY syntax always has higher priority than option FlinkOptions#RECORD_KEY_FIELD + String recordKey = String.join(",", pkColumns); + conf.setString(FlinkOptions.RECORD_KEY_FIELD, recordKey); + } + List<String> partitionKeys = table.getPartitionKeys(); + if (partitionKeys.size() > 0) { + // the PARTITIONED BY syntax always has higher priority than option FlinkOptions#PARTITION_PATH_FIELD + conf.setString(FlinkOptions.PARTITION_PATH_FIELD, String.join(",", partitionKeys)); + } + // set index key for bucket index if not defined + if (conf.getString(FlinkOptions.INDEX_TYPE).equals(HoodieIndex.IndexType.BUCKET.name())) { + if (conf.getString(FlinkOptions.INDEX_KEY_FIELD).isEmpty()) { + conf.setString(FlinkOptions.INDEX_KEY_FIELD, conf.getString(FlinkOptions.RECORD_KEY_FIELD)); + } else { + Set<String> recordKeySet = + Arrays.stream(conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",")) + .collect(Collectors.toSet()); + Set<String> indexKeySet = + Arrays.stream(conf.getString(FlinkOptions.INDEX_KEY_FIELD).split(",")) + .collect(Collectors.toSet()); + if (!recordKeySet.containsAll(indexKeySet)) { + throw new HoodieValidationException( + FlinkOptions.INDEX_KEY_FIELD + " should be a subset of or equal to the recordKey fields"); + } + } + } + + // tweak the key gen class if possible + final String[] partitions = conf.getString(FlinkOptions.PARTITION_PATH_FIELD).split(","); + final String[] pks = conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(","); + if (partitions.length == 1) { + final String partitionField = partitions[0]; + if (partitionField.isEmpty()) { + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, NonpartitionedAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because this is a non-partitioned table", + FlinkOptions.KEYGEN_CLASS_NAME.key(), NonpartitionedAvroKeyGenerator.class.getName()); + return; + } + DataType partitionFieldType = table.getSchema().getFieldDataType(partitionField) + .orElseThrow(() -> new HoodieValidationException("Field " + partitionField + " does not exist")); + if (pks.length <= 1 && DataTypeUtils.isDatetimeType(partitionFieldType)) { + // timestamp based key gen only supports simple 
primary key + setupTimestampKeygenOptions(conf, partitionFieldType); + return; + } + } + boolean complexHoodieKey = pks.length > 1 || partitions.length > 1; + if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.KEYGEN_CLASS_NAME)) { + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, ComplexAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because record key or partition path has two or more fields", + FlinkOptions.KEYGEN_CLASS_NAME.key(), ComplexAvroKeyGenerator.class.getName()); + } + } + + /** + * Sets up the keygen options when the partition path is datetime type. + * + * <p>The UTC timezone is used as default. + */ + public static void setupTimestampKeygenOptions(Configuration conf, DataType fieldType) { + if (conf.contains(FlinkOptions.KEYGEN_CLASS_NAME)) { + // the keygen clazz has been set up explicitly, skipping + return; + } + + conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, TimestampBasedAvroKeyGenerator.class.getName()); + LOG.info("Table option [{}] is reset to {} because datetime partitioning turns on", + FlinkOptions.KEYGEN_CLASS_NAME.key(), TimestampBasedAvroKeyGenerator.class.getName()); + if (DataTypeUtils.isTimestampType(fieldType)) { + int precision = DataTypeUtils.precision(fieldType.getLogicalType()); + if (precision == 0) { + // seconds + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, + TimestampBasedAvroKeyGenerator.TimestampType.UNIX_TIMESTAMP.name()); + } else if (precision == 3) { + // milliseconds + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, + TimestampBasedAvroKeyGenerator.TimestampType.EPOCHMILLISECONDS.name()); + } + String outputPartitionFormat = + conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_HOUR); + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputPartitionFormat); + } else { + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_TYPE_FIELD_PROP, + TimestampBasedAvroKeyGenerator.TimestampType.SCALAR.name()); + conf.setString(KeyGeneratorOptions.Config.INPUT_TIME_UNIT, TimeUnit.DAYS.toString()); + + String outputPartitionFormat = + conf.getOptional(FlinkOptions.PARTITION_FORMAT).orElse(FlinkOptions.PARTITION_FORMAT_DAY); + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_DATE_FORMAT_PROP, outputPartitionFormat); + // the option is actually useless, it only works for validation + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_INPUT_DATE_FORMAT_PROP, + FlinkOptions.PARTITION_FORMAT_DAY); + } + conf.setString(KeyGeneratorOptions.Config.TIMESTAMP_OUTPUT_TIMEZONE_FORMAT_PROP, "UTC"); + } + + /** + * Sets up the compaction options from the table definition. + */ + private static void setupCompactionOptions(Configuration conf) { + int commitsToRetain = conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS); + int minCommitsToKeep = conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS); + if (commitsToRetain >= minCommitsToKeep) { + LOG.info("Table option [{}] is reset to {} to be greater than {}={},\n" + + "to avoid risk of missing data from few instants in incremental pull", + FlinkOptions.ARCHIVE_MIN_COMMITS.key(), commitsToRetain + 10, + FlinkOptions.CLEAN_RETAIN_COMMITS.key(), commitsToRetain); + conf.setInteger(FlinkOptions.ARCHIVE_MIN_COMMITS, commitsToRetain + 10); + conf.setInteger(FlinkOptions.ARCHIVE_MAX_COMMITS, commitsToRetain + 20); + } + } + + /** + * Sets up the hive options from the table definition. 
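Editor's note (worked example for setupCompactionOptions above, not part of this commit): with FlinkOptions.CLEAN_RETAIN_COMMITS set to 30 and FlinkOptions.ARCHIVE_MIN_COMMITS at or below that value, the method raises ARCHIVE_MIN_COMMITS to 40 and ARCHIVE_MAX_COMMITS to 50 (commitsToRetain + 10 and + 20), so archiving never trims instants that an incremental pull may still need.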
+ */ + private static void setupHiveOptions(Configuration conf, ObjectIdentifier tablePath) { + if (!conf.contains(FlinkOptions.HIVE_SYNC_DB)) { + conf.setString(FlinkOptions.HIVE_SYNC_DB, tablePath.getDatabaseName()); + } + if (!conf.contains(FlinkOptions.HIVE_SYNC_TABLE)) { + conf.setString(FlinkOptions.HIVE_SYNC_TABLE, tablePath.getObjectName()); + } + } + + /** + * Sets up the read options from the table definition. + */ + private static void setupReadOptions(Configuration conf) { + if (!conf.getBoolean(FlinkOptions.READ_AS_STREAMING) + && (conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() + || conf.getOptional(FlinkOptions.READ_END_COMMIT).isPresent())) { + conf.setString(FlinkOptions.QUERY_TYPE, FlinkOptions.QUERY_TYPE_INCREMENTAL); + } + } + + /** + * Sets up the write options from the table definition. + */ + private static void setupWriteOptions(Configuration conf) { + if (FlinkOptions.isDefaultValueDefined(conf, FlinkOptions.OPERATION) + && OptionsResolver.isCowTable(conf)) { + conf.setBoolean(FlinkOptions.PRE_COMBINE, true); + } + } + + /** + * Inferences the deserialization Avro schema from the table schema (e.g. the DDL) + * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and + * {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified. + * + * @param conf The configuration + * @param rowType The specified table row type + */ + private static void inferAvroSchema(Configuration conf, LogicalType rowType) { + if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent() + && !conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) { + String inferredSchema = AvroSchemaConverter.convertToSchema(rowType).toString(); + conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema); + } + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java new file mode 100644 index 0000000000..173befd988 --- /dev/null +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.hudi.table; + +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.hudi.sink.utils.Pipelines; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; +import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; +import org.apache.flink.table.types.logical.RowType; +import org.apache.hudi.adapter.DataStreamSinkProviderAdapter; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.OptionsInference; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.util.ChangelogModes; + +import java.util.Map; + +/** + * Hoodie table sink. + * <p> + * Copy from org.apache.hudi:hudi-flink1.15-bundle:0.12.3 + */ +public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite { + + private final Configuration conf; + private final ResolvedSchema schema; + private boolean overwrite = false; + private final MetricOption metricOption; + + public HoodieTableSink(Configuration conf, ResolvedSchema schema, MetricOption metricOption) { + this.conf = conf; + this.schema = schema; + this.metricOption = metricOption; + + } + + public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwrite, MetricOption metricOption) { + this.conf = conf; + this.schema = schema; + this.overwrite = overwrite; + this.metricOption = metricOption; + } + + @Override + public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { + return (DataStreamSinkProviderAdapter) dataStream -> { + + // setup configuration + long ckpTimeout = dataStream.getExecutionEnvironment() + .getCheckpointConfig().getCheckpointTimeout(); + conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout); + // set up default parallelism + OptionsInference.setupSinkTasks(conf, dataStream.getExecutionConfig().getParallelism()); + + RowType rowType = (RowType) schema.toSinkRowDataType().notNull().getLogicalType(); + + // bulk_insert mode + final String writeOperation = this.conf.get(FlinkOptions.OPERATION); + if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) { + return Pipelines.bulkInsert(conf, rowType, dataStream, metricOption); + } + + // Append mode + if (OptionsResolver.isAppendMode(conf)) { + DataStream<Object> pipeline = + Pipelines.append(conf, rowType, dataStream, context.isBounded(), metricOption); + if (OptionsResolver.needsAsyncClustering(conf)) { + return Pipelines.cluster(conf, rowType, pipeline); + } else { + return Pipelines.dummySink(pipeline); + } + } + + DataStream<Object> pipeline; + // bootstrap + final DataStream<HoodieRecord> hoodieRecordDataStream = + Pipelines.bootstrap(conf, rowType, dataStream, context.isBounded(), overwrite); + // write pipeline + pipeline = Pipelines.hoodieStreamWrite(conf, hoodieRecordDataStream, metricOption); + // compaction + if (OptionsResolver.needsAsyncCompaction(conf)) { + // use synchronous compaction for bounded source. 
+ if (context.isBounded()) { + conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); + } + return Pipelines.compact(conf, pipeline); + } else { + return Pipelines.clean(conf, pipeline); + } + }; + } + + @Override + public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { + if (conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) { + return ChangelogModes.FULL; + } else { + return ChangelogModes.UPSERT; + } + } + + @Override + public DynamicTableSink copy() { + return new HoodieTableSink(this.conf, this.schema, this.overwrite, this.metricOption); + } + + @Override + public String asSummaryString() { + return "HoodieTableSink"; + } + + @Override + public void applyStaticPartition(Map<String, String> partitions) { + // #applyOverwrite should have been invoked. + if (this.overwrite && partitions.size() > 0) { + this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE.value()); + } + } + + @Override + public void applyOverwrite(boolean overwrite) { + this.overwrite = overwrite; + // set up the operation as INSERT_OVERWRITE_TABLE first, + // if there are explicit partitions, #applyStaticPartition would overwrite the option. + this.conf.setString(FlinkOptions.OPERATION, WriteOperationType.INSERT_OVERWRITE_TABLE.value()); + } +} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java deleted file mode 100644 index d2985ac5d4..0000000000 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/sink/HudiTableInlongFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.inlong.sort.hudi.table.sink; - -import org.apache.flink.configuration.ConfigOption; -import org.apache.flink.table.connector.sink.DynamicTableSink; -import org.apache.hudi.table.HoodieTableFactory; - -import java.util.Set; - -import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; -import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; - -public class HudiTableInlongFactory extends HoodieTableFactory { - - public static final String SORT_CONNECTOR_IDENTIFY_HUDI = "hudi-inlong"; - - public HudiTableInlongFactory() { - super(); - } - - @Override - public String factoryIdentifier() { - return SORT_CONNECTOR_IDENTIFY_HUDI; - } - - @Override - public DynamicTableSink createDynamicTableSink(Context context) { - return super.createDynamicTableSink(context); - } - - @Override - public Set<ConfigOption<?>> optionalOptions() { - Set<ConfigOption<?>> configOptions = super.optionalOptions(); - configOptions.add(INLONG_METRIC); - configOptions.add(INLONG_AUDIT); - return configOptions; - } - -} diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 709a02e60c..d8a0e68a77 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -org.apache.inlong.sort.hudi.table.sink.HudiTableInlongFactory +org.apache.inlong.sort.hudi.table.HoodieTableFactory diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE index e3a995cc65..7be72aace2 100644 --- a/licenses/inlong-sort-connectors/LICENSE +++ b/licenses/inlong-sort-connectors/LICENSE @@ -880,6 +880,19 @@ License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE Source : com.ververica:flink-connector-mysql-cdc:2.3.0 (Please note that the software have been modified.) 
License : https://github.com/ververica/flink-cdc-connectors/blob/master/LICENSE +1.3.26 inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteFunction.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/append/AppendWriteOperator.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteFunction.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/bulk/BulkInsertWriteOperator.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/utils/Pipelines.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteFunction.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/sink/StreamWriteOperator.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableFactory.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HoodieTableSink.java + inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/hudi/src/main/java/org/apache/inlong/sort/hudi/table/HudiTableInlongFactory.java +Source : org.apache.hudi:hudi-flink1.15-bundle:0.12.3 (Please note that the software have been modified.) +License : https://github.com/apache/hudi/blob/master/LICENSE + =======================================================================
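Editor's addendum (illustrative sketch, not part of the commit): the audit support added here flows from the table options into the sink through MetricOption, as HoodieTableFactory#createDynamicTableSink shows above. Assuming conf (Flink Configuration) and schema (ResolvedSchema) are in scope; the label, address and audit-key values are placeholders:

    MetricOption metricOption = MetricOption.builder()
            .withInlongLabels("groupId=g1&streamId=s1&nodeId=n1")
            .withAuditAddress("127.0.0.1:10081")
            .withAuditKeys("9,10")
            .build();
    DynamicTableSink sink = new HoodieTableSink(conf, schema, metricOption);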