This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d4aca310 [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654)
6d4aca310 is described below

commit 6d4aca310330578bc591ca90c1b25366425f8832
Author: Liao Rui <liao...@users.noreply.github.com>
AuthorDate: Fri Mar 24 18:01:41 2023 +0800

    [INLONG-7653][Sort] Support archiving dirty data and metrics for Iceberg connector (#7654)
---
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  23 +++
 .../inlong/sort/iceberg/IcebergTableSink.java      |   9 +
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  34 +++-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 207 +++++++++++++--------
 .../multiple/IcebergMultipleFilesCommiter.java     |  16 +-
 .../sink/multiple/IcebergSingleFileCommiter.java   |  16 +-
 .../sink/multiple/IcebergSingleStreamWriter.java   |   1 -
 .../iceberg/sink/multiple/RecordWithSchema.java    |   4 +
 8 files changed, 214 insertions(+), 96 deletions(-)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
index c6d3ed874..32d3a21bd 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java
@@ -34,6 +34,7 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.common.DynMethods;
@@ -112,6 +113,25 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
             .orNoop()
             .build();
 
+    public static final ConfigOption<Boolean> WRITE_COMPACT_ENABLE =
+            ConfigOptions.key("write.compact.enable")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable compact small file.");
+
+    public static final ConfigOption<Integer> WRITE_COMPACT_INTERVAL =
+            ConfigOptions.key("write.compact.snapshot.interval")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("Compact snapshot interval.");
+
+    public static final ConfigOption<String> WRITE_DISTRIBUTION_MODE =
+            ConfigOptions.key(TableProperties.WRITE_DISTRIBUTION_MODE)
+                    .stringType()
+                    .defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_NONE)
+                    .withDescription("Distribute the records from input data stream based "
+                            + "on the write.distribution-mode.");
+
     private final FlinkCatalog catalog;
 
     public FlinkDynamicTableFactory() {
@@ -274,6 +294,9 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami
         options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY);
         options.add(SINK_MULTIPLE_PK_AUTO_GENERATED);
         options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK);
+        options.add(WRITE_COMPACT_ENABLE);
+        options.add(WRITE_COMPACT_INTERVAL);
+        options.add(WRITE_DISTRIBUTION_MODE);
         return options;
     }
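
For reference (not part of the patch): a minimal sketch of how the three table options introduced above could be supplied and read back through Flink's Configuration. The option keys and defaults come from the diff; the class name, the sample values, and the CREATE TABLE analogy are illustrative only.

import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.DistributionMode;
import org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory;

public class WriteOptionsSketch {
    public static void main(String[] args) {
        // Hypothetical table options, e.g. collected from a CREATE TABLE ... WITH (...) clause.
        Configuration tableOptions = new Configuration();
        tableOptions.setString("write.compact.enable", "true");          // enable small-file compaction
        tableOptions.setString("write.compact.snapshot.interval", "30"); // compact every 30 snapshots
        tableOptions.setString("write.distribution-mode", "hash");       // none/hash ('range' is rejected by the sink)

        // Read them back through the ConfigOptions declared in FlinkDynamicTableFactory.
        boolean compactEnabled = tableOptions.get(FlinkDynamicTableFactory.WRITE_COMPACT_ENABLE);
        int compactInterval = tableOptions.get(FlinkDynamicTableFactory.WRITE_COMPACT_INTERVAL);
        DistributionMode mode = DistributionMode.fromName(
                tableOptions.get(FlinkDynamicTableFactory.WRITE_DISTRIBUTION_MODE));
        System.out.printf("compact=%s, interval=%d, mode=%s%n", compactEnabled, compactInterval, mode);
    }
}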
 
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
index ef93e1f6f..cd239633e 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.TableLoader;
@@ -54,6 +55,7 @@ import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_PK_AUTO_GENERA
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_SCHEMA_UPDATE_POLICY;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TABLE_PATTERN;
 import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK;
+import static org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory.WRITE_DISTRIBUTION_MODE;
 
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
@@ -131,6 +133,11 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                             .withSchemaUpdatePolicy(tableOptions.get(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY))
                             .withPkAutoGenerated(tableOptions.get(SINK_MULTIPLE_PK_AUTO_GENERATED))
                             .build())
+                    .dirtyOptions(dirtyOptions)
+                    .dirtySink(dirtySink)
+                    .action(actionsProvider)
+                    .tableOptions(tableOptions)
+                    .distributionMode(DistributionMode.fromName(tableOptions.get(WRITE_DISTRIBUTION_MODE)))
                     .append();
         } else {
             return (DataStreamSinkProvider) dataStream -> FlinkSink.forRowData(dataStream)
@@ -143,6 +150,8 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning,
                     .dirtyOptions(dirtyOptions)
                     .dirtySink(dirtySink)
                     .action(actionsProvider)
+                    .tableOptions(tableOptions)
+                    .distributionMode(DistributionMode.fromName(tableOptions.get(WRITE_DISTRIBUTION_MODE)))
                     .append();
         }
     }
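
For reference (not part of the patch): a rough sketch of how a caller could wire the new tableOptions() and distributionMode() calls shown in the hunks above. Only methods visible in this diff are used, plus tableLoader() and overwrite(), which are assumed to exist because this FlinkSink is a copy of the upstream Iceberg Flink builder; the variable names are placeholders and the chain is intentionally incomplete (metrics, dirty sink, and equality fields are omitted).

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.flink.TableLoader;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory;
import org.apache.inlong.sort.iceberg.sink.FlinkSink;

public class SinkWiringSketch {
    public static void wire(DataStream<RowData> rowDataInput, TableLoader tableLoader,
            Configuration tableOptions, DirtyOptions dirtyOptions) {
        FlinkSink.forRowData(rowDataInput)
                .tableLoader(tableLoader)       // assumed from the upstream Iceberg builder
                .overwrite(false)               // assumed from the upstream Iceberg builder
                .dirtyOptions(dirtyOptions)     // dirty-data archiving hook from this commit
                .tableOptions(tableOptions)     // forwarded to the committers (compaction settings)
                .distributionMode(DistributionMode.fromName(
                        tableOptions.get(FlinkDynamicTableFactory.WRITE_DISTRIBUTION_MODE)))
                .append();
    }
}
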
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index 0fd079f59..9fb259b39 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -158,7 +158,9 @@ public class FlinkSink {
         private ActionsProvider actionProvider;
         private boolean overwrite = false;
         private boolean appendMode = false;
+        private DistributionMode distributionMode = null;
         private Integer writeParallelism = null;
+        private boolean upsert = false;
         private List<String> equalityFieldColumns = null;
         private String uidPrefix = null;
         private ReadableConfig readableConfig = new Configuration();
@@ -171,6 +173,7 @@ public class FlinkSink {
         private MultipleSinkOption multipleSinkOption = null;
         private DirtyOptions dirtyOptions;
         private @Nullable DirtySink<Object> dirtySink;
+        private ReadableConfig tableOptions;
 
         private Builder() {
         }
@@ -255,6 +258,11 @@ public class FlinkSink {
             return this;
         }
 
+        public Builder flinkConf(ReadableConfig config) {
+            this.readableConfig = config;
+            return this;
+        }
+
         /**
          * The appendMode properties is used to insert data without equality field columns.
          *
@@ -277,6 +285,11 @@ public class FlinkSink {
             return this;
         }
 
+        public FlinkSink.Builder tableOptions(ReadableConfig tableOptions) {
+            this.tableOptions = tableOptions;
+            return this;
+        }
+
         /**
          * Add metric output for iceberg writer
          * @param inlongMetric
@@ -311,6 +324,7 @@ public class FlinkSink {
                     !DistributionMode.RANGE.equals(mode),
                     "Flink does not support 'range' write distribution mode 
now.");
             if (mode != null) {
+                this.distributionMode = mode;
                 writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
             }
             return this;
@@ -337,6 +351,7 @@ public class FlinkSink {
          * @return {@link Builder} to connect the iceberg table.
          */
         public Builder upsert(boolean enabled) {
+            this.upsert = enabled;
             writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
             return this;
         }
@@ -408,9 +423,11 @@ public class FlinkSink {
             // Convert the requested flink table schema to flink row type.
             RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
 
-            // Distribute the records from input data stream based on the write.distribution-mode.
-            DataStream<RowData> distributeStream = distributeDataStream(
-                    rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
+            // Distribute the records from input data stream based on the write.distribution-mode and
+            // equality fields.
+            DataStream<RowData> distributeStream =
+                    distributeDataStream(
+                            rowDataInput, equalityFieldIds, table.spec(), table.schema(), flinkRowType);
 
             // Add parallel writers that append rows to files
             SingleOutputStreamOperator<WriteResult> writerStream =
@@ -513,7 +530,8 @@ public class FlinkSink {
                             TableIdentifier.of(table.name()),
                             tableLoader,
                             flinkWriteConf.overwriteMode(),
-                            actionProvider));
+                            actionProvider,
+                            tableOptions));
             SingleOutputStreamOperator<Void> committerStream = writerStream
                     .transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter)
                     .setParallelism(1)
@@ -527,7 +545,8 @@ public class FlinkSink {
         private SingleOutputStreamOperator<Void> appendMultipleCommitter(
                 SingleOutputStreamOperator<MultipleWriteResult> writerStream) {
             IcebergProcessOperator<MultipleWriteResult, Void> multipleFilesCommiter =
-                    new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader, overwrite));
+                    new IcebergProcessOperator<>(new IcebergMultipleFilesCommiter(catalogLoader, overwrite,
+                            actionProvider, tableOptions));
             SingleOutputStreamOperator<Void> committerStream = writerStream
                     .transform(operatorName(ICEBERG_MULTIPLE_FILES_COMMITTER_NAME), Types.VOID, multipleFilesCommiter)
                     .setParallelism(1)
@@ -561,8 +580,8 @@ public class FlinkSink {
             }
 
             IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter(
-                    table, flinkRowType, equalityFieldIds, flinkWriteConf,
-                    appendMode, inlongMetric, auditHostAndPorts, dirtyOptions, dirtySink);
+                    table, flinkRowType, equalityFieldIds, flinkWriteConf, appendMode, inlongMetric,
+                    auditHostAndPorts, dirtyOptions, dirtySink);
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             SingleOutputStreamOperator<WriteResult> writerStream = input
@@ -701,6 +720,7 @@ public class FlinkSink {
             String auditHostAndPorts,
             DirtyOptions dirtyOptions,
             @Nullable DirtySink<Object> dirtySink) {
+        // flink A, iceberg a
         Preconditions.checkArgument(table != null, "Iceberg table should't be null");
 
         Table serializableTable = SerializableTable.copyOf(table);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 12914098e..f2660c5aa 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -43,8 +44,8 @@ import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types.NestedField;
-import org.apache.inlong.sort.base.dirty.DirtyData;
 import org.apache.inlong.sort.base.dirty.DirtyOptions;
+import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
 import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
@@ -54,6 +55,7 @@ import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.sub.SinkTableMetricData;
 import org.apache.inlong.sort.base.sink.MultipleSinkOption;
+import org.apache.inlong.sort.base.sink.SchemaUpdateExceptionPolicy;
 import org.apache.inlong.sort.base.sink.TableChange;
 import org.apache.inlong.sort.base.sink.TableChange.AddColumn;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
@@ -61,14 +63,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+import javax.ws.rs.NotSupportedException;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT;
@@ -99,8 +104,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
     // schema cache
     private transient Map<TableIdentifier, Schema> schemaCache;
 
-    private final DirtyOptions dirtyOptions;
-    private @Nullable final DirtySink<Object> dirtySink;
+    // blacklist to filter schema update failed table
+    private transient Set<TableIdentifier> blacklist;
+
+    private final DirtySinkHelper<Object> dirtySinkHelper;
 
     // metric
     private final String inlongMetric;
@@ -110,14 +117,16 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
     private transient MetricState metricState;
 
     public DynamicSchemaHandleOperator(CatalogLoader catalogLoader,
-            MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions,
-            @Nullable DirtySink<Object> dirtySink, String inlongMetric, String auditHostAndPorts) {
+            MultipleSinkOption multipleSinkOption,
+            DirtyOptions dirtyOptions,
+            @Nullable DirtySink<Object> dirtySink,
+            String inlongMetric,
+            String auditHostAndPorts) {
         this.catalogLoader = catalogLoader;
         this.multipleSinkOption = multipleSinkOption;
-        this.dirtyOptions = dirtyOptions;
-        this.dirtySink = dirtySink;
         this.inlongMetric = inlongMetric;
         this.auditHostAndPorts = auditHostAndPorts;
+        this.dirtySinkHelper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
     }
 
     @SuppressWarnings("unchecked")
@@ -137,6 +146,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
 
         this.recordQueues = new HashMap<>();
         this.schemaCache = new HashMap<>();
+        this.blacklist = new HashSet<>();
 
         // Initialize metric
         MetricOption metricOption = MetricOption.builder()
@@ -151,6 +161,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         if (metricOption != null) {
             metricData = new SinkTableMetricData(metricOption, getRuntimeContext().getMetricGroup());
         }
+        this.dirtySinkHelper.open(new Configuration());
     }
 
     @Override
@@ -169,18 +180,25 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         } catch (Exception e) {
             LOGGER.error(String.format("Deserialize error, raw data: %s",
                     new String(element.getValue().getBinary(0))), e);
-            handleDirtyData(new String(element.getValue().getBinary(0)),
-                    null, DirtyType.DESERIALIZE_ERROR, e, TableIdentifier.of("unknow", "unknow"));
+            if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption.getSchemaUpdatePolicy()) {
+                handleDirtyData(new String(element.getValue().getBinary(0)),
+                        null, DirtyType.DESERIALIZE_ERROR, e, TableIdentifier.of("unknow", "unknow"));
+            }
+            return;
         }
         TableIdentifier tableId = null;
         try {
             tableId = parseId(jsonNode);
         } catch (Exception e) {
             LOGGER.error(String.format("Table identifier parse error, raw 
data: %s", jsonNode), e);
-            handleDirtyData(jsonNode, jsonNode, 
DirtyType.TABLE_IDENTIFIER_PARSE_ERROR,
-                    e, TableIdentifier.of("unknow", "unknow"));
+            if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == 
multipleSinkOption.getSchemaUpdatePolicy()) {
+                handleDirtyData(jsonNode, jsonNode, 
DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e,
+                        TableIdentifier.of("unknow", "unknow"));
+            }
+        }
+        if (blacklist.contains(tableId)) {
+            return;
         }
-
         boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode);
         if (isDDL) {
             execDDL(jsonNode, tableId);
@@ -189,51 +207,59 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         }
     }
 
+    private void handleDirtyDataOfLogWithIgnore(JsonNode jsonNode, Schema dataSchema,
+            TableIdentifier tableId, Exception e) {
+        List<RowData> rowDataForDataSchemaList = Collections.emptyList();
+        try {
+            rowDataForDataSchemaList = dynamicSchemaFormat
+                    .extractRowData(jsonNode, FlinkSchemaUtil.convert(dataSchema));
+        } catch (Throwable ee) {
+            LOG.error("extractRowData {} failed!", jsonNode, ee);
+        }
+
+        for (RowData rowData : rowDataForDataSchemaList) {
+            DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+            if (!dirtyOptions.ignoreDirty()) {
+                if (metricData != null) {
+                    metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(),
+                            null, tableId.name(), rowData.toString());
+                }
+            } else {
+                handleDirtyData(rowData.toString(), jsonNode, DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
+            }
+        }
+    }
+
     private void handleDirtyData(Object dirtyData,
             JsonNode rootNode,
             DirtyType dirtyType,
             Exception e,
             TableIdentifier tableId) {
-        if (!dirtyOptions.ignoreDirty()) {
-            RuntimeException ex;
-            if (e instanceof RuntimeException) {
-                ex = (RuntimeException) e;
-            } else {
-                ex = new RuntimeException(e);
-            }
-            throw ex;
-        }
-        if (dirtySink != null) {
-            DirtyData.Builder<Object> builder = DirtyData.builder();
+        DirtyOptions dirtyOptions = dirtySinkHelper.getDirtyOptions();
+        if (rootNode != null) {
             try {
-                builder.setData(dirtyData)
-                        .setDirtyType(dirtyType)
-                        .setDirtyMessage(e.getMessage());
-                if (rootNode != null) {
-                    builder.setLabels(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels()))
-                            .setLogTag(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag()))
-                            .setIdentifier(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier()));
-                } else {
-                    builder.setLabels(dirtyOptions.getLabels())
-                            .setLogTag(dirtyOptions.getLogTag())
-                            .setIdentifier(dirtyOptions.getIdentifier());
-                }
-                dirtySink.invoke(builder.build());
-                if (metricData != null) {
-                    metricData.outputDirtyMetricsWithEstimate(
-                            tableId.namespace().toString(), null, tableId.name(), dirtyData);
-                }
+                String dirtyLabel = dynamicSchemaFormat.parse(rootNode,
+                        DirtySinkHelper.regexReplace(dirtyOptions.getLabels(), DirtyType.BATCH_LOAD_ERROR, null));
+                String dirtyLogTag = dynamicSchemaFormat.parse(rootNode,
+                        DirtySinkHelper.regexReplace(dirtyOptions.getLogTag(), DirtyType.BATCH_LOAD_ERROR, null));
+                String dirtyIdentifier = dynamicSchemaFormat.parse(rootNode,
+                        DirtySinkHelper.regexReplace(dirtyOptions.getIdentifier(), DirtyType.BATCH_LOAD_ERROR, null));
+                dirtySinkHelper.invoke(dirtyData, dirtyType, dirtyLabel, dirtyLogTag, dirtyIdentifier, e);
             } catch (Exception ex) {
-                if (!dirtyOptions.ignoreSideOutputErrors()) {
-                    throw new RuntimeException(ex);
-                }
-                LOG.warn("Dirty sink failed", ex);
+                throw new RuntimeException(ex);
             }
+        } else {
+            dirtySinkHelper.invoke(dirtyData, dirtyType, dirtyOptions.getLabels(), dirtyOptions.getLogTag(),
+                    dirtyOptions.getIdentifier(), e);
+        }
+        if (metricData != null) {
+            metricData.outputDirtyMetricsWithEstimate(tableId.namespace().toString(), null, tableId.name(), dirtyData);
         }
     }
 
     @Override
     public void onProcessingTime(long timestamp) {
+        LOG.info("Black list table: {} at time {}.", blacklist, timestamp);
         processingTimeService.registerTimer(
                 processingTimeService.getCurrentProcessingTime() + HELPER_DEBUG_INTERVEL, this);
     }
@@ -280,7 +306,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
             return v;
         });
         if (schema == null) {
-            handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+            try {
+                handleTableCreateEventFromOperator(record.getTableId(), dataSchema);
+            } catch (Exception e) {
+                LOGGER.error("Table create error, tableId: {}, schema: {}", record.getTableId(), dataSchema);
+                if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption
+                        .getSchemaUpdatePolicy()) {
+                    handleDirtyDataOfLogWithIgnore(jsonNode, dataSchema, tableId, e);
+                } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == multipleSinkOption
+                        .getSchemaUpdatePolicy()) {
+                    blacklist.add(tableId);
+                } else {
+                    LOGGER.error("Table create error, tableId: {}, schema: {}, schemaUpdatePolicy: {}",
+                            record.getTableId(), dataSchema, multipleSinkOption.getSchemaUpdatePolicy(), e);
+                    throw e;
+                }
+            }
         } else {
             handleSchemaInfoEvent(record.getTableId(), schema);
         }
@@ -302,26 +343,43 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                             try {
                                 return dynamicSchemaFormat.extractRowData(jsonNode, FlinkSchemaUtil.convert(schema1));
                             } catch (Exception e) {
-                                LOG.warn("Ignore table {} schema change, old: {} new: {}.",
-                                        tableId, dataSchema, latestSchema, e);
-                                try {
-                                    List<RowData> rowDataForDataSchemaList =
-                                            dynamicSchemaFormat.extractRowData(jsonNode,
-                                                    FlinkSchemaUtil.convert(dataSchema));
-                                    for (RowData rowData : rowDataForDataSchemaList) {
-                                        handleDirtyData(rowData.toString(), jsonNode,
-                                                DirtyType.EXTRACT_ROWDATA_ERROR, e, tableId);
-                                    }
-                                } catch (Exception ee) {
-                                    LOG.error("handleDirtyData {} failed!", jsonNode);
+                                if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption
+                                        .getSchemaUpdatePolicy()) {
+                                    handleDirtyDataOfLogWithIgnore(jsonNode, dataSchema, tableId, e);
+                                } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == multipleSinkOption
+                                        .getSchemaUpdatePolicy()) {
+                                    blacklist.add(tableId);
+                                } else {
+                                    LOG.error("Table {} schema change, schemaUpdatePolicy:{} old: {} new: {}.",
+                                            tableId, multipleSinkOption.getSchemaUpdatePolicy(), dataSchema,
+                                            latestSchema, e);
+                                    throw e;
                                 }
                             }
                             return Collections.emptyList();
                         });
                 output.collect(new StreamRecord<>(recordWithSchema));
             } else {
-                handldAlterSchemaEventFromOperator(tableId, latestSchema, dataSchema);
-                break;
+                if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption
+                        .getSchemaUpdatePolicy()) {
+                    RecordWithSchema recordWithSchema = queue.poll();
+                    handleDirtyDataOfLogWithIgnore(recordWithSchema.getOriginalData(), dataSchema, tableId,
+                            new NotSupportedException(
+                                    String.format("SchemaUpdatePolicy %s does not support schema dynamic update!",
+                                            multipleSinkOption.getSchemaUpdatePolicy())));
+                } else if (SchemaUpdateExceptionPolicy.STOP_PARTIAL == multipleSinkOption
+                        .getSchemaUpdatePolicy()) {
+                    blacklist.add(tableId);
+                    break;
+                } else if (SchemaUpdateExceptionPolicy.TRY_IT_BEST == multipleSinkOption
+                        .getSchemaUpdatePolicy()) {
+                    handldAlterSchemaEventFromOperator(tableId, latestSchema, dataSchema);
+                    break;
+                } else {
+                    throw new NotSupportedException(
+                            String.format("SchemaUpdatePolicy %s does not support schema dynamic update!",
+                                    multipleSinkOption.getSchemaUpdatePolicy()));
+                }
             }
         }
     }
@@ -340,6 +398,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
             ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
             properties.put("format-version", "2");
             properties.put("write.upsert.enabled", "true");
+            properties.put("write.metadata.metrics.default", "full");
             // for hive visible
             properties.put("engine.hive.enabled", "true");
             try {
@@ -363,9 +422,12 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
         Transaction transaction = table.newTransaction();
         if (table.schema().sameSchema(oldSchema)) {
             List<TableChange> tableChanges = SchemaChangeUtils.diffSchema(oldSchema, newSchema);
-            if (!canHandleWithSchemaUpdatePolicy(tableId, tableChanges)) {
-                // If can not handle this schema update, should not push data into next operator
-                return;
+            for (TableChange tableChange : tableChanges) {
+                if (!(tableChange instanceof AddColumn)) {
+                    // todo:currently iceberg can only handle addColumn, so always return false
+                    throw new UnsupportedOperationException(
+                            String.format("Unsupported table %s schema change: %s.", tableId.toString(), tableChange));
+                }
             }
             SchemaChangeUtils.applySchemaChanges(transaction.updateSchema(), tableChanges);
             LOG.info("Schema evolution in table({}) for table change: {}", tableId, tableChanges);
@@ -401,26 +463,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi
                     tableId,
                     pkListStr);
         } catch (Exception e) {
-            handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e, tableId);
+            if (SchemaUpdateExceptionPolicy.LOG_WITH_IGNORE == multipleSinkOption.getSchemaUpdatePolicy()) {
+                handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e, tableId);
+            }
         }
         return null;
     }
 
-    private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) {
-        boolean canHandle = true;
-        for (TableChange tableChange : tableChanges) {
-            canHandle &= MultipleSinkOption.canHandleWithSchemaUpdate(tableId.toString(), tableChange,
-                    multipleSinkOption.getSchemaUpdatePolicy());
-            if (!(tableChange instanceof AddColumn)) {
-                // todo:currently iceberg can only handle addColumn, so always return false
-                LOG.info("Ignore table {} schema change: {} because iceberg can't handle it.",
-                        tableId, tableChange);
-                canHandle = false;
-            }
-            if (!canHandle) {
-                break;
-            }
-        }
-        return canHandle;
-    }
 }
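
For reference (not part of the patch): the operator above now funnels dirty records through DirtySinkHelper rather than calling the DirtySink directly. A minimal sketch of that call pattern, using only the constructor, open() and invoke() signatures visible in this diff; it takes the labels straight from the options and skips the dynamic parsing and regex replacement the operator performs.

import org.apache.flink.configuration.Configuration;
import org.apache.inlong.sort.base.dirty.DirtyOptions;
import org.apache.inlong.sort.base.dirty.DirtySinkHelper;
import org.apache.inlong.sort.base.dirty.DirtyType;
import org.apache.inlong.sort.base.dirty.sink.DirtySink;

public class DirtySinkHelperSketch {
    public static void archive(DirtyOptions dirtyOptions, DirtySink<Object> dirtySink, String rawRecord) {
        // Build and open the helper once; the operator does this in its constructor and in open().
        DirtySinkHelper<Object> helper = new DirtySinkHelper<>(dirtyOptions, dirtySink);
        helper.open(new Configuration());

        // Route one undeserializable record to the dirty sink with the static labels from the options.
        helper.invoke(rawRecord, DirtyType.DESERIALIZE_ERROR,
                dirtyOptions.getLabels(), dirtyOptions.getLogTag(), dirtyOptions.getIdentifier(),
                new RuntimeException("deserialize failed"));
    }
}
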
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java
index 19907e274..fee48eadd 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleFilesCommiter.java
@@ -19,10 +19,12 @@ package org.apache.inlong.sort.iceberg.sink.multiple;
 
 import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.iceberg.actions.ActionsProvider;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.flink.CatalogLoader;
 import org.apache.iceberg.flink.TableLoader;
@@ -40,20 +42,26 @@ public class IcebergMultipleFilesCommiter extends IcebergProcessFunction<Multipl
     private Map<TableIdentifier, IcebergSingleFileCommiter> multipleCommiters;
     private final CatalogLoader catalogLoader;
     private final boolean overwrite;
+    private final ActionsProvider actionsProvider;
+    private final ReadableConfig tableOptions;
 
-    public IcebergMultipleFilesCommiter(CatalogLoader catalogLoader, boolean overwrite) {
+    private transient FunctionInitializationContext functionInitializationContext;
+
+    public IcebergMultipleFilesCommiter(CatalogLoader catalogLoader, boolean overwrite, ActionsProvider actionProvider,
+            ReadableConfig tableOptions) {
         this.catalogLoader = catalogLoader;
         this.overwrite = overwrite;
+        this.actionsProvider = actionProvider;
+        this.tableOptions = tableOptions;
     }
 
-    private transient FunctionInitializationContext functionInitializationContext;
-
     @Override
     public void processElement(MultipleWriteResult value) throws Exception {
         TableIdentifier tableId = value.getTableId();
         if (multipleCommiters.get(tableId) == null) {
             IcebergSingleFileCommiter commiter = new IcebergSingleFileCommiter(
-                    tableId, TableLoader.fromCatalog(catalogLoader, value.getTableId()), overwrite, null);
+                    tableId, TableLoader.fromCatalog(catalogLoader, value.getTableId()), overwrite,
+                    actionsProvider, tableOptions);
             commiter.setup(getRuntimeContext(), collector, context);
             commiter.initializeState(functionInitializationContext);
             commiter.open(new Configuration());
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
index 6c5e14fe7..f842ab711 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.io.SimpleVersionedSerialization;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -48,6 +49,7 @@ import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.inlong.sort.iceberg.FlinkActions;
+import org.apache.inlong.sort.iceberg.FlinkDynamicTableFactory;
 import org.apache.inlong.sort.iceberg.sink.DeltaManifests;
 import org.apache.inlong.sort.iceberg.sink.DeltaManifestsSerializer;
 import org.apache.inlong.sort.iceberg.sink.FlinkManifestUtil;
@@ -118,17 +120,20 @@ public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResul
     // compact file action
     private ActionsProvider flinkActions;
     private transient RewriteDataFiles compactAction;
+    private ReadableConfig tableOptions;
 
     public IcebergSingleFileCommiter(
             TableIdentifier tableId,
             TableLoader tableLoader,
             boolean replacePartitions,
-            ActionsProvider actionProvider) {
+            ActionsProvider actionProvider,
+            ReadableConfig tableOptions) {
         // Here must distinguish state descriptor with tableId, because all icebergSingleFileCommiter state in
         // one IcebergMultipleFilesCommiter use same StateStore.
         this.tableLoader = tableLoader;
         this.replacePartitions = replacePartitions;
         this.flinkActions = actionProvider;
+        this.tableOptions = tableOptions;
         this.jobIdDescriptor = new ListStateDescriptor<>(
                 String.format("iceberg(%s)-flink-job-id", tableId.toString()), 
BasicTypeInfo.STRING_TYPE_INFO);
         this.stateDescriptor = buildStateDescriptor(tableId);
@@ -142,10 +147,13 @@ public class IcebergSingleFileCommiter extends IcebergProcessFunction<WriteResul
         this.tableLoader.open();
         this.table = tableLoader.loadTable();
 
+        boolean writeCompactEnabelFromTO = this.tableOptions == null
+                ? false
+                : this.tableOptions.get(FlinkDynamicTableFactory.WRITE_COMPACT_ENABLE);
         // compact file
-        if (flinkActions != null
-                && PropertyUtil.propertyAsBoolean(
-                        table.properties(), FlinkActions.COMPACT_ENABLED, FlinkActions.COMPACT_ENABLED_DEFAULT)) {
+        if (flinkActions != null && (PropertyUtil.propertyAsBoolean(
+                table.properties(), FlinkActions.COMPACT_ENABLED, FlinkActions.COMPACT_ENABLED_DEFAULT)
+                || writeCompactEnabelFromTO)) {
             compactAction = flinkActions.rewriteDataFiles(table);
         }
         maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
index d44927db1..11fd7624a 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java
@@ -73,7 +73,6 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ
     private @Nullable RowType flinkRowType;
     private final DirtyOptions dirtyOptions;
     private @Nullable final DirtySink<Object> dirtySink;
-
     private boolean multipleSink;
 
     public IcebergSingleStreamWriter(
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
index 5d330b0ca..01ca8dce7 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/RecordWithSchema.java
@@ -70,6 +70,10 @@ public class RecordWithSchema {
         return data;
     }
 
+    public JsonNode getOriginalData() {
+        return originalData;
+    }
+
     public Schema getSchema() {
         return schema;
     }

