Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-10-14 Thread via GitHub


nateab commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1800019486


##
flink-table/flink-sql-client/src/test/resources/sql/function.q:
##
@@ -346,3 +346,159 @@ show user functions;
 SHOW JARS;
 Empty set
 !ok
+
+# ==
+# test describe function
+# ==
+
+ADD JAR '$VAR_UDF_JAR_PATH';
+[INFO] Execute statement succeeded.
+!info
+
+describe function `SUM`;
++-++
+|   info name | info value |
++-++
+| system function |   true |
+|   temporary |  false |
++-++
+2 rows in set
+!ok
+
+describe function extended `SUM`;
++--++
+|info name | info value |
++--++
+|  system function |   true |
+|temporary |  false |
+| kind |  AGGREGATE |
+| requirements | [] |
+|deterministic |   true |
+| constant folding |   true |
+|signature | SUM() |
++--++
+7 rows in set
+!ok
+
+describe function temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE info value |
++---+-$VAR_UDF_JAR_PATH_DASH+
+|   system function |  $VAR_UDF_JAR_PATH_SPACE false |
+| temporary |   $VAR_UDF_JAR_PATH_SPACE true |
+|class name |   $VAR_UDF_JAR_PATH_SPACE UpperUDF |
+| function language |   $VAR_UDF_JAR_PATH_SPACE JAVA |
+| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] |
++---+-$VAR_UDF_JAR_PATH_DASH+
+5 rows in set
+!ok
+
+describe function extended temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE info value |
++---+-$VAR_UDF_JAR_PATH_DASH+
+|   system function |  $VAR_UDF_JAR_PATH_SPACE false |
+| temporary |   $VAR_UDF_JAR_PATH_SPACE true |
+|class name |   $VAR_UDF_JAR_PATH_SPACE UpperUDF |
+| function language |   $VAR_UDF_JAR_PATH_SPACE JAVA |
+| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] |
+|  kind | $VAR_UDF_JAR_PATH_SPACE SCALAR |
+|  requirements | $VAR_UDF_JAR_PATH_SPACE [] |
+| deterministic |   $VAR_UDF_JAR_PATH_SPACE true |
+|  constant folding |   $VAR_UDF_JAR_PATH_SPACE true |
+| signature |$VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(STRING) |
++---+-$VAR_UDF_JAR_PATH_DASH+
+10 rows in set
+!ok
+
+desc function temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE info value |
++---+-$VAR_UDF_JAR_PATH_DASH+
+|   system function |  $VAR_UDF_JAR_PATH_SPACE false |
+| temporary |   $VAR_UDF_JAR_PATH_SPACE true |
+|class name |   $VAR_UDF_JAR_PATH_SPACE UpperUDF |
+| function language |   $VAR_UDF_JAR_PATH_SPACE JAVA |
+| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] |
++---+-$VAR_UDF_JAR_PATH_DASH+
+5 rows in set
+!ok
+
+desc function extended temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE info value |

Review Comment:
   This would deviate from what `DESCRIBE CATALOG` currently does. Are you okay 
with that? Also see my prior comment on the naming here:
https://github.com/apache/flink/pull/25115#discussion_r1706084728
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[jira] [Created] (FLINK-36532) Kafka group offsets are not committed on stop with savepoint

2024-10-14 Thread LEONID ILYEVSKY (Jira)
LEONID ILYEVSKY created FLINK-36532:
---

 Summary: Kafka group offsets are not committed on stop with 
savepoint
 Key: FLINK-36532
 URL: https://issues.apache.org/jira/browse/FLINK-36532
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.18.1
Reporter: LEONID ILYEVSKY


In a pipeline with a Kafka source and checkpointing enabled, the Kafka source 
commits the group offsets to the broker on every checkpoint.

However, when the job is stopped with a savepoint, those offsets are not 
committed.

I verified this behavior in versions 1.16.1 and 1.18.1.

In some situations, committing the group offsets when the job stops is very 
important. The best example is when the program code changes and the savepoint 
from the previous version is not compatible with the new code. If we cannot use 
the savepoint for the initial offsets, we could fall back to the group offsets, 
so it is critical to have them committed.
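
For illustration, a minimal sketch of a Kafka source configured so that the
committed group offsets can serve as the start position when a savepoint cannot
be used (broker address, topic, and group id are placeholders):

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaGroupOffsetFallback {

    public static KafkaSource<String> buildSource() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("my-group")
                // Commit group offsets to the broker on every successful checkpoint.
                .setProperty("commit.offsets.on.checkpoint", "true")
                // On a fresh start without a savepoint, resume from the committed
                // group offsets; fall back to EARLIEST if none were committed yet.
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
{code}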



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-10-14 Thread via GitHub


nateab commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1800063746


##
flink-table/flink-sql-client/src/test/resources/sql/function.q:
##
@@ -346,3 +346,159 @@ show user functions;
 SHOW JARS;
 Empty set
 !ok
+
+# ==
+# test describe function
+# ==
+
+ADD JAR '$VAR_UDF_JAR_PATH';
+[INFO] Execute statement succeeded.
+!info
+
+describe function `SUM`;
++-++
+|   info name | info value |
++-++
+| system function |   true |
+|   temporary |  false |
++-++
+2 rows in set
+!ok
+
+describe function extended `SUM`;
++--++
+|info name | info value |
++--++
+|  system function |   true |
+|temporary |  false |
+| kind |  AGGREGATE |
+| requirements | [] |
+|deterministic |   true |
+| constant folding |   true |
+|signature | SUM() |
++--++
+7 rows in set
+!ok
+
+describe function temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE info value |
++---+-$VAR_UDF_JAR_PATH_DASH+
+|   system function |  $VAR_UDF_JAR_PATH_SPACE false |
+| temporary |   $VAR_UDF_JAR_PATH_SPACE true |
+|class name |   $VAR_UDF_JAR_PATH_SPACE UpperUDF |
+| function language |   $VAR_UDF_JAR_PATH_SPACE JAVA |
+| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] |
++---+-$VAR_UDF_JAR_PATH_DASH+
+5 rows in set
+!ok
+
+describe function extended temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE info value |
++---+-$VAR_UDF_JAR_PATH_DASH+
+|   system function |  $VAR_UDF_JAR_PATH_SPACE false |
+| temporary |   $VAR_UDF_JAR_PATH_SPACE true |
+|class name |   $VAR_UDF_JAR_PATH_SPACE UpperUDF |
+| function language |   $VAR_UDF_JAR_PATH_SPACE JAVA |
+| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] |
+|  kind | $VAR_UDF_JAR_PATH_SPACE SCALAR |
+|  requirements | $VAR_UDF_JAR_PATH_SPACE [] |
+| deterministic |   $VAR_UDF_JAR_PATH_SPACE true |
+|  constant folding |   $VAR_UDF_JAR_PATH_SPACE true |
+| signature |$VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(STRING) |
++---+-$VAR_UDF_JAR_PATH_DASH+
+10 rows in set
+!ok
+
+desc function temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE info value |
++---+-$VAR_UDF_JAR_PATH_DASH+
+|   system function |  $VAR_UDF_JAR_PATH_SPACE false |
+| temporary |   $VAR_UDF_JAR_PATH_SPACE true |
+|class name |   $VAR_UDF_JAR_PATH_SPACE UpperUDF |
+| function language |   $VAR_UDF_JAR_PATH_SPACE JAVA |
+| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] |
++---+-$VAR_UDF_JAR_PATH_DASH+
+5 rows in set
+!ok
+
+desc function extended temp_upperudf;
++---+-$VAR_UDF_JAR_PATH_DASH+
+| info name | $VAR_UDF_JAR_PATH_SPACE info value |
++---+-$VAR_UDF_JAR_PATH_DASH+
+|   system function |  $VAR_UDF_JAR_PATH_SPACE false |

Review Comment:
   Is that necessary? We don't do that for `DESCRIBE CATALOG`, for example. I 
tried to order the rows in a way that I thought was logical.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-10-14 Thread via GitHub


nateab commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1800150565


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult;
+import static org.apache.flink.table.types.inference.TypeInferenceUtil.generateSignature;
+
+/**
+ * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.]databaseName].sqlIdentifier
+ * statement.
+ */
+@Internal
+public class DescribeFunctionOperation implements Operation, ExecutableOperation {
+
+    private final UnresolvedIdentifier sqlIdentifier;
+    private final boolean isExtended;
+
+    public DescribeFunctionOperation(UnresolvedIdentifier sqlIdentifier, boolean isExtended) {
+        this.sqlIdentifier = sqlIdentifier;
+        this.isExtended = isExtended;
+    }
+
+    public UnresolvedIdentifier getSqlIdentifier() {
+        return sqlIdentifier;
+    }
+
+    public boolean isExtended() {
+        return isExtended;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("identifier", sqlIdentifier);
+        params.put("isExtended", isExtended);
+        return OperationUtils.formatWithChildren(
+                "DESCRIBE FUNCTION", params, Collections.emptyList(), Operation::asSummaryString);
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        // DESCRIBE FUNCTION shows all the function properties.
+        Optional<ContextResolvedFunction> functionOpt =
+                ctx.getFunctionCatalog().lookupFunction(sqlIdentifier);
+        if (!functionOpt.isPresent()) {
+            throw new ValidationException(
+                    String.format(
+                            "Function with the identifier '%s' doesn't exist.",
+                            sqlIdentifier.asSummaryString()));
+        }
+        final ContextResolvedFunction function = functionOpt.get();
+        final CatalogFunction catalogFunction = function.getCatalogFunction();
+
+        List<List<String>> rows = new ArrayList<>();
+        rows.add(Arrays.asList("system function", String.valueOf(catalogFunction == null)));
+        rows.add(Arrays.asList("temporary", String.valueOf(function.isTemporary())));
+        if (catalogFunction != null) {
+            rows.add(Arrays.asList("class name", catalogFunction.getClassName()));
+            rows.add(
+                    Arrays.asList(
+                            "function language", catalogFunction.getFunctionLanguage().toString()));
+            rows.add(
+                    Arrays.asList(
+                            "resource uris", catalogFunction.getFunctionResources().toString()));
+        }
+
+        if (isExtended) {
+            final FunctionDefinition definition = function.getDefinition();
+            rows.add(Arrays.asList("kind", definition.getKind().toString()));
+            rows.add(Arrays.asList("requirements", definition.getRequirements().toString()));
+            rows.add(Arrays.asList("deterministic", String.valueOf(definition.isDeterministic())));

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Re: [PR] [FLINK-35016] catalog changes for model resource [flink]

2024-10-14 Thread via GitHub


lihaosky commented on PR #25036:
URL: https://github.com/apache/flink/pull/25036#issuecomment-2412469319

   Close in favor of https://github.com/apache/flink/pull/25211


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35016] catalog changes for model resource [flink]

2024-10-14 Thread via GitHub


lihaosky closed pull request #25036: [FLINK-35016] catalog changes for model 
resource
URL: https://github.com/apache/flink/pull/25036


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]

2024-10-14 Thread via GitHub


nateab commented on code in PR #25115:
URL: https://github.com/apache/flink/pull/25115#discussion_r1800082399


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java:
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.ContextResolvedFunction;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.types.DataType;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult;
+import static org.apache.flink.table.types.inference.TypeInferenceUtil.generateSignature;
+
+/**
+ * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.]databaseName].sqlIdentifier
+ * statement.

Review Comment:
   added



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35016] catalog changes for model resource [flink]

2024-10-14 Thread via GitHub


lihaosky commented on code in PR #25211:
URL: https://github.com/apache/flink/pull/25211#discussion_r1800369200


##
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java:
##
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.Schema;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Interface for a model in a catalog. */
+@PublicEvolving
+public interface CatalogModel {
+    /** Returns a map of string-based model options. */
+    Map<String, String> getOptions();
+
+    /**
+     * Get the unresolved input schema of the model.
+     *
+     * @return unresolved input schema of the model.
+     */
+    Schema getInputSchema();

Review Comment:
   Cannot be null. Added null check
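
   For illustration, a minimal sketch of an implementing class with such a null 
check; the class name and constructor are hypothetical, and only the two methods 
quoted above are shown:

   ```java
   import org.apache.flink.table.api.Schema;

   import java.util.Map;
   import java.util.Objects;

   // Hypothetical example implementation, not the actual Flink code.
   public class ExampleCatalogModel implements CatalogModel {

       private final Schema inputSchema;
       private final Map<String, String> options;

       public ExampleCatalogModel(Schema inputSchema, Map<String, String> options) {
           // Enforce the contract discussed above: the input schema must not be null.
           this.inputSchema = Objects.requireNonNull(inputSchema, "Input schema must not be null.");
           this.options = Objects.requireNonNull(options, "Options must not be null.");
       }

       @Override
       public Map<String, String> getOptions() {
           return options;
       }

       @Override
       public Schema getInputSchema() {
           return inputSchema;
       }
   }
   ```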



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36533) An exception occurs when using the taskmanager.data.bind-port

2024-10-14 Thread five (Jira)
five created FLINK-36533:


 Summary: An exception occurs when using the 
taskmanager.data.bind-port
 Key: FLINK-36533
 URL: https://issues.apache.org/jira/browse/FLINK-36533
 Project: Flink
  Issue Type: Bug
Reporter: five
 Attachments: exception.png, source_code.png

An error occurred when using a port range for the 
{{taskmanager.data.bind-port}} configuration item.

 

Environment Information:
 * Flink version: 1.20.0
 * Java: 1.8

Configuration Information:
 * taskmanager.data.bind-port: 12001-13000

When starting the Flink job, an exception occurs. Please see the attached file 
for the exception message: exception.png

The error received is a NativeIoException. However, the source code is set up 
to catch a BindException. (Refer to the attached file: source_code.png) The two 
do not match.
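
To illustrate the mismatch: with Netty's native epoll transport, a failed bind 
surfaces as io.netty.channel.unix.Errors.NativeIoException, which extends 
java.io.IOException but is not a java.net.BindException, so a handler that 
catches only BindException never advances to the next port. A hypothetical 
sketch of port-range binding that tolerates both (not the actual Flink code):

{code:java}
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Iterator;

public final class PortRangeBinder {

    /** Tries each port of the configured range and returns the first successful bind. */
    public static ServerSocket bindFirstAvailable(Iterator<Integer> ports) throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(port));
                return socket;
            } catch (IOException e) {
                // Catching IOException (not only BindException) also covers the
                // analogous NativeIoException raised by Netty's native transport.
                socket.close();
            }
        }
        throw new BindException("Could not bind to any port in the configured range.");
    }
}
{code}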



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]

2024-10-14 Thread via GitHub


beryllw commented on code in PR #3639:
URL: https://github.com/apache/flink-cdc/pull/3639#discussion_r1797628800


##
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java:
##
@@ -60,28 +62,12 @@ public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests)
 .collect(Collectors.toList());
 // All CommitRequest shared the same checkpointId.
 long checkpointId = committables.get(0).checkpointId();
-int retriedNumber = commitRequests.stream().findFirst().get().getNumberOfRetries();
 WrappedManifestCommittable wrappedManifestCommittable =
 storeMultiCommitter.combine(checkpointId, 1L, committables);
-try {
-if (retriedNumber > 0) {
-storeMultiCommitter.filterAndCommit(
-Collections.singletonList(wrappedManifestCommittable));
-} else {
-storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
-}
-commitRequests.forEach(CommitRequest::signalAlreadyCommitted);
-LOGGER.info(
-String.format(
-"Commit succeeded for %s with %s committable",
-checkpointId, committables.size()));
-} catch (Exception e) {
-commitRequests.forEach(CommitRequest::retryLater);

Review Comment:
   Is there a specific purpose for retrying later in this context? @lvyanquan 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36510][runtime] Support partial lineage info collection [flink]

2024-10-14 Thread via GitHub


FangYongs commented on PR #25440:
URL: https://github.com/apache/flink/pull/25440#issuecomment-2412784324

   Thanks @HuangZhenQiu , LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36510][runtime] Support partial lineage info collection [flink]

2024-10-14 Thread via GitHub


FangYongs merged PR #25440:
URL: https://github.com/apache/flink/pull/25440


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]

2024-10-14 Thread via GitHub


leonardBang commented on PR #757:
URL: https://github.com/apache/flink-web/pull/757#issuecomment-2412946416

   @AHeise Could you change the PR from DRAFT to READY for review?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]

2024-10-14 Thread via GitHub


leonardBang commented on PR #757:
URL: https://github.com/apache/flink-web/pull/757#issuecomment-2412947976

   I changed the status, so we can start the review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36405:
---
Labels: pull-request-available  (was: )

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer user mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when renewing the DelegationToken, Flink uses its 
> own startup user, which causes the renewer to be mismatched.
>  
> Error 2: HIVE_DELEGATION_TOKEN is not in the service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36405][runtime][security] Fix startup issues on kerberos clusters. [flink]

2024-10-14 Thread via GitHub


gaborgsomogyi commented on code in PR #25428:
URL: https://github.com/apache/flink/pull/25428#discussion_r1800513124


##
flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier:
##
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   Why is this file needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36405][runtime][security] Fix startup issues on kerberos clusters. [flink]

2024-10-14 Thread via GitHub


zhengchenyu commented on code in PR #25428:
URL: https://github.com/apache/flink/pull/25428#discussion_r1800525208


##
flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier:
##
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more

Review Comment:
   @gaborgsomogyi 
   
   After fixing the renewer user, it still throws an exception like the one below:
   
   ```
   2024-09-26 14:35:07,144 ERROR 
org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
Failed to obtain delegation token for HiveServer2
   java.lang.NullPointerException: null
   at 
org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
 ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
   at 
org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
 ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
   ...
   ```
   
I found that 
[tokenIdentifier](https://github.com/apache/flink/blob/da393c92db814803c9ac96c6cdd55ae444c43689/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java#L149C70-L149C85)
 is null. Through further debugging, I found that 
HiveServer2DelegationTokenIdentifier is not in the services file, so 
org.apache.hadoop.security.token.Token::getClassForIdentifier cannot recognize 
the kind `HIVE_DELEGATION_TOKEN`, and hive2Token.decodeIdentifier() returns 
null.
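
   For reference, a sketch of the kind of services file entry involved; the 
exact package of `HiveServer2DelegationTokenIdentifier` is an assumption here:

   ```
   # META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
   # One fully qualified TokenIdentifier implementation per line; Hadoop's
   # ServiceLoader uses these entries to map a token kind to its identifier class.
   org.apache.flink.table.security.token.HiveServer2DelegationTokenIdentifier
   ```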
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (FLINK-36534) Upgrade Flink Kubernetes operator to flink-1.20

2024-10-14 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889464#comment-17889464
 ] 

Gyula Fora commented on FLINK-36534:


Let's do this once 1.20.1 is released

> Upgrade Flink Kubernetes operator to flink-1.20
> ---
>
> Key: FLINK-36534
> URL: https://issues.apache.org/jira/browse/FLINK-36534
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Shailesh Gupta
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36405:
-
Affects Version/s: (was: 2.0-preview)

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer user mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when renewing the DelegationToken, Flink uses its 
> own startup user, which causes the renewer to be mismatched.
>  
> Error 2: HIVE_DELEGATION_TOKEN is not in the service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36405:
-
Affects Version/s: 2.0-preview

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1, 2.0-preview
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer user mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when renewing the DelegationToken, Flink uses its 
> own startup user, which causes the renewer to be mismatched.
>  
> Error 2: HIVE_DELEGATION_TOKEN is not in the service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36484) Remove deprecated method `StreamTableEnvironment#registerFunction`

2024-10-14 Thread Ammu Parvathy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889468#comment-17889468
 ] 

Ammu Parvathy commented on FLINK-36484:
---

I have started looking into this one.
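
For context, a minimal sketch of the non-deprecated replacement, 
`createTemporarySystemFunction`; the function name and UDF here are 
illustrative only:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class RegisterFunctionMigration {

    /** Example UDF used only for illustration. */
    public static class MyUpper extends ScalarFunction {
        public String eval(String s) {
            return s == null ? null : s.toUpperCase();
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Replaces the deprecated registerFunction(...) call:
        tEnv.createTemporarySystemFunction("my_upper", MyUpper.class);
        tEnv.executeSql("SELECT my_upper('flink')").print();
    }
}
{code}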

> Remove deprecated method `StreamTableEnvironment#registerFunction`
> --
>
> Key: FLINK-36484
> URL: https://issues.apache.org/jira/browse/FLINK-36484
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36477) Remove deprecated method `BaseExpressions#cast` in table-api-java

2024-10-14 Thread corgy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889473#comment-17889473
 ] 

corgy commented on FLINK-36477:
---

I am taking action on this subtask; please assign it to me.

> Remove deprecated method `BaseExpressions#cast` in table-api-java
> -
>
> Key: FLINK-36477
> URL: https://issues.apache.org/jira/browse/FLINK-36477
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36478) Remove deprecated methods `EnvironmentSettings#fromConfiguration` and `EnvironmentSettings#toConfiguration`

2024-10-14 Thread corgy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889475#comment-17889475
 ] 

corgy commented on FLINK-36478:
---

I am taking action on this subtask; please assign it to me.

> Remove deprecated methods `EnvironmentSettings#fromConfiguration` and 
> `EnvironmentSettings#toConfiguration`
> ---
>
> Key: FLINK-36478
> URL: https://issues.apache.org/jira/browse/FLINK-36478
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36534) Upgrade Flink Kubernetes operator to flink-1.20

2024-10-14 Thread Shailesh Gupta (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889476#comment-17889476
 ] 

Shailesh Gupta commented on FLINK-36534:


Sure. Thanks.

> Upgrade Flink Kubernetes operator to flink-1.20
> ---
>
> Key: FLINK-36534
> URL: https://issues.apache.org/jira/browse/FLINK-36534
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Shailesh Gupta
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]

2024-10-14 Thread via GitHub


Zakelly commented on PR #25508:
URL: https://github.com/apache/flink/pull/25508#issuecomment-2413020131

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36480) Remove all deprecated methods in `TableConfig`

2024-10-14 Thread corgy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889479#comment-17889479
 ] 

corgy commented on FLINK-36480:
---

I am taking action on this subtask; please assign it to me.

> Remove all deprecated methods in `TableConfig`
> --
>
> Key: FLINK-36480
> URL: https://issues.apache.org/jira/browse/FLINK-36480
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36483) Remove deprecated method `TableResult#getTableSchema`

2024-10-14 Thread corgy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889485#comment-17889485
 ] 

corgy commented on FLINK-36483:
---

I am taking action on this subtask; please assign it to me.

> Remove deprecated method `TableResult#getTableSchema`
> -
>
> Key: FLINK-36483
> URL: https://issues.apache.org/jira/browse/FLINK-36483
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]

2024-10-14 Thread via GitHub


beryllw commented on PR #3639:
URL: https://github.com/apache/flink-cdc/pull/3639#issuecomment-2412713852

   Could you please assist in reviewing this PR? Thank you. @lvyanquan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36534) Upgrade Flink Kubernetes operator to flink-1.20

2024-10-14 Thread Shailesh Gupta (Jira)
Shailesh Gupta created FLINK-36534:
--

 Summary: Upgrade Flink Kubernetes operator to flink-1.20
 Key: FLINK-36534
 URL: https://issues.apache.org/jira/browse/FLINK-36534
 Project: Flink
  Issue Type: New Feature
  Components: Kubernetes Operator
Reporter: Shailesh Gupta






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36533) An exception occurs when using the taskmanager.data.bind-port

2024-10-14 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky reassigned FLINK-36533:


Assignee: Ferenc Csaky

> An exception occurs when using the taskmanager.data.bind-port
> -
>
> Key: FLINK-36533
> URL: https://issues.apache.org/jira/browse/FLINK-36533
> Project: Flink
>  Issue Type: Bug
>Reporter: five
>Assignee: Ferenc Csaky
>Priority: Major
> Attachments: exception.png, source_code.png
>
>
> An error occurred when using a port range for the 
> {{taskmanager.data.bind-port}} configuration item.
>  
> Environment Information:
>  * Flink version: 1.20.0
>  * Java: 1.8
> Configuration Information:
>  * taskmanager.data.bind-port: 12001-13000
> When starting the Flink job, an exception occurs. Please see the attached 
> file for the exception message: exception.png
> The error received is a NativeIoException. However, the source code is set up 
> to catch a BindException. (Refer to the attached file: source_code.png) The 
> two do not match.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36531) AutoScaler needs to consider the lag from last checkpoint

2024-10-14 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889404#comment-17889404
 ] 

yuanfenghu commented on FLINK-36531:


[~dsaisharath]
There is already an optimization to solve this problem,
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler],
which may be helpful to you
 

> AutoScaler needs to consider the lag from last checkpoint
> -
>
> Key: FLINK-36531
> URL: https://issues.apache.org/jira/browse/FLINK-36531
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Sai Sharath Dandi
>Priority: Major
>
> Autoscaler computes the target processing capacity as 
> [below|https://sg.uberinternal.com/code.uber.internal/uber-code/data-flink-kubernetes-operator@release-1.9-uber/-/blob/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java?L47]
> // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + 
> INPUT_RATE/TARGET_UTIL
>  
> During the scaling action, the autoscaler will restart the job from the last 
> successful checkpoint. We need to include the number of records processed 
> since the last successful checkpoint as part of the lag, as those records will 
> be replayed after scaling. This is particularly important for jobs with long 
> checkpoint intervals and large state, as there could be a significant 
> difference between the real-time lag and the lag from the checkpoint.
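
For illustration, a small calculation of the quoted formula with the proposed 
adjustment of folding the records processed since the last checkpoint into the 
lag term (all numbers and the helper method are hypothetical):

{code:java}
public final class TargetCapacityExample {

    static double targetRate(
            double lag,                      // externally observed backlog (records)
            double processedSinceCheckpoint, // records replayed after restoring the checkpoint
            double inputRate,                // records per second arriving
            double restartSec,               // expected restart time (seconds)
            double catchUpSec,               // allowed catch-up window (seconds)
            double targetUtil) {             // target utilization, e.g. 0.7
        double effectiveLag = lag + processedSinceCheckpoint; // the proposed change
        return effectiveLag / catchUpSec
                + inputRate * restartSec / catchUpSec
                + inputRate / targetUtil;
    }

    public static void main(String[] args) {
        // With a long checkpoint interval, the replayed records can dominate the lag term:
        System.out.println(targetRate(10_000, 500_000, 1_000, 60, 300, 0.7)); // ~3328.6
    }
}
{code}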



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36531) AutoScaler needs to consider the lag from last checkpoint

2024-10-14 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889404#comment-17889404
 ] 

yuanfenghu edited comment on FLINK-36531 at 10/15/24 2:22 AM:
--

[~dsaisharath]
There is already an optimization to solve this problem,
[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler],
which may be helpful to you
 


was (Author: JIRAUSER296932):
[~dsaisharath]
There is already an optimization  to solve 
t[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]his
 problem, it may be helpful to you
 

> AutoScaler needs to consider the lag from last checkpoint
> -
>
> Key: FLINK-36531
> URL: https://issues.apache.org/jira/browse/FLINK-36531
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Sai Sharath Dandi
>Priority: Major
>
> Autoscaler computes the target processing capacity as 
> [below|https://sg.uberinternal.com/code.uber.internal/uber-code/data-flink-kubernetes-operator@release-1.9-uber/-/blob/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java?L47]
> // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + 
> INPUT_RATE/TARGET_UTIL
>  
> During the scaling action, the autoscaler will restart the job from the last 
> successful checkpoint. We need to include the number of records processed 
> since the last successful checkpoint as part of the lag, as those records will 
> be replayed after scaling. This is particularly important for jobs with long 
> checkpoint intervals and large state, as there could be a significant 
> difference between the real-time lag and the lag from the checkpoint.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36518) Paimon Sink support per-table property configuration

2024-10-14 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-36518:
--
Description: As title.  (was: Solved by transform table-options.)

> Paimon Sink support per-table property configuration
> 
>
> Key: FLINK-36518
> URL: https://issues.apache.org/jira/browse/FLINK-36518
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
> Environment: Flink 1.18.0
> flink 3.2.0
>Reporter: JunboWang
>Priority: Minor
>
> As title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36518) Paimon Sink support per-table property configuration

2024-10-14 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang updated FLINK-36518:
--
Description: Solved by transform table-options.  (was: As title.)

> Paimon Sink support per-table property configuration
> 
>
> Key: FLINK-36518
> URL: https://issues.apache.org/jira/browse/FLINK-36518
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
> Environment: Flink 1.18.0
> flink 3.2.0
>Reporter: JunboWang
>Priority: Minor
>
> Solved by transform table-options.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36518) Paimon Sink support per-table property configuration

2024-10-14 Thread JunboWang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889392#comment-17889392
 ] 

JunboWang commented on FLINK-36518:
---

Solved by transform table-options.

> Paimon Sink support per-table property configuration
> 
>
> Key: FLINK-36518
> URL: https://issues.apache.org/jira/browse/FLINK-36518
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
> Environment: Flink 1.18.0
> flink 3.2.0
>Reporter: JunboWang
>Priority: Minor
>
> As title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-36518) Paimon Sink support per-table property configuration

2024-10-14 Thread JunboWang (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36518 ]


JunboWang deleted comment on FLINK-36518:
---

was (Author: JIRAUSER305453):
Solved by transform table-options.

> Paimon Sink support per-table property configuration
> 
>
> Key: FLINK-36518
> URL: https://issues.apache.org/jira/browse/FLINK-36518
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
> Environment: Flink 1.18.0
> flink 3.2.0
>Reporter: JunboWang
>Priority: Minor
>
> As title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36518) Paimon Sink support per-table property configuration

2024-10-14 Thread JunboWang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

JunboWang closed FLINK-36518.
-
Resolution: Not A Problem

Solved by transform table-options.

> Paimon Sink support per-table property configuration
> 
>
> Key: FLINK-36518
> URL: https://issues.apache.org/jira/browse/FLINK-36518
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
> Environment: Flink 1.18.0
> flink 3.2.0
>Reporter: JunboWang
>Priority: Minor
>
> As title.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36528] [Kubernetes Operator] Update org.apache.avro from 1.8.2 to 1.12.0 [flink-kubernetes-operator]

2024-10-14 Thread via GitHub


kartik-3513 commented on PR #901:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/901#issuecomment-2412922906

   I have evaluated updating the beam dependency; we can proceed with that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35016] catalog changes for model resource [flink]

2024-10-14 Thread via GitHub


lihaosky commented on code in PR #25211:
URL: https://github.com/apache/flink/pull/25211#discussion_r1800214336


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
 }
 }
 
+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully qualified, use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) {
+        CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+        if (temporaryModel != null) {
+            final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel);
+            return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+        }
+        Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model);
+                    return Optional.of(
+                            ContextResolvedModel.permanent(
+                                    objectIdentifier, currentCatalog, resolvedModel));
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.

Review Comment:
   Based on @twalthr 's previous comment, we return empty here instead of a 
not-supported exception.



##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
         }
     }
 
+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) {
+        CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+        if (temporaryModel != null) {
+            final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel);
+            return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+        }
+        Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model);
+                    return Optional.of(
+                            ContextResolvedModel.permanent(
+                                    objectIdentifier, currentCatalog, resolvedModel));
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not
+     * available in any of the catalogs.
+     */
+    public ContextResolvedModel getModelOrError(ObjectIdentifier objectIdentifier) {
+        return getModel(objectIdentifier)
+                .orElseThrow(
+                        () ->
+                                new TableException(
+                                        String.format(
+                                                "Cannot find model '%s' in any of the catalogs %s.",
+                                                objectIdentifier, listCatalogs())));
+    }
+
+    /**
+     * Return whether the model with a fully qualified table path is temporary or not.
+     *
+     * @param objectIdentifier full path of the table
+     * @return the model is temporary or not.
+     */
+    public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+        return temporaryModels.containsKey(objectIdentifier);
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of the current catalog
+     * and database

Re: [PR] Add insert-only change stream option [flink-cdc]

2024-10-14 Thread via GitHub


henneberger closed pull request #3562: Add insert-only change stream option
URL: https://github.com/apache/flink-cdc/pull/3562


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Chenyu Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889188#comment-17889188
 ] 

Chenyu Zheng edited comment on FLINK-36405 at 10/15/24 3:51 AM:


[~fcsaky] Thanks! I have rebased my PR onto the current master.


was (Author: zhengchenyu):
[~fcsaky]  Thanks? I have rebase my PR to current master.

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when renewing the DelegationToken, the startup 
> user of Flink is used. This causes the renewer to be mismatched.
>  
> Error 2: HIVE_DELEGATION_TOKEN is not in the service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]

2024-10-14 Thread via GitHub


yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799076504


##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##
@@ -43,7 +28,6 @@
 import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
 import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
 
-/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */

Review Comment:
   Ditto.



##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##
@@ -287,9 +291,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
         if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
             return new Configuration();
         }
-        Map<String, String> pipelineConfigMap =
-                mapper.convertValue(
-                        pipelineConfigNode, new TypeReference<Map<String, String>>() {});
+        Map<String, String> pipelineConfigMap = new HashMap<>();
+        pipelineConfigNode
+                .fields()
+                .forEachRemaining(
+                        entry -> {
+                            String key = entry.getKey();
+                            JsonNode value = entry.getValue();
+                            if (!key.equals(MODEL_KEY)) {
+                                pipelineConfigMap.put(key, value.asText());
+                            }
+                        });
         return Configuration.fromMap(pipelineConfigMap);
     }
+
+    private List<ModelDef> parseModels(JsonNode modelsNode) {
+        List<ModelDef> modelDefs = new ArrayList<>();
+        if (modelsNode != null && modelsNode.isArray()) {

Review Comment:
   Maybe we can throw an exception if `modelsNode` isn't a list. Otherwise, code 
like this may silently fail:
   ```yaml
   pipeline:
 models:
   name: ...
   ```
   
   While one may want to write this:
   
   ```yaml
   pipeline:
 models:
   - name: ...
   ```
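   
   For illustration (not from this PR), a strict-validation sketch could look like the following; `parseModel` is an assumed helper:
   
   ```java
   private List<ModelDef> parseModels(JsonNode modelsNode) {
       List<ModelDef> modelDefs = new ArrayList<>();
       if (modelsNode == null || modelsNode.isNull()) {
           return modelDefs;
       }
       if (!modelsNode.isArray()) {
           // Fail loudly instead of silently ignoring a mis-indented `models:` block.
           throw new IllegalArgumentException(
                   "`pipeline.models` must be a list, but was: " + modelsNode.getNodeType());
       }
       modelsNode.forEach(node -> modelDefs.add(parseModel(node)));
       return modelDefs;
   }
   ```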



##
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##
@@ -287,9 +291,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
         if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
             return new Configuration();
         }
-        Map<String, String> pipelineConfigMap =
-                mapper.convertValue(
-                        pipelineConfigNode, new TypeReference<Map<String, String>>() {});
+        Map<String, String> pipelineConfigMap = new HashMap<>();
+        pipelineConfigNode
+                .fields()
+                .forEachRemaining(
+                        entry -> {
+                            String key = entry.getKey();
+                            JsonNode value = entry.getValue();
+                            if (!key.equals(MODEL_KEY)) {
+                                pipelineConfigMap.put(key, value.asText());
+                            }
+                        });

Review Comment:
   The models configuration could be parsed and removed in the 
`YamlPipelineDefinitionParser#parse` method in advance, so there is no need to 
iterate over the configuration map again here. A sketch of this follows below.

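   For illustration (not from this PR), a rough sketch of that idea; names are assumed, and it requires `com.fasterxml.jackson.databind.node.ObjectNode`:
   
   ```java
   // Inside YamlPipelineDefinitionParser#parse: detach `models` from the
   // pipeline node first, so toPipelineConfig() never sees MODEL_KEY at all.
   ObjectNode pipelineNode = (ObjectNode) root.get("pipeline");
   List<ModelDef> models = parseModels(pipelineNode.remove(MODEL_KEY));
   Configuration pipelineConfig = toPipelineConfig(pipelineNode);
   ```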


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java:
##


Review Comment:
   I doubt this is the best way to parameterize UDFs. Maybe we can leave 
`UdfDef` unchanged, let `ModelDef` extend the `UdfDef` class, and put the 
extra parameters there? That way, we won't need any of these string-splitting 
hacks and serialization chores. See the sketch below.

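   For illustration (constructor shapes assumed, not from this PR), the inheritance could look roughly like:
   
   ```java
   // UdfDef stays as-is; ModelDef only adds the model-specific parameters.
   public class ModelDef extends UdfDef {
       private final String host;
       private final String apiKey;
   
       public ModelDef(String name, String classpath, String host, String apiKey) {
           super(name, classpath);
           this.host = host;
           this.apiKey = apiKey;
       }
   
       public String getHost() {
           return host;
       }
   
       public String getApiKey() {
           return apiKey;
       }
   }
   ```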


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java:
##
@@ -0,0 +1,92 @@
+package org.apache.flink.cdc.composer.definition;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Objects;
+
+public class ModelDef {
+    private final String name;
+    private final String host;
+    private final String apiKey;
+
+    public ModelDef(String name, String host, String apiKey) {
+        this.name = name;
+        this.host = host;
+        this.apiKey = apiKey;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getApiKey() {
+        return apiKey;
+    }
+
+    // Create a UDF that represents this model
+    public ScalarFunction createUdf() {
+        return new ModelUdf(this);
+    }
+
+    // Inner class representing the UDF for this model
+    public class ModelUdf extends ScalarFunction {
+        private final ModelDef model;
+
+        public ModelUdf(ModelDef model) {
+            this.model = model;
+        }
+
+        // Main UDF method: processes the input and returns the result
+        public String eval(String input) {
+            // Implement the model API call here, using model.getHost() and
+            // model.getApiKey() to access the API. This is only a sample
+            // implementation; the actual logic depends on the concrete API.
+            return "Embedding for: " + input;
+        }
+
+        @Override
+  

Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]

2024-10-14 Thread via GitHub


yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799126412


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##
@@ -77,6 +80,53 @@ public static int currentDate(long epochTime, String timezone) {
         return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
     }
 
+    private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+    private static OpenAiEmbeddingModel embeddingModel;
+
+    public static void initializeOpenAiEmbeddingModel(String apiKey, String baseUrl) {
+        embeddingModel =
+                OpenAiEmbeddingModel.builder()
+                        .apiKey(apiKey)
+                        .baseUrl(baseUrl)
+                        .modelName(DEFAULT_MODEL_NAME)
+                        .timeout(Duration.ofSeconds(30))
+                        .maxRetries(3)
+                        .build();
+    }
+
+    public static String getEmbedding(String input, String apiKey, String model) {
+        if (input == null || input.trim().isEmpty()) {
+            LOG.debug("Empty or null input provided for embedding.");
+            return "";
+        }
+
+        try {
+            // Ensure the OpenAiEmbeddingModel has been initialized
+            if (embeddingModel == null) {
+                initializeOpenAiEmbeddingModel(apiKey, "https://api.openai.com/v1/");

Review Comment:
   Is the endpoint hard-coded here? Why do we still need this function and pass 
API keys manually? Shouldn't these be configured in the `models:` rule block?



##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##
@@ -77,6 +80,53 @@ public static int currentDate(long epochTime, String timezone) {
         return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
     }
 
+    private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+    private static OpenAiEmbeddingModel embeddingModel;
+
+    public static void initializeOpenAiEmbeddingModel(String apiKey, String baseUrl) {
+        embeddingModel =
+                OpenAiEmbeddingModel.builder()
+                        .apiKey(apiKey)
+                        .baseUrl(baseUrl)
+                        .modelName(DEFAULT_MODEL_NAME)
+                        .timeout(Duration.ofSeconds(30))
+                        .maxRetries(3)
+                        .build();
+    }
+
+    public static String getEmbedding(String input, String apiKey, String model) {
+        if (input == null || input.trim().isEmpty()) {
+            LOG.debug("Empty or null input provided for embedding.");
+            return "";
+        }
+
+        try {
+            // Ensure the OpenAiEmbeddingModel has been initialized
+            if (embeddingModel == null) {
+                initializeOpenAiEmbeddingModel(apiKey, "https://api.openai.com/v1/");

Review Comment:
   Is the endpoint hard-coded here? Why do we need this function and pass API 
keys manually? Shouldn't these be configured in the `models:` rule block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36528) Update org.apache.avro : avro dependency

2024-10-14 Thread Kartik Goyal (Jira)
Kartik Goyal created FLINK-36528:


 Summary: Update org.apache.avro : avro dependency
 Key: FLINK-36528
 URL: https://issues.apache.org/jira/browse/FLINK-36528
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.10.0
Reporter: Kartik Goyal
Assignee: Kartik Goyal
 Fix For: kubernetes-operator-1.10.0


Bump the mysql-connector-j version to remediate vulnerability associated with 
this package.

Package info:
[https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33] 

Vulnerability info:
[https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-22102]

Proposed change solution:
Bump the version from 8.0.33 to 8.4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]

2024-10-14 Thread via GitHub


yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799217964


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##
@@ -85,11 +80,45 @@ public DataStream<Event> translatePostTransform(
             }
         }
         postTransformFunctionBuilder.addTimezone(timezone);
+
+        List<UdfDef> allFunctions = new ArrayList<>(udfFunctions);
+        allFunctions.addAll(convertModelsToUdfs(models));
+
         postTransformFunctionBuilder.addUdfFunctions(
-                udfFunctions.stream()
-                        .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
-                        .collect(Collectors.toList()));
+                allFunctions.stream().map(this::udfDefToTuple2).collect(Collectors.toList()));
         return input.transform(
                 "Transform:Data", new EventTypeInfo(), postTransformFunctionBuilder.build());
     }
+
+    private List<UdfDef> convertModelsToUdfs(List<ModelDef> models) {
+        return models.stream().map(this::modelToUdf).collect(Collectors.toList());
+    }

Review Comment:
   This conversion is quite worrying: `ModelDef` obviously carries more 
information (the params), and the conversion cannot be done without hacky 
approaches like pasting the params into the name field.
   
   Modifying the `addUdfFunctions` function so that it accepts a more universal 
data structure (instead of a `Name - Classpath` `Tuple2`) makes more sense to 
me; see the sketch below.

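   For illustration (names assumed, not from this PR), the "more universal" shape could be a small descriptor:
   
   ```java
   // A descriptor instead of Tuple2<String, String>, so model parameters travel
   // with the function definition rather than being packed into the name field.
   // (imports elided)
   public final class UdfDescriptor implements java.io.Serializable {
       private final String name;
       private final String classpath;
       private final Map<String, String> parameters; // empty for plain UDFs
   
       public UdfDescriptor(String name, String classpath, Map<String, String> parameters) {
           this.name = name;
           this.classpath = classpath;
           this.parameters = parameters;
       }
       // getters omitted for brevity
   }
   
   // addUdfFunctions(List<UdfDescriptor>) would then replace the Tuple2-based overload.
   ```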


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36476) Remove all deprecated methods under public APIs in table modules

2024-10-14 Thread lajith (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889170#comment-17889170
 ] 

lajith commented on FLINK-36476:


Hi [~xuyangzhong], can I pick up some of the tasks?

> Remove all deprecated methods under public APIs in table modules
> 
>
> Key: FLINK-36476
> URL: https://issues.apache.org/jira/browse/FLINK-36476
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Blocker
> Fix For: 2.0.0
>
>
> All methods needed to be removed are:
> https://docs.google.com/document/d/1bVrmcB9UFOd1-sT7xDRTMb5rmO0vK0eDlLRUYYL9gd4/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix] typo in property name httpclinent -> httpclient [flink-kubernetes-operator]

2024-10-14 Thread via GitHub


gyfora merged PR #899:
URL: https://github.com/apache/flink-kubernetes-operator/pull/899


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]

2024-10-14 Thread via GitHub


yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799190408


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/model/ModelUdf.java:
##
@@ -0,0 +1,132 @@
+package org.apache.flink.cdc.runtime.operators.model;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import com.google.gson.Gson;
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import dev.langchain4j.model.openai.OpenAiEmbeddingModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ModelUdf implements UserDefinedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ModelUdf.class);
+    private static final String DEFAULT_API_KEY =
+            "sk-WegHEuogRpIyRSwaF5Ce6fE3E62e459dA61eFaF6CcF8C79b";
+    private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+    private static final int DEFAULT_TIMEOUT_SECONDS = 30;
+    private static final String DEFAULT_BASE_URL = "https://api.gpt.ge/v1/";

Review Comment:
   Passing these "default" API keys and endpoints isn't meaningful. Request 
tokens might run out, and endpoints might expire.

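   For illustration (config keys assumed, not from this PR), a direction without compile-time defaults:
   
   ```java
   // Resolve credentials from the parsed `models:` entry instead of constants;
   // fail fast when the key is missing rather than falling back to a baked-in one.
   public ModelUdf(Map<String, String> modelConfig) {
       this.apiKey = Objects.requireNonNull(
               modelConfig.get("api-key"), "`api-key` must be set in the models: block");
       this.baseUrl = modelConfig.getOrDefault("host", "https://api.openai.com/v1/");
   }
   ```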


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]

2024-10-14 Thread via GitHub


yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799190408


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/model/ModelUdf.java:
##
@@ -0,0 +1,132 @@
+package org.apache.flink.cdc.runtime.operators.model;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import com.google.gson.Gson;
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import dev.langchain4j.model.openai.OpenAiEmbeddingModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ModelUdf implements UserDefinedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ModelUdf.class);
+    private static final String DEFAULT_API_KEY =
+            "sk-WegHEuogRpIyRSwaF5Ce6fE3E62e459dA61eFaF6CcF8C79b";
+    private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+    private static final int DEFAULT_TIMEOUT_SECONDS = 30;
+    private static final String DEFAULT_BASE_URL = "https://api.gpt.ge/v1/";

Review Comment:
   Providing these "default" API keys and endpoints isn't meaningful. Request 
tokens might run out, and endpoints might expire.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36332] Exclude kubernetes-httpclient form API module. [flink-kubernetes-operator]

2024-10-14 Thread via GitHub


gyfora merged PR #900:
URL: https://github.com/apache/flink-kubernetes-operator/pull/900


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36528) Update org.apache.avro : avro dependency

2024-10-14 Thread Kartik Goyal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik Goyal updated FLINK-36528:
-
Description: 
Update the org.apache.avro : avro package present in flink-beam-example to 
remediate the vulnerabilities associated with this package. It is a transitive 
dependency of beam-sdks-java-core, and the current version is 1.8.2. 

Package info:
[https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2] 

Vulnerabilities info:
Direct vulnerabilities:
[CVE-2024-47561|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-47561]
[CVE-2023-39410|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-39410]

Vulnerabilities from dependencies:
[CVE-2024-25710|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-25710]
[CVE-2023-43642|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43642]
[CVE-2023-34455|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34455]
[CVE-2023-34454|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34454]
[CVE-2023-34453|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34453]
[CVE-2021-36090|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-36090]
[CVE-2021-35517|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35517]
[CVE-2021-35516|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35516]
[CVE-2021-35515|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35515]
[CVE-2020-15250|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-15250]
[CVE-2019-10202|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10202]
[CVE-2019-10172|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10172]
[CVE-2018-11771|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-11771]

Proposed change solution:
Bump the version from 1.8.2 to 1.12.0

  was:
Bump the mysql-connector-j version to remediate vulnerability associated with 
this package.

Package info:
[https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33] 

Vulnerability info:
[https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-22102]

Proposed change solution:
Bump the version from 8.0.33 to 8.4.0


> Update org.apache.avro : avro dependency
> 
>
> Key: FLINK-36528
> URL: https://issues.apache.org/jira/browse/FLINK-36528
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.10.0
>Reporter: Kartik Goyal
>Assignee: Kartik Goyal
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.10.0
>
>
> Update the org.apache.avro : avro package present in flink-beam-example to 
> remediate the vulnerabilities associated with this package. It is a 
> transitive dependency of beam-sdks-java-core, and the current version is 1.8.2. 
> Package info:
> [https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2] 
> Vulnerabilities info:
> Direct vulnerabilities:
> [CVE-2024-47561|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-47561]
> [CVE-2023-39410|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-39410]
> Vulnerabilities from dependencies:
> [CVE-2024-25710|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-25710]
> [CVE-2023-43642|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43642]
> [CVE-2023-34455|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34455]
> [CVE-2023-34454|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34454]
> [CVE-2023-34453|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34453]
> [CVE-2021-36090|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-36090]
> [CVE-2021-35517|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35517]
> [CVE-2021-35516|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35516]
> [CVE-2021-35515|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35515]
> [CVE-2020-15250|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-15250]
> [CVE-2019-10202|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10202]
> [CVE-2019-10172|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10172]
> [CVE-2018-11771|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-11771]
> Proposed change solution:
> Bump the version from 1.8.2 to 1.12.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]

2024-10-14 Thread via GitHub


yuxiqian commented on code in PR #3642:
URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799243324


##
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java:
##
@@ -0,0 +1,92 @@
+package org.apache.flink.cdc.composer.definition;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Objects;
+
+public class ModelDef {
+    private final String name;
+    private final String host;
+    private final String apiKey;
+
+    public ModelDef(String name, String host, String apiKey) {
+        this.name = name;
+        this.host = host;
+        this.apiKey = apiKey;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getApiKey() {
+        return apiKey;
+    }
+
+    // Create a UDF that represents this model
+    public ScalarFunction createUdf() {
+        return new ModelUdf(this);
+    }
+
+    // Inner class representing the UDF for this model
+    public class ModelUdf extends ScalarFunction {
+        private final ModelDef model;
+
+        public ModelUdf(ModelDef model) {
+            this.model = model;
+        }
+
+        // Main UDF method: processes the input and returns the result
+        public String eval(String input) {
+            // Implement the model API call here, using model.getHost() and
+            // model.getApiKey() to access the API. This is only a sample
+            // implementation; the actual logic depends on the concrete API.
+            return "Embedding for: " + input;
+        }
+
+        @Override
+        public void open(FunctionContext context) throws Exception {
+            // Initialization logic, e.g. establishing the API connection
+        }
+
+        @Override
+        public void close() throws Exception {
+            // Cleanup logic, e.g. closing the API connection
+        }
+    }

Review Comment:
   Put this in the documentation if it was meant to be an implementation 
reference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36528) Update org.apache.avro : avro dependency

2024-10-14 Thread Kartik Goyal (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kartik Goyal updated FLINK-36528:
-
Labels:   (was: pull-request-available)

> Update org.apache.avro : avro dependency
> 
>
> Key: FLINK-36528
> URL: https://issues.apache.org/jira/browse/FLINK-36528
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.10.0
>Reporter: Kartik Goyal
>Assignee: Kartik Goyal
>Priority: Major
> Fix For: kubernetes-operator-1.10.0
>
>
> Update the org.apache.avro : avro package present in flink-beam-example to 
> remediate the vulnerabilities associated with this package. It is a 
> transitive dependency of beam-sdks-java-core, and the current version is 1.8.2. 
> Package info:
> [https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2] 
> Vulnerabilities info:
> Direct vulnerabilities:
> [CVE-2024-47561|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-47561]
> [CVE-2023-39410|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-39410]
> Vulnerabilities from dependencies:
> [CVE-2024-25710|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-25710]
> [CVE-2023-43642|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43642]
> [CVE-2023-34455|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34455]
> [CVE-2023-34454|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34454]
> [CVE-2023-34453|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34453]
> [CVE-2021-36090|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-36090]
> [CVE-2021-35517|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35517]
> [CVE-2021-35516|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35516]
> [CVE-2021-35515|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35515]
> [CVE-2020-15250|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-15250]
> [CVE-2019-10202|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10202]
> [CVE-2019-10172|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10172]
> [CVE-2018-11771|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-11771]
> Proposed change solution:
> Bump the version from 1.8.2 to 1.12.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-1.18][FLINK-36421] [fs] [checkpoint] Sync outputStream before returning handle in FsCheckpointStreamFactory [flink]

2024-10-14 Thread via GitHub


Zakelly commented on PR #25479:
URL: https://github.com/apache/flink/pull/25479#issuecomment-2410862308

   I guess there is something wrong with the CI infra for 1.18. Any thoughts? 
@JingGe 
   I'd propose merging this since it is a small one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [W.I.P] Disable parallel serial operator factory [flink]

2024-10-14 Thread via GitHub


JunRuiLee closed pull request #25506: [W.I.P] Disable parallel serial operator 
factory
URL: https://github.com/apache/flink/pull/25506


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36528] [Kubernetes Operator] Update org.apache.avro from 1.8.2 to 1.12.0 [flink-kubernetes-operator]

2024-10-14 Thread via GitHub


kartik-3513 opened a new pull request, #901:
URL: https://github.com/apache/flink-kubernetes-operator/pull/901

   ## What is the purpose of the change
   
   This PR updates the dependency "org.apache.avro:avro" version from 1.8.2 to 
1.12.0
   
   
   ## Brief change log
   
   The transitive dependency "org.apache.avro:avro" version 1.8.2, present in 
beam-sdks-java-core under the flink-beam-example module, has 2 direct 
vulnerabilities and 12 vulnerabilities from dependent packages. Updating it to 
1.12.0 removes all of them. The list can be found here: 
https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2 
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36528) Update org.apache.avro : avro dependency

2024-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36528:
---
Labels: pull-request-available  (was: )

> Update org.apache.avro : avro dependency
> 
>
> Key: FLINK-36528
> URL: https://issues.apache.org/jira/browse/FLINK-36528
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.10.0
>Reporter: Kartik Goyal
>Assignee: Kartik Goyal
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.10.0
>
>
> Update the org.apache.avro : avro package present in flink-beam-example to 
> remediate the vulnerabilities associated with this package. It is a 
> transitive dependency of beam-sdks-java-core, and the current version is 1.8.2. 
> Package info:
> [https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2] 
> Vulnerabilities info:
> Direct vulnerabilities:
> [CVE-2024-47561|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-47561]
> [CVE-2023-39410|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-39410]
> Vulnerabilities from dependencies:
> [CVE-2024-25710|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-25710]
> [CVE-2023-43642|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43642]
> [CVE-2023-34455|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34455]
> [CVE-2023-34454|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34454]
> [CVE-2023-34453|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34453]
> [CVE-2021-36090|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-36090]
> [CVE-2021-35517|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35517]
> [CVE-2021-35516|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35516]
> [CVE-2021-35515|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35515]
> [CVE-2020-15250|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-15250]
> [CVE-2019-10202|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10202]
> [CVE-2019-10172|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10172]
> [CVE-2018-11771|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-11771]
> Proposed change solution:
> Bump the version from 1.8.2 to 1.12.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Lineage interfaces for kafka connector [flink-connector-kafka]

2024-10-14 Thread via GitHub


boring-cyborg[bot] commented on PR #130:
URL: 
https://github.com/apache/flink-connector-kafka/pull/130#issuecomment-2410887979

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36405:
-
Affects Version/s: 1.19.1
   1.20.0
   (was: 1.9.1)

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Priority: Major
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when renewing the DelegationToken, the startup 
> user of Flink is used. This causes the renewer to be mismatched.
>  
> Error 2: HIVE_DELEGATION_TOKEN is not in the service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky updated FLINK-36405:
-
Fix Version/s: 2.0.0
   1.19.2
   1.20.1

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when renewing the DelegationToken, the startup 
> user of Flink is used. This causes the renewer to be mismatched.
>  
> Error 2: HIVE_DELEGATION_TOKEN is not in the service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky reassigned FLINK-36405:


Assignee: Ferenc Csaky

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Ferenc Csaky
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when renewing the DelegationToken, the startup 
> user of Flink is used. This causes the renewer to be mismatched.
>  
> Error 2: HIVE_DELEGATION_TOKEN is not in the service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]

2024-10-14 Thread via GitHub


flinkbot commented on PR #25508:
URL: https://github.com/apache/flink/pull/25508#issuecomment-2410366766

   
   ## CI report:
   
   * 6e26d14c7081b78d3d21952760ff86ff3c65a896 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]

2024-10-14 Thread via GitHub


Zakelly opened a new pull request, #25508:
URL: https://github.com/apache/flink/pull/25508

   ## What is the purpose of the change
   
   Currently, ForSt hands a direct buffer to 
`ByteBufferWritableFSDataOutputStream`, where the data is written byte by byte. 
According to our profiling, the statistics of the Hadoop-based file system are 
updated once per byte, which takes a lot of CPU. Below is a flamegraph, where 
the statistics part is marked in purple (taking 8.14% of the overall CPU).
   
![image](https://github.com/user-attachments/assets/c60cd376-f63c-4bb2-8c05-7a98bc6235d6)
   This PR copies the data to a heap buffer before invoking `write` to optimize 
this.
   
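   Conceptually, the buffered write is like the following sketch (field names assumed, not the exact patch):
   
   ```java
   // Drain the direct ByteBuffer through a reusable heap array so the wrapped
   // stream (and its per-call statistics) is hit once per chunk, not per byte.
   private final byte[] heapBuffer = new byte[4096];
   
   public void write(java.nio.ByteBuffer bb) throws IOException {
       while (bb.hasRemaining()) {
           int len = Math.min(heapBuffer.length, bb.remaining());
           bb.get(heapBuffer, 0, len);                     // bulk copy out of the direct buffer
           originalOutputStream.write(heapBuffer, 0, len); // one write() per chunk
       }
   }
   ```
   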
   ## Brief change log
   
- Writing logic in `ByteBufferWritableFSDataOutputStream`
   
   
   ## Verifying this change
   
   This change is a trivial rework without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36526) Optimize the overhead of writing with direct buffer in ForSt

2024-10-14 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36526:
---
Labels: pull-request-available  (was: )

> Optimize the overhead of writing with direct buffer in ForSt 
> -
>
> Key: FLINK-36526
> URL: https://issues.apache.org/jira/browse/FLINK-36526
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2024-10-14-15-52-41-457.png
>
>
> Currently, ForSt hands a direct buffer to 
> {{ByteBufferWritableFSDataOutputStream}}, where the data is written byte by 
> byte. According to our profiling, the statistics of the Hadoop-based file 
> system are updated once per byte, which takes a lot of CPU. Below is a 
> flamegraph, where the statistics part is marked in purple (taking 8.14% of 
> the overall CPU).
> !image-2024-10-14-15-52-41-457.png|width=1296,height=616!
>  
> It might be better to copy the data to a heap buffer before invoking write.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]

2024-10-14 Thread via GitHub


fredia commented on code in PR #25501:
URL: https://github.com/apache/flink/pull/25501#discussion_r1798959007


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java:
##
@@ -244,6 +244,11 @@ void snapshotState(
 && ((AbstractKeyedStateBackend) 
keyedStateBackend)
 
.requiresLegacySynchronousTimerSnapshots(
 
checkpointOptions.getCheckpointType());
+requiresLegacyRawKeyedStateSnapshots |=
+keyedStateBackend instanceof AsyncKeyedStateBackend
+&& ((AsyncKeyedStateBackend) 
keyedStateBackend)
+
.requiresLegacySynchronousTimerSnapshots(

Review Comment:
   > How about remove 
AsyncKeyedStateBackend#requiresLegacySynchronousTimerSnapshots and do the 
following
   
   The ForSt state backend may support storing timers in files in the future, so 
`requiresLegacySynchronousTimerSnapshots` would return false then. I prefer to 
keep it as it is.
   
   > should subclasses like AsyncKeyedStateBackendAdaptor override this method?
   
   Good point, we can store timers in the form of the `delegated keyed state 
backend`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Chenyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenyu Zheng updated FLINK-36405:
-
External issue URL: https://github.com/apache/flink/pull/25428

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Ferenc Csaky
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when renewing the DelegationToken, the startup 
> user of Flink is used. This causes the renewer to be mismatched.
>  
> Error 2: HIVE_DELEGATION_TOKEN is not in the service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36065][runtime] Support submit stream graph. [flink]

2024-10-14 Thread via GitHub


JunRuiLee commented on PR #25472:
URL: https://github.com/apache/flink/pull/25472#issuecomment-2410911107

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Chenyu Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889182#comment-17889182
 ] 

Chenyu Zheng commented on FLINK-36405:
--

[~fcsaky] Hi, I have submitted PR https://github.com/apache/flink/pull/25428. I 
don't know why it was not linked here.

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Ferenc Csaky
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when the DelegationToken is renewed, the 
> startup user of Flink is used, so the renewer does not match.
>  
> Error2: HIVE_DELEGATION_TOKEN is not in service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  
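
For illustration, a minimal sketch of the fix direction described above 
(hypothetical helper, not the actual patch): request the token with the 
current Flink login user as the renewer, so the later renewal performed by 
the Flink process matches the stored renewer.

{code:java}
import java.io.IOException;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;

// Hypothetical sketch: obtain a Hive delegation token whose renewer is the
// Flink process's own user instead of hive.metastore.kerberos.principal.
static String obtainHiveToken(IMetaStoreClient client) throws IOException, TException {
    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
    String owner = ugi.getUserName();         // token owner (full principal)
    String renewer = ugi.getShortUserName();  // renewer = Flink's startup user
    return client.getDelegationToken(owner, renewer);
}
{code}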



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Chenyu Zheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenyu Zheng updated FLINK-36405:
-
External issue URL:   (was: https://github.com/apache/flink/pull/25428)

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Ferenc Csaky
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when the DelegationToken is renewed, the 
> startup user of Flink is used, so the renewer does not match.
>  
> Error2: HIVE_DELEGATION_TOKEN is not in service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36523][Connectors/DynamoDB][BugFix] Fixing LATEST behaviour for DDB Connector [flink-connector-aws]

2024-10-14 Thread via GitHub


hlteoh37 commented on code in PR #177:
URL: 
https://github.com/apache/flink-connector-aws/pull/177#discussion_r1799315333


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##
@@ -89,13 +91,77 @@ public SplitTracker(
  * @param shardsToAdd collection of splits to add to tracking
  */
 public void addSplits(Collection<Shard> shardsToAdd) {
-Set<String> discoveredShardIds =
-
shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+if (TRIM_HORIZON.equals(initialPosition)) {
+addSplitsForTrimHorizon(shardsToAdd);
+return;
+}
+addSplitsForLatest(shardsToAdd);
+}
+
+/**
+ * If there is an ancestor of an open shard in knownSplits, we put all of 
its ancestors after
+ * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the 
ancestors and track only
+ * the open shard with LATEST. There might be a case when restoring from 
expired snapshots that
+ * there is an ancestor in knownSplits, but we wont know that it is an 
ancestor, so in that case
+ * also, all the ancestors will not be tracked and rather only the current 
open shard will be
+ * tracked with LATEST.

Review Comment:
   We should align on the expected behavior when using `LATEST`:
   
   1. When starting without snapshot,
  - All shards read from `LATEST`. If we want, we can focus on just the 
open shards.
   2. When reading with snapshot,
  - All shards from time of start of application are read from 
`TRIM_HORIZON`, 



##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##
@@ -89,13 +91,77 @@ public SplitTracker(
  * @param shardsToAdd collection of splits to add to tracking
  */
 public void addSplits(Collection<Shard> shardsToAdd) {
-Set<String> discoveredShardIds =
-
shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+if (TRIM_HORIZON.equals(initialPosition)) {
+addSplitsForTrimHorizon(shardsToAdd);
+return;
+}
+addSplitsForLatest(shardsToAdd);
+}
+
+/**
+ * If there is an ancestor of an open shard in knownSplits, we put all of 
its ancestors after
+ * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the 
ancestors and track only
+ * the open shard with LATEST. There might be a case when restoring from 
expired snapshots that
+ * there is an ancestor in knownSplits, but we wont know that it is an 
ancestor, so in that case
+ * also, all the ancestors will not be tracked and rather only the current 
open shard will be
+ * tracked with LATEST.

Review Comment:
   We should align on the expected behavior when using `LATEST`:
   
   1. When starting without snapshot:
  - All shards read from `LATEST`. If we want, we can focus on just the 
open shards.
   2. When reading with snapshot:
  - All shards from time of start of application are read from 
`TRIM_HORIZON`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36523][Connectors/DynamoDB][BugFix] Fixing LATEST behaviour for DDB Connector [flink-connector-aws]

2024-10-14 Thread via GitHub


hlteoh37 commented on code in PR #177:
URL: 
https://github.com/apache/flink-connector-aws/pull/177#discussion_r1799315333


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##
@@ -89,13 +91,77 @@ public SplitTracker(
  * @param shardsToAdd collection of splits to add to tracking
  */
 public void addSplits(Collection<Shard> shardsToAdd) {
-Set<String> discoveredShardIds =
-
shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+if (TRIM_HORIZON.equals(initialPosition)) {
+addSplitsForTrimHorizon(shardsToAdd);
+return;
+}
+addSplitsForLatest(shardsToAdd);
+}
+
+/**
+ * If there is an ancestor of an open shard in knownSplits, we put all of 
its ancestors after
+ * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the 
ancestors and track only
+ * the open shard with LATEST. There might be a case when restoring from 
expired snapshots that
+ * there is an ancestor in knownSplits, but we wont know that it is an 
ancestor, so in that case
+ * also, all the ancestors will not be tracked and rather only the current 
open shard will be
+ * tracked with LATEST.

Review Comment:
   We should align on the expected behavior when using `LATEST`:
   
   1. When starting without snapshot:
  - All shards from start of application read from `LATEST`. If we want, we 
can focus on just the open shards.
  - All child shards are read from `TRIM_HORIZON`
   2. When reading with snapshot:
  - All shards from time of start of application are read from 
`TRIM_HORIZON`
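
   A hypothetical sketch of this decision logic (all names are assumed, not 
taken from the PR):

   ```java
   // Sketch of the agreed-upon LATEST semantics; SplitTracker internals
   // (trackSplit, isOpen, restoredFromSnapshot, StartingPosition) are assumed.
   private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
       for (Shard shard : shardsToAdd) {
           if (restoredFromSnapshot) {
               // With a snapshot, nothing may be skipped: every shard
               // discovered after restore is read from the beginning.
               trackSplit(shard, StartingPosition.TRIM_HORIZON);
           } else if (isOpen(shard)) {
               // Fresh start: only tail the currently open shards.
               trackSplit(shard, StartingPosition.LATEST);
           }
           // Closed ancestors are ignored on a fresh start; their children
           // are discovered later and read from TRIM_HORIZON.
       }
   }
   ```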



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ferenc Csaky reassigned FLINK-36405:


Assignee: Chenyu Zheng  (was: Ferenc Csaky)

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when the DelegationToken is renewed, the 
> startup user of Flink is used, so the renewer does not match.
>  
> Error2: HIVE_DELEGATION_TOKEN is not in service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889184#comment-17889184
 ] 

Ferenc Csaky commented on FLINK-36405:
--

Ah very nice, assigned it to you! Can you pls. rebase your PR to the current 
master pls?

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when the DelegationToken is renewed, the 
> startup user of Flink is used, so the renewer does not match.
>  
> Error2: HIVE_DELEGATION_TOKEN is not in service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889184#comment-17889184
 ] 

Ferenc Csaky edited comment on FLINK-36405 at 10/14/24 11:32 AM:
-

Ah very nice, assigned it to you! Can you pls. rebase your PR to the current 
master?


was (Author: JIRAUSER306586):
Ah very nice, assigned it to you! Can you pls. rebase your PR to the current 
master pls?

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when the DelegationToken is renewed, the 
> startup user of Flink is used, so the renewer does not match.
>  
> Error2: HIVE_DELEGATION_TOKEN is not in service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889184#comment-17889184
 ] 

Ferenc Csaky edited comment on FLINK-36405 at 10/14/24 11:33 AM:
-

-Ah very nice, assigned it to you! Can you pls. rebase your PR to the current 
master?-

Actually, right now the Hive connector is not compiled on master because of 
JDK17 issues, so I think it would make more sense for now to point the PR 
against the {{release-1.20}} branch


was (Author: JIRAUSER306586):
Ah very nice, assigned it to you! Can you pls. rebase your PR to the current 
master?

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when the DelegationToken is renewed, the 
> startup user of Flink is used, so the renewer does not match.
>  
> Error2: HIVE_DELEGATION_TOKEN is not in service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-36405) Fix startup issues on kerberos clusters

2024-10-14 Thread Ferenc Csaky (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889184#comment-17889184
 ] 

Ferenc Csaky edited comment on FLINK-36405 at 10/14/24 11:33 AM:
-

Ah very nice, assigned it to you! -Can you pls. rebase your PR to the current 
master?-

Actually, right now the Hive connector is not compiled on master because of 
JDK17 issues, so I think it would make more sense for now to point the PR 
against the {{release-1.20}} branch


was (Author: JIRAUSER306586):
-Ah very nice, assigned it to you! Can you pls. rebase your PR to the current 
master?-

Actually, right now the Hive connector is not compiled on master because of 
JDK17 issues, so I think it would make more sense for now to point the PR 
against the {{release-1.20}} branch

> Fix startup issues on kerberos clusters
> ---
>
> Key: FLINK-36405
> URL: https://issues.apache.org/jira/browse/FLINK-36405
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Runtime
>Affects Versions: 1.20.0, 1.19.1
>Reporter: Chenyu Zheng
>Assignee: Chenyu Zheng
>Priority: Major
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Found these errors when starting up on a Kerberos cluster.
> Error 1: renewer mismatched
> {code:java}
> Caused by: org.apache.flink.util.FlinkRuntimeException: 
> MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN 
> owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, 
> issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, 
> masterKeyId=7) with non-matching renewer hive)
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] 
> ...{code}
> The cause of the problem is that the renewer is set to the value of 
> `hive.metastore.kerberos.principal`, which is generally the startup user of 
> the Hive metastore. However, when the DelegationToken is renewed, the 
> startup user of Flink is used, so the renewer does not match.
>  
> Error2: HIVE_DELEGATION_TOKEN is not in service list
>  
> {code:java}
> 2024-09-26 14:35:07,144 ERROR 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - 
> Failed to obtain delegation token for HiveServer2
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
>     at 
> org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170)
>  ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1]
> ...{code}
> The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36476) Remove all deprecated methods under public APIs in table modules

2024-10-14 Thread corgy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889109#comment-17889109
 ] 

corgy commented on FLINK-36476:
---

Hi [~xuyangzhong] , can you assign me a few subtasks? I want to start my 
contribution journey through this simple task.

> Remove all deprecated methods under public APIs in table modules
> 
>
> Key: FLINK-36476
> URL: https://issues.apache.org/jira/browse/FLINK-36476
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Reporter: xuyang
>Priority: Blocker
> Fix For: 2.0.0
>
>
> All methods needed to be removed are:
> https://docs.google.com/document/d/1bVrmcB9UFOd1-sT7xDRTMb5rmO0vK0eDlLRUYYL9gd4/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-33977][runtime] Adaptive scheduler may not minimize the number of TMs during downscaling [flink]

2024-10-14 Thread via GitHub


1996fanrui commented on PR #25218:
URL: https://github.com/apache/flink/pull/25218#issuecomment-2410276176

   > > May I know that only session mode needs to spreads the workload across 
as many workers as possible, right?
   > 
   > Yes, in session mode.
   
   @ztison @XComp Thanks for the detailed discussion and feedback. I have no 
questions for now.
   
   I think your suggestions make sense. We may need a FLIP to make this feature 
applicable to more scenarios.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]

2024-10-14 Thread via GitHub


dannycranmer commented on PR #757:
URL: https://github.com/apache/flink-web/pull/757#issuecomment-2410603904

   @AHeise we do not usually include the "rebuild website" commit in this PR; 
this is usually done as a follow-up, without a PR. Usually the date in the PR 
is a placeholder and needs to change later anyway.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]

2024-10-14 Thread via GitHub


dannycranmer commented on PR #757:
URL: https://github.com/apache/flink-web/pull/757#issuecomment-2410608441

   You are missing a change in the release_archive; see here for an example: 
https://github.com/apache/flink-web/pull/737/files


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36446) Refactor Job Submission Process to Use ExecutionPlan Instead of JobGraph

2024-10-14 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889156#comment-17889156
 ] 

david radley commented on FLINK-36446:
--

[~JunRuiLi] thanks for sharing the background of this fix - that makes sense.

> Refactor Job Submission Process to Use ExecutionPlan Instead of JobGraph
> 
>
> Key: FLINK-36446
> URL: https://issues.apache.org/jira/browse/FLINK-36446
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Coordination
>Reporter: Junrui Li
>Assignee: Junrui Li
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Refactor the job submission process to submit an {{ExecutionPlan}} instead of 
> a {{{}JobGraph{}}}.
> Since {{JobGraph}} implements the {{ExecutionPlan}} interface, this change 
> will not impact the existing submission process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (FLINK-36522) Bump mysql-connector-j version

2024-10-14 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-36522.
-
Resolution: Fixed

Merged to main(1.10.0) via: 6c73673811e1207350bd7f589ff5998e2caf97b9

> Bump mysql-connector-j version
> --
>
> Key: FLINK-36522
> URL: https://issues.apache.org/jira/browse/FLINK-36522
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.10.0
>Reporter: Kartik Goyal
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.10.0
>
>
> Bump the mysql-connector-j version to remediate vulnerability associated with 
> this package.
> Package info:
> [https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33] 
> Vulnerability info:
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-22102]
> Proposed change solution:
> Bump the version from 8.0.33 to 8.4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36522) Bump mysql-connector-j version

2024-10-14 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-36522:
---

Assignee: Kartik Goyal

> Bump mysql-connector-j version
> --
>
> Key: FLINK-36522
> URL: https://issues.apache.org/jira/browse/FLINK-36522
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.10.0
>Reporter: Kartik Goyal
>Assignee: Kartik Goyal
>Priority: Major
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.10.0
>
>
> Bump the mysql-connector-j version to remediate vulnerability associated with 
> this package.
> Package info:
> [https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33] 
> Vulnerability info:
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-22102]
> Proposed change solution:
> Bump the version from 8.0.33 to 8.4.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36522] [Kubernetes-operator] Bump mysql-connector-java from 8.0.33 to mysql-connecto… [flink-kubernetes-operator]

2024-10-14 Thread via GitHub


1996fanrui merged PR #898:
URL: https://github.com/apache/flink-kubernetes-operator/pull/898


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34467] bump flink version to 1.20.0 [flink-connector-kafka]

2024-10-14 Thread via GitHub


boring-cyborg[bot] commented on PR #111:
URL: 
https://github.com/apache/flink-connector-kafka/pull/111#issuecomment-2410480510

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-34467] bump flink version to 1.20.0 [flink-connector-kafka]

2024-10-14 Thread via GitHub


AHeise merged PR #111:
URL: https://github.com/apache/flink-connector-kafka/pull/111


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36523][Connectors/DynamoDB][BugFix] Fixing LATEST behaviour for DDB Connector [flink-connector-aws]

2024-10-14 Thread via GitHub


hlteoh37 commented on code in PR #177:
URL: 
https://github.com/apache/flink-connector-aws/pull/177#discussion_r1799008421


##
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##
@@ -89,13 +91,77 @@ public SplitTracker(
  * @param shardsToAdd collection of splits to add to tracking
  */
 public void addSplits(Collection<Shard> shardsToAdd) {
-Set<String> discoveredShardIds =
-
shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+if (TRIM_HORIZON.equals(initialPosition)) {
+addSplitsForTrimHorizon(shardsToAdd);
+return;
+}
+addSplitsForLatest(shardsToAdd);
+}
+
+/**
+ * If there is an ancestor of an open shard in knownSplits, we put all of 
its ancestors after
+ * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the 
ancestors and track only
+ * the open shard with LATEST. There might be a case when restoring from 
expired snapshots that
+ * there is an ancestor in knownSplits, but we wont know that it is an 
ancestor, so in that case
+ * also, all the ancestors will not be tracked and rather only the current 
open shard will be
+ * tracked with LATEST.

Review Comment:
   I don't think this is correct. When restoring from snapshot, we should read 
all records since the restored snapshot (`TRIM_HORIZON`), and not `LATEST`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-34467) Implement Lineage Interface in Jdbc Connector

2024-10-14 Thread Arvid Heise (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889135#comment-17889135
 ] 

Arvid Heise commented on FLINK-34467:
-

Merged version bump into main as 2dfdae6e8e44828a9c2b6db833348b65d9379b43.

> Implement Lineage Interface in Jdbc Connector
> -
>
> Key: FLINK-34467
> URL: https://issues.apache.org/jira/browse/FLINK-34467
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / JDBC
>Affects Versions: 1.19.0
>Reporter: Zhenqiu Huang
>Assignee: Zhenqiu Huang
>Priority: Minor
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]

2024-10-14 Thread via GitHub


AHeise closed pull request #757: Upgrade Kafka connector to 3.3.0
URL: https://github.com/apache/flink-web/pull/757


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36474] Support merging timestamp columns when routing [flink-cdc]

2024-10-14 Thread via GitHub


lvyanquan commented on code in PR #3636:
URL: https://github.com/apache/flink-cdc/pull/3636#discussion_r1798837548


##
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java:
##
-private List<SchemaChangeEvent> handleCreateTableEvent(
 return schemaChangeEvents;
 }
 
-private DataType getWiderType(DataType thisType, DataType thatType) {
-if (thisType.equals(thatType)) {
-return thisType;
-}
-if (thisType.is(DataTypeFamily.INTEGER_NUMERIC)
-&& thatType.is(DataTypeFamily.INTEGER_NUMERIC)) {
-return DataTypes.BIGINT();
-}
-if (thisType.is(DataTypeFamily.CHARACTER_STRING)
-&& thatType.is(DataTypeFamily.CHARACTER_STRING)) {
-return DataTypes.STRING();
-}
-if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
-&& thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
-return DataTypes.DOUBLE();
+private DataType getWiderType(String columnName, DataType thisType, 
DataType thatType) {
+try {
+return SchemaUtils.inferWiderType(thisType, thatType);
+} catch (IllegalStateException e) {
+throw new IllegalStateException(
+String.format(
+"Incompatible types found for column `%s': \"%s\" 
and \"%s\"",

Review Comment:
   Incompatible types found for column `%s`: \"%s\" and \"%s\"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36466][runtime] Change default value of execution.runtime-mode from STREAMING to AUTOMATIC [flink]

2024-10-14 Thread via GitHub


reswqa closed pull request #25505: [FLINK-36466][runtime] Change default value 
of execution.runtime-mode from STREAMING to AUTOMATIC
URL: https://github.com/apache/flink/pull/25505


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36526) Optimize the overhead of writing with direct buffer in ForSt

2024-10-14 Thread Zakelly Lan (Jira)
Zakelly Lan created FLINK-36526:
---

 Summary: Optimize the overhead of writing with direct buffer in 
ForSt 
 Key: FLINK-36526
 URL: https://issues.apache.org/jira/browse/FLINK-36526
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / State Backends
Reporter: Zakelly Lan
Assignee: Zakelly Lan
 Attachments: image-2024-10-14-15-52-41-457.png

Currently, ForSt gives a direct buffer to 
{{ByteBufferWritableFSDataOutputStream}}, where the data is written byte by 
byte. According to our perf results, the statistics of the Hadoop-based FS are 
updated once per byte, which takes a lot of CPU.

!image-2024-10-14-15-52-41-457.png!

 

It might be better to copy to a heap buffer before invoking write.
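
For illustration, a minimal sketch of such a chunked copy (class, method, and 
chunk size are made up):

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch: drain a direct ByteBuffer through a reusable heap
// chunk so the underlying stream sees a few array writes instead of one
// write per byte (each of which updates the Hadoop FS statistics).
final class DirectBufferDrain {
    private static final int SEGMENT_BUFFER_SIZE = 16 * 1024;

    static void writeDirect(ByteBuffer bb, OutputStream out) throws IOException {
        byte[] chunk = new byte[Math.min(bb.remaining(), SEGMENT_BUFFER_SIZE)];
        while (bb.hasRemaining()) {
            int n = Math.min(chunk.length, bb.remaining());
            bb.get(chunk, 0, n);     // bulk copy out of the direct buffer
            out.write(chunk, 0, n);  // one array write per chunk
        }
    }
}
{code}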



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36527) Introduce a parameter to support the autoscaler to adopt a more aggressive strategy when Source or upstream shuffle is keyBy

2024-10-14 Thread yuanfenghu (Jira)
yuanfenghu created FLINK-36527:
--

 Summary: Introduce a parameter to support the autoscaler to adopt 
a more aggressive strategy when Source or upstream shuffle is keyBy
 Key: FLINK-36527
 URL: https://issues.apache.org/jira/browse/FLINK-36527
 Project: Flink
  Issue Type: Improvement
  Components: Autoscaler
Reporter: yuanfenghu


In 
[https://issues.apache.org/jira/browse/FLINK-36192|https://issues.apache.org/jira/browse/FLINK-36192]
We have completed the optimization of Source parallelism determination.

The parallelism of tasks that are sources, or whose upstream shuffle is keyBy, 
will be adjusted to be a divisor of the number of partitions or of the maximum 
parallelism.
This Jira hopes to introduce a parameter to enable a {color:#de350b}more 
aggressive{color} strategy: 
https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764419949
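
As a rough illustration of the current divisor rule (a sketch, not the 
operator's actual code):

{code:java}
// Pick the largest parallelism <= target that evenly divides the partition
// count, so every subtask handles the same number of partitions.
static int adjustToDivisor(int target, int numPartitions) {
    for (int p = Math.min(target, numPartitions); p > 1; p--) {
        if (numPartitions % p == 0) {
            return p;
        }
    }
    return 1;
}
// e.g. adjustToDivisor(12, 32) returns 8; a more aggressive strategy might
// round up to the next divisor (16) instead of down, trading resource usage
// for headroom.
{code}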



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36526) Optimize the overhead of writing with direct buffer in ForSt

2024-10-14 Thread Zakelly Lan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zakelly Lan updated FLINK-36526:

Description: 
Currently, ForSt gives a direct buffer to 
{{{}ByteBufferWritableFSDataOutputStream{}}}, where the data is written byte 
by byte. According to our perf results, the statistics of the Hadoop-based FS 
are updated once per byte, which takes a lot of CPU. Below is a flamegraph, 
where the statistics part is marked as purple (taking 8.14% of the overall CPU).

!image-2024-10-14-15-52-41-457.png|width=1296,height=616!

 

It might be better to copy to a heap buffer before invoking write.

  was:
Currently, ForSt gives a direct buffer to 
{{ByteBufferWritableFSDataOutputStream}}, where the data is written byte by 
byte. According to our perf results, the statistics of the Hadoop-based FS are 
updated once per byte, which takes a lot of CPU.

!image-2024-10-14-15-52-41-457.png!

 

It might be better to copy to a heap buffer before invoking write.


> Optimize the overhead of writing with direct buffer in ForSt 
> -
>
> Key: FLINK-36526
> URL: https://issues.apache.org/jira/browse/FLINK-36526
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / State Backends
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
> Attachments: image-2024-10-14-15-52-41-457.png
>
>
> Currently, ForSt gives a direct buffer to 
> {{{}ByteBufferWritableFSDataOutputStream{}}}, where the data is written byte 
> by byte. According to our perf results, the statistics of the Hadoop-based 
> FS are updated once per byte, which takes a lot of CPU. Below is a 
> flamegraph, where the statistics part is marked as purple (taking 8.14% of 
> the overall CPU).
> !image-2024-10-14-15-52-41-457.png|width=1296,height=616!
>  
> It might be better to copy to a heap buffer before invoking write.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36377) Support the use of the LAST_VALUE aggregate function on ROW type data

2024-10-14 Thread Yang Li (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889120#comment-17889120
 ] 

Yang Li commented on FLINK-36377:
-

I'm glad to hear that! Yes, other complex types like {{Map}}, 
{{Array}}, and even {{Raw}} types can also have similar issues, and I am 
adding some unit tests to verify the correctness. If possible, this feature can 
be assigned to me and I will continue to follow up on it.

>  Support the use of the LAST_VALUE aggregate function on ROW type data
> --
>
> Key: FLINK-36377
> URL: https://issues.apache.org/jira/browse/FLINK-36377
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / State Backends
>Reporter: Yang Li
>Priority: Major
>
> h2. Introduction
> In Flink, after applying a group by, users may use LAST_VALUE to process 
> certain fields to ensure that all fields have corresponding aggregation 
> functions. Currently, LAST_VALUE does not support the ROW type syntax, so 
> users always apply the LAST_VALUE function to each individual field 
> separately, as shown below.
> SELECT
>     LAST_VALUE(bool_a) AS last_bool_a, 
>     LAST_VALUE(int2_b) AS last_int2_b, 
>     LAST_VALUE(int4_c) AS last_int4_c, 
>     LAST_VALUE(int8_d) AS last_int8_d, 
>     LAST_VALUE(float4_e) AS last_float4_e, 
>     LAST_VALUE(float4_f) AS last_float4_f, 
>     LAST_VALUE(numeric_g) AS last_numeric_g, 
>     LAST_VALUE(text_m) AS last_text_m, 
>     LAST_VALUE(varchar_p) AS last_varchar_p,
>     date_h
> FROM source_table
> GROUP BY date_h
>  
> If the upstream operator produces a retract stream, this approach leads to 
> redundant StateMap traversal. To facilitate retraction, Flink's internal 
> {{LastValueWithRetractAggFunction}} stores all historical data related to 
> the primary key. When the last value is deleted, it traverses all keys in 
> the {{orderToValue}} MapView (which maps timestamps to data), and this 
> {{MapView}} is stored in the form of a {{StateMap}}. More {{LAST_VALUE}} 
> functions lead to more read and write operations on RocksDB. Therefore, I 
> advocate handling {{ROW}} types with {{LAST_VALUE}}, allowing support for 
> all fields with just one {{LAST_VALUE}} function, as below.
> SELECT
>  LAST_VALUE(
>     ROW(
>     bool_a,
>         int2_b,
>         int4_c,
>         int8_d,
>         float4_e,
>         float4_f,
>         numeric_g,
>         text_m,
>         varchar_p
>     )
> ) AS row_data,
> date_h
> FROM source_table
> GROUP BY date_h
> The experiment indicates that applying the {{ROW}} type to the {{LAST_VALUE}} 
> function can improve the processing speed for retract streams, but has no 
> effect on append-only streams.
> h2. Evaluation:
> The throughput of jobs was compared based on whether the {{ROW}} type was 
> used in the {{LAST_VALUE}} function, considering both retract and append-only 
> scenarios.
> h3. Retraction
> Use a deduplication operator to convert the append-only stream generated by 
> datagen into a retract stream.
> The two jobs show a clear difference in throughput (ROW mean: 4817 vs. 
> separated mean: 1808).
> Flame graph analysis shows that applying the ROW type to the LAST_VALUE
> function reduces the cost of the aggregate function's calls to accumulate,
> with CPU usage for accumulate at 20.02% (ROW) vs. 66.98% (separated).
> LastValueWithRetractAccumulator uses MapState to store its MapView, so 
> updating the LastValueWithRetractAccumulator requires reading from or 
> writing to RocksDB.
> h3. AppendOnly
> The two jobs show little difference in throughput (ROW: 13411 vs. separated: 
> 10673). 
> Further examination of the flame graphs for both processes reveals that the 
> bottleneck
> for both jobs lies in getting {{RocksDBValueState}} which is called by 
> {{{}GroupFunction{}}}.
> Using {{ROW}} aggregation does not yield significant optimization in this 
> part. I suspect it's
> because Flink uses RowData to store data from multiple Accumulators, and 
> every time
> the {{accState}} invokes the {{value}} method, it reads all the Accumulators 
> at the same time.
> Therefore, the use of ROW optimization might not be very effective.
> h2. Conclusion
>  # Using ROW type for LAST_VALUE Aggregation can improve the processing speed 
> for retract streams, with effectiveness proportional to the number of fields 
> contained in the {{{}ROW{}}}.
>  # Using ROW type for LAST_VALUE aggregation results in limited improvements 
> for append-only streams, as the optimization effect on state backend read 
> speed is not significant.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]

2024-10-14 Thread via GitHub


fredia commented on code in PR #25508:
URL: https://github.com/apache/flink/pull/25508#discussion_r1798973369


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java:
##
@@ -58,6 +69,22 @@ public void write(ByteBuffer bb) throws IOException {
 }
 if (bb.hasArray()) {
 write(bb.array(), bb.arrayOffset() + bb.position(), 
bb.remaining());
+} else if (bb.isDirect()) {
+int len = bb.remaining();
+int segment = Math.min(len, SEGMENT_BUFFER_SIZE);
+byte[] bytes = new byte[segment];
+for (int i = 0; i < len;) {
+int copy = Math.min(segment, bb.remaining());
+UNSAFE.copyMemory(
+null,

Review Comment:
   Why `null` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]

2024-10-14 Thread via GitHub


fredia commented on code in PR #25508:
URL: https://github.com/apache/flink/pull/25508#discussion_r1798978105


##
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java:
##
@@ -58,6 +69,22 @@ public void write(ByteBuffer bb) throws IOException {
 }
 if (bb.hasArray()) {
 write(bb.array(), bb.arrayOffset() + bb.position(), 
bb.remaining());
+} else if (bb.isDirect()) {
+int len = bb.remaining();
+int segment = Math.min(len, SEGMENT_BUFFER_SIZE);
+byte[] bytes = new byte[segment];
+for (int i = 0; i < len;) {
+int copy = Math.min(segment, bb.remaining());
+UNSAFE.copyMemory(
+null,

Review Comment:
   Oh, I see, it's because `MemoryUtils.getByteBufferAddress(bb)` is an 
absolute address.
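
   For reference, the relevant `sun.misc.Unsafe#copyMemory` overload treats a 
null base object as "interpret the offset as an absolute native address":

   ```java
   // From sun.misc.Unsafe: when srcBase (or destBase) is null, the matching
   // offset is an absolute memory address, which is what
   // MemoryUtils.getByteBufferAddress(bb) returns for a direct buffer.
   public native void copyMemory(
           Object srcBase, long srcOffset,
           Object destBase, long destOffset,
           long bytes);
   ```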



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36527) Introduce a parameter to support the autoscaler to adopt a more aggressive strategy when Source or upstream shuffle is keyBy

2024-10-14 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan reassigned FLINK-36527:
---

Assignee: yuanfenghu

> Introduce a parameter to support the autoscaler to adopt a more aggressive 
> strategy when Source or upstream shuffle is keyBy
> 
>
> Key: FLINK-36527
> URL: https://issues.apache.org/jira/browse/FLINK-36527
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: yuanfenghu
>Assignee: yuanfenghu
>Priority: Minor
>
> In 
> [https://issues.apache.org/jira/browse/FLINK-36192|https://issues.apache.org/jira/browse/FLINK-36192]
> We have completed the optimization of Souce parallelism determination.
> Task Source or upstream shuffle is keyBy will be adjusted to be a divisor of 
> the number of partitions or the maximum degree of parallelism.
> This Jira hopes to introduce a parameter to enable a {color:#de350b}more 
> aggressive{color} strategy : 
> https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764419949



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

