This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
     new 08db5ea09 [INLONG-6765][Sort] Supports dirty data side-output for Iceberg sink (#6766)
08db5ea09 is described below

commit 08db5ea092f825ab44dace4534a9c67cc748800c
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Thu Dec 8 09:54:36 2022 +0800

    [INLONG-6765][Sort] Supports dirty data side-output for Iceberg sink (#6766)
---
 .../org/apache/inlong/sort/base/Constants.java     |  11 ++-
 .../apache/inlong/sort/base/dirty/DirtyData.java   |  41 +++++++-
 .../apache/inlong/sort/base/dirty/DirtyType.java   |  12 +++
 .../sort/base/dirty/sink/log/LogDirtySink.java     |  27 +++---
 .../sort/base/dirty/sink/s3/S3DirtySink.java       |  33 ++++---
 .../inlong/sort/base/dirty/utils/FormatUtils.java  |  30 ++++++
 .../inlong/sort/base/sink/MultipleSinkOption.java  |   1 -
 .../inlong/sort/base/dirty/FormatUtilsTest.java    |   4 +-
 .../sort/iceberg/FlinkDynamicTableFactory.java     |  13 ++-
 .../inlong/sort/iceberg/IcebergTableSink.java      |  16 +++-
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  35 +++++--
 .../sink/multiple/DynamicSchemaHandleOperator.java | 106 ++++++++++++++++-----
 .../sink/multiple/IcebergMultipleStreamWriter.java |  19 +++-
 .../sink/multiple/IcebergSingleStreamWriter.java   |  60 ++++++++++--
 .../sort/parser/IcebergNodeSqlParserTest.java      |  11 ++-
 15 files changed, 328 insertions(+), 91 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 983b3c207..23676ec0f 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -227,11 +227,11 @@ public final class Constants {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "The identifier of dirty data, "
-                                    + "it will be used for filename generation of file dirty sink, "
+                            "The identifier of dirty data, it will be used for filename generation of file dirty sink, "
                                     + "topic generation of mq dirty sink, tablename generation of database, etc."
                                     + "and it supports variable replace like '${variable}'."
-                                    + "There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + "There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+                                    + "are currently supported, "
                                     + "and the support of other variables is determined by the connector.");
     public static final ConfigOption<Boolean> DIRTY_SIDE_OUTPUT_ENABLE =
             ConfigOptions.key("dirty.side-output.enable")
@@ -266,7 +266,8 @@ public final class Constants {
                     .withDescription(
                             "The labels of dirty side-output, format is 'key1=value1&key2=value2', "
                                     + "it supports variable replace like '${variable}',"
-                                    + "There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported,"
+                                    + "There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] "
+                                    + "are currently supported,"
                                     + " and the support of other variables is determined by the connector.");
     public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_LOG_TAG =
             ConfigOptions.key("dirty.side-output.log-tag")
                     .defaultValue("DirtyData")
                     .withDescription(
                             "The log tag of dirty side-output, it supports variable replace like '${variable}'."
- + "There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported," + + "There are two system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported," + " and the support of other variables is determined by the connector."); public static final ConfigOption<String> DIRTY_SIDE_OUTPUT_FIELD_DELIMITER = ConfigOptions.key("dirty.side-output.field-delimiter") diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java index a8b84f2b4..16f1cb762 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyData.java @@ -17,8 +17,10 @@ package org.apache.inlong.sort.base.dirty; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.inlong.sort.base.util.PatternReplaceUtils; +import javax.annotation.Nullable; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; @@ -33,6 +35,8 @@ public class DirtyData<T> { private static final String DIRTY_TYPE_KEY = "DIRTY_TYPE"; + private static final String DIRTY_MESSAGE_KEY = "DIRTY_MESSAGE"; + private static final String SYSTEM_TIME_KEY = "SYSTEM_TIME"; private static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); @@ -41,7 +45,7 @@ public class DirtyData<T> { * The identifier of dirty data, it will be used for filename generation of file dirty sink, * topic generation of mq dirty sink, tablename generation of database, etc, * and it supports variable replace like '${variable}'. - * There are two system variables[SYSTEM_TIME|DIRTY_TYPE] are currently supported, + * There are several system variables[SYSTEM_TIME|DIRTY_TYPE|DIRTY_MESSAGE] are currently supported, * and the support of other variables is determined by the connector. 
*/ private final String identifier; @@ -58,18 +62,31 @@ public class DirtyData<T> { * Dirty type */ private final DirtyType dirtyType; + /** + * Dirty describe message, it is the cause of dirty data + */ + private final String dirtyMessage; + /** + * The row type of data, it is only used for 'RowData' + */ + private @Nullable final LogicalType rowType; /** * The real dirty data */ private final T data; - public DirtyData(T data, String identifier, String labels, String logTag, DirtyType dirtyType) { + public DirtyData(T data, String identifier, String labels, + String logTag, DirtyType dirtyType, String dirtyMessage, + @Nullable LogicalType rowType) { this.data = data; this.dirtyType = dirtyType; + this.dirtyMessage = dirtyMessage; + this.rowType = rowType; Map<String, String> paramMap = genParamMap(); this.labels = PatternReplaceUtils.replace(labels, paramMap); this.logTag = PatternReplaceUtils.replace(logTag, paramMap); this.identifier = PatternReplaceUtils.replace(identifier, paramMap); + } public static <T> Builder<T> builder() { @@ -80,6 +97,7 @@ public class DirtyData<T> { Map<String, String> paramMap = new HashMap<>(); paramMap.put(SYSTEM_TIME_KEY, DATE_TIME_FORMAT.format(LocalDateTime.now())); paramMap.put(DIRTY_TYPE_KEY, dirtyType.format()); + paramMap.put(DIRTY_MESSAGE_KEY, dirtyMessage); return paramMap; } @@ -103,12 +121,19 @@ public class DirtyData<T> { return identifier; } + @Nullable + public LogicalType getRowType() { + return rowType; + } + public static class Builder<T> { private String identifier; private String labels; private String logTag; private DirtyType dirtyType = DirtyType.UNDEFINED; + private String dirtyMessage; + private LogicalType rowType; private T data; public Builder<T> setDirtyType(DirtyType dirtyType) { @@ -136,8 +161,18 @@ public class DirtyData<T> { return this; } + public Builder<T> setDirtyMessage(String dirtyMessage) { + this.dirtyMessage = dirtyMessage; + return this; + } + + public Builder<T> setRowType(LogicalType rowType) { + this.rowType = rowType; + return this; + } + public DirtyData<T> build() { - return new DirtyData<>(data, identifier, labels, logTag, dirtyType); + return new DirtyData<>(data, identifier, labels, logTag, dirtyType, dirtyMessage, rowType); } } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java index afe3e0d17..0637725c3 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/DirtyType.java @@ -76,6 +76,18 @@ public enum DirtyType { * Json process error */ JSON_PROCESS_ERROR("JsonProcessError"), + /** + * Table identifier parse error + */ + TABLE_IDENTIFIER_PARSE_ERROR("TableIdentifierParseError"), + /** + * Extract schema error + */ + EXTRACT_SCHEMA_ERROR("ExtractSchemaError"), + /** + * Extract RowData error + */ + EXTRACT_ROWDATA_ERROR("ExtractRowDataError"), ; private final String format; diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java index a57c981fe..b7f7af914 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java +++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/log/LogDirtySink.java @@ -35,7 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; -import static org.apache.flink.table.data.RowData.createFieldGetter; /** * Log dirty sink that is used to print log @@ -48,7 +47,7 @@ public class LogDirtySink<T> implements DirtySink<T> { private static final Logger LOGGER = LoggerFactory.getLogger(LogDirtySink.class); - private final RowData.FieldGetter[] fieldGetters; + private RowData.FieldGetter[] fieldGetters; private final String format; private final String fieldDelimiter; private final DataType physicalRowDataType; @@ -58,18 +57,13 @@ public class LogDirtySink<T> implements DirtySink<T> { this.format = format; this.fieldDelimiter = fieldDelimiter; this.physicalRowDataType = physicalRowDataType; - final LogicalType[] logicalTypes = physicalRowDataType.getChildren() - .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new); - this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; - for (int i = 0; i < logicalTypes.length; i++) { - fieldGetters[i] = createFieldGetter(logicalTypes[i], i); - } } @Override public void open(Configuration configuration) throws Exception { converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null) .createConverter(physicalRowDataType.getLogicalType()); + fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType()); } @Override @@ -78,7 +72,7 @@ public class LogDirtySink<T> implements DirtySink<T> { Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels()); T data = dirtyData.getData(); if (data instanceof RowData) { - value = format((RowData) data, labelMap); + value = format((RowData) data, dirtyData.getRowType(), labelMap); } else if (data instanceof JsonNode) { value = format((JsonNode) data, labelMap); } else { @@ -88,14 +82,23 @@ public class LogDirtySink<T> implements DirtySink<T> { LOGGER.info("[{}] {}", dirtyData.getLogTag(), value); } - private String format(RowData data, Map<String, String> labels) throws JsonProcessingException { + private String format(RowData data, LogicalType rowType, + Map<String, String> labels) throws JsonProcessingException { String value; switch (format) { case "csv": - value = FormatUtils.csvFormat(data, fieldGetters, labels, fieldDelimiter); + RowData.FieldGetter[] getters = fieldGetters; + if (rowType != null) { + getters = FormatUtils.parseFieldGetters(rowType); + } + value = FormatUtils.csvFormat(data, getters, labels, fieldDelimiter); break; case "json": - value = FormatUtils.jsonFormat(data, converter, labels); + RowDataToJsonConverter jsonConverter = converter; + if (rowType != null) { + jsonConverter = FormatUtils.parseRowDataToJsonConverter(rowType); + } + value = FormatUtils.jsonFormat(data, jsonConverter, labels); break; default: throw new UnsupportedOperationException( diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java index 3cb20c7b8..ab8fc9464 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/sink/s3/S3DirtySink.java @@ -24,9 +24,6 @@ import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; 
import org.apache.commons.lang3.StringUtils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.formats.common.TimestampFormat; -import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode; -import org.apache.flink.formats.json.RowDataToJsonConverters; import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; @@ -53,7 +50,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.flink.table.data.RowData.createFieldGetter; /** * S3 dirty sink that is used to sink dirty data to s3 @@ -72,7 +68,7 @@ public class S3DirtySink<T> implements DirtySink<T> { private final AtomicLong writeOutNum = new AtomicLong(0); private final AtomicLong errorNum = new AtomicLong(0); private final DataType physicalRowDataType; - private final RowData.FieldGetter[] fieldGetters; + private RowData.FieldGetter[] fieldGetters; private RowDataToJsonConverter converter; private long batchBytes = 0L; private int size; @@ -85,18 +81,12 @@ public class S3DirtySink<T> implements DirtySink<T> { public S3DirtySink(S3Options s3Options, DataType physicalRowDataType) { this.s3Options = s3Options; this.physicalRowDataType = physicalRowDataType; - final LogicalType[] logicalTypes = physicalRowDataType.getChildren() - .stream().map(DataType::getLogicalType).toArray(LogicalType[]::new); - this.fieldGetters = new RowData.FieldGetter[logicalTypes.length]; - for (int i = 0; i < logicalTypes.length; i++) { - fieldGetters[i] = createFieldGetter(logicalTypes[i], i); - } } @Override public void open(Configuration configuration) throws Exception { - converter = new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null) - .createConverter(physicalRowDataType.getLogicalType()); + converter = FormatUtils.parseRowDataToJsonConverter(physicalRowDataType.getLogicalType()); + fieldGetters = FormatUtils.parseFieldGetters(physicalRowDataType.getLogicalType()); AmazonS3 s3Client; if (s3Options.getAccessKeyId() != null && s3Options.getSecretKeyId() != null) { BasicAWSCredentials awsCreds = @@ -149,7 +139,7 @@ public class S3DirtySink<T> implements DirtySink<T> { Map<String, String> labelMap = LabelUtils.parseLabels(dirtyData.getLabels()); T data = dirtyData.getData(); if (data instanceof RowData) { - value = format((RowData) data, labelMap); + value = format((RowData) data, dirtyData.getRowType(), labelMap); } else if (data instanceof JsonNode) { value = format((JsonNode) data, labelMap); } else { @@ -164,14 +154,23 @@ public class S3DirtySink<T> implements DirtySink<T> { batchMap.computeIfAbsent(dirtyData.getIdentifier(), k -> new ArrayList<>()).add(value); } - private String format(RowData data, Map<String, String> labels) throws JsonProcessingException { + private String format(RowData data, LogicalType rowType, + Map<String, String> labels) throws JsonProcessingException { String value; switch (s3Options.getFormat()) { case "csv": - value = FormatUtils.csvFormat(data, fieldGetters, labels, s3Options.getFieldDelimiter()); + RowData.FieldGetter[] getters = fieldGetters; + if (rowType != null) { + getters = FormatUtils.parseFieldGetters(rowType); + } + value = FormatUtils.csvFormat(data, getters, labels, s3Options.getFieldDelimiter()); break; case "json": - 
value = FormatUtils.jsonFormat(data, converter, labels); + RowDataToJsonConverter jsonConverter = converter; + if (rowType != null) { + jsonConverter = FormatUtils.parseRowDataToJsonConverter(rowType); + } + value = FormatUtils.jsonFormat(data, jsonConverter, labels); break; default: throw new UnsupportedOperationException( diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java index 3220837dd..58260bae1 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/dirty/utils/FormatUtils.java @@ -17,18 +17,24 @@ package org.apache.inlong.sort.base.dirty.utils; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonOptions.MapNullKeyMode; +import org.apache.flink.formats.json.RowDataToJsonConverters; import org.apache.flink.formats.json.RowDataToJsonConverters.RowDataToJsonConverter; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.LogicalType; import java.util.Iterator; import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.StringJoiner; +import static org.apache.flink.table.data.RowData.createFieldGetter; /** * Format utils @@ -49,6 +55,30 @@ public final class FormatUtils { private FormatUtils() { } + /** + * Parse FieldGetter from LogicalType + * @param rowType The row type + * @return A array of FieldGetter + */ + public static RowData.FieldGetter[] parseFieldGetters(LogicalType rowType) { + List<LogicalType> logicalTypes = rowType.getChildren(); + RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[logicalTypes.size()]; + for (int i = 0; i < logicalTypes.size(); i++) { + fieldGetters[i] = createFieldGetter(logicalTypes.get(i), i); + } + return fieldGetters; + } + + /** + * Parse RowDataToJsonConverter + * @param rowType The row type + * @return RowDataToJsonConverter + */ + public static RowDataToJsonConverter parseRowDataToJsonConverter(LogicalType rowType) { + return new RowDataToJsonConverters(TimestampFormat.SQL, MapNullKeyMode.DROP, null) + .createConverter(rowType); + } + /** * Csv format for 'RowData' * diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java index 4e3cfdf9b..d37d5dd28 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/sink/MultipleSinkOption.java @@ -149,7 +149,6 @@ public class MultipleSinkOption implements Serializable { LOG.warn("Ignore table {} schema change: {}.", tableName, tableChange); return false; } - throw new UnsupportedOperationException( String.format("Unsupported table %s schema change: %s.", tableName, tableChange)); } diff --git 
a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java index 386766ddd..80363ba8b 100644 --- a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/dirty/FormatUtilsTest.java @@ -31,7 +31,6 @@ import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData.FieldGetter; import org.apache.flink.table.data.StringData; -import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.inlong.sort.base.dirty.utils.FormatUtils; import org.junit.Assert; @@ -41,7 +40,6 @@ import org.junit.Test; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import static org.apache.flink.table.data.RowData.createFieldGetter; /** @@ -64,7 +62,7 @@ public class FormatUtilsTest { Column.physical("name", DataTypes.STRING()), Column.physical("age", DataTypes.INT())); List<LogicalType> logicalTypes = schema.toPhysicalRowDataType() - .getChildren().stream().map(DataType::getLogicalType).collect(Collectors.toList()); + .getLogicalType().getChildren(); fieldGetters = new RowData.FieldGetter[logicalTypes.size()]; for (int i = 0; i < logicalTypes.size(); i++) { fieldGetters[i] = createFieldGetter(logicalTypes.get(i), i); diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java index 11ed1113f..8553ce94a 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -45,6 +45,9 @@ import org.apache.iceberg.flink.IcebergTableSource; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; +import org.apache.inlong.sort.base.dirty.utils.DirtySinkFactoryUtils; import java.util.Map; import java.util.Set; @@ -224,12 +227,15 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami Map<String, String> tableProps = catalogTable.getOptions(); TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()); ActionsProvider actionsLoader = createActionLoader(context.getClassLoader(), tableProps); - + // Build the dirty data side-output + final DirtyOptions dirtyOptions = DirtyOptions.fromConfig(Configuration.fromMap(tableProps)); + final DirtySink<Object> dirtySink = DirtySinkFactoryUtils.createDirtySink(context, dirtyOptions); boolean multipleSink = Boolean.parseBoolean( tableProps.getOrDefault(SINK_MULTIPLE_ENABLE.key(), SINK_MULTIPLE_ENABLE.defaultValue().toString())); if (multipleSink) { CatalogLoader catalogLoader = createCatalogLoader(tableProps); - return new IcebergTableSink(null, tableSchema, catalogTable, catalogLoader, actionsLoader); + return new IcebergTableSink(null, tableSchema, catalogTable, + catalogLoader, 
actionsLoader, dirtyOptions, dirtySink); } else { TableLoader tableLoader; if (catalog != null) { @@ -238,7 +244,8 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami tableLoader = createTableLoader(catalogTable, tableProps, objectPath.getDatabaseName(), objectPath.getObjectName()); } - return new IcebergTableSink(tableLoader, tableSchema, catalogTable, null, actionsLoader); + return new IcebergTableSink(tableLoader, tableSchema, catalogTable, + null, actionsLoader, dirtyOptions, dirtySink); } } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java index bd65c76e9..31698c0de 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/IcebergTableSink.java @@ -35,11 +35,14 @@ import org.apache.iceberg.actions.ActionsProvider; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.sink.MultipleSinkOption; import org.apache.inlong.sort.iceberg.sink.FlinkSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; @@ -74,6 +77,9 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, private boolean overwrite = false; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink<Object> dirtySink; + private IcebergTableSink(IcebergTableSink toCopy) { this.tableLoader = toCopy.tableLoader; this.tableSchema = toCopy.tableSchema; @@ -81,18 +87,24 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, this.catalogTable = toCopy.catalogTable; this.catalogLoader = toCopy.catalogLoader; this.actionsProvider = toCopy.actionsProvider; + this.dirtyOptions = toCopy.dirtyOptions; + this.dirtySink = toCopy.dirtySink; } public IcebergTableSink(TableLoader tableLoader, TableSchema tableSchema, CatalogTable catalogTable, CatalogLoader catalogLoader, - ActionsProvider actionsProvider) { + ActionsProvider actionsProvider, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { this.tableLoader = tableLoader; this.tableSchema = tableSchema; this.catalogTable = catalogTable; this.catalogLoader = catalogLoader; this.actionsProvider = actionsProvider; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; } @Override @@ -130,6 +142,8 @@ public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, .overwrite(overwrite) .appendMode(tableOptions.get(IGNORE_ALL_CHANGELOG)) .metric(tableOptions.get(INLONG_METRIC), tableOptions.get(INLONG_AUDIT)) + .dirtyOptions(dirtyOptions) + .dirtySink(dirtySink) .action(actionsProvider) .append(); } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java index dd1c43275..a32db160a 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java +++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java @@ -52,6 +52,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.PropertyUtil; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.sink.MultipleSinkOption; import org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleFilesCommiter; import org.apache.inlong.sort.iceberg.sink.multiple.IcebergMultipleStreamWriter; @@ -64,6 +66,7 @@ import org.apache.inlong.sort.iceberg.sink.multiple.DynamicSchemaHandleOperator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.IOException; import java.io.UncheckedIOException; import java.util.List; @@ -166,6 +169,8 @@ public class FlinkSink { private CatalogLoader catalogLoader = null; private boolean multipleSink = false; private MultipleSinkOption multipleSinkOption = null; + private DirtyOptions dirtyOptions; + private @Nullable DirtySink<Object> dirtySink; private Builder() { } @@ -283,6 +288,16 @@ public class FlinkSink { return this; } + public Builder dirtyOptions(DirtyOptions dirtyOptions) { + this.dirtyOptions = dirtyOptions; + return this; + } + + public Builder dirtySink(DirtySink<Object> dirtySink) { + this.dirtySink = dirtySink; + return this; + } + /** * Configure the write {@link DistributionMode} that the flink sink will use. Currently, flink support * {@link DistributionMode#NONE} and {@link DistributionMode#HASH}. @@ -514,7 +529,8 @@ public class FlinkSink { } IcebergProcessOperator<RowData, WriteResult> streamWriter = createStreamWriter( - table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric, auditHostAndPorts); + table, flinkRowType, equalityFieldIds, upsertMode, appendMode, inlongMetric, + auditHostAndPorts, dirtyOptions, dirtySink); int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism; SingleOutputStreamOperator<WriteResult> writerStream = input @@ -534,8 +550,7 @@ public class FlinkSink { int parallelism = writeParallelism == null ? 
input.getParallelism() : writeParallelism; DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator( - catalogLoader, - multipleSinkOption); + catalogLoader, multipleSinkOption, dirtyOptions, dirtySink); SingleOutputStreamOperator<RecordWithSchema> routeStream = input .transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME), TypeInformation.of(RecordWithSchema.class), @@ -544,7 +559,8 @@ public class FlinkSink { IcebergProcessOperator streamWriter = new IcebergProcessOperator(new IcebergMultipleStreamWriter( - appendMode, catalogLoader, inlongMetric, auditHostAndPorts, multipleSinkOption)); + appendMode, catalogLoader, inlongMetric, auditHostAndPorts, + multipleSinkOption, dirtyOptions, dirtySink)); SingleOutputStreamOperator<MultipleWriteResult> writerStream = routeStream .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME), TypeInformation.of(IcebergProcessOperator.class), @@ -618,7 +634,10 @@ public class FlinkSink { boolean upsert, boolean appendMode, String inlongMetric, - String auditHostAndPorts) { + String auditHostAndPorts, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { + // flink A, iceberg a Preconditions.checkArgument(table != null, "Iceberg table should't be null"); Map<String, String> props = table.properties(); long targetFileSize = getTargetFileSizeBytes(props); @@ -628,9 +647,11 @@ public class FlinkSink { TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory( serializableTable, serializableTable.schema(), flinkRowType, targetFileSize, fileFormat, equalityFieldIds, upsert, appendMode); - + // Set null for flinkRowType of IcebergSingleStreamWriter + // to avoid frequent Field.Getter creation in dirty data sink. return new IcebergProcessOperator<>(new IcebergSingleStreamWriter<>( - table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts)); + table.name(), taskWriterFactory, inlongMetric, auditHostAndPorts, + null, dirtyOptions, dirtySink)); } private static FileFormat getFileFormat(Map<String, String> properties) { diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java index f8422add6..571221257 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java @@ -19,7 +19,6 @@ package org.apache.inlong.sort.iceberg.sink.multiple; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -39,6 +38,10 @@ import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.types.Types.NestedField; +import org.apache.inlong.sort.base.dirty.DirtyData; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.DirtyType; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import 
org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat; import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory; import org.apache.inlong.sort.base.sink.MultipleSinkOption; @@ -47,6 +50,7 @@ import org.apache.inlong.sort.base.sink.TableChange.AddColumn; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Collections; @@ -63,10 +67,10 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi OneInputStreamOperator<RowData, RecordWithSchema>, ProcessingTimeCallback { - private static final Logger LOG = LoggerFactory.getLogger(DynamicSchemaHandleOperator.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DynamicSchemaHandleOperator.class); private static final long HELPER_DEBUG_INTERVEL = 10 * 60 * 1000; + private static final long serialVersionUID = 1L; - private final ObjectMapper objectMapper = new ObjectMapper(); private final CatalogLoader catalogLoader; private final MultipleSinkOption multipleSinkOption; @@ -84,18 +88,26 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi // blacklist to filter schema update failed table private transient Set<TableIdentifier> blacklist; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink<Object> dirtySink; + public DynamicSchemaHandleOperator(CatalogLoader catalogLoader, - MultipleSinkOption multipleSinkOption) { + MultipleSinkOption multipleSinkOption, DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { this.catalogLoader = catalogLoader; this.multipleSinkOption = multipleSinkOption; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; } + @SuppressWarnings("unchecked") @Override public void open() throws Exception { super.open(); this.catalog = catalogLoader.loadCatalog(); this.asNamespaceCatalog = catalog instanceof SupportsNamespaces ? 
(SupportsNamespaces) catalog : null; + this.dynamicSchemaFormat = DynamicSchemaFormatFactory.getFormat( multipleSinkOption.getFormat(), multipleSinkOption.getFormatOption()); @@ -118,13 +130,25 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi @Override public void processElement(StreamRecord<RowData> element) throws Exception { - JsonNode jsonNode = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0)); - - TableIdentifier tableId = parseId(jsonNode); + JsonNode jsonNode = null; + try { + jsonNode = dynamicSchemaFormat.deserialize(element.getValue().getBinary(0)); + } catch (Exception e) { + LOGGER.error(String.format("Deserialize error, raw data: %s", + new String(element.getValue().getBinary(0))), e); + handleDirtyData(new String(element.getValue().getBinary(0)), + null, DirtyType.DESERIALIZE_ERROR, e); + } + TableIdentifier tableId = null; + try { + tableId = parseId(jsonNode); + } catch (Exception e) { + LOGGER.error(String.format("Table identifier parse error, raw data: %s", jsonNode), e); + handleDirtyData(jsonNode, jsonNode, DirtyType.TABLE_IDENTIFIER_PARSE_ERROR, e); + } if (blacklist.contains(tableId)) { return; } - boolean isDDL = dynamicSchemaFormat.extractDDLFlag(jsonNode); if (isDDL) { execDDL(jsonNode, tableId); @@ -133,6 +157,41 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi } } + private void handleDirtyData(Object dirtyData, JsonNode rootNode, DirtyType dirtyType, Exception e) { + if (!dirtyOptions.ignoreDirty()) { + RuntimeException ex; + if (e instanceof RuntimeException) { + ex = (RuntimeException) e; + } else { + ex = new RuntimeException(e); + } + throw ex; + } + if (dirtySink != null) { + DirtyData.Builder<Object> builder = DirtyData.builder(); + try { + builder.setData(dirtyData) + .setDirtyType(dirtyType) + .setDirtyMessage(e.getMessage()); + if (rootNode != null) { + builder.setLabels(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getLabels())) + .setLogTag(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getLogTag())) + .setIdentifier(dynamicSchemaFormat.parse(rootNode, dirtyOptions.getIdentifier())); + } else { + builder.setLabels(dirtyOptions.getLabels()) + .setLogTag(dirtyOptions.getLogTag()) + .setIdentifier(dirtyOptions.getIdentifier()); + } + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOG.warn("Dirty sink failed", ex); + } + } + } + @Override public void onProcessingTime(long timestamp) { LOG.info("Black list table: {} at time {}.", blacklist, timestamp); @@ -146,6 +205,9 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi private void execDML(JsonNode jsonNode, TableIdentifier tableId) { RecordWithSchema record = parseRecord(jsonNode, tableId); + if (record == null) { + return; + } Schema schema = schemaCache.get(record.getTableId()); Schema dataSchema = record.getSchema(); recordQueues.compute(record.getTableId(), (k, v) -> { @@ -155,7 +217,6 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi v.add(record); return v; }); - if (schema == null) { handleTableCreateEventFromOperator(record.getTableId(), dataSchema); } else { @@ -182,6 +243,7 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi LOG.warn("Ignore table {} schema change, old: {} new: {}.", tableId, dataSchema, latestSchema, e); blacklist.add(tableId); + handleDirtyData(jsonNode, jsonNode, 
DirtyType.EXTRACT_ROWDATA_ERROR, e); } return Collections.emptyList(); }); @@ -204,13 +266,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi LOG.warn("Database({}) already exist in Catalog({})!", tableId.namespace(), catalog.name()); } } - ImmutableMap.Builder<String, String> properties = ImmutableMap.builder(); properties.put("format-version", "2"); properties.put("write.upsert.enabled", "true"); // for hive visible properties.put("engine.hive.enabled", "true"); - try { catalog.createTable(tableId, schema, PartitionSpec.unpartitioned(), properties.build()); LOG.info("Auto create Table({}) in Database({}) in Catalog({})!", @@ -220,13 +280,11 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi tableId.name(), tableId.namespace(), catalog.name()); } } - handleSchemaInfoEvent(tableId, catalog.loadTable(tableId).schema()); } private void handldAlterSchemaEventFromOperator(TableIdentifier tableId, Schema oldSchema, Schema newSchema) { Table table = catalog.loadTable(tableId); - // The transactionality of changes is guaranteed by comparing the old schema with the current schema of the // table. // Judging whether changes can be made by schema comparison (currently only column additions are supported), @@ -263,15 +321,18 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi } private RecordWithSchema parseRecord(JsonNode data, TableIdentifier tableId) { - List<String> pkListStr = dynamicSchemaFormat.extractPrimaryKeyNames(data); - RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr); - - RecordWithSchema record = new RecordWithSchema( - data, - FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(schema)), - tableId, - pkListStr); - return record; + try { + List<String> pkListStr = dynamicSchemaFormat.extractPrimaryKeyNames(data); + RowType schema = dynamicSchemaFormat.extractSchema(data, pkListStr); + return new RecordWithSchema( + data, + FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(schema)), + tableId, + pkListStr); + } catch (Exception e) { + handleDirtyData(data, data, DirtyType.EXTRACT_SCHEMA_ERROR, e); + } + return null; } private boolean canHandleWithSchemaUpdatePolicy(TableIdentifier tableId, List<TableChange> tableChanges) { @@ -290,7 +351,6 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi break; } } - return canHandle; } } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java index 16bedfd83..9246ffeed 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -38,6 +39,8 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.sink.TaskWriterFactory; import 
org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.PropertyUtil; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; @@ -95,18 +98,24 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi private transient SinkMetricData metricData; private transient ListState<MetricState> metricStateListState; private transient MetricState metricState; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink<Object> dirtySink; public IcebergMultipleStreamWriter( boolean appendMode, CatalogLoader catalogLoader, String inlongMetric, String auditHostAndPorts, - MultipleSinkOption multipleSinkOption) { + MultipleSinkOption multipleSinkOption, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { this.appendMode = appendMode; this.catalogLoader = catalogLoader; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; this.multipleSinkOption = multipleSinkOption; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; } @Override @@ -182,11 +191,11 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi .map(NestedField::fieldId) .collect(Collectors.toList()); } - + RowType flinkRowType = FlinkSchemaUtil.convert(recordWithSchema.getSchema()); TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory( table, recordWithSchema.getSchema(), - FlinkSchemaUtil.convert(recordWithSchema.getSchema()), + flinkRowType, targetFileSizeBytes, fileFormat, equalityFieldIds, @@ -195,7 +204,8 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi if (multipleWriters.get(tableId) == null) { IcebergSingleStreamWriter<RowData> writer = new IcebergSingleStreamWriter<>( - tableId.toString(), taskWriterFactory, null, null); + tableId.toString(), taskWriterFactory, null, + null, flinkRowType, dirtyOptions, dirtySink); writer.setup(getRuntimeContext(), new CallbackCollector<>( writeResult -> collector.collect(new MultipleWriteResult(tableId, writeResult))), @@ -206,6 +216,7 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi } else { // only if second times schema will evolute // Refresh new schema maybe cause previous file writer interrupted, so here should handle it multipleWriters.get(tableId).schemaEvolution(taskWriterFactory); + multipleWriters.get(tableId).setFlinkRowType(flinkRowType); } } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java index 8d936ffd8..761306a6e 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleStreamWriter.java @@ -26,15 +26,21 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; +import 
org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.inlong.sort.base.dirty.DirtyData; +import org.apache.inlong.sort.base.dirty.DirtyOptions; +import org.apache.inlong.sort.base.dirty.sink.DirtySink; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; import org.apache.inlong.sort.base.util.MetricStateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.io.IOException; @@ -48,6 +54,8 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ CheckpointedFunction, SchemaEvolutionFunction<TaskWriterFactory<T>> { + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergSingleStreamWriter.class); + private static final long serialVersionUID = 1L; private final String fullTableName; @@ -58,20 +66,28 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ private transient TaskWriter<T> writer; private transient int subTaskId; private transient int attemptId; - @Nullable - private transient SinkMetricData metricData; + private @Nullable transient SinkMetricData metricData; private transient ListState<MetricState> metricStateListState; private transient MetricState metricState; + private @Nullable RowType flinkRowType; + private final DirtyOptions dirtyOptions; + private @Nullable final DirtySink<Object> dirtySink; public IcebergSingleStreamWriter( String fullTableName, TaskWriterFactory<T> taskWriterFactory, String inlongMetric, - String auditHostAndPorts) { + String auditHostAndPorts, + @Nullable RowType flinkRowType, + DirtyOptions dirtyOptions, + @Nullable DirtySink<Object> dirtySink) { this.fullTableName = fullTableName; this.taskWriterFactory = taskWriterFactory; this.inlongMetric = inlongMetric; this.auditHostAndPorts = auditHostAndPorts; + this.flinkRowType = flinkRowType; + this.dirtyOptions = dirtyOptions; + this.dirtySink = dirtySink; } @Override @@ -81,7 +97,6 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ // Initialize the task writer factory. this.taskWriterFactory.initialize(subTaskId, attemptId); - // Initialize the task writer. 
this.writer = taskWriterFactory.create(); @@ -102,17 +117,38 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { // close all open files and emit files to downstream committer operator emit(writer.complete()); - this.writer = taskWriterFactory.create(); } @Override - public void processElement(T value) - throws Exception { - writer.write(value); - + public void processElement(T value) throws Exception { + try { + writer.write(value); + } catch (Exception e) { + LOGGER.error(String.format("write error, raw data: %s", value), e); + if (!dirtyOptions.ignoreDirty()) { + throw e; + } + if (dirtySink != null) { + DirtyData.Builder<Object> builder = DirtyData.builder(); + try { + builder.setData(value) + .setLabels(dirtyOptions.getLabels()) + .setLogTag(dirtyOptions.getLogTag()) + .setIdentifier(dirtyOptions.getIdentifier()) + .setRowType(flinkRowType) + .setDirtyMessage(e.getMessage()); + dirtySink.invoke(builder.build()); + } catch (Exception ex) { + if (!dirtyOptions.ignoreSideOutputErrors()) { + throw new RuntimeException(ex); + } + LOGGER.warn("Dirty sink failed", ex); + } + } + } if (metricData != null) { - metricData.invokeWithEstimate(value); + metricData.invokeWithEstimate(value == null ? "" : value); } } @@ -131,6 +167,10 @@ public class IcebergSingleStreamWriter<T> extends IcebergProcessFunction<T, Writ } } + public void setFlinkRowType(@Nullable RowType flinkRowType) { + this.flinkRowType = flinkRowType; + } + @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { if (metricData != null && metricStateListState != null) { diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java index 50388dcb4..0602868f2 100644 --- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java +++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IcebergNodeSqlParserTest.java @@ -44,6 +44,7 @@ import org.junit.Test; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -116,7 +117,13 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase { new FieldInfo("age", new IntFormatInfo())), new FieldRelation(new FieldInfo("ts", new TimestampFormatInfo()), new FieldInfo("ts", new TimestampFormatInfo()))); - + Map<String, String> properties = new LinkedHashMap<>(); + properties.put("dirty.side-output.connector", "log"); + properties.put("dirty.ignore", "true"); + properties.put("dirty.side-output.enable", "true"); + properties.put("dirty.side-output.format", "csv"); + properties.put("dirty.side-output.labels", + "SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=inlong_iceberg"); // set HIVE_CONF_DIR,or set uri and warehouse return new IcebergLoadNode( "iceberg", @@ -126,7 +133,7 @@ public class IcebergNodeSqlParserTest extends AbstractTestBase { null, null, null, - null, + properties, "inlong", "inlong_iceberg", null,