Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894119140


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java:
##
@@ -366,6 +366,61 @@ public static ApiExpression toTimestampLtz(Object 
numericEpochTime, Object preci
 return apiCall(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, 
numericEpochTime, precision);
 }
 
+/**
+ * * Converts the given time string with the specified format to {@link

Review Comment:
   modified. 
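   
   For reference, a minimal Table API usage sketch of the two-argument 
overload visible in the hunk header above (the literal arguments are 
illustrative, not taken from the PR):
   
       import org.apache.flink.table.api.ApiExpression;
       import static org.apache.flink.table.api.Expressions.toTimestampLtz;
   
       // Epoch milliseconds with precision 3, matching the quoted signature
       // toTimestampLtz(Object numericEpochTime, Object precision).
       ApiExpression ts = toTimestampLtz(1234567890123L, 3);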



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36227] Restore compatibility with Logback 1.2 [flink]

2024-12-20 Thread via GitHub


afedulov commented on PR #25813:
URL: https://github.com/apache/flink/pull/25813#issuecomment-2557293147

   @piotrp thanks for the contribution. Please consider that usage of Mockito 
in Flink is generally discouraged:
   
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations
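   
   The linked guideline prefers small, reusable test implementations over 
mocks. A generic sketch of that pattern (the Notifier interface here is 
hypothetical, not from this PR):
   
       // Hypothetical interface, used only to illustrate the pattern.
       interface Notifier {
           void notify(String message);
       }
   
       // Reusable test implementation: records invocations so tests can
       // assert on them directly instead of relying on Mockito's verify().
       final class RecordingNotifier implements Notifier {
           final java.util.List<String> messages = new java.util.ArrayList<>();
   
           @Override
           public void notify(String message) {
               messages.add(message);
           }
       }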


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894106052


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * Supported function signatures:
+ *
+ * 
+ *   {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) 
+ *   Converts numeric epoch time in milliseconds to timestamp with local 
timezone
+ *   {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> 
TIMESTAMP_LTZ(precision) 
+ *   Converts numeric epoch time to timestamp with specified precision (0 
as seconds, 3 as
+ *   milliseconds)
+ *   {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input string of format
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> 
TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input strings of format and timezone
+ * 
+ *
+ * Example:
+ *
+ * {@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // 
Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // 
Parses string using input format and timezone
+ * }
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes
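
   For reference, the epoch interpretation in the quoted Javadoc (precision 0 
= seconds, precision 3 = milliseconds) boils down to the following plain 
java.time sketch; this is an illustration only, not Flink's internal 
DateTimeUtils implementation:

       import java.time.Instant;

       // Assumed semantics: the precision argument selects the epoch unit.
       static Instant epochToInstant(long epoch, int precision) {
           if (precision == 0) {
               return Instant.ofEpochSecond(epoch); // TO_TIMESTAMP_LTZ(1234567890, 0)
           }
           if (precision == 3) {
               return Instant.ofEpochMilli(epoch);  // TO_TIMESTAMP_LTZ(1234567890123, 3)
           }
           throw new IllegalArgumentException("Unsupported precision: " + precision);
       }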

Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894092145


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java:
##
@@ -804,4 +813,163 @@ private Stream<TestSetSpec> floorTestCases() {
 .format(TIMESTAMP_FORMATTER),
 STRING().nullable()));
 }
+
+private Stream<TestSetSpec> toTimestampLtzTestCases() {
+return Stream.of(
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
+.onFieldsWithData(
+100,
+1234,
+-100,
+null,
+DecimalDataUtils.castFrom(-Double.MAX_VALUE, 
38, 18),

Review Comment:
   Hi @snuyanzin, I have the invalid test cases in the last commit; the 
invalid inputs are passed in directly as literals.
https://github.com/apache/flink/pull/25763/files/982b92f13fbb93b8e0c5244b30d4ae8bbbfa3a25#diff-7588925e682b7f5b4b2e41fdc4992ef0d9f246ce00696f783c36f993b8aeb7aeR950-R973
   
   Could you clarify what you mean in this comment? 
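   
   For illustration, a literal-based negative case in the style of the quoted 
test could look like the sketch below; the testSqlValidationError call and 
the error message are assumptions about the test base, not copied from the 
PR:
   
       // Hypothetical invalid-input case, added to the Stream.of(...) list
       // following the TestSetSpec pattern quoted above.
       TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
               .onFieldsWithData(100)
               .testSqlValidationError(
                       "TO_TIMESTAMP_LTZ(f0, f0, f0, f0)", // too many arguments
                       "Invalid number of arguments")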



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36227] Restore compatibility with Logback 1.2 [flink]

2024-12-20 Thread via GitHub


afedulov commented on code in PR #25813:
URL: https://github.com/apache/flink/pull/25813#discussion_r1894125426


##
flink-core/pom.xml:
##
@@ -180,6 +180,13 @@ under the License.
test

 
+   

Review Comment:
   Mockito is intentionally excluded from Flink and adding this dependency to 
flink-core sends the wrong message. Please check if you can avoid using it.
   
https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-mockito---use-reusable-test-implementations



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


hanyuzheng7 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894127361


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * Supported function signatures:
+ *
+ * 
+ *   {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) 
+ *   Converts numeric epoch time in milliseconds to timestamp with local 
timezone
+ *   {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> 
TIMESTAMP_LTZ(precision) 
+ *   Converts numeric epoch time to timestamp with specified precision (0 
as seconds, 3 as
+ *   milliseconds)
+ *   {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input string of format
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> 
TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input strings of format and timezone
+ * 
+ *
+ * Example:
+ *
+ * {@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // 
Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // 
Parses string using input format and timezone
+ * }
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTim

Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1894119512


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1026,20 +1026,86 @@ void createTemporarySystemFunction(
  * If a permanent table with a given path exists, it will be used from 
now on for any queries
  * that reference this path.
  *
+ * @param path The given path under which the temporary table will be 
dropped. See also the
+ * {@link TableEnvironment} class description for the format of the 
path.
  * @return true if a table existed in the given path and was removed
  */
 boolean dropTemporaryTable(String path);
 
+/**
+ * Drops a table registered in the given path.
+ *
+ * This method can only drop permanent objects. Temporary objects can 
shadow permanent ones.
+ * If a temporary object exists in a given path, make sure to drop the 
temporary object first
+ * using {@link #dropTemporaryTable}.
+ *
+ * Compared to SQL, this method will not throw an error if the table 
does not exist. Use
+ * {@link #dropTable(java.lang.String, boolean)} to change the default 
behavior.
+ *
+ * @param path The given path under which the table will be dropped. See 
also the {@link
+ * TableEnvironment} class description for the format of the path.
+ * @return true if a table existed in the given path and was removed
+ */
+boolean dropTable(String path);
+
+/**
+ * Drops a table registered in the given path.
+ *
+ * This method can only drop permanent objects. Temporary objects can 
shadow permanent ones.
+ * If a temporary object exists in a given path, make sure to drop the 
temporary object first
+ * using {@link #dropTemporaryTable}.
+ *
+ * @param path The given path under which the given table will be dropped. 
See also the {@link
+ * TableEnvironment} class description for the format of the path.
+ * @param ignoreIfNotExists whether to ignore if table does not exist.
+ * @return true if a table existed in the given path and was removed, 
throws {@link

Review Comment:
   Can we add when false is returned, so that it is explicit? Maybe add: "returns 
false if the table was not dropped, because the table does not exist and 
ignoreIfNotExists is true."
   
   In CatalogManager we say:
   "@return true if table existed in the given path and was dropped, false 
if table didn't exist in the given path and ignoreIfNotExists was true."
   
   I think the javadoc should be the same in each case; I would suggest 
that the @return only documents when it returns true and false - like 
CatalogManager does.
   
   I think if we want to mention the ValidationException, this should be in the 
body of the javadoc, or as an @throws tag if we want to declare the 
ValidationException on the method. Though I see it is mentioned in the @param 
ignoreIfNotExists javadoc in some of the methods - but not consistently.
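   
   Concretely, that suggestion would make the Javadoc of the two-argument 
method look roughly like this (wording adapted from the CatalogManager quote 
above; not the final PR text):
   
       /**
        * Drops a table registered in the given path.
        *
        * @param path the path under which the table will be dropped
        * @param ignoreIfNotExists whether to ignore if the table does not exist
        * @return true if the table existed in the given path and was dropped, false
        *     if the table didn't exist in the given path and ignoreIfNotExists was true
        * @throws ValidationException if the table doesn't exist and ignoreIfNotExists
        *     is false
        */
       boolean dropTable(String path, boolean ignoreIfNotExists);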



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894146265


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * Supported function signatures:
+ *
+ * 
+ *   {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) 
+ *   Converts numeric epoch time in milliseconds to timestamp with local 
timezone
+ *   {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> 
TIMESTAMP_LTZ(precision) 
+ *   Converts numeric epoch time to timestamp with specified precision (0 
as seconds, 3 as
+ *   milliseconds)
+ *   {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input string of format
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> 
TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input strings of format and timezone
+ * 
+ *
+ * Example:
+ *
+ * {@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // 
Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // 
Parses string using input format and timezone
+ * }
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes

Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894166214


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * Supported function signatures:
+ *
+ * 
+ *   {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) 
+ *   Converts numeric epoch time in milliseconds to timestamp with local 
timezone
+ *   {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> 
TIMESTAMP_LTZ(precision) 
+ *   Converts numeric epoch time to timestamp with specified precision (0 
as seconds, 3 as
+ *   milliseconds)
+ *   {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input string of format
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> 
TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input strings of format and timezone
+ * 
+ *
+ * Example:
+ *
+ * {@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // 
Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // 
Parses string using input format and timezone
+ * }
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes
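
   For reference, the three-argument string parsing described in the quoted 
Javadoc corresponds roughly to this plain java.time sketch (assuming the 
timezone argument supplies the zone when the pattern itself carries none):

       import java.time.LocalDateTime;
       import java.time.ZoneId;
       import java.time.ZonedDateTime;
       import java.time.format.DateTimeFormatter;

       // Sketch of TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC').
       static ZonedDateTime parseWithZone(String ts, String pattern, String zone) {
           DateTimeFormatter fmt = DateTimeFormatter.ofPattern(pattern);
           return LocalDateTime.parse(ts, fmt).atZone(ZoneId.of(zone));
       }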

[PR] [FLINK-33117][table][docs] Fix scala example in udfs page [flink]

2024-12-20 Thread via GitHub


afedulov opened a new pull request, #25833:
URL: https://github.com/apache/flink/pull/25833

   Unmodified backport of https://github.com/apache/flink/pull/23439 to 1.20


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33117][table][docs] Fix scala example in udfs page [flink]

2024-12-20 Thread via GitHub


afedulov commented on PR #25833:
URL: https://github.com/apache/flink/pull/25833#issuecomment-2557385736

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33117][table][docs] Fix scala example in udfs page [flink]

2024-12-20 Thread via GitHub


flinkbot commented on PR #25833:
URL: https://github.com/apache/flink/pull/25833#issuecomment-2557386028

   
   ## CI report:
   
   * c8f3c26629996819cea131ef8cf11526e6b98369 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894092145


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java:
##
@@ -804,4 +813,163 @@ private Stream<TestSetSpec> floorTestCases() {
 .format(TIMESTAMP_FORMATTER),
 STRING().nullable()));
 }
+
+private Stream<TestSetSpec> toTimestampLtzTestCases() {
+return Stream.of(
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
+.onFieldsWithData(
+100,
+1234,
+-100,
+null,
+DecimalDataUtils.castFrom(-Double.MAX_VALUE, 
38, 18),

Review Comment:
   Hi @snuyanzin, I have the invalid test cases here; the invalid inputs are 
passed in directly:  
https://github.com/apache/flink/pull/25763/files/982b92f13fbb93b8e0c5244b30d4ae8bbbfa3a25#diff-7588925e682b7f5b4b2e41fdc4992ef0d9f246ce00696f783c36f993b8aeb7aeR950-R973
   
   Could you clarify what you mean in this comment? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36931) FlinkCDC YAML supports synchronizing the full amount of data of the entire database in Batch mode

2024-12-20 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-36931:
--

Assignee: Wenkai Qi

> FlinkCDC YAML supports synchronizing the full amount of data of the entire 
> database in Batch mode
> -
>
> Key: FLINK-36931
> URL: https://issues.apache.org/jira/browse/FLINK-36931
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: Wenkai Qi
>Assignee: Wenkai Qi
>Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> h1. Background
> MysqlCDC in Flink CDC supports *StartupMode.SNAPSHOT* and is of 
> {*}Boundedness.BOUNDED{*}, and can run in {*}RuntimeExecutionMode.BATCH{*}.
> h1. Expectation
> FlinkCDC YAML jobs can support {*}StartupMode.SNAPSHOT{*}, be of 
> {*}Boundedness.BOUNDED{*}, and can run in {*}RuntimeExecutionMode.BATCH{*}.
> h1. Benefits
>  
>  # The performance improvements of Flink Batch (dynamic partition pruning, 
> Hybrid Shuffle) can be leveraged. Which batch-mode optimizations will be used 
> still needs to be discussed.
>  # A full snapshot of the entire database can be synchronized to backfill 
> data in an offline manner. In the future, full-database snapshot 
> synchronization could even be supported for other databases and data lakes.
> h1. Under consideration
>  
>  # Sink needs to switch to Batch mode. 
> [https://github.com/apache/flink-cdc/pull/3646#pullrequestreview-2491309306]
>  # For 2PC sink, call a checkpoint with checkpointid of Long.MAX_VALUE once, 
> and the sink should make the final submission based on this id.
>  # Sink directly supports Batch writing (such as DorisSink)
>  # ... (more to be added)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
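
For context, FLINK-36931's starting point already exists at the DataStream level: a MySqlSource built with the snapshot startup mode is bounded and can run under BATCH execution. A hedged sketch (connection details are placeholders; assumes flink-cdc's `StartupOptions.snapshot()` factory referenced by the issue):

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SnapshotBatchSketch {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> source =
                MySqlSource.<String>builder()
                        .hostname("mysql.example.com") // placeholder
                        .port(3306)
                        .databaseList("app_db")
                        .tableList("app_db.*")
                        .username("flink")
                        .password("***")
                        .startupOptions(StartupOptions.snapshot()) // bounded snapshot read
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // leverage batch-mode optimizations
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "mysql-snapshot").print();
        env.execute("full-database snapshot in batch mode");
    }
}
```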


Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


snuyanzin commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894130094


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * <p>A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * <p>Supported function signatures:
+ *
+ * <ul>
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3)<br>
+ *       Converts numeric epoch time in milliseconds to timestamp with local timezone
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision)<br>
+ *       Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as
+ *       milliseconds)
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using input string of format
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using input strings of format and timezone
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone
+ * }</pre>
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes
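
The precision semantics documented in the javadoc above, 0 for seconds and 3 for milliseconds, boil down to the following, shown here as a small self-contained sketch rather than the PR's DateTimeUtils code:

```java
import java.time.Instant;

public class EpochPrecisionSketch {
    static Instant toInstant(long epoch, int precision) {
        switch (precision) {
            case 0:  return Instant.ofEpochSecond(epoch); // input is seconds
            case 3:  return Instant.ofEpochMilli(epoch);  // input is milliseconds
            default: throw new IllegalArgumentException("unsupported precision " + precision);
        }
    }

    public static void main(String[] args) {
        System.out.println(toInstant(1234567890L, 0));    // 2009-02-13T23:31:30Z
        System.out.println(toInstant(1234567890123L, 3)); // 2009-02-13T23:31:30.123Z
    }
}
```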

Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1894138335


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -1297,30 +1303,42 @@ public void dropMaterializedTable(
  * @param objectIdentifier The fully qualified path of the view to drop.
  * @param ignoreIfNotExists If false exception will be thrown if the view 
to drop does not
  * exist.
+ * @return true if view existed in the given path and was dropped, false 
if view didn't exist in
+ * the given path and ignoreIfNotExists was true.
  */
-public void dropView(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
-dropTableInternal(objectIdentifier, ignoreIfNotExists, false, false);
-}
+public boolean dropView(ObjectIdentifier objectIdentifier, boolean 
ignoreIfNotExists) {
+return dropTableInternal(objectIdentifier, ignoreIfNotExists, 
TableKind.VIEW);
+}
+
+private boolean dropTableInternal(
+ObjectIdentifier objectIdentifier, boolean ignoreIfNotExists, 
TableKind kind) {
+final Predicate<CatalogBaseTable> filter;
+final String tableOrView;
+switch (kind) {

Review Comment:
   I notice that `CatalogBaseTable` has `getTableKind()`; can we use this 
instead of `instanceof`, avoiding the overhead of reflection?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
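
A minimal sketch of the reviewer's suggestion (illustrative names, not the PR's actual code): build the drop filter from the `TableKind` that `getTableKind()` returns instead of checking concrete classes.

```java
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;

import java.util.function.Predicate;

final class TableKindFilterSketch {
    // The declared kind already distinguishes tables from views, so no
    // instanceof check on concrete classes is needed.
    static Predicate<CatalogBaseTable> filterFor(TableKind expected) {
        return table -> table.getTableKind() == expected;
    }
}
```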



Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894140247


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java:
##
@@ -804,4 +813,163 @@ private Stream<TestSetSpec> floorTestCases() {
 .format(TIMESTAMP_FORMATTER),
 STRING().nullable()));
 }
+
+private Stream<TestSetSpec> toTimestampLtzTestCases() {
+return Stream.of(
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
+.onFieldsWithData(
+100,
+1234,
+-100,
+null,
+DecimalDataUtils.castFrom(-Double.MAX_VALUE, 
38, 18),

Review Comment:
   Resolved. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894143500


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * <p>A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * <p>Supported function signatures:
+ *
+ * <ul>
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3)<br>
+ *       Converts numeric epoch time in milliseconds to timestamp with local timezone
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision)<br>
+ *       Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as
+ *       milliseconds)
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using input string of format
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using input strings of format and timezone
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone
+ * }</pre>
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes

Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894146265


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * <p>A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * <p>Supported function signatures:
+ *
+ * <ul>
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3)<br>
+ *       Converts numeric epoch time in milliseconds to timestamp with local timezone
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision)<br>
+ *       Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as
+ *       milliseconds)
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using input string of format
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using input strings of format and timezone
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone
+ * }</pre>
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes

Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


snuyanzin commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894179515


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * <p>A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * <p>Supported function signatures:
+ *
+ * <ul>
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3)<br>
+ *       Converts numeric epoch time in milliseconds to timestamp with local timezone
+ *   <li>{@code TO_TIMESTAMP_LTZ(numeric, precision)} -> TIMESTAMP_LTZ(precision)<br>
+ *       Converts numeric epoch time to timestamp with specified precision (0 as seconds, 3 as
+ *       milliseconds)
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using input string of format
+ *   <li>{@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> TIMESTAMP_LTZ(3)<br>
+ *       Parses string timestamp using input strings of format and timezone
+ * </ul>
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // Parses string using input format and timezone
+ * }</pre>
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes

[jira] [Assigned] (FLINK-36946) optimize sink operator name truncate

2024-12-20 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-36946:
---

Assignee: tartarus

> optimize sink operator name truncate
> 
>
> Key: FLINK-36946
> URL: https://issues.apache.org/jira/browse/FLINK-36946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: tartarus
>Assignee: tartarus
>Priority: Major
>  Labels: pull-request-available
>
> We use the `Writer` keyword to filter the metrics of the sink operator in 
> Flink.
> But if the user sets a very long sink name, the `Writer` keyword gets 
> truncated.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36945) MySQL CDC internal schema representation becomes out of sync with the real database schema when restarting a job

2024-12-20 Thread Yohei Yoshimuta (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yohei Yoshimuta updated FLINK-36945:

Description: 
[The Vitess schema migration 
tool|https://vitess.io/docs/user-guides/schema-changes/ddl-strategies/] uses 
_RENAME TABLE_ to perform schema changes.

However, the MySQL CDC connector does not account for these changes, causing 
the schema history topic in Debezium to become stale. While this issue does not 
immediately affect a running job, {*}it prevents the job from restarting 
successfully from a checkpoint and results in the following error{*}:
{code:java}
Caused by: 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 Data row is smaller than a column index, internal schema representation is 
probably out of sync with real database schema
at 
io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254)
at 
io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283)
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:139)
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:60)
at 
io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209)
... 12 more
{code}
When this happens, *the database history topic needs to be rebuilt, but the job 
cannot be automatically recovered.*

The current workaround is to set `scan.startup.mode` to `specific-offset`, 
which forces CDC [to pass `schema_only_recovery` to 
Debezium|https://github.com/apache/flink-cdc/blob/dd69756d3fc1e3f5261b286added5397acbd88ad/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/MySqlSource.java#L209-L212].
 However, this requires manual intervention.

Examples of potential solutions include:
 - Enhancing the schema history mechanism to capture and process `RENAME TABLE` 
events during schema migrations.

 - Implementing a fallback mechanism to reconcile schema differences during 
recovery, reducing the need for manual intervention.

  was:
[The Vitess schema migration 
tool|https://vitess.io/docs/user-guides/schema-changes/ddl-strategies/] uses 
_RENAME TABLE_ to perform schema changes.

However, the MySQL CDC connector does not account for these changes, causing 
the schema history topic in Debezium to become stale. While this issue does not 
immediately affect a running job, {*}it prevents the job from restarting 
successfully from a checkpoint and results in the following error{*}:
{code:java}
Caused by: 
com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException:
 Data row is smaller than a column index, internal schema representation is 
probably out of sync with real database schema
at 
io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254)
at 
io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283)
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:139)
at 
io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:60)
at 
io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209)
... 12 more
{code}
When this happens, *the database history topic needs to be rebuilt, but the job 
cannot be automatically recovered.*

The current workaround is to set `scan.startup.mode` to `specific-offset`, 
which forces CDC to pass `schema_only_recovery` to Debezium. However, this 
requires manual intervention.

Examples of potential solutions include:
 - Enhancing the schema history mechanism to capture and process `RENAME TABLE` 
events during schema migrations.

 - Implementing a fallback mechanism to reconcile schema differences during 
recovery, reducing the need for manual intervention.


> MySQL CDC internal schema representation becomes out of sync with the real 
> database schema when restarting a job
> 
>
> Key: FLINK-36945
> URL: https://issues.apache.org/jira/browse/FLINK-36945
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: Yohei Yoshimuta
>Priority: Major
>
> [The Vitess schema migration 
> tool|https://vitess.io/docs/user-guides/schema-changes/ddl-strategies/] uses 
> _RENAME TABLE_ to perform schema changes.
> However, the MySQL CDC connector does not account

[jira] [Commented] (FLINK-36920) Update org.quartz-schedule:quartz in flink-autoscaler module from 2.3.2 to 2.4.0

2024-12-20 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907357#comment-17907357
 ] 

Gyula Fora commented on FLINK-36920:


Makes sense! But I agree that cutting a new release is almost as easy as a 
patch release at this point :) 

> Update org.quartz-schedule:quartz in flink-autoscaler module from 2.3.2 to 
> 2.4.0
> 
>
> Key: FLINK-36920
> URL: https://issues.apache.org/jira/browse/FLINK-36920
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: 1.10.0
>Reporter: Anupam Aggarwal
>Assignee: Anupam Aggarwal
>Priority: Minor
>  Labels: pull-request-available
>
> Update dependency on org.quartz-scheduler:quartz used in flink-autoscaler 
> module from 2.3.2 to 2.4.0
>  
> *Vulnerability info:*
> cve-2023-39017
> quartz-jobs 2.3.2 and below was discovered to contain a code injection 
> vulnerability in the component 
> org.quartz.jobs.ee.jms.SendQueueMessageJob.execute. This vulnerability is 
> exploited via passing an unchecked argument. NOTE: this is disputed by 
> multiple parties because it is not plausible that untrusted user input would 
> reach the code location where injection must occur.
> More details are at: [https://nvd.nist.gov/vuln/detail/cve-2023-39017] 
> *Proposed fix*
> Bumping the dependency from 2.3.2 to 2.4.0 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.20][FLINK-36739] Update the NodeJS to v22.11.0 (LTS) [flink]

2024-12-20 Thread via GitHub


mehdid93 commented on PR #25794:
URL: https://github.com/apache/flink/pull/25794#issuecomment-2556704958

   Hey Robert and David, sorry for the late response.
   Yes, the issue is due to the old Ubuntu image version used in the build CI. 
I've backported the PRs made by @zentol and @simplejason to this release-1.20 
branch.
   The list of all the PRs are:
   - https://github.com/apache/flink/pull/25827 Update CI to Ubuntu 22.04 
(Jammy)
   - https://github.com/apache/flink/pull/25794 Update the NodeJS to v22.11.0 
(LTS)
   - https://github.com/apache/flink/pull/25829 Update ng-zorro-antd to v18
   - https://github.com/apache/flink/pull/25830 Update frontend dependencies to 
address vulnerabilities


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimeSortOperator

2024-12-20 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907333#comment-17907333
 ] 

Zakelly Lan commented on FLINK-36942:
-

[~au_miner] Thanks for your contribution! The root cause is that the test 
harness `OneInputStreamOperatorTestHarness` does not properly support async 
state. Please use `AsyncKeyedOneInputStreamOperatorTestHarness` instead when 
you are testing async state.

And the PR you mentioned performs an important 
[fix](https://github.com/apache/flink/pull/25815/files#diff-885dd1cb87987c93b79ac6da39deeb53d5a09dc434196031d4c53e60b18e88e3L57-R66)
 on `TwoInputStreamOperatorTestHarness`; that's why it runs well.

Please try `AsyncKeyedOneInputStreamOperatorTestHarness`; we will use that 
harness for all async state operators in the future.

> Unable to trigger onEventTime() when using the async state API in 
> RowTimeSortOperator
> -
>
> Key: FLINK-36942
> URL: https://issues.apache.org/jira/browse/FLINK-36942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 2.0.0
>Reporter: Wang Qilong
>Priority: Major
>
> When I use the async state API in RowTimeSortOperator and inherit it from 
> AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the 
> timer expires.
> To reproduce: see 
> [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator]
>  and execute the tests in RowTimeSortOperatorTest.
> I think this is a very strange bug, because the timer can be triggered 
> smoothly in 
> [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], 
> but not in RowTimeSortOperator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
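
For illustration, the classic timer-test pattern with the keyed harness looks like the sketch below; per the comment above, async-state operators should be driven through `AsyncKeyedOneInputStreamOperatorTestHarness` instead (its factory methods may differ from the constructor used here).

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;

public class EventTimeTimerHarnessSketch {
    public static void main(String[] args) throws Exception {
        KeyedProcessFunction<String, String, String> fn =
                new KeyedProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, Context ctx, Collector<String> out)
                            throws Exception {
                        ctx.timerService().registerEventTimeTimer(10L); // fire at t=10
                    }

                    @Override
                    public void onTimer(long ts, OnTimerContext ctx, Collector<String> out) {
                        out.collect("fired@" + ts);
                    }
                };

        KeyedOneInputStreamOperatorTestHarness<String, String, String> harness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(fn), v -> v, Types.STRING);
        harness.open();
        harness.processElement(new StreamRecord<>("a", 1L));
        harness.processWatermark(new Watermark(20L)); // advances event time past the timer
        System.out.println(harness.getOutput());      // should contain "fired@10"
        harness.close();
    }
}
```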


[jira] [Created] (FLINK-36945) MySQL CDC internal schema representation becomes out of sync with the real database schema when restarting a job

2024-12-20 Thread Yohei Yoshimuta (Jira)
Yohei Yoshimuta created FLINK-36945:
---

 Summary: MySQL CDC internal schema representation becomes out of 
sync with the real database schema when restarting a job
 Key: FLINK-36945
 URL: https://issues.apache.org/jira/browse/FLINK-36945
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: Yohei Yoshimuta


[The Vitess schema migration 
tool|https://vitess.io/docs/user-guides/schema-changes/ddl-strategies/] uses 
`RENAME TABLE` to perform schema changes.

However, the MySQL CDC connector does not account for these changes, causing 
the schema history topic in Debezium to become stale. While this issue does not 
immediately affect a running job, it prevents the job from restarting 
successfully from a checkpoint and results in the following error:

 

```
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:254)
at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:283)
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:139)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:60)
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:209)
... 12 more
```

 

When this happens, the database history topic needs to be rebuilt, but the job 
cannot be automatically recovered.

 

The current workaround is to set `scan.startup.mode` to `specific-offset`, 
which forces CDC to pass `schema_only_recovery` to Debezium. However, this 
requires manual intervention.

 

Examples of potential solutions include:

- Enhancing the schema history mechanism to capture and process `RENAME TABLE` 
events during schema migrations.

- Implementing a fallback mechanism to reconcile schema differences during 
recovery, reducing the need for manual intervention.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
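
For reference, the workaround mentioned above amounts to recreating the job with a specific-offset startup so that CDC passes `schema_only_recovery` to Debezium and the schema history is rebuilt. A hedged sketch (table schema, binlog file/position, and connection details are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SpecificOffsetWorkaroundSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Restart ingestion from a known binlog offset instead of the stale
        // schema history captured in the checkpoint.
        tEnv.executeSql(
                "CREATE TABLE orders (id BIGINT, PRIMARY KEY (id) NOT ENFORCED) WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'mysql.example.com',\n"
                        + "  'database-name' = 'app_db',\n"
                        + "  'table-name' = 'orders',\n"
                        + "  'username' = 'flink',\n"
                        + "  'password' = '***',\n"
                        + "  'scan.startup.mode' = 'specific-offset',\n"
                        + "  'scan.startup.specific-offset.file' = 'mysql-bin.000123',\n"
                        + "  'scan.startup.specific-offset.pos' = '4'\n"
                        + ")");
    }
}
```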


[jira] [Updated] (FLINK-36944) Introduce 'enableAsycState' in KeyedStream interfaces

2024-12-20 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-36944:

Summary: Introduce 'enableAsycState' in KeyedStream interfaces  (was: 
Introduce 'enableAsycState' in DataStream v1 interfaces)

> Introduce 'enableAsycState' in KeyedStream interfaces
> -
>
> Key: FLINK-36944
> URL: https://issues.apache.org/jira/browse/FLINK-36944
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36944][DataStream/API] Introduce 'enableAsycState' in KeyedStream interfaces and implement reduce operator [flink]

2024-12-20 Thread via GitHub


Zakelly opened a new pull request, #25831:
URL: https://github.com/apache/flink/pull/25831

   ## What is the purpose of the change
   
   As the first part of FLIP-488, this PR introduces `enableAsyncState` in 
KeyedStream and its related interface classes. It also provides a basic 
implementation of the async reduce operator.
   
   
   ## Brief change log
   
   Please check each commit.
- Introduce `enableAsyncState` API
- Introduce serializer based state descriptor constructors
- Implement async reduce operator
- Introduce async state version of wordcount example
   
   
   ## Verifying this change
   
   This PR adds an IT test based on the wordcount example, which serves as an 
IT test of the new APIs and the reduce operator.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
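
A hedged sketch of the word-count shape the PR describes, assuming the FLIP-488 `enableAsyncState()` entry point on `KeyedStream` (the exact fluent style may differ from the final API):

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncStateWordCountSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Long>> words =
                env.fromData(Tuple2.of("flink", 1L), Tuple2.of("state", 1L), Tuple2.of("flink", 1L));

        KeyedStream<Tuple2<String, Long>, String> keyed = words.keyBy(t -> t.f0);
        keyed.enableAsyncState(); // opt downstream stateful operators into async state

        keyed.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)).print();
        env.execute("async-state wordcount sketch");
    }
}
```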



[jira] [Updated] (FLINK-36944) Introduce 'enableAsycState' in KeyedStream interfaces

2024-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36944:
---
Labels: pull-request-available  (was: )

> Introduce 'enableAsycState' in KeyedStream interfaces
> -
>
> Key: FLINK-36944
> URL: https://issues.apache.org/jira/browse/FLINK-36944
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / DataStream
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36946][Metrics] Optimize sink operator name truncate [flink]

2024-12-20 Thread via GitHub


Tartarus0zm opened a new pull request, #25832:
URL: https://github.com/apache/flink/pull/25832

   ## What is the purpose of the change
   
   Avoid truncation of the Writer/Committer keyword.
   
   
   ## Brief change log
   
   - add MetricUtil#truncateOperatorName to optimize Operator name
   
   
   ## Verifying this change
   
   - SinkMetricsITCase#testMetrics
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
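
The intended behavior can be pictured with a small sketch (a hypothetical helper, not the actual `MetricUtil#truncateOperatorName` signature): truncate only the user-defined part of the operator name so the trailing `Writer`/`Committer` marker used for metric filtering survives.

```java
final class OperatorNameTruncationSketch {
    // Keeps the suffix intact and trims the user-defined prefix to fit.
    static String truncate(String userName, String suffix, int maxLength) {
        String full = userName + ": " + suffix;
        if (full.length() <= maxLength) {
            return full;
        }
        int budget = Math.max(0, maxLength - suffix.length() - 2); // 2 for ": "
        return userName.substring(0, Math.min(userName.length(), budget)) + ": " + suffix;
    }

    public static void main(String[] args) {
        String longName = "a-very-very-long-user-defined-sink-name".repeat(4);
        System.out.println(truncate(longName, "Writer", 80)); // still ends with "Writer"
    }
}
```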



[jira] [Updated] (FLINK-36946) optimize sink operator name truncate

2024-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36946:
---
Labels: pull-request-available  (was: )

> optimize sink operator name truncate
> 
>
> Key: FLINK-36946
> URL: https://issues.apache.org/jira/browse/FLINK-36946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: tartarus
>Priority: Major
>  Labels: pull-request-available
>
> We use the `Writer` keyword to filter the metrics of the sink operator in 
> Flink.
> But if the user sets a very long sink name, the `Writer` keyword gets 
> truncated.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36938] Provide hooks before and after the watermark processing [flink]

2024-12-20 Thread via GitHub


Zakelly merged PR #25824:
URL: https://github.com/apache/flink/pull/25824


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36938) Provide hooks before and after the watermark processing

2024-12-20 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907367#comment-17907367
 ] 

Zakelly Lan commented on FLINK-36938:
-

Merge 7fa4f78 into master

> Provide hooks before and after the watermark processing
> ---
>
> Key: FLINK-36938
> URL: https://issues.apache.org/jira/browse/FLINK-36938
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Async State Processing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>
> We have rewritten `processWatermark` in the abstract operators for async state 
> processing. The sub-classes need a proper hook for doing something right 
> before the watermark is processed and right after the watermark is emitted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36938) Provide hooks before and after the watermark processing

2024-12-20 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36938?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan resolved FLINK-36938.
-
Fix Version/s: 2.0.0
   Resolution: Fixed

> Provide hooks before and after the watermark processing
> ---
>
> Key: FLINK-36938
> URL: https://issues.apache.org/jira/browse/FLINK-36938
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Async State Processing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> We have rewritten `processWatermark` in the abstract operators for async state 
> processing. The sub-classes need a proper hook for doing something right 
> before the watermark is processed and right after the watermark is emitted.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36946) optimize sink operator name truncate

2024-12-20 Thread tartarus (Jira)
tartarus created FLINK-36946:


 Summary: optimize sink operator name truncate
 Key: FLINK-36946
 URL: https://issues.apache.org/jira/browse/FLINK-36946
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Reporter: tartarus


We use the `Writer` keyword to filter the metrics of the sink operator in Flink.

But if the user sets a very long sink name, the `Writer` keyword gets truncated.
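
To illustrate one possible shape of the fix (a hypothetical sketch, not the actual MetricUtil#truncateOperatorName): truncate the user-supplied prefix instead of the tail, so the marker used for filtering survives. MAX_LEN and all names below are made up.

{code:java}
public final class TruncateSketch {
    private static final int MAX_LEN = 80; // hypothetical metric-name length limit

    static String truncateOperatorName(String name) {
        if (name.length() <= MAX_LEN) {
            return name;
        }
        // Keep the trailing Writer/Committer marker and cut the user-supplied
        // prefix instead, so metric filters matching on "Writer" keep working.
        int markerStart = Math.max(name.lastIndexOf("Writer"), name.lastIndexOf("Committer"));
        if (markerStart < 0) {
            return name.substring(0, MAX_LEN); // no marker: plain truncation
        }
        String marker = name.substring(markerStart);
        int budget = Math.max(0, MAX_LEN - marker.length() - 3);
        return name.substring(0, budget) + "..." + marker;
    }

    public static void main(String[] args) {
        String longName = "a".repeat(120) + ": Writer";
        System.out.println(truncateOperatorName(longName)); // ends with "...Writer"
    }
}
{code}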

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36946][Metrics] Optimize sink operator name truncate [flink]

2024-12-20 Thread via GitHub


flinkbot commented on PR #25832:
URL: https://github.com/apache/flink/pull/25832#issuecomment-2556895215

   
   ## CI report:
   
   * cee66261ba96fb8c49b4da0f74abc7c33c05b146 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36946) optimize sink operator name truncate

2024-12-20 Thread tartarus (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907370#comment-17907370
 ] 

tartarus commented on FLINK-36946:
--

[~fanrui]  Hi, if you're free, please take a look, thanks!

> optimize sink operator name truncate
> 
>
> Key: FLINK-36946
> URL: https://issues.apache.org/jira/browse/FLINK-36946
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Metrics
>Reporter: tartarus
>Priority: Major
>  Labels: pull-request-available
>
> We use the `Writer` keyword to filter the metrics of the sink operator in 
> Flink.
> But if the user sets a very long sink name, the `Writer` keyword 
> gets truncated.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25828:
URL: https://github.com/apache/flink/pull/25828#discussion_r1893949065


##
docs/data/sql_functions.yml:
##
@@ -642,7 +642,10 @@ temporal:
 description: Returns TRUE if two time intervals defined by (timepoint1, 
temporal1) and (timepoint2, temporal2) overlap. The temporal values could be 
either a time point or a time interval. E.g., (TIME '2:55:00', INTERVAL '1' 
HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) returns TRUE; (TIME 
'9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) 
returns FALSE.
   - sql: DATE_FORMAT(timestamp, string)
 table: dateFormat(TIMESTAMP, STRING)
-description: Converts timestamp to a value of string in the format 
specified by the date format string. The format string is compatible with 
Java's SimpleDateFormat.
+description: Converts timestamp to a value of string in the format 
specified by the date format string. The format string is compatible with 
Java's DateTimeFormatter.

Review Comment:
   nit: to a value of string -> to a string



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


snuyanzin commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1893951451


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * Supported function signatures:
+ *
+ * 
+ *   {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) 
+ *   Converts numeric epoch time in milliseconds to timestamp with local 
timezone
+ *   {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> 
TIMESTAMP_LTZ(precision) 
+ *   Converts numeric epoch time to timestamp with specified precision (0 
as seconds, 3 as
+ *   milliseconds)
+ *   {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input string of format
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> 
TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input strings of format and timezone
+ * 
+ *
+ * Example:
+ *
+ * {@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // 
Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // 
Parses string using input format and timezone
+ * }
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes

Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


snuyanzin commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1893952530


##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/TimeFunctionsITCase.java:
##
@@ -804,4 +813,163 @@ private Stream<TestSetSpec> floorTestCases() {
 .format(TIMESTAMP_FORMATTER),
 STRING().nullable()));
 }
+
+private Stream<TestSetSpec> toTimestampLtzTestCases() {
+return Stream.of(
+
TestSetSpec.forFunction(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ)
+.onFieldsWithData(
+100,
+1234,
+-100,
+null,
+DecimalDataUtils.castFrom(-Double.MAX_VALUE, 
38, 18),

Review Comment:
   I don't see tests here for invalid input,
   like an invalid timezone, an invalid format,
   or invalid something else...
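
   For context, the two failure modes mentioned are surfaced by java.time itself (which this function builds on); a standalone sketch of what such negative tests would hit:

```java
import java.time.DateTimeException;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class InvalidInputDemo {
    public static void main(String[] args) {
        try {
            // Unknown pattern letters fail at formatter construction time.
            DateTimeFormatter.ofPattern("yyyy-MM-dd bb");
        } catch (IllegalArgumentException e) {
            System.out.println("invalid format: " + e.getMessage());
        }
        try {
            // Unknown zone ids throw ZoneRulesException (a DateTimeException).
            ZoneId.of("Not/AZone");
        } catch (DateTimeException e) {
            System.out.println("invalid timezone: " + e.getMessage());
        }
    }
}
```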



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36770) Support Request Timeout for AWS sinks

2024-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36770:
---
Labels: pull-request-available  (was: )

> Support Request Timeout for AWS sinks
> -
>
> Key: FLINK-36770
> URL: https://issues.apache.org/jira/browse/FLINK-36770
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Major
>  Labels: pull-request-available
> Fix For: aws-connector-5.1.0
>
>
> h2. Description
> in 
> [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
>  we introduced request timeout for Async Sink which was released in 1.20, 
> Ideally we want to support that in AWS connectors.
> h2. Acceptance Criteria
>  - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated
> h2. Notes
> This is a breaking change regarding flink version compatibility which means 
> we expect the next version (5.1) to not be compatible with 1.19 anymore. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36770] make AWS sink writers use ResultHandler, move flink version to 1.20 [flink-connector-aws]

2024-12-20 Thread via GitHub


vahmed-hamdy opened a new pull request, #186:
URL: https://github.com/apache/flink-connector-aws/pull/186

   
   
   
   
   ## Purpose of the change
   
- Move `submitRequestEntries` implementation for AWS sinks to use 
`ResultHandler` (see the sketch after this list)
- Bump flink version to 1.20
- Adapt pipelines to Flink version change
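
   A hedged sketch of the new callback shape. It mirrors what FLIP-451 describes; the `ResultHandler` interface below, the fake client, and the entry types are illustrative stand-ins, not the connector's final code:

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative ResultHandler shape per FLIP-451; not the actual Flink interface.
interface ResultHandler<T> {
    void complete();                          // all entries were persisted
    void completeExceptionally(Exception e);  // fatal error, fail the writer
    void retryForEntries(List<T> entries);    // re-enqueue only the failed subset
}

class SketchWriter {
    interface FakeResponse { List<String> failedEntries(); }

    // Stand-in for the async AWS client call.
    CompletableFuture<FakeResponse> putRecords(List<String> entries) {
        return CompletableFuture.completedFuture(List::of);
    }

    void submitRequestEntries(List<String> requestEntries, ResultHandler<String> resultHandler) {
        putRecords(requestEntries).whenComplete((response, error) -> {
            if (error != null) {
                resultHandler.completeExceptionally(new IOException(error));
            } else if (!response.failedEntries().isEmpty()) {
                resultHandler.retryForEntries(response.failedEntries());
            } else {
                resultHandler.complete();
            }
        });
    }
}
```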
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   This change is already covered by existing sink unit tests
   
   
   ## Significant changes
   *(Please check any boxes [x] if the answer is "yes". You can first publish 
the PR and check them afterwards, for convenience.)*
   - [ ] Dependencies have been added or upgraded
   - [ ] Public API has been changed (Public API is any class annotated with 
`@Public(Evolving)`)
   - [ ] Serializers have been changed
   - [ ] New feature has been introduced
 - If yes, how is this documented? (not applicable / docs / JavaDocs / not 
documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]

2024-12-20 Thread via GitHub


nictownsend commented on code in PR #25828:
URL: https://github.com/apache/flink/pull/25828#discussion_r1893958758


##
docs/data/sql_functions.yml:
##
@@ -642,7 +642,10 @@ temporal:
 description: Returns TRUE if two time intervals defined by (timepoint1, 
temporal1) and (timepoint2, temporal2) overlap. The temporal values could be 
either a time point or a time interval. E.g., (TIME '2:55:00', INTERVAL '1' 
HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) returns TRUE; (TIME 
'9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) 
returns FALSE.
   - sql: DATE_FORMAT(timestamp, string)
 table: dateFormat(TIMESTAMP, STRING)
-description: Converts timestamp to a value of string in the format 
specified by the date format string. The format string is compatible with 
Java's SimpleDateFormat.
+description: Converts timestamp to a value of string in the format 
specified by the date format string. The format string is compatible with 
Java's DateTimeFormatter.
+  - sql: DATE_FORMAT(string, string)
+table: dateFormat(STRING, STRING)
+description: Re-format a timestamp string to another string, using a 
custom format. The format string is compatible with Java's SimpleDateFormat.

Review Comment:
   Assuming I've read 
https://github.com/apache/flink/blob/7fa4f78e43bfd99a7e7a3d655a9a3641e4d7bc59/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java#L2304
 to mean it invokes `unixTimestamp(dateStr, format, null)` as there is no `tz` 
third parameter in the SQL function.  



##
docs/data/sql_functions.yml:
##
@@ -642,7 +642,10 @@ temporal:
 description: Returns TRUE if two time intervals defined by (timepoint1, 
temporal1) and (timepoint2, temporal2) overlap. The temporal values could be 
either a time point or a time interval. E.g., (TIME '2:55:00', INTERVAL '1' 
HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) returns TRUE; (TIME 
'9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) 
returns FALSE.
   - sql: DATE_FORMAT(timestamp, string)
 table: dateFormat(TIMESTAMP, STRING)
-description: Converts timestamp to a value of string in the format 
specified by the date format string. The format string is compatible with 
Java's SimpleDateFormat.
+description: Converts timestamp to a value of string in the format 
specified by the date format string. The format string is compatible with 
Java's DateTimeFormatter.
+  - sql: DATE_FORMAT(string, string)
+table: dateFormat(STRING, STRING)
+description: Re-format a timestamp string to another string, using a 
custom format. The format string is compatible with Java's SimpleDateFormat.

Review Comment:
   Could we do the same for `UNIX_TIMESTAMP(string1[, string2])` 
(https://github.com/apache/flink/pull/25828/files#diff-539fb22ee6aeee4cf07230bb4155500c6680c4cc889260e2c58bfa9d63fb7de5R662)
 please?
   
   
https://github.com/apache/flink/blob/7fa4f78e43bfd99a7e7a3d655a9a3641e4d7bc59/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java#L1468
 calls into 
https://github.com/apache/flink/blob/7fa4f78e43bfd99a7e7a3d655a9a3641e4d7bc59/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/DateTimeUtils.java#L960
 too - so the format is also using SimpleDateFormat
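
   To make the dialect difference concrete: the common letters (yyyy, MM, dd, HH) agree, but some letters mean different things in the two APIs, e.g. 'u', as this standalone JDK-only check shows:

```java
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PatternDialects {
    public static void main(String[] args) throws Exception {
        // 'u' means "day number of week" in SimpleDateFormat (1 = Monday)...
        System.out.println(new SimpleDateFormat("u").format(
                new SimpleDateFormat("yyyy-MM-dd").parse("2023-01-02"))); // "1"
        // ...but "year" in DateTimeFormatter, so the same pattern string diverges.
        System.out.println(LocalDate.parse("2023-01-02")
                .format(DateTimeFormatter.ofPattern("u")));               // "2023"
    }
}
```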



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25828:
URL: https://github.com/apache/flink/pull/25828#discussion_r1893959599


##
flink-python/pyflink/table/expressions.py:
##
@@ -350,14 +350,19 @@ def temporal_overlaps(left_time_point,
 def date_format(timestamp, format) -> Expression:
 """
 Formats a timestamp as a string using a specified format.
-The format must be compatible with MySQL's date formatting syntax as used 
by the
-date_parse function.

Review Comment:
   if this has been removed - do we not need to deprecate first before 
replacing? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25828:
URL: https://github.com/apache/flink/pull/25828#discussion_r1893960744


##
flink-python/pyflink/table/expressions.py:
##
@@ -350,14 +350,19 @@ def temporal_overlaps(left_time_point,
 def date_format(timestamp, format) -> Expression:
 """
 Formats a timestamp as a string using a specified format.
-The format must be compatible with MySQL's date formatting syntax as used 
by the
-date_parse function.
 
-For example `date_format(col("time"), "%Y, %d %M")` results in strings 
formatted as
-"2017, 05 May".
+Supported functions:
+1. date_format(TIMESTAMP, STRING) -> STRING

Review Comment:
   we should mention the difference between the supported format strings.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36923) Bump flink-connector-hbase to Flink 1.20

2024-12-20 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36923:
-
Affects Version/s: (was: hbase-4.1.0)

> Bump flink-connector-hbase to Flink 1.20
> 
>
> Key: FLINK-36923
> URL: https://issues.apache.org/jira/browse/FLINK-36923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / HBase
>Reporter: Yanquan Lv
>Assignee: Yanquan Lv
>Priority: Major
>  Labels: pull-request-available
> Fix For: hbase-4.1.0
>
>
> Considering we've released flink-connector-hbase 4.0.0, which supported Flink 
> 1.18 & Flink 1.19, we can start bumping flink-connector-hbase to Flink 1.19 
> to support Flink 1.19 & Flink 1.20 for the next release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894201016


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * Supported function signatures:
+ *
+ * 
+ *   {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) 
+ *   Converts numeric epoch time in milliseconds to timestamp with local 
timezone
+ *   {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> 
TIMESTAMP_LTZ(precision) 
+ *   Converts numeric epoch time to timestamp with specified precision (0 
as seconds, 3 as
+ *   milliseconds)
+ *   {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input string of format
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> 
TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input strings of format and timezone
+ * 
+ *
+ * Example:
+ *
+ * {@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // 
Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // 
Parses string using input format and timezone
+ * }
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes

Re: [PR] [FLINK-36862][table] Implement additional TO_TIMESTAMP_LTZ() functions [flink]

2024-12-20 Thread via GitHub


yiyutian1 commented on code in PR #25763:
URL: https://github.com/apache/flink/pull/25763#discussion_r1894146265


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/ToTimestampLtzFunction.java:
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+import org.apache.flink.table.utils.DateTimeUtils;
+
+import javax.annotation.Nullable;
+
+import java.time.DateTimeException;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+
+import static org.apache.flink.table.utils.DateTimeUtils.parseTimestampData;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#TO_TIMESTAMP_LTZ}.
+ *
+ * A function that converts various time formats to TIMESTAMP_LTZ type.
+ *
+ * Supported function signatures:
+ *
+ * 
+ *   {@code TO_TIMESTAMP_LTZ(numeric)} -> TIMESTAMP_LTZ(3) 
+ *   Converts numeric epoch time in milliseconds to timestamp with local 
timezone
+ *   {@code TO_TIMESTAMP_LTZ(numeric, precision)} -> 
TIMESTAMP_LTZ(precision) 
+ *   Converts numeric epoch time to timestamp with specified precision (0 
as seconds, 3 as
+ *   milliseconds)
+ *   {@code TO_TIMESTAMP_LTZ(timestamp)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using default format 'yyyy-MM-dd HH:mm:ss'
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format)} -> TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input string of format
+ *   {@code TO_TIMESTAMP_LTZ(timestamp, format, timezone)} -> 
TIMESTAMP_LTZ(3) 
+ *   Parses string timestamp using input strings of format and timezone
+ * 
+ *
+ * Example:
+ *
+ * {@code
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ(1234567890123)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ(1234567890, 0) // Converts epoch seconds
+ * TO_TIMESTAMP_LTZ(1234567890123, 3)  // Converts epoch milliseconds
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00')  // Parses string using default 
format
+ * TO_TIMESTAMP_LTZ('2023-01-01T10:00:00', 'yyyy-MM-dd\'T\'HH:mm:ss') // 
Parses string using input format
+ * TO_TIMESTAMP_LTZ('2023-01-01 10:00:00', 'yyyy-MM-dd HH:mm:ss', 'UTC') // 
Parses string using input format and timezone
+ * }
+ */
+@Internal
+public class ToTimestampLtzFunction extends BuiltInScalarFunction {
+
+private static final int DEFAULT_PRECISION = 3;
+
+public ToTimestampLtzFunction(SpecializedFunction.SpecializedContext 
context) {
+super(BuiltInFunctionDefinitions.TO_TIMESTAMP_LTZ, context);
+}
+
+public @Nullable TimestampData eval(Number epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+if (epoch instanceof Float || epoch instanceof Double) {
+return DateTimeUtils.toTimestampData(epoch.doubleValue(), 
precision);
+}
+return DateTimeUtils.toTimestampData(epoch.longValue(), precision);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch, Integer precision) {
+if (epoch == null || precision == null) {
+return null;
+}
+
+return DateTimeUtils.toTimestampData(epoch, precision);
+}
+
+public @Nullable TimestampData eval(Number epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(DecimalData epoch) {
+return eval(epoch, DEFAULT_PRECISION);
+}
+
+public @Nullable TimestampData eval(StringData timestamp) {
+if (timestamp == null) {
+return null;
+}
+
+return parseTimestampData(timestamp.toString());
+}
+
+public @Nullable TimestampData eval(StringData timestamp, StringData 
format) {
+if (timestamp == null || format == null) {
+return null;
+}
+
+return parseTimes

[jira] [Updated] (FLINK-36923) Bump flink-connector-hbase to Flink 1.20

2024-12-20 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36923:
-
Fix Version/s: hbase-4.1.0

> Bump flink-connector-hbase to Flink 1.20
> 
>
> Key: FLINK-36923
> URL: https://issues.apache.org/jira/browse/FLINK-36923
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / HBase
>Affects Versions: hbase-4.1.0
>Reporter: Yanquan Lv
>Assignee: Yanquan Lv
>Priority: Major
>  Labels: pull-request-available
> Fix For: hbase-4.1.0
>
>
> Considering we've released flink-connector-hbase 4.0.0, which supported Flink 
> 1.18 & Flink 1.19, we can start bumping flink-connector-hbase to Flink 1.19 
> to support Flink 1.19 & Flink 1.20 for the next release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36923] Bump flink version to 1.19.1, adapt CI workflow [flink-connector-hbase]

2024-12-20 Thread via GitHub


ferenc-csaky commented on PR #54:
URL: 
https://github.com/apache/flink-connector-hbase/pull/54#issuecomment-2557428849

   Hi, thanks for your contribution! I am planning to include at least 
JDK 17 (and possibly 21) support for the next release, which will require 
bumping the HBase version and pretty much all Hadoop-related dependencies. I spent 
some time on that before releasing 4.0, but it is not trivial and I ran into 
some runtime issues, which is why I decided to cut scope and release for only 
JDK 8 and 11.
   
   Including these version updates makes sense anyway, so I do not see a 
problem merging it on its own.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]

2024-12-20 Thread via GitHub


afedulov commented on code in PR #25656:
URL: https://github.com/apache/flink/pull/25656#discussion_r1894239482


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -1029,6 +1029,11 @@ private TableResultInternal executeInternal(
 defaultJobName,
 jobStatusHookList);
 try {
+// Current context class loader here is the application class 
loader.

Review Comment:
   @ammu20-dev @davidradl @seb-pereira 
   This modifies a pretty critical execution path. How confident are we that it 
doesn't introduce regressions? Specifically, could any existing 
functionality—such as built-in serializers for various connector formats—break 
if libraries are inadvertently loaded from user JARs instead of Flink's parent 
class loader?
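
   For reference, the usual way to scope such a switch so it cannot leak into unrelated code paths (a generic sketch, not this PR's exact change):

```java
import java.util.function.Supplier;

public final class ClassLoaderScope {
    // Run an action with a given context class loader and always restore the
    // previous one, even when the action throws.
    public static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> action) {
        final Thread thread = Thread.currentThread();
        final ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }
}
```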



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36940) Why does autoscaler Math.round the TargetProcessingCapacity

2024-12-20 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907300#comment-17907300
 ] 

Gyula Fora commented on FLINK-36940:


Good question, I don't remember right now. For the metrics we round to 2/3 
decimal digits to improve logging and greatly reduce storage costs.

Here I guess rounding could be avoided.
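
A toy illustration with made-up numbers (not the autoscaler's actual formula) of how dropping the fraction can flip a derived parallelism by one:

{code:java}
public class RoundingEffect {
    public static void main(String[] args) {
        double perSubtaskRate = 8.0;  // hypothetical records/s one subtask handles
        double exactTarget = 800.4;   // hypothetical target processing capacity

        int keepDouble = (int) Math.ceil(exactTarget / perSubtaskRate);              // ceil(100.05) -> 101
        int afterRound = (int) Math.ceil(Math.round(exactTarget) / perSubtaskRate);  // ceil(100.0)  -> 100

        System.out.println(keepDouble + " vs " + afterRound); // prints "101 vs 100"
    }
}
{code}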

> Why does autoscaler Math.round the TargetProcessingCapacity
> ---
>
> Key: FLINK-36940
> URL: https://issues.apache.org/jira/browse/FLINK-36940
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Priority: Major
>
> AutoScalerUtils#getTargetProcessingCapacity[1] calls 
> Math.round(lagCatchupTargetRate + restartCatchupRate + 
> inputTargetAtUtilization);
>  
> The return type is double; I don't know why Math.round is needed here.
>  
> I'm writing some end-to-end tests[2] for the autoscaler, and the recommended 
> parallelism is unexpected. After analysis, I found Math.round is the root 
> cause. The following is the core logic of my test[2]:
> The TARGET_UTILIZATION is 0.8, and BusyTimePerSec is always 800 for source, 
> so we expect the source parallelism won't be changed.
> But the recommended parallelism is changed from 100 to 96.
>  
> Note: The processing rate of production jobs is usually very high, so 
> rounding has almost no effect on production jobs.
>  
>  [1] 
> [https://github.com/apache/flink-kubernetes-operator/blob/091e803a6ae713ebe839742694ab6ca53249c4dd/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java#L75]
> [2] 
> [https://github.com/1996fanrui/flink-kubernetes-operator/commit/5420f59a12e924f5f47a7dde8d79f5da9a2b4917#diff-b922ac4031d391fa030b25fba77b453736d43156a0edef5b436fee1d8241295fR158]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36940) Why does autoscaler Math.round the TargetProcessingCapacity

2024-12-20 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36940?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907300#comment-17907300
 ] 

Gyula Fora edited comment on FLINK-36940 at 12/20/24 8:40 AM:
--

Good question, I don't remember right now. For the metrics we round to 2/3 
decimal digits to improve logging and greatly reduce storage costs.

Here I guess rounding could be avoided, or the same 3-decimal-digit logic could be applied.


was (Author: gyfora):
Good question, I don't remember right now. For the metrics we round to 2/3 
decimal digits to improve logging and greatly reduce storage costs.

Here I guess rounding could be avoided.

> Why does autoscaler Math.round the TargetProcessingCapacity
> ---
>
> Key: FLINK-36940
> URL: https://issues.apache.org/jira/browse/FLINK-36940
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Priority: Major
>
> AutoScalerUtils#getTargetProcessingCapacity[1] calls 
> Math.round(lagCatchupTargetRate + restartCatchupRate + 
> inputTargetAtUtilization);
>  
> The return type is double; I don't know why Math.round is needed here.
>  
> I'm writing some end-to-end tests[2] for the autoscaler, and the recommended 
> parallelism is unexpected. After analysis, I found Math.round is the root 
> cause. The following is the core logic of my test[2]:
> The TARGET_UTILIZATION is 0.8, and BusyTimePerSec is always 800 for source, 
> so we expect the source parallelism won't be changed.
> But the recommended parallelism is changed from 100 to 96.
>  
> Note: The processing rate of production jobs is usually very high, so 
> rounding has almost no effect on production jobs.
>  
>  [1] 
> [https://github.com/apache/flink-kubernetes-operator/blob/091e803a6ae713ebe839742694ab6ca53249c4dd/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java#L75]
> [2] 
> [https://github.com/1996fanrui/flink-kubernetes-operator/commit/5420f59a12e924f5f47a7dde8d79f5da9a2b4917#diff-b922ac4031d391fa030b25fba77b453736d43156a0edef5b436fee1d8241295fR158]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35921) Flink SQL physical operator replacement support

2024-12-20 Thread Wang Qilong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Qilong closed FLINK-35921.
---
Resolution: Fixed

> Flink SQL physical operator replacement support
> ---
>
> Key: FLINK-35921
> URL: https://issues.apache.org/jira/browse/FLINK-35921
> Project: Flink
>  Issue Type: Bug
>Reporter: Wang Qilong
>Priority: Major
>
>  
> Does Flink SQL provide SPI implementations that support custom physical 
> operators, such as customizing a new StreamExecFileSourceScan and supporting 
> rule injection for converting logical operators to physical operators?
> I want to add some additional physical 
> operators. A better option may be to inject a ColumnarRule similar to Spark's, 
> which can insert new physical operators when converting logical 
> operators to physical ones. However, I have not found that Flink has 
> similar functionality.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimeSortOperator

2024-12-20 Thread Wang Qilong (Jira)
Wang Qilong created FLINK-36942:
---

 Summary: Unable to trigger onEventTime() when using the async 
state API in RowTimeSortOperator
 Key: FLINK-36942
 URL: https://issues.apache.org/jira/browse/FLINK-36942
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 2.0.0
Reporter: Wang Qilong


When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires.

Regarding how to reproduce:
in https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
execute the tests in RowTimeSortOperatorTest.

I think this is a very strange bug because it can be triggered smoothly in 
WindowJoinOperator, but not in RowTimeSortOperator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimeSortOperator

2024-12-20 Thread Wang Qilong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Qilong updated FLINK-36942:

Description: 
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in 
[AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator]
And execute the tests in RowTimeSortOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
[AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but 
cannot be triggered in RowTimeSortOperator

  was:
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
And execute the tests in RowTimeSortOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
[AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but 
cannot be triggered in RowTimeSortOperator


> Unable to trigger onEventTime() when using the async state API in 
> RowTimeSortOperator
> -
>
> Key: FLINK-36942
> URL: https://issues.apache.org/jira/browse/FLINK-36942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 2.0.0
>Reporter: Wang Qilong
>Priority: Major
>
> When I use the async state API in RowTimeSortOperator and inherit it from 
> AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the 
> timer expires
> Regarding how to reproduce:
> in 
> [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator]
> And execute the tests in RowTimeSortOperatorTest
> I think this is a very strange bug because it can be triggered smoothly in 
> [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], 
> but cannot be triggered in RowTimeSortOperator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893692402


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -649,6 +649,13 @@ public boolean dropTemporaryTable(String path) {
 }
 }
 
+@Override
+public boolean dropTable(String path) {
+UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(path);
+ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+return catalogManager.dropTable(identifier, true);

Review Comment:
   shouldn't `true` be `ignoreIfNotExists` here
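
   A sketch of the overload pair the comment implies, inside TableEnvironmentImpl (hypothetical shape based on the snippet above, not necessarily the PR's final code):

```java
@Override
public boolean dropTable(String path) {
    // Unlike SQL, the one-arg variant does not fail on a missing table.
    return dropTable(path, true);
}

@Override
public boolean dropTable(String path, boolean ignoreIfNotExists) {
    UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
    ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
    return catalogManager.dropTable(identifier, ignoreIfNotExists);
}
```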



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36716) Address vulnerabilities in Flink UI

2024-12-20 Thread Mehdi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907318#comment-17907318
 ] 

Mehdi commented on FLINK-36716:
---

I'm going to port the PRs below to the 1.20 branch for the 1.20.x release.
The list of PRs will be updated:
 * [https://github.com/apache/flink/pull/25827 (Update CI to Ubuntu 22.04 
(Jammy))|https://github.com/apache/flink/pull/25827]
 * [https://github.com/apache/flink/pull/25794] (Update the NodeJS to v22.11.0 
(LTS))

> Address vulnerabilities in Flink UI
> ---
>
> Key: FLINK-36716
> URL: https://issues.apache.org/jira/browse/FLINK-36716
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Mehdi
>Assignee: Mehdi
>Priority: Major
> Fix For: 2.0.0, 1.20.1
>
>
> When running `npm audit` we get 36 vulnerabilities (1 low, 15 moderate, 17 
> high, 3 critical); we should address any current, open vulnerabilities.
> These critical vulnerabilities are gone by raising the Angular version, and we 
> also need to raise the Node version, so there are two sub-tasks for this ticket.
> Result of the npm audit:
> {code:java}
> npm audit report@adobe/css-tools  <=4.3.1
> Severity: moderate
> @adobe/css-tools Regular Expression Denial of Service (ReDOS) while Parsing 
> CSS - https://github.com/advisories/GHSA-hpx4-r86g-5jrg
> @adobe/css-tools Improper Input Validation and Inefficient Regular Expression 
> Complexity - https://github.com/advisories/GHSA-prr3-c3m5-p7q2
> fix available via `npm audit fix`
> node_modules/@adobe/css-tools@babel/traverse  <7.23.2
> Severity: critical
> Babel vulnerable to arbitrary code execution when compiling specifically 
> crafted malicious code - https://github.com/advisories/GHSA-67hx-6x53-jw92
> fix available via `npm audit fix`
> node_modules/@babel/traversebody-parser  <1.20.3
> Severity: high
> body-parser vulnerable to denial of service when url encoding is enabled - 
> https://github.com/advisories/GHSA-qwcr-r2fm-qrc7
> fix available via `npm audit fix`
> node_modules/body-parser
>   express  <=4.21.0 || 5.0.0-alpha.1 - 5.0.0
>   Depends on vulnerable versions of body-parser
>   Depends on vulnerable versions of cookie
>   Depends on vulnerable versions of path-to-regexp
>   Depends on vulnerable versions of send
>   Depends on vulnerable versions of serve-static
>   node_modules/expressbraces  <3.0.3
> Severity: high
> Uncontrolled resource consumption in braces - 
> https://github.com/advisories/GHSA-grv7-fg5c-xmjg
> fix available via `npm audit fix`
> node_modules/bracescookie  <0.7.0
> cookie accepts cookie name, path, and domain with out of bounds characters - 
> https://github.com/advisories/GHSA-pxg6-pf52-xh8x
> fix available via `npm audit fix`
> node_modules/cookie
> node_modules/express/node_modules/cookie
>   engine.io  0.7.8 - 0.7.9 || 1.8.0 - 6.6.1
>   Depends on vulnerable versions of cookie
>   Depends on vulnerable versions of ws
>   node_modules/engine.io
>     socket.io  1.6.0 - 4.7.5
>     Depends on vulnerable versions of engine.io
>     node_modules/socket.iod3-color  <3.1.0
> Severity: high
> d3-color vulnerable to ReDoS - 
> https://github.com/advisories/GHSA-36jr-mh4h-2g58
> fix available via `npm audit fix`
> node_modules/d3-interpolate/node_modules/d3-color
>   d3-interpolate  0.1.3 - 2.0.1
>   Depends on vulnerable versions of d3-color
>   node_modules/d3-interpolate
>     @antv/g-base  <=0.5.11
>     Depends on vulnerable versions of d3-interpolate
>     node_modules/@antv/g-basefollow-redirects  <=1.15.5
> Severity: moderate
> Follow Redirects improperly handles URLs in the url.parse() function - 
> https://github.com/advisories/GHSA-jchw-25xp-jwwc
> follow-redirects' Proxy-Authorization header kept across hosts - 
> https://github.com/advisories/GHSA-cxjh-pqwp-8mfp
> fix available via `npm audit fix`
> node_modules/follow-redirectshttp-proxy-middleware  <2.0.7
> Severity: high
> Denial of service in http-proxy-middleware - 
> https://github.com/advisories/GHSA-c7qv-q95q-8v27
> fix available via `npm audit fix`
> node_modules/http-proxy-middlewareip  *
> Severity: high
> NPM IP package incorrectly identifies some private IP addresses as public - 
> https://github.com/advisories/GHSA-78xj-cgh5-2h22
> ip SSRF improper categorization in isPublic - 
> https://github.com/advisories/GHSA-2p57-rm9w-gvfp
> fix available via `npm audit fix`
> node_modules/iploader-utils  3.0.0 - 3.2.0
> Severity: high
> loader-utils is vulnerable to Regular Expression Denial of Service (ReDoS) 
> via url variable - https://github.com/advisories/GHSA-3rfm-jhwj-7488
> loader-utils is vulnerable to Regular Expression Denial of Service (ReDoS) - 
> https://github.com/advisories/GHSA-hhq3-ff78-jv3g
> fix available via `npm audit fix`
> node_mod

[jira] [Comment Edited] (FLINK-36716) Address vulnerabilities in Flink UI

2024-12-20 Thread Mehdi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907318#comment-17907318
 ] 

Mehdi edited comment on FLINK-36716 at 12/20/24 9:39 AM:
-

I'm going to port the PRs below to the 1.20 branch for the 1.20.x release.
The list of PRs will be updated:
 * [https://github.com/apache/flink/pull/25827 
|https://github.com/apache/flink/pull/25827]Update CI to Ubuntu 22.04 (Jammy)
 * [https://github.com/apache/flink/pull/25794] Update the NodeJS to v22.11.0 
(LTS)


was (Author: JIRAUSER301654):
I'm going to port the PRs below to the 1.20 branch for the 1.20.x release.
The list of PRs will be updated:
 * [https://github.com/apache/flink/pull/25827 
|https://github.com/apache/flink/pull/25827](Update CI to Ubuntu 22.04 (Jammy))
 * [https://github.com/apache/flink/pull/25794] (Update the NodeJS to v22.11.0 
(LTS))

> Address vulnerabilities in Flink UI
> ---
>
> Key: FLINK-36716
> URL: https://issues.apache.org/jira/browse/FLINK-36716
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Mehdi
>Assignee: Mehdi
>Priority: Major
> Fix For: 2.0.0, 1.20.1
>
>
> When running `npm audit` we get 36 vulnerabilities (1 low, 15 moderate, 17 
> high, 3 critical); we should address any current, open vulnerabilities.
> These critical vulnerabilities are gone by raising the Angular version, and we 
> also need to raise the Node version, so there are two sub-tasks for this ticket.
> Result of the npm audit:
> {code:java}
> npm audit report@adobe/css-tools  <=4.3.1
> Severity: moderate
> @adobe/css-tools Regular Expression Denial of Service (ReDOS) while Parsing 
> CSS - https://github.com/advisories/GHSA-hpx4-r86g-5jrg
> @adobe/css-tools Improper Input Validation and Inefficient Regular Expression 
> Complexity - https://github.com/advisories/GHSA-prr3-c3m5-p7q2
> fix available via `npm audit fix`
> node_modules/@adobe/css-tools@babel/traverse  <7.23.2
> Severity: critical
> Babel vulnerable to arbitrary code execution when compiling specifically 
> crafted malicious code - https://github.com/advisories/GHSA-67hx-6x53-jw92
> fix available via `npm audit fix`
> node_modules/@babel/traversebody-parser  <1.20.3
> Severity: high
> body-parser vulnerable to denial of service when url encoding is enabled - 
> https://github.com/advisories/GHSA-qwcr-r2fm-qrc7
> fix available via `npm audit fix`
> node_modules/body-parser
>   express  <=4.21.0 || 5.0.0-alpha.1 - 5.0.0
>   Depends on vulnerable versions of body-parser
>   Depends on vulnerable versions of cookie
>   Depends on vulnerable versions of path-to-regexp
>   Depends on vulnerable versions of send
>   Depends on vulnerable versions of serve-static
>   node_modules/expressbraces  <3.0.3
> Severity: high
> Uncontrolled resource consumption in braces - 
> https://github.com/advisories/GHSA-grv7-fg5c-xmjg
> fix available via `npm audit fix`
> node_modules/braces
> cookie  <0.7.0
> cookie accepts cookie name, path, and domain with out of bounds characters - 
> https://github.com/advisories/GHSA-pxg6-pf52-xh8x
> fix available via `npm audit fix`
> node_modules/cookie
> node_modules/express/node_modules/cookie
>   engine.io  0.7.8 - 0.7.9 || 1.8.0 - 6.6.1
>   Depends on vulnerable versions of cookie
>   Depends on vulnerable versions of ws
>   node_modules/engine.io
>     socket.io  1.6.0 - 4.7.5
>     Depends on vulnerable versions of engine.io
>     node_modules/socket.io
> d3-color  <3.1.0
> Severity: high
> d3-color vulnerable to ReDoS - 
> https://github.com/advisories/GHSA-36jr-mh4h-2g58
> fix available via `npm audit fix`
> node_modules/d3-interpolate/node_modules/d3-color
>   d3-interpolate  0.1.3 - 2.0.1
>   Depends on vulnerable versions of d3-color
>   node_modules/d3-interpolate
>     @antv/g-base  <=0.5.11
>     Depends on vulnerable versions of d3-interpolate
>     node_modules/@antv/g-base
> follow-redirects  <=1.15.5
> Severity: moderate
> Follow Redirects improperly handles URLs in the url.parse() function - 
> https://github.com/advisories/GHSA-jchw-25xp-jwwc
> follow-redirects' Proxy-Authorization header kept across hosts - 
> https://github.com/advisories/GHSA-cxjh-pqwp-8mfp
> fix available via `npm audit fix`
> node_modules/follow-redirects
> http-proxy-middleware  <2.0.7
> Severity: high
> Denial of service in http-proxy-middleware - 
> https://github.com/advisories/GHSA-c7qv-q95q-8v27
> fix available via `npm audit fix`
> node_modules/http-proxy-middleware
> ip  *
> Severity: high
> NPM IP package incorrectly identifies some private IP addresses as public - 
> https://github.com/advisories/GHSA-78xj-cgh5-2h22
> ip SSRF improper categorization in isPublic - 
> https://github.com/advisories/GHSA-2p57-rm9w-gvfp
> fix available via `npm audi

[jira] [Comment Edited] (FLINK-36716) Address vulnerabilities in Flink UI

2024-12-20 Thread Mehdi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17907318#comment-17907318
 ] 

Mehdi edited comment on FLINK-36716 at 12/20/24 9:39 AM:
-

I'm going to track the PRs done to the branch 1.20 for the 1.20.X release.
The list of PRs will be updated:
 * [https://github.com/apache/flink/pull/25827 
|https://github.com/apache/flink/pull/25827](Update CI to Ubuntu 22.04 (Jammy))
 * [https://github.com/apache/flink/pull/25794] (Update the NodeJS to v22.11.0 
(LTS))


was (Author: JIRAUSER301654):
I'm going to track the PRs done to the branch 1.20 for the 1.20.X release.
The list of PRs will be updated:
 * [https://github.com/apache/flink/pull/25827 (Update CI to Ubuntu 22.04 
(Jammy))|https://github.com/apache/flink/pull/25827]
 * [https://github.com/apache/flink/pull/25794] (Update the NodeJS to v22.11.0 
(LTS))

> Address vulnerabilities in Flink UI
> ---
>
> Key: FLINK-36716
> URL: https://issues.apache.org/jira/browse/FLINK-36716
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Web Frontend
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Mehdi
>Assignee: Mehdi
>Priority: Major
> Fix For: 2.0.0, 1.20.1
>
>
> When running `npm audit` we get 36 vulnerabilities (1 low, 15 moderate, 17 
> high, 3 critical); we should address any current, open vulnerabilities.
> These critical vulnerabilities are resolved by raising the version of Angular, and 
> we also need to raise the Node version, so there are two sub-tasks for this ticket.
> Result of the npm audit:
> {code:java}
> npm audit report
> @adobe/css-tools  <=4.3.1
> Severity: moderate
> @adobe/css-tools Regular Expression Denial of Service (ReDOS) while Parsing 
> CSS - https://github.com/advisories/GHSA-hpx4-r86g-5jrg
> @adobe/css-tools Improper Input Validation and Inefficient Regular Expression 
> Complexity - https://github.com/advisories/GHSA-prr3-c3m5-p7q2
> fix available via `npm audit fix`
> node_modules/@adobe/css-tools
> @babel/traverse  <7.23.2
> Severity: critical
> Babel vulnerable to arbitrary code execution when compiling specifically 
> crafted malicious code - https://github.com/advisories/GHSA-67hx-6x53-jw92
> fix available via `npm audit fix`
> node_modules/@babel/traverse
> body-parser  <1.20.3
> Severity: high
> body-parser vulnerable to denial of service when url encoding is enabled - 
> https://github.com/advisories/GHSA-qwcr-r2fm-qrc7
> fix available via `npm audit fix`
> node_modules/body-parser
>   express  <=4.21.0 || 5.0.0-alpha.1 - 5.0.0
>   Depends on vulnerable versions of body-parser
>   Depends on vulnerable versions of cookie
>   Depends on vulnerable versions of path-to-regexp
>   Depends on vulnerable versions of send
>   Depends on vulnerable versions of serve-static
>   node_modules/express
> braces  <3.0.3
> Severity: high
> Uncontrolled resource consumption in braces - 
> https://github.com/advisories/GHSA-grv7-fg5c-xmjg
> fix available via `npm audit fix`
> node_modules/braces
> cookie  <0.7.0
> cookie accepts cookie name, path, and domain with out of bounds characters - 
> https://github.com/advisories/GHSA-pxg6-pf52-xh8x
> fix available via `npm audit fix`
> node_modules/cookie
> node_modules/express/node_modules/cookie
>   engine.io  0.7.8 - 0.7.9 || 1.8.0 - 6.6.1
>   Depends on vulnerable versions of cookie
>   Depends on vulnerable versions of ws
>   node_modules/engine.io
>     socket.io  1.6.0 - 4.7.5
>     Depends on vulnerable versions of engine.io
>     node_modules/socket.io
> d3-color  <3.1.0
> Severity: high
> d3-color vulnerable to ReDoS - 
> https://github.com/advisories/GHSA-36jr-mh4h-2g58
> fix available via `npm audit fix`
> node_modules/d3-interpolate/node_modules/d3-color
>   d3-interpolate  0.1.3 - 2.0.1
>   Depends on vulnerable versions of d3-color
>   node_modules/d3-interpolate
>     @antv/g-base  <=0.5.11
>     Depends on vulnerable versions of d3-interpolate
>     node_modules/@antv/g-base
> follow-redirects  <=1.15.5
> Severity: moderate
> Follow Redirects improperly handles URLs in the url.parse() function - 
> https://github.com/advisories/GHSA-jchw-25xp-jwwc
> follow-redirects' Proxy-Authorization header kept across hosts - 
> https://github.com/advisories/GHSA-cxjh-pqwp-8mfp
> fix available via `npm audit fix`
> node_modules/follow-redirects
> http-proxy-middleware  <2.0.7
> Severity: high
> Denial of service in http-proxy-middleware - 
> https://github.com/advisories/GHSA-c7qv-q95q-8v27
> fix available via `npm audit fix`
> node_modules/http-proxy-middleware
> ip  *
> Severity: high
> NPM IP package incorrectly identifies some private IP addresses as public - 
> https://github.com/advisories/GHSA-78xj-cgh5-2h22
> ip SSRF improper categorization in isPublic - 
> https://github.com/advisories/GHSA-2p57-rm9w-gvfp
> fix available via `npm 

Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


snuyanzin commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893703612


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -649,6 +649,13 @@ public boolean dropTemporaryTable(String path) {
 }
 }
 
+@Override
+public boolean dropTable(String path) {
+UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(path);
+ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+return catalogManager.dropTable(identifier, true);

Review Comment:
   I didn't get it, could you elaborate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]

2024-12-20 Thread via GitHub


yiyutian1 opened a new pull request, #25828:
URL: https://github.com/apache/flink/pull/25828

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36941) Fix Doc for DATE_FORMAT

2024-12-20 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36941:
---
Labels: pull-request-available  (was: )

> Fix Doc for DATE_FORMAT
> ---
>
> Key: FLINK-36941
> URL: https://issues.apache.org/jira/browse/FLINK-36941
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Yiyu Tian
>Priority: Major
>  Labels: pull-request-available
>
> The documentation for DATE_FORMAT is outdated. 
> https://github.com/apache/flink/blob/6951686be5691dc855b6d07d575535ac239670da/docs/data/sql_functions.yml#L645



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36943) FLIP-488: Expose Async State Processing and New State APIs in Datastream(V1) APIs

2024-12-20 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-36943:
---

 Summary: FLIP-488: Expose Async State Processing and New State 
APIs in Datastream(V1) APIs
 Key: FLINK-36943
 URL: https://issues.apache.org/jira/browse/FLINK-36943
 Project: Flink
  Issue Type: Improvement
Reporter: Zakelly Lan
Assignee: Zakelly Lan


FLIP-424 introduced a new State API set which enables async state queries, 
and FLIP-425 introduced async state processing. These are highly 
anticipated features for Flink 2.0. For SQL users, FLIP-473 implements SQL 
operators using the new State APIs, so they can easily make use of async 
state processing as well as disaggregated state management. However, it is 
also important that Datastream users can leverage the capability of async 
state queries and disaggregated state management. This FLIP proposes to expose 
and enable them in Datastream(V1) APIs.

For more details, please read the FLIP-488 
https://cwiki.apache.org/confluence/x/yIrREw



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36069][runtime/rest] Extending job detail rest API to expose json stream graph [flink]

2024-12-20 Thread via GitHub


yuchen-ecnu commented on code in PR #25798:
URL: https://github.com/apache/flink/pull/25798#discussion_r1893709324


##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java:
##
@@ -79,6 +79,10 @@ public class JobDetailsInfo implements ResponseBody {
 
 public static final String FIELD_NAME_JSON_PLAN = "plan";
 
+public static final String FIELD_NAME_STREAM_GRAPH_JSON_PLAN = 
"stream-graph-plan";
+
+public static final String FIELD_NAME_PENDING_OPERATOR_COUNT = 
"pending-operator-count";

Review Comment:
   Updated.



##
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java:
##
@@ -47,6 +47,13 @@ public interface AccessExecutionGraph extends 
JobStatusProvider {
  */
 String getJsonPlan();
 
+/**
+ * Returns the stream graph as a JSON string.
+ *
+ * @return stream graph as a JSON string, or empty string if the job 
submitted with JobGraph.

Review Comment:
   Updated.



##
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java:
##
@@ -141,6 +142,11 @@ private static JobDetailsInfo createJobDetailsInfo(
 executionState, 
jobVerticesPerState[executionState.ordinal()]);
 }
 
+JobPlanInfo.RawJson streamGraphJson = null;
+if 
(!StringUtils.isNullOrWhitespaceOnly(executionGraph.getStreamGraphJson())) {

Review Comment:
   That's true, but isn't it safer to check for non-null at the same time via 
`StringUtils.isNullOrWhitespaceOnly`? WDYT?
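
   For illustration, a minimal sketch of the guard under discussion, assuming 
`executionGraph.getStreamGraphJson()` may return null or a blank string and 
that `JobPlanInfo.RawJson` simply wraps a JSON string:

   ```java
   // Sketch only: isNullOrWhitespaceOnly covers both the null and the blank
   // case, so one check is enough before wrapping the JSON for the response.
   JobPlanInfo.RawJson streamGraphJson = null;
   final String json = executionGraph.getStreamGraphJson();
   if (!StringUtils.isNullOrWhitespaceOnly(json)) {
       streamGraphJson = new JobPlanInfo.RawJson(json);
   }
   ```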



##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java:
##
@@ -62,4 +62,13 @@ public interface StreamGraphContext {
  * @return true if all modifications were successful and applied 
atomically, false otherwise.
  */
 boolean modifyStreamEdge(List requestInfos);
+
+interface StreamGraphUpdateListener {

Review Comment:
   Updated.



##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphContext.java:
##
@@ -62,4 +62,13 @@ public interface StreamGraphContext {
  * @return true if all modifications were successful and applied 
atomically, false otherwise.
  */
 boolean modifyStreamEdge(List requestInfos);
+
+interface StreamGraphUpdateListener {
+/**
+ * This method is called whenever the StreamGraph is updated.
+ *
+ * @param streamGraph the updated StreamGraph
+ */
+void onStreamGraphUpdated(StreamGraph streamGraph);

Review Comment:
   It's true that `AdaptiveGraphManager` holds a stream graph, but the 
`StreamGraphContext` should notify its listener with the updated stream graph 
(since the stream graph in the `AdaptiveGraphManager` may be outdated?).
   Just like `JobStatusListener` notifies its listener with `newJobStatus`.
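
   To make that concrete, a hedged sketch of the pattern (the holder class and 
field names are illustrative, not Flink's actual implementation; only 
`StreamGraphUpdateListener#onStreamGraphUpdated` comes from the diff above):

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Illustrative only: the graph owner pushes the freshly updated instance to
   // its listeners, mirroring how JobStatusListener receives newJobStatus.
   class StreamGraphOwnerSketch {
       interface StreamGraph {} // stand-in for the real StreamGraph type

       interface StreamGraphUpdateListener {
           void onStreamGraphUpdated(StreamGraph streamGraph);
       }

       private final List<StreamGraphUpdateListener> listeners = new ArrayList<>();

       void applyModification(StreamGraph updated) {
           // ... mutate or replace the internal graph here ...
           listeners.forEach(l -> l.onStreamGraphUpdated(updated)); // pass the fresh graph
       }
   }
   ```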



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]

2024-12-20 Thread via GitHub


flinkbot commented on PR #25828:
URL: https://github.com/apache/flink/pull/25828#issuecomment-2556636543

   
   ## CI report:
   
   * 5a9ea558b896c515f962554f0a5b33e7fda015dd UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36069][runtime/rest] Extending job detail rest API to expose json stream graph [flink]

2024-12-20 Thread via GitHub


yuchen-ecnu commented on code in PR #25798:
URL: https://github.com/apache/flink/pull/25798#discussion_r1893709533


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##
@@ -291,6 +299,8 @@ private List 
createJobVerticesAndUpdateGraph(List streamN
 
 generateConfigForJobVertices(jobVertexBuildContext);
 
+onStreamGraphUpdated(streamGraph);

Review Comment:
   I added a private function `generateStreamGraphJson()` to 
`AdaptiveGraphManager` and called it in `onStreamGraphUpdated` & here to make 
it clearer.
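
   Roughly, a sketch of that refactor under stated assumptions (the serializer 
call is a stand-in; only `generateStreamGraphJson()` and `onStreamGraphUpdated` 
are named in this thread):

   ```java
   // Both call sites funnel through one private helper, so the cached JSON
   // plan is produced in exactly one place and cannot diverge between them.
   class AdaptiveGraphManagerSketch {
       private Object streamGraph;     // stand-in for StreamGraph
       private String streamGraphJson; // cached JSON plan

       void onStreamGraphUpdated(Object updatedStreamGraph) {
           this.streamGraph = updatedStreamGraph;
           generateStreamGraphJson();
       }

       // Also called from createJobVerticesAndUpdateGraph(...) in the PR.
       private void generateStreamGraphJson() {
           this.streamGraphJson = serialize(streamGraph);
       }

       private String serialize(Object graph) {
           return String.valueOf(graph); // placeholder serialization
       }
   }
   ```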



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893692402


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -649,6 +649,13 @@ public boolean dropTemporaryTable(String path) {
 }
 }
 
+@Override
+public boolean dropTable(String path) {
+UnresolvedIdentifier unresolvedIdentifier = 
getParser().parseIdentifier(path);
+ObjectIdentifier identifier = 
catalogManager.qualifyIdentifier(unresolvedIdentifier);
+return catalogManager.dropTable(identifier, true);

Review Comment:
   `true` should be `ignoreIfNotExists` here
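
   For clarity, a sketch of the suggested shape, assuming the single-arg method 
keeps the lenient default while the two-arg overload forwards the flag:

   ```java
   @Override
   public boolean dropTable(String path) {
       return dropTable(path, true); // lenient default: a missing table is ignored
   }

   @Override
   public boolean dropTable(String path, boolean ignoreIfNotExists) {
       UnresolvedIdentifier unresolvedIdentifier = getParser().parseIdentifier(path);
       ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
       return catalogManager.dropTable(identifier, ignoreIfNotExists);
   }
   ```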



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36944) Introduce 'enableAsycState' in DataStream v1 interfaces

2024-12-20 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-36944:
---

 Summary: Introduce 'enableAsycState' in DataStream v1 interfaces
 Key: FLINK-36944
 URL: https://issues.apache.org/jira/browse/FLINK-36944
 Project: Flink
  Issue Type: Sub-task
  Components: API / DataStream
Reporter: Zakelly Lan
Assignee: Zakelly Lan
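
Purely as a hypothetical illustration of what the summary suggests (the method 
name is taken from the ticket title; its placement, its signature, and the 
surrounding names `source`, `Event`, and `MyKeyedProcessFunction` are 
placeholders until the PR lands):

{code:java}
// Hypothetical usage sketch only; the real API shape is defined by FLIP-488.
KeyedStream<Event, String> keyed = source.keyBy(Event::getKey);
keyed.enableAsyncState()                     // opt this keyed stream into async state
     .process(new MyKeyedProcessFunction()); // operator code then uses the new State APIs
{code}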






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36943) FLIP-488: Expose Async State Processing and New State APIs in Datastream(V1) APIs

2024-12-20 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-36943:

Component/s: API / DataStream
 Runtime / Async State Processing

> FLIP-488: Expose Async State Processing and New State APIs in Datastream(V1) 
> APIs
> -
>
> Key: FLINK-36943
> URL: https://issues.apache.org/jira/browse/FLINK-36943
> Project: Flink
>  Issue Type: Improvement
>  Components: API / DataStream, Runtime / Async State Processing
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>
> FLIP-424 introduced a new State API set which enables async state queries, 
> and FLIP-425 introduced async state processing. These are highly 
> anticipated features for Flink 2.0. For SQL users, FLIP-473 implements 
> SQL operators using the new State APIs, so they can easily make use of 
> async state processing as well as disaggregated state management. 
> However, it is also important that Datastream users can leverage the 
> capability of async state queries and disaggregated state management. This FLIP 
> proposes to expose and enable them in Datastream(V1) APIs.
> For more details, please read the FLIP-488 
> https://cwiki.apache.org/confluence/x/yIrREw



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [BP-1.20][FLINK-36689][Runtime/Web Frontend] Update ng-zorro-antd to v18 [flink]

2024-12-20 Thread via GitHub


mehdid93 opened a new pull request, #25829:
URL: https://github.com/apache/flink/pull/25829

   ## What is the purpose of the change
   
   This PR backports the changes from the PR made by @simplejason 
(https://github.com/apache/flink/pull/25713) on master to version 1.20.X, to be 
used in the dependencies upgrade.
   
   
   ## Brief change log
   
   - Angular: v18.2.13
   - ng-zorro-antd: v18.2.1
   - typescript: v5.4.5
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36763 / 36690][runtime] Support new "distributed" schema evolution topology & fix parallelized hang glitch [flink-cdc]

2024-12-20 Thread via GitHub


yuxiqian commented on code in PR #3801:
URL: https://github.com/apache/flink-cdc/pull/3801#discussion_r1893623717


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java:
##
@@ -0,0 +1,437 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.distributed;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry;
+import 
org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
+import 
org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaRequest;
+import 
org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
+import 
org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import 
org.apache.flink.shaded.guava31.com.google.common.collect.HashBasedTable;
+import org.apache.flink.shaded.guava31.com.google.common.collect.HashMultimap;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Multimap;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Table;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.wrap;
+
+/** Coordinator node for {@link SchemaOperator}. Registry actor in distributed 
topology. */
+public class SchemaCoordinator extends SchemaRegistry {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SchemaCoordinator.class);
+
+/** Atomic finite state machine to track global schema evolving state. */
+private transient AtomicReference evolvingStatus;
+
+/** Request futures from pending schema mappers. */
+private transient Map<
+Integer, Tuple2>>
+pendingRequests;
+
+/** Tracing sink writers that have flushed successfully. */
+protected transient Set flushedSinkWriters;
+
+/**
+ * Transient upstream table schema. The second arity is source partition 
ID, because in
+ * distributed topology, schemas might vary among partitions, so we can't 
rely on {@code
+ * schemaManager} to store original schemas.
+ */
+private transient Table upstreamSchemaTable;
+
+/**
+ * In distributed topology, one schema change event will be broadcast 
N-times (N = downstream
+ * parallelism). We need to effectively ignore duplicate ones since not 
all {@link
+ * SchemaChangeEvent}s are idempotent.
+ */
+private transient Multimap, Integer>
+   

Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


snuyanzin commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893604056


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1030,6 +1030,17 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryTable(String path);
 
+/**
+ * Drops a table registered in the given path.

Review Comment:
   yep, even more: `DROP TABLE` via `DropTableOperation` leads execution to the 
same code
   
https://github.com/apache/flink/blob/84044f45830542d13d6c9baeb2bfe30de3eac4ac/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java#L71-L75



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][docs] Fix Apache Avro Specification Link. [flink]

2024-12-20 Thread via GitHub


davidradl commented on PR #25769:
URL: https://github.com/apache/flink/pull/25769#issuecomment-2556549160

   @rmetzger as discussed at the Chi meeting, are you OK to merge this? @donPain 
can you raise a separate issue to track the variable replacement improvement 
idea, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator

2024-12-20 Thread Wang Qilong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Qilong updated FLINK-36942:

Description: 
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
And execute the tests in RowTimestamportOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
[AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but 
cannot be implemented in RowTimestamportOperator

  was:
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
And execute the tests in RowTimestamportOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
AsyncStateWindowJoinOperator[https://github.com/apache/flink/pull/25815], but 
cannot be implemented in RowTimestamportOperator


> Unable to trigger onEventTime() when using the async state API in 
> RowTimestamportOperator
> -
>
> Key: FLINK-36942
> URL: https://issues.apache.org/jira/browse/FLINK-36942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 2.0.0
>Reporter: Wang Qilong
>Priority: Major
>
> When I use the async state API in RowTimeSortOperator and inherit it from 
> AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the 
> timer expires
> Regarding how to reproduce:
> in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
> And execute the tests in RowTimestamportOperatorTest
> I think this is a very strange bug because it can be triggered smoothly in 
> [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], 
> but cannot be implemented in RowTimestamportOperator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator

2024-12-20 Thread Wang Qilong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Qilong updated FLINK-36942:

Description: 
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
And execute the tests in RowTimestamportOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
windowJoinOperator, but cannot be implemented in RowTimestamportOperator

  was:
When I use the async state API in RowTimestamportOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
And execute the tests in RowTimestamportOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
windowJoinOperator, but cannot be implemented in RowTimestamportOperator


> Unable to trigger onEventTime() when using the async state API in 
> RowTimestamportOperator
> -
>
> Key: FLINK-36942
> URL: https://issues.apache.org/jira/browse/FLINK-36942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 2.0.0
>Reporter: Wang Qilong
>Priority: Major
>
> When I use the async state API in RowTimeSortOperator and inherit it from 
> AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the 
> timer expires
> Regarding how to reproduce:
> in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
> And execute the tests in RowTimestamportOperatorTest
> I think this is a very strange bug because it can be triggered smoothly in 
> windowJoinOperator, but cannot be implemented in RowTimestamportOperator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [BP-1.20][FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-12-20 Thread via GitHub


mehdid93 opened a new pull request, #25827:
URL: https://github.com/apache/flink/pull/25827

   ## What is the purpose of the change
   
   This PR backports the changes from the PR made by @zentol 
(https://github.com/apache/flink/pull/25708) on master to version 1.20.X, to be 
used in the dependencies upgrade.
   
   
   ## Brief change log
   
   - The Ubuntu CI image is now Ubuntu 22.04 (Jammy) instead of 18.04
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893679664


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1040,6 +1066,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryView(String path);
 
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * * Compared to SQL, this method will not throw an error if the table 
does not exist. Use
+ * {@link #dropView(java.lang.String, boolean)} to change the default 
behavior.
+ *
+ * @return true if a view existed in the given path and was removed
+ */
+boolean dropView(String path);
+
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * @return true if a view existed in the given path and was removed Throws 
{@link
+ * ValidationException} if view not exists and ignoreIfNotExists is 
false
+ */
+boolean dropView(String path, boolean ignoreIfNotExists);

Review Comment:
   missing @params javadoc



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1030,6 +1030,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryTable(String path);
 
+/**
+ * Drops a table registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryTable}. This method
+ * can only drop permanent objects.
+ *
+ * Compared to SQL, this method will not throw an error if the table 
does not exist. Use
+ * {@link #dropTable(java.lang.String, boolean)} to change the default 
behavior.
+ *
+ * @return true if a table existed in the given path and was removed

Review Comment:
   missing @params javadoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893681601


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1040,6 +1066,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryView(String path);
 
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * * Compared to SQL, this method will not throw an error if the table 
does not exist. Use

Review Comment:
   table -> view 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-1.20][FLINK-34194] Update CI to Ubuntu 22.04 (Jammy) [flink]

2024-12-20 Thread via GitHub


flinkbot commented on PR #25827:
URL: https://github.com/apache/flink/pull/25827#issuecomment-2556600135

   
   ## CI report:
   
   * 565408f120180d245b935b09165a43bac4731bd7 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator

2024-12-20 Thread Wang Qilong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Qilong updated FLINK-36942:

Description: 
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
And execute the tests in RowTimestamportOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
AsyncStateWindowJoinOperator[https://github.com/apache/flink/pull/25815], but 
cannot be implemented in RowTimestamportOperator

  was:
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
And execute the tests in RowTimestamportOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
windowJoinOperator, but cannot be implemented in RowTimestamportOperator


> Unable to trigger onEventTime() when using the async state API in 
> RowTimestamportOperator
> -
>
> Key: FLINK-36942
> URL: https://issues.apache.org/jira/browse/FLINK-36942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 2.0.0
>Reporter: Wang Qilong
>Priority: Major
>
> When I use the async state API in RowTimeSortOperator and inherit it from 
> AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the 
> timer expires
> Regarding how to reproduce:
> in  https://github.com/Au-Miner/flink/tree/RowTimeSortOperator
> And execute the tests in RowTimestamportOperatorTest
> I think this is a very strange bug because it can be triggered smoothly in 
> AsyncStateWindowJoinOperator[https://github.com/apache/flink/pull/25815], but 
> cannot be implemented in RowTimestamportOperator



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


twalthr commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893677952


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1040,6 +1066,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryView(String path);
 
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * * Compared to SQL, this method will not throw an error if the table 
does not exist. Use

Review Comment:
   ```suggestion
* Compared to SQL, this method will not throw an error if the table 
does not exist. Use
   ```



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1040,6 +1066,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryView(String path);
 
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * * Compared to SQL, this method will not throw an error if the table 
does not exist. Use
+ * {@link #dropView(java.lang.String, boolean)} to change the default 
behavior.
+ *
+ * @return true if a view existed in the given path and was removed
+ */
+boolean dropView(String path);
+
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * @return true if a view existed in the given path and was removed Throws 
{@link
+ * ValidationException} if view not exists and ignoreIfNotExists is 
false

Review Comment:
   ```suggestion
* ValidationException} if the view does not exist and 
ignoreIfNotExists is false
   ```



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1040,6 +1066,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryView(String path);
 
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * * Compared to SQL, this method will not throw an error if the table 
does not exist. Use
+ * {@link #dropView(java.lang.String, boolean)} to change the default 
behavior.
+ *
+ * @return true if a view existed in the given path and was removed
+ */
+boolean dropView(String path);
+
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * @return true if a view existed in the given path and was removed Throws 
{@link

Review Comment:
   ```suggestion
* @return true if a view existed in the given path and was removed, 
throws {@link
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36942) Unable to trigger onEventTime() when using the async state API in RowTimestamportOperator

2024-12-20 Thread Wang Qilong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Qilong updated FLINK-36942:

Description: 
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires.

Regarding how to reproduce:
In 
[AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator]
 and execute the tests in RowTimestamportOperatorTest.

I think this is a very strange bug because it can be triggered smoothly in 
[AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but 
cannot be implemented in RowTimestamportOperator.

  was:
When I use the async state API in RowTimeSortOperator and inherit it from 
AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the timer 
expires

Regarding how to reproduce:
in 
[AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator]
And execute the tests in RowTimestamportOperatorTest

I think this is a very strange bug because it can be triggered smoothly in 
[AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], but 
cannot be implemented in RowTimestamportOperator


> Unable to trigger onEventTime() when using the async state API in 
> RowTimestamportOperator
> -
>
> Key: FLINK-36942
> URL: https://issues.apache.org/jira/browse/FLINK-36942
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 2.0.0
>Reporter: Wang Qilong
>Priority: Major
>
> When I use the async state API in RowTimeSortOperator and inherit it from 
> AbstractAsyncStateStreamOperator, I cannot trigger onEventTime() when the 
> timer expires.
> Regarding how to reproduce:
> In 
> [AsyncStateRowTimeSortOperator|https://github.com/Au-Miner/flink/tree/RowTimeSortOperator]
>  and execute the tests in RowTimestamportOperatorTest.
> I think this is a very strange bug because it can be triggered smoothly in 
> [AsyncStateWindowJoinOperator|https://github.com/apache/flink/pull/25815], 
> but cannot be implemented in RowTimestamportOperator.
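
For readers unfamiliar with the setup, a minimal skeleton of the failing 
pattern (the class name and generic parameters are illustrative; only 
AbstractAsyncStateStreamOperator and onEventTime() come from the report):

{code:java}
// Skeleton only: an operator built on the async state machinery implements
// Triggerable and expects onEventTime() once the watermark passes a registered
// event-time timer; per this report the callback is never reached.
class AsyncStateRowTimeSortSketch extends AbstractAsyncStateStreamOperator<RowData>
        implements Triggerable<RowData, VoidNamespace> {

    @Override
    public void onEventTime(InternalTimer<RowData, VoidNamespace> timer) {
        // expected to fire when the timer expires
    }

    @Override
    public void onProcessingTime(InternalTimer<RowData, VoidNamespace> timer) {}
}
{code}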



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893685759


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1030,6 +1030,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryTable(String path);
 
+/**
+ * Drops a table registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryTable}. This method
+ * can only drop permanent objects.
+ *
+ * Compared to SQL, this method will not throw an error if the table 
does not exist. Use

Review Comment:
   I am wondering if we could change `Compared to SQL` to `Compared to ANSI 
SQL`, which I guess is what we mean here, as Flink SQL is SQL as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893689041


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1040,6 +1066,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryView(String path);
 
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method

Review Comment:
   nit: I would move the sentence "This method can only drop permanent objects" to 
the beginning of the paragraph, so it is obvious what this method does before we 
talk about temporary objects.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


davidradl commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893681174


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1040,6 +1066,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryView(String path);
 
+/**
+ * Drops a view registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryView}. This method
+ * can only drop permanent objects.
+ *
+ * * Compared to SQL, this method will not throw an error if the table 
does not exist. Use
+ * {@link #dropView(java.lang.String, boolean)} to change the default 
behavior.
+ *
+ * @return true if a view existed in the given path and was removed

Review Comment:
   missing @params javadoc



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36919][table] Add missing dropTable/dropView methods to TableEnvironment [flink]

2024-12-20 Thread via GitHub


snuyanzin commented on code in PR #25810:
URL: https://github.com/apache/flink/pull/25810#discussion_r1893690187


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java:
##
@@ -1030,6 +1030,32 @@ void createTemporarySystemFunction(
  */
 boolean dropTemporaryTable(String path);
 
+/**
+ * Drops a table registered in the given path.
+ *
+ * Temporary objects can shadow permanent ones. If a temporary object 
exists in a given path,
+ * make sure to drop the temporary object first using {@link 
#dropTemporaryTable}. This method
+ * can only drop permanent objects.
+ *
+ * Compared to SQL, this method will not throw an error if the table 
does not exist. Use

Review Comment:
   No, here we mean Flink SQL, and we compare Table API vs Flink SQL (usually 
called just SQL) behavior.
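
   As a usage-level illustration of that difference (a sketch, assuming `tEnv` 
is a `TableEnvironment` and the path points at a non-existent table):

   ```java
   // Table API: lenient by default, strict on request.
   boolean dropped = tEnv.dropTable("c.db.missing");   // returns false, no error
   tEnv.dropTable("c.db.missing", false);              // throws ValidationException

   // SQL: strict unless IF EXISTS is given.
   tEnv.executeSql("DROP TABLE c.db.missing");           // error: table does not exist
   tEnv.executeSql("DROP TABLE IF EXISTS c.db.missing"); // no-op
   ```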



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36941][hotfix] Update DATE_FORMAT Doc and Python Tests [flink]

2024-12-20 Thread via GitHub


snuyanzin commented on PR #25828:
URL: https://github.com/apache/flink/pull/25828#issuecomment-2556669198

   fyi: it cannot be both `[hotfix]` and marked with a Jira issue.
   So please change the commit message to something like `[FLINK-36941][docs] ...`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


