Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1894119512 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1026,20 +1026,86 @@ void createTemporarySystemFunction( * If a permanent table with a given path exists, it will be used from now on for any queries * that reference this path. * + * @param path The given path under which the temporary table will be dropped. See also the + * {@link TableEnvironment} class description for the format of the path. * @return true if a table existed in the given path and was removed */ boolean dropTemporaryTable(String path); +/** + * Drops a table registered in the given path. + * + * This method can only drop permanent objects. Temporary objects can shadow permanent ones. + * If a temporary object exists in a given path, make sure to drop the temporary object first + * using {@link #dropTemporaryTable}. + * + * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropTable(java.lang.String, boolean)} to change the default behavior. + * + * @param path The given path under which the table will be dropped. See also the {@link + * TableEnvironment} class description for the format of the path. + * @return true if a table existed in the given path and was removed + */ +boolean dropTable(String path); + +/** + * Drops a table registered in the given path. + * + * This method can only drop permanent objects. Temporary objects can shadow permanent ones. + * If a temporary object exists in a given path, make sure to drop the temporary object first + * using {@link #dropTemporaryTable}. + * + * @param path The given path under which the given table will be dropped. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param ignoreIfNotExists whether to ignore if table does not exist. 
+ * @return true if a table existed in the given path and was removed, throws {@link Review Comment: nit: can we state explicitly when false is returned? Maybe add "returns false if the table was not dropped because the table does not exist and ignoreIfNotExists is true." -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894119140 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java: ## @@ -366,6 +366,61 @@ public static ApiExpression toTimestampLtz(Object numericEpochTime, Object preci return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, numericEpochTime, precision); } +/** + * * Converts the given time string with the specified format to {@link Review Comment: modified.
Re: [PR] [FLINK-36227] Restore compatibility with Logback 1.2 [flink]
afedulov commented on PR #25813: URL: https://github.com/apache/flink/pull/25813#issuecomment-2557293147 @piotrp thanks for the contribution. Please consider that usage of Mockito in Flink is generally discouraged: https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894092145 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java: ## @@ -804,4 +813,163 @@ private Stream floorTestCases() { .format(TIMESTAMP_FORMATTER), STRING().nullable())); } + +private Stream toTimestampLtzTestCases() { +return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ) +.onFieldsWithData( +100, +1234, +-100, +null, +DecimalDataUtils.castFrom(-Double.MAX_VALUE, 38, 18), Review Comment: Hi @snuyanzin, the invalid test cases are in the last commit; the invalid inputs are passed in directly. https://github.com/apache/flink/pull/25763/files/982b92f13fbb93b8e0c5244b30d4ae8bbbfa3a25#diff-7588925e682b7f5b4b2e41fdc4992ef0d9f246ce00696f783c36f993b8aeb7aeR950-R973 Could you clarify what you mean in this comment?
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894106052 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
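The numeric overloads quoted above treat the precision argument as the unit of the epoch value (0 for seconds, 3 for milliseconds) and return null when either argument is null. A minimal sketch of that behavior using only `java.time` follows. The class `EpochPrecisionSketch` and its `toInstant` method are hypothetical names for illustration, not Flink's `DateTimeUtils`, and rejecting any other precision is an assumption of this sketch.

```java
import java.time.Instant;

/** Hypothetical illustration only; this is not Flink's DateTimeUtils. */
public class EpochPrecisionSketch {

    /**
     * Interprets the epoch value as seconds (precision 0) or milliseconds (precision 3).
     * Returns null if either argument is null, mirroring the null handling in the quoted diff.
     */
    static Instant toInstant(Long epoch, Integer precision) {
        if (epoch == null || precision == null) {
            return null;
        }
        switch (precision) {
            case 0:
                return Instant.ofEpochSecond(epoch);
            case 3:
                return Instant.ofEpochMilli(epoch);
            default:
                // Assumption of this sketch: other precisions are rejected.
                throw new IllegalArgumentException("Unsupported precision: " + precision);
        }
    }

    public static void main(String[] args) {
        System.out.println(toInstant(1234567890L, 0));    // epoch seconds
        System.out.println(toInstant(1234567890123L, 3)); // epoch milliseconds
        System.out.println(toInstant(null, 3));           // null input yields null
    }
}
```

Both non-null calls in `main` describe the same second on the timeline; the millisecond variant additionally carries the `.123` fraction.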
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894092145 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java: ## @@ -804,4 +813,163 @@ private Stream floorTestCases() { .format(TIMESTAMP_FORMATTER), STRING().nullable())); } + +private Stream toTimestampLtzTestCases() { +return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ) +.onFieldsWithData( +100, +1234, +-100, +null, +DecimalDataUtils.castFrom(-Double.MAX_VALUE, 38, 18), Review Comment: Hi @snuyanzin, the invalid test cases are in the last commit; the invalid inputs are passed in directly as literals. https://github.com/apache/flink/pull/25763/files/982b92f13fbb93b8e0c5244b30d4ae8bbbfa3a25#diff-7588925e682b7f5b4b2e41fdc4992ef0d9f246ce00696f783c36f993b8aeb7aeR950-R973 Could you clarify what you mean in this comment?
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1894119512 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1026,20 +1026,86 @@ void createTemporarySystemFunction( * If a permanent table with a given path exists, it will be used from now on for any queries * that reference this path. * + * @param path The given path under which the temporary table will be dropped. See also the + * {@link TableEnvironment} class description for the format of the path. * @return true if a table existed in the given path and was removed */ boolean dropTemporaryTable(String path); +/** + * Drops a table registered in the given path. + * + * This method can only drop permanent objects. Temporary objects can shadow permanent ones. + * If a temporary object exists in a given path, make sure to drop the temporary object first + * using {@link #dropTemporaryTable}. + * + * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropTable(java.lang.String, boolean)} to change the default behavior. + * + * @param path The given path under which the table will be dropped. See also the {@link + * TableEnvironment} class description for the format of the path. + * @return true if a table existed in the given path and was removed + */ +boolean dropTable(String path); + +/** + * Drops a table registered in the given path. + * + * This method can only drop permanent objects. Temporary objects can shadow permanent ones. + * If a temporary object exists in a given path, make sure to drop the temporary object first + * using {@link #dropTemporaryTable}. + * + * @param path The given path under which the given table will be dropped. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param ignoreIfNotExists whether to ignore if table does not exist. 
+ * @return true if a table existed in the given path and was removed, throws {@link Review Comment: Can we state explicitly when false is returned? Maybe add "returns false if the table was not dropped because the table does not exist and ignoreIfNotExists is true." In CatalogManager we say: "@return true if table existed in the given path and was dropped, false if table didn't exist in the given path and ignoreIfNotExists was true." I think the Javadoc should be the same in each case. I would suggest that @return only documents when the method returns true and when it returns false, as CatalogManager does. If we want to mention the ValidationException, it should be in the body of the Javadoc, or in an @exception tag if we want to declare the ValidationException on the method.
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1894119512 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1026,20 +1026,86 @@ void createTemporarySystemFunction( * If a permanent table with a given path exists, it will be used from now on for any queries * that reference this path. * + * @param path The given path under which the temporary table will be dropped. See also the + * {@link TableEnvironment} class description for the format of the path. * @return true if a table existed in the given path and was removed */ boolean dropTemporaryTable(String path); +/** + * Drops a table registered in the given path. + * + * This method can only drop permanent objects. Temporary objects can shadow permanent ones. + * If a temporary object exists in a given path, make sure to drop the temporary object first + * using {@link #dropTemporaryTable}. + * + * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropTable(java.lang.String, boolean)} to change the default behavior. + * + * @param path The given path under which the table will be dropped. See also the {@link + * TableEnvironment} class description for the format of the path. + * @return true if a table existed in the given path and was removed + */ +boolean dropTable(String path); + +/** + * Drops a table registered in the given path. + * + * This method can only drop permanent objects. Temporary objects can shadow permanent ones. + * If a temporary object exists in a given path, make sure to drop the temporary object first + * using {@link #dropTemporaryTable}. + * + * @param path The given path under which the given table will be dropped. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param ignoreIfNotExists whether to ignore if table does not exist. 
+ * @return true if a table existed in the given path and was removed, throws {@link Review Comment: nit: can we state explicitly when false is returned? Maybe add "returns false if the table was not dropped because the table does not exist and ignoreIfNotExists is true." In CatalogManager we say: "@return true if table existed in the given path and was dropped, false if table didn't exist in the given path and ignoreIfNotExists was true." I think the Javadoc should be the same in each case. I would suggest that @return only documents when the method returns true and when it returns false, as CatalogManager does. If we want to mention the ValidationException, it should be in the body of the Javadoc, or in an @exception tag if we want to declare the ValidationException on the method.
Re: [PR] [FLINK-36227] Restore compatibility with Logback 1.2 [flink]
afedulov commented on code in PR #25813: URL: https://github.com/apache/flink/pull/25813#discussion_r1894125426 ## flink-core/pom.xml: ## @@ -180,6 +180,13 @@ under the License. test + Review Comment: Mockito is intentionally excluded from Flink, and adding this dependency to flink-core sends the wrong message. Please check whether you can avoid using it. https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations
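For readers unfamiliar with the guideline linked above, the general idea of a reusable test implementation can be sketched as follows. All names here (`EventListener`, `Publisher`, `RecordingListener`) are hypothetical and are not taken from the PR or from Flink; the sketch only shows a hand-written recording implementation taking the place of a Mockito mock.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; none of these types exist in Flink or in the PR. */
public class ReusableTestImplSketch {

    /** Hypothetical dependency of the code under test. */
    interface EventListener {
        void onEvent(String event);
    }

    /** Hypothetical code under test: forwards a tagged payload to its listener. */
    static final class Publisher {
        private final EventListener listener;

        Publisher(EventListener listener) {
            this.listener = listener;
        }

        void publish(String payload) {
            listener.onEvent("published:" + payload);
        }
    }

    /** Reusable test implementation that records calls, used in place of a mock. */
    static final class RecordingListener implements EventListener {
        private final List<String> events = new ArrayList<>();

        @Override
        public void onEvent(String event) {
            events.add(event);
        }

        List<String> getEvents() {
            return events;
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        new Publisher(listener).publish("a");
        // The test asserts on the recorded state instead of verifying mock interactions.
        System.out.println(listener.getEvents());
    }
}
```

A recording implementation like this can be shared across tests and keeps compiling when the interface changes, which is the kind of benefit the linked guideline argues for.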
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
hanyuzheng7 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894127361 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTim
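The string overloads described in the quoted Javadoc parse a timestamp with a format and, optionally, a time zone. The quoted implementation is cut off before that logic, so the following is only a sketch of the general technique with plain `java.time`: parse the text as a local date-time, then resolve it in the given zone. `ParseWithZoneSketch` and `parse` are hypothetical names, and the actual Flink code may handle errors and defaults differently.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical illustration only; this is not the ToTimestampLtzFunction implementation. */
public class ParseWithZoneSketch {

    /**
     * Parses the timestamp text with the given pattern and interprets the result in the
     * given time zone. Returns null if any argument is null.
     */
    static Instant parse(String timestamp, String format, String timezone) {
        if (timestamp == null || format == null || timezone == null) {
            return null;
        }
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
        LocalDateTime local = LocalDateTime.parse(timestamp, formatter);
        // The same wall-clock text maps to different instants in different zones.
        return local.atZone(ZoneId.of(timezone)).toInstant();
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss";
        System.out.println(parse("2023-01-01 10:00:00", pattern, "UTC"));
        System.out.println(parse("2023-01-01 10:00:00", pattern, "Asia/Shanghai"));
        System.out.println(parse("2023-01-01T10:00:00", "yyyy-MM-dd'T'HH:mm:ss", "UTC"));
    }
}
```

This sketch lets `DateTimeParseException` and `DateTimeException` propagate on malformed input or an unknown zone; whether a SQL function should instead return null in those cases is a design decision outside its scope.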
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1894119512 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1026,20 +1026,86 @@ void createTemporarySystemFunction( * If a permanent table with a given path exists, it will be used from now on for any queries * that reference this path. * + * @param path The given path under which the temporary table will be dropped. See also the + * {@link TableEnvironment} class description for the format of the path. * @return true if a table existed in the given path and was removed */ boolean dropTemporaryTable(String path); +/** + * Drops a table registered in the given path. + * + * This method can only drop permanent objects. Temporary objects can shadow permanent ones. + * If a temporary object exists in a given path, make sure to drop the temporary object first + * using {@link #dropTemporaryTable}. + * + * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropTable(java.lang.String, boolean)} to change the default behavior. + * + * @param path The given path under which the table will be dropped. See also the {@link + * TableEnvironment} class description for the format of the path. + * @return true if a table existed in the given path and was removed + */ +boolean dropTable(String path); + +/** + * Drops a table registered in the given path. + * + * This method can only drop permanent objects. Temporary objects can shadow permanent ones. + * If a temporary object exists in a given path, make sure to drop the temporary object first + * using {@link #dropTemporaryTable}. + * + * @param path The given path under which the given table will be dropped. See also the {@link + * TableEnvironment} class description for the format of the path. + * @param ignoreIfNotExists whether to ignore if table does not exist. 
+ * @return true if a table existed in the given path and was removed, throws {@link Review Comment: Can we state explicitly when false is returned? Maybe add "returns false if the table was not dropped because the table does not exist and ignoreIfNotExists is true." In CatalogManager we say: "@return true if table existed in the given path and was dropped, false if table didn't exist in the given path and ignoreIfNotExists was true." I think the Javadoc should be the same in each case. I would suggest that @return only documents when the method returns true and when it returns false, as CatalogManager does. If we want to mention the ValidationException, it should be in the body of the Javadoc, or in an @exception tag if we want to declare the ValidationException on the method. That said, I see it is mentioned in the @param ignoreIfNotExists Javadoc of some of the methods, but not consistently.
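The contract discussed in this review comment (true when a table was dropped, false when it was absent and ignoreIfNotExists is true, an exception otherwise) can be sketched with a small in-memory stand-in. `DropTableContractSketch` is a hypothetical class, not Flink's `TableEnvironment` or `CatalogManager`, and it throws `IllegalStateException` where the comment refers to `ValidationException`, so that the example has no Flink dependency.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in; this is not Flink's CatalogManager. */
public class DropTableContractSketch {

    private final Map<String, String> tables = new HashMap<>();

    void createTable(String path, String definition) {
        tables.put(path, definition);
    }

    /**
     * Drops the table registered under the given path.
     *
     * @param path path of the table to drop
     * @param ignoreIfNotExists whether a missing table is silently ignored
     * @return true if the table existed and was dropped, false if it did not exist and
     *     ignoreIfNotExists was true
     * @throws IllegalStateException if the table does not exist and ignoreIfNotExists is false
     *     (stand-in for the ValidationException mentioned in the review)
     */
    boolean dropTable(String path, boolean ignoreIfNotExists) {
        if (tables.remove(path) != null) {
            return true;
        }
        if (ignoreIfNotExists) {
            return false;
        }
        throw new IllegalStateException("Table '" + path + "' does not exist.");
    }

    public static void main(String[] args) {
        DropTableContractSketch catalog = new DropTableContractSketch();
        catalog.createTable("cat.db.orders", "placeholder definition");
        System.out.println(catalog.dropTable("cat.db.orders", true)); // table existed
        System.out.println(catalog.dropTable("cat.db.orders", true)); // already gone, ignored
    }
}
```

Documenting the true and false cases under `@return` and the failure case under a separate tag, as in this sketch, is one way to apply the suggestion; the PR may settle on different wording.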
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894146265 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894146265 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894166214 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
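For readers following the thread, the precision semantics stated in the quoted Javadoc (0 as seconds, 3 as milliseconds) and the null handling of the `eval` overloads can be illustrated with plain `java.time`. This is only a sketch under assumptions: `EpochPrecisionSketch` and `toInstant` are hypothetical names, it does not use `DateTimeUtils` or `TimestampData`, and rejecting other precision values is an assumption made here, not something the quoted diff shows.

```java
import java.time.Instant;

/** Hypothetical stand-alone illustration; not the code under review. */
public class EpochPrecisionSketch {

    /**
     * Interprets a numeric epoch according to the given precision:
     * 0 means seconds, 3 means milliseconds. Returns null if either
     * argument is null, mirroring the null checks in the quoted eval methods.
     */
    public static Instant toInstant(Long epoch, Integer precision) {
        if (epoch == null || precision == null) {
            return null;
        }
        switch (precision) {
            case 0:
                return Instant.ofEpochSecond(epoch);
            case 3:
                return Instant.ofEpochMilli(epoch);
            default:
                // Assumption for this sketch: other precisions are rejected.
                throw new IllegalArgumentException("Unsupported precision: " + precision);
        }
    }

    public static void main(String[] args) {
        System.out.println(toInstant(1234567890L, 0));    // 2009-02-13T23:31:30Z
        System.out.println(toInstant(1234567890123L, 3)); // 2009-02-13T23:31:30.123Z
        System.out.println(toInstant(null, 3));           // null
    }
}
```

The two numeric inputs are the ones used in the Javadoc examples, so the same literal yields the same instant whether it is read as seconds with precision 0 or, with three extra digits, as milliseconds with precision 3.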
[PR] [FLINK-33117][table][docs] Fix scala example in udfs page [flink]
afedulov opened a new pull request, #25833: URL: https://github.com/apache/flink/pull/25833 Unmodified backport of https://github.com/apache/flink/pull/23439 to 1.20 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33117][table][docs] Fix scala example in udfs page [flink]
afedulov commented on PR #25833: URL: https://github.com/apache/flink/pull/25833#issuecomment-2557385736 @flinkbot run azure
Re: [PR] [FLINK-33117][table][docs] Fix scala example in udfs page [flink]
flinkbot commented on PR #25833: URL: https://github.com/apache/flink/pull/25833#issuecomment-2557386028 ## CI report: * c8f3c26629996819cea131ef8cf11526e6b98369 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894092145 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java: ## @@ -804,4 +813,163 @@ private Stream floorTestCases() { .format(TIMESTAMP_FORMATTER), STRING().nullable())); } + +private Stream toTimestampLtzTestCases() { +return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ) +.onFieldsWithData( +100, +1234, +-100, +null, +DecimalDataUtils.castFrom(-Double.MAX_VALUE, 38, 18), Review Comment: Hi @snuyanzin, I have the invalid test cases here; the invalid inputs are passed in directly. https://github.com/apache/flink/pull/25763/files/982b92f13fbb93b8e0c5244b30d4ae8bbbfa3a25#diff-7588925e682b7f5b4b2e41fdc4992ef0d9f246ce00696f783c36f993b8aeb7aeR950-R973 Could you clarify what you mean in this comment?
[jira] [Assigned] (FLINK-36931) FlinkCDC YAML supports synchronizing the full amount of data of the entire database in Batch mode
[ https://issues.apache.org/jira/browse/FLINK-36931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-36931: -- Assignee: Wenkai Qi > FlinkCDC YAML supports synchronizing the full amount of data of the entire > database in Batch mode > - > > Key: FLINK-36931 > URL: https://issues.apache.org/jira/browse/FLINK-36931 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: Wenkai Qi >Assignee: Wenkai Qi >Priority: Major > Original Estimate: 336h > Remaining Estimate: 336h > > h1. Background > MysqlCDC in Flink CDC supports *StartupMode.SNAPSHOT* and is of > {*}Boundedness.BOUNDED{*}, and can run in {*}RuntimeExecutionMode.BATCH{*}. > h1. Expectation > FlinkCDC YAML jobs can support {*}StartupMode.SNAPSHOT{*}, be of > {*}Boundedness.BOUNDED{*}, and can run in {*}RuntimeExecutionMode.BATCH{*}. > h1. Benefits > # The performance improvement of Flink Batch can be utilized (dynamic > partition pruning, Hybrid Shuffle). Which optimizations of the batch mode > will be used needs to be discussed. > # The full amount of data of the entire database can be synchronized to > supplement data in an offline computing manner. In the future, it can even > support the full amount of data synchronization of the entire database for > other databases and data lakes. > h1. Under consideration > # Sink needs to switch to Batch mode. > [https://github.com/apache/flink-cdc/pull/3646#pullrequestreview-2491309306] > # For 2PC sink, call a checkpoint with checkpointid of Long.MAX_VALUE once, > and the sink should make the final submission based on this id. > # Sink directly supports Batch writing (such as DorisSink) > # ...(In supplementation) -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
snuyanzin commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894130094 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
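The three-argument signature in the quoted Javadoc, `TO_TIMESTAMP_LTZ(timestamp, format, timezone)`, may be easier to discuss with a small stand-alone example. The sketch below uses only `java.time`; `ParseWithZoneSketch` and `parse` are hypothetical names, the pattern `yyyy-MM-dd HH:mm:ss` is assumed to be the default format, and letting parse errors propagate as exceptions is a choice of this sketch. The quoted diff is truncated before the corresponding code, so nothing here should be read as what the PR actually does.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/** Hypothetical stand-alone illustration; not the code under review. */
public class ParseWithZoneSketch {

    /**
     * Parses a timestamp string with the given pattern, interprets the result
     * as wall-clock time in the given zone, and returns the absolute instant.
     * Returns null if any argument is null, mirroring the quoted eval methods.
     */
    public static Instant parse(String timestamp, String format, String timezone) {
        if (timestamp == null || format == null || timezone == null) {
            return null;
        }
        LocalDateTime local =
                LocalDateTime.parse(timestamp, DateTimeFormatter.ofPattern(format));
        // Attach the zone, then convert to a zone-independent instant.
        return local.atZone(ZoneId.of(timezone)).toInstant();
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss";
        System.out.println(parse("2023-01-01 10:00:00", pattern, "UTC"));
        // 2023-01-01T10:00:00Z
        System.out.println(parse("2023-01-01 10:00:00", pattern, "Asia/Shanghai"));
        // 2023-01-01T02:00:00Z
    }
}
```

The same wall-clock string maps to different instants depending on the zone argument, which is the behavior the timezone parameter exists to control.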
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
snuyanzin commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894130094 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1894138335 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ## @@ -1297,30 +1303,42 @@ public void dropMaterializedTable( * @param objectIdentifier The fully qualified path of the view to drop. * @param ignoreIfNotExists If false exception will be thrown if the view to drop does not * exist. + * @return true if view existed in the given path and was dropped, false if view didn't exist in + * the given path and ignoreIfNotExists was true. */ -public void dropView(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { -dropTableInternal(objectIdentifier, ignoreIfNotExists, false, false); -} +public boolean dropView(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { +return dropTableInternal(objectIdentifier, ignoreIfNotExists, TableKind.VIEW); +} + +private boolean dropTableInternal( +ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists, TableKind kind) { +final Predicate filter; +final String tableOrView; +switch (kind) { Review Comment: I notice that `CatalogBaseTable` has `getTableKind()`. Can we use this instead of `instanceof`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1894138335 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java: ## @@ -1297,30 +1303,42 @@ public void dropMaterializedTable( * @param objectIdentifier The fully qualified path of the view to drop. * @param ignoreIfNotExists If false exception will be thrown if the view to drop does not * exist. + * @return true if view existed in the given path and was dropped, false if view didn't exist in + * the given path and ignoreIfNotExists was true. */ -public void dropView(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { -dropTableInternal(objectIdentifier, ignoreIfNotExists, false, false); -} +public boolean dropView(ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists) { +return dropTableInternal(objectIdentifier, ignoreIfNotExists, TableKind.VIEW); +} + +private boolean dropTableInternal( +ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists, TableKind kind) { +final Predicate filter; +final String tableOrView; +switch (kind) { Review Comment: I notice that `CatalogBaseTable` has `getTableKind()`. Can we use this instead of `instanceof`, avoiding the overhead of reflection?
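A minimal sketch of the suggestion in this review comment, under stated assumptions: `TableKind`, `CatalogBaseTable`, `SimpleTable`, and `SimpleView` below are hypothetical stand-ins that only mirror the shape of the Flink types, not their real definitions. It shows how the filter and the error label could be derived from `getTableKind()` rather than from `instanceof` checks. Note that `instanceof` is an ordinary JVM type test rather than reflection, so the gain would likely be readability more than speed.

```java
import java.util.function.Predicate;

/** Sketch only: every type here is a simplified, hypothetical stand-in for the Flink catalog API. */
class TableKindDispatchSketch {

    /** Hypothetical reduced enum; the real Flink enum may carry more constants. */
    enum TableKind { TABLE, VIEW }

    /** Hypothetical stand-in exposing only the accessor discussed in the review. */
    interface CatalogBaseTable {
        TableKind getTableKind();
    }

    static final class SimpleTable implements CatalogBaseTable {
        @Override
        public TableKind getTableKind() {
            return TableKind.TABLE;
        }
    }

    static final class SimpleView implements CatalogBaseTable {
        @Override
        public TableKind getTableKind() {
            return TableKind.VIEW;
        }
    }

    /** Builds a filter that accepts only catalog objects of the requested kind. */
    static Predicate<CatalogBaseTable> filterFor(TableKind kind) {
        return table -> table.getTableKind() == kind;
    }

    /** Returns the noun used in error messages for the requested kind. */
    static String label(TableKind kind) {
        switch (kind) {
            case VIEW:
                return "View";
            case TABLE:
                return "Table";
            default:
                throw new IllegalArgumentException("Unsupported kind: " + kind);
        }
    }

    public static void main(String[] args) {
        Predicate<CatalogBaseTable> onlyViews = filterFor(TableKind.VIEW);
        System.out.println(label(TableKind.VIEW) + " matches view: " + onlyViews.test(new SimpleView()));
        System.out.println(label(TableKind.VIEW) + " matches table: " + onlyViews.test(new SimpleTable()));
    }
}
```

With this shape, supporting another kind means adding one enum constant and one `switch` branch, with no new type checks at the call sites.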
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894140247 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java: ## @@ -804,4 +813,163 @@ private Stream floorTestCases() { .format(TIMESTAMP_FORMATTER), STRING().nullable())); } + +private Stream toTimestampLtzTestCases() { +return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ) +.onFieldsWithData( +100, +1234, +-100, +null, +DecimalDataUtils.castFrom(-Double.MAX_VALUE, 38, 18), Review Comment: Resolved.
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
davidradl commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894143500 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894146265 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
snuyanzin commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894179515 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
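The numeric overloads quoted in this thread (null-safe, precision 0 for seconds, precision 3 for milliseconds, default precision 3) can be illustrated with plain `java.time`. This is only a sketch: `EpochPrecisionSketch` and `toInstant` are hypothetical names, it does not call Flink's `DateTimeUtils`, and throwing on any other precision is an assumption made here rather than confirmed behaviour of the PR.

```java
import java.time.Instant;

/** Sketch only: mirrors the null handling and precision semantics described in the Javadoc above. */
class EpochPrecisionSketch {

    /** Same default as the quoted DEFAULT_PRECISION constant. */
    static final int DEFAULT_PRECISION = 3;

    /**
     * Converts an epoch value to an Instant.
     * Precision 0 treats the value as seconds, precision 3 as milliseconds.
     * Returns null when either argument is null, like the quoted eval methods.
     */
    static Instant toInstant(Long epoch, Integer precision) {
        if (epoch == null || precision == null) {
            return null;
        }
        switch (precision.intValue()) {
            case 0:
                return Instant.ofEpochSecond(epoch);
            case 3:
                return Instant.ofEpochMilli(epoch);
            default:
                // Assumption of this sketch: reject precisions other than 0 and 3.
                throw new IllegalArgumentException("Unsupported precision: " + precision);
        }
    }

    /** Single-argument overload that falls back to the default precision. */
    static Instant toInstant(Long epoch) {
        return toInstant(epoch, DEFAULT_PRECISION);
    }

    public static void main(String[] args) {
        System.out.println(toInstant(1234567890L, 0));   // epoch seconds
        System.out.println(toInstant(1234567890123L));   // epoch milliseconds (default precision)
        System.out.println(toInstant(null, 3));          // null in, null out
    }
}
```

Both non-null calls in `main` refer to the same second; only the interpretation of the numeric input differs.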
[jira] [Assigned] (FLINK-36946) optimize sink operator name truncate
[ https://issues.apache.org/jira/browse/FLINK-36946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-36946: --- Assignee: tartarus > optimize sink operator name truncate > > > Key: FLINK-36946 > URL: https://issues.apache.org/jira/browse/FLINK-36946 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: tartarus >Assignee: tartarus >Priority: Major > Labels: pull-request-available > > We use the `Writer` keyword to filter the metrics of the sink operator in > Flink. > But when a user sets a very long sink name, the `Writer` keyword > gets truncated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36945) MySQL CDC internal schema representation becomes out of sync with the real database schema when restarting a job
[ https://issues.apache.org/jira/browse/FLINK-36945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yohei Yoshimuta updated FLINK-36945: Description: [The Vitess schema migration tool|https://vitess.io/docs/user-guides/schema-changes/ddl-strategies/] uses _RENAME TABLE_ to perform schema changes. However, the MySQL CDC connector does not account for these changes, causing the schema history topic in Debezium to become stale. While this issue does not immediately affect a running job, {*}it prevents the job from restarting successfully from a checkpoint and results in the following error{*}: {code:java} Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254) at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283) at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141) at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:139) at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:60) at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209) ... 12 more {code} When this happens, *the database history topic needs to be rebuilt, but the job cannot be automatically recovered.* The current workaround is to set `scan.startup.mode` to `specific-offset`, which forces CDC [to pass `schema_only_recovery` to Debezium|https://github.com/apache/flink-cdc/blob/dd69756d3fc1e3f5261b286added5397acbd88ad/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java#L209-L212]. 
However, this requires manual intervention. Examples of potential solutions include: - Enhancing the schema history mechanism to capture and process `RENAME TABLE` events during schema migrations. - Implementing a fallback mechanism to reconcile schema differences during recovery, reducing the need for manual intervention. was: [The Vitess schema migration tool|https://vitess.io/docs/user-guides/schema-changes/ddl-strategies/] uses _RENAME TABLE_ to perform schema changes. However, the MySQL CDC connector does not account for these changes, causing the schema history topic in Debezium to become stale. While this issue does not immediately affect a running job, {*}it prevents the job from restarting successfully from a checkpoint and results in the following error{*}: {code:java} Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254) at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283) at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141) at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:139) at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:60) at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209) ... 12 more {code} When this happens, *the database history topic needs to be rebuilt, but the job cannot be automatically recovered.* The current workaround is to set `scan.startup.mode` to `specific-offset`, which forces CDC to pass `schema_only_recovery` to Debezium. However, this requires manual intervention. 
Examples of potential solutions include: - Enhancing the schema history mechanism to capture and process `RENAME TABLE` events during schema migrations. - Implementing a fallback mechanism to reconcile schema differences during recovery, reducing the need for manual intervention. > MySQL CDC internal schema representation becomes out of sync with the real > database schema when restarting a job > > > Key: FLINK-36945 > URL: https://issues.apache.org/jira/browse/FLINK-36945 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: Yohei Yoshimuta >Priority: Major > > [The Vitess schema migration > tool|https://vitess.io/docs/user-guides/schema-changes/ddl-strategies/] uses > _RENAME TABLE_ to perform schema changes. > However, the MySQL CDC connector does not account
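The workaround described in this issue (setting `scan.startup.mode` to `specific-offset`) could be expressed as connector options along these lines. This is a hedged sketch: the class and method names are hypothetical, the binlog file name and position are placeholder values, and the `scan.startup.specific-offset.file` / `scan.startup.specific-offset.pos` keys are written as I recall them from the MySQL CDC connector documentation, so they should be verified against the connector version in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch only: builds the startup options for the manual recovery workaround. */
class SpecificOffsetOptionsSketch {

    /** Collects the startup options; binlogFile and binlogPos are supplied by the operator. */
    static Map<String, String> startupOptions(String binlogFile, long binlogPos) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("scan.startup.mode", "specific-offset");
        // Assumed option keys; check them against the connector documentation.
        options.put("scan.startup.specific-offset.file", binlogFile);
        options.put("scan.startup.specific-offset.pos", Long.toString(binlogPos));
        return options;
    }

    /** Renders the options as the body of a SQL WITH (...) clause. */
    static String toWithClause(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
    }

    public static void main(String[] args) {
        // Placeholder binlog coordinates, for illustration only.
        System.out.println(toWithClause(startupOptions("mysql-bin.000003", 4L)));
    }
}
```

Because the offset has to be looked up and supplied by hand, this is the manual intervention the issue asks to reduce.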
[jira] [Commented] (FLINK-36920) Update org.quartz-schedule:quartz in flink-autoscaler module from 2.3.2 to 2.4.0
[ https://issues.apache.org/jira/browse/FLINK-36920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907357#comment-17907357 ] Gyula Fora commented on FLINK-36920: Makes sense! But I agree that cutting a new release is almost as easy as a patch release at this point :) > Update org.quartz-schedule:quartz in flink-autoscaler module from 2.3.2 to > 2.4.0 > > > Key: FLINK-36920 > URL: https://issues.apache.org/jira/browse/FLINK-36920 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: 1.10.0 >Reporter: Anupam Aggarwal >Assignee: Anupam Aggarwal >Priority: Minor > Labels: pull-request-available > > Update dependency on org.quartz-scheduler:quartz used in flink-autoscaler > module from 2.3.2 to 2.4.0 > > *Vulnerability info:* > cve-2023-39017 > quartz-jobs 2.3.2 and below was discovered to contain a code injection > vulnerability in the component > org.quartz.jobs.ee.jms.SendQueueMessageJob.execute. This vulnerability is > exploited via passing an unchecked argument. NOTE: this is disputed by > multiple parties because it is not plausible that untrusted user input would > reach the code location where injection must occur. > More details are at: [https://nvd.nist.gov/vuln/detail/cve-2023-39017] > *Proposed fix* > Bumping the dependency from 2.3.2 to 2.4.0
Re: [PR] [BP-1.20][FLINK-36739] Update the NodeJS to v22.11.0 (LTS) [flink]
mehdid93 commented on PR #25794: URL: https://github.com/apache/flink/pull/25794#issuecomment-2556704958 Hey Robert and David, and sorry for the late response. Yes, the issue is due to the old version of the Ubuntu image used in the CI build. I've backported the PRs made by @zentol and @simplejason to this release-1.20 branch. The full list of PRs is: - https://github.com/apache/flink/pull/25827 Update CI to Ubuntu 22.04 (Jammy) - https://github.com/apache/flink/pull/25794 Update the NodeJS to v22.11.0 (LTS) - https://github.com/apache/flink/pull/25829 Update ng-zorro-antd to v18 - https://github.com/apache/flink/pull/25830 Update frontend dependencies to address vulnerabilities -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimeSortOperator
[ https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907333#comment-17907333 ] Zakelly Lan commented on FLINK-36942: - [~au_miner] Thanks for your contribution! The root cause is that the test harness `OneInputStreamOperatorTestHarness` does not work well with async state. Please use `AsyncKeyedOneInputStreamOperatorTestHarness` instead when you are testing async state. The PR you mentioned performs an important [fix](https://github.com/apache/flink/pull/25815/files#diff-885dd1cb87987c93b79ac6da39deeb53d5a09dc434196031d4c53e60b18e88e3L57-R66) on `TwoInputStreamOperatorTestHarness`, which is why it runs well there. Please try `AsyncKeyedOneInputStreamOperatorTestHarness`; we will use that harness for all async state operators in the future. > Unable to trigger onEventTime() when using the async state API in > RowTimeSortOperator > - > > Key: FLINK-36942 > URL: https://issues.apache.org/jira/browse/FLINK-36942 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 2.0.0 >Reporter: Wang Qilong >Priority: Major > > When I use the async state API in RowTimeSortOperator and inherit it from > AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the > timer expires. > Regarding how to reproduce: > Check out > [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator] > and execute the tests in RowTimeSortOperatorTest. > I think this is a very strange bug because it can be triggered smoothly in > [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], > but cannot be triggered in RowTimeSortOperator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36945) MySQL CDC internal schema representation becomes out of sync with the real database schema when restarting a job
Yohei Yoshimuta created FLINK-36945: --- Summary: MySQL CDC internal schema representation becomes out of sync with the real database schema when restarting a job Key: FLINK-36945 URL: https://issues.apache.org/jira/browse/FLINK-36945 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: Yohei Yoshimuta [The Vitess schema migration tool|https://vitess.io/docs/user-guides/schema-changes/ddl-strategies/] uses `RENAME TABLE` to perform schema changes. However, the MySQL CDC connector does not account for these changes, causing the schema history topic in Debezium to become stale. While this issue does not immediately affect a running job, it prevents the job from restarting successfully from a checkpoint and results in the following error: ``` Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254) at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283) at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141) at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:139) at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:60) at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209) ... 12 more ``` When this happens, the database history topic needs to be rebuilt, but the job cannot be automatically recovered. The current workaround is to set `scan.startup.mode` to `specific-offset`, which forces CDC to pass `schema_only_recovery` to Debezium. However, this requires manual intervention.
Examples of potential solutions include: - Enhancing the schema history mechanism to capture and process `RENAME TABLE` events during schema migrations. - Implementing a fallback mechanism to reconcile schema differences during recovery, reducing the need for manual intervention. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36944) Introduce 'enableAsycState' in KeyedStream interfaces
[ https://issues.apache.org/jira/browse/FLINK-36944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-36944: Summary: Introduce 'enableAsycState' in KeyedStream interfaces (was: Introduce 'enableAsycState' in DataStream v1 interfaces) > Introduce 'enableAsycState' in KeyedStream interfaces > - > > Key: FLINK-36944 > URL: https://issues.apache.org/jira/browse/FLINK-36944 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36944][DataStream/API] Introduce 'enableAsycState' in KeyedStream interfaces and implement reduce operator [flink]
Zakelly opened a new pull request, #25831: URL: https://github.com/apache/flink/pull/25831 ## What is the purpose of the change As the first part of FLIP-488, this PR introduces `enableAsyncState` in KeyedStream and its related interface classes. It also provides a basic implementation of the async reduce operator. ## Brief change log Please check each commit. - Introduce `enableAsyncState` API - Introduce serializer based state descriptor constructors - Implement async reduce operator - Introduce async state version of wordcount example ## Verifying this change This PR adds an IT test on the wordcount example, which also serves as an IT test of the new APIs and the reduce operator. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36944) Introduce 'enableAsycState' in KeyedStream interfaces
[ https://issues.apache.org/jira/browse/FLINK-36944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36944: --- Labels: pull-request-available (was: ) > Introduce 'enableAsycState' in KeyedStream interfaces > - > > Key: FLINK-36944 > URL: https://issues.apache.org/jira/browse/FLINK-36944 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36946][Metrics] Optimize sink operator name truncate [flink]
Tartarus0zm opened a new pull request, #25832: URL: https://github.com/apache/flink/pull/25832 ## What is the purpose of the change Avoid truncation of the Writer/Committer keyword. ## Brief change log - add MetricUtil#truncateOperatorName to optimize Operator name ## Verifying this change - SinkMetricsITCase#testMetrics ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36946) optimize sink operator name truncate
[ https://issues.apache.org/jira/browse/FLINK-36946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36946: --- Labels: pull-request-available (was: ) > optimize sink operator name truncate > > > Key: FLINK-36946 > URL: https://issues.apache.org/jira/browse/FLINK-36946 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: tartarus >Priority: Major > Labels: pull-request-available > > We use the `Writer` keyword to filter the metrics of the sink operator in > Flink. > But when the user sets a very long sink name, the `Writer` keyword > gets truncated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36938] Provide hooks before and after the watermark processing [flink]
Zakelly merged PR #25824: URL: https://github.com/apache/flink/pull/25824 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36938) Provide hooks before and after the watermark processing
[ https://issues.apache.org/jira/browse/FLINK-36938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907367#comment-17907367 ] Zakelly Lan commented on FLINK-36938: - Merged 7fa4f78 into master > Provide hooks before and after the watermark processing > --- > > Key: FLINK-36938 > URL: https://issues.apache.org/jira/browse/FLINK-36938 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Async State Processing >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > > We have rewritten `processWatermark` in the abstract operators for async state > processing. The sub-classes need a proper hook for doing something right > before the watermark is processed and right after the watermark is emitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-36938) Provide hooks before and after the watermark processing
[ https://issues.apache.org/jira/browse/FLINK-36938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan resolved FLINK-36938. - Fix Version/s: 2.0.0 Resolution: Fixed > Provide hooks before and after the watermark processing > --- > > Key: FLINK-36938 > URL: https://issues.apache.org/jira/browse/FLINK-36938 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Async State Processing >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > We have rewritten `processWatermark` in the abstract operators for async state > processing. The sub-classes need a proper hook for doing something right > before the watermark is processed and right after the watermark is emitted. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36946) optimize sink operator name truncate
tartarus created FLINK-36946: Summary: optimize sink operator name truncate Key: FLINK-36946 URL: https://issues.apache.org/jira/browse/FLINK-36946 Project: Flink Issue Type: Improvement Components: Runtime / Metrics Reporter: tartarus We use the `Writer` keyword to filter the metrics of the sink operator in Flink. But when the user sets a very long sink name, the `Writer` keyword gets truncated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
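Editorial note: the problem described in FLINK-36946 can be illustrated with a minimal sketch. It assumes the keyword sits at the end of the operator name (for example `<sink name>: Writer`) and uses an illustrative limit of 80 characters. The class and method names are hypothetical and this is not the code from PR #25832.

```java
// Hypothetical sketch, not the implementation of MetricUtil#truncateOperatorName.
final class OperatorNameTruncation {
    // Assumed limit, chosen for illustration only.
    static final int MAX_LENGTH = 80;

    /**
     * Truncates name to maxLength characters while keeping the given suffix
     * (for example ": Writer") intact at the end of the result.
     */
    static String truncateKeepingSuffix(String name, String suffix, int maxLength) {
        if (name.length() <= maxLength) {
            return name; // short enough, nothing to do
        }
        if (!name.endsWith(suffix) || suffix.length() >= maxLength) {
            // Nothing to preserve, or no room to preserve it: plain truncation.
            return name.substring(0, maxLength);
        }
        // Cut the user-provided part and re-append the keyword suffix.
        int headLength = maxLength - suffix.length();
        return name.substring(0, headLength) + suffix;
    }

    public static void main(String[] args) {
        String longSink = "x".repeat(100) + ": Writer";
        String truncated = truncateKeepingSuffix(longSink, ": Writer", MAX_LENGTH);
        System.out.println(truncated.length());             // 80
        System.out.println(truncated.endsWith(": Writer")); // true
    }
}
```

With plain truncation the same input would end in `xxxx`, so a metric filter looking for `Writer` would no longer match.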
Re: [PR] [FLINK-36946][Metrics] Optimize sink operator name truncate [flink]
flinkbot commented on PR #25832: URL: https://github.com/apache/flink/pull/25832#issuecomment-2556895215 ## CI report: * cee66261ba96fb8c49b4da0f74abc7c33c05b146 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36946) optimize sink operator name truncate
[ https://issues.apache.org/jira/browse/FLINK-36946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907370#comment-17907370 ] tartarus commented on FLINK-36946: -- [~fanrui] Hi, if you're free, please take a look, thanks! > optimize sink operator name truncate > > > Key: FLINK-36946 > URL: https://issues.apache.org/jira/browse/FLINK-36946 > Project: Flink > Issue Type: Improvement > Components: Runtime / Metrics >Reporter: tartarus >Priority: Major > Labels: pull-request-available > > We use the `Writer` keyword to filter the metrics of the sink operator in > Flink. > But when the user sets a very long sink name, the `Writer` keyword > gets truncated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]
davidradl commented on code in PR #25828: URL: https://github.com/apache/flink/pull/25828#discussion_r1893949065 ## docs/data/sql_functions.yml: ## @@ -642,7 +642,10 @@ temporal: description: Returns TRUE if two time intervals defined by (timepoint1, temporal1) and (timepoint2, temporal2) overlap. The temporal values could be either a time point or a time interval. E.g., (TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) returns TRUE; (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) returns FALSE. - sql: DATE_FORMAT(timestamp, string) table: dateFormat(TIMESTAMP, STRING) -description: Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. +description: Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's DateTimeFormatter. Review Comment: nit: to a value of string -> to a string -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
snuyanzin commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1893951451 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
snuyanzin commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1893952530 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java: ## @@ -804,4 +813,163 @@ private Stream floorTestCases() { .format(TIMESTAMP_FORMATTER), STRING().nullable())); } + +private Stream toTimestampLtzTestCases() { +return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ) +.onFieldsWithData( +100, +1234, +-100, +null, +DecimalDataUtils.castFrom(-Double.MAX_VALUE, 38, 18), Review Comment: I don't see here tests for invalid input like invalid timezone, invalid format invalid something else... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
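Editorial note: the kinds of invalid input mentioned in this review comment can be sketched with plain `java.time`, without any Flink classes. The helper below is hypothetical; in particular, returning `null` on failure is only an illustration, and whether `TO_TIMESTAMP_LTZ` should return NULL or throw is a decision for the PR itself.

```java
import java.time.DateTimeException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of ToTimestampLtzFunction.
final class InvalidInputSketch {

    /** Parses ts with the given pattern in the given zone, or returns null on any invalid input. */
    static Instant parseOrNull(String ts, String format, String zone) {
        if (ts == null || format == null || zone == null) {
            return null;
        }
        try {
            // Throws IllegalArgumentException for an invalid pattern.
            DateTimeFormatter formatter = DateTimeFormatter.ofPattern(format);
            // Throws DateTimeException (or its subclass ZoneRulesException) for an invalid zone.
            ZoneId zoneId = ZoneId.of(zone);
            // Throws DateTimeParseException (a DateTimeException) if ts does not match the pattern.
            return LocalDateTime.parse(ts, formatter).atZone(zoneId).toInstant();
        } catch (IllegalArgumentException | DateTimeException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        String pattern = "yyyy-MM-dd HH:mm:ss";
        System.out.println(parseOrNull("2023-01-01 10:00:00", pattern, "UTC"));       // 2023-01-01T10:00:00Z
        System.out.println(parseOrNull("2023-01-01 10:00:00", pattern, "Not/AZone")); // null
        System.out.println(parseOrNull("2023-01-01 10:00:00", "invalid", "UTC"));     // null
        System.out.println(parseOrNull("not a timestamp", pattern, "UTC"));           // null
    }
}
```

Each of the three failure paths (unknown timezone, invalid pattern, unparseable text) raises a different exception type, so a test suite would likely want one case per path.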
[jira] [Updated] (FLINK-36770) Support Request Timeout for AWS sinks
[ https://issues.apache.org/jira/browse/FLINK-36770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36770: --- Labels: pull-request-available (was: ) > Support Request Timeout for AWS sinks > - > > Key: FLINK-36770 > URL: https://issues.apache.org/jira/browse/FLINK-36770 > Project: Flink > Issue Type: Improvement > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: aws-connector-5.1.0 > > > h2. Description > in > [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] > we introduced request timeout for Async Sink which was released in 1.20, > Ideally we want to support that in AWS connectors. > h2. Acceptance Criteria > - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated > h2. Notes > This is a breaking change regarding flink version compatibility which means > we expect the next version (5.1) to not be compatible with 1.19 anymore. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36770-] make AWS sink writers use ResultHandler, move flink version to 1.20 [flink-connector-aws]
vahmed-hamdy opened a new pull request, #186: URL: https://github.com/apache/flink-connector-aws/pull/186 ## Purpose of the change - Move `submitRequestEntries` implementation for AWS sinks to use `ResultHandler` - Bump flink version to 1.20 - Adapt pipelines to Flink version change ## Verifying this change Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing This change is already covered by existing sink unit tests ## Significant changes *(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for convenience.)* - [ ] Dependencies have been added or upgraded - [ ] Public API has been changed (Public API is any class annotated with `@Public(Evolving)`) - [ ] Serializers have been changed - [ ] New feature has been introduced - If yes, how is this documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
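Editorial note: the idea of reporting the outcome of `submitRequestEntries` through a result handler can be sketched as follows. The `SketchResultHandler` interface is a local stand-in written for this note, modeled on the FLIP-451 description; its method names are an assumption, and `sendBatch` is a made-up client call. It is not the code from this PR.

```java
import java.util.ArrayList;
import java.util.List;

final class ResultHandlerSketch {

    /** Local stand-in, NOT Flink's interface; the method names are assumptions based on FLIP-451. */
    interface SketchResultHandler<T> {
        void complete();
        void completeExceptionally(Exception e);
        void retryForEntries(List<T> entriesToRetry);
    }

    /** Hypothetical client call: pretends that entries starting with "bad" were rejected. */
    static List<String> sendBatch(List<String> entries) {
        List<String> failed = new ArrayList<>();
        for (String entry : entries) {
            if (entry.startsWith("bad")) {
                failed.add(entry);
            }
        }
        return failed;
    }

    /** Reports exactly one outcome per batch: success, partial retry, or fatal failure. */
    static void submitRequestEntries(List<String> entries, SketchResultHandler<String> handler) {
        try {
            List<String> failed = sendBatch(entries);
            if (failed.isEmpty()) {
                handler.complete();
            } else {
                handler.retryForEntries(failed);
            }
        } catch (RuntimeException e) {
            handler.completeExceptionally(e);
        }
    }

    public static void main(String[] args) {
        submitRequestEntries(List.of("a", "bad-1", "b"), new SketchResultHandler<String>() {
            @Override public void complete() { System.out.println("completed"); }
            @Override public void completeExceptionally(Exception e) { System.out.println("failed: " + e); }
            @Override public void retryForEntries(List<String> r) { System.out.println("retry: " + r); }
        });
        // prints "retry: [bad-1]"
    }
}
```

Routing every outcome through one handler object gives the framework a single place to observe completion, which is what a request timeout needs.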
Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]
nictownsend commented on code in PR #25828: URL: https://github.com/apache/flink/pull/25828#discussion_r1893958758 ## docs/data/sql_functions.yml: ## @@ -642,7 +642,10 @@ temporal: description: Returns TRUE if two time intervals defined by (timepoint1, temporal1) and (timepoint2, temporal2) overlap. The temporal values could be either a time point or a time interval. E.g., (TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) returns TRUE; (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) returns FALSE. - sql: DATE_FORMAT(timestamp, string) table: dateFormat(TIMESTAMP, STRING) -description: Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. +description: Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's DateTimeFormatter. + - sql: DATE_FORMAT(string, string) +table: dateFormat(STRING, STRING) +description: Re-format a timestamp string to another string, using a custom format. The format string is compatible with Java's SimpleDateFormat. Review Comment: Assuming I've read https://github.com/apache/flink/blob/7fa4f78e43bfd99a7e7a3d655a9a3641e4d7bc59/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java#L2304 to mean it invokes `unixTimestamp(dateStr, format, null)` as there is no `tz` third parameter in the SQL function. ## docs/data/sql_functions.yml: ## @@ -642,7 +642,10 @@ temporal: description: Returns TRUE if two time intervals defined by (timepoint1, temporal1) and (timepoint2, temporal2) overlap. The temporal values could be either a time point or a time interval. E.g., (TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) returns TRUE; (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) returns FALSE. 
- sql: DATE_FORMAT(timestamp, string) table: dateFormat(TIMESTAMP, STRING) -description: Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's SimpleDateFormat. +description: Converts timestamp to a value of string in the format specified by the date format string. The format string is compatible with Java's DateTimeFormatter. + - sql: DATE_FORMAT(string, string) +table: dateFormat(STRING, STRING) +description: Re-format a timestamp string to another string, using a custom format. The format string is compatible with Java's SimpleDateFormat. Review Comment: Could we do the same for `UNIX_TIMESTAMP(string1[, string2])` (https://github.com/apache/flink/pull/25828/files#diff-539fb22ee6aeee4cf07230bb4155500c6680c4cc889260e2c58bfa9d63fb7de5R662) please? https://github.com/apache/flink/blob/7fa4f78e43bfd99a7e7a3d655a9a3641e4d7bc59/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java#L1468 calls into https://github.com/apache/flink/blob/7fa4f78e43bfd99a7e7a3d655a9a3641e4d7bc59/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java#L960 too - so the format is also using SimpleDateFormat -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]
davidradl commented on code in PR #25828: URL: https://github.com/apache/flink/pull/25828#discussion_r1893959599 ## flink-python/pyflink/table/expressions.py: ## @@ -350,14 +350,19 @@ def temporal_overlaps(left_time_point, def date_format(timestamp, format) -> Expression: """ Formats a timestamp as a string using a specified format. -The format must be compatible with MySQL's date formatting syntax as used by the -date_parse function. Review Comment: if this has been removed - do we not need to deprecate first before replacing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]
davidradl commented on code in PR #25828: URL: https://github.com/apache/flink/pull/25828#discussion_r1893960744 ## flink-python/pyflink/table/expressions.py: ## @@ -350,14 +350,19 @@ def temporal_overlaps(left_time_point, def date_format(timestamp, format) -> Expression: """ Formats a timestamp as a string using a specified format. -The format must be compatible with MySQL's date formatting syntax as used by the -date_parse function. -For example `date_format(col("time"), "%Y, %d %M")` results in strings formatted as -"2017, 05 May". +Supported functions: +1. date_format(TIMESTAMP, STRING) -> STRING Review Comment: we should mention the difference between the supported format strings. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
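Editorial note: one concrete difference between the two families of format strings discussed in this thread is the pattern letter `u`. In Java's `SimpleDateFormat` it means the day number of the week, while in `DateTimeFormatter` it means the year. The sketch below uses only the JDK; the class and method names are made up for illustration and nothing here is Flink code.

```java
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

final class FormatPatternDifference {

    /** Formats the instant in UTC using the legacy SimpleDateFormat pattern rules. */
    static String withSimpleDateFormat(Instant instant, String pattern) {
        SimpleDateFormat format = new SimpleDateFormat(pattern, Locale.ROOT);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format.format(Date.from(instant));
    }

    /** Formats the instant in UTC using the java.time DateTimeFormatter pattern rules. */
    static String withDateTimeFormatter(Instant instant, String pattern) {
        return DateTimeFormatter.ofPattern(pattern, Locale.ROOT)
                .withZone(ZoneOffset.UTC)
                .format(instant);
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2023-01-01T10:00:00Z"); // a Sunday

        // Common patterns behave the same in both APIs.
        System.out.println(withSimpleDateFormat(instant, "yyyy-MM-dd HH:mm:ss"));  // 2023-01-01 10:00:00
        System.out.println(withDateTimeFormatter(instant, "yyyy-MM-dd HH:mm:ss")); // 2023-01-01 10:00:00

        // The letter 'u' does not.
        System.out.println(withSimpleDateFormat(instant, "u"));  // 7 (day number of week, Sunday)
        System.out.println(withDateTimeFormatter(instant, "u")); // 2023 (year)
    }
}
```

Because most everyday patterns produce identical output, a doc that names the wrong formatter can go unnoticed until a user hits one of the diverging letters.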
[jira] [Updated] (FLINK-36923) Bump flink-connector-hbase to Flink 1.20
[ https://issues.apache.org/jira/browse/FLINK-36923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-36923: - Affects Version/s: (was: hbase-4.1.0) > Bump flink-connector-hbase to Flink 1.20 > > > Key: FLINK-36923 > URL: https://issues.apache.org/jira/browse/FLINK-36923 > Project: Flink > Issue Type: Sub-task > Components: Connectors / HBase >Reporter: Yanquan Lv >Assignee: Yanquan Lv >Priority: Major > Labels: pull-request-available > Fix For: hbase-4.1.0 > > > Considering we've released flink-connector-hbase 4.0.0, which supports Flink > 1.18 & Flink 1.19, we can start bumping flink-connector-hbase to Flink 1.20 > to support Flink 1.19 & Flink 1.20 in the next release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894201016 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
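For readers following the precision argument in the Javadoc quoted above (0 as seconds, 3 as milliseconds), here is a minimal standalone sketch using only java.time. It is not the PR's implementation and does not call DateTimeUtils; the class name EpochPrecisionSketch and the helper toInstant are hypothetical, and rejecting any other precision is an assumption made for this illustration.

```java
import java.time.Instant;

// Hypothetical illustration only; not part of Flink.
class EpochPrecisionSketch {

    // Interprets the epoch value as seconds when precision is 0 and as
    // milliseconds when precision is 3, following the Javadoc wording.
    // Rejecting other precisions is an assumption of this sketch.
    static Instant toInstant(long epoch, int precision) {
        switch (precision) {
            case 0:
                return Instant.ofEpochSecond(epoch);
            case 3:
                return Instant.ofEpochMilli(epoch);
            default:
                throw new IllegalArgumentException("Unsupported precision: " + precision);
        }
    }

    public static void main(String[] args) {
        // Both calls describe the same second; the second one keeps milliseconds.
        System.out.println(toInstant(1234567890L, 0));
        System.out.println(toInstant(1234567890123L, 3));
    }
}
```

The two inputs are the epoch values used in the Javadoc examples, so the sketch can be compared directly with what TO_TIMESTAMP_LTZ(1234567890, 0) and TO_TIMESTAMP_LTZ(1234567890123, 3) are described as doing.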
Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]
yiyutian1 commented on code in PR #25763: URL: https://github.com/apache/flink/pull/25763#discussion_r1894146265 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java: ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; +import org.apache.flink.table.utils.DateTimeUtils; + +import javax.annotation.Nullable; + +import java.time.DateTimeException; +import java.time.ZoneId; +import java.time.ZonedDateTime; + +import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData; + +/** + * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}. + * + * A function that converts various time formats to TIMESTAMP_LTZ type. 
+ * + * Supported function signatures: + * + * + * {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) + * Converts numeric epoch time in milliseconds to timestamp with local timezone + * {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision) + * Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as + * milliseconds) + * {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using default format '-MM-dd HH:mm:ss' + * {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input string of format + * {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3) + * Parses string timestamp using input strings of format and timezone + * + * + * Example: + * + * {@code + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ(1234567890123) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds + * TO_TIMESTAMP_LTZ(1234567890123, 3) // Converts epoch milliseconds + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00') // Parses string using default format + * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', '-MM-dd\'T\'HH:mm:ss') // Parses string using input format + * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', '-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone + * } + */ +@Internal +public class ToTimestampLtzFunction extends BuiltInScalarFunction { + +private static final int DEFAULT_PRECISION = 3; + +public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext context) { +super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context); +} + +public @Nullable TimestampData eval(Number epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} +if (epoch instanceof Float || epoch instanceof Double) { +return DateTimeUtils.toTimestampData(epoch.doubleValue(), precision); +} +return 
DateTimeUtils.toTimestampData(epoch.longValue(), precision); +} + +public @Nullable TimestampData eval(DecimalData epoch, Integer precision) { +if (epoch == null || precision == null) { +return null; +} + +return DateTimeUtils.toTimestampData(epoch, precision); +} + +public @Nullable TimestampData eval(Number epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(DecimalData epoch) { +return eval(epoch, DEFAULT_PRECISION); +} + +public @Nullable TimestampData eval(StringData timestamp) { +if (timestamp == null) { +return null; +} + +return parseTimestampData(timestamp.toString()); +} + +public @Nullable TimestampData eval(StringData timestamp, StringData format) { +if (timestamp == null || format == null) { +return null; +} + +return parseTimes
[jira] [Updated] (FLINK-36923) Bump flink-connector-hbase to Flink 1.20
[ https://issues.apache.org/jira/browse/FLINK-36923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-36923: - Fix Version/s: hbase-4.1.0 > Bump flink-connector-hbase to Flink 1.20 > > > Key: FLINK-36923 > URL: https://issues.apache.org/jira/browse/FLINK-36923 > Project: Flink > Issue Type: Sub-task > Components: Connectors / HBase >Affects Versions: hbase-4.1.0 >Reporter: Yanquan Lv >Assignee: Yanquan Lv >Priority: Major > Labels: pull-request-available > Fix For: hbase-4.1.0 > > > Considering we've released flink-connector-hbase 4.0.0 that supported Flink > 1.18 & Flink 1.19, we can start bumping flink-connector-hbase to Flink 1.19 > to support Flink 1.19 & Flink 1.20 for next release. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36923] Bump flink version to 1.19.1, adapt CI workflow [flink-connector-hbase]
ferenc-csaky commented on PR #54: URL: https://github.com/apache/flink-connector-hbase/pull/54#issuecomment-2557428849 Hi, thanks for your contribution! I am planning to try to include at least JDK 17 (and possibly 21) support for the next release, which will require bumping the HBase version and pretty much all Hadoop-related dependencies. I spent some time on that before releasing 4.0, but it is not trivial and I ran into some runtime issues, which is why I decided to cut scope and release for JDK 8 and 11 only. Including these version updates makes sense anyway, so I do not see a problem with merging it on its own. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]
afedulov commented on code in PR #25656: URL: https://github.com/apache/flink/pull/25656#discussion_r1894239482 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -1029,6 +1029,11 @@ private TableResultInternal executeInternal( defaultJobName, jobStatusHookList); try { +// Current context class loader here is the application class loader. Review Comment: @ammu20-dev @davidradl @seb-pereira This modifies a pretty critical execution path. How confident are we that it doesn't introduce regressions? Specifically, could any existing functionality—such as built-in serializers for various connector formats—break if libraries are inadvertently loaded from user JARs instead of Flink's parent class loader? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
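The concern above is about code that runs with a different thread context class loader. The usual pattern for limiting the blast radius of such a change is to swap the loader for a bounded scope and always restore it. The sketch below illustrates that pattern only; it is not the code in PR #25656, the names ContextClassLoaderSketch and runWithClassLoader are hypothetical, and the plain parent-first URLClassLoader is an assumption chosen to keep the example self-contained.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of Flink.
class ContextClassLoaderSketch {

    // Runs the action with the given context class loader and restores the
    // previous loader afterwards, even if the action throws.
    static <T> T runWithClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader parent = ContextClassLoaderSketch.class.getClassLoader();
        // Stand-in for a user-code loader; it has no URLs of its own, so every
        // lookup is delegated to the parent (parent-first resolution).
        ClassLoader userLoader = new URLClassLoader(new URL[0], parent);

        ClassLoader seen = runWithClassLoader(
                userLoader, () -> Thread.currentThread().getContextClassLoader());
        System.out.println("inside scope uses user loader: " + (seen == userLoader));
    }
}
```

Whether classes resolve from the user JAR or from the parent depends on the delegation order of the loader that is installed, which is exactly the regression risk the review comment asks about.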
[jira] [Commented] (FLINK-36940) Why does autoscaler Math.round the TargetProcessingCapacity
[ https://issues.apache.org/jira/browse/FLINK-36940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907300#comment-17907300 ] Gyula Fora commented on FLINK-36940: Good question, I don’t remember right now. For the metrics we round to 2/3 decimal digits to improve logging and greatly reduce storage costs; here I guess rounding could be avoided > Why does autoscaler Math.round the TargetProcessingCapacity > --- > > Key: FLINK-36940 > URL: https://issues.apache.org/jira/browse/FLINK-36940 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Priority: Major > > AutoScalerUtils#getTargetProcessingCapacity[1] calls > Math.round(lagCatchupTargetRate + restartCatchupRate + > inputTargetAtUtilization); > > The return type is double, so I don't know why Math.round is needed here. > > I'm writing some end-to-end tests[2] for the autoscaler, and the recommended > parallelism is unexpected. After analysis, I found that Math.round is the root > cause. The core logic of my test[2] is as follows: > The TARGET_UTILIZATION is 0.8, and BusyTimePerSec is always 800 for the source, > so we expect the source parallelism not to change. > But the recommended parallelism changes from 100 to 96. > > Note: The processing rate of production jobs is usually very high, so > rounding has almost no effect on production jobs. > > [1] > [https://github.com/apache/flink-kubernetes-operator/blob/091e803a6ae713ebe839742694ab6ca53249c4dd/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java#L75] > [2] > [https://github.com/1996fanrui/flink-kubernetes-operator/commit/5420f59a12e924f5f47a7dde8d79f5da9a2b4917#diff-b922ac4031d391fa030b25fba77b453736d43156a0edef5b436fee1d8241295fR158] -- This message was sent by Atlassian Jira (v8.20.10#820010)
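To make the rounding effect described in the issue concrete, here is a small arithmetic sketch. It is not the autoscaler's code: the class TargetCapacityRoundingSketch, its methods, and the rates used (an input rate of 19.5 records per second and a true capacity of 24.375) are hypothetical numbers chosen so that the job sits exactly at the 0.8 target utilization. The scale factor is a simplified stand-in for whatever the autoscaler derives from the target capacity.

```java
// Hypothetical illustration only; not the autoscaler's actual formula.
class TargetCapacityRoundingSketch {

    // Target capacity without rounding.
    static double exactTarget(double inputRate, double utilization) {
        return inputRate / utilization;
    }

    // Target capacity rounded to a whole number, as Math.round would do.
    static double roundedTarget(double inputRate, double utilization) {
        return Math.round(inputRate / utilization);
    }

    // Ratio between the requested capacity and the capacity the job has now.
    static double scaleFactor(double targetCapacity, double trueCapacity) {
        return targetCapacity / trueCapacity;
    }

    public static void main(String[] args) {
        double inputRate = 19.5;      // assumed records per second
        double utilization = 0.8;     // TARGET_UTILIZATION from the issue
        double trueCapacity = 24.375; // assumed: job runs exactly at 80% busy

        double exact = scaleFactor(exactTarget(inputRate, utilization), trueCapacity);
        double rounded = scaleFactor(roundedTarget(inputRate, utilization), trueCapacity);

        // The exact factor stays at about 1.0 (no rescale); the rounded one drops below 1.0.
        System.out.println("exact scale factor:   " + exact);
        System.out.println("rounded scale factor: " + rounded);
    }
}
```

With low rates like these, dropping the fractional part changes the target by a noticeable percentage, which matches the note in the issue that production jobs with high rates are barely affected.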
[jira] [Comment Edited] (FLINK-36940) Why does autoscaler Math.round the TargetProcessingCapacity
[ https://issues.apache.org/jira/browse/FLINK-36940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907300#comment-17907300 ] Gyula Fora edited comment on FLINK-36940 at 12/20/24 8:40 AM: -- Good question, I don’t remember right now. For the metrics we round to 2/3 decimal digits to improve logging and greatly reduce storage costs; here I guess rounding could be avoided, or we could apply the same 3-decimal-digit logic was (Author: gyfora): Good question, I don’t remember right now. For the metrics we round to 2/3 decimal digits to improve logging and greatly reduce storage costs; here I guess rounding could be avoided > Why does autoscaler Math.round the TargetProcessingCapacity > --- > > Key: FLINK-36940 > URL: https://issues.apache.org/jira/browse/FLINK-36940 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Priority: Major > > AutoScalerUtils#getTargetProcessingCapacity[1] calls > Math.round(lagCatchupTargetRate + restartCatchupRate + > inputTargetAtUtilization); > > The return type is double, so I don't know why Math.round is needed here. > > I'm writing some end-to-end tests[2] for the autoscaler, and the recommended > parallelism is unexpected. After analysis, I found that Math.round is the root > cause. The core logic of my test[2] is as follows: > The TARGET_UTILIZATION is 0.8, and BusyTimePerSec is always 800 for the source, > so we expect the source parallelism not to change. > But the recommended parallelism changes from 100 to 96. > > Note: The processing rate of production jobs is usually very high, so > rounding has almost no effect on production jobs. 
> > [1] > [https://github.com/apache/flink-kubernetes-operator/blob/091e803a6ae713ebe839742694ab6ca53249c4dd/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java#L75] > [2] > [https://github.com/1996fanrui/flink-kubernetes-operator/commit/5420f59a12e924f5f47a7dde8d79f5da9a2b4917#diff-b922ac4031d391fa030b25fba77b453736d43156a0edef5b436fee1d8241295fR158] -- This message was sent by Atlassian Jira (v8.20.10#820010)
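The edited comment suggests applying three-decimal rounding instead of rounding to a whole number. A minimal sketch of that idea follows; the class ThreeDecimalRoundingSketch and the method roundTo3 are hypothetical names, and this is not the helper the project uses for its metrics.

```java
// Hypothetical illustration only; not the project's metric rounding helper.
class ThreeDecimalRoundingSketch {

    // Keeps three decimal digits by scaling, rounding, and scaling back.
    // Suitable for moderate magnitudes; very large values would overflow
    // the long returned by Math.round.
    static double roundTo3(double value) {
        return Math.round(value * 1000.0) / 1000.0;
    }

    public static void main(String[] args) {
        // A small target capacity keeps its fractional part instead of
        // collapsing to the nearest whole number.
        System.out.println(roundTo3(24.375));
        System.out.println(roundTo3(24.3754321));
    }
}
```

For a value such as 24.375 the relative error introduced by this rounding is at most about 0.002 percent, compared with roughly 1.5 percent when the same value is rounded to 24.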
[jira] [Closed] (FLINK-35921) Flink SQL physical operator replacement support
[ https://issues.apache.org/jira/browse/FLINK-35921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Qilong closed FLINK-35921. --- Resolution: Fixed > Flink SQL physical operator replacement support > --- > > Key: FLINK-35921 > URL: https://issues.apache.org/jira/browse/FLINK-35921 > Project: Flink > Issue Type: Bug >Reporter: Wang Qilong >Priority: Major > > Does Flink SQL provide SPI implementations that support custom physical > operators, such as customizing a new StreamExecFileSourceScan, and support > rule injection for converting logical operators to physical operators? > I want to implement the function of adding some additional physical > operators. A better option may be to inject a ColumnarRule similar to Spark's, > which can insert new physical operators when converting logical > operators to physical operators. However, I have not found similar > functionality in Flink. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator
Wang Qilong created FLINK-36942: --- Summary: Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator Key: FLINK-36942 URL: https://issues.apache.org/jira/browse/FLINK-36942 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 2.0.0 Reporter: Wang Qilong When I use the async state API in RowTimestamportOperator and make it inherit from AbstractAsyncStateStreamOperator, onEventTime() is not triggered when the timer expires. To reproduce: check out https://github.com/Au-Miner/flink/tree/RowTimeSortOperator and run the tests in RowTimestamportOperatorTest. I think this is a very strange bug because the callback is triggered without problems in windowJoinOperator, but not in RowTimestamportOperator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator
[ https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Qilong updated FLINK-36942: Description: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator] And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but cannot be implemented in RowTimestamportOperator was: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but cannot be implemented in RowTimestamportOperator > Unable to trigger onEventTime() when using the async state API in > RowTimestamportOperator > - > > Key: FLINK-36942 > URL: https://issues.apache.org/jira/browse/FLINK-36942 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 2.0.0 >Reporter: Wang Qilong >Priority: Major > > When I use the async state API in RowTimeSortOperator and inherit it from > AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the > timer expires > Regarding how to reproduce: > in > [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator] > And execute the tests in RowTimestamportOperatorTest > I think this is a very strange bug because it can be triggered smoothly in > 
[AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], > but cannot be implemented in RowTimestamportOperator -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893692402 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -649,6 +649,13 @@ public boolean dropTemporaryTable(String path) { } } +@Override +public boolean dropTable(String path) { +UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); +ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); +return catalogManager.dropTable(identifier, true); Review Comment: Shouldn't `true` be `ignoreIfNotExists` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
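One way to read the snippet under review: the Javadoc proposed in this PR says the single-argument dropTable does not throw when the table is missing, so only the two-argument overload has an ignoreIfNotExists parameter to pass along. The sketch below shows that overload relationship with an in-memory set standing in for the catalog. It is an illustration under that assumption, not the PR's code; DropTableSketch and createTable are hypothetical, and the exception type is chosen for the example only.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; an in-memory stand-in for a catalog.
class DropTableSketch {

    private final Set<String> tables = new HashSet<>();

    void createTable(String path) {
        tables.add(path);
    }

    // Single-argument variant: never fails on a missing table, so it
    // delegates with the flag fixed to true.
    boolean dropTable(String path) {
        return dropTable(path, true);
    }

    // Two-argument variant: the caller decides whether a missing table is an error.
    boolean dropTable(String path, boolean ignoreIfNotExists) {
        boolean removed = tables.remove(path);
        if (!removed && !ignoreIfNotExists) {
            throw new IllegalStateException("Table does not exist: " + path);
        }
        return removed;
    }

    public static void main(String[] args) {
        DropTableSketch env = new DropTableSketch();
        env.createTable("cat.db.t");
        System.out.println(env.dropTable("cat.db.t")); // table existed and was removed
        System.out.println(env.dropTable("cat.db.t")); // nothing left to remove, no error
    }
}
```

In this shape the literal true appears only in the one-argument overload, while the two-argument overload forwards its parameter, which is the distinction the review question is probing.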
[jira] [Commented] (FLINK-36716) Address vulnerabilities in Flink UI
[ https://issues.apache.org/jira/browse/FLINK-36716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907318#comment-17907318 ] Mehdi commented on FLINK-36716: --- I'm going to for the PRs done to the branch 1.20 for 1.20.X release. The list of PRs will be updated: * [https://github.com/apache/flink/pull/25827 (Update CI to Ubuntu 22.04 (Jammy))|https://github.com/apache/flink/pull/25827] * [https://github.com/apache/flink/pull/25794] (Update the NodeJS to v22.11.0 (LTS)) > Address vulnerabilities in Flink UI > --- > > Key: FLINK-36716 > URL: https://issues.apache.org/jira/browse/FLINK-36716 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 2.0.0, 1.20.0 >Reporter: Mehdi >Assignee: Mehdi >Priority: Major > Fix For: 2.0.0, 1.20.1 > > > When running `npm audit` we get 36 vulnerabilities (1 low, 15 moderate, 17 > high, 3 critical) we should address any current, open vulnerabilities. > These critical vulnerabilities gone by raising the version of angular and we > do need also to raise node version, so there is two sub tasks for this ticket > Result of the npm audit: > {code:java} > npm audit report > > @adobe/css-tools  <=4.3.1 > Severity: moderate > @adobe/css-tools Regular Expression Denial of Service (ReDOS) while Parsing > CSS - https://github.com/advisories/GHSA-hpx4-r86g-5jrg > @adobe/css-tools Improper Input Validation and Inefficient Regular Expression > Complexity - https://github.com/advisories/GHSA-prr3-c3m5-p7q2 > fix available via `npm audit fix` > node_modules/@adobe/css-tools > > @babel/traverse  <7.23.2 > Severity: critical > Babel vulnerable to arbitrary code execution when compiling specifically > crafted malicious code - https://github.com/advisories/GHSA-67hx-6x53-jw92 > fix available via `npm audit fix` > node_modules/@babel/traverse > > body-parser  <1.20.3 > Severity: high > body-parser vulnerable to denial of service when url encoding is enabled - > 
https://github.com/advisories/GHSA-qwcr-r2fm-qrc7 > fix available via `npm audit fix` > node_modules/body-parser >   express  <=4.21.0 || 5.0.0-alpha.1 - 5.0.0 >   Depends on vulnerable versions of body-parser >   Depends on vulnerable versions of cookie >   Depends on vulnerable versions of path-to-regexp >   Depends on vulnerable versions of send >   Depends on vulnerable versions of serve-static >   node_modules/express > > braces  <3.0.3 > Severity: high > Uncontrolled resource consumption in braces - > https://github.com/advisories/GHSA-grv7-fg5c-xmjg > fix available via `npm audit fix` > node_modules/braces > > cookie  <0.7.0 > cookie accepts cookie name, path, and domain with out of bounds characters - > https://github.com/advisories/GHSA-pxg6-pf52-xh8x > fix available via `npm audit fix` > node_modules/cookie > node_modules/express/node_modules/cookie >   engine.io  0.7.8 - 0.7.9 || 1.8.0 - 6.6.1 >   Depends on vulnerable versions of cookie >   Depends on vulnerable versions of ws >   node_modules/engine.io >     socket.io  1.6.0 - 4.7.5 >     Depends on vulnerable versions of engine.io >     node_modules/socket.io > > d3-color  <3.1.0 > Severity: high > d3-color vulnerable to ReDoS - > https://github.com/advisories/GHSA-36jr-mh4h-2g58 > fix available via `npm audit fix` > node_modules/d3-interpolate/node_modules/d3-color >   d3-interpolate  0.1.3 - 2.0.1 >   Depends on vulnerable versions of d3-color >   node_modules/d3-interpolate >     @antv/g-base  <=0.5.11 >     Depends on vulnerable versions of d3-interpolate >     node_modules/@antv/g-base > > follow-redirects  <=1.15.5 > Severity: moderate > Follow Redirects improperly handles URLs in the url.parse() function - > https://github.com/advisories/GHSA-jchw-25xp-jwwc > follow-redirects' Proxy-Authorization header kept across hosts - > https://github.com/advisories/GHSA-cxjh-pqwp-8mfp > fix available via `npm audit fix` > node_modules/follow-redirects > > http-proxy-middleware  <2.0.7 > Severity: high > Denial of service in http-proxy-middleware - > 
https://github.com/advisories/GHSA-c7qv-q95q-8v27 > fix available via `npm audit fix` > node_modules/http-proxy-middleware > > ip  * > Severity: high > NPM IP package incorrectly identifies some private IP addresses as public - > https://github.com/advisories/GHSA-78xj-cgh5-2h22 > ip SSRF improper categorization in isPublic - > https://github.com/advisories/GHSA-2p57-rm9w-gvfp > fix available via `npm audit fix` > node_modules/ip > > loader-utils  3.0.0 - 3.2.0 > Severity: high > loader-utils is vulnerable to Regular Expression Denial of Service (ReDoS) > via url variable - https://github.com/advisories/GHSA-3rfm-jhwj-7488 > loader-utils is vulnerable to Regular Expression Denial of Service (ReDoS) - > https://github.com/advisories/GHSA-hhq3-ff78-jv3g > fix available via `npm audit fix` > node_mod
[jira] [Comment Edited] (FLINK-36716) Address vulnerabilities in Flink UI
[ https://issues.apache.org/jira/browse/FLINK-36716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907318#comment-17907318 ] Mehdi edited comment on FLINK-36716 at 12/20/24 9:39 AM: - I'm going to for the PRs done to the branch 1.20 for 1.20.X release. The list of PRs will be updated: * [https://github.com/apache/flink/pull/25827 |https://github.com/apache/flink/pull/25827]Update CI to Ubuntu 22.04 (Jammy) * [https://github.com/apache/flink/pull/25794] Update the NodeJS to v22.11.0 (LTS) was (Author: JIRAUSER301654): I'm going to for the PRs done to the branch 1.20 for 1.20.X release. The list of PRs will be updated: * [https://github.com/apache/flink/pull/25827 |https://github.com/apache/flink/pull/25827](Update CI to Ubuntu 22.04 (Jammy)) * [https://github.com/apache/flink/pull/25794] (Update the NodeJS to v22.11.0 (LTS)) > Address vulnerabilities in Flink UI > --- > > Key: FLINK-36716 > URL: https://issues.apache.org/jira/browse/FLINK-36716 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 2.0.0, 1.20.0 >Reporter: Mehdi >Assignee: Mehdi >Priority: Major > Fix For: 2.0.0, 1.20.1 > > > When running `npm audit` we get 36 vulnerabilities (1 low, 15 moderate, 17 > high, 3 critical) we should address any current, open vulnerabilities. 
> These critical vulnerabilities gone by raising the version of angular and we > do need also to raise node version, so there is two sub tasks for this ticket > Result of the npm audit: > {code:java} > npm audit report > > @adobe/css-tools  <=4.3.1 > Severity: moderate > @adobe/css-tools Regular Expression Denial of Service (ReDOS) while Parsing > CSS - https://github.com/advisories/GHSA-hpx4-r86g-5jrg > @adobe/css-tools Improper Input Validation and Inefficient Regular Expression > Complexity - https://github.com/advisories/GHSA-prr3-c3m5-p7q2 > fix available via `npm audit fix` > node_modules/@adobe/css-tools > > @babel/traverse  <7.23.2 > Severity: critical > Babel vulnerable to arbitrary code execution when compiling specifically > crafted malicious code - https://github.com/advisories/GHSA-67hx-6x53-jw92 > fix available via `npm audit fix` > node_modules/@babel/traverse > > body-parser  <1.20.3 > Severity: high > body-parser vulnerable to denial of service when url encoding is enabled - > https://github.com/advisories/GHSA-qwcr-r2fm-qrc7 > fix available via `npm audit fix` > node_modules/body-parser >   express  <=4.21.0 || 5.0.0-alpha.1 - 5.0.0 >   Depends on vulnerable versions of body-parser >   Depends on vulnerable versions of cookie >   Depends on vulnerable versions of path-to-regexp >   Depends on vulnerable versions of send >   Depends on vulnerable versions of serve-static >   node_modules/express > > braces  <3.0.3 > Severity: high > Uncontrolled resource consumption in braces - > https://github.com/advisories/GHSA-grv7-fg5c-xmjg > fix available via `npm audit fix` > node_modules/braces > > cookie  <0.7.0 > cookie accepts cookie name, path, and domain with out of bounds characters - > https://github.com/advisories/GHSA-pxg6-pf52-xh8x > fix available via `npm audit fix` > node_modules/cookie > node_modules/express/node_modules/cookie >   engine.io  0.7.8 - 0.7.9 || 1.8.0 - 6.6.1 >   Depends on vulnerable versions of cookie >   Depends on vulnerable versions of ws >   
node_modules/engine.io >     socket.io  1.6.0 - 4.7.5 >     Depends on vulnerable versions of engine.io >     node_modules/socket.io > > d3-color  <3.1.0 > Severity: high > d3-color vulnerable to ReDoS - > https://github.com/advisories/GHSA-36jr-mh4h-2g58 > fix available via `npm audit fix` > node_modules/d3-interpolate/node_modules/d3-color >   d3-interpolate  0.1.3 - 2.0.1 >   Depends on vulnerable versions of d3-color >   node_modules/d3-interpolate >     @antv/g-base  <=0.5.11 >     Depends on vulnerable versions of d3-interpolate >     node_modules/@antv/g-base > > follow-redirects  <=1.15.5 > Severity: moderate > Follow Redirects improperly handles URLs in the url.parse() function - > https://github.com/advisories/GHSA-jchw-25xp-jwwc > follow-redirects' Proxy-Authorization header kept across hosts - > https://github.com/advisories/GHSA-cxjh-pqwp-8mfp > fix available via `npm audit fix` > node_modules/follow-redirects > > http-proxy-middleware  <2.0.7 > Severity: high > Denial of service in http-proxy-middleware - > https://github.com/advisories/GHSA-c7qv-q95q-8v27 > fix available via `npm audit fix` > node_modules/http-proxy-middleware > > ip  * > Severity: high > NPM IP package incorrectly identifies some private IP addresses as public - > https://github.com/advisories/GHSA-78xj-cgh5-2h22 > ip SSRF improper categorization in isPublic - > https://github.com/advisories/GHSA-2p57-rm9w-gvfp > fix available via `npm audi
[jira] [Comment Edited] (FLINK-36716) Address vulnerabilities in Flink UI
[ https://issues.apache.org/jira/browse/FLINK-36716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907318#comment-17907318 ] Mehdi edited comment on FLINK-36716 at 12/20/24 9:39 AM: - I'm going to for the PRs done to the branch 1.20 for 1.20.X release. The list of PRs will be updated: * [https://github.com/apache/flink/pull/25827 |https://github.com/apache/flink/pull/25827](Update CI to Ubuntu 22.04 (Jammy)) * [https://github.com/apache/flink/pull/25794] (Update the NodeJS to v22.11.0 (LTS)) was (Author: JIRAUSER301654): I'm going to for the PRs done to the branch 1.20 for 1.20.X release. The list of PRs will be updated: * [https://github.com/apache/flink/pull/25827 (Update CI to Ubuntu 22.04 (Jammy))|https://github.com/apache/flink/pull/25827] * [https://github.com/apache/flink/pull/25794] (Update the NodeJS to v22.11.0 (LTS)) > Address vulnerabilities in Flink UI > --- > > Key: FLINK-36716 > URL: https://issues.apache.org/jira/browse/FLINK-36716 > Project: Flink > Issue Type: Improvement > Components: Runtime / Web Frontend >Affects Versions: 2.0.0, 1.20.0 >Reporter: Mehdi >Assignee: Mehdi >Priority: Major > Fix For: 2.0.0, 1.20.1 > > > When running `npm audit` we get 36 vulnerabilities (1 low, 15 moderate, 17 > high, 3 critical) we should address any current, open vulnerabilities. 
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
snuyanzin commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893703612 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -649,6 +649,13 @@ public boolean dropTemporaryTable(String path) { } } +@Override +public boolean dropTable(String path) { +UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); +ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); +return catalogManager.dropTable(identifier, true); Review Comment: I didn't get it, could you elaborate? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]
yiyutian1 opened a new pull request, #25828: URL: https://github.com/apache/flink/pull/25828 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36941) Fix Doc for DATE_FORMAT
[ https://issues.apache.org/jira/browse/FLINK-36941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36941: --- Labels: pull-request-available (was: ) > Fix Doc for DATE_FORMAT > --- > > Key: FLINK-36941 > URL: https://issues.apache.org/jira/browse/FLINK-36941 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Yiyu Tian >Priority: Major > Labels: pull-request-available > > The documentation for DATE_FORMAT is outdated. > https://github.com/apache/flink/blob/6951686be5691dc855b6d07d575535ac239670da/docs/data/sql_functions.yml#L645 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36943) FLIP-488: Expose Async State Processing and New State APIs in Datastream(V1) APIs
Zakelly Lan created FLINK-36943: --- Summary: FLIP-488: Expose Async State Processing and New State APIs in Datastream(V1) APIs Key: FLINK-36943 URL: https://issues.apache.org/jira/browse/FLINK-36943 Project: Flink Issue Type: Improvement Reporter: Zakelly Lan Assignee: Zakelly Lan FLIP-424 introduced a new State API set which enables the async state query, and FLIP-425 introduced the async state processing. These are highly anticipated features for Flink 2.0. For SQL users, the FLIP-473 implements SQL operators using the new State APIs and they could easily make use of the async state processing as well as the disaggregated state management. However, it is also important that the Datastream users could leverage the capability of async state query and disaggregated state management. This FLIP proposes to expose and enable them in Datastream(V1) APIs. For more details, please read the FLIP-488 https://cwiki.apache.org/confluence/x/yIrREw -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36069][runtime/rest] Extending job detail rest API to expose json stream graph [flink]
yuchen-ecnu commented on code in PR #25798: URL: https://github.com/apache/flink/pull/25798#discussion_r1893709324 ## flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java: ## @@ -79,6 +79,10 @@ public class JobDetailsInfo implements ResponseBody { public static final String FIELD_NAME_JSON_PLAN = "plan"; +public static final String FIELD_NAME_STREAM_GRAPH_JSON_PLAN = "stream-graph-plan"; + +public static final String FIELD_NAME_PENDING_OPERATOR_COUNT = "pending-operator-count"; Review Comment: Updated. ## flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java: ## @@ -47,6 +47,13 @@ public interface AccessExecutionGraph extends JobStatusProvider { */ String getJsonPlan(); +/** + * Returns the stream graph as a JSON string. + * + * @return stream graph as a JSON string, or empty string if the job submitted with JobGraph. Review Comment: Updated. ## flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java: ## @@ -141,6 +142,11 @@ private static JobDetailsInfo createJobDetailsInfo( executionState, jobVerticesPerState[executionState.ordinal()]); } +JobPlanInfo.RawJson streamGraphJson = null; +if (!StringUtils.isNullOrWhitespaceOnly(executionGraph.getStreamGraphJson())) { Review Comment: That's true, but it's safe to check non-null at the same time by `StringUtils.isNullOrWhitespaceOnly`? WDYT? ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java: ## @@ -62,4 +62,13 @@ public interface StreamGraphContext { * @return true if all modifications were successful and applied atomically, false otherwise. */ boolean modifyStreamEdge(List requestInfos); + +interface StreamGraphUpdateListener { Review Comment: Updated. 
## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java: ## @@ -62,4 +62,13 @@ public interface StreamGraphContext { * @return true if all modifications were successful and applied atomically, false otherwise. */ boolean modifyStreamEdge(List requestInfos); + +interface StreamGraphUpdateListener { +/** + * This method is called whenever the StreamGraph is updated. + * + * @param streamGraph the updated StreamGraph + */ +void onStreamGraphUpdated(StreamGraph streamGraph); Review Comment: It's true that `AdaptiveGraphManager` holds a stream graph, but the `StreamGraphContext` should notify its listener with the updated stream graph (since the stream graph in the `AdaptiveGraphManager` may be outdated?), just like `JobStatusListener` is notified with `newJobStatus`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
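The point made in the review comment above (pass the updated graph to the listener rather than letting the listener rely on its own, possibly stale, copy) can be illustrated with a minimal, self-contained sketch. Everything here is hypothetical: `GraphSnapshot`, `GraphContext`, `UpdateListener` and `CachingManager` are stand-ins invented for illustration and are not the Flink classes `StreamGraph`, `StreamGraphContext`, `StreamGraphUpdateListener` or `AdaptiveGraphManager`.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative sketch only; none of these types exist in Flink. */
final class GraphUpdateListenerSketch {

    /** Hypothetical immutable snapshot standing in for a stream graph. */
    static final class GraphSnapshot {
        final int version;

        GraphSnapshot(int version) {
            this.version = version;
        }
    }

    /** Listener that receives the updated snapshot as an argument. */
    interface UpdateListener {
        void onGraphUpdated(GraphSnapshot updated);
    }

    /** Hypothetical context that owns the graph and notifies listeners after each change. */
    static final class GraphContext {
        private final List<UpdateListener> listeners = new ArrayList<>();
        private GraphSnapshot current = new GraphSnapshot(0);

        void register(UpdateListener listener) {
            listeners.add(listener);
        }

        /** Applies a modification, then hands the new snapshot to every listener. */
        void modify() {
            current = new GraphSnapshot(current.version + 1);
            for (UpdateListener listener : listeners) {
                listener.onGraphUpdated(current);
            }
        }
    }

    /** Listener that keeps only what it was handed, so its view cannot lag behind. */
    static final class CachingManager implements UpdateListener {
        GraphSnapshot latest;

        @Override
        public void onGraphUpdated(GraphSnapshot updated) {
            latest = updated;
        }
    }

    public static void main(String[] args) {
        GraphContext context = new GraphContext();
        CachingManager manager = new CachingManager();
        context.register(manager);
        context.modify();
        context.modify();
        System.out.println("latest version seen by listener: " + manager.latest.version);
    }
}
```

Passing the new value in the callback mirrors the `JobStatusListener` analogy from the comment: the listener does not have to reach back into shared state to find out what changed.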
Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]
flinkbot commented on PR #25828: URL: https://github.com/apache/flink/pull/25828#issuecomment-2556636543 ## CI report: * 5a9ea558b896c515f962554f0a5b33e7fda015dd UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36069][runtime/rest] Extending job detail rest API to expose json stream graph [flink]
yuchen-ecnu commented on code in PR #25798: URL: https://github.com/apache/flink/pull/25798#discussion_r1893709533 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java: ## @@ -291,6 +299,8 @@ private List createJobVerticesAndUpdateGraph(List streamN generateConfigForJobVertices(jobVertexBuildContext); +onStreamGraphUpdated(streamGraph); Review Comment: I added a private function `generateStreamGraphJson()` to `AdaptiveGraphManager` and called it in `onStreamGraphUpdated` & here to make it clearer. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893692402 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -649,6 +649,13 @@ public boolean dropTemporaryTable(String path) { } } +@Override +public boolean dropTable(String path) { +UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path); +ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier); +return catalogManager.dropTable(identifier, true); Review Comment: `true` should be `ignoreIfNotExists` here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
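For readers following this thread, here is a minimal sketch of how a lenient single-argument overload and a flag-taking overload could relate, which seems to be what the comment about `true` versus `ignoreIfNotExists` is getting at. It is an assumption-laden illustration, not the PR's code: `DropTableSketch` is a hypothetical in-memory stand-in for the catalog, and `IllegalStateException` stands in for Flink's `ValidationException`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a catalog; not Flink's CatalogManager. */
final class DropTableSketch {

    private final Map<String, String> permanentTables = new HashMap<>();

    void createTable(String path, String definition) {
        permanentTables.put(path, definition);
    }

    /** Single-argument overload: lenient by default, so a missing table is not an error. */
    boolean dropTable(String path) {
        return dropTable(path, true);
    }

    /**
     * Two-argument overload: forwards the caller's flag instead of hard-coding {@code true}.
     *
     * @param path the (already qualified) table path
     * @param ignoreIfNotExists whether a missing table should be silently ignored
     * @return true if a table existed under the path and was removed
     * @throws IllegalStateException if the table is missing and ignoreIfNotExists is false
     */
    boolean dropTable(String path, boolean ignoreIfNotExists) {
        boolean removed = permanentTables.remove(path) != null;
        if (!removed && !ignoreIfNotExists) {
            throw new IllegalStateException("Table '" + path + "' does not exist.");
        }
        return removed;
    }

    public static void main(String[] args) {
        DropTableSketch catalog = new DropTableSketch();
        catalog.createTable("cat.db.orders", "CREATE TABLE ...");
        System.out.println(catalog.dropTable("cat.db.orders")); // table exists
        System.out.println(catalog.dropTable("cat.db.orders")); // already gone, lenient call
        try {
            catalog.dropTable("cat.db.orders", false); // strict call on a missing table
        } catch (IllegalStateException e) {
            System.out.println("strict drop failed: " + e.getMessage());
        }
    }
}
```

With this shape, hard-coding `true` is only appropriate in the single-argument overload; the two-argument overload has to pass the parameter through, otherwise the strict mode can never trigger.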
[jira] [Created] (FLINK-36944) Introduce 'enableAsyncState' in DataStream v1 interfaces
Zakelly Lan created FLINK-36944: --- Summary: Introduce 'enableAsyncState' in DataStream v1 interfaces Key: FLINK-36944 URL: https://issues.apache.org/jira/browse/FLINK-36944 Project: Flink Issue Type: Sub-task Components: API / DataStream Reporter: Zakelly Lan Assignee: Zakelly Lan -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36943) FLIP-488: Expose Async State Processing and New State APIs in Datastream(V1) APIs
[ https://issues.apache.org/jira/browse/FLINK-36943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-36943: Component/s: API / DataStream Runtime / Async State Processing > FLIP-488: Expose Async State Processing and New State APIs in Datastream(V1) > APIs > - > > Key: FLINK-36943 > URL: https://issues.apache.org/jira/browse/FLINK-36943 > Project: Flink > Issue Type: Improvement > Components: API / DataStream, Runtime / Async State Processing >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > > FLIP-424 introduced a new State API set which enables the async state query, > and FLIP-425 introduced the async state processing. These are highly > anticipated features for Flink 2.0. For SQL users, the FLIP-473 implements > SQL operators using the new State APIs and they could easily make use of the > async state processing as well as the disaggregated state management. > However, it is also important that the Datastream users could leverage the > capability of async state query and disaggregated state management. This FLIP > proposes to expose and enable them in Datastream(V1) APIs. > For more details, please read the FLIP-488 > https://cwiki.apache.org/confluence/x/yIrREw -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [BP-1.20][FLINK-36689][Runtime/Web Frontend] Update ng-zorro-antd to v18 [flink]
mehdid93 opened a new pull request, #25829: URL: https://github.com/apache/flink/pull/25829 ## What is the purpose of the change This PR backports the changes from @simplejason's PR (https://github.com/apache/flink/pull/25713) on master to 1.20.X, for use in the dependency upgrades. ## Brief change log - Angular: v18.2.13 - ng-zorro-antd: v18.2.1 - typescript: v5.4.5 ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? 
not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36763 / 36690][runtime] Support new "distributed" schema evolution topology & fix parallelized hang glitch [flink-cdc]
yuxiqian commented on code in PR #3801: URL: https://github.com/apache/flink-cdc/pull/3801#discussion_r1893623717 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java: ## @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.runtime.operators.schema.distributed; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.sink.MetadataApplier; +import org.apache.flink.cdc.common.utils.Preconditions; +import org.apache.flink.cdc.common.utils.SchemaMergingUtils; +import org.apache.flink.cdc.common.utils.SchemaUtils; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry; +import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent; +import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaRequest; +import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest; +import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; +import org.apache.flink.util.FlinkRuntimeException; + +import org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable; +import org.apache.flink.shaded.guava31.com.google.common.collect.HashMultimap; +import org.apache.flink.shaded.guava31.com.google.common.collect.Multimap; +import org.apache.flink.shaded.guava31.com.google.common.collect.Table; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.wrap; + +/** Coordinator node for {@link SchemaOperator}. Registry actor in distributed topology. */ +public class SchemaCoordinator extends SchemaRegistry { + +private static final Logger LOG = LoggerFactory.getLogger(SchemaCoordinator.class); + +/** Atomic finite state machine to track global schema evolving state. */ +private transient AtomicReference evolvingStatus; + +/** Request futures from pending schema mappers. */ +private transient Map< +Integer, Tuple2>> +pendingRequests; + +/** Tracing sink writers that have flushed successfully. */ +protected transient Set flushedSinkWriters; + +/** + * Transient upstream table schema. The second arity is source partition ID, because in + * distributed topology, schemas might vary among partitions, so we can't rely on {@code + * schemaManager} to store original schemas. + */ +private transient Table upstreamSchemaTable; + +/** + * In distributed topology, one schema change event will be broadcast N-times (N = downstream + * parallelism). We need to effectively ignore duplicate ones since not all {@link + * SchemaChangeEvent}s are idempotent. + */ +private transient Multimap, Integer> +
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
snuyanzin commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893604056 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1030,6 +1030,17 @@ void createTemporarySystemFunction( */ boolean dropTemporaryTable(String path); +/** + * Drops a table registered in the given path. Review Comment: Yep. Moreover, `DROP TABLE` via `DropTableOperation` leads execution to the same code: https://github.com/apache/flink/blob/84044f45830542d13d6c9baeb2bfe30de3eac4ac/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java#L71-L75 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][docs] Fix Apache Avro Specification Link. [flink]
davidradl commented on PR #25769: URL: https://github.com/apache/flink/pull/25769#issuecomment-2556549160 @rmetzger, as discussed at the Chi meeting, are you OK to merge this? @donPain, can you please raise a separate issue to track the variable replacement improvement idea? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator
[ https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Qilong updated FLINK-36942: Description: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but cannot be implemented in RowTimestamportOperator was: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in AsyncStateWindowJoinOperator[https://github.com/apache/flink/pull/25815], but cannot be implemented in RowTimestamportOperator > Unable to trigger onEventTime() when using the async state API in > RowTimestamportOperator > - > > Key: FLINK-36942 > URL: https://issues.apache.org/jira/browse/FLINK-36942 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 2.0.0 >Reporter: Wang Qilong >Priority: Major > > When I use the async state API in RowTimeSortOperator and inherit it from > AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the > timer expires > Regarding how to reproduce: > in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator > And execute the tests in RowTimestamportOperatorTest > I think this is a very strange bug because it can be triggered smoothly in > [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], > but cannot be implemented in 
RowTimestamportOperator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator
[ https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Qilong updated FLINK-36942: Description: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in windowJoinOperator, but cannot be implemented in RowTimestamportOperator was: When I use the async state API in RowTimestamportOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in windowJoinOperator, but cannot be implemented in RowTimestamportOperator > Unable to trigger onEventTime() when using the async state API in > RowTimestamportOperator > - > > Key: FLINK-36942 > URL: https://issues.apache.org/jira/browse/FLINK-36942 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 2.0.0 >Reporter: Wang Qilong >Priority: Major > > When I use the async state API in RowTimeSortOperator and inherit it from > AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the > timer expires > Regarding how to reproduce: > in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator > And execute the tests in RowTimestamportOperatorTest > I think this is a very strange bug because it can be triggered smoothly in > windowJoinOperator, but cannot be implemented in RowTimestamportOperator -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [BP-1.20][FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
mehdid93 opened a new pull request, #25827: URL: https://github.com/apache/flink/pull/25827 ## What is the purpose of the change This PR backports the changes from @zentol's PR (https://github.com/apache/flink/pull/25708) on master to 1.20.x so that they can be used for dependency upgrades. ## Brief change log - The Ubuntu CI image is upgraded from v18 to v22 ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893679664 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1040,6 +1066,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryView(String path); +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropView(java.lang.String, boolean)} to change the default behavior. + * + * @return true if a view existed in the given path and was removed + */ +boolean dropView(String path); + +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * @return true if a view existed in the given path and was removed Throws {@link + * ValidationException} if view not exists and ignoreIfNotExists is false + */ +boolean dropView(String path, boolean ignoreIfNotExists); Review Comment: missing @params javadoc ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1030,6 +1030,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryTable(String path); +/** + * Drops a table registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryTable}. This method + * can only drop permanent objects. + * + * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropTable(java.lang.String, boolean)} to change the default behavior. + * + * @return true if a table existed in the given path and was removed Review Comment: missing @params javadoc
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893681601 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1040,6 +1066,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryView(String path); +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * * Compared to SQL, this method will not throw an error if the table does not exist. Use Review Comment: table -> view
Re: [PR] [BP-1.20][FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]
flinkbot commented on PR #25827: URL: https://github.com/apache/flink/pull/25827#issuecomment-2556600135 ## CI report: * 565408f120180d245b935b09165a43bac4731bd7 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator
[ https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Qilong updated FLINK-36942: Description: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in AsyncStateWindowJoinOperator[https://github.com/apache/flink/pull/25815], but cannot be implemented in RowTimestamportOperator was: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in windowJoinOperator, but cannot be implemented in RowTimestamportOperator > Unable to trigger onEventTime() when using the async state API in > RowTimestamportOperator > - > > Key: FLINK-36942 > URL: https://issues.apache.org/jira/browse/FLINK-36942 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 2.0.0 >Reporter: Wang Qilong >Priority: Major > > When I use the async state API in RowTimeSortOperator and inherit it from > AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the > timer expires > Regarding how to reproduce: > in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator > And execute the tests in RowTimestamportOperatorTest > I think this is a very strange bug because it can be triggered smoothly in > AsyncStateWindowJoinOperator[https://github.com/apache/flink/pull/25815], but > cannot be implemented in RowTimestamportOperator
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
twalthr commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893677952 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1040,6 +1066,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryView(String path); +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * * Compared to SQL, this method will not throw an error if the table does not exist. Use Review Comment: ```suggestion * Compared to SQL, this method will not throw an error if the table does not exist. Use ``` ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1040,6 +1066,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryView(String path); +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropView(java.lang.String, boolean)} to change the default behavior. + * + * @return true if a view existed in the given path and was removed + */ +boolean dropView(String path); + +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * @return true if a view existed in the given path and was removed Throws {@link + * ValidationException} if view not exists and ignoreIfNotExists is false Review Comment: ```suggestion * ValidationException} if the view does not exist and ignoreIfNotExists is false ``` ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1040,6 +1066,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryView(String path); +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropView(java.lang.String, boolean)} to change the default behavior. + * + * @return true if a view existed in the given path and was removed + */ +boolean dropView(String path); + +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * @return true if a view existed in the given path and was removed Throws {@link Review Comment: ```suggestion * @return true if a view existed in the given path and was removed, throws {@link ```
[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator
[ https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Qilong updated FLINK-36942: Description: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires. Regarding how to reproduce: In [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator] and execute the tests in RowTimestamportOperatorTest. I think this is a very strange bug because it can be triggered smoothly in [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but cannot be implemented in RowTimestamportOperator. was: When I use the async state API in RowTimeSortOperator and inherit it from AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer expires Regarding how to reproduce: in [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator] And execute the tests in RowTimestamportOperatorTest I think this is a very strange bug because it can be triggered smoothly in [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but cannot be implemented in RowTimestamportOperator > Unable to trigger onEventTime() when using the async state API in > RowTimestamportOperator > - > > Key: FLINK-36942 > URL: https://issues.apache.org/jira/browse/FLINK-36942 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 2.0.0 >Reporter: Wang Qilong >Priority: Major > > When I use the async state API in RowTimeSortOperator and inherit it from > AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the > timer expires. > Regarding how to reproduce: > In > [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator] > and execute the tests in RowTimestamportOperatorTest. > I think this is a very strange bug because it can be triggered smoothly in > [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], > but cannot be implemented in RowTimestamportOperator.
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893685759 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1030,6 +1030,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryTable(String path); +/** + * Drops a table registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryTable}. This method + * can only drop permanent objects. + * + * Compared to SQL, this method will not throw an error if the table does not exist. Use Review Comment: I am wondering if we could change `Compared to SQL` to `Compared to ANSI SQL` which I guess is what we mean here. As Flink SQL is SQL as well.
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893689041 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1040,6 +1066,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryView(String path); +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method Review Comment: nit: I would move sentence "This method can only drop permanent objects" to the beginning of the paragraph. So it is obvious what this method is before we talk about temporary objects.
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
davidradl commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893681174 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1040,6 +1066,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryView(String path); +/** + * Drops a view registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryView}. This method + * can only drop permanent objects. + * + * * Compared to SQL, this method will not throw an error if the table does not exist. Use + * {@link #dropView(java.lang.String, boolean)} to change the default behavior. + * + * @return true if a view existed in the given path and was removed Review Comment: missing @params javadoc
Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]
snuyanzin commented on code in PR #25810: URL: https://github.com/apache/flink/pull/25810#discussion_r1893690187 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java: ## @@ -1030,6 +1030,32 @@ void createTemporarySystemFunction( */ boolean dropTemporaryTable(String path); +/** + * Drops a table registered in the given path. + * + * Temporary objects can shadow permanent ones. If a temporary object exists in a given path, + * make sure to drop the temporary object first using {@link #dropTemporaryTable}. This method + * can only drop permanent objects. + * + * Compared to SQL, this method will not throw an error if the table does not exist. Use Review Comment: No, here we mean Flink SQL: we are comparing the behavior of the Table API with that of Flink SQL (usually called just SQL).
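Editorial sketch of the semantics discussed in this review thread: the Javadoc describes a lenient one-argument drop, a two-argument variant controlled by ignoreIfNotExists, and temporary objects that shadow permanent ones. The toy model below only illustrates that description. MiniCatalog is hypothetical and is not Flink code, the exception types are stand-ins for ValidationException, and the choice to fail when a temporary view shadows the path is an assumption (the Javadoc only says to drop the temporary object first).

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the drop semantics in the Javadoc under review; not Flink code. */
public class DropViewSketch {

    /** Hypothetical in-memory catalog holding view paths only. */
    static class MiniCatalog {
        private final Set<String> temporaryViews = new HashSet<>();
        private final Set<String> permanentViews = new HashSet<>();

        void createTemporaryView(String path) {
            temporaryViews.add(path);
        }

        void createView(String path) {
            permanentViews.add(path);
        }

        /** Removes only a temporary view; returns true if one existed. */
        boolean dropTemporaryView(String path) {
            return temporaryViews.remove(path);
        }

        /** Lenient default: a missing view is reported as false, not as an error. */
        boolean dropView(String path) {
            return dropView(path, true);
        }

        boolean dropView(String path, boolean ignoreIfNotExists) {
            // Assumption of this sketch: a shadowing temporary view blocks the drop.
            if (temporaryViews.contains(path)) {
                throw new IllegalStateException(
                        "Temporary view '" + path + "' exists; drop it first.");
            }
            if (permanentViews.remove(path)) {
                return true;
            }
            if (ignoreIfNotExists) {
                return false;
            }
            // Stand-in for the ValidationException named in the Javadoc.
            throw new IllegalArgumentException("View '" + path + "' does not exist.");
        }
    }

    public static void main(String[] args) {
        MiniCatalog catalog = new MiniCatalog();
        catalog.createView("cat.db.v");
        System.out.println(catalog.dropView("cat.db.v")); // the view existed and is removed
        System.out.println(catalog.dropView("cat.db.v")); // already gone, lenient default
        try {
            catalog.dropView("cat.db.v", false); // strict variant fails on a missing view
        } catch (IllegalArgumentException e) {
            System.out.println("strict drop failed: " + e.getMessage());
        }
    }
}
```

In this model the one-argument overload plays the role of a drop with IF EXISTS, and passing false for ignoreIfNotExists gives the strict behavior that the thread attributes to Flink SQL.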
Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]
snuyanzin commented on PR #25828: URL: https://github.com/apache/flink/pull/25828#issuecomment-2556669198 FYI: a commit cannot be both `[hotfix]` and tagged with a Jira issue. Please change the commit message to something like `[FLINK-36941][docs] ...`
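Editorial sketch of the tagging rule stated in this comment: a subject line should not carry both a `[hotfix]` tag and a Jira key. The check below is a minimal illustration, assuming bracketed tags as they appear in the PR titles of this thread. The class and method names are hypothetical and this is not part of Flink's tooling.

```java
import java.util.Locale;
import java.util.regex.Pattern;

/** Hypothetical check for the rule "not both [hotfix] and a Jira key". */
public class CommitTagSketch {

    // Matches a bracketed Jira key such as [FLINK-36941].
    private static final Pattern JIRA_TAG = Pattern.compile("\\[FLINK-\\d+\\]");

    /** Returns true when the subject carries both a Jira key and a [hotfix] tag. */
    static boolean mixesHotfixAndJira(String subject) {
        boolean hasJira = JIRA_TAG.matcher(subject).find();
        boolean hasHotfix = subject.toLowerCase(Locale.ROOT).contains("[hotfix]");
        return hasJira && hasHotfix;
    }

    public static void main(String[] args) {
        String[] subjects = {
            "[FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests",
            "[FLINK-36941][docs] Update DATE_FORMAT Doc and Python Tests",
            "[hotfix] Fix typo"
        };
        for (String subject : subjects) {
            // Flag only the subjects that combine the two kinds of tag.
            System.out.println(
                    (mixesHotfixAndJira(subject) ? "REJECT " : "OK     ") + subject);
        }
    }
}
```

The sketch flags only the combination that the comment objects to; it does not enforce any other part of the commit message format.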