Re: [PR] [FLINK-35167][cdc-connector] Introduce MaxCompute pipeline DataSink [flink-cdc]
lvyanquan commented on code in PR #3254: URL: https://github.com/apache/flink-cdc/pull/3254#discussion_r1829218514 ## flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/utils/SchemaEvolutionUtils.java: ## @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.maxcompute.utils; + +import org.apache.flink.cdc.common.event.AddColumnEvent; +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.schema.Schema; +import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.utils.StringUtils; +import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions; +import org.apache.flink.util.CollectionUtil; + +import com.aliyun.odps.Instance; +import com.aliyun.odps.Odps; +import com.aliyun.odps.OdpsException; +import com.aliyun.odps.TableSchema; +import com.aliyun.odps.Tables; +import com.aliyun.odps.task.SQLTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Schema evolution utils for maxcompute. */ +public class SchemaEvolutionUtils { +private static final Logger LOG = LoggerFactory.getLogger(SchemaEvolutionUtils.class); +private static final Map unsupportSchemahints = new HashMap<>(); +private static final Map supportSchemaHints = new HashMap<>(); + +static { +unsupportSchemahints.put("odps.sql.type.system.odps2", "true"); +unsupportSchemahints.put("odps.sql.decimal.odps2", "true"); +unsupportSchemahints.put("odps.sql.allow.schema.evolution", "true"); + +supportSchemaHints.put("odps.sql.type.system.odps2", "true"); +supportSchemaHints.put("odps.sql.decimal.odps2", "true"); +supportSchemaHints.put("odps.namespace.schema", "true"); +supportSchemaHints.put("odps.sql.allow.namespace.schema", "true"); +supportSchemaHints.put("odps.sql.allow.schema.evolution", "true"); +} + +private SchemaEvolutionUtils() {} + +/** + * equals to run a sql like: create table table_name (col_name1 type1 comment [, col_name2 type2 + * ...]);. 
+ */ +public static void createTable(MaxComputeOptions options, TableId tableId, Schema schema) +throws OdpsException { +Odps odps = MaxComputeUtils.getOdps(options); +TableSchema tableSchema = TypeConvertUtils.toMaxCompute(schema); +if (options.isSupportSchema() +&& !StringUtils.isNullOrWhitespaceOnly(tableId.getNamespace())) { +LOG.info("create schema {}", tableId.getNamespace()); +odps.schemas() +.create( +odps.getDefaultProject(), +tableId.getNamespace(), +"generate by Flink CDC", +true); +} +Tables.TableCreator tableCreator = +odps.tables() +.newTableCreator( +odps.getDefaultProject(), tableId.getTableName(), tableSchema) +.withHints(unsupportSchemahints) +.ifNotExists() +.debug(); +if (!CollectionUtil.isNullOrEmpty(schema.primaryKeys())) { +tableCreator +.transactionTable() +.withBucketNum(options.getBucketSize()) +.withPrimaryKeys(schema.primaryKeys()); +} +if (options.isSupportSchema()) { +if (StringUtils.isNullOrWhitespaceOnly(tableId.getNamespace())) { + tableCreator.withSchemaName("default").withHints(supportSchemaHints); +} else { + tableCreator.withSchemaName(tableId.getNamespace()).withHints(supportSchemaHints); +} +} +LOG.info("create table {}, schema {}", getFullTableName(options, tableId), schema); +tableCreator.create(); +} + +/** + * equals to run a sql like: 'alter table table_name add columns (col_name1 type1 comment [, +
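A hypothetical caller-side sketch of the utility above (not code from the PR): it builds a CDC `Schema` and a `TableId`, then asks `SchemaEvolutionUtils.createTable` to create the corresponding MaxCompute table. The `MaxComputeOptions` setup is omitted, and the exact `TableId`/`DataTypes` factory overloads and how the namespace maps to a MaxCompute schema are assumptions to be checked against the connector.

```java
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.connectors.maxcompute.options.MaxComputeOptions;
import org.apache.flink.cdc.connectors.maxcompute.utils.SchemaEvolutionUtils;

public class CreateTableSketch {

    /** Creates a MaxCompute table for a made-up CDC table "my_schema.orders". */
    public static void createOrdersTable(MaxComputeOptions options) throws Exception {
        // Identifier of the CDC table; its namespace part maps to a MaxCompute schema
        // when schema support is enabled (the factory choice here is illustrative).
        TableId tableId = TableId.tableId("my_schema", "orders");

        Schema schema =
                Schema.newBuilder()
                        .physicalColumn("id", DataTypes.BIGINT())
                        .physicalColumn("amount", DataTypes.DECIMAL(10, 2))
                        .primaryKey("id") // a primary key makes the utility create a transactional table
                        .build();

        // Roughly equivalent to:
        //   CREATE TABLE IF NOT EXISTS orders (id BIGINT, amount DECIMAL(10, 2));
        SchemaEvolutionUtils.createTable(options, tableId, schema);
    }
}
```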
Re: [PR] [FLINK-23595][formats / json] JSON format support deserialize non-numeric number fields [flink]
loyispa closed pull request #16813: [FLINK-23595][formats / json] JSON format support deserialize non-numeric number fields URL: https://github.com/apache/flink/pull/16813 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36451) Kubernetes Application JobManager Potential Deadlock and TaskManager Pod Residuals
[ https://issues.apache.org/jira/browse/FLINK-36451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895568#comment-17895568 ] Matthias Pohl commented on FLINK-36451: --- Thanks for sharing, [~xiechenling]. I will have a look at it. > Kubernetes Application JobManager Potential Deadlock and TaskManager Pod > Residuals > -- > > Key: FLINK-36451 > URL: https://issues.apache.org/jira/browse/FLINK-36451 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.1 > Environment: * Flink version: 1.19.1 > * - Deployment mode: Flink Kubernetes Application Mode > * - JVM version: OpenJDK 17 > >Reporter: xiechenling >Assignee: Matthias Pohl >Priority: Major > Attachments: 1.png, 2.png, jobmanager.log, jstack.txt > > > In Kubernetes Application Mode, when there is significant etcd latency or > instability, the Flink JobManager may enter a deadlock situation. > Additionally, TaskManager pods are not cleaned up properly, resulting in > stale resources that prevent the Flink job from recovering correctly. This > issue occurs during frequent service restarts or network instability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix] [docs] Add lead()/lag() Table API agg functions to docs [flink]
fhueske merged PR #25608: URL: https://github.com/apache/flink/pull/25608 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-36002) Add CompiledPlan annotations to BatchExecMatch
[ https://issues.apache.org/jira/browse/FLINK-36002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz closed FLINK-36002. Resolution: Implemented Implemented in 37642307128a83cdc83630bc286e2936decf7426 > Add CompiledPlan annotations to BatchExecMatch > -- > > Key: FLINK-36002 > URL: https://issues.apache.org/jira/browse/FLINK-36002 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner >Reporter: Jim Hughes >Assignee: Jim Hughes >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > In addition to the annotations, implement the BatchCompiledPlan test for this > operator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]
JuliaBogdan commented on PR #46: URL: https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2456808836 > > Great to see the PR merged, thanks for moving it forward @ferenc-csaky ! How does the release process and the timeline look? Is there anything I can help with to make it happen? > > I plan to kick off the release process this week, hopefully there will be an RC by EOD. When the EOD is present, anyone can help to validate it. If you are interested in that, I will post that update here as well, or you can follow the Flink dev mailing list with a [VOTE] thread about the `flink-hbase-connector` release. Sounds good! I'll follow the thread in Flink dev mailing list. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-36451) Kubernetes Application JobManager Potential Deadlock and TaskManager Pod Residuals
[ https://issues.apache.org/jira/browse/FLINK-36451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-36451: - Assignee: Matthias Pohl > Kubernetes Application JobManager Potential Deadlock and TaskManager Pod > Residuals > -- > > Key: FLINK-36451 > URL: https://issues.apache.org/jira/browse/FLINK-36451 > Project: Flink > Issue Type: Bug > Components: Runtime / Coordination >Affects Versions: 1.19.1 > Environment: * Flink version: 1.19.1 > * - Deployment mode: Flink Kubernetes Application Mode > * - JVM version: OpenJDK 17 > >Reporter: xiechenling >Assignee: Matthias Pohl >Priority: Major > Attachments: 1.png, 2.png, jobmanager.log, jstack.txt > > > In Kubernetes Application Mode, when there is significant etcd latency or > instability, the Flink JobManager may enter a deadlock situation. > Additionally, TaskManager pods are not cleaned up properly, resulting in > stale resources that prevent the Flink job from recovering correctly. This > issue occurs during frequent service restarts or network instability. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33137][Connectors/Prometheus] Prometheus connector docs [flink-connector-prometheus]
hlteoh37 commented on PR #3: URL: https://github.com/apache/flink-connector-prometheus/pull/3#issuecomment-2457230128 @nicusX could we add `[FLINK-33137][Connectors/Prometheus][docs]` to the commit ID? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-36099) JobIDLoggingITCase fails due to "Cannot find task to fail for execution [...]" info log message in TM logs
[ https://issues.apache.org/jira/browse/FLINK-36099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895628#comment-17895628 ] Piotr Nowojski edited comment on FLINK-36099 at 11/5/24 2:19 PM: - {quote} actually, looking into it once more: we could make the JobID accessible in the log message by passing it down as part of the failTask callback. {quote} (y) That's what I did. merged commit 0b64b57 into apache:master was (Author: pnowojski): merged commit 0b64b57 into apache:master > JobIDLoggingITCase fails due to "Cannot find task to fail for execution > [...]" info log message in TM logs > -- > > Key: FLINK-36099 > URL: https://issues.apache.org/jira/browse/FLINK-36099 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 2.0.0, 1.18.1, 1.20.0, 1.19.1 >Reporter: Matthias Pohl >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 2.0.0 > > > {{JobIDLoggingITCase}} can fail (observed with the {{AdaptiveScheduler}} > enabled): > {code} > Test > org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging[testJobIDLogging(ClusterClient, > Path, MiniCluster)] failed with: > java.lang.AssertionError: [too many events without Job ID logged by > org.apache.flink.runtime.taskexecutor.TaskExecutor] > Expecting empty but was: > [Logger=org.apache.flink.runtime.taskexecutor.TaskExecutor Level=INFO > Message=Cannot find task to fail for execution > 5447dca7a6c7f9679346cad41dc8e3be_cbc357ccb763df2852fee8c4fc7d55f2_0_0 with > exception:] > at > org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:267) > at > org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:155) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(No
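The suggestion above (making the JobID available in the log message by passing it down as part of the failTask callback) could look roughly like the sketch below. The class name and method signature are illustrative, not the actual `TaskExecutor` code.

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FailTaskLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(FailTaskLoggingSketch.class);

    void failTask(JobID jobId, ExecutionAttemptID executionAttemptId, Throwable cause) {
        // With the JobID passed into the callback, the "Cannot find task to fail" message
        // can carry it, so JobIDLoggingITCase can attribute the log line to a job.
        LOG.info(
                "Cannot find task to fail for execution {} of job {} with exception:",
                executionAttemptId,
                jobId,
                cause);
    }
}
```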
[jira] [Updated] (FLINK-36662) Incorrect constant pull up when group keys in aggregate are NULL
[ https://issues.apache.org/jira/browse/FLINK-36662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-36662: Description: Currently, for the following case: {code:java} val groupingSetsQuery = """ |SELECT | case | when g1 = 1 then 'aaa' | when g2 = 1 then 'bbb' | end as gt, | b, c, | AVG(a) AS a |FROM (select *, 1 g1, 1 g2 from MyTable) t | GROUP BY GROUPING SETS ((g1, b), (g2, b, c)) """.stripMargin util.verifyExecPlan(groupingSetsQuery) {code} we'll get a wrong plan: {code:java} Calc(select=[CAST('aaa' AS CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[{g1, b, null AS g2, null AS c, a, 3 AS $e}, {null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} the expected right plan: {code:java} Calc(select=[CASE((g1 = 1), 'aaa', (g2 = 1), 'bbb', null:CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[\{g1, b, null AS g2, null AS c, a, 3 AS $e}, \{null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} This is the issue which CALCITE-6317 addressed. Before upgraded corresponding calcite version(1.37), we can have the fix that copy the related `RelMdPredicates` to flink and remove it after calcite version upgrading done. was: Currently, for the following case: {code} val groupingSetsQuery = """ |SELECT | case | when g1 = 1 then 'aaa' | when g2 = 1 then 'bbb' | end as gt, | b, c, | AVG(a) AS a |FROM (select *, 1 g1, 1 g2 from MyTable) t | GROUP BY GROUPING SETS ((g1, b), (g2, b, c)) """.stripMargin util.verifyExecPlan(groupingSetsQuery) {code} we'll get a wrong plan: {code} Calc(select=[CAST('aaa' CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[\{g1, b, null AS g2, null AS c, a, 3 AS $e}, \{null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} the expected right plan: {code} Calc(select=[CASE((g1 = 1), 'aaa', (g2 = 1), 'bbb', null:CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[\{g1, b, null AS g2, null AS c, a, 3 AS $e}, \{null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} This is the issue which CALCITE-6317 addressed. Before upgraded corresponding calcite version(1.37), we can have the fix that copy the related `RelMdPredicates` to flink and remove it after calcite version upgrading done. 
> Incorrect constant pull up when group keys in aggregate are NULL > > > Key: FLINK-36662 > URL: https://issues.apache.org/jira/browse/FLINK-36662 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.20.0, 2.0-preview >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Fix For: 2.0.0 > > > Currently, for the following case: > {code:java} > val groupingSetsQuery = > """ > |SELECT > | case > | when g1 = 1 then 'aaa' > | when g2 = 1 then 'bbb' > | end as gt, > | b, c, > | AVG(a) AS a > |FROM (select *, 1 g1, 1 g2 from MyTable) t > | GROUP BY GROUPING SETS ((g1, b), (g2, b, c)) > """.stripMargin > util.verifyExecPlan(groupingSetsQuery) > {code} > we'll get a wrong plan: > {code:java} > Calc(select=[CAST('aaa' AS CHAR(3)) AS gt, b, c, a]) > +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, > AVG(a) AS a]) >+- Exchange(distribution=[hash[g1, b, g2, c, $e]]) > +- Expand(projects=[{g1, b, null AS g2, null AS c, a, 3 AS $e}, {null > AS g1, b, g2, c, a, 8 AS $e}]) > +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > {code} > the expected right plan: > {code:java} > Calc(select=[CASE((g1 = 1),
[jira] [Updated] (FLINK-36662) Incorrect constant pull up when group keys in aggregate are NULL
[ https://issues.apache.org/jira/browse/FLINK-36662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee updated FLINK-36662: Description: Currently, for the following case: {code:java} val groupingSetsQuery = """ |SELECT | case | when g1 = 1 then 'aaa' | when g2 = 1 then 'bbb' | end as gt, | b, c, | AVG(a) AS a |FROM (select *, 1 g1, 1 g2 from MyTable) t | GROUP BY GROUPING SETS ((g1, b), (g2, b, c)) """.stripMargin util.verifyExecPlan(groupingSetsQuery) {code} we'll get a wrong plan: {code:java} Calc(select=[CAST('aaa' AS CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[{g1, b, null AS g2, null AS c, a, 3 AS $e}, {null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} the expected right plan: {code:java} Calc(select=[CASE((g1 = 1), 'aaa', (g2 = 1), 'bbb', null:CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[{g1, b, null AS g2, null AS c, a, 3 AS $e}, {null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} This is the issue which CALCITE-6317 addressed. Before upgraded corresponding calcite version(1.37), we can have the fix that copy the related `RelMdPredicates` to flink and remove it after calcite version upgrading done. was: Currently, for the following case: {code:java} val groupingSetsQuery = """ |SELECT | case | when g1 = 1 then 'aaa' | when g2 = 1 then 'bbb' | end as gt, | b, c, | AVG(a) AS a |FROM (select *, 1 g1, 1 g2 from MyTable) t | GROUP BY GROUPING SETS ((g1, b), (g2, b, c)) """.stripMargin util.verifyExecPlan(groupingSetsQuery) {code} we'll get a wrong plan: {code:java} Calc(select=[CAST('aaa' AS CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[{g1, b, null AS g2, null AS c, a, 3 AS $e}, {null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} the expected right plan: {code:java} Calc(select=[CASE((g1 = 1), 'aaa', (g2 = 1), 'bbb', null:CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[\{g1, b, null AS g2, null AS c, a, 3 AS $e}, \{null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} This is the issue which CALCITE-6317 addressed. Before upgraded corresponding calcite version(1.37), we can have the fix that copy the related `RelMdPredicates` to flink and remove it after calcite version upgrading done. 
> Incorrect constant pull up when group keys in aggregate are NULL > > > Key: FLINK-36662 > URL: https://issues.apache.org/jira/browse/FLINK-36662 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.20.0, 2.0-preview >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Fix For: 2.0.0 > > > Currently, for the following case: > {code:java} > val groupingSetsQuery = > """ > |SELECT > | case > | when g1 = 1 then 'aaa' > | when g2 = 1 then 'bbb' > | end as gt, > | b, c, > | AVG(a) AS a > |FROM (select *, 1 g1, 1 g2 from MyTable) t > | GROUP BY GROUPING SETS ((g1, b), (g2, b, c)) > """.stripMargin > util.verifyExecPlan(groupingSetsQuery) > {code} > we'll get a wrong plan: > {code:java} > Calc(select=[CAST('aaa' AS CHAR(3)) AS gt, b, c, a]) > +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, > AVG(a) AS a]) >+- Exchange(distribution=[hash[g1, b, g2, c, $e]]) > +- Expand(projects=[{g1, b, null AS g2, null AS c, a, 3 AS $e}, {null > AS g1, b, g2, c, a, 8 AS $e}]) > +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > {code} >
[jira] [Created] (FLINK-36662) Incorrect constant pull up when group keys in aggregate are NULL
lincoln lee created FLINK-36662: --- Summary: Incorrect constant pull up when group keys in aggregate are NULL Key: FLINK-36662 URL: https://issues.apache.org/jira/browse/FLINK-36662 Project: Flink Issue Type: Bug Components: Table SQL / Planner Affects Versions: 2.0-preview, 1.20.0 Reporter: lincoln lee Assignee: lincoln lee Fix For: 2.0.0 Currently, for the following case: {code} val groupingSetsQuery = """ |SELECT | case | when g1 = 1 then 'aaa' | when g2 = 1 then 'bbb' | end as gt, | b, c, | AVG(a) AS a |FROM (select *, 1 g1, 1 g2 from MyTable) t | GROUP BY GROUPING SETS ((g1, b), (g2, b, c)) """.stripMargin util.verifyExecPlan(groupingSetsQuery) {code} we'll get a wrong plan: {code} Calc(select=[CAST('aaa' CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[\{g1, b, null AS g2, null AS c, a, 3 AS $e}, \{null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} the expected right plan: {code} Calc(select=[CASE((g1 = 1), 'aaa', (g2 = 1), 'bbb', null:CHAR(3)) AS gt, b, c, a]) +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, AVG(a) AS a]) +- Exchange(distribution=[hash[g1, b, g2, c, $e]]) +- Expand(projects=[\{g1, b, null AS g2, null AS c, a, 3 AS $e}, \{null AS g1, b, g2, c, a, 8 AS $e}]) +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c]) {code} This is the issue which CALCITE-6317 addressed. Before upgraded corresponding calcite version(1.37), we can have the fix that copy the related `RelMdPredicates` to flink and remove it after calcite version upgrading done. -- This message was sent by Atlassian Jira (v8.20.10#820010)
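A minimal, self-contained way to inspect the plan outside the planner test harness could look like the sketch below. The table name and columns follow the example in the issue description; the datagen table definition is only an illustration for reproducing the explain output.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class GroupingSetsPlanCheck {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Same shape as MyTable(a, b, c) from the issue description.
        tEnv.executeSql(
                "CREATE TABLE MyTable (a INT, b STRING, c STRING) WITH ('connector' = 'datagen')");

        // The CASE over g1/g2 must not be constant-folded to 'aaa', because the
        // GROUPING SETS expansion makes g1/g2 NULL on some of the expanded rows.
        String query =
                "SELECT case when g1 = 1 then 'aaa' when g2 = 1 then 'bbb' end as gt, b, c, AVG(a) AS a "
                        + "FROM (select *, 1 g1, 1 g2 from MyTable) t "
                        + "GROUP BY GROUPING SETS ((g1, b), (g2, b, c))";

        System.out.println(tEnv.explainSql(query));
    }
}
```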
[PR] [FLINK-35780][state] Support state ttl migration [flink]
xiangyuf opened a new pull request, #25611: URL: https://github.com/apache/flink/pull/25611 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35780][state] Support state ttl migration [flink]
xiangyuf commented on PR #25611: URL: https://github.com/apache/flink/pull/25611#issuecomment-2457579637 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36662) Incorrect constant pull up when group keys in aggregate are NULL
[ https://issues.apache.org/jira/browse/FLINK-36662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36662: --- Labels: pull-request-available (was: ) > Incorrect constant pull up when group keys in aggregate are NULL > > > Key: FLINK-36662 > URL: https://issues.apache.org/jira/browse/FLINK-36662 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.20.0, 2.0-preview >Reporter: lincoln lee >Assignee: lincoln lee >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > Currently, for the following case: > {code:java} > val groupingSetsQuery = > """ > |SELECT > | case > | when g1 = 1 then 'aaa' > | when g2 = 1 then 'bbb' > | end as gt, > | b, c, > | AVG(a) AS a > |FROM (select *, 1 g1, 1 g2 from MyTable) t > | GROUP BY GROUPING SETS ((g1, b), (g2, b, c)) > """.stripMargin > util.verifyExecPlan(groupingSetsQuery) > {code} > we'll get a wrong plan: > {code:java} > Calc(select=[CAST('aaa' AS CHAR(3)) AS gt, b, c, a]) > +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, > AVG(a) AS a]) >+- Exchange(distribution=[hash[g1, b, g2, c, $e]]) > +- Expand(projects=[{g1, b, null AS g2, null AS c, a, 3 AS $e}, {null > AS g1, b, g2, c, a, 8 AS $e}]) > +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > {code} > the expected right plan: > {code:java} > Calc(select=[CASE((g1 = 1), 'aaa', (g2 = 1), 'bbb', null:CHAR(3)) AS gt, b, > c, a]) > +- GroupAggregate(groupBy=[g1, b, g2, c, $e], select=[g1, b, g2, c, $e, > AVG(a) AS a]) >+- Exchange(distribution=[hash[g1, b, g2, c, $e]]) > +- Expand(projects=[{g1, b, null AS g2, null AS c, a, 3 AS $e}, {null > AS g1, b, g2, c, a, 8 AS $e}]) > +- Calc(select=[1 AS g1, b, 1 AS g2, c, a]) > +- LegacyTableSourceScan(table=[[default_catalog, > default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, > b, c]) > {code} > This is the issue which CALCITE-6317 addressed. Before upgraded corresponding > calcite version(1.37), we can have the fix that copy the related > `RelMdPredicates` to flink and remove it after calcite version upgrading done. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36662][table-planner] Port CALCITE-6317 to fix the incorrectly constant pullup when group keys in aggregate are NULL [flink]
flinkbot commented on PR #25612: URL: https://github.com/apache/flink/pull/25612#issuecomment-2457590308 ## CI report: * 6ae95ef131f97250ebe7b3cb0cdd98d588544f97 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35780][state] Support state ttl migration [flink]
flinkbot commented on PR #25611: URL: https://github.com/apache/flink/pull/25611#issuecomment-2457589698 ## CI report: * d35824f9401420cb34bccff562a1b32a222fbff4 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36662][table-planner] Port CALCITE-6317 to fix the incorrectly constant pullup when group keys in aggregate are NULL [flink]
lincoln-lil opened a new pull request, #25612: URL: https://github.com/apache/flink/pull/25612 ## What is the purpose of the change This PR ports the bugfix of CALCITE-6317 to fix the incorrect constant pullup when group keys in aggregate are NULL. Until the corresponding Calcite version (1.37) is upgraded, we can carry the fix by copying the related `RelMdPredicates` into Flink and removing it once the Calcite upgrade is done. ## Brief change log Copy the `RelMdPredicates` based on 1.32.0 and add the fix of CALCITE-6317 ## Verifying this change Add new case for `GroupingSetsTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with @Public(Evolving): (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35332][yarn] Removed setting rest.address to the actual IP add… [flink]
ferenc-csaky commented on PR #24775: URL: https://github.com/apache/flink/pull/24775#issuecomment-2457582149 @paul8263 pls. rebase your PR against the latest `master` and force-push again. The error that failed the CI is already addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36662][table-planner] Port CALCITE-6317 to fix the incorrectly constant pullup when group keys in aggregate are NULL [flink]
snuyanzin commented on code in PR #25612: URL: https://github.com/apache/flink/pull/25612#discussion_r1829639312 ## flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/metadata/RelMdPredicates.java: ## @@ -0,0 +1,1001 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.rel.metadata; + +import static java.util.Objects.requireNonNull; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import org.apache.calcite.linq4j.Ord; +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelOptPredicateList; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.plan.RexImplicationChecker; +import org.apache.calcite.plan.Strong; +import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.Aggregate; +import org.apache.calcite.rel.core.Exchange; +import org.apache.calcite.rel.core.Filter; +import org.apache.calcite.rel.core.Intersect; +import org.apache.calcite.rel.core.Join; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.Minus; +import org.apache.calcite.rel.core.Project; +import org.apache.calcite.rel.core.Sort; +import org.apache.calcite.rel.core.TableModify; +import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.core.Union; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexCall; +import org.apache.calcite.rex.RexExecutor; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexLiteral; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexPermuteInputsShuttle; +import org.apache.calcite.rex.RexSimplify; +import org.apache.calcite.rex.RexUnknownAs; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.rex.RexVisitorImpl; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.util.BitSets; +import org.apache.calcite.util.Bug; +import org.apache.calcite.util.ImmutableBitSet; +import org.apache.calcite.util.Util; +import org.apache.calcite.util.mapping.Mapping; +import org.apache.calcite.util.mapping.MappingType; +import org.apache.calcite.util.mapping.Mappings; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import 
java.util.stream.Collectors; + +/** + * Utility to infer Predicates that are applicable above a RelNode. + * + * The class was copied over based on calcite-1.32.0 and added the bugfix of CALCITE-6317. Review Comment: Can you please mark the changed lines with ``` // -- FLINK MODIFICATION BEGIN ... // -- FLINK MODIFICATION END ``` and put numbers of lines in class comment as it is done for other Calcite's classes it will significantly help during upgrade (before we reach 1.37) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
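To make the requested convention concrete, the marker pair plus a class-level comment listing the modified lines could look roughly like the sketch below. The class name, line numbers, and method body are purely illustrative; only the marker style follows the pattern used for other copied Calcite classes.

```java
import org.apache.calcite.plan.RelOptPredicateList;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.metadata.RelMetadataQuery;

/**
 * Copy of Calcite's RelMdPredicates (based on 1.32.0).
 *
 * <p>FLINK modifications are at lines (numbers illustrative):
 *
 * <ul>
 *   <li>Lines 420 ~ 440: CALCITE-6317, do not pull up constants for nullable group keys.
 * </ul>
 */
public class RelMdPredicatesMarkerSketch {

    public RelOptPredicateList getPredicates(Aggregate agg, RelMetadataQuery mq) {
        // -- FLINK MODIFICATION BEGIN
        // The fixed constant-pullup logic for Aggregate would live here; the marker pair
        // delimits exactly the lines that differ from the upstream Calcite class.
        // -- FLINK MODIFICATION END
        return RelOptPredicateList.EMPTY;
    }
}
```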
[jira] [Resolved] (FLINK-33142) Prometheus Sink Connector - Update Documentation
[ https://issues.apache.org/jira/browse/FLINK-33142?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh resolved FLINK-33142. - Resolution: Fixed Merged commit [{{08dd161}}|https://github.com/apache/flink-connector-prometheus/commit/08dd1618745646347ec24952e0883b97d77bbe3a] into apache:main > Prometheus Sink Connector - Update Documentation > > > Key: FLINK-33142 > URL: https://issues.apache.org/jira/browse/FLINK-33142 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Prometheus, Documentation >Reporter: Lorenzo Nicora >Assignee: Lorenzo Nicora >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33137][Connectors/Prometheus] Prometheus connector docs [flink-connector-prometheus]
hlteoh37 merged PR #3: URL: https://github.com/apache/flink-connector-prometheus/pull/3 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
jnh5y commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829644347 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java: ## @@ -50,74 +67,43 @@ void testUpsertInto() { "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); } -@Test -void testInsertIntoShouldColumnMismatchWithValues() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithSelect() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithLastValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithFirstValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)")) +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES(1)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3)", +"INSERT INTO t2 (a, b) VALUES (1), (2, 3)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)", +"INSERT INTO t2 (a, b) SELECT 1", +"INSERT INTO t2 (a, b) SELECT * FROM t1", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT * FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t_nested", +"INSERT INTO t2 (a, b) SELECT * FROM (TABLE t_nested)" +}) +void testInvalidNumberOfColumnsWhileInsertInto(String sql) { +assertThatThrownBy(() -> plannerMocks.getParser().parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } -@Test -void testInsertIntoShouldNotColumnMismatchWithValues() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSelect() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSingleColValues() { +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)", +"INSERT INTO t2 (a) VALUES (1), (3)", +"INSERT INTO t2 (a, b) SELECT 1, 2", +"INSERT INTO t2 (a, b) SELECT * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t1", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t1", +"INSERT INTO t2 (a, b) SELECT f.* FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t2_copy" Review Comment: Is this the case which catches the condition `identifiersSize == 0` above? (Asking for my understanding.) -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
jnh5y commented on PR #25316: URL: https://github.com/apache/flink/pull/25316#issuecomment-2457624570 Would it make sense to add a test case to something like `TableSinkITCase` to validate things end-to-end and also prevent regressions? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix] [docs] Add lead()/lag() Table API agg functions to docs [flink]
fhueske opened a new pull request, #25608: URL: https://github.com/apache/flink/pull/25608 ## What is the purpose of the change Update the docs to include the Table API aggregation functions `lead()` and `lag()` (added with #25582) ## Brief change log Adds syntax for Table API `lead()` and `lag()` functions to English and Chinese docs. The functions already have a description because they've been available for SQL. ## Verifying this change n/a ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation Docs-only change to describe recently added functionality. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] [docs] Add lead()/lag() Table API agg functions to docs [flink]
flinkbot commented on PR #25608: URL: https://github.com/apache/flink/pull/25608#issuecomment-2456574946 ## CI report: * cb22534b7d438da4d707b9bfc465e0ce34c99934 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] Replace System.out.println with logger for better log management [flink-connector-gcp-pubsub]
boring-cyborg[bot] commented on PR #31: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/31#issuecomment-2456467184 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36592][state/forst] Support file cache for ForStStateBackend [flink]
fredia commented on code in PR #25561: URL: https://github.com/apache/flink/pull/25561#discussion_r1828883909 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ## @@ -75,7 +111,22 @@ public ForStFlinkFileSystem(FileSystem delegateFS, String remoteBase, String loc public static FileSystem get(URI uri) throws IOException { String localBase = remoteLocalMapping.get(uri.toString()); Preconditions.checkNotNull(localBase, "localBase is null, remote uri:" + uri); -return new ForStFlinkFileSystem(FileSystem.get(uri), uri.toString(), localBase); +return new ForStFlinkFileSystem( +FileSystem.get(uri), +uri.toString(), +localBase, +cacheBase == null +? null +: new FileBasedCache( +Integer.MAX_VALUE, +new BundledCacheLimitPolicy( +new SystemCacheLimitPolicy( +new File(localBase), +cacheReservedSize, +SST_FILE_SIZE), +new FixedCapCacheLimitPolicy(cacheCapacity)), Review Comment: Good suggestion, add `ForStOptions.CACHE_TYPE` for different cache policies. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-36077) Implement Table API support for PubSubSink V2
[ https://issues.apache.org/jira/browse/FLINK-36077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Nuyanzin resolved FLINK-36077. - Resolution: Fixed Merged as [303f4dbe1817f57646caaba377d35742859dec65|https://github.com/apache/flink-connector-gcp-pubsub/commit/303f4dbe1817f57646caaba377d35742859dec65] > Implement Table API support for PubSubSink V2 > - > > Key: FLINK-36077 > URL: https://issues.apache.org/jira/browse/FLINK-36077 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Google Cloud PubSub >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Major > Labels: pull-request-available > Fix For: gcp-pubsub-3.2.0 > > > Duplicate of https://issues.apache.org/jira/browse/FLINK-24299 specific to > Sink implemetnation -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33137][Connectors/Prometheus] Prometheus connector docs [flink-connector-prometheus]
hlteoh37 commented on code in PR #3: URL: https://github.com/apache/flink-connector-prometheus/pull/3#discussion_r1829043534 ## docs/content/docs/connectors/datastream/prometheus.md: ## @@ -0,0 +1,473 @@ +--- +title: Prometheus +weight: 5 +type: docs +aliases: + - /dev/connectors/prometheus.html + - /apis/streaming/connectors/prometheus.html +--- + + + +# Prometheus Sink + +This sink connector can be used to write **data** to Prometheus-compatible storage, using the [Remote Write](https://prometheus.io/docs/specs/remote_write_spec/) Prometheus interface. + +The Prometheus-compatible backend must support [Remote Write 1.0](https://prometheus.io/docs/specs/remote_write_spec/) standard API, and the Remote Write endpoint must be enabled. + +{{< hint warn >}}This connector is not meant for sending internal Flink metrics to Prometheus. +To publish Flink metrics, for monitoring health and operations of the Flink cluster, you should use +[Metric Reporters](../../../deployment/metric_reporters/).{{< /hint >}} + +To use the connector, add the following Maven dependency to your project: + +{{< connector_artifact flink-connector-prometheus prometheus >}} + +## Usage + +The Prometheus sink provides a builder class to build a `PrometheusSink` instance. The code snippets below shows +how to build a `PrometheusSink` with a basic configuration, and an optional [request signer](#request-signer). + +```java +PrometheusSink sink = PrometheusSink.builder() +.setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl) +.setRequestSigner(new AmazonManagedPrometheusWriteRequestSigner(prometheusRemoteWriteUrl, prometheusRegion)) // Optional +.build(); +``` +The only **required** configuration is `prometheusRemoteWriteUrl`. All other configurations are optional. + +If your sink has parallelism > 1, you need to ensure the stream is keyed using the `PrometheusTimeSeriesLabelsAndMetricNameKeySelector` +key selector, so that all samples of the same time-series are in the same partition and order is not lost. +See [Sink parallelism and keyed streams](#sink-parallelism-and-keyed-streams) for more details. + + +### Input data objects + +The sink expects `PrometheusTimeSeries` records as input. +Your input data must be converted into `PrometheusTimeSeries`, using a map or flatMap operator, before the sending to the sink. + +`PrometheusTimeSeries` instances are immutable and cannot be reused. You can use the [builder](#populating-a-prometheustimeseries) +to create and populate instances. + +A `PrometheusTimeSeries` represents a single time-series record when sent to the Remote Write interface. Each time-series +record may contain multiple samples. + +{{< hint info >}} +In the context of Prometheus, the term "time-series" is overloaded. +It means both *a series of samples with a unique set of labels* (a time-series in the underlying time-series database), +and *a record sent to the Remote Write interface*. A `PrometheusTimeSeries` instance represents a record sent to the interface. + +The two concepts are related, because time-series "records" with the same sets of labels are sent to the same +"database time-series".{{< /hint >}} + +Each `PrometheusTimeSeries` record contains: + +- One **`metricName`**. A string that is translated into the value of the `__name__` label. +- Zero or more **`Label`** entries. Each label has a `key` and a `value`, both `String`. Labels represent additional dimensions of the samples. Duplicate Label keys are not allowed. +- One or more **`Sample`**. 
Each sample has a `value` (`double`) representing the measure, and a `timestamp` (`long`) representing the time of the measure, in milliseconds from the Epoch. Duplicate timestamps in the same record are not allowed. + +The following pseudocode represents the structure of a `PrometheusTimeSeries` record: + +``` +PrometheusTimeSeries + + --> (1) metricName + + --> (0..*) Label ++ name ++ value + + --> 1..* Sample ++ timestamp ++ value +``` Review Comment: The numbers are a little confusing - can we instead add comments on the right to explain optional / non-optional? ## docs/content/docs/connectors/datastream/prometheus.md: ## @@ -0,0 +1,473 @@ +--- +title: Prometheus +weight: 5 +type: docs +aliases: + - /dev/connectors/prometheus.html + - /apis/streaming/connectors/prometheus.html +--- + + + +# Prometheus Sink + +This sink connector can be used to write **data** to Prometheus-compatible storage, using the [Remote Write](https://prometheus.io/docs/specs/remote_write_spec/) Prometheus interface. + +The Prometheus-compatible backend must support [Remote Write 1.0](https://prometheus.io/docs/specs/remote_write_spec/) standard API, and the Remote Write endpoint must be enabled. + +{{< hint warn >}}This connector is not meant for sending internal Flink metrics to Prometheus. +To publish Flink metrics, for monit
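For illustration, here is a minimal sketch of how the cardinalities in the pseudocode above map onto the `PrometheusTimeSeries` builder quoted later in this thread. The builder methods (`withMetricName`, `addLabel`, `addSample`, `build`) are the ones shown in the connector docs; the metric name, label keys, and sample values are made-up placeholders, and imports are omitted because the package is not shown in the quoted docs.

```java
// Sketch only: builder calls as in the connector docs, with the cardinalities
// from the pseudocode spelled out as comments. All values are placeholders.
PrometheusTimeSeries series = PrometheusTimeSeries.builder()
        .withMetricName("room_temperature")   // exactly one metricName (required; becomes the __name__ label)
        .addLabel("DeviceID", "device-001")   // zero or more Labels (optional; duplicate keys are not allowed)
        .addLabel("RoomID", "room-42")
        .addSample(21.5, 1700000000000L)      // one or more Samples (at least one required; value + timestamp in ms)
        .addSample(21.7, 1700000060000L)      // duplicate timestamps within the same record are not allowed
        .build();
```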
Re: [PR] [FLINK-36288][Connectors/Prometheus]Set up archunit tests [flink-connector-prometheus]
hlteoh37 merged PR #12: URL: https://github.com/apache/flink-connector-prometheus/pull/12 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-33137) FLIP-312: Prometheus Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-33137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh resolved FLINK-33137. - Resolution: Fixed > FLIP-312: Prometheus Sink Connector > --- > > Key: FLINK-33137 > URL: https://issues.apache.org/jira/browse/FLINK-33137 > Project: Flink > Issue Type: New Feature > Components: Connectors / Prometheus >Reporter: Lorenzo Nicora >Assignee: Lorenzo Nicora >Priority: Major > Labels: Connector, pull-request-available > Fix For: prometheus-connector-1.0.0 > > > Umbrella Jira for implementation of Prometheus Sink Connector > https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36099][task] Log jobID in TaskExecutor#failTask [flink]
flinkbot commented on PR #25609: URL: https://github.com/apache/flink/pull/25609#issuecomment-2456713844 ## CI report: * 5af77a469ec2edbdf98c2cf8c89b2ec5d873c000 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]
ferenc-csaky commented on PR #46: URL: https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2456749670 > Great to see the PR merged, thanks for moving it forward @ferenc-csaky ! How does the release process and the timeline look? Is there anything I can help with to make it happen? I plan to kick off the release process this week; hopefully there will be an RC by EOD. Once the RC is available, anyone can help validate it. If you are interested in that, I will post that update here as well, or you can follow the Flink dev mailing list for a [VOTE] thread about the `flink-hbase-connector` release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-18476) PythonEnvUtilsTest#testStartPythonProcess fails
[ https://issues.apache.org/jira/browse/FLINK-18476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895627#comment-17895627 ] Piotr Nowojski commented on FLINK-18476: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=63497&view=logs&j=fc5181b0-e452-5c8f-68de-1097947f6483&t=995c650b-6573-581c-9ce6-7ad4cc038461 > PythonEnvUtilsTest#testStartPythonProcess fails > --- > > Key: FLINK-18476 > URL: https://issues.apache.org/jira/browse/FLINK-18476 > Project: Flink > Issue Type: Bug > Components: API / Python, Tests >Affects Versions: 1.11.0, 1.15.3, 1.18.0, 1.19.0, 1.20.0 >Reporter: Dawid Wysakowicz >Priority: Major > Labels: auto-deprioritized-major, auto-deprioritized-minor, > test-stability > > The > {{org.apache.flink.client.python.PythonEnvUtilsTest#testStartPythonProcess}} > failed in my local environment as it assumes the environment has > {{/usr/bin/python}}. > I don't know exactly how I got python in Ubuntu 20.04, but I only have an > alias for {{python = python3}}. Therefore the test fails. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36099) JobIDLoggingITCase fails due to "Cannot find task to fail for execution [...]" info log message in TM logs
[ https://issues.apache.org/jira/browse/FLINK-36099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski closed FLINK-36099. -- Fix Version/s: 2.0.0 Resolution: Fixed merged commit 0b64b57 into apache:master > JobIDLoggingITCase fails due to "Cannot find task to fail for execution > [...]" info log message in TM logs > -- > > Key: FLINK-36099 > URL: https://issues.apache.org/jira/browse/FLINK-36099 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 2.0.0, 1.18.1, 1.20.0, 1.19.1 >Reporter: Matthias Pohl >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available, test-stability > Fix For: 2.0.0 > > > {{JobIDLoggingITCase}} can fail (observed with the {{AdaptiveScheduler}} > enabled): > {code} > Test > org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging[testJobIDLogging(ClusterClient, > Path, MiniCluster)] failed with: > java.lang.AssertionError: [too many events without Job ID logged by > org.apache.flink.runtime.taskexecutor.TaskExecutor] > Expecting empty but was: > [Logger=org.apache.flink.runtime.taskexecutor.TaskExecutor Level=INFO > Message=Cannot find task to fail for execution > 5447dca7a6c7f9679346cad41dc8e3be_cbc357ccb763df2852fee8c4fc7d55f2_0_0 with > exception:] > at > org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:267) > at > org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:155) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > org.junit.platform.engine.sup
Re: [PR] [FLINK-36099][task] Log jobID in TaskExecutor#failTask [flink]
pnowojski merged PR #25609: URL: https://github.com/apache/flink/pull/25609 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-34341) Add support for DDB Streams for DataStream API
[ https://issues.apache.org/jira/browse/FLINK-34341?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-34341: Parent: (was: FLINK-24438) Issue Type: New Feature (was: Sub-task) > Add support for DDB Streams for DataStream API > -- > > Key: FLINK-34341 > URL: https://issues.apache.org/jira/browse/FLINK-34341 > Project: Flink > Issue Type: New Feature > Components: Connectors / Kinesis >Reporter: Danny Cranmer >Assignee: Abhi Gupta >Priority: Major > Labels: pull-request-available > Fix For: aws-connector-5.0.0 > > > > In the legacy KDS source we support Amazon DynamoDB streams via an adapter > shim. Both KDS and DDB streams have a similar API. > This task builds upon https://issues.apache.org/jira/browse/FLINK-34339 and > will add a {{DynamoDBStreamsSource}} which will setup a DDB SDK client shim. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-33137][Connectors/Prometheus] Prometheus connector docs [flink-connector-prometheus]
nicusX commented on PR #3: URL: https://github.com/apache/flink-connector-prometheus/pull/3#issuecomment-2457198460 @hlteoh37 I fixed the typos and improved the readability of the data structure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36623] Improve logging in DefaultStateTransitionManager [flink]
XComp merged PR #25610: URL: https://github.com/apache/flink/pull/25610 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-36623) Improve logging in DefaultStateTransitionManager
[ https://issues.apache.org/jira/browse/FLINK-36623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl resolved FLINK-36623. --- Fix Version/s: 2.0.0 Resolution: Fixed master: [21eea0d2ddeea14a791c13c9595d8cb54e9f1b0c|https://github.com/apache/flink/commit/21eea0d2ddeea14a791c13c9595d8cb54e9f1b0c] > Improve logging in DefaultStateTransitionManager > > > Key: FLINK-36623 > URL: https://issues.apache.org/jira/browse/FLINK-36623 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 2.0.0, 2.0-preview >Reporter: Roman Khachatryan >Assignee: Zdenek Tison >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > When the job transitions from one state to another, e.g. restarts when new > slots are available; it's not visible in the logs unless log.level is debug. > Therefore, it'd make sense to: > # Change log level from DEBUG to INFO > # Log job ID when such transition happens -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
snuyanzin commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829676623 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java: ## @@ -50,74 +67,43 @@ void testUpsertInto() { "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); } -@Test -void testInsertIntoShouldColumnMismatchWithValues() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithSelect() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithLastValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithFirstValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)")) +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES(1)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3)", +"INSERT INTO t2 (a, b) VALUES (1), (2, 3)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)", +"INSERT INTO t2 (a, b) SELECT 1", +"INSERT INTO t2 (a, b) SELECT * FROM t1", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT * FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t_nested", +"INSERT INTO t2 (a, b) SELECT * FROM (TABLE t_nested)" +}) +void testInvalidNumberOfColumnsWhileInsertInto(String sql) { +assertThatThrownBy(() -> plannerMocks.getParser().parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } -@Test -void testInsertIntoShouldNotColumnMismatchWithValues() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSelect() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSingleColValues() { +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)", +"INSERT INTO t2 (a) VALUES (1), (3)", +"INSERT INTO t2 (a, b) SELECT 1, 2", +"INSERT INTO t2 (a, b) SELECT * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t1", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t1", +"INSERT INTO t2 (a, b) SELECT f.* FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t2_copy" Review Comment: i guess no here are only tests with SQL do you have an idea how SQL would look like for the case >identifiersSize == 0 ? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
Re: [PR] [FLINK-35780][state] Support state ttl migration [flink]
xiangyuf commented on PR #25611: URL: https://github.com/apache/flink/pull/25611#issuecomment-2457653564 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
jnh5y commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829716133 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java: ## @@ -50,74 +67,43 @@ void testUpsertInto() { "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); } -@Test -void testInsertIntoShouldColumnMismatchWithValues() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithSelect() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithLastValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithFirstValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)")) +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES(1)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3)", +"INSERT INTO t2 (a, b) VALUES (1), (2, 3)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)", +"INSERT INTO t2 (a, b) SELECT 1", +"INSERT INTO t2 (a, b) SELECT * FROM t1", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT * FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t_nested", +"INSERT INTO t2 (a, b) SELECT * FROM (TABLE t_nested)" +}) +void testInvalidNumberOfColumnsWhileInsertInto(String sql) { +assertThatThrownBy(() -> plannerMocks.getParser().parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } -@Test -void testInsertIntoShouldNotColumnMismatchWithValues() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSelect() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSingleColValues() { +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)", +"INSERT INTO t2 (a) VALUES (1), (3)", +"INSERT INTO t2 (a, b) SELECT 1, 2", +"INSERT INTO t2 (a, b) SELECT * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t1", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t1", +"INSERT INTO t2 (a, b) SELECT f.* FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t2_copy" Review Comment: Interesting. In that case, how did you figure out that you needed to add that condition? (I'm mainly just curious.) 
If it cannot be hit from SQL, it may be nice to see a unit test which exercises the new condition. Not sure how hard that'd be. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and u
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
twalthr commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829717408 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala: ## @@ -130,8 +131,11 @@ object SqlRewriterUtils { call.getKind match { case SqlKind.SELECT => val sqlSelect = call.asInstanceOf[SqlSelect] - -if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) { +val identifiersSize = sqlSelect.getSelectList.count(s => s.isInstanceOf[SqlIdentifier]) +if ( + identifiersSize == 0 Review Comment: I have a similar question to Jim's. What is this line good for? Maybe an inline comment with an example would make sense here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36099][task] Log jobID in TaskExecutor#failTask [flink]
pnowojski commented on PR #25609: URL: https://github.com/apache/flink/pull/25609#issuecomment-2457299456 Test failure unrelated: https://issues.apache.org/jira/browse/FLINK-18476 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36651][Connectors/Elasticsearch] Fix IT test not compatible with 1.20, drop main branch tests for 1.18 [flink-connector-elasticsearch]
vahmed-hamdy commented on PR #110: URL: https://github.com/apache/flink-connector-elasticsearch/pull/110#issuecomment-2457790813 @snuyanzin Yeah, I guess we can update the arch rules separately. Could you please re-run the CI and check now? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36648) Release flink-connector-kafka v4.0.0 for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-36648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] LvYanquan updated FLINK-36648: -- Affects Version/s: 2.0.0 > Release flink-connector-kafka v4.0.0 for Flink 2.0 > -- > > Key: FLINK-36648 > URL: https://issues.apache.org/jira/browse/FLINK-36648 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Kafka >Affects Versions: 2.0.0, 2.0-preview >Reporter: LvYanquan >Priority: Major > Fix For: 2.0-preview > > > Flink already has a release for the 2.0-preview1 version. Kafka, as one of > the most commonly used connectors, needs to be bumped to this version as soon > as possible for users and developers to use. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36649] Fix oracle connection close error by OracleIncrementalSource read [flink-cdc]
JNSimba commented on PR #3678: URL: https://github.com/apache/flink-cdc/pull/3678#issuecomment-2456528985 @leonardBang Thanks for your review, I added a description. I see that there are already similar cases in ITCase, such as **testConsumingAllEvents** in the **OracleConnectorITCase** class. But it is strange that these ITCases have never reported errors, yet the failure appears frequently in my local environment, and from the code point of view this situation may occur. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-25921) Support different input parallelism for preCommit topology
[ https://issues.apache.org/jira/browse/FLINK-25921?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895532#comment-17895532 ] Fabian Paul commented on FLINK-25921: - [~poorvank] I assigned the ticket to you. Do you already have a use case or reproduction of how the current behavior causes issues? > Support different input parallelism for preCommit topology > -- > > Key: FLINK-25921 > URL: https://issues.apache.org/jira/browse/FLINK-25921 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Affects Versions: 1.15.0, 1.16.0 >Reporter: Fabian Paul >Assignee: Poorvank Bhatia >Priority: Major > > Currently, we assume that the pre-commit topology has the same parallelism as > the operator before when inserting the failover region. To support a > different parallelism we might need to insert a different identity map to > customize the mapping. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-25921) Support different input parallelism for preCommit topology
[ https://issues.apache.org/jira/browse/FLINK-25921?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Paul reassigned FLINK-25921: --- Assignee: Poorvank Bhatia > Support different input parallelism for preCommit topology > -- > > Key: FLINK-25921 > URL: https://issues.apache.org/jira/browse/FLINK-25921 > Project: Flink > Issue Type: Sub-task > Components: API / DataStream >Affects Versions: 1.15.0, 1.16.0 >Reporter: Fabian Paul >Assignee: Poorvank Bhatia >Priority: Major > > Currently, we assume that the pre-commit topology has the same parallelism as > the operator before when inserting the failover region. To support a > different parallelism we might need to insert a different identity map to > customize the mapping. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36315][cdc-base]The flink-cdc-base module supports source metric statistics [flink-cdc]
liuxiao2shf commented on code in PR #3619: URL: https://github.com/apache/flink-cdc/pull/3619#discussion_r1828970845 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/experimental/config/MySqlSourceConfigFactory.java: ## @@ -63,6 +63,10 @@ public MySqlSourceConfig create(int subtaskId) { props.setProperty("database.user", checkNotNull(username)); props.setProperty("database.password", checkNotNull(password)); props.setProperty("database.port", String.valueOf(port)); +// props.setProperty("hostname", checkNotNull(hostname)); +// props.setProperty("user", checkNotNull(username)); +// props.setProperty("password", checkNotNull(password)); +// props.setProperty("port", String.valueOf(port)); Review Comment: These comments are useless, I will remove them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36592][state/forst] Support file cache for ForStStateBackend [flink]
Zakelly commented on code in PR #25561: URL: https://github.com/apache/flink/pull/25561#discussion_r1829002947 ## flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java: ## @@ -46,21 +58,45 @@ public class ForStFlinkFileSystem extends FileSystem { // TODO: make it configurable private static final int DEFAULT_INPUT_STREAM_CAPACITY = 32; +private static final long SST_FILE_SIZE = 1024 * 1024 * 64; + private static final Map remoteLocalMapping = new ConcurrentHashMap<>(); private static final Function miscFileFilter = s -> !s.endsWith(".sst"); +private static Path cacheBase = null; +private static long cacheCapacity = Long.MAX_VALUE; +private static long cacheReservedSize = 0; private final FileSystem localFS; private final FileSystem delegateFS; private final String remoteBase; private final Function localFileFilter; private final String localBase; +@Nullable private final FileBasedCache fileBasedCache; -public ForStFlinkFileSystem(FileSystem delegateFS, String remoteBase, String localBase) { +public ForStFlinkFileSystem( +FileSystem delegateFS, +String remoteBase, +String localBase, +@Nullable FileBasedCache fileBasedCache) { this.localFS = FileSystem.getLocalFileSystem(); this.delegateFS = delegateFS; this.localFileFilter = miscFileFilter; this.remoteBase = remoteBase; this.localBase = localBase; +this.fileBasedCache = fileBasedCache; +} + +/** + * Configure cache for ForStFlinkFileSystem. + * + * @param path the cache base path. + * @param cacheCap the cache capacity. + * @param reserveSize the cache reserved size. + */ +public static void configureCache(Path path, long cacheCap, long reserveSize) { Review Comment: Why using static method with state? I mean what if two ForSt state backend is invoking this method in the meantime? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
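To illustrate the concern raised in the review comment above with a generic example (this is not the ForSt code; the class and field names below are invented): configuration kept in static fields is shared process-wide, so two state backends that call such a configure method, concurrently or even sequentially, end up overwriting each other's settings.

```java
// Hypothetical stand-in for a configure method that keeps its settings in static fields.
public class StaticCacheConfigDemo {
    private static volatile String cacheBase;   // shared across ALL instances in the JVM
    private static volatile long cacheCapacity;

    static void configureCache(String base, long capacity) {
        cacheBase = base;
        cacheCapacity = capacity;
    }

    public static void main(String[] args) {
        configureCache("/tmp/cache-a", 1_000_000L);  // "backend A" configures its cache
        configureCache("/tmp/cache-b", 5_000_000L);  // "backend B" configures its cache
        // Backend A now silently runs with backend B's settings; the first call was lost.
        System.out.println(cacheBase + " / " + cacheCapacity);  // prints: /tmp/cache-b / 5000000
    }
}
```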
Re: [PR] [FLINK-36068][runtime] Support for adaptive job scheduling based on StreamGraph [flink]
JunRuiLee closed pull request #25591: [FLINK-36068][runtime] Support for adaptive job scheduling based on StreamGraph URL: https://github.com/apache/flink/pull/25591 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36615] Support LEAD/LAG functions in Table API [flink]
dawidwys merged PR #25582: URL: https://github.com/apache/flink/pull/25582 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36400) 403 and 404 responses should be fatal
[ https://issues.apache.org/jira/browse/FLINK-36400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-36400: Fix Version/s: prometheus-connector-1.0.0 > 403 and 404 responses should be fatal > - > > Key: FLINK-36400 > URL: https://issues.apache.org/jira/browse/FLINK-36400 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Prometheus >Reporter: Lorenzo Nicora >Priority: Major > Fix For: prometheus-connector-1.0.0 > > > h3. The issue > If the endpoint requires authentication and the connector is not configured > with correct authentication and credentials, the endpoint responds with a 403. This > should be considered fatal. The connector should never continue. > Similarly, if the endpoint URL path is misconfigured, Prometheus may respond > with a 404. This should also be considered fatal. > With the current behaviour, every 4xx response (except 429) is considered > "non-retriable" as per the spec, and the connector discards the request and continues. But 403 > and 404 are usually caused by a misconfiguration, and the connector should > fail. > h3. Change required > The configured error handling behaviour should be ignored on 403 or 404 responses. The > writer should always throw an exception -- This message was sent by Atlassian Jira (v8.20.10#820010)
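A minimal sketch of the classification the ticket asks for (the class and method names are hypothetical, not the connector's actual API): 403 and 404 are treated as fatal regardless of the configured error-handling behavior, while the other 4xx responses except 429 stay non-retriable.

```java
// Hypothetical helper illustrating the ticket's intent; not the connector's real code.
public final class RemoteWriteResponseClassifier {

    /**
     * 403 (missing/incorrect authentication) and 404 (misconfigured endpoint path) indicate a
     * misconfiguration: the writer should fail regardless of the configured error handling.
     */
    public static boolean isFatal(int httpStatus) {
        return httpStatus == 403 || httpStatus == 404;
    }

    /**
     * Per the description above, every other 4xx except 429 remains non-retriable
     * (checked only after isFatal has been evaluated).
     */
    public static boolean isNonRetriable(int httpStatus) {
        return httpStatus >= 400 && httpStatus < 500 && httpStatus != 429;
    }

    private RemoteWriteResponseClassifier() {}
}
```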
[jira] [Updated] (FLINK-36286) Set up validation workflows
[ https://issues.apache.org/jira/browse/FLINK-36286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-36286: Fix Version/s: prometheus-connector-1.0.0 > Set up validation workflows > --- > > Key: FLINK-36286 > URL: https://issues.apache.org/jira/browse/FLINK-36286 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Prometheus >Reporter: Hong Liang Teoh >Assignee: Hong Liang Teoh >Priority: Major > Labels: pull-request-available > Fix For: prometheus-connector-1.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36400) 403 and 404 responses should be fatal
[ https://issues.apache.org/jira/browse/FLINK-36400?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh reassigned FLINK-36400: --- Assignee: Lorenzo Nicora > 403 and 404 responses should be fatal > - > > Key: FLINK-36400 > URL: https://issues.apache.org/jira/browse/FLINK-36400 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Prometheus >Reporter: Lorenzo Nicora >Assignee: Lorenzo Nicora >Priority: Major > Fix For: prometheus-connector-1.0.0 > > > h3. The issue > If the endpoint requires authentication and the connector is not configured > with correct authentication and credentials, the endpoint throws 403. This > should be considered fatal. The connector should never continue. > Similarly, if the endpoint URL path is misconfigured, Prometheus may respond > with a 404. This should also be considered fatal. > With the current behaviour, every 4xx exception (except 429) is considered > "non-retriable" as by specs, and the connector discard and continue. But 403 > and 404 are usually caused by a misconfiguration, and the connector should > fail. > h3. Change required > The error handling behaviour should be ignored on 403 or 404 responses. The > writer should always throw an exception -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36603) Inconsistent spelling of "behavior" and "retryable" across the API
[ https://issues.apache.org/jira/browse/FLINK-36603?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-36603: Fix Version/s: prometheus-connector-1.0.0 > Inconsistent spelling of "behavior" and "retryable" across the API > -- > > Key: FLINK-36603 > URL: https://issues.apache.org/jira/browse/FLINK-36603 > Project: Flink > Issue Type: Sub-task > Components: Connectors / Prometheus >Reporter: Lorenzo Nicora >Assignee: Lorenzo Nicora >Priority: Minor > Labels: pull-request-available > Fix For: prometheus-connector-1.0.0 > > > The spelling of the word "behavior" is inconsistent across the API > e.g. > {{.setErrorHandlingBehaviourConfiguration}} > {{OnErrorBehavior.DISCARD_AND_CONTINUE}} > It should always be "behavior" following the US English spelling. > Also the word "retryable" is misspelled as "retriable" -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33137) FLIP-312: Prometheus Sink Connector
[ https://issues.apache.org/jira/browse/FLINK-33137?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-33137: Fix Version/s: prometheus-connector-1.0.0 > FLIP-312: Prometheus Sink Connector > --- > > Key: FLINK-33137 > URL: https://issues.apache.org/jira/browse/FLINK-33137 > Project: Flink > Issue Type: New Feature > Components: Connectors / Prometheus >Reporter: Lorenzo Nicora >Assignee: Lorenzo Nicora >Priority: Major > Labels: Connector, pull-request-available > Fix For: prometheus-connector-1.0.0 > > > Umbrella Jira for implementation of Prometheus Sink Connector > https://cwiki.apache.org/confluence/display/FLINK/FLIP-312:+Prometheus+Sink+Connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36288) Set up architectural test
[ https://issues.apache.org/jira/browse/FLINK-36288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh closed FLINK-36288. --- Resolution: Fixed merged commit 60aa7ee into apache:main > Set up architectural test > - > > Key: FLINK-36288 > URL: https://issues.apache.org/jira/browse/FLINK-36288 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Assignee: Lorenzo Nicora >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36288) Set up architectural test
[ https://issues.apache.org/jira/browse/FLINK-36288?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-36288: Fix Version/s: prometheus-connector-1.0.0 > Set up architectural test > - > > Key: FLINK-36288 > URL: https://issues.apache.org/jira/browse/FLINK-36288 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Assignee: Lorenzo Nicora >Priority: Major > Labels: pull-request-available > Fix For: prometheus-connector-1.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36099) JobIDLoggingITCase fails due to "Cannot find task to fail for execution [...]" info log message in TM logs
[ https://issues.apache.org/jira/browse/FLINK-36099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-36099: -- Assignee: Piotr Nowojski > JobIDLoggingITCase fails due to "Cannot find task to fail for execution > [...]" info log message in TM logs > -- > > Key: FLINK-36099 > URL: https://issues.apache.org/jira/browse/FLINK-36099 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 2.0.0, 1.18.1, 1.20.0, 1.19.1 >Reporter: Matthias Pohl >Assignee: Piotr Nowojski >Priority: Major > Labels: test-stability > > {{JobIDLoggingITCase}} can fail (observed with the {{AdaptiveScheduler}} > enabled): > {code} > Test > org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging[testJobIDLogging(ClusterClient, > Path, MiniCluster)] failed with: > java.lang.AssertionError: [too many events without Job ID logged by > org.apache.flink.runtime.taskexecutor.TaskExecutor] > Expecting empty but was: > [Logger=org.apache.flink.runtime.taskexecutor.TaskExecutor Level=INFO > Message=Cannot find task to fail for execution > 5447dca7a6c7f9679346cad41dc8e3be_cbc357ccb763df2852fee8c4fc7d55f2_0_0 with > exception:] > at > org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:267) > at > org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:155) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) > at > 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > org.junit.platform.engine.support.hierarchical.N
[jira] [Commented] (FLINK-36289) Create Prometheus Flink connector release
[ https://issues.apache.org/jira/browse/FLINK-36289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895556#comment-17895556 ] Hong Liang Teoh commented on FLINK-36289: - Closing as we don't need a Jira to track release > Create Prometheus Flink connector release > - > > Key: FLINK-36289 > URL: https://issues.apache.org/jira/browse/FLINK-36289 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Priority: Major > > Update the Flink -docs > [https://github.com/apache/flink/tree/master/docs] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33140) Prometheus Sink Connector - E2E example on AWS
[ https://issues.apache.org/jira/browse/FLINK-33140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-33140: Parent: (was: FLINK-33137) Issue Type: Bug (was: Sub-task) > Prometheus Sink Connector - E2E example on AWS > -- > > Key: FLINK-33140 > URL: https://issues.apache.org/jira/browse/FLINK-33140 > Project: Flink > Issue Type: Bug >Reporter: Lorenzo Nicora >Priority: Major > > End-to-end example application, deployable on Amazon Managed Service for > Apache Flink, and writing to Amazon Managed Prometheus -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33152) Prometheus Sink Connector - Integration tests
[ https://issues.apache.org/jira/browse/FLINK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-33152: Parent: (was: FLINK-33137) Issue Type: Bug (was: Sub-task) > Prometheus Sink Connector - Integration tests > - > > Key: FLINK-33152 > URL: https://issues.apache.org/jira/browse/FLINK-33152 > Project: Flink > Issue Type: Bug > Components: Connectors / Prometheus >Reporter: Lorenzo Nicora >Priority: Major > > Integration tests against containerised Prometheus -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33139) Prometheus Sink Connector - Table API support
[ https://issues.apache.org/jira/browse/FLINK-33139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh updated FLINK-33139: Parent: (was: FLINK-33137) Issue Type: Bug (was: Sub-task) > Prometheus Sink Connector - Table API support > - > > Key: FLINK-33139 > URL: https://issues.apache.org/jira/browse/FLINK-33139 > Project: Flink > Issue Type: Bug > Components: Connectors / Prometheus >Reporter: Lorenzo Nicora >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36289) Create Prometheus Flink connector release
[ https://issues.apache.org/jira/browse/FLINK-36289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hong Liang Teoh closed FLINK-36289. --- Resolution: Done > Create Prometheus Flink connector release > - > > Key: FLINK-36289 > URL: https://issues.apache.org/jira/browse/FLINK-36289 > Project: Flink > Issue Type: Sub-task >Reporter: Hong Liang Teoh >Priority: Major > > Update the Flink -docs > [https://github.com/apache/flink/tree/master/docs] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35136] Bump connector version to 4.0, adapt CI workflows [flink-connector-hbase]
JuliaBogdan commented on PR #46: URL: https://github.com/apache/flink-connector-hbase/pull/46#issuecomment-2456683061 Great to see the PR merged, thanks for moving it forward @ferenc-csaky ! How does the release process and the timeline look? Is there anything I can help with to make it happen? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36099) JobIDLoggingITCase fails due to "Cannot find task to fail for execution [...]" info log message in TM logs
[ https://issues.apache.org/jira/browse/FLINK-36099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36099: --- Labels: pull-request-available test-stability (was: test-stability) > JobIDLoggingITCase fails due to "Cannot find task to fail for execution > [...]" info log message in TM logs > -- > > Key: FLINK-36099 > URL: https://issues.apache.org/jira/browse/FLINK-36099 > Project: Flink > Issue Type: Bug > Components: Runtime / Task >Affects Versions: 2.0.0, 1.18.1, 1.20.0, 1.19.1 >Reporter: Matthias Pohl >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available, test-stability > > {{JobIDLoggingITCase}} can fail (observed with the {{AdaptiveScheduler}} > enabled): > {code} > Test > org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging[testJobIDLogging(ClusterClient, > Path, MiniCluster)] failed with: > java.lang.AssertionError: [too many events without Job ID logged by > org.apache.flink.runtime.taskexecutor.TaskExecutor] > Expecting empty but was: > [Logger=org.apache.flink.runtime.taskexecutor.TaskExecutor Level=INFO > Message=Cannot find task to fail for execution > 5447dca7a6c7f9679346cad41dc8e3be_cbc357ccb763df2852fee8c4fc7d55f2_0_0 with > exception:] > at > org.apache.flink.test.misc.JobIDLoggingITCase.assertJobIDPresent(JobIDLoggingITCase.java:267) > at > org.apache.flink.test.misc.JobIDLoggingITCase.testJobIDLogging(JobIDLoggingITCase.java:155) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) > at > org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) >
[PR] [FLINK-36099][task] Log jobID in TaskExecutor#failTask [flink]
pnowojski opened a new pull request, #25609: URL: https://github.com/apache/flink/pull/25609 ## What is the purpose of the change Log jobID in TaskExecutor#failTask ## Brief change log Log jobID in TaskExecutor#failTask ## Verifying this change None directly, that's a simple change only in logging which is non-trivial to test. Regardless of that, bugs caused by this change would have been detected via a multitude of ITCases. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
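For context, a hedged sketch of the kind of change the title describes (illustrative only, not the actual diff; the method signature and message wording are assumptions): the INFO message quoted in FLINK-36099 gains the job ID so that `JobIDLoggingITCase` can attribute the line to a job.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FailTaskLoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(FailTaskLoggingSketch.class);

    // Illustrative only; not the actual TaskExecutor code. IDs are simplified to String.
    static void failTask(String jobID, String executionAttemptID, Throwable cause) {
        // Before: "Cannot find task to fail for execution {} with exception:" (no job ID).
        // After: include the job ID so the log line can be attributed to a job.
        LOG.info(
                "Cannot find task to fail for job {} and execution {} with exception:",
                jobID,
                executionAttemptID,
                cause);
    }
}
```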
Re: [PR] [FLINK-33137][Connectors/Prometheus] Prometheus connector docs [flink-connector-prometheus]
nicusX commented on code in PR #3: URL: https://github.com/apache/flink-connector-prometheus/pull/3#discussion_r1829116325 ## docs/content/docs/connectors/datastream/prometheus.md: ## @@ -0,0 +1,473 @@ +--- +title: Prometheus +weight: 5 +type: docs +aliases: + - /dev/connectors/prometheus.html + - /apis/streaming/connectors/prometheus.html +--- + + + +# Prometheus Sink + +This sink connector can be used to write **data** to Prometheus-compatible storage, using the [Remote Write](https://prometheus.io/docs/specs/remote_write_spec/) Prometheus interface. + +The Prometheus-compatible backend must support [Remote Write 1.0](https://prometheus.io/docs/specs/remote_write_spec/) standard API, and the Remote Write endpoint must be enabled. + +{{< hint warn >}}This connector is not meant for sending internal Flink metrics to Prometheus. +To publish Flink metrics, for monitoring health and operations of the Flink cluster, you should use +[Metric Reporters](../../../deployment/metric_reporters/).{{< /hint >}} + +To use the connector, add the following Maven dependency to your project: + +{{< connector_artifact flink-connector-prometheus prometheus >}} + +## Usage + +The Prometheus sink provides a builder class to build a `PrometheusSink` instance. The code snippets below shows +how to build a `PrometheusSink` with a basic configuration, and an optional [request signer](#request-signer). + +```java +PrometheusSink sink = PrometheusSink.builder() +.setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl) +.setRequestSigner(new AmazonManagedPrometheusWriteRequestSigner(prometheusRemoteWriteUrl, prometheusRegion)) // Optional +.build(); +``` +The only **required** configuration is `prometheusRemoteWriteUrl`. All other configurations are optional. + +If your sink has parallelism > 1, you need to ensure the stream is keyed using the `PrometheusTimeSeriesLabelsAndMetricNameKeySelector` +key selector, so that all samples of the same time-series are in the same partition and order is not lost. +See [Sink parallelism and keyed streams](#sink-parallelism-and-keyed-streams) for more details. + + +### Input data objects + +The sink expects `PrometheusTimeSeries` records as input. +Your input data must be converted into `PrometheusTimeSeries`, using a map or flatMap operator, before the sending to the sink. + +`PrometheusTimeSeries` instances are immutable and cannot be reused. You can use the [builder](#populating-a-prometheustimeseries) +to create and populate instances. + +A `PrometheusTimeSeries` represents a single time-series record when sent to the Remote Write interface. Each time-series +record may contain multiple samples. + +{{< hint info >}} +In the context of Prometheus, the term "time-series" is overloaded. +It means both *a series of samples with a unique set of labels* (a time-series in the underlying time-series database), +and *a record sent to the Remote Write interface*. A `PrometheusTimeSeries` instance represents a record sent to the interface. + +The two concepts are related, because time-series "records" with the same sets of labels are sent to the same +"database time-series".{{< /hint >}} + +Each `PrometheusTimeSeries` record contains: + +- One **`metricName`**. A string that is translated into the value of the `__name__` label. +- Zero or more **`Label`** entries. Each label has a `key` and a `value`, both `String`. Labels represent additional dimensions of the samples. Duplicate Label keys are not allowed. +- One or more **`Sample`**. 
Each sample has a `value` (`double`) representing the measure, and a `timestamp` (`long`) representing the time of the measure, in milliseconds from the Epoch. Duplicate timestamps in the same record are not allowed. + +The following pseudocode represents the structure of a `PrometheusTimeSeries` record: + +``` +PrometheusTimeSeries + + --> (1) metricName + + --> (0..*) Label ++ name ++ value + + --> 1..* Sample ++ timestamp ++ value +``` + +{{< hint info >}}The set of Labels and metricName are the unique identifiers of the database time-series. +A composite of all Labels and metricName is also the key you should use to partition data, both inside the Flink application +and upstream, to guarantee ordering per time-series is retained.{{< /hint >}} + + +### Populating a PrometheusTimeSeries + +`PrometheusTimeSeries` provides a builder interface. + +```java +PrometheusTimeSeries inputRecord = +PrometheusTimeSeries.builder() +.withMetricName(metricName) +.addLabel("DeviceID", instanceId) +.addLabel("RoomID", roomId) +.addSample(measurement1, time1) +.addSample(measurement2, time2) +.build(); +``` + +Each `PrometheusTimeSeries` instance can contain multiple samples. Call `.addSample(...)` for each of them. +The order in which samples are
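As a complement to the builder snippet quoted above, here is a minimal sketch of the map step the docs describe, i.e. turning application values into a `PrometheusTimeSeries` before handing it to the sink. The helper class name, the metric name, and the label values are illustrative assumptions; only the `builder()`, `withMetricName`, `addLabel`, `addSample`, and `build` calls come from the documentation excerpt, and the package of `PrometheusTimeSeries` is assumed.

```java
// Hedged sketch, not part of the connector docs: a conversion helper that a
// map/flatMap operator could call. The package of PrometheusTimeSeries is assumed.
import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;

public final class TimeSeriesMapperSketch {

    private TimeSeriesMapperSketch() {}

    /** Builds one time-series record carrying a single sample. */
    public static PrometheusTimeSeries toTimeSeries(
            String deviceId, String roomId, double measurement, long timestampMillis) {
        return PrometheusTimeSeries.builder()
                .withMetricName("room_temperature") // becomes the __name__ label
                .addLabel("DeviceID", deviceId)
                .addLabel("RoomID", roomId)
                .addSample(measurement, timestampMillis)
                .build();
    }
}
```

As the quoted docs note, keying the resulting stream by metric name and labels (for example with `PrometheusTimeSeriesLabelsAndMetricNameKeySelector`) before the sink keeps all samples of one database time-series in the same partition and in order.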
Re: [PR] [FLINK-33137][Connectors/Prometheus] Prometheus connector docs [flink-connector-prometheus]
nicusX commented on code in PR #3: URL: https://github.com/apache/flink-connector-prometheus/pull/3#discussion_r1829138048 ## docs/content/docs/connectors/datastream/prometheus.md: ## @@ -0,0 +1,473 @@ +--- +title: Prometheus +weight: 5 +type: docs +aliases: + - /dev/connectors/prometheus.html + - /apis/streaming/connectors/prometheus.html +--- + + + +# Prometheus Sink + +This sink connector can be used to write **data** to Prometheus-compatible storage, using the [Remote Write](https://prometheus.io/docs/specs/remote_write_spec/) Prometheus interface. + +The Prometheus-compatible backend must support [Remote Write 1.0](https://prometheus.io/docs/specs/remote_write_spec/) standard API, and the Remote Write endpoint must be enabled. + +{{< hint warn >}}This connector is not meant for sending internal Flink metrics to Prometheus. +To publish Flink metrics, for monitoring health and operations of the Flink cluster, you should use +[Metric Reporters](../../../deployment/metric_reporters/).{{< /hint >}} + +To use the connector, add the following Maven dependency to your project: + +{{< connector_artifact flink-connector-prometheus prometheus >}} + +## Usage + +The Prometheus sink provides a builder class to build a `PrometheusSink` instance. The code snippets below shows +how to build a `PrometheusSink` with a basic configuration, and an optional [request signer](#request-signer). + +```java +PrometheusSink sink = PrometheusSink.builder() +.setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl) +.setRequestSigner(new AmazonManagedPrometheusWriteRequestSigner(prometheusRemoteWriteUrl, prometheusRegion)) // Optional +.build(); +``` +The only **required** configuration is `prometheusRemoteWriteUrl`. All other configurations are optional. + +If your sink has parallelism > 1, you need to ensure the stream is keyed using the `PrometheusTimeSeriesLabelsAndMetricNameKeySelector` +key selector, so that all samples of the same time-series are in the same partition and order is not lost. +See [Sink parallelism and keyed streams](#sink-parallelism-and-keyed-streams) for more details. + + +### Input data objects + +The sink expects `PrometheusTimeSeries` records as input. +Your input data must be converted into `PrometheusTimeSeries`, using a map or flatMap operator, before the sending to the sink. + +`PrometheusTimeSeries` instances are immutable and cannot be reused. You can use the [builder](#populating-a-prometheustimeseries) +to create and populate instances. + +A `PrometheusTimeSeries` represents a single time-series record when sent to the Remote Write interface. Each time-series +record may contain multiple samples. + +{{< hint info >}} +In the context of Prometheus, the term "time-series" is overloaded. +It means both *a series of samples with a unique set of labels* (a time-series in the underlying time-series database), +and *a record sent to the Remote Write interface*. A `PrometheusTimeSeries` instance represents a record sent to the interface. + +The two concepts are related, because time-series "records" with the same sets of labels are sent to the same +"database time-series".{{< /hint >}} + +Each `PrometheusTimeSeries` record contains: + +- One **`metricName`**. A string that is translated into the value of the `__name__` label. +- Zero or more **`Label`** entries. Each label has a `key` and a `value`, both `String`. Labels represent additional dimensions of the samples. Duplicate Label keys are not allowed. +- One or more **`Sample`**. 
Each sample has a `value` (`double`) representing the measure, and a `timestamp` (`long`) representing the time of the measure, in milliseconds from the Epoch. Duplicate timestamps in the same record are not allowed. + +The following pseudocode represents the structure of a `PrometheusTimeSeries` record: + +``` +PrometheusTimeSeries + + --> (1) metricName + + --> (0..*) Label ++ name ++ value + + --> 1..* Sample ++ timestamp ++ value +``` Review Comment: Really? This is supposed to be pseudo-code and uses the standard cardinality notation commonly used in diagrams, like ER or UML. Labels and Samples are lists, is not a matter of being mandatory. To convey the fact one list may contain zero or more elements and the other must contain at least one element, it would become something like this, IMO this is more confusing: ``` + --> metricName // mandatory + --> list of Label // zero or more elements + name + value + --> list of Sample // one or more elements + timestamp + value ``` I think in the original there was a parenthesis missing. Fixing it it would be this: ``` + --> (1) metricName + --> (0..*) Label + name + value + --> (1..*) Sample + timestamp
[PR] Flink 36623 [flink]
ztison opened a new pull request, #25610: URL: https://github.com/apache/flink/pull/25610 ## Brief change log - Exposed the JobID in `Adaptive Scheduler` states - Added JobID to logs in `DefaultStateTransitionManager` ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36623] Improve logging in DefaultStateTransitionManager [flink]
flinkbot commented on PR #25610: URL: https://github.com/apache/flink/pull/25610#issuecomment-2456861519 ## CI report: * 6dfa8c04a33326dbb4203365d2c817f173851283 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36623) Improve logging in DefaultStateTransitionManager
[ https://issues.apache.org/jira/browse/FLINK-36623?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36623: --- Labels: pull-request-available (was: ) > Improve logging in DefaultStateTransitionManager > > > Key: FLINK-36623 > URL: https://issues.apache.org/jira/browse/FLINK-36623 > Project: Flink > Issue Type: Improvement > Components: Runtime / Coordination >Affects Versions: 2.0.0, 2.0-preview >Reporter: Roman Khachatryan >Assignee: Zdenek Tison >Priority: Major > Labels: pull-request-available > > When the job transitions from one state to another, e.g. restarts when new > slots are available; it's not visible in the logs unless log.level is debug. > Therefore, it'd make sense to: > # Change log level from DEBUG to INFO > # Log job ID when such transition happens -- This message was sent by Atlassian Jira (v8.20.10#820010)
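A minimal sketch of the kind of change the ticket asks for, assuming a manager class that already knows the `JobID` of the job it manages; the class and method names below are illustrative, not the actual `DefaultStateTransitionManager` code.

```java
// Illustrative sketch only: log state transitions at INFO and include the job ID,
// as requested in FLINK-36623. Names below are assumptions, not Flink internals.
import org.apache.flink.api.common.JobID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StateTransitionLoggingSketch {

    private static final Logger LOG =
            LoggerFactory.getLogger(StateTransitionLoggingSketch.class);

    private final JobID jobId;

    StateTransitionLoggingSketch(JobID jobId) {
        this.jobId = jobId;
    }

    void onTransition(String fromState, String toState) {
        // Previously such messages were only visible at DEBUG and without the job ID;
        // INFO level plus the job ID makes restarts traceable per job by default.
        LOG.info("Job {} transitions from {} to {}.", jobId, fromState, toState);
    }
}
```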
Re: [PR] [FLINK-33137][Connectors/Prometheus] Prometheus connector docs [flink-connector-prometheus]
nicusX commented on code in PR #3: URL: https://github.com/apache/flink-connector-prometheus/pull/3#discussion_r1829138048 ## docs/content/docs/connectors/datastream/prometheus.md: ## @@ -0,0 +1,473 @@ +--- +title: Prometheus +weight: 5 +type: docs +aliases: + - /dev/connectors/prometheus.html + - /apis/streaming/connectors/prometheus.html +--- + + + +# Prometheus Sink + +This sink connector can be used to write **data** to Prometheus-compatible storage, using the [Remote Write](https://prometheus.io/docs/specs/remote_write_spec/) Prometheus interface. + +The Prometheus-compatible backend must support [Remote Write 1.0](https://prometheus.io/docs/specs/remote_write_spec/) standard API, and the Remote Write endpoint must be enabled. + +{{< hint warn >}}This connector is not meant for sending internal Flink metrics to Prometheus. +To publish Flink metrics, for monitoring health and operations of the Flink cluster, you should use +[Metric Reporters](../../../deployment/metric_reporters/).{{< /hint >}} + +To use the connector, add the following Maven dependency to your project: + +{{< connector_artifact flink-connector-prometheus prometheus >}} + +## Usage + +The Prometheus sink provides a builder class to build a `PrometheusSink` instance. The code snippets below shows +how to build a `PrometheusSink` with a basic configuration, and an optional [request signer](#request-signer). + +```java +PrometheusSink sink = PrometheusSink.builder() +.setPrometheusRemoteWriteUrl(prometheusRemoteWriteUrl) +.setRequestSigner(new AmazonManagedPrometheusWriteRequestSigner(prometheusRemoteWriteUrl, prometheusRegion)) // Optional +.build(); +``` +The only **required** configuration is `prometheusRemoteWriteUrl`. All other configurations are optional. + +If your sink has parallelism > 1, you need to ensure the stream is keyed using the `PrometheusTimeSeriesLabelsAndMetricNameKeySelector` +key selector, so that all samples of the same time-series are in the same partition and order is not lost. +See [Sink parallelism and keyed streams](#sink-parallelism-and-keyed-streams) for more details. + + +### Input data objects + +The sink expects `PrometheusTimeSeries` records as input. +Your input data must be converted into `PrometheusTimeSeries`, using a map or flatMap operator, before the sending to the sink. + +`PrometheusTimeSeries` instances are immutable and cannot be reused. You can use the [builder](#populating-a-prometheustimeseries) +to create and populate instances. + +A `PrometheusTimeSeries` represents a single time-series record when sent to the Remote Write interface. Each time-series +record may contain multiple samples. + +{{< hint info >}} +In the context of Prometheus, the term "time-series" is overloaded. +It means both *a series of samples with a unique set of labels* (a time-series in the underlying time-series database), +and *a record sent to the Remote Write interface*. A `PrometheusTimeSeries` instance represents a record sent to the interface. + +The two concepts are related, because time-series "records" with the same sets of labels are sent to the same +"database time-series".{{< /hint >}} + +Each `PrometheusTimeSeries` record contains: + +- One **`metricName`**. A string that is translated into the value of the `__name__` label. +- Zero or more **`Label`** entries. Each label has a `key` and a `value`, both `String`. Labels represent additional dimensions of the samples. Duplicate Label keys are not allowed. +- One or more **`Sample`**. 
Each sample has a `value` (`double`) representing the measure, and a `timestamp` (`long`) representing the time of the measure, in milliseconds from the Epoch. Duplicate timestamps in the same record are not allowed. + +The following pseudocode represents the structure of a `PrometheusTimeSeries` record: + +``` +PrometheusTimeSeries + + --> (1) metricName + + --> (0..*) Label ++ name ++ value + + --> 1..* Sample ++ timestamp ++ value +``` Review Comment: Really? This is supposed to be pseudo-code and uses the standard cardinality notation commonly used in diagrams, like ER or UML. It is not a matter of being mandatory. Labels and Samples are lists. To convey the fact that one list may contain zero or more elements and the other must contain at least one element, it would become something like this: ``` + --> metricName // mandatory + --> Label // list of zero or more elements + name + value + --> Sample // list of one or more elements + timestamp + value ``` IMO this is more confusing, but I am okay with that if looks clearer. In the original there was a parenthesis missing. Fixing it it would be this: ``` + --> (1) metricName + --> (0..*) Label + name + value + -
[jira] [Updated] (FLINK-33967) Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle
[ https://issues.apache.org/jira/browse/FLINK-33967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth William Krugler updated FLINK-33967: Issue Type: Bug (was: Improvement) > Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle > -- > > Key: FLINK-33967 > URL: https://issues.apache.org/jira/browse/FLINK-33967 > Project: Flink > Issue Type: Bug >Reporter: Koala Lam >Priority: Major > > This file from test classpath is picked automatically by Log4j2. In order to > reliably use our own log4j2 test config, we have to specify system property > "log4j2.configurationFile" which is not ideal as we have to manually set it > in IDE config. > https://logging.apache.org/log4j/2.x/manual/configuration.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33967) Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle
[ https://issues.apache.org/jira/browse/FLINK-33967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kenneth William Krugler updated FLINK-33967: Priority: Minor (was: Major) > Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle > -- > > Key: FLINK-33967 > URL: https://issues.apache.org/jira/browse/FLINK-33967 > Project: Flink > Issue Type: Bug >Reporter: Koala Lam >Priority: Minor > > This file from test classpath is picked automatically by Log4j2. In order to > reliably use our own log4j2 test config, we have to specify system property > "log4j2.configurationFile" which is not ideal as we have to manually set it > in IDE config. > https://logging.apache.org/log4j/2.x/manual/configuration.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33967) Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle
[ https://issues.apache.org/jira/browse/FLINK-33967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17895725#comment-17895725 ] Kenneth William Krugler commented on FLINK-33967: - I think this should be reclassified as a minor bug. Including the log4j2-test.properties file means anyone who wants to change logging when running tests has to use a somewhat-obscure workaround of adding a log4j2.component.properties to src/test/resources/ that contains log4j2.configurationFile=log4j2-test.properties, and then the desired log4j2-test.properties file in the same location. It took me several hours, plus help from a Gradle expert, to figure out why this was causing logging to not work as expected in one of several sub-projects, but other sub-projects were fine. The problematic sub-projects were including the flink-runtime-1.19.1-tests.jar, which also has this log4j2-test.properties file. I think the solution, for Flink artifacts that have a test-jar goal, is to exclude this file via an exclusion in the pom.xml. > Remove/Rename log4j2-test.properties in flink-streaming-java's test bundle > -- > > Key: FLINK-33967 > URL: https://issues.apache.org/jira/browse/FLINK-33967 > Project: Flink > Issue Type: Improvement >Reporter: Koala Lam >Priority: Major > > This file from test classpath is picked automatically by Log4j2. In order to > reliably use our own log4j2 test config, we have to specify system property > "log4j2.configurationFile" which is not ideal as we have to manually set it > in IDE config. > https://logging.apache.org/log4j/2.x/manual/configuration.html -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [hotfix][Connector/Elasticsearch] Update version to 3.1-SNAPSHOT [flink-connector-elasticsearch]
vahmed-hamdy opened a new pull request, #111: URL: https://github.com/apache/flink-connector-elasticsearch/pull/111 ## Notes - since there are no breaking changes from the latest release v3.0.1 the version should be v3.1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [backport][FLINK-36332] Fix okhttp dependency handling [flink-kubernetes-operator]
SamBarker opened a new pull request, #909: URL: https://github.com/apache/flink-kubernetes-operator/pull/909 ## What is the purpose of the change * Backport a fix (cherry picked from commit c821b48a75c601c3a64bc03bfbada218a68a0e0d) to ensure OKHttp is cleanly excluded from the whole project. ## Brief change log - Ensure that OkHttp is correctly excluded. ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changes to the `CustomResourceDescriptors`: **no** - Core observer or reconciler logic that is regularly executed: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [backport][FLINK-36332] Fix okhttp dependency handling [flink-kubernetes-operator]
SamBarker commented on PR #909: URL: https://github.com/apache/flink-kubernetes-operator/pull/909#issuecomment-2458058560 @gyfora backport of the webhook fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
snuyanzin commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829801746 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java: ## @@ -50,74 +67,43 @@ void testUpsertInto() { "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); } -@Test -void testInsertIntoShouldColumnMismatchWithValues() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithSelect() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithLastValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithFirstValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)")) +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES(1)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3)", +"INSERT INTO t2 (a, b) VALUES (1), (2, 3)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)", +"INSERT INTO t2 (a, b) SELECT 1", +"INSERT INTO t2 (a, b) SELECT * FROM t1", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT * FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t_nested", +"INSERT INTO t2 (a, b) SELECT * FROM (TABLE t_nested)" +}) +void testInvalidNumberOfColumnsWhileInsertInto(String sql) { +assertThatThrownBy(() -> plannerMocks.getParser().parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } -@Test -void testInsertIntoShouldNotColumnMismatchWithValues() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSelect() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSingleColValues() { +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)", +"INSERT INTO t2 (a) VALUES (1), (3)", +"INSERT INTO t2 (a, b) SELECT 1, 2", +"INSERT INTO t2 (a, b) SELECT * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t1", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t1", +"INSERT INTO t2 (a, b) SELECT f.* FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t2_copy" Review Comment: after the fixing comment yes now I tend to think to turn it to boolean -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructu
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
snuyanzin commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829780691 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala: ## @@ -130,8 +131,11 @@ object SqlRewriterUtils { call.getKind match { case SqlKind.SELECT => val sqlSelect = call.asInstanceOf[SqlSelect] - -if (targetPosition.nonEmpty && sqlSelect.getSelectList.size() != targetPosition.size()) { +val identifiersSize = sqlSelect.getSelectList.count(s => s.isInstanceOf[SqlIdentifier]) +if ( + identifiersSize == 0 Review Comment: continued discussion in https://github.com/apache/flink/pull/25316#discussion_r1829780200 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
snuyanzin commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829780200 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java: ## @@ -50,74 +67,43 @@ void testUpsertInto() { "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); } -@Test -void testInsertIntoShouldColumnMismatchWithValues() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithSelect() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithLastValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithFirstValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)")) +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES(1)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3)", +"INSERT INTO t2 (a, b) VALUES (1), (2, 3)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)", +"INSERT INTO t2 (a, b) SELECT 1", +"INSERT INTO t2 (a, b) SELECT * FROM t1", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT * FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t_nested", +"INSERT INTO t2 (a, b) SELECT * FROM (TABLE t_nested)" +}) +void testInvalidNumberOfColumnsWhileInsertInto(String sql) { +assertThatThrownBy(() -> plannerMocks.getParser().parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } -@Test -void testInsertIntoShouldNotColumnMismatchWithValues() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSelect() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSingleColValues() { +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)", +"INSERT INTO t2 (a) VALUES (1), (3)", +"INSERT INTO t2 (a, b) SELECT 1, 2", +"INSERT INTO t2 (a, b) SELECT * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t1", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t1", +"INSERT INTO t2 (a, b) SELECT f.* FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t2_copy" Review Comment: Seems I got what you mean from one side there was a wrong condition in filter, fix that, thanks for catching from another side currently there are 2 checks one at 
[SqlIdentifier](https://github.com/apache/flink/pull/25316/files#diff-12ede804afb130bfe9c83df9da2ac5745b14345d7a71bee0e803257e6df73951R136) another at https://github.com/apache/flink/pull
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
snuyanzin commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829790186 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala: ## @@ -157,9 +161,14 @@ object SqlRewriterUtils { operands.get(1).asInstanceOf[SqlNodeList], operands.get(2), operands.get(3)) + case SqlKind.EXPLICIT_TABLE => +val operands = call.getOperandList +val expTable = new ExplicitTableSqlSelect( + operands.get(0).asInstanceOf[SqlIdentifier], + Collections.emptyList()) +rewriterUtils.rewriteSelect(expTable, targetRowType, assignedFields, targetPosition) // Not support: // case SqlKind.WITH => Review Comment: looks like for that first should be supported in Calcite -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
jnh5y commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829778233 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala: ## @@ -131,7 +131,7 @@ object SqlRewriterUtils { call.getKind match { case SqlKind.SELECT => val sqlSelect = call.asInstanceOf[SqlSelect] -val identifiersSize = sqlSelect.getSelectList.count(s => s.isInstanceOf[SqlIdentifier]) +val identifiersSize = sqlSelect.getSelectList.count(s => !SqlIdentifier.STAR.equals(s)) Review Comment: Interesting. This change makes me think that the columns could involve some computation. Is it worth testing that case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
snuyanzin commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829788023 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala: ## @@ -131,7 +131,7 @@ object SqlRewriterUtils { call.getKind match { case SqlKind.SELECT => val sqlSelect = call.asInstanceOf[SqlSelect] -val identifiersSize = sqlSelect.getSelectList.count(s => s.isInstanceOf[SqlIdentifier]) +val identifiersSize = sqlSelect.getSelectList.count(s => !SqlIdentifier.STAR.equals(s)) Review Comment: sure, added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
jnh5y commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829826534 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java: ## @@ -50,74 +67,43 @@ void testUpsertInto() { "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); } -@Test -void testInsertIntoShouldColumnMismatchWithValues() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithSelect() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithLastValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithFirstValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)")) +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES(1)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3)", +"INSERT INTO t2 (a, b) VALUES (1), (2, 3)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)", +"INSERT INTO t2 (a, b) SELECT 1", +"INSERT INTO t2 (a, b) SELECT * FROM t1", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT * FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t_nested", +"INSERT INTO t2 (a, b) SELECT * FROM (TABLE t_nested)" +}) +void testInvalidNumberOfColumnsWhileInsertInto(String sql) { +assertThatThrownBy(() -> plannerMocks.getParser().parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } -@Test -void testInsertIntoShouldNotColumnMismatchWithValues() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSelect() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSingleColValues() { +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)", +"INSERT INTO t2 (a) VALUES (1), (3)", +"INSERT INTO t2 (a, b) SELECT 1, 2", +"INSERT INTO t2 (a, b) SELECT * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t1", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t1", +"INSERT INTO t2 (a, b) SELECT f.* FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t2_copy" Review Comment: I like the change to boolean. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
jnh5y commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829793928 ## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidatorTest.java: ## @@ -50,74 +67,43 @@ void testUpsertInto() { "UPSERT INTO statement is not supported. Please use INSERT INTO instead."); } -@Test -void testInsertIntoShouldColumnMismatchWithValues() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES(1)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithSelect() { -assertThatThrownBy(() -> plannerMocks.getParser().parse("INSERT INTO t2 (a,b) SELECT 1")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithLastValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithFirstValue() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1), (2,3)")) -.isInstanceOf(ValidationException.class) -.hasMessageContaining(" Number of columns must match number of query columns"); -} - -@Test -void testInsertIntoShouldColumnMismatchWithMultiFieldValues() { -assertThatThrownBy( -() -> -plannerMocks -.getParser() -.parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4,5)")) +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES(1)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3)", +"INSERT INTO t2 (a, b) VALUES (1), (2, 3)", +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4, 5)", +"INSERT INTO t2 (a, b) SELECT 1", +"INSERT INTO t2 (a, b) SELECT * FROM t1", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT * FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t_nested", +"INSERT INTO t2 (a, b) SELECT * FROM (TABLE t_nested)" +}) +void testInvalidNumberOfColumnsWhileInsertInto(String sql) { +assertThatThrownBy(() -> plannerMocks.getParser().parse(sql)) .isInstanceOf(ValidationException.class) .hasMessageContaining(" Number of columns must match number of query columns"); } -@Test -void testInsertIntoShouldNotColumnMismatchWithValues() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) VALUES (1,2), (3,4)"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSelect() { -assertDoesNotThrow( -() -> { -plannerMocks.getParser().parse("INSERT INTO t2 (a,b) Select 1, 2"); -}); -} - -@Test -void testInsertIntoShouldNotColumnMismatchWithSingleColValues() { +@ParameterizedTest +@ValueSource( +strings = { +"INSERT INTO t2 (a, b) VALUES (1, 2), (3, 4)", +"INSERT INTO t2 (a) VALUES (1), (3)", +"INSERT INTO t2 (a, b) SELECT 1, 2", +"INSERT INTO t2 (a, b) SELECT * FROM t2_copy", +"INSERT INTO t2 (a, b) SELECT *, 42 FROM t1", +"INSERT INTO t2 (a, b) SELECT 42, * FROM t1", +"INSERT INTO t2 (a, b) SELECT f.* FROM t_nested", +"INSERT INTO t2 (a, b) TABLE t2_copy" Review Comment: Ok, so `identifiersSize == 0` means that all members of `getSelectList` is a STAR, right? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastr
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
snuyanzin commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1830063028 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala: ## @@ -173,69 +207,73 @@ object SqlRewriterUtils { // Expands the select list first in case there is a star(*). // Validates the select first to register the where scope. validator.validate(select) -val sourceList = validator.expandStar(select.getSelectList, select, false).getList +reorderAndValidateForSelect(validator, select, targetRowType, assignedFields, targetPosition) +select + } + def rewriteSqlValues( + values: SqlCall, + targetRowType: RelDataType, + assignedFields: util.LinkedHashMap[Integer, SqlNode], + targetPosition: util.List[Int]): SqlCall = { val fixedNodes = new util.ArrayList[SqlNode] +(0 until values.getOperandList.size()).foreach { + valueIdx => +val value = values.getOperandList.get(valueIdx) +val valueAsList = if (value.getKind == SqlKind.ROW) { + value.asInstanceOf[SqlCall].getOperandList +} else { + Collections.singletonList(value) +} +val nodes = getReorderedNodes(targetRowType, assignedFields, targetPosition, valueAsList) + fixedNodes.add(SqlStdOperatorTable.ROW.createCall(value.getParserPosition, nodes)) +} +SqlStdOperatorTable.VALUES.createCall(values.getParserPosition, fixedNodes) + } + + private def getReorderedNodes( Review Comment: Here mostly extracting common logic in one place to reduce code duplication -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36271] Support reading json and jsonb types in PostgreSQL dialect [flink-connector-jdbc]
grzegorz8 commented on PR #141: URL: https://github.com/apache/flink-connector-jdbc/pull/141#issuecomment-2458120435 @matriv I think the `Caused by: org.postgresql.util.PSQLException: FATAL: sorry, too many clients already` error is irrelevant. I often see the error when I run tests locally, even on the main branch. Please run the CrateDB tests in IntelliJ. Maybe let me clarify why I'm asking you for help. Namely, I modified the PostgreSQL dialect to support the jsonb format. While running tests I noticed that CrateDB inherits from the PostgreSQL dialect, but it does not work well with my change (`NoClassDefFoundError: org/postgresql/util/PGobject`). I thought you may be able to give me some hint on how to handle it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
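For context, a hedged sketch of the conversion such a change implies: the PostgreSQL driver delivers json/jsonb column values as `org.postgresql.util.PGobject`, which a dialect can map to a Flink SQL STRING. This is not the PR's actual converter code; the helper name and the `StringData` mapping are assumptions.

```java
// Hedged sketch, not the PR's implementation: mapping a JDBC-provided
// json/jsonb value to Flink's internal string representation.
import org.apache.flink.table.data.StringData;
import org.postgresql.util.PGobject;

public final class JsonbConversionSketch {

    private JsonbConversionSketch() {}

    public static StringData toStringData(Object jdbcValue) {
        if (jdbcValue == null) {
            return null;
        }
        if (jdbcValue instanceof PGobject) {
            // PGobject#getValue() returns the raw textual payload of a json/jsonb column.
            return StringData.fromString(((PGobject) jdbcValue).getValue());
        }
        return StringData.fromString(jdbcValue.toString());
    }
}
```

A sketch like this also hints at the reported `NoClassDefFoundError`: if the CrateDB dialect reuses a PostgreSQL code path that references `PGobject` directly, that class must be on the classpath, which is presumably not the case in the CrateDB test setup.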
Re: [PR] [FLINK-36266][table] Insert into as select * behaves incorrect [flink]
snuyanzin commented on code in PR #25316: URL: https://github.com/apache/flink/pull/25316#discussion_r1829790186 ## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/SqlRewriterUtils.scala: ## @@ -157,9 +161,14 @@ object SqlRewriterUtils { operands.get(1).asInstanceOf[SqlNodeList], operands.get(2), operands.get(3)) + case SqlKind.EXPLICIT_TABLE => +val operands = call.getOperandList +val expTable = new ExplicitTableSqlSelect( + operands.get(0).asInstanceOf[SqlIdentifier], + Collections.emptyList()) +rewriterUtils.rewriteSelect(expTable, targetRowType, assignedFields, targetPosition) // Not support: // case SqlKind.WITH => Review Comment: ~~looks like for that first should be supported in Calcite~~ there is support in Oracle's style like ```sql INSERT INTO ... WITH cte ... SELECT ... ``` added support for that initially I was looking for Postgres style like ```sql WITH cte ... INSERT INTO ... SELECT ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35463]Fixed issue for route rule changed when restored from checkpoint. [flink-cdc]
github-actions[bot] commented on PR #3364: URL: https://github.com/apache/flink-cdc/pull/3364#issuecomment-2458440160 This pull request has been closed because it has not had recent activity. You could reopen it if you try to continue your work, and anyone who are interested in it are encouraged to continue work on this pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36224][docs] Add the version mapping between pipeline connectors and flink [flink-cdc]
github-actions[bot] commented on PR #3598: URL: https://github.com/apache/flink-cdc/pull/3598#issuecomment-2458439947 This pull request has been automatically marked as stale because it has not had recent activity for 60 days. It will be closed in 30 days if no further activity occurs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35071][cdc-connector][cdc-base] Shade guava31 to avoid dependency conflict with flink below 1.18 [flink-cdc]
github-actions[bot] commented on PR #3083: URL: https://github.com/apache/flink-cdc/pull/3083#issuecomment-2458440213 This pull request has been closed because it has not had recent activity. You could reopen it if you try to continue your work, and anyone who are interested in it are encouraged to continue work on this pull request. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org