This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ff3cf5110a [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361) ff3cf5110a is described below commit ff3cf5110a9eb9d4149ab6d44604a345e5e67dc1 Author: Sting <zpen...@connect.ust.hk> AuthorDate: Wed Nov 29 18:59:57 2023 +0800 [INLONG-9359][Sort] Fix iceberg all migrate connector stack overflow error (#9361) --- .../sort/protocol/node/load/IcebergLoadNode.java | 5 ----- .../java/org/apache/inlong/sort/base/Constants.java | 2 +- .../sort/iceberg/FlinkDynamicTableFactory.java | 2 ++ .../sink/multiple/DynamicSchemaHandleOperator.java | 20 +++++++++++++++++--- 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java index a418fd930b..9e3fb9e8dd 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/IcebergLoadNode.java @@ -30,7 +30,6 @@ import org.apache.inlong.sort.protocol.node.LoadNode; import org.apache.inlong.sort.protocol.node.format.Format; import org.apache.inlong.sort.protocol.transformation.FieldRelation; import org.apache.inlong.sort.protocol.transformation.FilterFunction; -import org.apache.inlong.sort.util.SchemaChangeUtils; import com.google.common.base.Preconditions; import lombok.Data; @@ -54,7 +53,6 @@ import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIP import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_FORMAT; import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_MULTIPLE_TABLE_PATTERN; -import static org.apache.inlong.sort.protocol.constant.DorisConstant.SINK_SCHEMA_CHANGE_POLICIES; @JsonTypeName("icebergLoad") @Data @@ -177,8 +175,6 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, public Map<String, String> tableOptions() { Map<String, String> options = super.tableOptions(); options.put(IcebergConstant.CONNECTOR_KEY, IcebergConstant.CONNECTOR); - // for test sink.ignore.changelog - // options.put("sink.ignore.changelog", "true"); options.put(IcebergConstant.DATABASE_KEY, dbName); options.put(IcebergConstant.TABLE_KEY, tableName); options.put(IcebergConstant.DEFAULT_DATABASE_KEY, dbName); @@ -197,7 +193,6 @@ public class IcebergLoadNode extends LoadNode implements InlongMetric, Metadata, options.put(SINK_MULTIPLE_FORMAT, Objects.requireNonNull(sinkMultipleFormat).identifier()); options.put(SINK_MULTIPLE_DATABASE_PATTERN, databasePattern); options.put(SINK_MULTIPLE_TABLE_PATTERN, tablePattern); - options.put(SINK_SCHEMA_CHANGE_POLICIES, SchemaChangeUtils.serialize(policyMap)); } else { options.put(SINK_MULTIPLE_ENABLE, "false"); } diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java index ac7baac4dd..ba47c74356 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -419,7 +419,7 @@ public final class Constants { public static final ConfigOption<Boolean> SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT = ConfigOptions.key("sink.multiple.auto-create-table-when-snapshot") .booleanType() - .defaultValue(false) + .defaultValue(true) .withDescription("Whether supporting auto create table when snapshot, default value is 'false'"); public static final ConfigOption<String> INNER_FORMAT = diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java index bb33e896bd..9a6056e614 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/FlinkDynamicTableFactory.java @@ -57,6 +57,7 @@ import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; import static org.apache.inlong.sort.base.Constants.IGNORE_ALL_CHANGELOG; import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; +import static org.apache.inlong.sort.base.Constants.SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_DATABASE_PATTERN; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_ENABLE; import static org.apache.inlong.sort.base.Constants.SINK_MULTIPLE_FORMAT; @@ -327,6 +328,7 @@ public class FlinkDynamicTableFactory implements DynamicTableSinkFactory, Dynami options.add(SINK_MULTIPLE_SCHEMA_UPDATE_POLICY); options.add(SINK_MULTIPLE_PK_AUTO_GENERATED); options.add(SINK_MULTIPLE_TYPE_MAP_COMPATIBLE_WITH_SPARK); + options.add(SINK_AUTO_CREATE_TABLE_WHEN_SNAPSHOT); options.add(WRITE_COMPACT_ENABLE); options.add(WRITE_COMPACT_INTERVAL); options.add(WRITE_DISTRIBUTION_MODE); diff --git a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java index e416cfd24f..1a11b1d771 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java +++ b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java @@ -60,6 +60,7 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.flink.CatalogLoader; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,6 +81,7 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.IntStream; import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES_OUT; import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS_OUT; @@ -522,10 +524,22 @@ public class DynamicSchemaHandleOperator extends AbstractStreamOperator<RecordWi } // =============================== Utils method ================================================================= - // if newSchema is not same with oldSchema, return false. It include difference in name, type, position, and - // quantity + // if newSchema is not same with oldSchema, return false. private boolean isCompatible(Schema newSchema, Schema oldSchema) { - return oldSchema.sameSchema(newSchema); + if (newSchema == null) { + return false; + } + + List<NestedField> oldSchemaFields = oldSchema.columns(); + List<NestedField> newSchemaFields = newSchema.columns(); + + if (oldSchemaFields.size() != newSchemaFields.size()) { + return false; + } + + return IntStream.range(0, oldSchemaFields.size()) + .allMatch(i -> oldSchemaFields.get(i).name().equals(newSchemaFields.get(i).name()) + && oldSchemaFields.get(i).type() == newSchemaFields.get(i).type()); } private TableIdentifier parseId(JsonNode data) throws IOException {