This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 6d4aca310  [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654)
6d4aca310 is described below

commit 6d4aca310330578bc591ca90c1b25366425f8832
Author: Liao Rui <liao...@users.noreply.github.com>
AuthorDate: Fri Mar 24 18:01:41 2023 +0800

    [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654)
---
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  23 +++
 .../inlong/sort/iceberg/IcebergTableSink.java      |   9 +
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  34 +++-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 207 +++++++++++++--
 .../multiple/IcebergMultipleFilesCommiter.java     |  16 +-
 .../sink/multiple/IcebergSingleFileCommiter.java   |  16 +-
 .../sink/multiple/IcebergSingleStreamWriter.java   |   1 -
 .../iceberg/sink/multiple/RecordWithSchema.java    |   4 +
 8 files changed, 214 insertions(+), 96 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index c6d3ed874..32d3a21bd 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynMethods;
@@ -112,6 +113,25 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
             .orNoop()
             .build();
 
+    public static final ConfigOption<Boolean> WRITE_COMPACT_ENABLE =
+            ConfigOptions.key("write.compact.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable compact small file.");
+
+    public static final ConfigOption<Integer> WRITE_COMPACT_INTERVAL =
+            ConfigOptions.key("write.compact.snapshot.interval")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("Compact snapshot interval.");
+
+    public static final ConfigOption<String> WRITE_DISTRIBUTION_MODE =
+            ConfigOptions.key(TableProperties.WRITE_DISTRIBUTION_MODE)
+                    .stringType()
+                    .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
+                    .withDescription("Distribute the records from input data stream based "
+                            + "on the write.distribution-mode.");
+
     private final FlinkCatalog catalog;
 
     public FlinkDynamicTableFactory() {
@@ -274,6 +294,9 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
         options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
         options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
+        options.add(WRITE_COMPACT_ENABLE);
+        options.add(WRITE_COMPACT_INTERVAL);
+        options.add(WRITE_DISTRIBUTION_MODE);
         return options;
     }
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index ef93e1f6f..cd239633e 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java @@ -29,6 +29,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite; import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning; import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.actions.ActionsProvider; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.TableLoader; @@ -54,6 +55,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERA import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK; +import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_DISTRIBUTION_MODE; /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 @@ -131,6 +133,11 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY)) .withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED)) .build()) + .dirtyOptions(dirtyOptions) + .dirtySink(dirtySink) + .action(actionsProvider) + .tableOptions(tableOptions) + .distributionMode(DistributionMode.fromName(tableOptions.get(WRITE_DISTRIBUTION_MODE))) .append(); } else { return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream) @@ -143,6 +150,8 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, .dirtyOptions(dirtyOptions) .dirtySink(dirtySink) .action(actionsProvider) + .tableOptions(tableOptions) + .distributionMode(DistributionMode.fromName(tableOptions.get(WRITE_DISTRIBUTION_MODE))) .append(); } } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java index 0fd079f59..9fb259b39 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java @@ -158,7 +158,9 @@ public class FlinkSink { private ActionsProvider actionProvider; private boolean overwrite = false; private boolean appendMode = false; + private DistributionMode distributionMode = null; private Integer writeParallelism = null; + private boolean upsert = false; private List<String> equalityFieldColumns = null; private String uidPrefix = null; private ReadableConfig readableConfig = new Configuration(); @@ -171,6 +173,7 @@ public class FlinkSink { private MultipleSinkOption multipleSinkOption = null; private DirtyOptions dirtyOptions; private @Nullable DirtySink<Object> dirtySink; + private ReadableConfig tableOptions; private Builder() { } @@ -255,6 +258,11 @@ public class FlinkSink { return this; } + public Builder flinkConf(ReadableConfig config) { + this.readableConfig = config; + return this; + } + /** * The appendMode properties is used to insert data without equality field columns. 
* @@ -277,6 +285,11 @@ public class FlinkSink { return this; } + public FlinkSink.Builder tableOptions(ReadableConfig tableOptions) { + this.tableOptions = tableOptions; + return this; + } + /** * Add metric output for iceberg writer * @param inlongMetric @@ -311,6 +324,7 @@ public class FlinkSink { !DistributionMode.RANGE.equals(mode), "Flink does not support 'range' write distribution mode now."); if (mode != null) { + this.distributionMode = mode; writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName()); } return this; @@ -337,6 +351,7 @@ public class FlinkSink { * @return {@link Builder} to connect the iceberg table. */ public Builder upsert(boolean enabled) { + this.upsert = enabled; writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled)); return this; } @@ -408,9 +423,11 @@ public class FlinkSink { // Convert the requested flink table schema to flink row type. RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema); - // Distribute the records from input data stream based on the write.distribution-mode. - DataStream<RowData> distributeStream = distributeDataStream( - rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType); + // Distribute the records from input data stream based on the write.distribution-mode and + // equality fields. + DataStream<RowData> distributeStream = + distributeDataStream( + rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType); // Add parallel writers that append rows to files SingleOutputStreamOperator<WriteResult> writerStream = @@ -513,7 +530,8 @@ public class FlinkSink { TableIdentifier.of(table.name()), tableLoader, flinkWriteConf.overwriteMode(), - actionProvider)); + actionProvider, + tableOptions)); SingleOutputStreamOperator<Void> committerStream = writerStream .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter) .setParallelism(1) @@ -527,7 +545,8 @@ public class FlinkSink { private SingleOutputStreamOperator<Void> appendMultipleCommitter( SingleOutputStreamOperator<MultipleWriteResult> writerStream) { IcebergProcessOperator<MultipleWriteResult, Void> multipleFilesCommiter = - new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader, overwrite)); + new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader, overwrite, + actionProvider, tableOptions)); SingleOutputStreamOperator<Void> committerStream = writerStream .transform(operatorName(ICEBERG_MULTIPLE_FILES_COMMITTER_NAME), Types.VOID, multipleFilesCommiter) .setParallelism(1) @@ -561,8 +580,8 @@ public class FlinkSink { } IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter( - table, flinkRowType, equalityFieldIds, flinkWriteConf, - appendMode, inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink); + table, flinkRowType, equalityFieldIds, flinkWriteConf, appendMode, inlongMetric, + auditHostAndPorts, dirtyOptions, dirtySink); int parallelism = writeParallelism == null ? 
input.getParallelism() : writeParallelism; SingleOutputStreamOperator<WriteResult> writerStream = input @@ -701,6 +720,7 @@ public class FlinkSink { String auditHostAndPorts, DirtyOptions dirtyOptions, @Nullable DirtySink<Object> dirtySink) { + // flink A, iceberg a Preconditions.checkArgument(table != null, "Iceberg table should't be null"); Table serializableTable = SerializableTable.copyOf(table); diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java index 12914098e..f2660c5aa 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; @@ -43,8 +44,8 @@ import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types.NestedField; -import org.apache.inlong.sort.base.dirty.DirtyData; import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.DirtySinkHelper; import org.apache.inlong.sort.base.dirty.DirtyType; import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat; @@ -54,6 +55,7 @@ import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData; import org.apache.inlong.sort.base.sink.MultipleSinkOption; +import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy; import org.apache.inlong.sort.base.sink.TableChange; import org.apache.inlong.sort.base.sink.TableChange.AddColumn; import org.apache.inlong.sort.base.util.MetricStateUtils; @@ -61,14 +63,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import javax.ws.rs.NotSupportedException; import java.io.Closeable; import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; @@ -99,8 +104,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi // schema cache private transient Map<TableIdentifier, Schema> schemaCache; - private final DirtyOptions dirtyOptions; - private @Nullable final DirtySink<Object> dirtySink; + // blacklist to filter schema update failed table + private transient Set<TableIdentifier> blacklist; + + private final 
DirtySinkHelper<Object> dirtySinkHelper; // metric private final String inlongMetric; @@ -110,14 +117,16 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi private transient MetricState metricState; public DynamicSchemaHandleOperator(CatalogLoader catalogLoader, - MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions, - @Nullable DirtySink<Object> dirtySink, String inlongMetric, String auditHostAndPorts) { + MultipleSinkOption multipleSinkOption, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink, + String inlongMetric, + String auditHostAndPorts) { this.catalogLoader = catalogLoader; this.multipleSinkOption = multipleSinkOption; - this.dirtyOptions = dirtyOptions; - this.dirtySink = dirtySink; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; + this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink); } @SuppressWarnings("unchecked") @@ -137,6 +146,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi this.recordQueues = new HashMap<>(); this.schemaCache = new HashMap<>(); + this.blacklist = new HashSet<>(); // Initialize metric MetricOption metricOption = MetricOption.builder() @@ -151,6 +161,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi if (metricOption != null) { metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup()); } + this.dirtySinkHelper.open(new Configuration()); } @Override @@ -169,18 +180,25 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi } catch (Exception e) { LOGGER.error(String.format("Deserialize error, raw data: %s", new String(element.getValue().getBinary(0))), e); - handleDirtyData(new String(element.getValue().getBinary(0)), - null, DirtyType.DESERIALIZE_ERROR, e, TableIdentifier.of("unknow", "unknow")); + if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption.getSchemaUpdatePolicy()) { + handleDirtyData(new String(element.getValue().getBinary(0)), + null, DirtyType.DESERIALIZE_ERROR, e, TableIdentifier.of("unknow", "unknow")); + } + return; } TableIdentifier tableId = null; try { tableId = parseId(jsonNode); } catch (Exception e) { LOGGER.error(String.format("Table identifier parse error, raw data: %s", jsonNode), e); - handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, - e, TableIdentifier.of("unknow", "unknow")); + if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption.getSchemaUpdatePolicy()) { + handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e, + TableIdentifier.of("unknow", "unknow")); + } + } + if (blacklist.contains(tableId)) { + return; } - boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode); if (isDDL) { execDDL(jsonNode, tableId); @@ -189,51 +207,59 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi } } + private void handleDirtyDataOfLogWithIgnore(JsonNode jsonNode, Schema dataSchema, + TableIdentifier tableId, Exception e) { + List<RowData> rowDataForDataSchemaList = Collections.emptyList(); + try { + rowDataForDataSchemaList = dynamicSchemaFormat + .extractRowData(jsonNode, FlinkSchemaUtil.convert(dataSchema)); + } catch (Throwable ee) { + LOG.error("extractRowData {} failed!", jsonNode, ee); + } + + for (RowData rowData : rowDataForDataSchemaList) { + DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions(); + if (!dirtyOptions.ignoreDirty()) { + if (metricData != null) { + 
metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(), + null, tableId.name(), rowData.toString()); + } + } else { + handleDirtyData(rowData.toString(), jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId); + } + } + } + private void handleDirtyData(Object dirtyData, JsonNode rootNode, DirtyType dirtyType, Exception e, TableIdentifier tableId) { - if (!dirtyOptions.ignoreDirty()) { - RuntimeException ex; - if (e instanceof RuntimeException) { - ex = (RuntimeException) e; - } else { - ex = new RuntimeException(e); - } - throw ex; - } - if (dirtySink != null) { - DirtyData.Builder<Object> builder = DirtyData.builder(); + DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions(); + if (rootNode != null) { try { - builder.setData(dirtyData) - .setDirtyType(dirtyType) - .setDirtyMessage(e.getMessage()); - if (rootNode != null) { - builder.setLabels(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels())) - .setLogTag(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag())) - .setIdentifier(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier())); - } else { - builder.setLabels(dirtyOptions.getLabels()) - .setLogTag(dirtyOptions.getLogTag()) - .setIdentifier(dirtyOptions.getIdentifier()); - } - dirtySink.invoke(builder.build()); - if (metricData != null) { - metricData.outputDirtyMetricsWithEstimate( - tableId.namespace().toString(), null, tableId.name(), dirtyData); - } + String dirtyLabel = dynamicSchemaFormat.parse(rootNode, + DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), DirtyType.BATCH_LOAD_ERROR, null)); + String dirtyLogTag = dynamicSchemaFormat.parse(rootNode, + DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), DirtyType.BATCH_LOAD_ERROR, null)); + String dirtyIdentifier = dynamicSchemaFormat.parse(rootNode, + DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), DirtyType.BATCH_LOAD_ERROR, null)); + dirtySinkHelper.invoke(dirtyData, dirtyType, dirtyLabel, dirtyLogTag, dirtyIdentifier, e); } catch (Exception ex) { - if (!dirtyOptions.ignoreSideOutputErrors()) { - throw new RuntimeException(ex); - } - LOG.warn("Dirty sink failed", ex); + throw new RuntimeException(ex); } + } else { + dirtySinkHelper.invoke(dirtyData, dirtyType, dirtyOptions.getLabels(), dirtyOptions.getLogTag(), + dirtyOptions.getIdentifier(), e); + } + if (metricData != null) { + metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(), null, tableId.name(), dirtyData); } } @Override public void onProcessingTime(long timestamp) { + LOG.info("Black list table: {} at time {}.", blacklist, timestamp); processingTimeService.registerTimer( processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this); } @@ -280,7 +306,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi return v; }); if (schema == null) { - handleTableCreateEventFromOperator(record.getTableId(), dataSchema); + try { + handleTableCreateEventFromOperator(record.getTableId(), dataSchema); + } catch (Exception e) { + LOGGER.error("Table create error, tableId: {}, schema: {}", record.getTableId(), dataSchema); + if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption + .getSchemaUpdatePolicy()) { + handleDirtyDataOfLogWithIgnore(jsonNode, dataSchema, tableId, e); + } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == multipleSinkOption + .getSchemaUpdatePolicy()) { + blacklist.add(tableId); + } else { + LOGGER.error("Table create error, tableId: {}, schema: {}, schemaUpdatePolicy: {}", + record.getTableId(), 
dataSchema, multipleSinkOption.getSchemaUpdatePolicy(), e); + throw e; + } + } } else { handleSchemaInfoEvent(record.getTableId(), schema); } @@ -302,26 +343,43 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi try { return dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1)); } catch (Exception e) { - LOG.warn("Ignore table {} schema change, old: {} new: {}.", - tableId, dataSchema, latestSchema, e); - try { - List<RowData> rowDataForDataSchemaList = - dynamicSchemaFormat.extractRowData(jsonNode, - FlinkSchemaUtil.convert(dataSchema)); - for (RowData rowData : rowDataForDataSchemaList) { - handleDirtyData(rowData.toString(), jsonNode, - DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId); - } - } catch (Exception ee) { - LOG.error("handleDirtyData {} failed!", jsonNode); + if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption + .getSchemaUpdatePolicy()) { + handleDirtyDataOfLogWithIgnore(jsonNode, dataSchema, tableId, e); + } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == multipleSinkOption + .getSchemaUpdatePolicy()) { + blacklist.add(tableId); + } else { + LOG.error("Table {} schema change, schemaUpdatePolicy:{} old: {} new: {}.", + tableId, multipleSinkOption.getSchemaUpdatePolicy(), dataSchema, + latestSchema, e); + throw e; } } return Collections.emptyList(); }); output.collect(new StreamRecord<>(recordWithSchema)); } else { - handldAlterSchemaEventFromOperator(tableId, latestSchema, dataSchema); - break; + if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption + .getSchemaUpdatePolicy()) { + RecordWithSchema recordWithSchema = queue.poll(); + handleDirtyDataOfLogWithIgnore(recordWithSchema.getOriginalData(), dataSchema, tableId, + new NotSupportedException( + String.format("SchemaUpdatePolicy %s does not support schema dynamic update!", + multipleSinkOption.getSchemaUpdatePolicy()))); + } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == multipleSinkOption + .getSchemaUpdatePolicy()) { + blacklist.add(tableId); + break; + } else if (SchemaUpdateExceptionPolicy.TRY_IT_BEST == multipleSinkOption + .getSchemaUpdatePolicy()) { + handldAlterSchemaEventFromOperator(tableId, latestSchema, dataSchema); + break; + } else { + throw new NotSupportedException( + String.format("SchemaUpdatePolicy %s does not support schema dynamic update!", + multipleSinkOption.getSchemaUpdatePolicy())); + } } } } @@ -340,6 +398,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi ImmutableMap.Builder<String, String> properties = ImmutableMap.builder(); properties.put("format-version", "2"); properties.put("write.upsert.enabled", "true"); + properties.put("write.metadata.metrics.default", "full"); // for hive visible properties.put("engine.hive.enabled", "true"); try { @@ -363,9 +422,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi Transaction transaction = table.newTransaction(); if (table.schema().sameSchema(oldSchema)) { List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema); - if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) { - // If can not handle this schema update, should not push data into next operator - return; + for (TableChange tableChange : tableChanges) { + if (!(tableChange instanceof AddColumn)) { + // todo:currently iceberg can only handle addColumn, so always return false + throw new UnsupportedOperationException( + String.format("Unsupported table %s schema change: %s.", tableId.toString(), 
tableChange)); + } } SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges); LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges); @@ -401,26 +463,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi tableId, pkListStr); } catch (Exception e) { - handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e, tableId); + if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption.getSchemaUpdatePolicy()) { + handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e, tableId); + } } return null; } - private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) { - boolean canHandle = true; - for (TableChange tableChange : tableChanges) { - canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange, - multipleSinkOption.getSchemaUpdatePolicy()); - if (!(tableChange instanceof AddColumn)) { - // todo:currently iceberg can only handle addColumn, so always return false - LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.", - tableId, tableChange); - canHandle = false; - } - if (!canHandle) { - break; - } - } - return canHandle; - } } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java index 19907e274..fee48eadd 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java @@ -19,10 +19,12 @@ package org.apache.inlong.sort.iceberg.sink.multiple; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.iceberg.actions.ActionsProvider; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.TableLoader; @@ -40,20 +42,26 @@ public class IcebergMultipleFilesCommiter extends IcebergProcessFunction<Multipl private Map<TableIdentifier, IcebergSingleFileCommiter> multipleCommiters; private final CatalogLoader catalogLoader; private final boolean overwrite; + private final ActionsProvider actionsProvider; + private final ReadableConfig tableOptions; - public IcebergMultipleFilesCommiter(CatalogLoader catalogLoader, boolean overwrite) { + private transient FunctionInitializationContext functionInitializationContext; + + public IcebergMultipleFilesCommiter(CatalogLoader catalogLoader, boolean overwrite, ActionsProvider actionProvider, + ReadableConfig tableOptions) { this.catalogLoader = catalogLoader; this.overwrite = overwrite; + this.actionsProvider = actionProvider; + this.tableOptions = tableOptions; } - private transient FunctionInitializationContext functionInitializationContext; - @Override public void processElement(MultipleWriteResult value) throws Exception { TableIdentifier tableId = value.getTableId(); 
if (multipleCommiters.get(tableId) == null) { IcebergSingleFileCommiter commiter = new IcebergSingleFileCommiter( - tableId, TableLoader.fromCatalog(catalogLoader, value.getTableId()), overwrite, null); + tableId, TableLoader.fromCatalog(catalogLoader, value.getTableId()), overwrite, + actionsProvider, tableOptions); commiter.setup(getRuntimeContext(), collector, context); commiter.initializeState(functionInitializationContext); commiter.open(new Configuration()); diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java index 6c5e14fe7..f842ab711 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.io.SimpleVersionedSerialization; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -48,6 +49,7 @@ import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.inlong.sort.iceberg.FlinkActions; +import org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory; import org.apache.inlong.sort.iceberg.sink.DeltaManifests; import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer; import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil; @@ -118,17 +120,20 @@ public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResul // compact file action private ActionsProvider flinkActions; private transient RewriteDataFiles compactAction; + private ReadableConfig tableOptions; public IcebergSingleFileCommiter( TableIdentifier tableId, TableLoader tableLoader, boolean replacePartitions, - ActionsProvider actionProvider) { + ActionsProvider actionProvider, + ReadableConfig tableOptions) { // Here must distinguish state descriptor with tableId, because all icebergSingleFileCommiter state in // one IcebergMultipleFilesCommiter use same StateStore. this.tableLoader = tableLoader; this.replacePartitions = replacePartitions; this.flinkActions = actionProvider; + this.tableOptions = tableOptions; this.jobIdDescriptor = new ListStateDescriptor<>( String.format("iceberg(%s)-flink-job-id", tableId.toString()), BasicTypeInfo.STRING_TYPE_INFO); this.stateDescriptor = buildStateDescriptor(tableId); @@ -142,10 +147,13 @@ public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResul this.tableLoader.open(); this.table = tableLoader.loadTable(); + boolean writeCompactEnabelFromTO = this.tableOptions == null + ? 
false + : this.tableOptions.get(FlinkDynamicTableFactory.WRITE_COMPACT_ENABLE); // compact file - if (flinkActions != null - && PropertyUtil.propertyAsBoolean( - table.properties(), FlinkActions.COMPACT_ENABLED, FlinkActions.COMPACT_ENABLED_DEFAULT)) { + if (flinkActions != null && (PropertyUtil.propertyAsBoolean( + table.properties(), FlinkActions.COMPACT_ENABLED, FlinkActions.COMPACT_ENABLED_DEFAULT) + || writeCompactEnabelFromTO)) { compactAction = flinkActions.rewriteDataFiles(table); } maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10); diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java index d44927db1..11fd7624a 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java @@ -73,7 +73,6 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ private @Nullable RowType flinkRowType; private final DirtyOptions dirtyOptions; private @Nullable final DirtySink<Object> dirtySink; - private boolean multipleSink; public IcebergSingleStreamWriter( diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java index 5d330b0ca..01ca8dce7 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java @@ -70,6 +70,10 @@ public class RecordWithSchema { return data; } + public JsonNode getOriginalData() { + return originalData; + } + public Schema getSchema() { return schema; }
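
The three connector options introduced in FlinkDynamicTableFactory (write.compact.enable, write.compact.snapshot.interval and write.distribution-mode) are ordinary table options, so they can be supplied through the sink DDL. Below is a minimal usage sketch, assuming the 'iceberg-inlong' connector identifier plus illustrative catalog settings and columns; only the three write.* options come from this change.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IcebergCompactSinkExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Hypothetical sink table: connector identifier and catalog properties are assumptions.
        tableEnv.executeSql(
                "CREATE TABLE iceberg_sink (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'iceberg-inlong',\n"
                        + "  'catalog-type' = 'hive',\n"
                        + "  'catalog-name' = 'hive_catalog',\n"
                        + "  'catalog-database' = 'default',\n"
                        + "  'catalog-table' = 'iceberg_sink',\n"
                        + "  'uri' = 'thrift://localhost:9083',\n"
                        + "  'warehouse' = 'hdfs://localhost:8020/warehouse',\n"
                        // rewrite small files, checked every 20 snapshots (the option's default)
                        + "  'write.compact.enable' = 'true',\n"
                        + "  'write.compact.snapshot.interval' = '20',\n"
                        // shuffle records before the writers; 'range' is still rejected by FlinkSink
                        + "  'write.distribution-mode' = 'hash'\n"
                        + ")");
    }
}

With write.distribution-mode set, IcebergTableSink resolves it via DistributionMode.fromName(...) and passes it to the FlinkSink builder together with the full tableOptions.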
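
The control-flow change in DynamicSchemaHandleOperator is that schema-related failures are now routed by SchemaUpdateExceptionPolicy: LOG_WITH_IGNORE archives the offending record as dirty data (through DirtySinkHelper) and reports dirty metrics, STOP_PARTIAL blacklists the table so later records for it are dropped right after the table identifier is parsed, and other cases rethrow. A condensed, self-contained sketch of that dispatch with simplified types (the real operator works on JsonNode/RowData):

import java.util.HashSet;
import java.util.Set;

public class SchemaFailureDispatchSketch {

    // Only the policy values visible in this diff; the real enum lives in
    // org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy and may define more.
    enum Policy { TRY_IT_BEST, LOG_WITH_IGNORE, STOP_PARTIAL }

    private final Set<String> blacklist = new HashSet<>();
    private final Policy policy;

    SchemaFailureDispatchSketch(Policy policy) {
        this.policy = policy;
    }

    /** Records for tables that already failed under STOP_PARTIAL are skipped. */
    boolean shouldSkip(String tableId) {
        return blacklist.contains(tableId);
    }

    /** Called when a record cannot be written with the table's current schema. */
    void onSchemaFailure(String tableId, String rawRecord, Exception cause) {
        switch (policy) {
            case LOG_WITH_IGNORE:
                // archive instead of failing the job; the operator forwards the record
                // to the dirty sink and emits dirty metrics for the table
                archiveDirtyRecord(tableId, rawRecord, cause);
                break;
            case STOP_PARTIAL:
                // stop writing this one table, keep the rest of the job running
                blacklist.add(tableId);
                break;
            default:
                // TRY_IT_BEST only evolves changes Iceberg can apply (currently AddColumn);
                // anything else fails the job
                throw new UnsupportedOperationException(
                        "Schema change on " + tableId + " not handled under " + policy, cause);
        }
    }

    private void archiveDirtyRecord(String tableId, String rawRecord, Exception cause) {
        System.err.printf("dirty record for %s (%s): %s%n", tableId, cause.getMessage(), rawRecord);
    }
}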
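
Tables auto-created in handleTableCreateEventFromOperator now also carry write.metadata.metrics.default=full, so full column metrics are recorded for each data file. A sketch of the property set applied at creation time, assuming an already-loaded Iceberg Catalog and an unpartitioned spec (the real operator derives schema and primary keys from the incoming record):

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class AutoCreateTableSketch {

    static void createTableIfAbsent(Catalog catalog, TableIdentifier tableId, Schema schema) {
        ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
        properties.put("format-version", "2");                      // v2 format, required for upsert
        properties.put("write.upsert.enabled", "true");             // upsert writes
        properties.put("write.metadata.metrics.default", "full");   // keep full column metrics
        properties.put("engine.hive.enabled", "true");              // visible to Hive
        if (!catalog.tableExists(tableId)) {
            catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), properties.build());
        }
    }
}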
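
In IcebergSingleFileCommiter, small-file compaction is now triggered when an ActionsProvider is configured and either the Iceberg table property (FlinkActions.COMPACT_ENABLED) or the new write.compact.enable connector option is set; a null tableOptions falls back to false. A minimal sketch of that decision, using a placeholder property key since the actual FlinkActions.COMPACT_ENABLED key name is not shown in this diff:

import java.util.Map;

public class CompactionEnableSketch {

    static boolean compactionEnabled(boolean hasActionsProvider,
                                     Map<String, String> tableProperties,
                                     Boolean writeCompactEnableOption) {
        // "compact.enabled" is a placeholder for FlinkActions.COMPACT_ENABLED
        boolean fromTableProperty =
                Boolean.parseBoolean(tableProperties.getOrDefault("compact.enabled", "false"));
        // mirrors: tableOptions == null ? false : tableOptions.get(WRITE_COMPACT_ENABLE)
        boolean fromConnectorOption = writeCompactEnableOption != null && writeCompactEnableOption;
        return hasActionsProvider && (fromTableProperty || fromConnectorOption);
    }
}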