This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new f0538163a  [INLONG-7197][Sort] Iceberg connector supports keyby with the primary key (#7217)
f0538163a is described below

commit f0538163a7e199ece22f8b29d25a674d03eb0a3f
Author: emhui <111486498+e-m...@users.noreply.github.com>
AuthorDate: Fri Jan 13 15:59:45 2023 +0800

    [INLONG-7197][Sort] Iceberg connector supports keyby with the primary key (#7217)
---
 .../iceberg/sink/EqualityFieldKeySelector.java     |  90 +++++++++
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 212 +++++++++++++--------
 licenses/inlong-sort-connectors/LICENSE            |   1 +
 3 files changed, 225 insertions(+), 78 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
new file mode 100644
index 000000000..8feaa6a4c
--- /dev/null
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.iceberg.sink;
+
+import java.util.List;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructLikeWrapper;
+import org.apache.iceberg.util.StructProjection;
+
+/**
+ * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record
+ * will be emitted to same writer in order.
+ * `EqualityFieldKeySelector` is copied from https://github.com/apache/iceberg/blob/e2bb9ad7e792efca419fa7c4a1afde7c4c44fa01/flink/v1.13/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java#L36
+ */
+class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
+
+    private final Schema schema;
+    private final RowType flinkSchema;
+    private final Schema deleteSchema;
+
+    private transient RowDataWrapper rowDataWrapper;
+    private transient StructProjection structProjection;
+    private transient StructLikeWrapper structLikeWrapper;
+
+    EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
+        this.schema = schema;
+        this.flinkSchema = flinkSchema;
+        this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
+    }
+
+    /**
+     * Construct the {@link RowDataWrapper} lazily here because few members in it are not
+     * serializable. In this way, we don't have to serialize them with forcing.
+     */
+    protected RowDataWrapper lazyRowDataWrapper() {
+        if (rowDataWrapper == null) {
+            rowDataWrapper = new RowDataWrapper(flinkSchema, schema.asStruct());
+        }
+        return rowDataWrapper;
+    }
+
+    /**
+     * Construct the {@link StructProjection} lazily because it is not serializable.
+     */
+    protected StructProjection lazyStructProjection() {
+        if (structProjection == null) {
+            structProjection = StructProjection.create(schema, deleteSchema);
+        }
+        return structProjection;
+    }
+
+    /**
+     * Construct the {@link StructLikeWrapper} lazily because it is not serializable.
+     */
+    protected StructLikeWrapper lazyStructLikeWrapper() {
+        if (structLikeWrapper == null) {
+            structLikeWrapper = StructLikeWrapper.forType(deleteSchema.asStruct());
+        }
+        return structLikeWrapper;
+    }
+
+    @Override
+    public Integer getKey(RowData row) {
+        RowDataWrapper wrappedRowData = lazyRowDataWrapper().wrap(row);
+        StructProjection projectedRowData = lazyStructProjection().wrap(wrappedRowData);
+        StructLikeWrapper wrapper = lazyStructLikeWrapper().set(projectedRowData);
+        return wrapper.hashCode();
+    }
+}
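
A minimal usage sketch of the selector above (illustrative only: the schema, the field id, the input
stream and the EqualityKeyByExample class are assumptions made for this note, not part of the commit;
the selector is package-private, so such code would have to sit in the same package). Records that
share the same equality-field values produce the same key, so keyBy routes them, in order, to the
same writer subtask:

    // Hypothetical example, not part of this commit.
    package org.apache.inlong.sort.iceberg.sink;

    import java.util.Collections;
    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.flink.FlinkSchemaUtil;
    import org.apache.iceberg.types.Types;

    public class EqualityKeyByExample {

        public static KeyedStream<RowData, Integer> keyByPrimaryKey(DataStream<RowData> input) {
            // Hypothetical table: "id" (field id 1) is the identifier / primary key field.
            Schema icebergSchema = new Schema(
                    Types.NestedField.required(1, "id", Types.LongType.get()),
                    Types.NestedField.optional(2, "name", Types.StringType.get()));
            RowType flinkRowType = FlinkSchemaUtil.convert(icebergSchema);

            // All rows with the same "id" hash to the same Integer key, so they are
            // shuffled to the same downstream Iceberg writer and keep their order.
            List<Integer> equalityFieldIds = Collections.singletonList(1);
            return input.keyBy(new EqualityFieldKeySelector(icebergSchema, flinkRowType, equalityFieldIds));
        }
    }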
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index b0ed02abe..2fc1ca13d 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -17,9 +17,12 @@
 
 package org.apache.inlong.sort.iceberg.sink;
 
+import java.util.Set;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -32,7 +35,6 @@ import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.Row;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DistributionMode;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -42,12 +44,17 @@ import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.FlinkWriteConf;
+import org.apache.iceberg.flink.FlinkWriteOptions;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.sink.TaskWriterFactory;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
@@ -68,18 +75,12 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.function.Function;
 
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED;
 import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
-import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE_NONE;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
 
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
@@ -155,13 +156,13 @@ public class FlinkSink {
         private Table table;
         private TableSchema tableSchema;
         private ActionsProvider actionProvider;
-        private boolean overwrite = false;
         private boolean appendMode = false;
-        private DistributionMode distributionMode = null;
         private Integer writeParallelism = null;
-        private boolean upsert = false;
         private List<String> equalityFieldColumns = null;
         private String uidPrefix = null;
+        private ReadableConfig readableConfig = new Configuration();
+        private final Map<String, String> writeOptions = Maps.newHashMap();
+        private FlinkWriteConf flinkWriteConf = null;
         private String inlongMetric = null;
         private String auditHostAndPorts = null;
         private CatalogLoader catalogLoader = null;
@@ -248,7 +249,7 @@ public class FlinkSink {
         }
 
         public Builder overwrite(boolean newOverwrite) {
-            this.overwrite = newOverwrite;
+            writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
             return this;
         }
 
@@ -304,9 +305,12 @@
          * @return {@link Builder} to connect the iceberg table.
          */
         public Builder distributionMode(DistributionMode mode) {
-            Preconditions.checkArgument(!DistributionMode.RANGE.equals(mode),
+            Preconditions.checkArgument(
+                    !DistributionMode.RANGE.equals(mode),
                     "Flink does not support 'range' write distribution mode now.");
-            this.distributionMode = mode;
+            if (mode != null) {
+                writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
+            }
             return this;
         }
 
@@ -331,7 +335,7 @@
          * @return {@link Builder} to connect the iceberg table.
         */
         public Builder upsert(boolean enabled) {
-            this.upsert = enabled;
+            writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
             return this;
         }
 
@@ -394,15 +398,21 @@
                 }
             }
 
+            flinkWriteConf = new FlinkWriteConf(table, writeOptions, readableConfig);
+
+            // Find out the equality field id list based on the user-provided equality field column names.
+            List<Integer> equalityFieldIds = checkAndGetEqualityFieldIds();
+
             // Convert the requested flink table schema to flink row type.
             RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
             // Distribute the records from input data stream based on the write.distribution-mode.
             DataStream<RowData> distributeStream = distributeDataStream(
-                    rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
+                    rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
 
             // Add parallel writers that append rows to files
-            SingleOutputStreamOperator<WriteResult> writerStream = appendWriter(distributeStream, flinkRowType);
+            SingleOutputStreamOperator<WriteResult> writerStream =
+                    appendWriter(distributeStream, flinkRowType, equalityFieldIds);
 
             // Add single-parallelism committer that commits files
             // after successful checkpoint or end of input
@@ -453,6 +463,35 @@
             return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
         }
 
+        @VisibleForTesting
+        List<Integer> checkAndGetEqualityFieldIds() {
+            List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
+            if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
+                Set<Integer> equalityFieldSet =
+                        Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
+                for (String column : equalityFieldColumns) {
+                    org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
+                    Preconditions.checkNotNull(
+                            field,
+                            "Missing required equality field column '%s' in table schema %s",
+                            column,
+                            table.schema());
+                    equalityFieldSet.add(field.fieldId());
+                }
+
+                if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
+                    LOG.warn(
+                            "The configured equality field column IDs {} are not matched with the schema identifier "
+                                    + "field IDs {}, use job specified equality field columns as the equality fields "
+                                    + "by default.",
+                            equalityFieldSet,
+                            table.schema().identifierFieldIds());
+                }
+                equalityFieldIds = Lists.newArrayList(equalityFieldSet);
+            }
+            return equalityFieldIds;
+        }
+
         @SuppressWarnings("unchecked")
         private <T> DataStreamSink<T> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
             DataStreamSink<T> resultStream = committerStream
@@ -469,7 +508,10 @@
         private SingleOutputStreamOperator<Void> appendCommitter(SingleOutputStreamOperator<WriteResult> writerStream) {
             IcebergProcessOperator<WriteResult, Void> filesCommitter = new IcebergProcessOperator<>(
                     new IcebergSingleFileCommiter(
-                            TableIdentifier.of(table.name()), tableLoader, overwrite, actionProvider));
+                            TableIdentifier.of(table.name()),
+                            tableLoader,
+                            flinkWriteConf.overwriteMode(),
+                            actionProvider));
             SingleOutputStreamOperator<Void> committerStream = writerStream
                     .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
                     .setParallelism(1)
@@ -483,7 +525,8 @@
         private SingleOutputStreamOperator<Void> appendMultipleCommitter(
                 SingleOutputStreamOperator<MultipleWriteResult> writerStream) {
             IcebergProcessOperator<MultipleWriteResult, Void> multipleFilesCommiter =
-                    new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader, overwrite));
+                    new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader,
+                            flinkWriteConf.overwriteMode()));
             SingleOutputStreamOperator<Void> committerStream = writerStream
                     .transform(operatorName(ICEBERG_MULTIPLE_FILES_COMMITTER_NAME), Types.VOID, multipleFilesCommiter)
                     .setParallelism(1)
@@ -494,26 +537,16 @@
             return committerStream;
         }
 
-        private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType) {
-            // Find out the equality field id list based on the user-provided equality field column names.
-            List<Integer> equalityFieldIds = Lists.newArrayList();
-            if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
-                for (String column : equalityFieldColumns) {
-                    org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
-                    Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
-                            column, table.schema());
-                    equalityFieldIds.add(field.fieldId());
-                }
-            }
-
+        private SingleOutputStreamOperator<WriteResult> appendWriter(DataStream<RowData> input, RowType flinkRowType,
+                List<Integer> equalityFieldIds) {
             // Fallback to use upsert mode parsed from table properties if don't specify in job level.
             // Only if not appendMode, upsert can be valid.
-            boolean upsertMode = (upsert || PropertyUtil.propertyAsBoolean(table.properties(),
+            boolean upsertMode = (flinkWriteConf.upsertMode() || PropertyUtil.propertyAsBoolean(table.properties(),
                     UPSERT_ENABLED, UPSERT_ENABLED_DEFAULT)) && !appendMode;
 
             // Validate the equality fields and partition fields if we enable the upsert mode.
             if (upsertMode) {
-                Preconditions.checkState(!overwrite,
+                Preconditions.checkState(!flinkWriteConf.overwriteMode(),
                         "OVERWRITE mode shouldn't be enable when configuring to use UPSERT data stream.");
                 Preconditions.checkState(!equalityFieldIds.isEmpty(),
                         "Equality field columns shouldn't be empty when configuring to use UPSERT data stream.");
@@ -527,8 +560,8 @@
             }
 
             IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
-                    table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric,
-                    auditHostAndPorts, dirtyOptions, dirtySink);
+                    table, flinkRowType, equalityFieldIds, flinkWriteConf,
+                    appendMode, inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -570,43 +603,75 @@
             return writerStream;
         }
 
-        private DataStream<RowData> distributeDataStream(DataStream<RowData> input,
-                Map<String, String> properties,
+        private DataStream<RowData> distributeDataStream(
+                DataStream<RowData> input,
+                List<Integer> equalityFieldIds,
                 PartitionSpec partitionSpec,
                 Schema iSchema,
                 RowType flinkRowType) {
-            DistributionMode writeMode;
-            if (distributionMode == null) {
-                // Fallback to use distribution mode parsed from table properties if don't specify in job level.
-                String modeName = PropertyUtil.propertyAsString(properties,
-                        WRITE_DISTRIBUTION_MODE,
-                        WRITE_DISTRIBUTION_MODE_NONE);
-
-                writeMode = DistributionMode.fromName(modeName);
-            } else {
-                writeMode = distributionMode;
-            }
-
+            DistributionMode writeMode = flinkWriteConf.distributionMode();
+            LOG.info("Write distribution mode is '{}'", writeMode.modeName());
             switch (writeMode) {
                 case NONE:
-                    return input;
+                    if (equalityFieldIds.isEmpty()) {
+                        return input;
+                    } else {
+                        LOG.info("Distribute rows by equality fields, because there are equality fields set");
+                        return input.keyBy(
+                                new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+                    }
                 case HASH:
-                    if (partitionSpec.isUnpartitioned()) {
-                        return input;
+                    if (equalityFieldIds.isEmpty()) {
+                        if (partitionSpec.isUnpartitioned()) {
+                            LOG.warn(
+                                    "Fallback to use 'none' distribution mode, because there are no equality fields "
+                                            + "set and table is unpartitioned");
+                            return input;
+                        } else {
+                            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+                        }
                     } else {
-                        return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+                        if (partitionSpec.isUnpartitioned()) {
+                            LOG.info(
+                                    "Distribute rows by equality fields, because there are equality fields set "
+                                            + "and table is unpartitioned");
+                            return input.keyBy(
+                                    new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+                        } else {
+                            for (PartitionField partitionField : partitionSpec.fields()) {
+                                Preconditions.checkState(
+                                        equalityFieldIds.contains(partitionField.sourceId()),
+                                        "In 'hash' distribution mode with equality fields set, partition field '%s' "
+                                                + "should be included in equality fields: '%s'",
+                                        partitionField,
+                                        equalityFieldColumns);
+                            }
+                            return input.keyBy(new PartitionKeySelector(partitionSpec, iSchema, flinkRowType));
+                        }
                     }
                 case RANGE:
-                    LOG.warn("Fallback to use 'none' distribution mode, because {}={} is not supported in flink now",
-                            WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName());
-                    return input;
+                    if (equalityFieldIds.isEmpty()) {
+                        LOG.warn(
+                                "Fallback to use 'none' distribution mode, because there are no equality fields set "
+                                        + "and {}=range is not supported yet in flink",
+                                WRITE_DISTRIBUTION_MODE);
+                        return input;
+                    } else {
+                        LOG.info(
+                                "Distribute rows by equality fields, because there are equality fields set "
+                                        + "and{}=range is not supported yet in flink",
+                                WRITE_DISTRIBUTION_MODE);
+                        return input.keyBy(
+                                new EqualityFieldKeySelector(iSchema, flinkRowType, equalityFieldIds));
+                    }
                 default:
-                    throw new RuntimeException("Unrecognized write.distribution-mode: " + writeMode);
+                    throw new RuntimeException("Unrecognized " + WRITE_DISTRIBUTION_MODE + ": " + writeMode);
             }
         }
+
     }
 
     static RowType toFlinkRowType(Schema schema, TableSchema requestedSchema) {
@@ -629,37 +694,28 @@
     static IcebergProcessOperator<RowData, WriteResult> createStreamWriter(Table table,
             RowType flinkRowType,
             List<Integer> equalityFieldIds,
-            boolean upsert,
+            FlinkWriteConf flinkWriteConf,
             boolean appendMode,
             String inlongMetric,
             String auditHostAndPorts,
             DirtyOptions dirtyOptions,
             @Nullable DirtySink<Object> dirtySink) {
-        // flink A, iceberg a
         Preconditions.checkArgument(table != null, "Iceberg table should't be null");
-        Map<String, String> props = table.properties();
-        long targetFileSize = getTargetFileSizeBytes(props);
-        FileFormat fileFormat = getFileFormat(props);
         Table serializableTable = SerializableTable.copyOf(table);
-        TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(
-                serializableTable, serializableTable.schema(), flinkRowType, targetFileSize,
-                fileFormat, equalityFieldIds, upsert, appendMode);
-        // Set null for flinkRowType of IcebergSingleStreamWriter
-        // to avoid frequent Field.Getter creation in dirty data sink.
-        return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
-                table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts,
-                null, dirtyOptions, dirtySink));
-    }
+        TaskWriterFactory<RowData> taskWriterFactory =
+                new RowDataTaskWriterFactory(
+                        serializableTable,
+                        serializableTable.schema(),
+                        flinkRowType,
+                        flinkWriteConf.targetDataFileSize(),
+                        flinkWriteConf.dataFileFormat(),
+                        equalityFieldIds,
+                        flinkWriteConf.upsertMode(),
+                        appendMode);
 
-    private static FileFormat getFileFormat(Map<String, String> properties) {
-        String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
-        return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+        return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>(
+                table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts, null, dirtyOptions, dirtySink));
     }
 
-    private static long getTargetFileSizeBytes(Map<String, String> properties) {
-        return PropertyUtil.propertyAsLong(properties,
-                WRITE_TARGET_FILE_SIZE_BYTES,
-                WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
-    }
 }

diff --git a/licenses/inlong-sort-connectors/LICENSE b/licenses/inlong-sort-connectors/LICENSE
index 693c6f5f8..a362ffeee 100644
--- a/licenses/inlong-sort-connectors/LICENSE
+++ b/licenses/inlong-sort-connectors/LICENSE
@@ -521,6 +521,7 @@
   inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/DeltaManifestsSerializer.java
   inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkManifestUtil.java
   inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+  inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/EqualityFieldKeySelector.java
   inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergFilesCommitter.java
   inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
   inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/ManifestOutputFileFactory.java
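
For completeness, a sketch of how the new behavior is typically driven end to end. This is
illustrative only and not part of the diff: the builder entry points used here (forRowData,
tableLoader, equalityFieldColumns, writeParallelism, append) follow the upstream iceberg-flink
FlinkSink that this class is copied from, and are assumed to be present in the InLong copy; the
table, its "id" primary key column and the parallelism are hypothetical.

    // Hypothetical example, not part of this commit.
    import java.util.Collections;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.DistributionMode;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.inlong.sort.iceberg.sink.FlinkSink;

    public class IcebergUpsertSinkExample {

        public static void attachSink(DataStream<RowData> changelog, TableLoader tableLoader) {
            FlinkSink.forRowData(changelog)                              // assumed builder entry point
                    .tableLoader(tableLoader)
                    .equalityFieldColumns(Collections.singletonList("id")) // primary key column
                    .upsert(true)                                        // upserts are applied per key
                    // With equality fields configured, distributeDataStream() keys the stream with
                    // EqualityFieldKeySelector even in 'none' mode, per the change above.
                    .distributionMode(DistributionMode.NONE)
                    .writeParallelism(4)
                    .append();
        }
    }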