Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
nateab commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1800019486 ## flink-table/flink-sql-client/src/test/resources/sql/function.q: ## @@ -346,3 +346,159 @@ show user functions; SHOW JARS; Empty set !ok + +# == +# test describe function +# == + +ADD JAR '$VAR_UDF_JAR_PATH'; +[INFO] Execute statement succeeded. +!info + +describe function `SUM`; ++-++ +| info name | info value | ++-++ +| system function | true | +| temporary | false | ++-++ +2 rows in set +!ok + +describe function extended `SUM`; ++--++ +|info name | info value | ++--++ +| system function | true | +|temporary | false | +| kind | AGGREGATE | +| requirements | [] | +|deterministic | true | +| constant folding | true | +|signature | SUM() | ++--++ +7 rows in set +!ok + +describe function temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | ++---+-$VAR_UDF_JAR_PATH_DASH+ +| system function | $VAR_UDF_JAR_PATH_SPACE false | +| temporary | $VAR_UDF_JAR_PATH_SPACE true | +|class name | $VAR_UDF_JAR_PATH_SPACE UpperUDF | +| function language | $VAR_UDF_JAR_PATH_SPACE JAVA | +| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] | ++---+-$VAR_UDF_JAR_PATH_DASH+ +5 rows in set +!ok + +describe function extended temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | ++---+-$VAR_UDF_JAR_PATH_DASH+ +| system function | $VAR_UDF_JAR_PATH_SPACE false | +| temporary | $VAR_UDF_JAR_PATH_SPACE true | +|class name | $VAR_UDF_JAR_PATH_SPACE UpperUDF | +| function language | $VAR_UDF_JAR_PATH_SPACE JAVA | +| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] | +| kind | $VAR_UDF_JAR_PATH_SPACE SCALAR | +| requirements | $VAR_UDF_JAR_PATH_SPACE [] | +| deterministic | $VAR_UDF_JAR_PATH_SPACE true | +| constant folding | $VAR_UDF_JAR_PATH_SPACE true | +| signature |$VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(STRING) | ++---+-$VAR_UDF_JAR_PATH_DASH+ +10 rows in set +!ok + +desc function temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | ++---+-$VAR_UDF_JAR_PATH_DASH+ +| system function | $VAR_UDF_JAR_PATH_SPACE false | +| temporary | $VAR_UDF_JAR_PATH_SPACE true | +|class name | $VAR_UDF_JAR_PATH_SPACE UpperUDF | +| function language | $VAR_UDF_JAR_PATH_SPACE JAVA | +| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] | ++---+-$VAR_UDF_JAR_PATH_DASH+ +5 rows in set +!ok + +desc function extended temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | Review Comment: This would deviate from what `DESCRIBE CATALOG` currently does, are you okay with that? Also see my prior comment on the naming here https://github.com/apache/flink/pull/25115#discussion_r1706084728 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e
[jira] [Created] (FLINK-36532) Kafka group offsets are not committed on stop with savepoint
LEONID ILYEVSKY created FLINK-36532: --- Summary: Kafka group offsets are not committed on stop with savepoint Key: FLINK-36532 URL: https://issues.apache.org/jira/browse/FLINK-36532 Project: Flink Issue Type: Bug Components: Connectors / Kafka Affects Versions: 1.18.1 Reporter: LEONID ILYEVSKY In a pipeline with a Kafka source and checkpointing enabled, the Kafka source commits the group offsets to the broker on every checkpoint. However, when the job is stopped with a savepoint, it does not commit those offsets. I verified this behavior in versions 1.16.1 and 1.18.1. In some situations, committing the group offsets when the job stops is very important. The best example is when the program code changes and the savepoint from the previous version is not compatible with the new code. If we cannot use the savepoint for the initial offsets, we could fall back to the group offsets, so it is critical to have them committed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
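For context, a minimal sketch of the fallback the reporter describes, assuming the standard KafkaSource builder API (topic, group id, and broker address are placeholders): committing on checkpoints plus starting from committed group offsets lets a job whose savepoint became incompatible resume roughly where it left off.

{code:java}
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source =
    KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // placeholder address
        .setTopics("input-topic")             // placeholder topic
        .setGroupId("my-group")               // placeholder group id
        // commit group offsets to the broker on every successful checkpoint
        .setProperty("commit.offsets.on.checkpoint", "true")
        // on a fresh start without a savepoint, begin from the committed
        // group offsets, falling back to EARLIEST if none were committed yet
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
{code}

This fallback only works if the offsets were actually committed before the stop, which is exactly the gap the ticket reports for stop-with-savepoint.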
Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
nateab commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1800063746 ## flink-table/flink-sql-client/src/test/resources/sql/function.q: ## @@ -346,3 +346,159 @@ show user functions; SHOW JARS; Empty set !ok + +# == +# test describe function +# == + +ADD JAR '$VAR_UDF_JAR_PATH'; +[INFO] Execute statement succeeded. +!info + +describe function `SUM`; ++-++ +| info name | info value | ++-++ +| system function | true | +| temporary | false | ++-++ +2 rows in set +!ok + +describe function extended `SUM`; ++--++ +|info name | info value | ++--++ +| system function | true | +|temporary | false | +| kind | AGGREGATE | +| requirements | [] | +|deterministic | true | +| constant folding | true | +|signature | SUM() | ++--++ +7 rows in set +!ok + +describe function temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | ++---+-$VAR_UDF_JAR_PATH_DASH+ +| system function | $VAR_UDF_JAR_PATH_SPACE false | +| temporary | $VAR_UDF_JAR_PATH_SPACE true | +|class name | $VAR_UDF_JAR_PATH_SPACE UpperUDF | +| function language | $VAR_UDF_JAR_PATH_SPACE JAVA | +| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] | ++---+-$VAR_UDF_JAR_PATH_DASH+ +5 rows in set +!ok + +describe function extended temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | ++---+-$VAR_UDF_JAR_PATH_DASH+ +| system function | $VAR_UDF_JAR_PATH_SPACE false | +| temporary | $VAR_UDF_JAR_PATH_SPACE true | +|class name | $VAR_UDF_JAR_PATH_SPACE UpperUDF | +| function language | $VAR_UDF_JAR_PATH_SPACE JAVA | +| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] | +| kind | $VAR_UDF_JAR_PATH_SPACE SCALAR | +| requirements | $VAR_UDF_JAR_PATH_SPACE [] | +| deterministic | $VAR_UDF_JAR_PATH_SPACE true | +| constant folding | $VAR_UDF_JAR_PATH_SPACE true | +| signature |$VAR_UDF_JAR_PATH_SPACE c1.db.temp_upperudf(STRING) | ++---+-$VAR_UDF_JAR_PATH_DASH+ +10 rows in set +!ok + +desc function temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | ++---+-$VAR_UDF_JAR_PATH_DASH+ +| system function | $VAR_UDF_JAR_PATH_SPACE false | +| temporary | $VAR_UDF_JAR_PATH_SPACE true | +|class name | $VAR_UDF_JAR_PATH_SPACE UpperUDF | +| function language | $VAR_UDF_JAR_PATH_SPACE JAVA | +| resource uris | [ResourceUri{resourceType=JAR, uri='$VAR_UDF_JAR_PATH'}] | ++---+-$VAR_UDF_JAR_PATH_DASH+ +5 rows in set +!ok + +desc function extended temp_upperudf; ++---+-$VAR_UDF_JAR_PATH_DASH+ +| info name | $VAR_UDF_JAR_PATH_SPACE info value | ++---+-$VAR_UDF_JAR_PATH_DASH+ +| system function | $VAR_UDF_JAR_PATH_SPACE false | Review Comment: is that necessary? we don't do that for `DESCRIBE CATALOG` for example. I tried to order the rows in a way that I thought was logical -- This is an automated message from the Apache Git Service. To res
Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
nateab commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1800150565 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ContextResolvedFunction; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult; +import static org.apache.flink.table.types.inference.TypeInferenceUtil.generateSignature; + +/** + * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier + * statement. + */ +@Internal +public class DescribeFunctionOperation implements Operation, ExecutableOperation { + +private final UnresolvedIdentifier sqlIdentifier; +private final boolean isExtended; + +public DescribeFunctionOperation(UnresolvedIdentifier sqlIdentifier, boolean isExtended) { +this.sqlIdentifier = sqlIdentifier; +this.isExtended = isExtended; +} + +public UnresolvedIdentifier getSqlIdentifier() { +return sqlIdentifier; +} + +public boolean isExtended() { +return isExtended; +} + +@Override +public String asSummaryString() { +Map params = new LinkedHashMap<>(); +params.put("identifier", sqlIdentifier); +params.put("isExtended", isExtended); +return OperationUtils.formatWithChildren( +"DESCRIBE FUNCTION", params, Collections.emptyList(), Operation::asSummaryString); +} + +@Override +public TableResultInternal execute(Context ctx) { +// DESCRIBE FUNCTION shows all the function properties. 
+Optional<ContextResolvedFunction> functionOpt = +ctx.getFunctionCatalog().lookupFunction(sqlIdentifier); +if (!functionOpt.isPresent()) { +throw new ValidationException( +String.format( +"Function with the identifier '%s' doesn't exist.", +sqlIdentifier.asSummaryString())); +} +final ContextResolvedFunction function = functionOpt.get(); +final CatalogFunction catalogFunction = function.getCatalogFunction(); + +List<List<String>> rows = new ArrayList<>(); +rows.add(Arrays.asList("system function", String.valueOf(catalogFunction == null))); +rows.add(Arrays.asList("temporary", String.valueOf(function.isTemporary()))); +if (catalogFunction != null) { +rows.add(Arrays.asList("class name", catalogFunction.getClassName())); +rows.add( +Arrays.asList( +"function language", catalogFunction.getFunctionLanguage().toString())); +rows.add( +Arrays.asList( +"resource uris", catalogFunction.getFunctionResources().toString())); +} + +if (isExtended) { +final FunctionDefinition definition = function.getDefinition(); +rows.add(Arrays.asList("kind", definition.getKind().toString())); +rows.add(Arrays.asList("requirements", definition.getRequirements().toString())); +rows.add(Arrays.asList("deterministic", String.valueOf(definition.isDeterministic()))); Review Comment: added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35016] catalog changes for model resource [flink]
lihaosky commented on PR #25036: URL: https://github.com/apache/flink/pull/25036#issuecomment-2412469319 Close in favor of https://github.com/apache/flink/pull/25211 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35016] catalog changes for model resource [flink]
lihaosky closed pull request #25036: [FLINK-35016] catalog changes for model resource URL: https://github.com/apache/flink/pull/25036 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35822] Introduce DESCRIBE FUNCTION [flink]
nateab commented on code in PR #25115: URL: https://github.com/apache/flink/pull/25115#discussion_r1800082399 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/DescribeFunctionOperation.java: ## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ContextResolvedFunction; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.functions.FunctionDefinition; +import org.apache.flink.table.types.DataType; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.table.api.internal.TableResultUtils.buildTableResult; +import static org.apache.flink.table.types.inference.TypeInferenceUtil.generateSignature; + +/** + * Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier + * statement. Review Comment: added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35016] catalog changes for model resource [flink]
lihaosky commented on code in PR #25211: URL: https://github.com/apache/flink/pull/25211#discussion_r1800369200 ## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogModel.java: ## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.catalog; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.api.Schema; + +import javax.annotation.Nullable; + +import java.util.Map; + +/** Interface for a model in a catalog. */ +@PublicEvolving +public interface CatalogModel { +/** Returns a map of string-based model options. */ +Map getOptions(); + +/** + * Get the unresolved input schema of the model. + * + * @return unresolved input schema of the model. + */ +Schema getInputSchema(); Review Comment: Cannot be null. Added null check -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
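For illustration, a hedged sketch of where such a null check would sit: a hypothetical DefaultCatalogModel (not the PR's actual class), implementing just the two accessors visible in the quoted diff and rejecting a null input schema at construction time.

```
import org.apache.flink.table.api.Schema;
import org.apache.flink.util.Preconditions;

import java.util.Map;

/** Hypothetical holder illustrating the null check; not the PR's implementation. */
public final class DefaultCatalogModel implements CatalogModel {

    private final Schema inputSchema;
    private final Map<String, String> options;

    public DefaultCatalogModel(Schema inputSchema, Map<String, String> options) {
        // getInputSchema() must never return null, so fail fast here
        this.inputSchema =
                Preconditions.checkNotNull(inputSchema, "Input schema must not be null.");
        this.options = Preconditions.checkNotNull(options, "Options must not be null.");
    }

    @Override
    public Map<String, String> getOptions() {
        return options;
    }

    @Override
    public Schema getInputSchema() {
        return inputSchema;
    }
}
```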
[jira] [Created] (FLINK-36533) An exception occurs when using the taskmanager.data.bind-port
five created FLINK-36533: Summary: An exception occurs when using the taskmanager.data.bind-port Key: FLINK-36533 URL: https://issues.apache.org/jira/browse/FLINK-36533 Project: Flink Issue Type: Bug Reporter: five Attachments: exception.png, source_code.png An error occurred when using a port range for the {{taskmanager.data.bind-port}} configuration item. Environment Information: * Flink version: 1.20.0 * Java: 1.8 Configuration Information: * taskmanager.data.bind-port: 12001-13000 When starting the Flink job, an exception occurs. Please see the attached file for the exception message: exception.png The error received is a NativeIoException. However, the source code is set up to catch a BindException. (Refer to the attached file: source_code.png) The two do not match. -- This message was sent by Atlassian Jira (v8.20.10#820010)
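A minimal illustration of the mismatch (not the actual Flink source): with the native epoll transport, Netty reports "address already in use" as Errors.NativeIoException, which a catch clause written only for java.net.BindException never sees. A port-range retry loop therefore has to treat both as recoverable; bindTo() below is a hypothetical stand-in for the real bind call.

{code:java}
import io.netty.channel.unix.Errors.NativeIoException;
import java.net.BindException;

final class PortRangeBinder {
    // Tries each port in [from, to]; both exception types signal "port taken".
    static int bindFirstFreePort(int from, int to) throws Exception {
        for (int port = from; port <= to; port++) {
            try {
                bindTo(port);
                return port;
            } catch (BindException | NativeIoException e) {
                // address already in use, try the next port in the range
            }
        }
        throw new BindException("No free port in range " + from + "-" + to);
    }

    private static void bindTo(int port) throws Exception {
        // placeholder for the real Netty server bootstrap bind
        throw new UnsupportedOperationException("sketch only");
    }
}
{code}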
Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]
beryllw commented on code in PR #3639: URL: https://github.com/apache/flink-cdc/pull/3639#discussion_r1797628800 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java: ## @@ -60,28 +62,12 @@ public void commit(Collection> commitReques .collect(Collectors.toList()); // All CommitRequest shared the same checkpointId. long checkpointId = committables.get(0).checkpointId(); -int retriedNumber = commitRequests.stream().findFirst().get().getNumberOfRetries(); WrappedManifestCommittable wrappedManifestCommittable = storeMultiCommitter.combine(checkpointId, 1L, committables); -try { -if (retriedNumber > 0) { -storeMultiCommitter.filterAndCommit( -Collections.singletonList(wrappedManifestCommittable)); -} else { - storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable)); -} -commitRequests.forEach(CommitRequest::signalAlreadyCommitted); -LOGGER.info( -String.format( -"Commit succeeded for %s with %s committable", -checkpointId, committables.size())); -} catch (Exception e) { -commitRequests.forEach(CommitRequest::retryLater); Review Comment: Is there a specific purpose for retrying later in this context? @lvyanquan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
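For readers following the diff: the change removes the retriedNumber branch so the committer always goes through filterAndCommit, which drops committables whose snapshot already landed. A rough sketch of the resulting commit path, reusing the fields from the quoted method (simplified; not necessarily the PR's final code):

```
// filterAndCommit() first checks which committables already made it into a
// snapshot and commits only the remainder, so replaying the same checkpoint
// after a failover cannot commit the same data files twice.
WrappedManifestCommittable wrappedManifestCommittable =
        storeMultiCommitter.combine(checkpointId, 1L, committables);
storeMultiCommitter.filterAndCommit(Collections.singletonList(wrappedManifestCommittable));
commitRequests.forEach(CommitRequest::signalAlreadyCommitted);
```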
Re: [PR] [FLINK-36510][runtime] Support partial lineage info collection [flink]
FangYongs commented on PR #25440: URL: https://github.com/apache/flink/pull/25440#issuecomment-2412784324 Thanks @HuangZhenQiu , LGTM -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36510][runtime] Support partial lineage info collection [flink]
FangYongs merged PR #25440: URL: https://github.com/apache/flink/pull/25440 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]
leonardBang commented on PR #757: URL: https://github.com/apache/flink-web/pull/757#issuecomment-2412946416 @AHeise Could you change the PR from DRAFT to READY for review? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]
leonardBang commented on PR #757: URL: https://github.com/apache/flink-web/pull/757#issuecomment-2412947976 I changed the status so we can start the review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36405: --- Labels: pull-request-available (was: ) > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Assignee: Chenyu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Found these errors when starting up on a Kerberos cluster. > Error 1: renewer user mismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > the Hive metastore. However, when renewing the DelegationToken, the startup > user of Flink is used, so the renewer does not match. > > Error 2: HIVE_DELEGATION_TOKEN is not in the service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
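For Error 1, the direction of the fix is implied by the description: request the token with the Flink process user as renewer instead of hive.metastore.kerberos.principal, so that Flink's own later renewal call matches. The sketch below is an assumed shape, not the PR's exact code; hiveMetaStoreClient stands in for a connected IMetaStoreClient.

{code:java}
import org.apache.hadoop.security.UserGroupInformation;

// Flink itself renews the token later, so it must name itself as the renewer.
String owner = UserGroupInformation.getCurrentUser().getUserName();
String tokenStr = hiveMetaStoreClient.getDelegationToken(owner, owner);
{code}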
Re: [PR] [FLINK-36405][runtime][security] Fix startup issues on kerberos clusters. [flink]
gaborgsomogyi commented on code in PR #25428: URL: https://github.com/apache/flink/pull/25428#discussion_r1800513124 ## flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier: ## @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more Review Comment: Why this file needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36405][runtime][security] Fix startup issues on kerberos clusters. [flink]
zhengchenyu commented on code in PR #25428: URL: https://github.com/apache/flink/pull/25428#discussion_r1800525208 ## flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier: ## @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more Review Comment: @gaborgsomogyi After fixing the renewer user, it still throws an exception like the one below: ``` 2024-09-26 14:35:07,144 ERROR org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - Failed to obtain delegation token for HiveServer2 java.lang.NullPointerException: null at org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] at org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] ... ``` I found that [tokenIdentifier](https://github.com/apache/flink/blob/da393c92db814803c9ac96c6cdd55ae444c43689/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProvider.java#L149C70-L149C85) is null. Through further debugging, I found that HiveServer2DelegationTokenIdentifier is not in the services file, so org.apache.hadoop.security.token.Token::getClassForIdentifier cannot recognize the kind `HIVE_DELEGATION_TOKEN`, and hive2Token.decodeIdentifier() returns null. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
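For reference, minus the license header the new resource file boils down to a single ServiceLoader entry: the fully qualified name of the identifier class, so that Token#getClassForIdentifier can map the HIVE_DELEGATION_TOKEN kind back to it. Sketched below, assuming the identifier class lives alongside the provider in the same package:

```
# META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
org.apache.flink.table.security.token.HiveServer2DelegationTokenIdentifier
```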
[jira] [Commented] (FLINK-36534) Upgrade Flink Kubernetes operator to flink-1.20
[ https://issues.apache.org/jira/browse/FLINK-36534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889464#comment-17889464 ] Gyula Fora commented on FLINK-36534: Let's do this once 1.20.1 is released > Upgrade Flink Kubernetes operator to flink-1.20 > --- > > Key: FLINK-36534 > URL: https://issues.apache.org/jira/browse/FLINK-36534 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Shailesh Gupta >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-36405: - Affects Version/s: (was: 2.0-preview) > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Assignee: Chenyu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Found these errors when starting up on a Kerberos cluster. > Error 1: renewer user mismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > the Hive metastore. However, when renewing the DelegationToken, the startup > user of Flink is used, so the renewer does not match. > > Error 2: HIVE_DELEGATION_TOKEN is not in the service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-36405: - Affects Version/s: 2.0-preview > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1, 2.0-preview >Reporter: Chenyu Zheng >Assignee: Chenyu Zheng >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Found these errors when starting up on a Kerberos cluster. > Error 1: renewer user mismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > the Hive metastore. However, when renewing the DelegationToken, the startup > user of Flink is used, so the renewer does not match. > > Error 2: HIVE_DELEGATION_TOKEN is not in the service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in the service list. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36484) Remove deprecated method `StreamTableEnvironment#registerFunction`
[ https://issues.apache.org/jira/browse/FLINK-36484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889468#comment-17889468 ] Ammu Parvathy commented on FLINK-36484: --- I have started looking into this one. > Remove deprecated method `StreamTableEnvironment#registerFunction` > -- > > Key: FLINK-36484 > URL: https://issues.apache.org/jira/browse/FLINK-36484 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
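For anyone hitting the removal, the usual migration is a one-line swap (sketch; UpperUDF is a placeholder scalar function class):

{code:java}
tableEnv.registerFunction("upper_udf", new UpperUDF());              // removed API
tableEnv.createTemporarySystemFunction("upper_udf", UpperUDF.class); // replacement
{code}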
[jira] [Commented] (FLINK-36477) Remove deprecated method `BaseExpressions#cast` in table-api-java
[ https://issues.apache.org/jira/browse/FLINK-36477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889473#comment-17889473 ] corgy commented on FLINK-36477: --- I am taking action on this subtask, please assign it to me > Remove deprecated method `BaseExpressions#cast` in table-api-java > - > > Key: FLINK-36477 > URL: https://issues.apache.org/jira/browse/FLINK-36477 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
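The deprecated overload is the TypeInformation-based cast; the DataType overload remains. A migration sketch using the expression DSL (column name is a placeholder):

{code:java}
$("amount").cast(Types.INT);       // deprecated TypeInformation variant, removed
$("amount").cast(DataTypes.INT()); // remaining DataType variant
{code}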
[jira] [Commented] (FLINK-36478) Remove deprecated methods `EnvironmentSettings#fromConfiguration` and `EnvironmentSettings#toConfiguration`
[ https://issues.apache.org/jira/browse/FLINK-36478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889475#comment-17889475 ] corgy commented on FLINK-36478: --- I am taking action on this subtask, please assign it to me > Remove deprecated methods `EnvironmentSettings#fromConfiguration` and > `EnvironmentSettings#toConfiguration` > --- > > Key: FLINK-36478 > URL: https://issues.apache.org/jira/browse/FLINK-36478 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
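A migration sketch, assuming the builder's withConfiguration is the intended replacement for the removed static factory:

{code:java}
// before (removed):
EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(config);
// after:
EnvironmentSettings settings =
        EnvironmentSettings.newInstance().withConfiguration(config).build();
{code}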
[jira] [Commented] (FLINK-36534) Upgrade Flink Kubernetes operator to flink-1.20
[ https://issues.apache.org/jira/browse/FLINK-36534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889476#comment-17889476 ] Shailesh Gupta commented on FLINK-36534: Sure. Thanks. > Upgrade Flink Kubernetes operator to flink-1.20 > --- > > Key: FLINK-36534 > URL: https://issues.apache.org/jira/browse/FLINK-36534 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Shailesh Gupta >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]
Zakelly commented on PR #25508: URL: https://github.com/apache/flink/pull/25508#issuecomment-2413020131 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36480) Remove all deprecated methods in `TableConfig`
[ https://issues.apache.org/jira/browse/FLINK-36480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889479#comment-17889479 ] corgy commented on FLINK-36480: --- I am taking action on this subtask, please assign it to me > Remove all deprecated methods in `TableConfig` > -- > > Key: FLINK-36480 > URL: https://issues.apache.org/jira/browse/FLINK-36480 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
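Most of the typed setters being removed map onto plain configuration keys; a sketch of the string-based form that stays (key and value are illustrative):

{code:java}
tableEnv.getConfig().set("table.exec.state.ttl", "1 h");
{code}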
[jira] [Commented] (FLINK-36483) Remove deprecated method `TableResult#getTableSchema`
[ https://issues.apache.org/jira/browse/FLINK-36483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889485#comment-17889485 ] corgy commented on FLINK-36483: --- I am taking action on this subtask, please assign it to me > Remove deprecated method `TableResult#getTableSchema` > - > > Key: FLINK-36483 > URL: https://issues.apache.org/jira/browse/FLINK-36483 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / API >Reporter: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
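Migration sketch: ResolvedSchema supersedes TableSchema here.

{code:java}
ResolvedSchema schema = tableResult.getResolvedSchema(); // replaces getTableSchema()
{code}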
Re: [PR] [FLINK-36517][cdc-connect][paimon] use filterAndCommit API for Avoid commit the same datafile duplicate [flink-cdc]
beryllw commented on PR #3639: URL: https://github.com/apache/flink-cdc/pull/3639#issuecomment-2412713852 Could you please assist in reviewing this PR? Thank you. @lvyanquan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36534) Upgrade Flink Kubernetes operator to flink-1.20
Shailesh Gupta created FLINK-36534: -- Summary: Upgrade Flink Kubernetes operator to flink-1.20 Key: FLINK-36534 URL: https://issues.apache.org/jira/browse/FLINK-36534 Project: Flink Issue Type: New Feature Components: Kubernetes Operator Reporter: Shailesh Gupta -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36533) An exception occurs when using the taskmanager.data.bind-port
[ https://issues.apache.org/jira/browse/FLINK-36533?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky reassigned FLINK-36533: Assignee: Ferenc Csaky > An exception occurs when using the taskmanager.data.bind-port > - > > Key: FLINK-36533 > URL: https://issues.apache.org/jira/browse/FLINK-36533 > Project: Flink > Issue Type: Bug >Reporter: five >Assignee: Ferenc Csaky >Priority: Major > Attachments: exception.png, source_code.png > > > An error occurred when using a port range for the > {{taskmanager.data.bind-port}} configuration item. > > Environment Information: > * Flink version: 1.20.0 > * Java: 1.8 > Configuration Information: > * taskmanager.data.bind-port: 12001-13000 > When starting the Flink job, an exception occurs. Please see the attached > file for the exception message: exception.png > The error received is a NativeIoException. However, the source code is set up > to catch a BindException. (Refer to the attached file: source_code.png) The > two do not match. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36531) AutoScaler needs to consider the lag from last checkpoint
[ https://issues.apache.org/jira/browse/FLINK-36531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889404#comment-17889404 ] yuanfenghu commented on FLINK-36531: [~dsaisharath] There is already an optimization to solve t[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]his problem, it may be helpful to you > AutoScaler needs to consider the lag from last checkpoint > - > > Key: FLINK-36531 > URL: https://issues.apache.org/jira/browse/FLINK-36531 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Sai Sharath Dandi >Priority: Major > > Autoscaler computes the target processing capacity as > [below|https://sg.uberinternal.com/code.uber.internal/uber-code/data-flink-kubernetes-operator@release-1.9-uber/-/blob/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java?L47] > // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + > INPUT_RATE/TARGET_UTIL > > During the scaling action, the autoscaler will restart the job from the last > successful checkpoint, we need to include the number of processed records > since last successful checkpoint as part of the lag as those records will be > replayed after scaling. This is particularly important for jobs with long > checkpoint intervals and large state as there could be a significant > difference between the realtime lag and the lag from the checkpoint -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-36531) AutoScaler needs to consider the lag from last checkpoint
[ https://issues.apache.org/jira/browse/FLINK-36531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889404#comment-17889404 ] yuanfenghu edited comment on FLINK-36531 at 10/15/24 2:22 AM: -- [~dsaisharath] There is already an optimization to solve [FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler] this problem, it may be helpful to you was (Author: JIRAUSER296932): [~dsaisharath] There is already an optimization to solve t[FLIP-461|https://cwiki.apache.org/confluence/display/FLINK/FLIP-461%3A+Synchronize+rescaling+with+checkpoint+creation+to+minimize+reprocessing+for+the+AdaptiveScheduler]his problem, it may be helpful to you > AutoScaler needs to consider the lag from last checkpoint > - > > Key: FLINK-36531 > URL: https://issues.apache.org/jira/browse/FLINK-36531 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Sai Sharath Dandi >Priority: Major > > Autoscaler computes the target processing capacity as > [below|https://sg.uberinternal.com/code.uber.internal/uber-code/data-flink-kubernetes-operator@release-1.9-uber/-/blob/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/utils/AutoScalerUtils.java?L47] > // Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + > INPUT_RATE/TARGET_UTIL > > During the scaling action, the autoscaler will restart the job from the last > successful checkpoint, we need to include the number of processed records > since last successful checkpoint as part of the lag as those records will be > replayed after scaling. This is particularly important for jobs with long > checkpoint intervals and large state as there could be a significant > difference between the realtime lag and the lag from the checkpoint -- This message was sent by Atlassian Jira (v8.20.10#820010)
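For reference, the ticket's formula with the proposed adjustment folded in, as a small sketch (names invented; the real logic lives in AutoScalerUtils): records processed since the last successful checkpoint are simply counted as lag before the formula is applied.

{code:java}
// Target = LAG/CATCH_UP + INPUT_RATE*RESTART/CATCH_UP + INPUT_RATE/TARGET_UTIL,
// where LAG additionally includes the records replayed from the last checkpoint.
static double targetProcessingCapacity(
        double lag,                      // records behind the source head
        double processedSinceCheckpoint, // records replayed on restart (proposed term)
        double inputRate,                // records per second
        double catchUpSeconds,
        double restartSeconds,
        double targetUtilization) {
    double effectiveLag = lag + processedSinceCheckpoint;
    return effectiveLag / catchUpSeconds
            + inputRate * restartSeconds / catchUpSeconds
            + inputRate / targetUtilization;
}
{code}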
[jira] [Updated] (FLINK-36518) Paimon Sink support per-table property configuration
[ https://issues.apache.org/jira/browse/FLINK-36518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-36518: -- Description: As title. (was: Solved by transform table-options.) > Paimon Sink support per-table property configuration > > > Key: FLINK-36518 > URL: https://issues.apache.org/jira/browse/FLINK-36518 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.0 > Environment: Flink 1.18.0 > flink 3.2.0 >Reporter: JunboWang >Priority: Minor > > As title. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36518) Paimon Sink support per-table property configuration
[ https://issues.apache.org/jira/browse/FLINK-36518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang updated FLINK-36518: -- Description: Solved by transform table-options. (was: As title.) > Paimon Sink support per-table property configuration > > > Key: FLINK-36518 > URL: https://issues.apache.org/jira/browse/FLINK-36518 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.0 > Environment: Flink 1.18.0 > flink 3.2.0 >Reporter: JunboWang >Priority: Minor > > Solved by transform table-options. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36518) Paimon Sink support per-table property configuration
[ https://issues.apache.org/jira/browse/FLINK-36518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889392#comment-17889392 ] JunboWang commented on FLINK-36518: --- Solved by transform table-options. > Paimon Sink support per-table property configuration > > > Key: FLINK-36518 > URL: https://issues.apache.org/jira/browse/FLINK-36518 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.0 > Environment: Flink 1.18.0 > flink 3.2.0 >Reporter: JunboWang >Priority: Minor > > As title. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] (FLINK-36518) Paimon Sink support per-table property configuration
[ https://issues.apache.org/jira/browse/FLINK-36518 ] JunboWang deleted comment on FLINK-36518: --- was (Author: JIRAUSER305453): Solved by transform table-options. > Paimon Sink support per-table property configuration > > > Key: FLINK-36518 > URL: https://issues.apache.org/jira/browse/FLINK-36518 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.0 > Environment: Flink 1.18.0 > flink 3.2.0 >Reporter: JunboWang >Priority: Minor > > As title. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36518) Paimon Sink support per-table property configuration
[ https://issues.apache.org/jira/browse/FLINK-36518?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] JunboWang closed FLINK-36518. - Resolution: Not A Problem Solved by transform table-options. > Paimon Sink support per-table property configuration > > > Key: FLINK-36518 > URL: https://issues.apache.org/jira/browse/FLINK-36518 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.2.0 > Environment: Flink 1.18.0 > flink 3.2.0 >Reporter: JunboWang >Priority: Minor > > As title. -- This message was sent by Atlassian Jira (v8.20.10#820010)
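For readers landing here: the transform route mentioned in the resolution attaches options per table in the pipeline definition. A hedged sketch (table name and option values are placeholders; see the Flink CDC transform documentation for exact semantics):

{code:yaml}
transform:
  - source-table: app_db.orders
    # per-table Paimon table properties instead of one pipeline-wide setting
    table-options: bucket=4,changelog-producer=input
{code}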
Re: [PR] [FLINK-36528] [Kubernetes Operator] Update org.apache.avro from 1.8.2 to 1.12.0 [flink-kubernetes-operator]
kartik-3513 commented on PR #901: URL: https://github.com/apache/flink-kubernetes-operator/pull/901#issuecomment-2412922906 I have evaluated updating the Beam dependency; we can proceed with that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35016] catalog changes for model resource [flink]
lihaosky commented on code in PR #25211: URL: https://github.com/apache/flink/pull/25211#discussion_r1800214336

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
         }
     }

+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) {
+        CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+        if (temporaryModel != null) {
+            final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel);
+            return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+        }
+        Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model);
+                    return Optional.of(
+                            ContextResolvedModel.permanent(
+                                    objectIdentifier, currentCatalog, resolvedModel));
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.

Review Comment:
   Based on @twalthr's previous comment, we return empty here instead of throwing an unsupported-operation exception.

## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##
@@ -1341,6 +1352,293 @@ private void dropTableInternal(
         }
     }

+    /**
+     * Retrieves a fully qualified model. If the path is not yet fully qualified use {@link
+     * #qualifyIdentifier(UnresolvedIdentifier)} first.
+     *
+     * @param objectIdentifier full path of the model to retrieve
+     * @return model that the path points to.
+     */
+    public Optional<ContextResolvedModel> getModel(ObjectIdentifier objectIdentifier) {
+        CatalogModel temporaryModel = temporaryModels.get(objectIdentifier);
+        if (temporaryModel != null) {
+            final ResolvedCatalogModel resolvedModel = resolveCatalogModel(temporaryModel);
+            return Optional.of(ContextResolvedModel.temporary(objectIdentifier, resolvedModel));
+        }
+        Optional<Catalog> catalogOptional = getCatalog(objectIdentifier.getCatalogName());
+        ObjectPath objectPath = objectIdentifier.toObjectPath();
+        if (catalogOptional.isPresent()) {
+            Catalog currentCatalog = catalogOptional.get();
+            try {
+                final CatalogModel model = currentCatalog.getModel(objectPath);
+                if (model != null) {
+                    final ResolvedCatalogModel resolvedModel = resolveCatalogModel(model);
+                    return Optional.of(
+                            ContextResolvedModel.permanent(
+                                    objectIdentifier, currentCatalog, resolvedModel));
+                }
+            } catch (ModelNotExistException e) {
+                // Ignore.
+            } catch (UnsupportedOperationException e) {
+                // Ignore for catalogs that don't support models.
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Like {@link #getModel(ObjectIdentifier)}, but throws an error when the model is not
+     * available in any of the catalogs.
+     */
+    public ContextResolvedModel getModelOrError(ObjectIdentifier objectIdentifier) {
+        return getModel(objectIdentifier)
+                .orElseThrow(
+                        () ->
+                                new TableException(
+                                        String.format(
+                                                "Cannot find model '%s' in any of the catalogs %s.",
+                                                objectIdentifier, listCatalogs())));
+    }
+
+    /**
+     * Return whether the model with a fully qualified model path is temporary or not.
+     *
+     * @param objectIdentifier full path of the model
+     * @return whether the model is temporary or not.
+     */
+    public boolean isTemporaryModel(ObjectIdentifier objectIdentifier) {
+        return temporaryModels.containsKey(objectIdentifier);
+    }
+
+    /**
+     * Returns an array of names of all models registered in the namespace of the current catalog
+     * and database
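A minimal, self-contained sketch of the lookup pattern discussed above: both a missing model and a catalog that does not support models are treated as an empty result rather than an error, so callers can fall through to other catalogs. The `Catalog` and `ModelNotExistException` stand-ins below are illustrative simplifications, not Flink's actual interfaces:

```java
import java.util.Optional;

/** Illustrative stand-in for the catalog interface; not Flink's actual API. */
interface Catalog {
    Object getModel(String path) throws ModelNotExistException;
}

/** Illustrative stand-in exception. */
class ModelNotExistException extends Exception {}

class ModelLookup {
    /**
     * Treats "model not found" and "catalog does not support models"
     * (UnsupportedOperationException) as absence, not failure, so callers
     * can report one uniform error after checking all catalogs.
     */
    static Optional<Object> getModel(Catalog catalog, String path) {
        try {
            return Optional.ofNullable(catalog.getModel(path));
        } catch (ModelNotExistException | UnsupportedOperationException e) {
            return Optional.empty();
        }
    }
}
```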
Re: [PR] Add insert-only change stream option [flink-cdc]
henneberger closed pull request #3562: Add insert-only change stream option URL: https://github.com/apache/flink-cdc/pull/3562 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889188#comment-17889188 ] Chenyu Zheng edited comment on FLINK-36405 at 10/15/24 3:51 AM: [~fcsaky] Thanks! I have rebase my PR to current master. was (Author: zhengchenyu): [~fcsaky] Thanks? I have rebase my PR to current master. > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Assignee: Chenyu Zheng >Priority: Major > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Fount these error when startup on kerberos cluster. > Error 1: renew user dismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > hive metastore. However, when renewing DelegationToken, will use the startup > user of flink. This will cause the renewer to be mismatched. > > Error2: HIVE_DELEGATION_TOKEN is not in service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in service. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
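One plausible shape of the renewer fix described in the issue above, sketched under stated assumptions: `MetastoreClient` is a hypothetical slice of Hive's metastore client API, and only `UserGroupInformation` is a real Hadoop API. The point is simply that the token must be requested with the same principal that later renews it:

{code:java}
import org.apache.hadoop.security.UserGroupInformation;

/** Hypothetical slice of Hive's metastore client API, for illustration only. */
interface MetastoreClient {
    String getDelegationToken(String owner, String renewer) throws Exception;
    long renewDelegationToken(String tokenStrForm) throws Exception;
}

class DelegationTokenSketch {
    /**
     * Requests the HMS delegation token with the current (Flink) user as the
     * renewer, so the later renewal call is made by a matching principal and
     * does not fail with "non-matching renewer".
     */
    static long obtainAndRenew(MetastoreClient client) throws Exception {
        String flinkUser = UserGroupInformation.getCurrentUser().getUserName();
        String token = client.getDelegationToken(flinkUser, flinkUser);
        return client.renewDelegationToken(token);
    }
}
{code}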
Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]
yuxiqian commented on code in PR #3642: URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799076504

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##
@@ -43,7 +28,6 @@
 import static org.apache.flink.cdc.common.utils.ChangeEventUtils.resolveSchemaEvolutionOptions;
 import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;

-/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */

Review Comment:
   Ditto.

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##
@@ -287,9 +291,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
         if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
             return new Configuration();
         }
-        Map<String, String> pipelineConfigMap =
-                mapper.convertValue(
-                        pipelineConfigNode, new TypeReference<Map<String, String>>() {});
+        Map<String, String> pipelineConfigMap = new HashMap<>();
+        pipelineConfigNode
+                .fields()
+                .forEachRemaining(
+                        entry -> {
+                            String key = entry.getKey();
+                            JsonNode value = entry.getValue();
+                            if (!key.equals(MODEL_KEY)) {
+                                pipelineConfigMap.put(key, value.asText());
+                            }
+                        });
         return Configuration.fromMap(pipelineConfigMap);
     }
+
+    private List<ModelDef> parseModels(JsonNode modelsNode) {
+        List<ModelDef> modelDefs = new ArrayList<>();
+        if (modelsNode != null && modelsNode.isArray()) {

Review Comment:
   Maybe we can throw an exception if `modelsNode` isn't a list. Otherwise code like this may silently fail:
   ```yaml
   pipeline:
     models:
       name: ...
   ```
   while one may actually want to write this:
   ```yaml
   pipeline:
     models:
       - name: ...
   ```

## flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java:
##
@@ -287,9 +291,27 @@ private Configuration toPipelineConfig(JsonNode pipelineConfigNode) {
         if (pipelineConfigNode == null || pipelineConfigNode.isNull()) {
             return new Configuration();
         }
-        Map<String, String> pipelineConfigMap =
-                mapper.convertValue(
-                        pipelineConfigNode, new TypeReference<Map<String, String>>() {});
+        Map<String, String> pipelineConfigMap = new HashMap<>();
+        pipelineConfigNode
+                .fields()
+                .forEachRemaining(
+                        entry -> {
+                            String key = entry.getKey();
+                            JsonNode value = entry.getValue();
+                            if (!key.equals(MODEL_KEY)) {
+                                pipelineConfigMap.put(key, value.asText());
+                            }
+                        });
         return Configuration.fromMap(pipelineConfigMap);
     }

Review Comment:
   The models configuration could be parsed and removed in the `YamlPipelineDefinitionParser#parse` method in advance, so there's no need to iterate over the configuration map again here.

## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java:
##

Review Comment:
   I doubt this is the best way to parameterize UDFs. Maybe we can leave `UdfDef` unchanged, let `ModelDef` extend `UdfDef`, and put the extra parameters there? That way we won't need any of this string-split hackery and serialization chore.
## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java:
##
@@ -0,0 +1,92 @@
+package org.apache.flink.cdc.composer.definition;
+
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Objects;
+
+public class ModelDef {
+    private final String name;
+    private final String host;
+    private final String apiKey;
+
+    public ModelDef(String name, String host, String apiKey) {
+        this.name = name;
+        this.host = host;
+        this.apiKey = apiKey;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public String getHost() {
+        return host;
+    }
+
+    public String getApiKey() {
+        return apiKey;
+    }
+
+    // Create a UDF that represents this model
+    public ScalarFunction createUdf() {
+        return new ModelUdf(this);
+    }
+
+    // Inner class: the UDF for this model
+    public class ModelUdf extends ScalarFunction {
+        private final ModelDef model;
+
+        public ModelUdf(ModelDef model) {
+            this.model = model;
+        }
+
+        // Main UDF entry point: processes the input and returns the result
+        public String eval(String input) {
+            // Implement the model API call here, using model.getHost() and
+            // model.getApiKey() to access the API. This is only a sample
+            // implementation; the real logic depends on the concrete API.
+            return "Embedding for: " + input;
+        }
+
+        @Override
+
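A hedged sketch of the list-shape check suggested in the review above: fail fast when `models:` is not a YAML sequence instead of silently ignoring it. It returns raw Jackson nodes rather than CDC's `ModelDef` so the example stays self-contained:

```java
import com.fasterxml.jackson.databind.JsonNode;

import java.util.ArrayList;
import java.util.List;

class ModelsParserSketch {
    /** Rejects a `models:` block that is a mapping instead of a sequence. */
    static List<JsonNode> parseModels(JsonNode modelsNode) {
        if (modelsNode == null || modelsNode.isNull()) {
            return new ArrayList<>(); // absent block: nothing to parse
        }
        if (!modelsNode.isArray()) {
            throw new IllegalArgumentException(
                    "`models` must be a list of model definitions, but got: "
                            + modelsNode.getNodeType());
        }
        List<JsonNode> models = new ArrayList<>();
        modelsNode.elements().forEachRemaining(models::add);
        return models;
    }
}
```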
Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]
yuxiqian commented on code in PR #3642: URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799126412

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##
@@ -77,6 +80,53 @@ public static int currentDate(long epochTime, String timezone) {
         return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
     }

+    private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+    private static OpenAiEmbeddingModel embeddingModel;
+
+    public static void initializeOpenAiEmbeddingModel(String apiKey, String baseUrl) {
+        embeddingModel =
+                OpenAiEmbeddingModel.builder()
+                        .apiKey(apiKey)
+                        .baseUrl(baseUrl)
+                        .modelName(DEFAULT_MODEL_NAME)
+                        .timeout(Duration.ofSeconds(30))
+                        .maxRetries(3)
+                        .build();
+    }
+
+    public static String getEmbedding(String input, String apiKey, String model) {
+        if (input == null || input.trim().isEmpty()) {
+            LOG.debug("Empty or null input provided for embedding.");
+            return "";
+        }
+
+        try {
+            // Make sure the OpenAiEmbeddingModel has been initialized
+            if (embeddingModel == null) {
+                initializeOpenAiEmbeddingModel(apiKey, "https://api.openai.com/v1/");

Review Comment:
   Is the endpoint hard-coded here? Why do we still need this function and to pass API keys manually? Shouldn't these be configured in the `models:` rule block?

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java:
##
@@ -77,6 +80,53 @@ public static int currentDate(long epochTime, String timezone) {
         return timestampMillisToDate(localtimestamp(epochTime, timezone).getMillisecond());
     }

+    private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+    private static OpenAiEmbeddingModel embeddingModel;
+
+    public static void initializeOpenAiEmbeddingModel(String apiKey, String baseUrl) {
+        embeddingModel =
+                OpenAiEmbeddingModel.builder()
+                        .apiKey(apiKey)
+                        .baseUrl(baseUrl)
+                        .modelName(DEFAULT_MODEL_NAME)
+                        .timeout(Duration.ofSeconds(30))
+                        .maxRetries(3)
+                        .build();
+    }
+
+    public static String getEmbedding(String input, String apiKey, String model) {
+        if (input == null || input.trim().isEmpty()) {
+            LOG.debug("Empty or null input provided for embedding.");
+            return "";
+        }
+
+        try {
+            // Make sure the OpenAiEmbeddingModel has been initialized
+            if (embeddingModel == null) {
+                initializeOpenAiEmbeddingModel(apiKey, "https://api.openai.com/v1/");

Review Comment:
   Is the endpoint hard-coded here? Why do we need this function and to pass API keys manually? Shouldn't these be configured in the `models:` rule block?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
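What the review is asking for, roughly: build the embedding model once from the values carried by the parsed `models:` definition, with no hard-coded endpoint, key, or model name in the runtime. `ModelConfig` below is an illustrative stand-in for such a definition; the builder calls mirror the ones in the diff above:

```java
import dev.langchain4j.model.openai.OpenAiEmbeddingModel;

import java.time.Duration;

/** Illustrative stand-in for a parsed `models:` entry (hypothetical type). */
class ModelConfig {
    final String apiKey;
    final String baseUrl;
    final String modelName;

    ModelConfig(String apiKey, String baseUrl, String modelName) {
        this.apiKey = apiKey;
        this.baseUrl = baseUrl;
        this.modelName = modelName;
    }
}

class EmbeddingModelFactory {
    /** All connection details come from configuration; nothing is hard-coded. */
    static OpenAiEmbeddingModel fromConfig(ModelConfig config) {
        return OpenAiEmbeddingModel.builder()
                .apiKey(config.apiKey)
                .baseUrl(config.baseUrl)
                .modelName(config.modelName)
                .timeout(Duration.ofSeconds(30)) // illustrative defaults
                .maxRetries(3)
                .build();
    }
}
```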
[jira] [Created] (FLINK-36528) Update org.apache.avro : avro dependency
Kartik Goyal created FLINK-36528: Summary: Update org.apache.avro : avro dependency Key: FLINK-36528 URL: https://issues.apache.org/jira/browse/FLINK-36528 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.10.0 Reporter: Kartik Goyal Assignee: Kartik Goyal Fix For: kubernetes-operator-1.10.0 Bump the mysql-connector-j version to remediate vulnerability associated with this package. Package info: [https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33] Vulnerability info: [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-22102] Proposed change solution: Bump the version from 8.0.33 to 8.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]
yuxiqian commented on code in PR #3642: URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799217964

## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java:
##
@@ -85,11 +80,45 @@ public DataStream<Event> translatePostTransform(
             }
         }
         postTransformFunctionBuilder.addTimezone(timezone);
+
+        List<UdfDef> allFunctions = new ArrayList<>(udfFunctions);
+        allFunctions.addAll(convertModelsToUdfs(models));
+
         postTransformFunctionBuilder.addUdfFunctions(
-                udfFunctions.stream()
-                        .map(udf -> Tuple2.of(udf.getName(), udf.getClasspath()))
-                        .collect(Collectors.toList()));
+                allFunctions.stream().map(this::udfDefToTuple2).collect(Collectors.toList()));
         return input.transform(
                 "Transform:Data", new EventTypeInfo(), postTransformFunctionBuilder.build());
     }
+
+    private List<UdfDef> convertModelsToUdfs(List<ModelDef> models) {
+        return models.stream().map(this::modelToUdf).collect(Collectors.toList());
+    }

Review Comment:
   This conversion is quite worrying: `ModelDef` obviously carries more information (the parameters), and the conversion cannot be done without hacky approaches like pasting the parameters into the name field. Modifying `addUdfFunctions` to accept a more universal data structure (instead of a `Name - Classpath` `Tuple2`) makes more sense to me.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
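A sketch of the "more universal data structure" the comment suggests: a descriptor that carries model parameters as a first-class map instead of smuggling them through the name field. All names here are illustrative, not the connector's actual types:

```java
import java.util.Collections;
import java.util.Map;

/** Illustrative descriptor that addUdfFunctions could accept instead of a Tuple2. */
final class FunctionDescriptor {
    final String name;
    final String classpath;
    final Map<String, String> parameters; // empty for plain UDFs

    private FunctionDescriptor(String name, String classpath, Map<String, String> parameters) {
        this.name = name;
        this.classpath = classpath;
        this.parameters = parameters;
    }

    static FunctionDescriptor forUdf(String name, String classpath) {
        return new FunctionDescriptor(name, classpath, Collections.emptyMap());
    }

    static FunctionDescriptor forModel(String name, String classpath, Map<String, String> params) {
        return new FunctionDescriptor(name, classpath, params);
    }
}
```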
[jira] [Commented] (FLINK-36476) Remove all deprecated methods under public APIs in table modules
[ https://issues.apache.org/jira/browse/FLINK-36476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889170#comment-17889170 ] lajith commented on FLINK-36476: Hi [~xuyangzhong], can I pick up some of the tasks? > Remove all deprecated methods under public APIs in table modules > > > Key: FLINK-36476 > URL: https://issues.apache.org/jira/browse/FLINK-36476 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: xuyang >Priority: Blocker > Fix For: 2.0.0 > > > All methods needed to be removed are: > https://docs.google.com/document/d/1bVrmcB9UFOd1-sT7xDRTMb5rmO0vK0eDlLRUYYL9gd4/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] typo in property name httpclinent -> httpclient [flink-kubernetes-operator]
gyfora merged PR #899: URL: https://github.com/apache/flink-kubernetes-operator/pull/899 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]
yuxiqian commented on code in PR #3642: URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799190408

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/model/ModelUdf.java:
##
@@ -0,0 +1,132 @@
+package org.apache.flink.cdc.runtime.operators.model;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import com.google.gson.Gson;
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import dev.langchain4j.model.openai.OpenAiEmbeddingModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ModelUdf implements UserDefinedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ModelUdf.class);
+    private static final String DEFAULT_API_KEY =
+            "sk-WegHEuogRpIyRSwaF5Ce6fE3E62e459dA61eFaF6CcF8C79b";
+    private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+    private static final int DEFAULT_TIMEOUT_SECONDS = 30;
+    private static final String DEFAULT_BASE_URL = "https://api.gpt.ge/v1/";

Review Comment:
   Passing these "default" API keys and endpoints isn't meaningful. Request tokens might run out, and endpoints might expire.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]
yuxiqian commented on code in PR #3642: URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799190408

## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/model/ModelUdf.java:
##
@@ -0,0 +1,132 @@
+package org.apache.flink.cdc.runtime.operators.model;
+
+import org.apache.flink.cdc.common.data.ArrayData;
+import org.apache.flink.cdc.common.data.GenericArrayData;
+import org.apache.flink.cdc.common.types.DataType;
+import org.apache.flink.cdc.common.types.DataTypes;
+import org.apache.flink.cdc.common.udf.UserDefinedFunction;
+
+import com.google.gson.Gson;
+import dev.langchain4j.data.document.Metadata;
+import dev.langchain4j.data.embedding.Embedding;
+import dev.langchain4j.data.segment.TextSegment;
+import dev.langchain4j.model.openai.OpenAiEmbeddingModel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ModelUdf implements UserDefinedFunction {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ModelUdf.class);
+    private static final String DEFAULT_API_KEY =
+            "sk-WegHEuogRpIyRSwaF5Ce6fE3E62e459dA61eFaF6CcF8C79b";
+    private static final String DEFAULT_MODEL_NAME = "text-embedding-ada-002";
+    private static final int DEFAULT_TIMEOUT_SECONDS = 30;
+    private static final String DEFAULT_BASE_URL = "https://api.gpt.ge/v1/";

Review Comment:
   Providing these "default" API keys and endpoints isn't meaningful. Request tokens might run out, and endpoints might expire.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36332] Exclude kubernetes-httpclient form API module. [flink-kubernetes-operator]
gyfora merged PR #900: URL: https://github.com/apache/flink-kubernetes-operator/pull/900 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36528) Update org.apache.avro : avro dependency
[ https://issues.apache.org/jira/browse/FLINK-36528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kartik Goyal updated FLINK-36528: - Description: Update the org.apache.avro : avro package present in flink-beam-example to remediate the vulnerabilities associated with this package. It is a transitive dependency for beam-sdks-java-core and the current version 1.8.2. Package info: [https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2] Vulnerabilities info: Direct vulnerabilities: [CVE-2024-47561|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-47561] [CVE-2023-39410|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-39410] Vulnerabilities from dependencies: [CVE-2024-25710|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-25710] [CVE-2023-43642|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43642] [CVE-2023-34455|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34455] [CVE-2023-34454|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34454] [CVE-2023-34453|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34453] [CVE-2021-36090|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-36090] [CVE-2021-35517|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35517] [CVE-2021-35516|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35516] [CVE-2021-35515|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35515] [CVE-2020-15250|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-15250] [CVE-2019-10202|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10202] [CVE-2019-10172|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10172] [CVE-2018-11771|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-11771] Proposed change solution: Bump the version from 1.8.2 to 1.12.0 was: Bump the mysql-connector-j version to remediate vulnerability associated with this package. Package info: [https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33] Vulnerability info: [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-22102] Proposed change solution: Bump the version from 8.0.33 to 8.4.0 > Update org.apache.avro : avro dependency > > > Key: FLINK-36528 > URL: https://issues.apache.org/jira/browse/FLINK-36528 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.10.0 >Reporter: Kartik Goyal >Assignee: Kartik Goyal >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.10.0 > > > Update the org.apache.avro : avro package present in flink-beam-example to > remediate the vulnerabilities associated with this package. It is a > transitive dependency for beam-sdks-java-core and the current version 1.8.2. 
> Package info: > [https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2] > Vulnerabilities info: > Direct vulnerabilities: > [CVE-2024-47561|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-47561] > [CVE-2023-39410|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-39410] > Vulnerabilities from dependencies: > [CVE-2024-25710|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-25710] > [CVE-2023-43642|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43642] > [CVE-2023-34455|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34455] > [CVE-2023-34454|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34454] > [CVE-2023-34453|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34453] > [CVE-2021-36090|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-36090] > [CVE-2021-35517|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35517] > [CVE-2021-35516|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35516] > [CVE-2021-35515|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35515] > [CVE-2020-15250|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-15250] > [CVE-2019-10202|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10202] > [CVE-2019-10172|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10172] > [CVE-2018-11771|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-11771] > Proposed change solution: > Bump the version from 1.8.2 to 1.12.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]
yuxiqian commented on code in PR #3642: URL: https://github.com/apache/flink-cdc/pull/3642#discussion_r1799243324 ## flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/ModelDef.java: ## @@ -0,0 +1,92 @@ +package org.apache.flink.cdc.composer.definition; + +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.ScalarFunction; + +import java.util.Objects; + +public class ModelDef { +private final String name; +private final String host; +private final String apiKey; + +public ModelDef(String name, String host, String apiKey) { +this.name = name; +this.host = host; +this.apiKey = apiKey; +} + +public String getName() { +return name; +} + +public String getHost() { +return host; +} + +public String getApiKey() { +return apiKey; +} + +// 创建一个表示这个模型的 UDF +public ScalarFunction createUdf() { +return new ModelUdf(this); +} + +// 内部类,代表这个模型的 UDF +public class ModelUdf extends ScalarFunction { +private final ModelDef model; + +public ModelUdf(ModelDef model) { +this.model = model; +} + +// UDF 的主要方法,处理输入并返回结果 +public String eval(String input) { +// 这里实现调用模型 API 的逻辑 +// 使用 model.getHost() 和 model.getApiKey() 来访问 API +// 这只是一个示例实现,实际逻辑需要根据具体的 API 调用方式来编写 +return "Embedding for: " + input; +} + +@Override +public void open(FunctionContext context) throws Exception { +// 初始化逻辑,如建立API连接等 +} + +@Override +public void close() throws Exception { +// 清理逻辑,如关闭API连接等 +} +} Review Comment: Put this in documentation if this was meant to be an implementation reference. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36528) Update org.apache.avro : avro dependency
[ https://issues.apache.org/jira/browse/FLINK-36528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kartik Goyal updated FLINK-36528: - Labels: (was: pull-request-available) > Update org.apache.avro : avro dependency > > > Key: FLINK-36528 > URL: https://issues.apache.org/jira/browse/FLINK-36528 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.10.0 >Reporter: Kartik Goyal >Assignee: Kartik Goyal >Priority: Major > Fix For: kubernetes-operator-1.10.0 > > > Update the org.apache.avro : avro package present in flink-beam-example to > remediate the vulnerabilities associated with this package. It is a > transitive dependency for beam-sdks-java-core and the current version 1.8.2. > Package info: > [https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2] > Vulnerabilities info: > Direct vulnerabilities: > [CVE-2024-47561|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-47561] > [CVE-2023-39410|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-39410] > Vulnerabilities from dependencies: > [CVE-2024-25710|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-25710] > [CVE-2023-43642|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43642] > [CVE-2023-34455|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34455] > [CVE-2023-34454|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34454] > [CVE-2023-34453|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34453] > [CVE-2021-36090|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-36090] > [CVE-2021-35517|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35517] > [CVE-2021-35516|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35516] > [CVE-2021-35515|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35515] > [CVE-2020-15250|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-15250] > [CVE-2019-10202|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10202] > [CVE-2019-10172|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10172] > [CVE-2018-11771|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-11771] > Proposed change solution: > Bump the version from 1.8.2 to 1.12.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-1.18][FLINK-36421] [fs] [checkpoint] Sync outputStream before returning handle in FsCheckpointStreamFactory [flink]
Zakelly commented on PR #25479: URL: https://github.com/apache/flink/pull/25479#issuecomment-2410862308 I guess there is something wrong with the CI infra for 1.18. Any thoughts? @JingGe I'd propose merging this since it's a small one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [W.I.P] Disable parallel serial operator factory [flink]
JunRuiLee closed pull request #25506: [W.I.P] Disable parallel serial operator factory URL: https://github.com/apache/flink/pull/25506 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36528] [Kubernetes Operator] Update org.apache.avro from 1.8.2 to 1.12.0 [flink-kubernetes-operator]
kartik-3513 opened a new pull request, #901: URL: https://github.com/apache/flink-kubernetes-operator/pull/901

## What is the purpose of the change

This PR updates the dependency "org.apache.avro:avro" from version 1.8.2 to 1.12.0.

## Brief change log

The transitive dependency "org.apache.avro:avro" version 1.8.2, pulled in by beam-sdks-java-core under the flink-beam-example module, has 2 direct vulnerabilities and 12 vulnerabilities from dependent packages. Updating it to 1.12.0 removes all of them. The list can be found here: https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): yes
- The public API, i.e., are there any changes to the `CustomResourceDescriptors`: no
- Core observer or reconciler logic that is regularly executed: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36528) Update org.apache.avro : avro dependency
[ https://issues.apache.org/jira/browse/FLINK-36528?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36528: --- Labels: pull-request-available (was: ) > Update org.apache.avro : avro dependency > > > Key: FLINK-36528 > URL: https://issues.apache.org/jira/browse/FLINK-36528 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.10.0 >Reporter: Kartik Goyal >Assignee: Kartik Goyal >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.10.0 > > > Update the org.apache.avro : avro package present in flink-beam-example to > remediate the vulnerabilities associated with this package. It is a > transitive dependency for beam-sdks-java-core and the current version 1.8.2. > Package info: > [https://mvnrepository.com/artifact/org.apache.avro/avro/1.8.2] > Vulnerabilities info: > Direct vulnerabilities: > [CVE-2024-47561|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-47561] > [CVE-2023-39410|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-39410] > Vulnerabilities from dependencies: > [CVE-2024-25710|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-25710] > [CVE-2023-43642|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-43642] > [CVE-2023-34455|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34455] > [CVE-2023-34454|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34454] > [CVE-2023-34453|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-34453] > [CVE-2021-36090|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-36090] > [CVE-2021-35517|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35517] > [CVE-2021-35516|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35516] > [CVE-2021-35515|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2021-35515] > [CVE-2020-15250|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-15250] > [CVE-2019-10202|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10202] > [CVE-2019-10172|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-10172] > [CVE-2018-11771|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2018-11771] > Proposed change solution: > Bump the version from 1.8.2 to 1.12.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Lineage interfaces for kafka connector [flink-connector-kafka]
boring-cyborg[bot] commented on PR #130: URL: https://github.com/apache/flink-connector-kafka/pull/130#issuecomment-2410887979 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-36405: - Affects Version/s: 1.19.1 1.20.0 (was: 1.9.1) > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Priority: Major > > Fount these error when startup on kerberos cluster. > Error 1: renew user dismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > hive metastore. However, when renewing DelegationToken, will use the startup > user of flink. This will cause the renewer to be mismatched. > > Error2: HIVE_DELEGATION_TOKEN is not in service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in service. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky updated FLINK-36405: - Fix Version/s: 2.0.0 1.19.2 1.20.1 > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Priority: Major > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Fount these error when startup on kerberos cluster. > Error 1: renew user dismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > hive metastore. However, when renewing DelegationToken, will use the startup > user of flink. This will cause the renewer to be mismatched. > > Error2: HIVE_DELEGATION_TOKEN is not in service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in service. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky reassigned FLINK-36405: Assignee: Ferenc Csaky > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Assignee: Ferenc Csaky >Priority: Major > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Fount these error when startup on kerberos cluster. > Error 1: renew user dismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > hive metastore. However, when renewing DelegationToken, will use the startup > user of flink. This will cause the renewer to be mismatched. > > Error2: HIVE_DELEGATION_TOKEN is not in service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in service. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]
flinkbot commented on PR #25508: URL: https://github.com/apache/flink/pull/25508#issuecomment-2410366766 ## CI report: * 6e26d14c7081b78d3d21952760ff86ff3c65a896 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]
Zakelly opened a new pull request, #25508: URL: https://github.com/apache/flink/pull/25508

## What is the purpose of the change

Currently, ForSt hands a direct buffer to `ByteBufferWritableFSDataOutputStream`, where the data is written byte by byte. According to our perf results, the statistics of the Hadoop-based FS are updated once for each byte, which takes a lot of CPU. Below is a flamegraph, where the statistics part is marked in purple (taking 8.14% of the overall CPU).

This PR copies the data to a heap buffer before invoking `write` to optimize this.

## Brief change log

- Writing logic in `ByteBufferWritableFSDataOutputStream`

## Verifying this change

This change is a trivial rework without any test coverage.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? not applicable

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
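The optimization itself is straightforward. A minimal sketch of the chunked-copy idea, assuming nothing beyond the JDK (`ByteBufferWriteSketch` and the 4 KiB chunk size are illustrative choices, not the PR's actual code):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

class ByteBufferWriteSketch {
    /**
     * Drains a (possibly direct) ByteBuffer through a reusable heap chunk, so
     * the underlying FS stream sees a few array writes instead of one
     * write(int) per byte, and updates its statistics per call rather than
     * per byte.
     */
    static void write(OutputStream out, ByteBuffer buffer) throws IOException {
        if (buffer.hasArray()) {
            // Heap buffer: hand the backing array to the stream directly.
            out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
            buffer.position(buffer.limit());
            return;
        }
        byte[] chunk = new byte[4096];
        while (buffer.hasRemaining()) {
            int len = Math.min(chunk.length, buffer.remaining());
            buffer.get(chunk, 0, len); // bulk copy out of the direct buffer
            out.write(chunk, 0, len);  // one array write per chunk
        }
    }
}
```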
[jira] [Updated] (FLINK-36526) Optimize the overhead of writing with direct buffer in ForSt
[ https://issues.apache.org/jira/browse/FLINK-36526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36526: --- Labels: pull-request-available (was: ) > Optimize the overhead of writing with direct buffer in ForSt > - > > Key: FLINK-36526 > URL: https://issues.apache.org/jira/browse/FLINK-36526 > Project: Flink > Issue Type: Sub-task > Components: Runtime / State Backends >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > Attachments: image-2024-10-14-15-52-41-457.png > > > Currently, the ForSt gives a direct buffer to > {{{}ByteBufferWritableFSDataOutputStream{}}}, where the data will be written > one byte by byte. According our perf, the statistics of hadoop based fs will > be updated once for each byte, which takes a lot of CPU. Below is a > flamegraph, where the statistics part is marked as purple (taking 8.14% of > the overall CPU). > !image-2024-10-14-15-52-41-457.png|width=1296,height=616! > > It might be better to copy to a heap buffer before invoking write. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35029][state/forst] Store timer in JVM heap when use async state backend [flink]
fredia commented on code in PR #25501: URL: https://github.com/apache/flink/pull/25501#discussion_r1798959007 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateHandler.java: ## @@ -244,6 +244,11 @@ void snapshotState( && ((AbstractKeyedStateBackend) keyedStateBackend) .requiresLegacySynchronousTimerSnapshots( checkpointOptions.getCheckpointType()); +requiresLegacyRawKeyedStateSnapshots |= +keyedStateBackend instanceof AsyncKeyedStateBackend +&& ((AsyncKeyedStateBackend) keyedStateBackend) + .requiresLegacySynchronousTimerSnapshots( Review Comment: > How about remove AsyncKeyedStateBackend#requiresLegacySynchronousTimerSnapshots and do the following ForSt state backend may support storing timers in files in the future, so `requiresLegacySynchronousTimerSnapshots` will return false. I prefer to keep it as it is. > should subclasses like AsyncKeyedStateBackendAdaptor override this method? Good point, we can store timers in the form of `delegated keyed state backend`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chenyu Zheng updated FLINK-36405: - External issue URL: https://github.com/apache/flink/pull/25428 > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Assignee: Ferenc Csaky >Priority: Major > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Fount these error when startup on kerberos cluster. > Error 1: renew user dismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > hive metastore. However, when renewing DelegationToken, will use the startup > user of flink. This will cause the renewer to be mismatched. > > Error2: HIVE_DELEGATION_TOKEN is not in service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in service. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36065][runtime] Support submit stream graph. [flink]
JunRuiLee commented on PR #25472: URL: https://github.com/apache/flink/pull/25472#issuecomment-2410911107 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889182#comment-17889182 ] Chenyu Zheng commented on FLINK-36405: ------ [~fcsaky] Hi, I have submitted PR https://github.com/apache/flink/pull/25428. I don't know why it wasn't linked to this issue. > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Assignee: Ferenc Csaky >Priority: Major > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Fount these error when startup on kerberos cluster. > Error 1: renew user dismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > hive metastore. However, when renewing DelegationToken, will use the startup > user of flink. This will cause the renewer to be mismatched. > > Error2: HIVE_DELEGATION_TOKEN is not in service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in service. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chenyu Zheng updated FLINK-36405: - External issue URL: (was: https://github.com/apache/flink/pull/25428) > Fix startup issues on kerberos clusters > --- > > Key: FLINK-36405 > URL: https://issues.apache.org/jira/browse/FLINK-36405 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime >Affects Versions: 1.20.0, 1.19.1 >Reporter: Chenyu Zheng >Assignee: Ferenc Csaky >Priority: Major > Fix For: 2.0.0, 1.19.2, 1.20.1 > > > Fount these error when startup on kerberos cluster. > Error 1: renew user dismatched > {code:java} > Caused by: org.apache.flink.util.FlinkRuntimeException: > MetaException(message:usera tries to renew a token (HIVE_DELEGATION_TOKEN > owner=usera/h...@hadoop.com, renewer=hive, realUser=usera/h...@hadoop.com, > issueDate=1727264927044, maxDate=1727869727044, sequenceNumber=251, > masterKeyId=7) with non-matching renewer hive) > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getNewExpiration(HiveServer2DelegationTokenProvider.java:203) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:190) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:159) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of the problem is that the renewer is set to the value of > `hive.metastore.kerberos.principal`, which is generally the startup user of > hive metastore. However, when renewing DelegationToken, will use the startup > user of flink. This will cause the renewer to be mismatched. > > Error2: HIVE_DELEGATION_TOKEN is not in service list > > {code:java} > 2024-09-26 14:35:07,144 ERROR > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider [] - > Failed to obtain delegation token for HiveServer2 > java.lang.NullPointerException: null > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.getTokenRenewalInterval(HiveServer2DelegationTokenProvider.java:202) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > at > org.apache.flink.table.security.token.HiveServer2DelegationTokenProvider.lambda$obtainDelegationTokens$0(HiveServer2DelegationTokenProvider.java:170) > ~[flink-sql-connector-hive-3.1.3_2.12-1.19.1.jar:1.19.1] > ...{code} > The cause of this problem is that HIVE_DELEGATION_TOKEN is not in service. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36523][Connectors/DynamoDB][BugFix] Fixing LATEST behaviour for DDB Connector [flink-connector-aws]
hlteoh37 commented on code in PR #177: URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1799315333

## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##
@@ -89,13 +91,77 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    /**
+     * If there is an ancestor of an open shard in knownSplits, we put all of its ancestors after
+     * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the ancestors and track only
+     * the open shard with LATEST. There might be a case when restoring from expired snapshots that
+     * there is an ancestor in knownSplits, but we wont know that it is an ancestor, so in that case
+     * also, all the ancestors will not be tracked and rather only the current open shard will be
+     * tracked with LATEST.

Review Comment:
   We should align on the expected behavior when using `LATEST`:
   1. When starting without snapshot:
      - All shards read from `LATEST`. If we want, we can focus on just the open shards.
   2. When reading with snapshot:
      - All shards from time of start of application are read from `TRIM_HORIZON`

## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java:
##
@@ -89,13 +91,77 @@ public SplitTracker(
      * @param shardsToAdd collection of splits to add to tracking
      */
     public void addSplits(Collection<Shard> shardsToAdd) {
-        Set<String> discoveredShardIds =
-                shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet());
+        if (TRIM_HORIZON.equals(initialPosition)) {
+            addSplitsForTrimHorizon(shardsToAdd);
+            return;
+        }
+        addSplitsForLatest(shardsToAdd);
+    }
+
+    /**
+     * If there is an ancestor of an open shard in knownSplits, we put all of its ancestors after
+     * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the ancestors and track only
+     * the open shard with LATEST. There might be a case when restoring from expired snapshots that
+     * there is an ancestor in knownSplits, but we wont know that it is an ancestor, so in that case
+     * also, all the ancestors will not be tracked and rather only the current open shard will be
+     * tracked with LATEST.

Review Comment:
   We should align on the expected behavior when using `LATEST`:
   1. When starting without snapshot:
      - All shards read from `LATEST`. If we want, we can focus on just the open shards.
   2. When reading with snapshot:
      - All shards from time of start of application are read from `TRIM_HORIZON`

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36523][Connectors/DynamoDB][BugFix] Fixing LATEST behaviour for DDB Connector [flink-connector-aws]
hlteoh37 commented on code in PR #177: URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1799315333 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ## @@ -89,13 +91,77 @@ public SplitTracker( * @param shardsToAdd collection of splits to add to tracking */ public void addSplits(Collection shardsToAdd) { -Set discoveredShardIds = - shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet()); +if (TRIM_HORIZON.equals(initialPosition)) { +addSplitsForTrimHorizon(shardsToAdd); +return; +} +addSplitsForLatest(shardsToAdd); +} + +/** + * If there is an ancestor of an open shard in knownSplits, we put all of its ancestors after + * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the ancestors and track only + * the open shard with LATEST. There might be a case when restoring from expired snapshots that + * there is an ancestor in knownSplits, but we wont know that it is an ancestor, so in that case + * also, all the ancestors will not be tracked and rather only the current open shard will be + * tracked with LATEST. Review Comment: We should align that the expected behavior, when using `LATEST`, should be: 1. When starting without snapshot: - All shards from start of application read from `LATEST`. If we want, we can focus on just the open shards. - All child shards are read from `TRIM_HORIZON` 2. When reading with snapshot: - All shards from time of start of application are read from `TRIM_HORIZON` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
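Condensing the two cases above into code, a minimal sketch of the decision. The helper below is hypothetical and not the connector's API; in the PR the split is handled via `addSplitsForTrimHorizon`/`addSplitsForLatest` rather than per-shard flags.

```java
final class LatestStartPositions {
    enum StartPos { LATEST, TRIM_HORIZON }

    // Hypothetical helper: starting position for a newly discovered shard
    // when the configured initial position is LATEST.
    static StartPos startPosFor(boolean restoredFromSnapshot, boolean discoveredAtStartup) {
        if (restoredFromSnapshot) {
            // Case 2: resume from snapshot -- read everything written since
            // the snapshot, so newly discovered shards start at TRIM_HORIZON.
            return StartPos.TRIM_HORIZON;
        }
        // Case 1: fresh start -- shards present at startup begin at LATEST;
        // child shards created afterwards start at TRIM_HORIZON so records
        // between discovery runs are not skipped.
        return discoveredAtStartup ? StartPos.LATEST : StartPos.TRIM_HORIZON;
    }
}
```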
[jira] [Assigned] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ferenc Csaky reassigned FLINK-36405: Assignee: Chenyu Zheng (was: Ferenc Csaky) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889184#comment-17889184 ] Ferenc Csaky commented on FLINK-36405: -- Ah very nice, assigned it to you! Can you pls. rebase your PR to the current master pls? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889184#comment-17889184 ] Ferenc Csaky edited comment on FLINK-36405 at 10/14/24 11:32 AM: - Ah very nice, assigned it to you! Can you pls. rebase your PR to the current master? was (Author: JIRAUSER306586): Ah very nice, assigned it to you! Can you pls. rebase your PR to the current master pls? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889184#comment-17889184 ] Ferenc Csaky edited comment on FLINK-36405 at 10/14/24 11:33 AM: - -Ah very nice, assigned it to you! Can you pls. rebase your PR to the current master?- Actually, right now the Hive connector is not compiled on master because of JDK17 issues, so I think it would make more sense for now to point the PR against the {{release-1.20}} branch was (Author: JIRAUSER306586): Ah very nice, assigned it to you! Can you pls. rebase your PR to the current master? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-36405) Fix startup issues on kerberos clusters
[ https://issues.apache.org/jira/browse/FLINK-36405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889184#comment-17889184 ] Ferenc Csaky edited comment on FLINK-36405 at 10/14/24 11:33 AM: - Ah very nice, assigned it to you! -Can you pls. rebase your PR to the current master?- Actually, right now the Hive connector is not compiled on master because of JDK17 issues, so I think it would make more sense for now to point the PR against the {{release-1.20}} branch was (Author: JIRAUSER306586): -Ah very nice, assigned it to you! Can you pls. rebase your PR to the current master?- Actually, right now the Hive connector is not compiled on master because of JDK17 issues, so I think it would make more sense for now to point the PR against the {{release-1.20}} branch -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36476) Remove all deprecated methods under public APIs in table modules
[ https://issues.apache.org/jira/browse/FLINK-36476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889109#comment-17889109 ] corgy commented on FLINK-36476: --- Hi [~xuyangzhong], can you assign me a few subtasks? I want to start my contribution journey with these simple tasks. > Remove all deprecated methods under public APIs in table modules > > > Key: FLINK-36476 > URL: https://issues.apache.org/jira/browse/FLINK-36476 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Reporter: xuyang >Priority: Blocker > Fix For: 2.0.0 > > > All methods that need to be removed are listed here: > https://docs.google.com/document/d/1bVrmcB9UFOd1-sT7xDRTMb5rmO0vK0eDlLRUYYL9gd4/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33977][runtime] Adaptive scheduler may not minimize the number of TMs during downscaling [flink]
1996fanrui commented on PR #25218: URL: https://github.com/apache/flink/pull/25218#issuecomment-2410276176 > > May I know that only session mode needs to spreads the workload across as many workers as possible, right? > > Yes, in session mode. @ztison @XComp Thanks for the detailed discussion and feedback, I have no questions for now. I think your suggestions make sense. We may need a FLIP to make this feature applicable to more scenarios. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]
dannycranmer commented on PR #757: URL: https://github.com/apache/flink-web/pull/757#issuecomment-2410603904 @AHeise we do not usually include the "rebuild website" commit in this PR; that is usually done as a follow-up, without a PR. Usually the date in the PR is a placeholder and needs to change later anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]
dannycranmer commented on PR #757: URL: https://github.com/apache/flink-web/pull/757#issuecomment-2410608441 You are missing a change in the release_archive; see here for an example: https://github.com/apache/flink-web/pull/737/files -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36446) Refactor Job Submission Process to Use ExecutionPlan Instead of JobGraph
[ https://issues.apache.org/jira/browse/FLINK-36446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889156#comment-17889156 ] david radley commented on FLINK-36446: -- [~JunRuiLi] thanks for sharing the background of this fix - that makes sense. > Refactor Job Submission Process to Use ExecutionPlan Instead of JobGraph > > > Key: FLINK-36446 > URL: https://issues.apache.org/jira/browse/FLINK-36446 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Coordination >Reporter: Junrui Li >Assignee: Junrui Li >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > Refactor the job submission process to submit an {{ExecutionPlan}} instead of > a {{{}JobGraph{}}}. > Since {{JobGraph}} implements the {{ExecutionPlan}} interface, this change > will not impact the existing submission process. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (FLINK-36522) Bump mysql-connector-j version
[ https://issues.apache.org/jira/browse/FLINK-36522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan resolved FLINK-36522. - Resolution: Fixed Merged to main (1.10.0) via: 6c73673811e1207350bd7f589ff5998e2caf97b9 > Bump mysql-connector-j version > -- > > Key: FLINK-36522 > URL: https://issues.apache.org/jira/browse/FLINK-36522 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.10.0 >Reporter: Kartik Goyal >Priority: Major > Labels: pull-request-available > Fix For: kubernetes-operator-1.10.0 > > > Bump the mysql-connector-j version to remediate a vulnerability associated with > this package. > Package info: > [https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.33] > Vulnerability info: > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-22102] > Proposed change solution: > Bump the version from 8.0.33 to 8.4.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36522) Bump mysql-connector-j version
[ https://issues.apache.org/jira/browse/FLINK-36522?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-36522: --- Assignee: Kartik Goyal -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36522] [Kubernetes-operator] Bump mysql-connector-java from 8.0.33 to mysql-connecto… [flink-kubernetes-operator]
1996fanrui merged PR #898: URL: https://github.com/apache/flink-kubernetes-operator/pull/898 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34467] bump flink version to 1.20.0 [flink-connector-kafka]
boring-cyborg[bot] commented on PR #111: URL: https://github.com/apache/flink-connector-kafka/pull/111#issuecomment-2410480510 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-34467] bump flink version to 1.20.0 [flink-connector-kafka]
AHeise merged PR #111: URL: https://github.com/apache/flink-connector-kafka/pull/111 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36523][Connectors/DynamoDB][BugFix] Fixing LATEST behaviour for DDB Connector [flink-connector-aws]
hlteoh37 commented on code in PR #177: URL: https://github.com/apache/flink-connector-aws/pull/177#discussion_r1799008421 ## flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java: ## @@ -89,13 +91,77 @@ public SplitTracker( * @param shardsToAdd collection of splits to add to tracking */ public void addSplits(Collection shardsToAdd) { -Set discoveredShardIds = - shardsToAdd.stream().map(Shard::shardId).collect(Collectors.toSet()); +if (TRIM_HORIZON.equals(initialPosition)) { +addSplitsForTrimHorizon(shardsToAdd); +return; +} +addSplitsForLatest(shardsToAdd); +} + +/** + * If there is an ancestor of an open shard in knownSplits, we put all of its ancestors after + * the tracked ancestors as TRIM_HORIZON otherwise, we ignore all the ancestors and track only + * the open shard with LATEST. There might be a case when restoring from expired snapshots that + * there is an ancestor in knownSplits, but we wont know that it is an ancestor, so in that case + * also, all the ancestors will not be tracked and rather only the current open shard will be + * tracked with LATEST. Review Comment: I don't think this is correct. When restoring from snapshot, we should read all records since the restored snapshot (`TRIM_HORIZON`), and not `LATEST`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-34467) Implement Lineage Interface in Jdbc Connector
[ https://issues.apache.org/jira/browse/FLINK-34467?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889135#comment-17889135 ] Arvid Heise commented on FLINK-34467: - Merged version bump into main as 2dfdae6e8e44828a9c2b6db833348b65d9379b43. > Implement Lineage Interface in Jdbc Connector > - > > Key: FLINK-34467 > URL: https://issues.apache.org/jira/browse/FLINK-34467 > Project: Flink > Issue Type: Sub-task > Components: Connectors / JDBC >Affects Versions: 1.19.0 >Reporter: Zhenqiu Huang >Assignee: Zhenqiu Huang >Priority: Minor > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Upgrade Kafka connector to 3.3.0 [flink-web]
AHeise closed pull request #757: Upgrade Kafka connector to 3.3.0 URL: https://github.com/apache/flink-web/pull/757 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36474] Support merging timestamp columns when routing [flink-cdc]
lvyanquan commented on code in PR #3636: URL: https://github.com/apache/flink-cdc/pull/3636#discussion_r1798837548 ## flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaDerivation.java: ## @@ -343,23 +348,14 @@ private List handleCreateTableEvent( return schemaChangeEvents; } -private DataType getWiderType(DataType thisType, DataType thatType) { -if (thisType.equals(thatType)) { -return thisType; -} -if (thisType.is(DataTypeFamily.INTEGER_NUMERIC) -&& thatType.is(DataTypeFamily.INTEGER_NUMERIC)) { -return DataTypes.BIGINT(); -} -if (thisType.is(DataTypeFamily.CHARACTER_STRING) -&& thatType.is(DataTypeFamily.CHARACTER_STRING)) { -return DataTypes.STRING(); -} -if (thisType.is(DataTypeFamily.APPROXIMATE_NUMERIC) -&& thatType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) { -return DataTypes.DOUBLE(); +private DataType getWiderType(String columnName, DataType thisType, DataType thatType) { +try { +return SchemaUtils.inferWiderType(thisType, thatType); +} catch (IllegalStateException e) { +throw new IllegalStateException( +String.format( +"Incompatible types found for column `%s': \"%s\" and \"%s\"", Review Comment: Incompatible types found for column `%s`: \"%s\" and \"%s\" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
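For context, the widening lattice that the removed method encoded (and that `SchemaUtils.inferWiderType` is expected to cover) looks roughly like this sketch; the standalone class shape is illustrative, and the import paths are assumed from flink-cdc-common:

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypeFamily;
import org.apache.flink.cdc.common.types.DataTypes;

final class WideningSketch {
    // Mirrors the rules visible in the removed getWiderType: identical types
    // pass through; integer-family types widen to BIGINT; character strings
    // to STRING; approximate numerics to DOUBLE; anything else is rejected.
    static DataType widen(DataType a, DataType b) {
        if (a.equals(b)) {
            return a;
        }
        if (a.is(DataTypeFamily.INTEGER_NUMERIC) && b.is(DataTypeFamily.INTEGER_NUMERIC)) {
            return DataTypes.BIGINT();
        }
        if (a.is(DataTypeFamily.CHARACTER_STRING) && b.is(DataTypeFamily.CHARACTER_STRING)) {
            return DataTypes.STRING();
        }
        if (a.is(DataTypeFamily.APPROXIMATE_NUMERIC) && b.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
            return DataTypes.DOUBLE();
        }
        throw new IllegalStateException(
                String.format("Incompatible types: \"%s\" and \"%s\"", a, b));
    }
}
```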
Re: [PR] [FLINK-36466][runtime] Change default value of execution.runtime-mode from STREAMING to AUTOMATIC [flink]
reswqa closed pull request #25505: [FLINK-36466][runtime] Change default value of execution.runtime-mode from STREAMING to AUTOMATIC URL: https://github.com/apache/flink/pull/25505 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36526) Optimize the overhead of writing with direct buffer in ForSt
Zakelly Lan created FLINK-36526: --- Summary: Optimize the overhead of writing with direct buffer in ForSt Key: FLINK-36526 URL: https://issues.apache.org/jira/browse/FLINK-36526 Project: Flink Issue Type: Sub-task Components: Runtime / State Backends Reporter: Zakelly Lan Assignee: Zakelly Lan Attachments: image-2024-10-14-15-52-41-457.png Currently, ForSt gives a direct buffer to {{ByteBufferWritableFSDataOutputStream}}, where the data will be written byte by byte. According to our perf results, the statistics of the Hadoop-based fs will be updated once for each byte, which takes a lot of CPU. !image-2024-10-14-15-52-41-457.png! It might be better to copy to a heap buffer before invoking write. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36527) Introduce a parameter to support the autoscaler to adopt a more aggressive strategy when Source or upstream shuffle is keyBy
yuanfenghu created FLINK-36527: -- Summary: Introduce a parameter to support the autoscaler to adopt a more aggressive strategy when Source or upstream shuffle is keyBy Key: FLINK-36527 URL: https://issues.apache.org/jira/browse/FLINK-36527 Project: Flink Issue Type: Improvement Components: Autoscaler Reporter: yuanfenghu In [https://issues.apache.org/jira/browse/FLINK-36192|https://issues.apache.org/jira/browse/FLINK-36192] we completed the optimization of Source parallelism determination: the parallelism of tasks whose source (or upstream shuffle) is keyBy will be adjusted to be a divisor of the number of partitions or of the maximum parallelism, as sketched below. This Jira hopes to introduce a parameter to enable a {color:#de350b}more aggressive{color} strategy: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764419949 -- This message was sent by Atlassian Jira (v8.20.10#820010)
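The divisor rule from FLINK-36192 that the new parameter would relax, as a minimal sketch; the method name and clamping are illustrative, not the autoscaler's actual code.

{code:java}
public final class DivisorSketch {
    // Snap a suggested parallelism down to the largest value <= target that
    // divides numPartitions evenly, so every subtask owns the same number of
    // partitions (or key groups). Always terminates: 1 divides everything.
    public static int snapToDivisor(int target, int numPartitions) {
        int p = Math.min(Math.max(target, 1), numPartitions);
        while (numPartitions % p != 0) {
            p--;
        }
        return p;
    }
}
{code}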
[jira] [Updated] (FLINK-36526) Optimize the overhead of writing with direct buffer in ForSt
[ https://issues.apache.org/jira/browse/FLINK-36526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zakelly Lan updated FLINK-36526: Description: Currently, ForSt gives a direct buffer to {{ByteBufferWritableFSDataOutputStream}}, where the data will be written byte by byte. According to our perf results, the statistics of the Hadoop-based fs will be updated once for each byte, which takes a lot of CPU. Below is a flamegraph, where the statistics part is marked as purple (taking 8.14% of the overall CPU). !image-2024-10-14-15-52-41-457.png|width=1296,height=616! It might be better to copy to a heap buffer before invoking write. was: Currently, ForSt gives a direct buffer to {{ByteBufferWritableFSDataOutputStream}}, where the data will be written byte by byte. According to our perf results, the statistics of the Hadoop-based fs will be updated once for each byte, which takes a lot of CPU. !image-2024-10-14-15-52-41-457.png! It might be better to copy to a heap buffer before invoking write. -- This message was sent by Atlassian Jira (v8.20.10#820010)
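A minimal sketch of the proposed mitigation, assuming a free-standing helper; the real change would live in {{ByteBufferWritableFSDataOutputStream}}, and the 64 KiB chunk size is illustrative.

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

public final class DirectBufferWriteSketch {
    // Drain a direct ByteBuffer through a reusable heap chunk so the
    // underlying Hadoop stream sees a few array writes instead of one
    // statistics-updating write(int) call per byte.
    public static void write(OutputStream out, ByteBuffer bb) throws IOException {
        byte[] chunk = new byte[Math.min(Math.max(bb.remaining(), 1), 64 * 1024)];
        while (bb.hasRemaining()) {
            int n = Math.min(chunk.length, bb.remaining());
            bb.get(chunk, 0, n); // bulk copy out of direct memory; advances position
            out.write(chunk, 0, n);
        }
    }
}
{code}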
[jira] [Commented] (FLINK-36377) Support the use of the LAST_VALUE aggregate function on ROW type data
[ https://issues.apache.org/jira/browse/FLINK-36377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889120#comment-17889120 ] Yang Li commented on FLINK-36377: - I'm glad to hear that! Yes, other complex types like {{Map}}, {{Array}}, and even {{Raw}} types can also have similar issues, and I am adding some unit tests to verify the correctness. If possible, this feature can be assigned to me and I will continue to follow up on it. > Support the use of the LAST_VALUE aggregate function on ROW type data > -- > > Key: FLINK-36377 > URL: https://issues.apache.org/jira/browse/FLINK-36377 > Project: Flink > Issue Type: Improvement > Components: Runtime / State Backends >Reporter: Yang Li >Priority: Major > > h2. Introduction > In Flink, after applying a group by, users may use LAST_VALUE to process > certain fields to ensure that all fields have corresponding aggregation > functions. Currently, LAST_VALUE does not support the ROW type syntax, so > users always apply the LAST_VALUE function to each individual field > separately, as shown below. > SELECT > LAST_VALUE(bool_a) AS last_bool_a, > LAST_VALUE(int2_b) AS last_int2_b, > LAST_VALUE(int4_c) AS last_int4_c, > LAST_VALUE(int8_d) AS last_int8_d, > LAST_VALUE(float4_e) AS last_float4_e, > LAST_VALUE(float4_f) AS last_float4_f, > LAST_VALUE(numeric_g) AS last_numeric_g, > LAST_VALUE(text_m) AS last_text_m, > LAST_VALUE(varchar_p) AS last_varchar_p, > date_h > FROM source_table > GROUP BY date_h > > If the upstream operator is a retract stream, this approach will lead to > redundant StateMap traversal. To facilitate retraction, Flink's internal > {{LastValueWithRetractAggFunction}} will store all historical data related to > the primary key. When the last value is deleted, it will traverse all keys in > the {{orderToValue}} (which maps timestamps to data), and this {{MapView}} is > stored in the form of a {{StateMap}}. More {{LAST_VALUE}} functions lead > to more RocksDB read and write operations. Therefore, I advocate > for handling {{ROW}} types with {{LAST_VALUE}}, allowing support for all > fields with just one {{LAST_VALUE}} function as below. > SELECT > LAST_VALUE( > ROW( > bool_a, > int2_b, > int4_c, > int8_d, > float4_e, > float4_f, > numeric_g, > text_m, > varchar_p > ) > ) AS row_data, > date_h > FROM source_table > GROUP BY date_h > The experiment indicates that applying the {{ROW}} type to the {{LAST_VALUE}} > function can improve the processing speed for retract streams, but has no > effect on append-only streams. > h2. Evaluation > The throughput of jobs was compared based on whether the {{ROW}} type was > used in the {{LAST_VALUE}} function, considering both retract and append-only > scenarios. > h3. Retraction > Use a deduplication operator to convert the append-only stream generated by > datagen into a retract stream. > The two jobs show a clear difference in throughput (ROW: 4817, Separated: 1808). > Through flame graph analysis, applying the ROW type to the LAST_VALUE > function reduces the consumption of the aggregate function calls to > accumulate, > with CPU usage for accumulate being (ROW 20.02%: Separated 66.98%). > The {{LastValueWithRetractAccumulator}} uses {{MapState}} to store its {{MapView}}. > Therefore, updating the {{LastValueWithRetractAccumulator}} requires reading from > or writing to RocksDB. > h3. AppendOnly > The two jobs show little difference in throughput (ROW: 13411, Separated: 10673). > Further examination of the flame graphs for both processes reveals that the > bottleneck > for both jobs lies in getting {{RocksDBValueState}}, which is called by > {{GroupFunction}}. > Using {{ROW}} aggregation does not yield significant optimization in this > part. I suspect it's > because Flink uses RowData to store data from multiple Accumulators, and > every time > the {{accState}} invokes the {{value}} method, it reads all the Accumulators > at the same time. > Therefore, the use of ROW optimization might not be very effective. > h2. Conclusion > # Using the ROW type for LAST_VALUE aggregation can improve the processing speed > for retract streams, with effectiveness proportional to the number of fields > contained in the {{ROW}}. > # Using the ROW type for LAST_VALUE aggregation yields limited improvements > for append-only streams, as the optimization effect on state backend read speed > is not significant. -- This message was sent by Atlassian Jira (v8.20.10#820010)
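As a toy model of the bookkeeping described above (not Flink's actual implementation): each retractable LAST_VALUE accumulator keeps a timestamp-ordered map of past values, and every accumulate/retract touches it. In Flink the {{orderToValue}} {{MapView}} is backed by RocksDB state, so N separate LAST_VALUE accumulators mean roughly N times the state reads and writes that a single ROW-typed accumulator would do.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

final class ToyLastValueWithRetract<T> {
    // Stand-in for the RocksDB-backed MapView: timestamp -> values seen.
    private final NavigableMap<Long, Deque<T>> orderToValue = new TreeMap<>();

    void accumulate(long ts, T value) {
        orderToValue.computeIfAbsent(ts, k -> new ArrayDeque<>()).add(value);
    }

    void retract(long ts, T value) {
        Deque<T> values = orderToValue.get(ts);
        if (values != null && values.remove(value) && values.isEmpty()) {
            // Dropping the newest timestamp forces re-resolving the last
            // value from the remaining keys -- the traversal the issue cites.
            orderToValue.remove(ts);
        }
    }

    Optional<T> lastValue() {
        return Optional.ofNullable(orderToValue.lastEntry())
                .map(e -> e.getValue().peekLast());
    }
}
{code}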
Re: [PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]
fredia commented on code in PR #25508: URL: https://github.com/apache/flink/pull/25508#discussion_r1798973369 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java: ## @@ -58,6 +69,22 @@ public void write(ByteBuffer bb) throws IOException { } if (bb.hasArray()) { write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); +} else if (bb.isDirect()) { +int len = bb.remaining(); +int segment = Math.min(len, SEGMENT_BUFFER_SIZE); +byte[] bytes = new byte[segment]; +for (int i = 0; i < len;) { +int copy = Math.min(segment, bb.remaining()); +UNSAFE.copyMemory( +null, Review Comment: Why `null` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36526][state/forst] Optimize the overhead of writing with direct buffer [flink]
fredia commented on code in PR #25508: URL: https://github.com/apache/flink/pull/25508#discussion_r1798978105 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java: ## @@ -58,6 +69,22 @@ public void write(ByteBuffer bb) throws IOException { } if (bb.hasArray()) { write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); +} else if (bb.isDirect()) { +int len = bb.remaining(); +int segment = Math.min(len, SEGMENT_BUFFER_SIZE); +byte[] bytes = new byte[segment]; +for (int i = 0; i < len;) { +int copy = Math.min(segment, bb.remaining()); +UNSAFE.copyMemory( +null, Review Comment: Oh, I see, because `MemoryUtils.getByteBufferAddress(bb)` returns an address. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
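For other readers: in `sun.misc.Unsafe#copyMemory(Object srcBase, long srcOffset, Object destBase, long destOffset, long bytes)`, a `null` base object means the matching offset is interpreted as an absolute native address, which is exactly what `MemoryUtils.getByteBufferAddress(bb)` supplies for a direct buffer. A hedged sketch of the heap-bound copy; the helper shape is illustrative, not the PR's code:

```java
import sun.misc.Unsafe;

final class DirectCopySketch {
    // srcBase == null => srcOffset is an absolute off-heap address.
    // For the heap destination, the base is the array object and the offset
    // is the address of its first element (Unsafe.ARRAY_BYTE_BASE_OFFSET).
    static void copyToHeap(Unsafe unsafe, long directAddress, byte[] dst, int len) {
        unsafe.copyMemory(null, directAddress, dst, Unsafe.ARRAY_BYTE_BASE_OFFSET, len);
    }
}
```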
[jira] [Assigned] (FLINK-36527) Introduce a parameter to support the autoscaler to adopt a more aggressive strategy when Source or upstream shuffle is keyBy
[ https://issues.apache.org/jira/browse/FLINK-36527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan reassigned FLINK-36527: --- Assignee: yuanfenghu -- This message was sent by Atlassian Jira (v8.20.10#820010)