[GitHub] [inlong] hejiay commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg
hejiay commented on code in PR #8095: URL: https://github.com/apache/inlong/pull/8095#discussion_r1208411380 ## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java: ## @@ -51,49 +53,81 @@ public class SchemaChangeUtils { static List diffSchema(Schema oldSchema, Schema newSchema) { List oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList()); List newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList()); -int oi = 0; -int ni = 0; +Set oldFieldSet = new HashSet<>(oldFields); +Set newFieldSet = new HashSet<>(newFields); + +Set intersectColSet = Sets.intersection(oldFieldSet, newFieldSet); +Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet); +Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet); + List tableChanges = new ArrayList<>(); -while (ni < newFields.size()) { -if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) { -oi++; -ni++; -} else { -NestedField newField = newSchema.findField(newFields.get(ni)); + +// step0: Unknown change +if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) { Review Comment: Because an (add + delete) cannot be distinguished from a modify. For example, starting from [a, b, c], deleting c gives [a, b] and then adding d gives [a, b, d]; this case cannot be judged yet. I will support it later using information extracted from the DDL. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
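The ambiguity described in this comment can be reproduced with a small name-only diff. The following is a minimal sketch, not the PR's code: the class `NameDiffSketch` and its string-based change descriptions are hypothetical, and it uses plain `java.util` sets where the diff above uses Guava's `Sets.difference`. It assumes columns are compared by name only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical name-only schema diff; a sketch, not the InLong implementation. */
final class NameDiffSketch {

    /** Returns readable changes, or a single UNKNOWN entry when the diff is ambiguous. */
    static List<String> diff(List<String> oldCols, List<String> newCols) {
        // Columns present only in the old schema.
        Set<String> removed = new LinkedHashSet<>(oldCols);
        removed.removeAll(newCols);
        // Columns present only in the new schema.
        Set<String> added = new LinkedHashSet<>(newCols);
        added.removeAll(oldCols);

        List<String> changes = new ArrayList<>();
        // With names alone, "c removed and d added" looks the same as "c changed to d".
        if (!removed.isEmpty() && !added.isEmpty()) {
            changes.add("UNKNOWN " + removed + " -> " + added);
            return changes;
        }
        for (String col : removed) {
            changes.add("DELETE " + col);
        }
        for (String col : added) {
            changes.add("ADD " + col);
        }
        return changes;
    }

    public static void main(String[] args) {
        // Ambiguous: prints [UNKNOWN [c] -> [d]]
        System.out.println(diff(Arrays.asList("a", "b", "c"), Arrays.asList("a", "b", "d")));
        // Unambiguous: prints [DELETE c]
        System.out.println(diff(Arrays.asList("a", "b", "c"), Arrays.asList("a", "b")));
    }
}
```

Returning early on the ambiguous case mirrors the `step0` check in the diff: no add or delete is emitted when both sets are non-empty.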
[GitHub] [inlong] hejiay commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg
hejiay commented on code in PR #8095: URL: https://github.com/apache/inlong/pull/8095#discussion_r1208411858 ## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java: ## @@ -51,49 +53,81 @@ public class SchemaChangeUtils { static List diffSchema(Schema oldSchema, Schema newSchema) { List oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList()); List newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList()); -int oi = 0; -int ni = 0; +Set oldFieldSet = new HashSet<>(oldFields); +Set newFieldSet = new HashSet<>(newFields); + +Set intersectColSet = Sets.intersection(oldFieldSet, newFieldSet); +Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet); +Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet); + List tableChanges = new ArrayList<>(); -while (ni < newFields.size()) { -if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) { -oi++; -ni++; -} else { -NestedField newField = newSchema.findField(newFields.get(ni)); + +// step0: Unknown change +if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) { +tableChanges.add(new UnknownColumnChange( +String.format(" old schema: [%s] and new schema: [%s], it is unknown column change", +oldSchema.toString(), newSchema.toString(; +return tableChanges; +} + +// step1: judge whether update column(only column type change and doc change) +for (String colName : intersectColSet) { Review Comment: Sorry, changing the field position is not supported yet; I haven't considered that case.
[GitHub] [inlong] healchow opened a new pull request, #8104: [INLONG-8103][CI] Format the order of imports by the spotless plugin
healchow opened a new pull request, #8104: URL: https://github.com/apache/inlong/pull/8104 ### Prepare a Pull Request - Fixes #8103 ### Motivation Format the order of imports by the spotless plugin. ### Modifications Run the command `mvn spotless:apply` to batch-format the imports in the code. ### Verifying this change - [x] This change is a trivial rework/code cleanup without any test coverage. ### Documentation - Does this pull request introduce a new feature? (no)
[GitHub] [inlong] hejiay commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg
hejiay commented on code in PR #8095: URL: https://github.com/apache/inlong/pull/8095#discussion_r1208414903 ## inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/sink/TableChange.java: ## @@ -177,9 +177,80 @@ public String toString() { final class DeleteColumn implements ColumnChange { +private final String[] fieldsNames; + +public DeleteColumn(String[] fieldsNames) { +Preconditions.checkArgument(fieldsNames.length > 0, "Invalid filed name: at least one is required"); +this.fieldsNames = fieldsNames; +} + @Override public String[] fieldNames() { -return new String[0]; +return fieldsNames; +} + +@Override +public boolean equals(Object o) { +if (this == o) { +return true; +} +if (!(o instanceof DeleteColumn)) { +return false; +} +DeleteColumn that = (DeleteColumn) o; +return Arrays.equals(fieldsNames, that.fieldsNames); +} + +@Override +public int hashCode() { +return Arrays.hashCode(fieldsNames); +} + +@Override +public String toString() { +return String.format("DELETE COLUMNS `%s`", fieldsNames[fieldsNames.length - 1]); +} +} + +final class UpdateColumn implements ColumnChange { + +private final String[] fieldsNames; +private final LogicalType dataType; +private final boolean isNullable; +private final String comment; + +public UpdateColumn(String[] fieldsNames, LogicalType dataType, boolean isNullable, String comment) { +Preconditions.checkArgument(fieldsNames.length > 0, "Invalid filed name: at least one is required"); Review Comment: An (add + delete) cannot be distinguished from a modify. For example, starting from [a, b, c], deleting c gives [a, b] and then adding d gives [a, b, d]; this is judged as an unknown change. I will judge it using information extracted from the DDL in the next version.
[GitHub] [inlong] healchow commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control
healchow commented on code in PR #8100: URL: https://github.com/apache/inlong/pull/8100#discussion_r1208420472 ## inlong-manager/manager-dao/src/main/resources/mappers/UserRoleEntityMapper.xml: ## Review Comment: Is the table name `user_role` or `tenant_user_role`? The name of the Entity should be consistent with the name of the table.
[GitHub] [inlong] healchow commented on a diff in pull request #8100: [INLONG-8098][Manager] Support Inlong user permission control
healchow commented on code in PR #8100: URL: https://github.com/apache/inlong/pull/8100#discussion_r1208421170 ## inlong-manager/manager-dao/src/main/resources/mappers/UserRoleEntityMapper.xml: ## Review Comment: I see that the table name is `tenant_user_role`, so your comment is right.
[GitHub] [inlong] lordcheng10 commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg
lordcheng10 commented on code in PR #8095: URL: https://github.com/apache/inlong/pull/8095#discussion_r1208768400 ## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java: ## @@ -51,49 +53,81 @@ public class SchemaChangeUtils { static List diffSchema(Schema oldSchema, Schema newSchema) { List oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList()); List newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList()); -int oi = 0; -int ni = 0; +Set oldFieldSet = new HashSet<>(oldFields); +Set newFieldSet = new HashSet<>(newFields); + +Set intersectColSet = Sets.intersection(oldFieldSet, newFieldSet); +Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet); +Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet); + List tableChanges = new ArrayList<>(); -while (ni < newFields.size()) { -if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) { -oi++; -ni++; -} else { -NestedField newField = newSchema.findField(newFields.get(ni)); + +// step0: Unknown change +if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) { Review Comment: > example first [a, b, c] -> delete c [a, b] -> add d [a, b, d] @hejiay If two messages are received one after another, (a1,b1,c1) -> (a1,b1,d1), the schema diff yields an add(d) and a delete(c). After executing the add and the delete, doesn't the table become [a, b, d]?
[GitHub] [inlong] gong commented on a diff in pull request #8096: [INLONG-8092][Sort] Support all database and multiple tables transmission for Hive
gong commented on code in PR #8096: URL: https://github.com/apache/inlong/pull/8096#discussion_r1208776632 ## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/HiveTableSink.java: ## @@ -170,23 +188,50 @@ private DataStreamSink consume( try (HiveMetastoreClientWrapper client = HiveMetastoreClientFactory.create(HiveConfUtils.create(jobConf), hiveVersion)) { -Table table = client.getTable(dbName, identifier.getObjectName()); -StorageDescriptor sd = table.getSd(); - -Class hiveOutputFormatClz = - hiveShim.getHiveOutputFormatClass(Class.forName(sd.getOutputFormat())); +StorageDescriptor sd; +Properties tableProps = new Properties(); +Class hiveOutputFormatClz; +HiveWriterFactory writerFactory; +boolean sinkMultipleEnable = Boolean.parseBoolean( + catalogTable.getOptions().getOrDefault(SINK_MULTIPLE_ENABLE.key(), "false")); boolean isCompressed = jobConf.getBoolean(HiveConf.ConfVars.COMPRESSRESULT.varname, false); -HiveWriterFactory writerFactory = -new HiveWriterFactory( -jobConf, -hiveOutputFormatClz, -sd.getSerdeInfo(), -tableSchema, -getPartitionKeyArray(), -HiveReflectionUtils.getTableMetadata(hiveShim, table), -hiveShim, -isCompressed); +if (sinkMultipleEnable) { +sd = new StorageDescriptor(); +SerDeInfo serDeInfo = new SerDeInfo(); + serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"); Review Comment: Maybe this parameter can be passed through HiveOptions, with the default value "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe".
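One way to read this suggestion is to look the SerDe class up in the table options and fall back to the current hard-coded value. The sketch below is an assumption-laden illustration: the class `SerdeOptionSketch` and the option key `sink.multiple.serde-lib` are hypothetical and do not come from the PR or from HiveOptions; only the default class name is taken from the diff. It treats the options as a plain `Map<String, String>`, as `catalogTable.getOptions()` is used in the diff above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical lookup of a configurable SerDe library with a default. */
final class SerdeOptionSketch {

    // Hypothetical option key; the PR does not define it.
    static final String SERDE_LIB_KEY = "sink.multiple.serde-lib";

    // Default taken from the hard-coded value in the diff.
    static final String DEFAULT_SERDE_LIB = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";

    /** Returns the configured SerDe class name, or the default when the key is absent. */
    static String resolveSerdeLib(Map<String, String> tableOptions) {
        return tableOptions.getOrDefault(SERDE_LIB_KEY, DEFAULT_SERDE_LIB);
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        // No override configured: the default is used.
        System.out.println(resolveSerdeLib(options));
        // Override configured: the user-provided class name wins.
        options.put(SERDE_LIB_KEY, "com.example.CustomSerDe");
        System.out.println(resolveSerdeLib(options));
    }
}
```

The resolved value would then be handed to `serDeInfo.setSerializationLib(...)` in place of the literal.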
[GitHub] [inlong] gong commented on pull request #8104: [INLONG-8103][CI] Format the order of imports by the spotless plugin
gong commented on PR #8104: URL: https://github.com/apache/inlong/pull/8104#issuecomment-1566418340 Too many style changes. Why not format the code only when old code is changed?
[GitHub] [inlong] yunqingmoswu opened a new pull request, #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink
yunqingmoswu opened a new pull request, #8105: URL: https://github.com/apache/inlong/pull/8105 ### Prepare a Pull Request - Fixes #7853 ### Motivation Add common handle for schema-change in sink ### Modifications 1. Add protocols for schema-change 2. Add tool for schema-change ### Verifying this change - [ ] This change is a trivial rework/code cleanup without any test coverage. - [x] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[GitHub] [inlong] gong commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg
gong commented on code in PR #8095: URL: https://github.com/apache/inlong/pull/8095#discussion_r1208796949 ## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java: ## @@ -42,7 +44,7 @@ public class SchemaChangeUtils { /** * Compare two schemas and get the schema changes that happened in them. - * TODO: currently only support add column + * TODO: currently only support add column,delete column and update column(column type change) Review Comment: `update column` covers too many situations. A finer-grained distinction may be better than a single `update column`.
[GitHub] [inlong] hejiay commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg
hejiay commented on code in PR #8095: URL: https://github.com/apache/inlong/pull/8095#discussion_r1208799987 ## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java: ## @@ -51,49 +53,81 @@ public class SchemaChangeUtils { static List diffSchema(Schema oldSchema, Schema newSchema) { List oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList()); List newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList()); -int oi = 0; -int ni = 0; +Set oldFieldSet = new HashSet<>(oldFields); +Set newFieldSet = new HashSet<>(newFields); + +Set intersectColSet = Sets.intersection(oldFieldSet, newFieldSet); +Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet); +Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet); + List tableChanges = new ArrayList<>(); -while (ni < newFields.size()) { -if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) { -oi++; -ni++; -} else { -NestedField newField = newSchema.findField(newFields.get(ni)); + +// step0: Unknown change +if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) { Review Comment: Of course not. In Iceberg, each field has a unique id. Even after a field is deleted it still exists, but its content is hidden when querying. With a modify there is only one step, [a, b, c] -> [a, b, d], and the id of field d is unchanged. With an add and a delete, field d gets a new unique id. So an add followed by a delete cannot be treated as a modify.
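The distinction drawn in this comment can be illustrated by diffing on field ids instead of names. This is a minimal sketch in plain Java and does not use the Iceberg API: the class `FieldIdDiffSketch`, the helper `schema`, and the `id -> name` map representation are all hypothetical stand-ins. It assumes, as the comment states, that a renamed field keeps its id while a newly added field receives a fresh one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical id-based schema diff; a schema is modelled as an ordered id -> name map. */
final class FieldIdDiffSketch {

    /** Builds an ordered id -> name map from parallel arrays (illustration helper). */
    static Map<Integer, String> schema(int[] ids, String... names) {
        Map<Integer, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < ids.length; i++) {
            fields.put(ids[i], names[i]);
        }
        return fields;
    }

    static List<String> diff(Map<Integer, String> oldFields, Map<Integer, String> newFields) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<Integer, String> old : oldFields.entrySet()) {
            String newName = newFields.get(old.getKey());
            if (newName == null) {
                // The id disappeared: the field was deleted.
                changes.add("DELETE " + old.getValue());
            } else if (!newName.equals(old.getValue())) {
                // Same id, different name: the field was modified in place.
                changes.add("RENAME " + old.getValue() + " -> " + newName);
            }
        }
        for (Map.Entry<Integer, String> added : newFields.entrySet()) {
            if (!oldFields.containsKey(added.getKey())) {
                // An id never seen before: the field was added.
                changes.add("ADD " + added.getValue());
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        Map<Integer, String> before = schema(new int[] {1, 2, 3}, "a", "b", "c");
        // Modify: field 3 keeps its id. Prints [RENAME c -> d]
        System.out.println(diff(before, schema(new int[] {1, 2, 3}, "a", "b", "d")));
        // Delete then add: d gets a new id. Prints [DELETE c, ADD d]
        System.out.println(diff(before, schema(new int[] {1, 2, 4}, "a", "b", "d")));
    }
}
```

Both target schemas have the column names [a, b, d], yet the two cases produce different change lists because the ids differ.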
[GitHub] [inlong] hejiay commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg
hejiay commented on code in PR #8095: URL: https://github.com/apache/inlong/pull/8095#discussion_r1208800947 ## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java: ## @@ -42,7 +44,7 @@ public class SchemaChangeUtils { /** * Compare two schemas and get the schema changes that happened in them. - * TODO: currently only support add column + * TODO: currently only support add column,delete column and update column(column type change) Review Comment: Thanks for the correction
[GitHub] [inlong] healchow commented on pull request #8104: [INLONG-8103][CI] Format the order of imports by the spotless plugin
healchow commented on PR #8104: URL: https://github.com/apache/inlong/pull/8104#issuecomment-1566448032 > Too many style change, why not format code when change old code. @gong This code format is enforced by Maven's spotless plugin, and all submissions must match it to pass the pipeline. From now on, when code is modified or added, the imports will be formatted automatically according to the current specification.
[GitHub] [inlong] gong commented on a diff in pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink
gong commented on code in PR #8105: URL: https://github.com/apache/inlong/pull/8105#discussion_r1208801911 ## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java: ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.util; + +import com.google.common.base.Preconditions; +import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; +import org.apache.inlong.sort.protocol.ddl.expressions.Column; +import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation; +import org.apache.inlong.sort.protocol.ddl.operations.Operation; +import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangeType; + Review Comment: Maybe import order is not right for `com.google.common.base.Preconditions`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] gong commented on a diff in pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink
gong commented on code in PR #8105: URL: https://github.com/apache/inlong/pull/8105#discussion_r1208839366 ## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java: ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.util; + +import com.google.common.base.Preconditions; +import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; +import org.apache.inlong.sort.protocol.ddl.expressions.Column; +import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation; +import org.apache.inlong.sort.protocol.ddl.operations.Operation; +import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangeType; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.StringJoiner; + +/** + * Schema-change Utils + */ +public final class SchemaChangeUtils { + +private final static String DELIMITER = "&"; +private final static String KEY_VALUE_DELIMITER = "="; + +private SchemaChangeUtils() { +} + +/** + * deserialize the policies to a Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] + * + * @param policies The policies format by 'key1=value1&key2=value2...' 
+ * @return A policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] + */ +public static Map deserialize(String policies) { +Preconditions.checkNotNull(policies, "policies is null"); +Map policyMap = new HashMap<>(); +for (String kv : policies.split(DELIMITER)) { +int index = kv.indexOf(KEY_VALUE_DELIMITER); +if (index < 1 || index == kv.length() - 1) { +throw new IllegalArgumentException( +"The format of policies must be like 'key1=value1&key2=value2...'"); +} +String typeCode = kv.substring(0, index); +String policyCode = kv.substring(index + 1); +SchemaChangeType type; +SchemaChangePolicy policy; +try { +type = SchemaChangeType.getInstance(Integer.parseInt(typeCode)); +} catch (NumberFormatException e) { +throw new IllegalArgumentException( +String.format("Unsupported type of schema-change: %s for InLong", typeCode)); +} +try { +policy = SchemaChangePolicy.getInstance(Integer.parseInt(policyCode)); +} catch (NumberFormatException e) { +throw new IllegalArgumentException( +String.format("Unsupported policy of schema-change: %s for InLong", policyCode)); +} +policyMap.put(type, policy); +} +return policyMap; +} + +/** + * Serialize the policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] to a string + * + * @param policyMap The policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] + * @return A string format by 'key1=value1&key2=value2...' + */ +public static String serialize(Map policyMap) { +Preconditions.checkNotNull(policyMap, "policyMap is null"); +StringJoiner joiner = new StringJoiner("&"); +for (Entry kv : policyMap.entrySet()) { +joiner.add(kv.getKey().getCode() + "=" + kv.getValue().getCode()); +} Review Comment: Maybe use the string constants here, to keep this consistent with deserialize.
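The point of this review comment is that `serialize` repeats the literals `"&"` and `"="` while `deserialize` uses `DELIMITER` and `KEY_VALUE_DELIMITER`. A minimal sketch of the suggested change follows, under stated assumptions: the class `PolicyCodecSketch` is hypothetical, and plain integer codes stand in for `SchemaChangeType` and `SchemaChangePolicy`, whose definitions are not shown in the diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical codec where both directions share the same delimiter constants. */
final class PolicyCodecSketch {

    private static final String DELIMITER = "&";
    private static final String KEY_VALUE_DELIMITER = "=";

    private PolicyCodecSketch() {
    }

    /** Serializes integer codes (stand-ins for the enums) as 'key1=value1&key2=value2...'. */
    static String serialize(Map<Integer, Integer> policyMap) {
        StringJoiner joiner = new StringJoiner(DELIMITER);
        for (Map.Entry<Integer, Integer> kv : policyMap.entrySet()) {
            joiner.add(kv.getKey() + KEY_VALUE_DELIMITER + kv.getValue());
        }
        return joiner.toString();
    }

    /** Parses the format produced by serialize; rejects entries without a key or a value. */
    static Map<Integer, Integer> deserialize(String policies) {
        Map<Integer, Integer> policyMap = new LinkedHashMap<>();
        for (String kv : policies.split(DELIMITER)) {
            int index = kv.indexOf(KEY_VALUE_DELIMITER);
            if (index < 1 || index == kv.length() - 1) {
                throw new IllegalArgumentException(
                        "The format of policies must be like 'key1=value1&key2=value2...'");
            }
            policyMap.put(Integer.parseInt(kv.substring(0, index)),
                    Integer.parseInt(kv.substring(index + 1)));
        }
        return policyMap;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> policies = new LinkedHashMap<>();
        policies.put(1, 2);
        policies.put(3, 1);
        String encoded = serialize(policies);
        System.out.println(encoded); // prints 1=2&3=1
        System.out.println(deserialize(encoded).equals(policies)); // prints true
    }
}
```

With a single pair of constants, changing a delimiter cannot make the two methods disagree about the wire format.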
[GitHub] [inlong] gong commented on a diff in pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink
gong commented on code in PR #8105: URL: https://github.com/apache/inlong/pull/8105#discussion_r1208845988 ## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java: ## @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.util; + +import com.google.common.base.Preconditions; +import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn; +import org.apache.inlong.sort.protocol.ddl.expressions.Column; +import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation; +import org.apache.inlong.sort.protocol.ddl.operations.Operation; +import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy; +import org.apache.inlong.sort.protocol.enums.SchemaChangeType; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.StringJoiner; + +/** + * Schema-change Utils + */ +public final class SchemaChangeUtils { + +private final static String DELIMITER = "&"; +private final static String KEY_VALUE_DELIMITER = "="; + +private SchemaChangeUtils() { +} + +/** + * deserialize the policies to a Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] + * + * @param policies The policies format by 'key1=value1&key2=value2...' 
+ * @return A policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] + */ +public static Map deserialize(String policies) { +Preconditions.checkNotNull(policies, "policies is null"); +Map policyMap = new HashMap<>(); +for (String kv : policies.split(DELIMITER)) { +int index = kv.indexOf(KEY_VALUE_DELIMITER); +if (index < 1 || index == kv.length() - 1) { +throw new IllegalArgumentException( +"The format of policies must be like 'key1=value1&key2=value2...'"); +} +String typeCode = kv.substring(0, index); +String policyCode = kv.substring(index + 1); +SchemaChangeType type; +SchemaChangePolicy policy; +try { +type = SchemaChangeType.getInstance(Integer.parseInt(typeCode)); +} catch (NumberFormatException e) { +throw new IllegalArgumentException( +String.format("Unsupported type of schema-change: %s for InLong", typeCode)); +} +try { +policy = SchemaChangePolicy.getInstance(Integer.parseInt(policyCode)); +} catch (NumberFormatException e) { +throw new IllegalArgumentException( +String.format("Unsupported policy of schema-change: %s for InLong", policyCode)); +} +policyMap.put(type, policy); +} +return policyMap; +} + +/** + * Serialize the policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] to a string + * + * @param policyMap The policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] + * @return A string format by 'key1=value1&key2=value2...' 
+ */ +public static String serialize(Map policyMap) { +Preconditions.checkNotNull(policyMap, "policyMap is null"); +StringJoiner joiner = new StringJoiner("&"); +for (Entry kv : policyMap.entrySet()) { +joiner.add(kv.getKey().getCode() + "=" + kv.getValue().getCode()); +} +return joiner.toString(); +} + +/** + * Extract the schema change types from {@link Operation} + * + * @param operation The operation + * @return Set of {@link SchemaChangeType} + */ +public static Set extractSchemaChangeTypes(Operation operation) { +Set types = new HashSet<>(); +switch (operation.getOperationType()) { +case ALTER: +AlterOperation alterOperation = (AlterOperation) operation; +Preconditions.checkState(alterOperation.getAlterColumns() != null +&& !alterOperation.getAlterColumns().isEmpty(), "alter columns is empty"); +for (AlterColumn alterColumn : alterOperation.getAlterColum
[GitHub] [inlong] healchow merged pull request #8104: [INLONG-8103][CI] Format the order of imports by the spotless plugin
healchow merged PR #8104: URL: https://github.com/apache/inlong/pull/8104
[GitHub] [inlong] gong commented on a diff in pull request #8105: [INLONG-7853][Sort] Add common handle for schema-change in sink
gong commented on code in PR #8105:
URL: https://github.com/apache/inlong/pull/8105#discussion_r1208853226

## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/SchemaChangeUtils.java: ##
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.inlong.sort.protocol.ddl.expressions.AlterColumn;
+import org.apache.inlong.sort.protocol.ddl.expressions.Column;
+import org.apache.inlong.sort.protocol.ddl.operations.AlterOperation;
+import org.apache.inlong.sort.protocol.ddl.operations.Operation;
+import org.apache.inlong.sort.protocol.enums.SchemaChangePolicy;
+import org.apache.inlong.sort.protocol.enums.SchemaChangeType;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.StringJoiner;
+
+/**
+ * Schema-change Utils
+ */
+public final class SchemaChangeUtils {
+
+    private final static String DELIMITER = "&";
+    private final static String KEY_VALUE_DELIMITER = "=";
+
+    private SchemaChangeUtils() {
+    }
+
+    /**
+     * deserialize the policies to a Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}]
+     *
+     * @param policies The policies format by 'key1=value1&key2=value2...'
+     * @return A policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}]
+     */
+    public static Map deserialize(String policies) {
+        Preconditions.checkNotNull(policies, "policies is null");
+        Map policyMap = new HashMap<>();
+        for (String kv : policies.split(DELIMITER)) {
+            int index = kv.indexOf(KEY_VALUE_DELIMITER);
+            if (index < 1 || index == kv.length() - 1) {
+                throw new IllegalArgumentException(
+                        "The format of policies must be like 'key1=value1&key2=value2...'");
+            }
+            String typeCode = kv.substring(0, index);
+            String policyCode = kv.substring(index + 1);
+            SchemaChangeType type;
+            SchemaChangePolicy policy;
+            try {
+                type = SchemaChangeType.getInstance(Integer.parseInt(typeCode));
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException(
+                        String.format("Unsupported type of schema-change: %s for InLong", typeCode));
+            }
+            try {
+                policy = SchemaChangePolicy.getInstance(Integer.parseInt(policyCode));
+            } catch (NumberFormatException e) {
+                throw new IllegalArgumentException(
+                        String.format("Unsupported policy of schema-change: %s for InLong", policyCode));
+            }
+            policyMap.put(type, policy);
+        }
+        return policyMap;
+    }
+
+    /**
+     * Serialize the policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}] to a string
+     *
+     * @param policyMap The policy Map[{@link SchemaChangeType}, {@link SchemaChangePolicy}]
+     * @return A string format by 'key1=value1&key2=value2...'
+     */
+    public static String serialize(Map policyMap) {
+        Preconditions.checkNotNull(policyMap, "policyMap is null");
+        StringJoiner joiner = new StringJoiner("&");
+        for (Entry kv : policyMap.entrySet()) {
+            joiner.add(kv.getKey().getCode() + "=" + kv.getValue().getCode());
+        }
+        return joiner.toString();
+    }
+
+    /**
+     * Extract the schema change types from {@link Operation}
+     *
+     * @param operation The operation
+     * @return Set of {@link SchemaChangeType}
+     */
+    public static Set extractSchemaChangeTypes(Operation operation) {
+        Set types = new HashSet<>();
+        switch (operation.getOperationType()) {
+            case ALTER:
+                AlterOperation alterOperation = (AlterOperation) operation;
+                Preconditions.checkState(alterOperation.getAlterColumns() != null
+                        && !alterOperation.getAlterColumns().isEmpty(), "alter columns is empty");
+                for (AlterColumn alterColumn : alterOperation.getAlterColum
[GitHub] [inlong] lordcheng10 commented on a diff in pull request #8095: [INLONG-7959][Sort] Dynamic schema evolution support delete and update columns when sink to Iceberg
lordcheng10 commented on code in PR #8095:
URL: https://github.com/apache/inlong/pull/8095#discussion_r1208768400

## inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/SchemaChangeUtils.java: ##
@@ -51,49 +53,81 @@ public class SchemaChangeUtils {
     static List diffSchema(Schema oldSchema, Schema newSchema) {
         List oldFields = oldSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
         List newFields = newSchema.columns().stream().map(NestedField::name).collect(Collectors.toList());
-        int oi = 0;
-        int ni = 0;
+        Set oldFieldSet = new HashSet<>(oldFields);
+        Set newFieldSet = new HashSet<>(newFields);
+
+        Set intersectColSet = Sets.intersection(oldFieldSet, newFieldSet);
+        Set colsToDelete = Sets.difference(oldFieldSet, newFieldSet);
+        Set colsToAdd = Sets.difference(newFieldSet, oldFieldSet);
+
         List tableChanges = new ArrayList<>();
-        while (ni < newFields.size()) {
-            if (oi < oldFields.size() && oldFields.get(oi).equals(newFields.get(ni))) {
-                oi++;
-                ni++;
-            } else {
-                NestedField newField = newSchema.findField(newFields.get(ni));
+
+        // step0: Unknown change
+        if (!colsToDelete.isEmpty() && !colsToAdd.isEmpty()) {

Review Comment:
> example first [a, b, c] -> delete c [a, b] -> add d [a, b, d]

@hejiay If two messages are received one after another, (a1, b1, c1) -> (a1, b1, d1), diffing the schemas yields add(d) and delete(c). So after executing the add and the delete, doesn't the table become [a, b, d]?
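
For reference, the case debated in this review thread can be reproduced with plain JDK collections. The sketch below is illustrative only and is not code from the PR: `ColumnDiffSketch`, `difference`, and `isAmbiguous` are hypothetical names, and it uses `java.util` sets in place of the Guava `Sets` helpers seen in the diff. It shows that a name-only diff of [a, b, c] against [a, b, d] reports one delete and one add, which is exactly the situation the `step0` check treats as an unknown change, since the same result would also come from renaming c to d.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of InLong: diffs two column-name lists by name only.
final class ColumnDiffSketch {

    // Names present in `from` but absent from `to`, kept in `from` order.
    static Set<String> difference(List<String> from, List<String> to) {
        Set<String> result = new LinkedHashSet<>(from);
        result.removeAll(to);
        return result;
    }

    // True when the diff contains both deletes and adds. From names alone this
    // cannot be told apart from a column rename (assumption: no DDL info is available).
    static boolean isAmbiguous(List<String> oldCols, List<String> newCols) {
        return !difference(oldCols, newCols).isEmpty()
                && !difference(newCols, oldCols).isEmpty();
    }

    public static void main(String[] args) {
        List<String> oldCols = Arrays.asList("a", "b", "c");
        List<String> newCols = Arrays.asList("a", "b", "d");
        System.out.println("delete: " + difference(oldCols, newCols));
        System.out.println("add: " + difference(newCols, oldCols));
        System.out.println("ambiguous: " + isAmbiguous(oldCols, newCols));
    }
}
```

Applying the delete and then the add does end with columns [a, b, d], as the comment suggests. Whether the old values of c should carry over to d (a rename) or be dropped is what the name-only diff cannot decide.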
[GitHub] [inlong] hnrainll opened a new pull request, #8109: [INLONG-8108][DataProxy] WorkflowApprover API Permissions Optimization
hnrainll opened a new pull request, #8109:
URL: https://github.com/apache/inlong/pull/8109

### Prepare a Pull Request

- Fixes #8108

### Motivation

WorkflowApprover API can be accessed with too many permissions

### Modifications

WorkflowApproverController API Permissions