[PR] [FLINK-37156] [cdc-composer/cli] pipeline supports collecting data once and writing it to multiple sinks. [flink-cdc]
qyfftf opened a new pull request, #3906: URL: https://github.com/apache/flink-cdc/pull/3906 I've implemented a solution where data is collected from MySQL once and then sent to multiple target data sources, such as Paimon, Kafka, and StarRocks, which reduces redundant data collection. Pipeline YAML:
```
source:
  type: mysql
  name: MySQL Source
  hostname: 127.0.0.1
  port: 3306
  username: root
  password: 123456
  tables: test.order
  server-id: 5401-5404
  jdbc.properties.useSSL: false

sink:
  - type: paimon
    name: Paimon Sink
    catalog.properties.metastore: filesystem
    catalog.properties.warehouse: /tmp/path/warehouse
  - type: kafka
    name: Kafka Sink
    properties.bootstrap.servers: PLAINTEXT://localhost:9092
  - type: starrocks
    jdbc-url: jdbc:mysql://127.0.0.1:9030
    load-url: 127.0.0.1:8030
    username: root
    password: ""
    table.create.properties.replication_num: 1

route:
  - source-table: test.order
    sink-table: test.order

pipeline:
  name: MySQL to Paimon Pipeline
  parallelism: 2
```
https://github.com/user-attachments/assets/00ff0811-d682-4cfe-b21e-18ca56bbf53f -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
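For readers unfamiliar with the fan-out idea behind this PR, the sketch below shows the general concept at the DataStream level: a single source stream is read once and attached to several sinks. It is only an illustration under stated assumptions, not the flink-cdc composer code itself; `mySqlSource`, `paimonSink`, `kafkaSink`, and `starRocksSink` are hypothetical placeholders for connector-specific builders constructed elsewhere.
```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Conceptual sketch only: the source/sink variables are placeholders.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> changeEvents =
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");

// Attaching several sinks to the same stream means the MySQL binlog is read once
// and each record is forwarded to every sink, instead of running three pipelines.
changeEvents.sinkTo(paimonSink);
changeEvents.sinkTo(kafkaSink);
changeEvents.sinkTo(starRocksSink);

env.execute("MySQL to multiple sinks");
```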
[jira] [Updated] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun updated FLINK-37255: --- Description: The new configuration option scan.partition.record-size was introduced by FLINK-36075, but it was not added to the optional options, making this configuration item unusable. {code:sql} [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: Unsupported options found for 'mongodb'. Unsupported options: scan.partition.record-size Supported options: collection connector database filter.handling.policy ... {code} was: The new configuration option scan.partition.record-size was introduced by FLINK-36075, but it was not added to the optional options, making this configuration item unusable. > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: Jiabao Sun >Priority: Blocker > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. > {code:sql} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Unsupported options found for > 'mongodb'. > Unsupported options: > scan.partition.record-size > Supported options: > collection > connector > database > filter.handling.policy > ... > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37156) A Flink pipeline supports collecting data once and writing it to multiple sinks.
[ https://issues.apache.org/jira/browse/FLINK-37156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37156: --- Labels: pull-request-available (was: ) > A Flink pipeline supports collecting data once and writing it to multiple > sinks. > > > Key: FLINK-37156 > URL: https://issues.apache.org/jira/browse/FLINK-37156 > Project: Flink > Issue Type: New Feature >Affects Versions: cdc-3.3.0 >Reporter: cheng qian >Priority: Major > Labels: pull-request-available > Attachments: image-2025-01-24-09-45-15-720.png, > image-2025-01-24-09-46-05-142.png, image-2025-01-24-09-46-52-848.png > > > Is there any plan in the community to support the scenario of multiple sinks? > We want to synchronize data from a MySQL database to StarRocks, Paimon, and > Kafka. Currently, we need to create three Pipelines, which triples the I/O > load on the MySQL database. We envision defining three sinks within a single > Pipeline, enabling data to be collected once and written to three different > downstream systems. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun updated FLINK-37255: --- Priority: Blocker (was: Major) > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: Jiabao Sun >Priority: Blocker > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923952#comment-17923952 ] yux commented on FLINK-37255: - I'll prepare a patch for this. > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: Jiabao Sun >Priority: Blocker > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. > {code:sql} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Unsupported options found for > 'mongodb'. > Unsupported options: > scan.partition.record-size > Supported options: > collection > connector > database > filter.handling.policy > ... > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun reassigned FLINK-37255: -- Assignee: yux (was: Jiabao Sun) > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: yux >Priority: Blocker > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. > {code:sql} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Unsupported options found for > 'mongodb'. > Unsupported options: > scan.partition.record-size > Supported options: > collection > connector > database > filter.handling.policy > ... > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-37256) Firing timers can block recovery process
[ https://issues.apache.org/jira/browse/FLINK-37256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923966#comment-17923966 ] Piotr Nowojski edited comment on FLINK-37256 at 2/5/25 9:02 AM: I can see two broad solutions. 1. Use something similar to the splittable timers mechanism during recovery to prevent operators from firing timers. For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to return without firing any timers when the subtask is not RUNNING. The problem with that approach is that currently if we interrupt {{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. So if we just {{return false}} without any further changes, the mailbox executor would just again try to fire the timers, this time via the mailbox, before processing the remaining in-flight records, leading to a live lock. There might be some easy solution to this problem. 2. An alternative approach might be to just filter out watermarks from unaligned checkpoints' in-flight data. Currently this seems to be fine, as the mechanism of persisting watermarks in the in-flight data is very dubious. Watermarks already processed by the operators are not persisted in any way, so there is just a small random chance that some watermark(s) will be persisted as in-flight data - and only those will be persisted across recovery. So either way, we must rely on the watermark generators to re-emit new watermarks after recovery. Filtering out watermarks from the in-flight records would be more consistent. Watermarks that are freshly generated, that were not persisted in the in-flight data, are not a problem, as this predominantly happens only in source tasks, and so due to the lack of input in-flight data for the source tasks, those watermarks are already generated in the subtask's RUNNING state. Downside of this approach is: if we decide to finally persist watermarks across recoveries, we would have to re-introduce watermarks back to the in-flight data. However, I'm not sure if we will ever be able to implement persisting watermarks, especially across rescalings. Regardless of that, I would lean towards option 2. was (Author: pnowojski): I can see two broad solutions. 1. Use something similar to the splittable timers mechanism during recovery to prevent operators from firing timers. For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to return without firing any timers when the subtask is not RUNNING. The problem with that approach is that currently if we interrupt {{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. So if we just {{return false}} without any further changes, the mailbox executor would just again try to fire the timers, this time via the mailbox, before processing the remaining in-flight records, leading to a live lock. There might be some easy solution to this problem. 2. An alternative approach might be to just filter out watermarks from unaligned checkpoints' in-flight data. Currently this seems to be fine, as the mechanism of persisting watermarks in the in-flight data is very dubious. Watermarks already processed by the operators are not persisted in any way, so there is just a small random chance that some watermark(s) will be persisted as in-flight data - and only those will be persisted across recovery.
So either way, we must rely on the watermark generators to re-emit new watermarks after recovery. Filtering out watermarks from the in-flight records would be more consistent. Watermarks that are freshly generated, that were not persisted in the in-flight data, are not a problem, as this predominantly happens only in source tasks, and so due to the lack of input in-flight data for the source tasks, those watermarks are already generated in the subtask's RUNNING state. Downside of this approach is: * if we decide to finally persist watermarks across recoveries, we would have to re-introduce watermarks back to the in-flight data. However, I'm not sure if we will ever be able to implement persisting watermarks, especially across rescalings. > Firing timers can block recovery process > > > Key: FLINK-37256 > URL: https://issues.apache.org/jira/browse/FLINK-37256 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Task >Affects Versions: 2.0.0, 1.20.0 >Reporter: Piotr Nowojski >Priority: Major > > Splittable/interruptible timers for checkpointing were introduced in > FLINK-20217 as part of the > https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing >
[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value ‘DEFAULT ‘0’
[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caixiaowei updated FLINK-37254: --- Description: When the table schema contains the following field: {color:#00}`tenant_id`{color} {color:#80}bigint{color}({color:#ff}20{color}) {color:#80}unsigned{color} {color:#80}NOT{color} {color:#80}NULL{color} {color:#80}DEFAULT{color} {color:#008000}'0'{color} {color:#00}COMMENT{color} {color:#008000}'Tenant ID'{color}, The following exception is thrown: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) 
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: io.debezium.DebeziumException: Failed to set field default value for 'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default value is 0 of type class java.lang.String at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421) at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.u
[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value ‘DEFAULT ‘0’
[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caixiaowei updated FLINK-37254: --- Attachment: 1.png > Flink CDC OceanBase Connector Throws Exception for Column Default Value > ‘DEFAULT ‘0’ > > > Key: FLINK-37254 > URL: https://issues.apache.org/jira/browse/FLINK-37254 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 > Environment: flink 1.18.1 > flink cdc 3.1.0 > oceanbase 5.7.25-OceanBase_CE-v4.1.0.2 > >Reporter: caixiaowei >Priority: Major > Labels: pull-request-available > Attachments: 1.png > > > When the table schema contains the following field: > {color:#00}`tenant_id`{color} > {color:#80}bigint{color}({color:#ff}20{color}) > {color:#80}unsigned{color} {color:#80}NOT{color} > {color:#80}NULL{color} {color:#80}DEFAULT{color} > {color:#008000}'0'{color} {color:#00}COMMENT{color} > {color:#008000}'Tenant ID'{color}, > > The following exception is thrown: > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) > at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) > at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) > at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) > at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) > at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > at > org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) > at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) > at > org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) > at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) > at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) > at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) > at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) > at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) > at > java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) >
Re: [PR] [FLINK-36648] Bump Flink version to Flink 2.0-preview1 [flink-connector-kafka]
lvyanquan commented on PR #140: URL: https://github.com/apache/flink-connector-kafka/pull/140#issuecomment-2636096310 Hi, @PatrickRen can you help to review again and help to complete this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37144] Update version to 4.0-SNAPSHOT. [flink-connector-kafka]
lvyanquan commented on PR #149: URL: https://github.com/apache/flink-connector-kafka/pull/149#issuecomment-2636103200 Hi, @PatrickRen can you help to review this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value ‘DEFAULT ‘0’
[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caixiaowei updated FLINK-37254: --- Description: When the table schema contains the following field: : {color:#00}`tenant_id`{color} {color:#80}bigint{color}({color:#ff}20{color}) {color:#80}unsigned{color} {color:#80}NOT{color} {color:#80}NULL{color} {color:#80}DEFAULT{color} {color:#008000}'0'{color} {color:#00}COMMENT{color} {color:#008000}'租户ID'{color}, The following exception is thrown: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) at 
java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: io.debezium.DebeziumException: Failed to set field default value for 'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default value is 0 of type class java.lang.String at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421) at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.uti
[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value ‘DEFAULT ‘0’
[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caixiaowei updated FLINK-37254: --- Summary: Flink CDC OceanBase Connector Throws Exception for Column Default Value ‘DEFAULT ‘0’ (was: flink cdc的oceanbase cdc 存在表字段的默认值为default '0'抛出异常) > Flink CDC OceanBase Connector Throws Exception for Column Default Value > ‘DEFAULT ‘0’ > > > Key: FLINK-37254 > URL: https://issues.apache.org/jira/browse/FLINK-37254 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 > Environment: flink 1.18.1 > flink cdc 3.1.0 > oceanbase 5.7.25-OceanBase_CE-v4.1.0.2 > >Reporter: caixiaowei >Priority: Major > Labels: pull-request-available > > 当表结构的字段存在: > {color:#00}{color:#006464}`tenant_id`{color} > {color:#80}bigint{color}({color:#ff}20{color}) > {color:#80}unsigned{color} {color:#80}NOT{color} > {color:#80}NULL{color} {color:#80}DEFAULT{color} > {color:#008000}'0'{color} {color:#00}COMMENT{color} > {color:#008000}'租户ID'{color},{color} > > 会抛出以下异常: > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) > at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) > at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) > at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) > at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) > at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > at > org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) > at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) > at > org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) > at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) > at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) > at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) > at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) > at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) > at > java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) > at > java.util.concurrent.Fork
Re: [PR] [BP-2.0][FLINK-37021][state/forst] Fix incorrect paths when reusing and creating files. [flink]
flinkbot commented on PR #26106: URL: https://github.com/apache/flink/pull/26106#issuecomment-2636105860 ## CI report: * 0f17c6288fb5f885c235bb7aa3e7dd78de2ade92 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-37255] Fix unable to configure `SCAN_PARTITION_RECORD_SIZE` option [flink-connector-mongodb]
yuxiqian opened a new pull request, #51: URL: https://github.com/apache/flink-connector-mongodb/pull/51 This closes FLINK-37255. `scan.partition.record-size` option was introduced in FLINK-36075, but corresponding config manifest wasn't updated, which makes it impossible for Flink SQL users to configure it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
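As a rough illustration of the kind of fix involved (not the actual MongoDB connector source), a Flink table factory only accepts options listed in its `requiredOptions()`/`optionalOptions()` manifest, so a newly introduced `ConfigOption` has to be registered there; otherwise validation fails with the "Unsupported options" error quoted in the issue. Class and field names below are illustrative.
```
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Illustrative sketch: registering a new option in a factory's optional-options manifest.
public class ExampleTableFactoryOptions {

    public static final ConfigOption<Integer> SCAN_PARTITION_RECORD_SIZE =
            ConfigOptions.key("scan.partition.record-size")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Approximate number of records per partition.");

    // Options missing from this set are rejected during factory validation.
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(SCAN_PARTITION_RECORD_SIZE);
        return options;
    }
}
```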
[PR] [FLINK-37021][state/forst] Fix incorrect paths when reusing and creating files. [flink]
AlexYinHan opened a new pull request, #26106: URL: https://github.com/apache/flink/pull/26106 ## What is the purpose of the change This pull request addresses several bugs in ForSt that may prevent proper file reuse, leading to slower or failed snapshot and restoration operations. ## Brief change log - Make ```DataTransferStrategyBuilder``` function properly. - Use ```URI``` to tell whether two ```FileSystem``` are the same. - Use 'Reuse' strategy for Restoration only in CLAIM mode - Use 'Reuse' strategy for Snapshot only when ```SharingFilesStrategy``` is 'FORWARD_BACKWARD' - Ensure we correctly distinguish between ```dbFilePath``` and ```realSourcePath```. - During Snapshots: do not copy file if it is already owned by JM. ## Verifying this change This change is already covered by existing tests, such as *(please describe tests)*. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
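One of the bullet points above, comparing file systems via their URIs, can be summarized with a small sketch. The helper below is illustrative only; the method and class names are made up and the real change lives in ForSt's data transfer code. It treats two paths as belonging to the same file system when their scheme and authority match.
```
import java.net.URI;

// Illustrative helper: two URIs point at the same file system if scheme and authority
// match, e.g. "hdfs://nn1/a" and "hdfs://nn1/b", but not "hdfs://nn1/a" and "file:///a".
public final class FileSystemIdentity {

    public static boolean sameFileSystem(URI a, URI b) {
        return equalsNullable(a.getScheme(), b.getScheme())
                && equalsNullable(a.getAuthority(), b.getAuthority());
    }

    private static boolean equalsNullable(String x, String y) {
        return x == null ? y == null : x.equalsIgnoreCase(y);
    }

    private FileSystemIdentity() {}
}
```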
[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value ‘DEFAULT ‘0’
[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caixiaowei updated FLINK-37254: --- Description: When the table schema contains the following field: : {color:#00}`tenant_id`{color} {color:#80}bigint{color}({color:#ff}20{color}) {color:#80}unsigned{color} {color:#80}NOT{color} {color:#80}NULL{color} {color:#80}DEFAULT{color} {color:#008000}'0'{color} {color:#00}COMMENT{color} {color:#008000}'Tenant ID'{color}, The following exception is thrown: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) at 
org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: io.debezium.DebeziumException: Failed to set field default value for 'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default value is 0 of type class java.lang.String at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421) at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at jav
[jira] [Commented] (FLINK-37256) Firing timers can block recovery process
[ https://issues.apache.org/jira/browse/FLINK-37256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923966#comment-17923966 ] Piotr Nowojski commented on FLINK-37256: I can see two broad solutions. 1. Use something similar to the splittable timers mechanism during recovery to prevent operators from firing timers. For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to return without firing any timers when the subtask is not RUNNING. The problem with that approach is that currently if we interrupt {{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. So if we just {{return false}} without any further changes, the mailbox executor would just again try to fire the timers, this time via the mailbox, before processing the remaining in-flight records, leading to a live lock. There might be some easy solution to this problem. 2. An alternative approach might be to just filter out watermarks from unaligned checkpoints' in-flight data. Currently this seems to be fine, as the mechanism of persisting watermarks in the in-flight data is very dubious. Watermarks already processed by the operators are not persisted in any way, so there is just a small random chance that some watermark(s) will be persisted as in-flight data - and only those will be persisted across recovery. So either way, we must rely on the watermark generators to re-emit new watermarks after recovery. Filtering out watermarks from the in-flight records would be more consistent. Watermarks that are freshly generated, that were not persisted in the in-flight data, are not a problem, as this predominantly happens only in source tasks, and so due to the lack of input in-flight data for the source tasks, those watermarks are already generated in the subtask's RUNNING state. Downside of this approach is: * if we decide to finally persist watermarks across recoveries, we would have to re-introduce watermarks back to the in-flight data > Firing timers can block recovery process > > > Key: FLINK-37256 > URL: https://issues.apache.org/jira/browse/FLINK-37256 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Task >Affects Versions: 2.0.0, 1.20.0 >Reporter: Piotr Nowojski >Priority: Major > > Splittable/interruptible timers for checkpointing were introduced in > FLINK-20217 as part of the > https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing > . > However the exact same problem can happen during recovery. Usually (only?) > due to a watermark that was caught along the in-flight data, that is being > processed during a subtask's "INITIALIZATION" phase. The problem is now that > while we are in the initialization phase, the job cannot perform any > checkpoints. This issue is compounded if there is some data multiplication > operator in the pipeline, downstream from the operator that has a lot of > timers to fire. What can happen then is: > * some upstream operator A is firing a lot of timers, that produce a lot of > data (for example 100 000 records) while it's still INITIALIZING > * those records are multiplied downstream (operators B, C, ...) by for > example a factor of 100x > * in the end, sinks have to accept ~100 000 * 100 records before that > upstream operator A can finish processing in-flight data and switch to RUNNING > This can take hours. -- This message was sent by Atlassian Jira (v8.20.10#820010)
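To make option 2 above concrete, the fragment below sketches what filtering watermarks out of recovered in-flight data could look like. It is a minimal illustration under the assumption that the recovered elements are available as a list, not the actual recovery code path; the only Flink API it relies on is the `StreamElement#isWatermark()` marker, and the class and method names are made up.
```
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;

// Minimal sketch of option 2: drop watermarks from recovered in-flight data so that
// no timers fire while the subtask is still INITIALIZING.
public final class InFlightWatermarkFilter {

    public static List<StreamElement> dropWatermarks(List<StreamElement> recovered) {
        return recovered.stream()
                .filter(element -> !element.isWatermark())
                .collect(Collectors.toList());
    }

    private InFlightWatermarkFilter() {}
}
```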
[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value ‘DEFAULT ‘0’
[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caixiaowei updated FLINK-37254: --- Description: When the table schema contains the following field: {color:#00}`tenant_id`{color} {color:#80}bigint{color}({color:#ff}20{color}) {color:#80}unsigned{color} {color:#80}NOT{color} {color:#80}NULL{color} {color:#80}DEFAULT{color} {color:#008000}'0'{color} {color:#00}COMMENT{color} {color:#008000}'Tenant ID'{color}, The following exception is thrown: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) 
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: io.debezium.DebeziumException: Failed to set field default value for 'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default value is 0 of type class java.lang.String at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421) at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.u
[jira] [Comment Edited] (FLINK-37256) Firing timers can block recovery process
[ https://issues.apache.org/jira/browse/FLINK-37256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923966#comment-17923966 ] Piotr Nowojski edited comment on FLINK-37256 at 2/5/25 8:56 AM: I can see two broad solutions. 1. Use something similar to the splittable timers mechanism during recovery to prevent operators from firing timers. For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to return without firing any timers when the subtask is not RUNNING. The problem with that approach is that currently if we interrupt {{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. So if we just {{return false}} without any further changes, the mailbox executor would just again try to fire the timers, this time via the mailbox, before processing the remaining in-flight records, leading to a live lock. There might be some easy solution to this problem. 2. An alternative approach might be to just filter out watermarks from unaligned checkpoints' in-flight data. Currently this seems to be fine, as the mechanism of persisting watermarks in the in-flight data is very dubious. Watermarks already processed by the operators are not persisted in any way, so there is just a small random chance that some watermark(s) will be persisted as in-flight data - and only those will be persisted across recovery. So either way, we must rely on the watermark generators to re-emit new watermarks after recovery. Filtering out watermarks from the in-flight records would be more consistent. Watermarks that are freshly generated, that were not persisted in the in-flight data, are not a problem, as this predominantly happens only in source tasks, and so due to the lack of input in-flight data for the source tasks, those watermarks are already generated in the subtask's RUNNING state. Downside of this approach is: * if we decide to finally persist watermarks across recoveries, we would have to re-introduce watermarks back to the in-flight data. However, I'm not sure if we will ever be able to implement persisting watermarks, especially across rescalings. was (Author: pnowojski): I can see two broad solutions. 1. Use something similar to the splittable timers mechanism during recovery to prevent operators from firing timers. For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to return without firing any timers when the subtask is not RUNNING. The problem with that approach is that currently if we interrupt {{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. So if we just {{return false}} without any further changes, the mailbox executor would just again try to fire the timers, this time via the mailbox, before processing the remaining in-flight records, leading to a live lock. There might be some easy solution to this problem. 2. An alternative approach might be to just filter out watermarks from unaligned checkpoints' in-flight data. Currently this seems to be fine, as the mechanism of persisting watermarks in the in-flight data is very dubious. Watermarks already processed by the operators are not persisted in any way, so there is just a small random chance that some watermark(s) will be persisted as in-flight data - and only those will be persisted across recovery. So either way, we must rely on the watermark generators to re-emit new watermarks after recovery.
Filtering out watermarks from the in-flight records would be more consistent. Watermarks that are freshly generated, that were not persisted in the in-flight data, are not a problem, as this predominantly happens only in source tasks, and so due to the lack of input in-flight data for the source tasks, those watermarks are already generated in the subtask's RUNNING state. Downside of this approach is: * if we decide to finally persist watermarks across recoveries, we would have to re-introduce watermarks back to the in-flight data > Firing timers can block recovery process > > > Key: FLINK-37256 > URL: https://issues.apache.org/jira/browse/FLINK-37256 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Task >Affects Versions: 2.0.0, 1.20.0 >Reporter: Piotr Nowojski >Priority: Major > > Splittable/interruptible timers for checkpointing were introduced in > FLINK-20217 as part of the > https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing > . > However the exact same problem can happen during recovery. Usually (only?) > due to a watermark that was caught along the in-flight data, that is being > process
[jira] [Resolved] (FLINK-36929) SQL connector for keyed savepoint data
[ https://issues.apache.org/jira/browse/FLINK-36929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi resolved FLINK-36929. --- Fix Version/s: 2.1.0 Resolution: Fixed [{{a4eb676}}|https://github.com/apache/flink/commit/a4eb67642a826b941b02c7221846840326c3ff55] on master > SQL connector for keyed savepoint data > -- > > Key: FLINK-36929 > URL: https://issues.apache.org/jira/browse/FLINK-36929 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 2.0-preview >Reporter: Gabor Somogyi >Assignee: Gabor Somogyi >Priority: Major > Labels: pull-request-available > Fix For: 2.1.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36929) SQL connector for keyed savepoint data
[ https://issues.apache.org/jira/browse/FLINK-36929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gabor Somogyi closed FLINK-36929. - > SQL connector for keyed savepoint data > -- > > Key: FLINK-36929 > URL: https://issues.apache.org/jira/browse/FLINK-36929 > Project: Flink > Issue Type: Improvement > Components: Table SQL / API >Affects Versions: 2.0-preview >Reporter: Gabor Somogyi >Assignee: Gabor Somogyi >Priority: Major > Labels: pull-request-available > Fix For: 2.1.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-32738][formats] PROTOBUF format supports projection push down [flink]
zhougit86 commented on PR #23323: URL: https://github.com/apache/flink/pull/23323#issuecomment-2636294560 > @zhougit86 @ljw-hit Hi, is there a plan to revive effort on it? Ok let me look into conflict first -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-37254] Add OceanBaseDefaultValueConverter to fix table schema validation [flink-cdc]
whhe opened a new pull request, #3907: URL: https://github.com/apache/flink-cdc/pull/3907 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
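The PR itself has no description, but based on the linked issue the root cause is that Debezium receives the textual default '0' for a BIGINT UNSIGNED column and fails in TableSchemaBuilder#addField. A default-value converter for this situation would coerce the string into a value matching the column type, roughly as sketched below; the class, method, and handled type names here are illustrative assumptions, not the PR's actual code.
```
import java.math.BigDecimal;
import java.util.Optional;

// Illustrative sketch of coercing textual column defaults into typed values
// before they reach Debezium's schema builder.
public class DefaultValueCoercionSketch {

    public Optional<Object> convert(String columnTypeName, String defaultValueText) {
        if (defaultValueText == null) {
            return Optional.empty();
        }
        switch (columnTypeName.toUpperCase()) {
            case "BIGINT UNSIGNED":
                // BIGINT UNSIGNED may exceed Long.MAX_VALUE, so keep it as a BigDecimal.
                return Optional.of(new BigDecimal(defaultValueText.trim()));
            case "BIGINT":
            case "INT":
                return Optional.of(Long.parseLong(defaultValueText.trim()));
            default:
                // Fall back to the raw text for types not handled here.
                return Optional.of(defaultValueText);
        }
    }
}
```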
[jira] [Updated] (FLINK-37254) Flink CDC OceanBase CDC throws an exception when a table column's default value is DEFAULT '0'
[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37254: --- Labels: pull-request-available (was: ) > Flink CDC OceanBase CDC throws an exception when a table column's default value is DEFAULT '0' > - > > Key: FLINK-37254 > URL: https://issues.apache.org/jira/browse/FLINK-37254 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 > Environment: flink 1.18.1 > flink cdc 3.1.0 > oceanbase 5.7.25-OceanBase_CE-v4.1.0.2 > >Reporter: caixiaowei >Priority: Major > Labels: pull-request-available > > When the table schema contains the following field: > {color:#00}{color:#006464}`tenant_id`{color} > {color:#80}bigint{color}({color:#ff}20{color}) > {color:#80}unsigned{color} {color:#80}NOT{color} > {color:#80}NULL{color} {color:#80}DEFAULT{color} > {color:#008000}'0'{color} {color:#00}COMMENT{color} > {color:#008000}'Tenant ID'{color},{color} > > The following exception is thrown: > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) > at > org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) > at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) > at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) > at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) > at > org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > at > org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at
org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) > at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) > at > org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) > at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) > at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) > at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) > at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) > at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) > at > java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) > at > java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) > Caused by: io.debezium.DebeziumException: Failed to set field default value > for 'base_system.sys_department.tenant_id' of
Re: [PR] [FLINK-37021][state/forst] Fix incorrect paths when reusing files for checkpoints. [flink]
Zakelly merged PR #26040: URL: https://github.com/apache/flink/pull/26040 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37021) Implement fast checkpoint/rescaling for ForStKeyedStateBackend
[ https://issues.apache.org/jira/browse/FLINK-37021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923962#comment-17923962 ] Zakelly Lan commented on FLINK-37021: - Follow up PR: * master: f975783ce9a688a99e2f74fe6f2f5ad204589fdc > Implement fast checkpoint/rescaling for ForStKeyedStateBackend > -- > > Key: FLINK-37021 > URL: https://issues.apache.org/jira/browse/FLINK-37021 > Project: Flink > Issue Type: Sub-task >Reporter: Yanfei Lei >Assignee: Han Yin >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'
[ https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] caixiaowei updated FLINK-37254: --- Description: When the table schema contains the following field: `tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID', The following exception is thrown: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180) at org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268) at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261) at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) 
at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: io.debezium.DebeziumException: Failed to set field default value for 'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default value is 0 of type class java.lang.String at io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421) at io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util
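The failure above boils down to a type mismatch: the DDL default arrives as the string "0", while the Debezium schema field for BIGINT UNSIGNED expects a numeric object. The sketch below is a hypothetical, standalone illustration of coercing such string defaults into numbers before they are handed to a schema builder; it is not the actual fix for this ticket, and the class and method names are invented for the example.

```java
// Hypothetical illustration only: coerce a string column default (e.g. '0')
// into a numeric object before applying it to a numeric schema field.
import java.math.BigDecimal;

public class DefaultValueCoercion {

    static Object coerceNumericDefault(String rawDefault, String mysqlType) {
        if (rawDefault == null) {
            return null;
        }
        String trimmed = rawDefault.trim();
        switch (mysqlType.toUpperCase()) {
            case "BIGINT UNSIGNED":
                // BIGINT UNSIGNED can exceed Long.MAX_VALUE, so use BigDecimal to be safe.
                return new BigDecimal(trimmed);
            case "BIGINT":
                return Long.parseLong(trimmed);
            case "INT":
            case "INTEGER":
                return Integer.parseInt(trimmed);
            default:
                // Leave non-numeric types untouched.
                return trimmed;
        }
    }

    public static void main(String[] args) {
        // DEFAULT '0' arrives from the DDL as the string "0".
        System.out.println(coerceNumericDefault("0", "BIGINT UNSIGNED")); // prints 0
    }
}
```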
Re: [PR] [FLINK-37236] Flink 2.0 preview support [flink-kubernetes-operator]
afedulov commented on code in PR #938: URL: https://github.com/apache/flink-kubernetes-operator/pull/938#discussion_r1942507092 ## flink-kubernetes-operator-api/pom.xml: ## @@ -55,6 +55,18 @@ under the License. + +org.apache.flink +flink-core-api +${flink.version} + Review Comment: Maybe a comment why such exclusion is needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37255] Fix unable to configure `SCAN_PARTITION_RECORD_SIZE` option [flink-connector-mongodb]
yuxiqian commented on PR #51: URL: https://github.com/apache/flink-connector-mongodb/pull/51#issuecomment-2636181379 Thanks for @Jiabao-Sun's kind review! Addressed comments in latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36439][docs] Documents for Disaggregate State and new State APIs [flink]
Zakelly opened a new pull request, #26107: URL: https://github.com/apache/flink/pull/26107 ## What is the purpose of the change English documents for Disaggregate State and new State APIs ## Brief change log Still writing... will add commits when finished. - Documents for State API V2 ## Verifying this change This change is documentation work without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? docs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36439) Documentation for Disaggregated State Storage and Management
[ https://issues.apache.org/jira/browse/FLINK-36439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36439: --- Labels: pull-request-available (was: ) > Documentation for Disaggregated State Storage and Management > - > > Key: FLINK-36439 > URL: https://issues.apache.org/jira/browse/FLINK-36439 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Runtime / Checkpointing, Runtime / State > Backends, Runtime / Task >Reporter: Zakelly Lan >Assignee: Zakelly Lan >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [hotfix][table-planner] Use equals instead of '==' to compare digests in FlinkCalcMergeRule [flink]
lincoln-lil opened a new pull request, #26108: URL: https://github.com/apache/flink/pull/26108 ## What is the purpose of the change This is a trivial fix for string comparison in `FlinkCalcMergeRule`. However, it doesn't actually affect the optimization results, so no new test cases were added for the fix. We can still verify this detail by debugging the rule's optimization process with the following case (added to `FlinkCalcMergeRuleTest`):
```scala
@Test
def testCalcMergeWithTrivialCalc(): Unit = {
  val testTable = "MyTable"
  val relBuilder = util.getPlanner.plannerContext.createRelBuilder()
  val flinkLogicalTraits = relBuilder.getCluster.traitSetOf(FlinkConventions.LOGICAL)
  val table = relBuilder.getRelOptSchema
    .asInstanceOf[CalciteCatalogReader]
    .getTable(ImmutableList.of(testTable))
    .asInstanceOf[FlinkPreparingTableBase]
  val flinkLogicalTableScan = new FlinkLogicalDataStreamTableScan(
    relBuilder.getCluster,
    flinkLogicalTraits,
    Collections.emptyList[RelHint](),
    table)
  relBuilder.scan(testTable)
  val logicScan = relBuilder.peek(0)
  val rowType = logicScan.getRowType
  val projects = (0 until (rowType.getFieldCount)).map(f => relBuilder.field(f)).toList
  val program = RexProgram.create(
    rowType,
    projects,
    null,
    rowType,
    relBuilder.getRexBuilder
  )
  val bottomCalc = FlinkLogicalCalc.create(flinkLogicalTableScan, program)
  // create a trivial calc
  val topCalc = FlinkLogicalCalc.create(bottomCalc, program)
  val optimizedRels = util.getPlanner.optimize(Array(topCalc))
  print(bottomCalc.getRelDigest, optimizedRels.get(0).getRelDigest)
  assertThat(bottomCalc.getRelDigest).isEqualTo(optimizedRels.get(0).getRelDigest)
  // util.assertPlanEquals(Array(topCalc), Array.empty[ExplainDetail], withRowType = false, Array(PlanKind.OPT_REL), () => Unit)
}
```
## Brief change log Fix string comparison. ## Verifying this change Existing tests. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with @Public(Evolving): (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
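For background, in Java `==` on strings compares object references rather than character content. The snippet below is a generic, standalone illustration of that distinction, assuming the rule compares digest strings with Java semantics; it is not the planner code itself.

```java
// Minimal illustration (not FlinkCalcMergeRule): '==' checks reference
// identity, equals() checks content equality for digest strings.
public class DigestComparison {
    public static void main(String[] args) {
        String a = "LogicalProject(id=[$0])";
        String b = new String("LogicalProject(id=[$0])");

        System.out.println(a == b);      // false: two distinct String objects
        System.out.println(a.equals(b)); // true: identical character content
    }
}
```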
Re: [PR] [hotfix] Update copyright NOTICE year to 2025 [flink]
yangjf2019 closed pull request #25916: [hotfix] Update copyright NOTICE year to 2025 URL: https://github.com/apache/flink/pull/25916 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37255] Fix unable to configure scan.partition.record-size option [flink-connector-mongodb]
Jiabao-Sun merged PR #51: URL: https://github.com/apache/flink-connector-mongodb/pull/51 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923982#comment-17923982 ] Jiabao Sun edited comment on FLINK-37255 at 2/5/25 11:11 AM: - Fixed via mongodb repo main: 123334b8a70e5b3cccfeab6b8e083fbcd7e617d2 v2.0: e9d484b1e040de9d9c4f667ac941a2081348d435 was (Author: JIRAUSER304154): Fixed via mongodb repo main: 123334b8a70e5b3cccfeab6b8e083fbcd7e617d2 > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: yux >Priority: Blocker > Labels: pull-request-available > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. > {code:sql} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Unsupported options found for > 'mongodb'. > Unsupported options: > scan.partition.record-size > Supported options: > collection > connector > database > filter.handling.policy > ... > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [BP-2.0][FLINK-37255] Fix unable to configure scan.partition.record-size option [flink-connector-mongodb]
Jiabao-Sun merged PR #52: URL: https://github.com/apache/flink-connector-mongodb/pull/52 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiabao Sun resolved FLINK-37255. Resolution: Fixed > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: yux >Priority: Blocker > Labels: pull-request-available > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. > {code:sql} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Unsupported options found for > 'mongodb'. > Unsupported options: > scan.partition.record-size > Supported options: > collection > connector > database > filter.handling.policy > ... > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
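For context on the fix: a Flink table factory must declare every option it accepts via `requiredOptions()`/`optionalOptions()`, otherwise factory validation rejects it with the "Unsupported options found" error quoted above. The sketch below shows the general pattern under stated assumptions; the class name and the option's type are illustrative, not the MongoDB connector's actual code.

```java
// Hedged sketch of declaring an option so validation accepts it.
// The class name and the int type of the option are assumptions.
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ExampleDynamicTableFactory {

    // Illustrative option constant; the real connector defines its own.
    public static final ConfigOption<Integer> SCAN_PARTITION_RECORD_SIZE =
            ConfigOptions.key("scan.partition.record-size")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Approximate number of records per partition.");

    // Options missing from requiredOptions()/optionalOptions() trigger
    // "Unsupported options found for ..." during table factory validation.
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(SCAN_PARTITION_RECORD_SIZE);
        return options;
    }
}
```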
Re: [PR] [FLINK-36161][docs]Update Integration Test Example with Sink API. [flink]
RanJinh commented on PR #25590: URL: https://github.com/apache/flink/pull/25590#issuecomment-2636439765 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37236] Flink 2.0 preview support [flink-kubernetes-operator]
gyfora commented on code in PR #938: URL: https://github.com/apache/flink-kubernetes-operator/pull/938#discussion_r1942746157 ## flink-kubernetes-operator-api/pom.xml: ## @@ -55,6 +55,18 @@ under the License. + +org.apache.flink +flink-core-api +${flink.version} + Review Comment: This hasn't changed compared to what was before here, we already had the exclusions in the api package as we do not require the transitive dependencies for that. The operator core still pulls the transitive deps as well -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-35687) JSON_QUERY should return a well formatted nested objects/arrays for ARRAY
[ https://issues.apache.org/jira/browse/FLINK-35687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz updated FLINK-35687: - Fix Version/s: 2.0-preview (was: 1.20.0) > JSON_QUERY should return a well formatted nested objects/arrays for > ARRAY > - > > Key: FLINK-35687 > URL: https://issues.apache.org/jira/browse/FLINK-35687 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 2.0-preview > > > {code} > SELECT JSON_QUERY('{"items": [{"itemId":1234, "count":10}]}', '$.items' > RETURNING ARRAY) > {code} > returns > {code} > ['{itemId=1234, count=10}'] > {code} > but it should: > {code} > ['{"itemId":1234, "count":10}'] > {code} > We should call jsonize for Collection types here: > https://github.com/apache/flink/blob/f6f88135b3a5fa5616fe905346e5ab6dce084555/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java#L268 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37242] Add labels for flink-operator-webhook-service [flink-kubernetes-operator]
gyfora merged PR #939: URL: https://github.com/apache/flink-kubernetes-operator/pull/939 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-37242) Enable setting custom labels for flink-operator-webhook-service
[ https://issues.apache.org/jira/browse/FLINK-37242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-37242. -- Assignee: thanh tung dao Resolution: Fixed merged to main 67aa78b1cc52acba4e73dc348d98196617464cb1 > Enable setting custom labels for flink-operator-webhook-service > --- > > Key: FLINK-37242 > URL: https://issues.apache.org/jira/browse/FLINK-37242 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.10.0 >Reporter: thanh tung dao >Assignee: thanh tung dao >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.11.0 > > > In the mutation webhook yaml of the helm chart, it is not possible to create > custom labels for the flink-operator-webhook-service. > Sometimes, there are requirements from the centralised platform team that the > Service resource must be tagged with certain labels in order for it to pass > the Admission Controller ( OPA Gatekeeper for example) > > My plan is to open a PR to enable this feature in values.yaml file of the > helm chart -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37258][runtime] Return Ordered Job list on Dispatcher#requestMultipleJobDetails [flink]
flinkbot commented on PR #26111: URL: https://github.com/apache/flink/pull/26111#issuecomment-2637257809 ## CI report: * 94244da081bfe60e66ddf0bf49c176d7881f6983 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37253] Add state size in application status and deployment metrics [flink-kubernetes-operator]
mxm commented on code in PR #941: URL: https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943210819 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java: ## @@ -403,6 +403,28 @@ public static Long calculateClusterMemoryUsage(Configuration conf, int taskManag return tmTotalMemory + jmTotalMemory; } +public static Long calculateClusterStateSize(Configuration conf, int taskManagerReplicas) { Review Comment: I didn't see your comment in https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943008300, it was somehow hidden when I replied. Sorry, this code was unused code. I have removed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37258) Order Flink Job list in Flink UI with start time
[ https://issues.apache.org/jira/browse/FLINK-37258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl updated FLINK-37258: -- Fix Version/s: (was: 2.1.0) > Order Flink Job list in Flink UI with start time > > > Key: FLINK-37258 > URL: https://issues.apache.org/jira/browse/FLINK-37258 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.19.1, 2.0-preview >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Minor > Labels: pull-request-available > Attachments: Screen Recording 2025-02-05 at 12.33.53.mov > > > h2. Description > When having multiple jobs, Flink UI job lists have non-deterministic > ordering. in fact the ordering changes continuously which is annoying, The > issue is demoed in the attached video. > h2. Acceptance Criteria > Jobs in Flink Dashboard UI are ordered by start time consistently -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37259) StreamCheckpointingITCase times out
Matthias Pohl created FLINK-37259: - Summary: StreamCheckpointingITCase times out Key: FLINK-37259 URL: https://issues.apache.org/jira/browse/FLINK-37259 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 2.0.0, 2.1.0 Reporter: Matthias Pohl https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=65800&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba&l=10103 {code} "main" #1 prio=5 os_prio=0 cpu=3360.33ms elapsed=12292.07s tid=0x7fc4cd89c000 nid=0x7a6d waiting on condition [0x7fc4cdeb3000] java.lang.Thread.State: WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@17.0.7/Native Method) - parking to wait for <0xa160fa30> (a java.util.concurrent.CompletableFuture$Signaller) at java.util.concurrent.locks.LockSupport.park(java.base@17.0.7/LockSupport.java:211) at java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.7/CompletableFuture.java:1864) at java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.7/ForkJoinPool.java:3463) at java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.7/ForkJoinPool.java:3434) at java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.7/CompletableFuture.java:1898) at java.util.concurrent.CompletableFuture.get(java.base@17.0.7/CompletableFuture.java:2072) at org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:102) at org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase.runCheckpointedProgram(StreamFaultToleranceTestBase.java:136) [...] {code} Parsing the Maven output reveals that {{StreamCheckpointingITCase}} is the test run that times out: {code} cat 165.txt | grep "Running \|Tests run: " | grep -o '[^ ]*$' | sort | uniq -c | grep -v " 2" [...] 1 org.apache.flink.test.checkpointing.StreamCheckpointingITCase {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37253] Add state size in application status and deployment metrics [flink-kubernetes-operator]
mxm commented on code in PR #941: URL: https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943120952 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -757,16 +778,35 @@ public Map getClusterInfo(Configuration conf) throws Exception { DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision()); } +} -var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size(); -clusterInfo.put( -FIELD_NAME_TOTAL_CPU, -String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas))); -clusterInfo.put( -FIELD_NAME_TOTAL_MEMORY, -String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas))); +private void populateStateSize( +Configuration conf, @Nullable String jobId, Map clusterInfo) +throws Exception { +if (jobId != null) { +try (RestClusterClient clusterClient = getClusterClient(conf)) { +var checkpointingStatisticsHeaders = CheckpointingStatisticsHeaders.getInstance(); +var parameters = checkpointingStatisticsHeaders.getUnresolvedMessageParameters(); + parameters.jobPathParameter.resolve(JobID.fromHexString(jobId)); -return clusterInfo; +CheckpointingStatistics checkpointingStatistics = +clusterClient +.sendRequest( +checkpointingStatisticsHeaders, +parameters, +EmptyRequestBody.getInstance()) +.get(); +CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics = +checkpointingStatistics +.getLatestCheckpoints() +.getCompletedCheckpointStatistics(); +if (completedCheckpointStatistics != null) { +clusterInfo.put( +FIELD_NAME_STATE_SIZE, + String.valueOf(completedCheckpointStatistics.getCheckpointedSize())); Review Comment: Good question. It is a bit tricky to find this out, even after studying the code. Good news is that there should be a metric that is somewhat approximate to the actual full state / checkpoint size. It was introduced via https://issues.apache.org/jira/browse/FLINK-25557. Turns out, I was using the wrong metric because of a misleading JavaDoc in CheckpointStatistics. I've updated the PR. In the Rest API, there is `stateSize` and `checkpointedSize`. The former is the full state size, while the latter is the checkpointed size which various depending on whether checkpointing is incremental or not. StateSize is based on https://github.com/apache/flink/blob/d0d44d6196c02552179ec23c799d9769e128a8ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java#L66. I'm not 100% sure whether it always yields the exact state size but it should be close. CheckpointedSize is based on https://github.com/apache/flink/blob/d0d44d6196c02552179ec23c799d9769e128a8ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java#L62. I wonder whether we should expose both? ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java: ## @@ -403,6 +403,28 @@ public static Long calculateClusterMemoryUsage(Configuration conf, int taskManag return tmTotalMemory + jmTotalMemory; } +public static Long calculateClusterStateSize(Configuration conf, int taskManagerReplicas) { Review Comment: State size or checkpoint size isn't directly related to the cluster memory size. For the heap memory backend, we would expect the state size to be lower than the overall memory. For RocksDB, it could even exceed the cluster memory. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37253] Add state size in application status and deployment metrics [flink-kubernetes-operator]
mxm commented on code in PR #941: URL: https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943201542 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java: ## @@ -757,16 +778,35 @@ public Map getClusterInfo(Configuration conf) throws Exception { DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision()); } +} -var taskManagerReplicas = getTaskManagersInfo(conf).getTaskManagerInfos().size(); -clusterInfo.put( -FIELD_NAME_TOTAL_CPU, -String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, taskManagerReplicas))); -clusterInfo.put( -FIELD_NAME_TOTAL_MEMORY, -String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, taskManagerReplicas))); +private void populateStateSize( +Configuration conf, @Nullable String jobId, Map clusterInfo) +throws Exception { +if (jobId != null) { +try (RestClusterClient clusterClient = getClusterClient(conf)) { +var checkpointingStatisticsHeaders = CheckpointingStatisticsHeaders.getInstance(); +var parameters = checkpointingStatisticsHeaders.getUnresolvedMessageParameters(); + parameters.jobPathParameter.resolve(JobID.fromHexString(jobId)); -return clusterInfo; +CheckpointingStatistics checkpointingStatistics = +clusterClient +.sendRequest( +checkpointingStatisticsHeaders, +parameters, +EmptyRequestBody.getInstance()) +.get(); +CheckpointStatistics.CompletedCheckpointStatistics completedCheckpointStatistics = +checkpointingStatistics +.getLatestCheckpoints() +.getCompletedCheckpointStatistics(); +if (completedCheckpointStatistics != null) { +clusterInfo.put( +FIELD_NAME_STATE_SIZE, + String.valueOf(completedCheckpointStatistics.getCheckpointedSize())); Review Comment: Perfect, then the PR is good as-is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-37258) Order Flink Job list in Flink UI with start time
[ https://issues.apache.org/jira/browse/FLINK-37258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias Pohl reassigned FLINK-37258: - Assignee: Ahmed Hamdy > Order Flink Job list in Flink UI with start time > > > Key: FLINK-37258 > URL: https://issues.apache.org/jira/browse/FLINK-37258 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.19.1, 2.0-preview >Reporter: Ahmed Hamdy >Assignee: Ahmed Hamdy >Priority: Minor > Labels: pull-request-available > Fix For: 2.1.0 > > Attachments: Screen Recording 2025-02-05 at 12.33.53.mov > > > h2. Description > When having multiple jobs, Flink UI job lists have non-deterministic > ordering. in fact the ordering changes continuously which is annoying, The > issue is demoed in the attached video. > h2. Acceptance Criteria > Jobs in Flink Dashboard UI are ordered by start time consistently -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37253] Add state size in application status and deployment metrics [flink-kubernetes-operator]
mxm commented on PR #941: URL: https://github.com/apache/flink-kubernetes-operator/pull/941#issuecomment-2637573408 Tried it out on a local k8s cluster with various Flink versions: https://github.com/user-attachments/assets/f6909e9d-5710-4965-9645-55ce8b06fe82 https://github.com/user-attachments/assets/f2285783-4a02-4433-8dde-746ce9eb8368 https://github.com/user-attachments/assets/4ff10809-e5c1-4aeb-a596-17310ba55e1c https://github.com/user-attachments/assets/fe95cb6d-4ee2-4b46-b850-507536f8cfed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [tests][ci] Miscellaneous improvements on CI robustness [flink-cdc]
yuxiqian closed pull request #3884: [tests][ci] Miscellaneous improvements on CI robustness URL: https://github.com/apache/flink-cdc/pull/3884 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [tests][ci] Miscellaneous improvements on CI robustness [flink-cdc]
yuxiqian commented on PR #3884: URL: https://github.com/apache/flink-cdc/pull/3884#issuecomment-2638951176 Closed in favor of #3911. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][ci] Use raise to fix uncaught throw [flink-cdc]
whhe closed pull request #3909: [hotfix][ci] Use raise to fix uncaught throw URL: https://github.com/apache/flink-cdc/pull/3909 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (FLINK-37234) Add timeout for test case to avoid Infinite waiting
[ https://issues.apache.org/jira/browse/FLINK-37234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu resolved FLINK-37234. Resolution: Fixed Fixed via master (3.3-SNAPSHOT): 81e6a4a329498ddd9ff63d9db8ddbcf22882a7fc > Add timeout for test case to avoid Infinite waiting > > > Key: FLINK-37234 > URL: https://issues.apache.org/jira/browse/FLINK-37234 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / JDBC > Affects Versions: jdbc-3.2.0 > Reporter: Yanquan Lv > Assignee: Yanquan Lv > Priority: Not a Priority > Labels: pull-request-available > Fix For: jdbc-3.3.0 > > > Add timeout for test case to avoid infinite waiting. > JdbcSourceStreamRelatedITCase#waitExpectation uses a while(true) loop to check for an exception; however, this may lead to infinite waiting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
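For context, the usual pattern for such a fix is to bound the wait with a deadline so that a condition that never becomes true fails the test instead of hanging. The sketch below is a generic, self-contained illustration under that assumption, not the actual `JdbcSourceStreamRelatedITCase` code.

```java
// Generic sketch: replace an unbounded while(true) wait with a deadline.
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class BoundedWait {

    static void waitUntil(BooleanSupplier condition, Duration timeout) throws Exception {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadline) {
                // Fail loudly instead of spinning forever.
                throw new AssertionError("Condition not met within " + timeout);
            }
            Thread.sleep(100L);
        }
    }

    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        // Wait at most 5 seconds for a condition that becomes true after ~1 second.
        waitUntil(() -> System.currentTimeMillis() - start > 1_000, Duration.ofSeconds(5));
        System.out.println("condition met");
    }
}
```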
[jira] [Created] (FLINK-37265) document of python elasticsearch connector example has a mistake
Zhang Hechuan created FLINK-37265: - Summary: document of python elasticsearch connector example has a mistake Key: FLINK-37265 URL: https://issues.apache.org/jira/browse/FLINK-37265 Project: Flink Issue Type: Bug Components: API / DataStream, API / Python, Connectors / ElasticSearch Affects Versions: 1.20.0, 1.19.0, 1.18.0, 1.17.0 Reporter: Zhang Hechuan Elasticsearch 7 static index example from the documentation:
{code:python}
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
    .set_hosts(['localhost:9200']) \
    .build()

input.sink_to(es7_sink).name('es7 sink')
{code}
ElasticsearchEmitter has no method named "static"; it should be "static_index". -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923982#comment-17923982 ] Jiabao Sun commented on FLINK-37255: Fixed via mongodb repo main: 123334b8a70e5b3cccfeab6b8e083fbcd7e617d2 > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: yux >Priority: Blocker > Labels: pull-request-available > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. > {code:sql} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Unsupported options found for > 'mongodb'. > Unsupported options: > scan.partition.record-size > Supported options: > collection > connector > database > filter.handling.policy > ... > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [hotfix][Connector/JDBC] Remove unnecessary update fields of PostgreSQL upsert statement [flink-connector-jdbc]
boring-cyborg[bot] commented on PR #155: URL: https://github.com/apache/flink-connector-jdbc/pull/155#issuecomment-2636257302 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [hotfix][Connector/JDBC] Remove unnecessary update fields of PostgreSQL upsert statement [flink-connector-jdbc]
roseduan opened a new pull request, #155: URL: https://github.com/apache/flink-connector-jdbc/pull/155 In PostgreSQL, there is no need to specify the unique key fields in the update clause. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36439][docs] Documents for Disaggregate State and new State APIs [flink]
flinkbot commented on PR #26107: URL: https://github.com/apache/flink/pull/26107#issuecomment-2636265648 ## CI report: * c5c732493e7d6b3dbb877ebffd620695cef52c3d UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix][table-planner] Use equals instead of '==' to compare digests in FlinkCalcMergeRule [flink]
flinkbot commented on PR #26108: URL: https://github.com/apache/flink/pull/26108#issuecomment-2636265903 ## CI report: * dcaa98389b7308ab184467a039163b466b560d53 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37255] Fix unable to configure scan.partition.record-size option [flink-connector-mongodb]
yuxiqian commented on PR #51: URL: https://github.com/apache/flink-connector-mongodb/pull/51#issuecomment-2636249551 > Thanks @yuxiqian for the quick update. LGTM. > > Could you help make a backport to v2.0 branch? Sure, cherry-picked in #52 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923955#comment-17923955 ] Jiabao Sun commented on FLINK-37255: Thanks [~xiqian_yu], assigned to you. > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: yux >Priority: Blocker > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. > {code:sql} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Unsupported options found for > 'mongodb'. > Unsupported options: > scan.partition.record-size > Supported options: > collection > connector > database > filter.handling.policy > ... > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37255) Unsupported options: scan.partition.record-size
[ https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37255: --- Labels: pull-request-available (was: ) > Unsupported options: scan.partition.record-size > --- > > Key: FLINK-37255 > URL: https://issues.apache.org/jira/browse/FLINK-37255 > Project: Flink > Issue Type: Bug > Components: Connectors / MongoDB >Reporter: Jiabao Sun >Assignee: yux >Priority: Blocker > Labels: pull-request-available > Fix For: mongodb-2.0.0 > > > The new configuration option scan.partition.record-size was introduced by > FLINK-36075, but it was not added to the optional options, making this > configuration item unusable. > {code:sql} > [ERROR] Could not execute SQL statement. Reason: > org.apache.flink.table.api.ValidationException: Unsupported options found for > 'mongodb'. > Unsupported options: > scan.partition.record-size > Supported options: > collection > connector > database > filter.handling.policy > ... > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-37256) Firing timers can block recovery process
Piotr Nowojski created FLINK-37256: -- Summary: Firing timers can block recovery process Key: FLINK-37256 URL: https://issues.apache.org/jira/browse/FLINK-37256 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Task Affects Versions: 1.20.0, 2.0.0 Reporter: Piotr Nowojski Splittable/interruptible timers for checkpointing were introduced in FLINK-20217 as part of https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing . However, the exact same problem can happen during recovery, usually (only?) due to a watermark that was captured along with the in-flight data and is being processed during a subtask's "INITIALIZATION" phase. The problem is that while a subtask is still in the initialization phase, the job cannot perform any checkpoints. This issue is compounded if there is a data-multiplying operator in the pipeline downstream from the operator that has a lot of timers to fire. What can happen then is: * some upstream operator A fires a lot of timers that produce a lot of data (for example 100,000 records) while it is still INITIALIZING * those records are multiplied downstream (operators B, C, ...) by, for example, a factor of 100 * in the end, the sinks have to accept roughly 100,000 * 100 = 10,000,000 records before upstream operator A can finish processing its in-flight data and switch to RUNNING. This can take hours. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36929][table] Add SQL connector for keyed savepoint data [flink]
gaborgsomogyi merged PR #26035: URL: https://github.com/apache/flink/pull/26035 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36660] Bump to 2.0-preview1. [flink-connector-elasticsearch]
boring-cyborg[bot] commented on PR #115: URL: https://github.com/apache/flink-connector-elasticsearch/pull/115#issuecomment-2636487184 Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36660) Release flink-connector-elasticsearch vx.x.x for Flink 2.0
[ https://issues.apache.org/jira/browse/FLINK-36660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36660: --- Labels: pull-request-available (was: ) > Release flink-connector-elasticsearch vx.x.x for Flink 2.0 > -- > > Key: FLINK-36660 > URL: https://issues.apache.org/jira/browse/FLINK-36660 > Project: Flink > Issue Type: Sub-task > Components: Connectors / ElasticSearch >Affects Versions: 2.0.0 >Reporter: Yanquan Lv >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > Release a version of flink-connector-elasticsearch connector that bumped > Flink 2.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36660] Bump to 2.0-preview1. [flink-connector-elasticsearch]
lvyanquan opened a new pull request, #115: URL: https://github.com/apache/flink-connector-elasticsearch/pull/115 Bump Flink version to 2.0-preview1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [BP-2.0][FLINK-37021][state/forst] Fix incorrect paths when reusing and creating files. [flink]
Zakelly merged PR #26106: URL: https://github.com/apache/flink/pull/26106 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37222] Do not reuse views across TableEnvironments in SQL client [flink]
Zakelly commented on PR #26093: URL: https://github.com/apache/flink/pull/26093#issuecomment-2636492172 Thanks. Merging... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-37021) Implement fast checkpoint/rescaling for ForStKeyedStateBackend
[ https://issues.apache.org/jira/browse/FLINK-37021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923962#comment-17923962 ] Zakelly Lan edited comment on FLINK-37021 at 2/5/25 11:38 AM: -- Follow up PR: * master: f975783ce9a688a99e2f74fe6f2f5ad204589fdc * 2.0: 5512299630e3d4d6e92c4e4ac25059e64c76ebca was (Author: zakelly): Follow up PR: * master: f975783ce9a688a99e2f74fe6f2f5ad204589fdc > Implement fast checkpoint/rescaling for ForStKeyedStateBackend > -- > > Key: FLINK-37021 > URL: https://issues.apache.org/jira/browse/FLINK-37021 > Project: Flink > Issue Type: Sub-task >Reporter: Yanfei Lei >Assignee: Han Yin >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37222] Do not reuse views across TableEnvironments in SQL client [flink]
Zakelly merged PR #26093: URL: https://github.com/apache/flink/pull/26093 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-37222) Table planner exception when sql client submit job
[ https://issues.apache.org/jira/browse/FLINK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924034#comment-17924034 ] Zakelly Lan commented on FLINK-37222: - Master: * ac4f5952d5dd47e09644681b8f0fbf2b46a76509 * b88a6cac675a6acf6e2e8a62d225f5db2057f709 2.0: * 12493ead11f59697cda1baf0507b0ca8190f851a * 0714e670e638e4e3f71086944550b6f4fc51b04e > Table planner exception when sql client submit job > -- > > Key: FLINK-37222 > URL: https://issues.apache.org/jira/browse/FLINK-37222 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 2.0.0 >Reporter: Zakelly Lan >Assignee: Dawid Wysakowicz >Priority: Blocker > Labels: pull-request-available > Fix For: 2.0.0 > > Attachments: flink-root-sql-client-master-1-1.c-76646bdbb8bdab89.log > > > When testing [Nexmark|https://github.com/nexmark/nexmark] on release-2.0 > branch, a table planner related exception thrown by the sql client: > {code:java} > // Omit some stacktraces before > Caused by: > org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to > execute the operation d3516bc9-44c1-428e-ac10-2fb0ddd0c825. > at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:415) > at > org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:268) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > ... 1 more > Caused by: java.lang.AssertionError: Sql optimization: Assertion error: > Relational expression rel#714:LogicalProject.NONE.any.None: > 0.[NONE].[NONE](input=LogicalAggregate#712,exprs=[$2, $1]) belongs to a > different planner than is currently being used. 
> at > org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) > at > scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) > at scala.collection.Iterator.foreach(Iterator.scala:943) > at scala.collection.Iterator.foreach$(Iterator.scala:943) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) > at scala.collection.IterableLike.foreach(IterableLike.scala:74) > at scala.collection.IterableLike.foreach$(IterableLike.scala:73) > at scala.collection.AbstractIterable.foreach(Iterable.scala:56) > at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) > at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) > at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) > at > org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83) > at > org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118) > at > org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87) > at > org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390) > at > org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1352) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:930) > at > org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:662) > at > org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(Op
Re: [PR] [BP-2.0][FLINK-37222] Do not reuse views across TableEnvironments in SQL client [flink]
Zakelly merged PR #26104: URL: https://github.com/apache/flink/pull/26104 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37258][runtime] Return Ordered Job list on Dispatcher#requestMultipleJobDetails [flink]
XComp commented on code in PR #26111: URL: https://github.com/apache/flink/pull/26111#discussion_r1943202076 ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ## @@ -878,8 +879,12 @@ public CompletableFuture requestMultipleJobDetails(Duration completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job)); runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job)); +Collection orderedDeduplicatedJobs = +deduplicatedJobs.values().stream() + .sorted(Comparator.comparingLong(JobDetails::getStartTime)) Review Comment: I know it's quite unlikely but should we use the job ID as a fallback if the start time is the same? ## flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java: ## @@ -878,8 +879,12 @@ public CompletableFuture requestMultipleJobDetails(Duration completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job)); runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job)); +Collection orderedDeduplicatedJobs = +deduplicatedJobs.values().stream() + .sorted(Comparator.comparingLong(JobDetails::getStartTime)) +.collect(Collectors.toList()); -return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values())); +return new MultipleJobsDetails(new ArrayList<>(orderedDeduplicatedJobs)); Review Comment: ```suggestion return new MultipleJobsDetails(orderedDeduplicatedJobs); ``` I guess, we don't need the explicit `ArrayList`, do we? ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java: ## @@ -1225,7 +1257,16 @@ public void testOverridingJobVertexParallelisms() throws Exception { private JobManagerRunner runningJobManagerRunnerWithJobStatus( final JobStatus currentJobStatus) { +return runningJobManagerRunnerWithJobStatus(currentJobStatus, jobId, 0L); +} + +private JobManagerRunner runningJobManagerRunnerWithJobStatus( +final JobStatus currentJobStatus, final JobID jobId, long startTime) { Preconditions.checkArgument(!currentJobStatus.isTerminalState()); +long[] stateTimeStampsForRunningJob = new long[JobStatus.values().length]; +stateTimeStampsForRunningJob[JobStatus.INITIALIZING.ordinal()] = startTime; Review Comment: nit, you could add custom timestamps for `CREATED` and `RUNNING` that have inverse values for the two jobs to verify that the `INITIALIZING` timestamp is used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
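The tie-break suggested in the first comment can be expressed as a chained comparator: sort by start time, then fall back to the job id so that jobs with identical start times still get a stable order. The snippet below is a self-contained sketch with a stand-in `JobDetails` record, not Flink's actual class.

```java
// Sketch of a stable ordering with a tie-break on the job id.
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class JobOrdering {

    // Stand-in for the runtime's JobDetails, used only for illustration.
    record JobDetails(String jobId, long startTime) {}

    public static void main(String[] args) {
        List<JobDetails> ordered = Stream.of(
                        new JobDetails("b2", 200L),
                        new JobDetails("a1", 100L),
                        new JobDetails("a0", 100L))
                .sorted(Comparator.comparingLong(JobDetails::startTime)
                        .thenComparing(Comparator.comparing(JobDetails::jobId)))
                .collect(Collectors.toList());

        // Prints: a0 @ 100, a1 @ 100, b2 @ 200
        ordered.forEach(j -> System.out.println(j.jobId() + " @ " + j.startTime()));
    }
}
```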
Re: [PR] [FLINK-37258][runtime] Return Ordered Job list on Dispatcher#requestMultipleJobDetails [flink]
davidradl commented on code in PR #26111: URL: https://github.com/apache/flink/pull/26111#discussion_r1943263854 ## flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java: ## @@ -1225,7 +1257,16 @@ public void testOverridingJobVertexParallelisms() throws Exception { private JobManagerRunner runningJobManagerRunnerWithJobStatus( final JobStatus currentJobStatus) { +return runningJobManagerRunnerWithJobStatus(currentJobStatus, jobId, 0L); +} + +private JobManagerRunner runningJobManagerRunnerWithJobStatus( +final JobStatus currentJobStatus, final JobID jobId, long startTime) { Preconditions.checkArgument(!currentJobStatus.isTerminalState()); +long[] stateTimeStampsForRunningJob = new long[JobStatus.values().length]; Review Comment: I assume that this method ends up being called and we want the order of the jobs to be sensible. Would the most sensible for the UI be the latest first? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
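If latest-first really is the more useful order for the UI, reversing the same comparator would be enough — a minimal sketch under the same assumptions as above:
```java
// Hypothetical newest-first variant: the most recently started job comes first.
Comparator<JobDetails> newestFirst =
        Comparator.comparingLong(JobDetails::getStartTime).reversed();
```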
[jira] [Assigned] (FLINK-37234) Add timeout for test case to avoid Infinite waiting
[ https://issues.apache.org/jira/browse/FLINK-37234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-37234: -- Assignee: Yanquan Lv > Add timeout for test case to avoid Infinite waiting > > > Key: FLINK-37234 > URL: https://issues.apache.org/jira/browse/FLINK-37234 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Yanquan Lv >Assignee: Yanquan Lv >Priority: Not a Priority > Fix For: jdbc-3.3.0 > > > Add timeout for test case to avoid Infinite waiting. > JdbcSourceStreamRelatedITCase#waitExpectation use a while(true) loop to check > exception, however, this may lead to infinite waiting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
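For illustration, a deadline-bounded variant of such a wait loop — a generic sketch, not the actual JdbcSourceStreamRelatedITCase code; the helper name and polling interval are made up:
{code:java}
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class WaitUtil {
    // Poll a condition but give up after the timeout instead of looping forever,
    // so a broken condition fails the test rather than hanging CI.
    static void waitUntil(Supplier<Boolean> condition, Duration timeout)
            throws InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.get()) {
            if (System.nanoTime() >= deadlineNanos) {
                throw new TimeoutException("Condition was not met within " + timeout);
            }
            Thread.sleep(100L);
        }
    }
}
{code}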
Re: [PR] [FLINK-37234] Add test timeout to avoid infinite waiting. [flink-connector-jdbc]
leonardBang merged PR #154: URL: https://github.com/apache/flink-connector-jdbc/pull/154 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37234) Add timeout for test case to avoid Infinite waiting
[ https://issues.apache.org/jira/browse/FLINK-37234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37234: --- Labels: pull-request-available (was: ) > Add timeout for test case to avoid Infinite waiting > > > Key: FLINK-37234 > URL: https://issues.apache.org/jira/browse/FLINK-37234 > Project: Flink > Issue Type: Technical Debt > Components: Connectors / JDBC >Affects Versions: jdbc-3.2.0 >Reporter: Yanquan Lv >Assignee: Yanquan Lv >Priority: Not a Priority > Labels: pull-request-available > Fix For: jdbc-3.3.0 > > > Add timeout for test case to avoid Infinite waiting. > JdbcSourceStreamRelatedITCase#waitExpectation use a while(true) loop to check > exception, however, this may lead to infinite waiting. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37234] Add test timeout to avoid infinite waiting. [flink-connector-jdbc]
boring-cyborg[bot] commented on PR #154: URL: https://github.com/apache/flink-connector-jdbc/pull/154#issuecomment-2638952636 Awesome work, congrats on your first merged pull request! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-37263) AggregateITCase.testConstantGroupKeyWithUpsertSink failed
Weijie Guo created FLINK-37263: -- Summary: AggregateITCase.testConstantGroupKeyWithUpsertSink failed Key: FLINK-37263 URL: https://issues.apache.org/jira/browse/FLINK-37263 Project: Flink Issue Type: Bug Components: Build System / CI Affects Versions: 2.1.0 Reporter: Weijie Guo Feb 05 10:31:37 10:31:37.516 [ERROR] Run 3: AggregateITCase.testConstantGroupKeyWithUpsertSink:1730 Feb 05 10:31:37 expected: List(+I[A, 1], +I[B, 2], +I[C, 3]) Feb 05 10:31:37 but was: ArrayBuffer(+I[A, 1], +I[C, 3], +U[B, 2]) https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=65810&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=12050 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [tests][ci] Miscellaneous improvements on CI robustness [flink-cdc]
yuxiqian commented on PR #3911: URL: https://github.com/apache/flink-cdc/pull/3911#issuecomment-2638961294 Would @leonardBang and @whhe like to take a look? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37087][doc] Add docs for alter materialized table as query [flink]
lsyldliu commented on code in PR #26064: URL: https://github.com/apache/flink/pull/26064#discussion_r1944182747 ## docs/content.zh/docs/dev/table/materialized-table/statements.md: ## @@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28 注意 - REFRESH 操作会启动批作业来刷新表的数据。 +## AS +```sql +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS +``` + +`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询推导的 `schema` 更新表的 `schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。 + +具体修改流程取决于物化表的刷新模式: + +**全量模式:** + +1. 更新物化表的 `schema` 和查询定义。 +2. 在刷新作业下次触发执行时,将使用新的查询定义刷新数据: + - 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。 + - 否则,将刷新整个表的数据。 + +**持续模式:** + +1. 暂停当前的流式刷新作业。 +2. 更新物化表的 `schema` 和查询定义。 +3. 启动新的流式任务以刷新物化表: + - 新的流式任务会从头开始,而不会从之前的流式任务状态恢复。 + - 数据源的起始位点会由到连接器的默认实现或查询中设置的 [dynamic hint]({{< ref "docs/dev/table/sql/queries/hints" >}}#dynamic-table-options) 决定。 + +**示例:** + +```sql +-- 原始物化表定义 +CREATE MATERIALIZED TABLE my_materialized_table +FRESHNESS = INTERVAL '10' SECOND +AS +SELECT +user_id, +COUNT(*) AS event_count, +SUM(amount) AS total_amount +FROM +kafka_catalog.db1.events +WHERE +event_type = 'purchase' +GROUP BY +user_id; + +-- 修改现有物化表的查询 +ALTER MATERIALIZED TABLE my_materialized_table +AS SELECT +user_id, +COUNT(*) AS event_count, +SUM(amount) AS total_amount, +AVG(amount) AS avg_amount -- 在末尾追加新的可为空列 +FROM +kafka_catalog.db1.events +WHERE +event_type = 'purchase' +GROUP BY +user_id; +``` + +注意 +- Schema 演进当前仅支持在原表 schema 尾部追加`可空列`。 +- 在持续模式下,新的流式任务不会从原来的流式任务的状态恢复。这可能会导致短暂的数据重复或丢失。 Review Comment: 流式任务 -> 流式作业 ## docs/content.zh/docs/dev/table/materialized-table/statements.md: ## @@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28 注意 - REFRESH 操作会启动批作业来刷新表的数据。 +## AS +```sql +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS +``` + +`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询推导的 `schema` 更新表的 `schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。 + +具体修改流程取决于物化表的刷新模式: + +**全量模式:** + +1. 更新物化表的 `schema` 和查询定义。 +2. 在刷新作业下次触发执行时,将使用新的查询定义刷新数据: + - 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。 + - 否则,将刷新整个表的数据。 + +**持续模式:** + +1. 暂停当前的流式刷新作业。 +2. 更新物化表的 `schema` 和查询定义。 +3. 启动新的流式任务以刷新物化表: Review Comment: 流式任务 -> 流式作业 ## docs/content.zh/docs/dev/table/materialized-table/statements.md: ## @@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH PARTITION (ds='2024-06-28 注意 - REFRESH 操作会启动批作业来刷新表的数据。 +## AS +```sql +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS +``` + +`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询推导的 `schema` 更新表的 `schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。 + +具体修改流程取决于物化表的刷新模式: + +**全量模式:** + +1. 更新物化表的 `schema` 和查询定义。 +2. 在刷新作业下次触发执行时,将使用新的查询定义刷新数据: + - 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。 + - 否则,将刷新整个表的数据。 + +**持续模式:** + +1. 暂停当前的流式刷新作业。 +2. 更新物化表的 `schema` 和查询定义。 +3. 启动新的流式任务以刷新物化表: + - 新的流式任务会从头开始,而不会从之前的流式任务状态恢复。 Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake
[ https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhang Hechuan updated FLINK-37265: -- Description: "ElasticsearchEmitter" has not method named "static", it should be "static_index". Elasticsearch 7 static index: {code:java} from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) input = ... # The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered es7_sink = Elasticsearch7SinkBuilder() \ .set_bulk_flush_max_actions(1) \ .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \ .set_hosts(['localhost:9200']) \ .build() input.sink_to(es7_sink).name('es7 sink'){code} was: Elasticsearch 7 static index: {code:java} //代码占位符 from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) input = ... # The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered es7_sink = Elasticsearch7SinkBuilder() \ .set_bulk_flush_max_actions(1) \ .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \ .set_hosts(['localhost:9200']) \ .build() input.sink_to(es7_sink).name('es7 sink') "ElasticsearchEmitter" has not method named "static", it should be "static_index"{code} > document of python elasticsearch connector exeample has a mistake > - > > Key: FLINK-37265 > URL: https://issues.apache.org/jira/browse/FLINK-37265 > Project: Flink > Issue Type: Bug > Components: API / DataStream, API / Python, Connectors / > ElasticSearch >Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0 >Reporter: Zhang Hechuan >Priority: Minor > > "ElasticsearchEmitter" has not method named "static", it should be > "static_index". > Elasticsearch 7 static index: > {code:java} > from pyflink.datastream.connectors.elasticsearch import > Elasticsearch7SinkBuilder, ElasticsearchEmitter > env = StreamExecutionEnvironment.get_execution_environment() > env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) > input = ... > # The set_bulk_flush_max_actions instructs the sink to emit after every > element, otherwise they would be buffered > es7_sink = Elasticsearch7SinkBuilder() \ > .set_bulk_flush_max_actions(1) \ > .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \ > .set_hosts(['localhost:9200']) \ > .build() > input.sink_to(es7_sink).name('es7 sink'){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake
[ https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhang Hechuan updated FLINK-37265: -- Description: "ElasticsearchEmitter" has no method named "static", it should be "static_index". Elasticsearch 7 static index: {code:java} from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) input = ... # The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered es7_sink = Elasticsearch7SinkBuilder() \ .set_bulk_flush_max_actions(1) \ .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \ .set_hosts(['localhost:9200']) \ .build() input.sink_to(es7_sink).name('es7 sink'){code} was: "ElasticsearchEmitter" has not method named "static", it should be "static_index". Elasticsearch 7 static index: {code:java} from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) input = ... # The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered es7_sink = Elasticsearch7SinkBuilder() \ .set_bulk_flush_max_actions(1) \ .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \ .set_hosts(['localhost:9200']) \ .build() input.sink_to(es7_sink).name('es7 sink'){code} > document of python elasticsearch connector exeample has a mistake > - > > Key: FLINK-37265 > URL: https://issues.apache.org/jira/browse/FLINK-37265 > Project: Flink > Issue Type: Bug > Components: API / DataStream, API / Python, Connectors / > ElasticSearch >Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0 >Reporter: Zhang Hechuan >Priority: Minor > > "ElasticsearchEmitter" has no method named "static", it should be > "static_index". > Elasticsearch 7 static index: > {code:java} > from pyflink.datastream.connectors.elasticsearch import > Elasticsearch7SinkBuilder, ElasticsearchEmitter > env = StreamExecutionEnvironment.get_execution_environment() > env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) > input = ... > # The set_bulk_flush_max_actions instructs the sink to emit after every > element, otherwise they would be buffered > es7_sink = Elasticsearch7SinkBuilder() \ > .set_bulk_flush_max_actions(1) \ > .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \ > .set_hosts(['localhost:9200']) \ > .build() > input.sink_to(es7_sink).name('es7 sink'){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33555) LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:
[ https://issues.apache.org/jira/browse/FLINK-33555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924371#comment-17924371 ] Weijie Guo commented on FLINK-33555: https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=65823&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=ea7cf968-e585-52cb-e0fc-f48de023a7ca&l=21525 > LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory: > --- > > Key: FLINK-33555 > URL: https://issues.apache.org/jira/browse/FLINK-33555 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 2.0.0, 1.19.0, 1.20.0 >Reporter: Matthias Pohl >Priority: Critical > Labels: test-stability > Attachments: FLINK-33555.log > > > https://github.com/XComp/flink/actions/runs/6868936761/job/18680977238#step:12:13492 > {code} > Error: 21:44:15 21:44:15.144 [ERROR] > LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:119 > [The task was deployed to AllocationID(fcf411eadbae8beed895a78ea1653046) but > it should have been deployed to > AllocationID(dec337d82b9d960004ffd73be8a2c5d5) for local recovery., The task > was deployed to AllocationID(a61fd8a6bc5ef9d467f32f918bdfb385) but it should > have been deployed to AllocationID(fcf411eadbae8beed895a78ea1653046) for > local recovery., The task was deployed to > AllocationID(dec337d82b9d960004ffd73be8a2c5d5) but it should have been > deployed to AllocationID(a61fd8a6bc5ef9d467f32f918bdfb385) for local > recovery.] ==> expected: but was: > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [tests][ci] Miscellaneous improvements on CI robustness [flink-cdc]
yuxiqian opened a new pull request, #3911: URL: https://github.com/apache/flink-cdc/pull/3911 This PR mainly focuses on improving test case effectiveness by: * Single-stage CI failure should not cause the whole matrix to be cancelled * Removed vulnerable & unmaintained FastJson from test dependencies * Optimize the PolarDBX test case by using dynamic port bindings * Fixed unstable DataStream migration tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
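On the dynamic port binding item, the usual Testcontainers pattern looks roughly like the following — a generic sketch, not the actual PolarDBX test code, and the image name is only a placeholder:
```java
// Expose the container's fixed internal port and let Docker map it to a random
// free host port, then look the mapping up instead of hard-coding a host port.
GenericContainer<?> polardbx =
        new GenericContainer<>(DockerImageName.parse("polardbx/polardb-x:latest"))
                .withExposedPorts(3306);
polardbx.start();
String jdbcUrl =
        String.format(
                "jdbc:mysql://%s:%d/test",
                polardbx.getHost(), polardbx.getMappedPort(3306));
```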
[jira] [Created] (FLINK-37264) two bugs of DSv2
xuhuang created FLINK-37264: --- Summary: two bugs of DSv2 Key: FLINK-37264 URL: https://issues.apache.org/jira/browse/FLINK-37264 Project: Flink Issue Type: New Feature Components: API / DataStream Affects Versions: 2.0.0 Reporter: xuhuang Two bugs of DataStream V2: # When joining two tuple DataStreams using JoinExtension, an exception is thrown "Tuple needs to be parameterized by using generics". # DSv2 does not support adding a FileSink, even when the FileSink does not include committing topology. -- This message was sent by Atlassian Jira (v8.20.10#820010)
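For context on the first bug, the quoted error text comes from type extraction on raw tuple classes, and supplying an explicit TypeInformation sidesteps the erasure problem. This is a generic illustration of that mechanism, not the JoinExtension fix itself:
{code:java}
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class TupleTypeInfoExample {
    public static void main(String[] args) {
        // A raw Tuple2.class carries no field types, which is what produces
        // "Tuple needs to be parameterized by using generics"; a TypeHint keeps them.
        TypeInformation<Tuple2<String, Integer>> tupleInfo =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
        System.out.println(tupleInfo);
    }
}
{code}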
Re: [PR] [FLINK-37264] Fix two bugs of DataStream V2 [flink]
flinkbot commented on PR #26112: URL: https://github.com/apache/flink/pull/26112#issuecomment-2639023195 ## CI report: * e0f529506e5225fce4b103fcb81eabc903ebea4f UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37205][python] Correct the state cache behavior during bump beam version [flink]
dianfu merged PR #26058: URL: https://github.com/apache/flink/pull/26058 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-37205][python] Correct the state cache behavior during bump be… [flink]
dianfu merged PR #26059: URL: https://github.com/apache/flink/pull/26059 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake
[ https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhang Hechuan updated FLINK-37265: -- Description: "ElasticsearchEmitter" has no method named "ElasticsearchEmitter.static", it should be "ElasticsearchEmitter.static_index". Here is the code in the document: Elasticsearch 7 static index: {code:java} from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) input = ... # The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered es7_sink = Elasticsearch7SinkBuilder() \ .set_bulk_flush_max_actions(1) \ .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \ .set_hosts(['localhost:9200']) \ .build() input.sink_to(es7_sink).name('es7 sink'){code} was: "ElasticsearchEmitter" has no method named "static", it should be "static_index". Elasticsearch 7 static index: {code:java} from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) input = ... # The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered es7_sink = Elasticsearch7SinkBuilder() \ .set_bulk_flush_max_actions(1) \ .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \ .set_hosts(['localhost:9200']) \ .build() input.sink_to(es7_sink).name('es7 sink'){code} > document of python elasticsearch connector exeample has a mistake > - > > Key: FLINK-37265 > URL: https://issues.apache.org/jira/browse/FLINK-37265 > Project: Flink > Issue Type: Bug > Components: API / DataStream, API / Python, Connectors / > ElasticSearch >Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0 >Reporter: Zhang Hechuan >Priority: Minor > > "ElasticsearchEmitter" has no method named "ElasticsearchEmitter.static", it > should be "ElasticsearchEmitter.static_index". > Here is the code in the document: > Elasticsearch 7 static index: > {code:java} > from pyflink.datastream.connectors.elasticsearch import > Elasticsearch7SinkBuilder, ElasticsearchEmitter > env = StreamExecutionEnvironment.get_execution_environment() > env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) > input = ... > # The set_bulk_flush_max_actions instructs the sink to emit after every > element, otherwise they would be buffered > es7_sink = Elasticsearch7SinkBuilder() \ > .set_bulk_flush_max_actions(1) \ > .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \ > .set_hosts(['localhost:9200']) \ > .build() > input.sink_to(es7_sink).name('es7 sink'){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake
[ https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhang Hechuan updated FLINK-37265: -- Description: There is no method named "ElasticsearchEmitter.static", it should be "ElasticsearchEmitter.static_index". Here is the code in the document: Elasticsearch 7 static index: {code:java} from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) input = ... # The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered es7_sink = Elasticsearch7SinkBuilder() \ .set_bulk_flush_max_actions(1) \ .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \ .set_hosts(['localhost:9200']) \ .build() input.sink_to(es7_sink).name('es7 sink'){code} was: "ElasticsearchEmitter" has no method named "ElasticsearchEmitter.static", it should be "ElasticsearchEmitter.static_index". Here is the code in the document: Elasticsearch 7 static index: {code:java} from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter env = StreamExecutionEnvironment.get_execution_environment() env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) input = ... # The set_bulk_flush_max_actions instructs the sink to emit after every element, otherwise they would be buffered es7_sink = Elasticsearch7SinkBuilder() \ .set_bulk_flush_max_actions(1) \ .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \ .set_hosts(['localhost:9200']) \ .build() input.sink_to(es7_sink).name('es7 sink'){code} > document of python elasticsearch connector exeample has a mistake > - > > Key: FLINK-37265 > URL: https://issues.apache.org/jira/browse/FLINK-37265 > Project: Flink > Issue Type: Bug > Components: API / DataStream, API / Python, Connectors / > ElasticSearch >Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0 >Reporter: Zhang Hechuan >Priority: Minor > > There is no method named "ElasticsearchEmitter.static", it should be > "ElasticsearchEmitter.static_index". > Here is the code in the document: > Elasticsearch 7 static index: > {code:java} > from pyflink.datastream.connectors.elasticsearch import > Elasticsearch7SinkBuilder, ElasticsearchEmitter > env = StreamExecutionEnvironment.get_execution_environment() > env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) > input = ... > # The set_bulk_flush_max_actions instructs the sink to emit after every > element, otherwise they would be buffered > es7_sink = Elasticsearch7SinkBuilder() \ > .set_bulk_flush_max_actions(1) \ > .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \ > .set_hosts(['localhost:9200']) \ > .build() > input.sink_to(es7_sink).name('es7 sink'){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake
[ https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhang Hechuan updated FLINK-37265: -- Affects Version/s: (was: 1.17.0) (was: 1.18.0) (was: 1.19.0) (was: 1.20.0) > document of python elasticsearch connector exeample has a mistake > - > > Key: FLINK-37265 > URL: https://issues.apache.org/jira/browse/FLINK-37265 > Project: Flink > Issue Type: Bug > Components: API / DataStream, API / Python, Connectors / > ElasticSearch >Affects Versions: 2.0-preview >Reporter: Zhang Hechuan >Priority: Minor > > There is no method named "ElasticsearchEmitter.static", it should be > "ElasticsearchEmitter.static_index". > Here is the code in the document: > Elasticsearch 7 static index: > {code:java} > from pyflink.datastream.connectors.elasticsearch import > Elasticsearch7SinkBuilder, ElasticsearchEmitter > env = StreamExecutionEnvironment.get_execution_environment() > env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH) > input = ... > # The set_bulk_flush_max_actions instructs the sink to emit after every > element, otherwise they would be buffered > es7_sink = Elasticsearch7SinkBuilder() \ > .set_bulk_flush_max_actions(1) \ > .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \ > .set_hosts(['localhost:9200']) \ > .build() > input.sink_to(es7_sink).name('es7 sink'){code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-37205][python] Correct the state cache behavior during bump be… [flink]
dianfu merged PR #26060: URL: https://github.com/apache/flink/pull/26060 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Closed] (FLINK-37205) Correct the state cache behavior during bump beam version
[ https://issues.apache.org/jira/browse/FLINK-37205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dian Fu closed FLINK-37205. --- Fix Version/s: 2.0.0 1.19.2 1.20.2 Resolution: Fixed Fixed in: - master via a498cf2c228680a44f71d81e7c08b39e1a968a1c - release-2.0 via bfea5b52be951ca74e41bc066cf655f06f828376 - release-1.20 via 14e853ab832bd9aabc61219a9ce187c201e9603c - release-1.19 via af041a2a883f31baf24537964e15e6679ba66dcd > Correct the state cache behavior during bump beam version > - > > Key: FLINK-37205 > URL: https://issues.apache.org/jira/browse/FLINK-37205 > Project: Flink > Issue Type: Bug > Components: API / Python >Affects Versions: 1.17.0, 1.18.0, 1.17.1, 1.17.2, 1.19.0, 1.18.1, 1.20.0, > 1.19.1 >Reporter: Dian Fu >Assignee: Dian Fu >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0, 1.19.2, 1.20.2 > > > It has changed the state cache strategy from size based to bytes based since > Beam 2.42.0 (See https://github.com/apache/beam/pull/22924 for more details). > We should also support memory based state cache in the long term. Before > that, we should correct the behavior which seems broken after bumping the > Beam version. It may cause the state cache continuously increase and finally > cause the job OOM. -- This message was sent by Atlassian Jira (v8.20.10#820010)
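As a generic illustration of what entry-count-based (as opposed to byte-based) eviction looks like — not the PyFlink/Beam state cache implementation — a count-bounded LRU map in Java:
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

// Keeps at most maxEntries items, evicting the least recently used one; the cache
// stays bounded by entry count rather than by the byte size of the cached values.
public class CountBoundedLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public CountBoundedLruCache(int maxEntries) {
        super(16, 0.75f, true); // access-order so reads refresh recency
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
{code}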
[PR] [FLINK-37264] Fix two bugs of DataStream V2 [flink]
codenohup opened a new pull request, #26112: URL: https://github.com/apache/flink/pull/26112 ## What is the purpose of the change Fix two bugs of DataStream V2: 1. When joining two tuple DataStreams using JoinExtension, an exception is thrown "Tuple needs to be parameterized by using generics". 2. DSv2 does not support adding a FileSink, even when the FileSink does not include committing topology. ## Brief change log 1. Correct usage of TypeExtractor in JoinExtension 2. Support FileSink in DSv2 if the FileSink does not add committing topology 3. Add a IT test case ## Verifying this change Added a test case: org.apache.flink.test.streaming.api.datastream.extension.join.JoinITCase#testJoinWithTuple ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
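On the FileSink point, a rough sketch of the kind of guard implied by "does not include committing topology" — hypothetical, not the code in this PR; it assumes the Sink V2 SupportsCommitter marker interface is how a committing topology would be detected:
```java
// Accept the sink only when it does not bring a committer; otherwise fail fast
// with a clear message instead of building an unsupported topology.
if (sink instanceof SupportsCommitter) {
    throw new UnsupportedOperationException(
            "Sinks with a committing topology are not yet supported in DataStream V2.");
}
```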
[jira] [Updated] (FLINK-37264) two bugs of DSv2
[ https://issues.apache.org/jira/browse/FLINK-37264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-37264: --- Labels: pull-request-available (was: ) > two bugs of DSv2 > > > Key: FLINK-37264 > URL: https://issues.apache.org/jira/browse/FLINK-37264 > Project: Flink > Issue Type: New Feature > Components: API / DataStream >Affects Versions: 2.0.0 >Reporter: xuhuang >Priority: Minor > Labels: pull-request-available > > Two bugs of DataStream V2: > # When joining two tuple DataStreams using JoinExtension, an exception is > thrown "Tuple needs to be parameterized by using generics". > # DSv2 does not support adding a FileSink, even when the FileSink does not > include committing topology. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] for test [flink-docker]
reswqa opened a new pull request, #210: URL: https://github.com/apache/flink-docker/pull/210 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org