[PR] [FLINK-37156] [cdc-composer/cli] pipeline supports collecting data once and writing it to multiple sinks. [flink-cdc]

2025-02-05 Thread via GitHub


qyfftf opened a new pull request, #3906:
URL: https://github.com/apache/flink-cdc/pull/3906

   I've implemented a solution where data is collected from MySQL once and then 
written to multiple target systems, such as Paimon, Kafka, and StarRocks, 
which avoids redundant data collection.
   Pipeline YAML:
   ```yaml
   source:
     type: mysql
     name: MySQL Source
     hostname: 127.0.0.1
     port: 3306
     username: root
     password: 123456
     tables: test.order
     server-id: 5401-5404
     jdbc.properties.useSSL: false

   sink:
     - type: paimon
       name: Paimon Sink
       catalog.properties.metastore: filesystem
       catalog.properties.warehouse: /tmp/path/warehouse
     - type: kafka
       name: Kafka Sink
       properties.bootstrap.servers: PLAINTEXT://localhost:9092
     - type: starrocks
       jdbc-url: jdbc:mysql://127.0.0.1:9030
       load-url: 127.0.0.1:8030
       username: root
       password: ""
       table.create.properties.replication_num: 1

   route:
     - source-table: test.order
       sink-table: test.order

   pipeline:
     name: MySQL to Paimon Pipeline
     parallelism: 2
   ```
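   A multi-sink pipeline definition like the one above is submitted the same 
way as a single-sink one via the Flink CDC CLI, e.g. `bash bin/flink-cdc.sh 
multi-sink-pipeline.yaml` (the file name here is just a placeholder).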
   
   
   Attachment: 
https://github.com/user-attachments/assets/00ff0811-d682-4cfe-b21e-18ca56bbf53f
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun updated FLINK-37255:
---
Description: 
The new configuration option scan.partition.record-size was introduced by 
FLINK-36075, but it was not added to the optional options, making this 
configuration item unusable.


{code:sql}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Unsupported options found for 
'mongodb'.

Unsupported options:

scan.partition.record-size

Supported options:

collection
connector
database
filter.handling.policy
...
{code}
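
As a rough sketch of the kind of fix this implies (illustrative only, not the 
actual patch; the class below and the option's type are assumptions): a Flink 
{{DynamicTableFactory}} only accepts options listed in its 
{{requiredOptions()}}/{{optionalOptions()}}, so the new option must be 
registered there.

{code:java}
// Illustrative sketch only -- not the actual FLINK-37255 patch.
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;

public class MongoFactorySketch {

    // Hypothetical re-declaration for illustration; the real option was
    // introduced by FLINK-36075 (the MemorySize type is an assumption).
    static final ConfigOption<MemorySize> SCAN_PARTITION_RECORD_SIZE =
            ConfigOptions.key("scan.partition.record-size")
                    .memoryType()
                    .noDefaultValue()
                    .withDescription("Estimated size of a single record, used for partitioning.");

    // The essence of the fix: list the option as optional so that SQL
    // validation stops rejecting it with "Unsupported options".
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(SCAN_PARTITION_RECORD_SIZE);
        return options;
    }
}
{code}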


  was:
The new configuration option scan.partition.record-size was introduced by 
FLINK-36075, but it was not added to the optional options, making this 
configuration item unusable.




> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: Jiabao Sun
>Priority: Blocker
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.
> {code:sql}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Unsupported options found for 
> 'mongodb'.
> Unsupported options:
> scan.partition.record-size
> Supported options:
> collection
> connector
> database
> filter.handling.policy
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37156) A Flink pipeline supports collecting data once and writing it to multiple sinks.

2025-02-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37156:
---
Labels: pull-request-available  (was: )

> A Flink pipeline supports collecting data once and writing it to multiple 
> sinks.
> 
>
> Key: FLINK-37156
> URL: https://issues.apache.org/jira/browse/FLINK-37156
> Project: Flink
>  Issue Type: New Feature
>Affects Versions: cdc-3.3.0
>Reporter: cheng qian
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2025-01-24-09-45-15-720.png, 
> image-2025-01-24-09-46-05-142.png, image-2025-01-24-09-46-52-848.png
>
>
> Is there any plan in the community to support the scenario of multiple sinks? 
> We want to synchronize data from a MySQL database to StarRocks, Paimon, and 
> Kafka. Currently, we need to create three Pipelines, which triples the I/O 
> load on the MySQL database. We envision defining three sinks within a single 
> Pipeline, enabling data to be collected once and written to three different 
> downstream systems.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun updated FLINK-37255:
---
Priority: Blocker  (was: Major)

> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: Jiabao Sun
>Priority: Blocker
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923952#comment-17923952
 ] 

yux commented on FLINK-37255:
-

I'll prepare a patch for this.

> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: Jiabao Sun
>Priority: Blocker
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.
> {code:sql}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Unsupported options found for 
> 'mongodb'.
> Unsupported options:
> scan.partition.record-size
> Supported options:
> collection
> connector
> database
> filter.handling.policy
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun reassigned FLINK-37255:
--

Assignee: yux  (was: Jiabao Sun)

> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: yux
>Priority: Blocker
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.
> {code:sql}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Unsupported options found for 
> 'mongodb'.
> Unsupported options:
> scan.partition.record-size
> Supported options:
> collection
> connector
> database
> filter.handling.policy
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-37256) Firing timers can block recovery process

2025-02-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923966#comment-17923966
 ] 

Piotr Nowojski edited comment on FLINK-37256 at 2/5/25 9:02 AM:


I can see two broad solutions.

1. Use something similar to the splittable timers mechanism during recovery to 
prevent operators from firing timers.

For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a 
similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to 
return without firing any timers when subtask is not RUNNING.

The problem with that approach is that currently, if we interrupt 
{{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. 
So if we just {{return false}} without any further changes, the mailbox 
executor would again try to fire the timers, this time via the mailbox, before 
processing the remaining in-flight records, leading to a livelock. There might 
be some easy solution to this problem.

2. An alternative approach might be to just filter out watermarks from the 
unaligned checkpoints' in-flight data.

Currently this seems to be fine, as the mechanism of persisting watermarks in 
the in-flight data is very dubious. Watermarks already processed by the 
operators are not persisted in any way, so there is just a small random chance 
that some watermark(s) will be persisted as in-flight data - and only those 
will survive recovery. So either way, we must rely on the watermark generators 
to re-emit new watermarks after recovery. Filtering out watermarks from the 
in-flight records would be more consistent.

Freshly generated watermarks that were not persisted in the in-flight data are 
not a problem, as this predominantly happens only in source tasks; since 
source tasks have no input in-flight data, those watermarks are already 
generated in the subtask's RUNNING state.

The downside of this approach is that if we eventually decide to persist 
watermarks across recoveries, we would have to re-introduce watermarks into 
the in-flight data. However, I'm not sure we will ever be able to implement 
persisting watermarks, especially across rescalings.

Regardless of that, I would lean towards option 2.


was (Author: pnowojski):
I can see two broad solutions.

1. Use something similar to the splittable timers mechanism during recovery to 
prevent operators from firing timers.

For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a 
similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to 
return without firing any timers when subtask is not RUNNING.

The problem with that approach is that currently, if we interrupt 
{{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. 
So if we just {{return false}} without any further changes, the mailbox 
executor would again try to fire the timers, this time via the mailbox, before 
processing the remaining in-flight records, leading to a livelock. There might 
be some easy solution to this problem.

2. An alternative approach might be to just filter out watermarks from the 
unaligned checkpoints' in-flight data.

Currently this seems to be fine, as the mechanism of persisting watermarks in 
the in-flight data is very dubious. Watermarks already processed by the 
operators are not persisted in any way, so there is just a small random chance 
that some watermark(s) will be persisted as in-flight data - and only those 
will survive recovery. So either way, we must rely on the watermark generators 
to re-emit new watermarks after recovery. Filtering out watermarks from the 
in-flight records would be more consistent.

Freshly generated watermarks that were not persisted in the in-flight data are 
not a problem, as this predominantly happens only in source tasks; since 
source tasks have no input in-flight data, those watermarks are already 
generated in the subtask's RUNNING state.

Downside of this approach is:
* if we decide to eventually persist watermarks across recoveries, we would 
have to re-introduce watermarks into the in-flight data. However, I'm not sure 
we will ever be able to implement persisting watermarks, especially across 
rescalings.

> Firing timers can block recovery process
> 
>
> Key: FLINK-37256
> URL: https://issues.apache.org/jira/browse/FLINK-37256
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Task
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Splittable/interruptible timers for checkpointing were introduced in 
> FLINK-20217 as part of 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing
> 
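
To make option 1 concrete, here is a toy sketch of the "stop advancing" 
predicate pattern (illustrative only; this is not Flink's actual 
{{InternalTimerServiceImpl}} code, and the class below is hypothetical):

{code:java}
// Toy model of the option 1 hook -- illustrative only, not Flink's actual
// InternalTimerServiceImpl. Firing is interrupted while the subtask is not
// yet RUNNING; the remaining timers stay queued.
import java.util.PriorityQueue;
import java.util.function.BooleanSupplier;

class ToyTimerService {
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    /**
     * Advances the watermark and fires eligible timers, but bails out early
     * when {@code shouldStopAdvancing} says so (e.g. subtask not RUNNING).
     *
     * @return true if fully advanced, false if interrupted.
     */
    boolean tryAdvanceWatermark(long newWatermark, BooleanSupplier shouldStopAdvancing) {
        Long next;
        while ((next = eventTimeTimers.peek()) != null && next <= newWatermark) {
            if (shouldStopAdvancing.getAsBoolean()) {
                // Interrupted: leave the remaining timers queued. The open
                // question from the comment above is how to avoid the mailbox
                // immediately re-enqueueing this firing (the livelock risk).
                return false;
            }
            eventTimeTimers.poll();
            fire(next);
        }
        currentWatermark = newWatermark;
        return true;
    }

    private void fire(long timestamp) {
        System.out.println("firing timer @ " + timestamp);
    }
}
{code}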

[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'

2025-02-05 Thread caixiaowei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caixiaowei updated FLINK-37254:
---
Description: 
When the table schema contains the following field:

`tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',

 

The following exception is thrown:

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
    at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
    at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.debezium.DebeziumException: Failed to set field default value for 
'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default 
value is 0 of type class java.lang.String
    at 
io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421)
    at 
io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
java.u

[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'

2025-02-05 Thread caixiaowei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caixiaowei updated FLINK-37254:
---
Attachment: 1.png

> Flink CDC OceanBase Connector Throws Exception for Column Default Value 
> DEFAULT '0'
> 
>
> Key: FLINK-37254
> URL: https://issues.apache.org/jira/browse/FLINK-37254
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
> Environment: flink 1.18.1
> flink cdc 3.1.0
> oceanbase 5.7.25-OceanBase_CE-v4.1.0.2
>  
>Reporter: caixiaowei
>Priority: Major
>  Labels: pull-request-available
> Attachments: 1.png
>
>
> When the table schema contains the following field:
> `tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',
>  
> The following exception is thrown:
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>     at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
>     at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>     at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>     at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>     at 
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>     at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>     at 
> java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
>     at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
> 

Re: [PR] [FLINK-36648] Bump Flink version to Flink 2.0-preview1 [flink-connector-kafka]

2025-02-05 Thread via GitHub


lvyanquan commented on PR #140:
URL: 
https://github.com/apache/flink-connector-kafka/pull/140#issuecomment-2636096310

   Hi @PatrickRen, could you review this again and help complete this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37144] Update version to 4.0-SNAPSHOT. [flink-connector-kafka]

2025-02-05 Thread via GitHub


lvyanquan commented on PR #149:
URL: 
https://github.com/apache/flink-connector-kafka/pull/149#issuecomment-2636103200

   Hi @PatrickRen, could you help review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'

2025-02-05 Thread caixiaowei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caixiaowei updated FLINK-37254:
---
Description: 
When the table schema contains the following field:

`tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',

 

The following exception is thrown:

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
    at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
    at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.debezium.DebeziumException: Failed to set field default value for 
'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default 
value is 0 of type class java.lang.String
    at 
io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421)
    at 
io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
java.uti

[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'

2025-02-05 Thread caixiaowei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caixiaowei updated FLINK-37254:
---
Summary: Flink CDC OceanBase Connector Throws Exception for Column Default 
Value DEFAULT '0'  (was: Flink CDC OceanBase CDC throws an exception when a 
table column's default value is DEFAULT '0')

> Flink CDC OceanBase Connector Throws Exception for Column Default Value 
> DEFAULT '0'
> 
>
> Key: FLINK-37254
> URL: https://issues.apache.org/jira/browse/FLINK-37254
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
> Environment: flink 1.18.1
> flink cdc 3.1.0
> oceanbase 5.7.25-OceanBase_CE-v4.1.0.2
>  
>Reporter: caixiaowei
>Priority: Major
>  Labels: pull-request-available
>
> When the table schema contains the following field:
> `tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',
>  
> The following exception is thrown:
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>     at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
>     at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>     at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>     at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>     at 
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>     at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>     at 
> java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
>     at 
> java.util.concurrent.Fork

Re: [PR] [BP-2.0][FLINK-37021][state/forst] Fix incorrect paths when reusing and creating files. [flink]

2025-02-05 Thread via GitHub


flinkbot commented on PR #26106:
URL: https://github.com/apache/flink/pull/26106#issuecomment-2636105860

   
   ## CI report:
   
   * 0f17c6288fb5f885c235bb7aa3e7dd78de2ade92 UNKNOWN
   
   
   Bot commands: the @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-37255] Fix unable to configure `SCAN_PARTITION_RECORD_SIZE` option [flink-connector-mongodb]

2025-02-05 Thread via GitHub


yuxiqian opened a new pull request, #51:
URL: https://github.com/apache/flink-connector-mongodb/pull/51

   This closes FLINK-37255.
   
   The `scan.partition.record-size` option was introduced in FLINK-36075, but 
the corresponding config manifest wasn't updated, which made it impossible for 
Flink SQL users to configure it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-37021][state/forst] Fix incorrect paths when reusing and creating files. [flink]

2025-02-05 Thread via GitHub


AlexYinHan opened a new pull request, #26106:
URL: https://github.com/apache/flink/pull/26106

   
   
   ## What is the purpose of the change
   
   This pull request addresses several bugs in ForSt that may prevent proper 
file reuse, leading to slower or failed snapshot and restoration operations.
   
   
   ## Brief change log
   
   - Make ```DataTransferStrategyBuilder``` function properly.
   - Use ```URI``` to tell whether two ```FileSystem```s are the same (see the 
sketch below).
   - Use the 'Reuse' strategy for restoration only in CLAIM mode.
   - Use the 'Reuse' strategy for snapshots only when 
```SharingFilesStrategy``` is 'FORWARD_BACKWARD'.
   - Ensure we correctly distinguish between ```dbFilePath``` and 
```realSourcePath```.
   - During snapshots, do not copy a file if it is already owned by the JM.
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
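
   To illustrate the URI-comparison item in the change log above, here is a 
minimal sketch of the idea (an assumption-laden illustration, not the actual 
ForSt code):

```java
// Sketch of the "use URI to tell whether two FileSystems are the same" idea
// from the change log above -- illustrative only, not the actual ForSt code.
import java.net.URI;
import java.util.Objects;

import org.apache.flink.core.fs.Path;

final class FileSystemComparisonSketch {

    // Two paths are treated as living on the same file system when the
    // scheme and authority of their URIs match, instead of relying on
    // FileSystem object identity (distinct instances can wrap one FS).
    static boolean sameFileSystem(Path a, Path b) {
        URI ua = a.toUri();
        URI ub = b.toUri();
        return Objects.equals(ua.getScheme(), ub.getScheme())
                && Objects.equals(ua.getAuthority(), ub.getAuthority());
    }
}
```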
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'

2025-02-05 Thread caixiaowei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caixiaowei updated FLINK-37254:
---
Description: 
When the table schema contains the following field:

`tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',

 

The following exception is thrown:

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
    at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
    at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.debezium.DebeziumException: Failed to set field default value for 
'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default 
value is 0 of type class java.lang.String
    at 
io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421)
    at 
io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
jav

[jira] [Commented] (FLINK-37256) Firing timers can block recovery process

2025-02-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923966#comment-17923966
 ] 

Piotr Nowojski commented on FLINK-37256:


I can see two broad solutions.

1. Use something similar to the splittable timers mechanism during recovery to 
prevent operators from firing timers.

For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a 
similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to 
return without firing any timers when subtask is not RUNNING.

The problem with that approach is that currently, if we interrupt 
{{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. 
So if we just {{return false}} without any further changes, the mailbox 
executor would again try to fire the timers, this time via the mailbox, before 
processing the remaining in-flight records, leading to a livelock. There might 
be some easy solution to this problem.

2. An alternative approach might be to just filter out watermarks from the 
unaligned checkpoints' in-flight data.

Currently this seems to be fine, as the mechanism of persisting watermarks in 
the in-flight data is very dubious. Watermarks already processed by the 
operators are not persisted in any way, so there is just a small random chance 
that some watermark(s) will be persisted as in-flight data - and only those 
will survive recovery. So either way, we must rely on the watermark generators 
to re-emit new watermarks after recovery. Filtering out watermarks from the 
in-flight records would be more consistent.

Freshly generated watermarks that were not persisted in the in-flight data are 
not a problem, as this predominantly happens only in source tasks; since 
source tasks have no input in-flight data, those watermarks are already 
generated in the subtask's RUNNING state.

Downside of this approach is:
* if we decide to eventually persist watermarks across recoveries, we would 
have to re-introduce watermarks into the in-flight data

> Firing timers can block recovery process
> 
>
> Key: FLINK-37256
> URL: https://issues.apache.org/jira/browse/FLINK-37256
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Task
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Splittable/interruptible timers for checkpointing were introduced in 
> FLINK-20217 as part of 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing
>  .
> However, the exact same problem can happen during recovery, usually (only?) 
> due to a watermark that was captured along with the in-flight data and is 
> processed during a subtask's "INITIALIZATION" phase. The problem is that 
> while we are in the initialization phase, the job cannot perform any 
> checkpoints. This issue is compounded if there is some data-multiplication 
> operator in the pipeline, downstream from the operator that has a lot of 
> timers to fire. What can happen then is:
> * some upstream operator A is firing a lot of timers, that produce a lot of 
> data (for example 100 000 records) while it's still INITIALIZING
> * those records are multiplied downstream (operators B, C, ...) by for 
> example factor of 100x
> * in the end, sinks have to accept ~100 000 * 100 = ~10 000 000 records 
> before that upstream operator A can finish processing in-flight data and 
> switch to RUNNING
> This can take hours.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'

2025-02-05 Thread caixiaowei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caixiaowei updated FLINK-37254:
---
Description: 
When the table schema contains the following field:

`tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',

 

The following exception is thrown:

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
    at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
    at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.debezium.DebeziumException: Failed to set field default value for 
'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default 
value is 0 of type class java.lang.String
    at 
io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421)
    at 
io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
java.u

[jira] [Comment Edited] (FLINK-37256) Firing timers can block recovery process

2025-02-05 Thread Piotr Nowojski (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923966#comment-17923966
 ] 

Piotr Nowojski edited comment on FLINK-37256 at 2/5/25 8:56 AM:


I can see two broad solutions.

1. Use something similar to the splittable timers mechanism during recovery to 
prevent operators from firing timers.

For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a 
similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to 
return without firing any timers when subtask is not RUNNING.

The problem with that approach is that currently, if we interrupt 
{{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. 
So if we just {{return false}} without any further changes, the mailbox 
executor would again try to fire the timers, this time via the mailbox, before 
processing the remaining in-flight records, leading to a livelock. There might 
be some easy solution to this problem.

2. An alternative approach might be to just filter out watermarks from the 
unaligned checkpoints' in-flight data.

Currently this seems to be fine, as the mechanism of persisting watermarks in 
the in-flight data is very dubious. Watermarks already processed by the 
operators are not persisted in any way, so there is just a small random chance 
that some watermark(s) will be persisted as in-flight data - and only those 
will survive recovery. So either way, we must rely on the watermark generators 
to re-emit new watermarks after recovery. Filtering out watermarks from the 
in-flight records would be more consistent.

Freshly generated watermarks that were not persisted in the in-flight data are 
not a problem, as this predominantly happens only in source tasks; since 
source tasks have no input in-flight data, those watermarks are already 
generated in the subtask's RUNNING state.

Downside of this approach is:
* if we decide to eventually persist watermarks across recoveries, we would 
have to re-introduce watermarks into the in-flight data. However, I'm not sure 
we will ever be able to implement persisting watermarks, especially across 
rescalings.


was (Author: pnowojski):
I can see two broad solutions.

1. Use something similar to the splittable timers mechanism during recovery to 
prevent operators from firing timers.

For example in {{InternalTimerServiceImpl#tryAdvanceWatermark}} we could add a 
similar hook to the {{shouldStopAdvancingFn}}, that would tell the code to 
return without firing any timers when subtask is not RUNNING.

The problem with that approach is that currently, if we interrupt 
{{tryAdvanceWatermark}}, its firing is immediately re-enqueued to the mailbox. 
So if we just {{return false}} without any further changes, the mailbox 
executor would again try to fire the timers, this time via the mailbox, before 
processing the remaining in-flight records, leading to a livelock. There might 
be some easy solution to this problem.

2. An alternative approach might be to just filter out watermarks from the 
unaligned checkpoints' in-flight data.

Currently this seems to be fine, as the mechanism of persisting watermarks in 
the in-flight data is very dubious. Watermarks already processed by the 
operators are not persisted in any way, so there is just a small random chance 
that some watermark(s) will be persisted as in-flight data - and only those 
will survive recovery. So either way, we must rely on the watermark generators 
to re-emit new watermarks after recovery. Filtering out watermarks from the 
in-flight records would be more consistent.

Freshly generated watermarks that were not persisted in the in-flight data are 
not a problem, as this predominantly happens only in source tasks; since 
source tasks have no input in-flight data, those watermarks are already 
generated in the subtask's RUNNING state.

Downside of this approach is:
* if we decide to eventually persist watermarks across recoveries, we would 
have to re-introduce watermarks into the in-flight data

> Firing timers can block recovery process
> 
>
> Key: FLINK-37256
> URL: https://issues.apache.org/jira/browse/FLINK-37256
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Task
>Affects Versions: 2.0.0, 1.20.0
>Reporter: Piotr Nowojski
>Priority: Major
>
> Splittable/interruptible timers for checkpointing were introduced in 
> FLINK-20217 as part of 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing
>  .
> However the exact same problem can happen during recovery. Usually (only?) 
> due to a watermark that was caught along the in-flight data, that is being 
> process

[jira] [Resolved] (FLINK-36929) SQL connector for keyed savepoint data

2025-02-05 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi resolved FLINK-36929.
---
Fix Version/s: 2.1.0
   Resolution: Fixed

[{{a4eb676}}|https://github.com/apache/flink/commit/a4eb67642a826b941b02c7221846840326c3ff55]
 on master

> SQL connector for keyed savepoint data
> --
>
> Key: FLINK-36929
> URL: https://issues.apache.org/jira/browse/FLINK-36929
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 2.0-preview
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36929) SQL connector for keyed savepoint data

2025-02-05 Thread Gabor Somogyi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi closed FLINK-36929.
-

> SQL connector for keyed savepoint data
> --
>
> Key: FLINK-36929
> URL: https://issues.apache.org/jira/browse/FLINK-36929
> Project: Flink
>  Issue Type: Improvement
>  Components: Table SQL / API
>Affects Versions: 2.0-preview
>Reporter: Gabor Somogyi
>Assignee: Gabor Somogyi
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.1.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-32738][formats] PROTOBUF format supports projection push down [flink]

2025-02-05 Thread via GitHub


zhougit86 commented on PR #23323:
URL: https://github.com/apache/flink/pull/23323#issuecomment-2636294560

   > @zhougit86 @ljw-hit Hi, is there a plan to revive effort on it?
   
   Ok, let me look into the conflicts first


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-37254] Add OceanBaseDefaultValueConverter to fix table schema validation [flink-cdc]

2025-02-05 Thread via GitHub


whhe opened a new pull request, #3907:
URL: https://github.com/apache/flink-cdc/pull/3907

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'

2025-02-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37254:
---
Labels: pull-request-available  (was: )

> Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'
> -
>
> Key: FLINK-37254
> URL: https://issues.apache.org/jira/browse/FLINK-37254
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
> Environment: flink 1.18.1
> flink cdc 3.1.0
> oceanbase 5.7.25-OceanBase_CE-v4.1.0.2
>  
>Reporter: caixiaowei
>Priority: Major
>  Labels: pull-request-available
>
> When the table schema contains the following column:
> `tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',
>  
> The following exception is thrown:
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
>     at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
>     at 
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
>     at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
>     at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
>     at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
>     at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>     at 
> org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
>     at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
>     at 
> java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
>     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
>     at 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
>     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
>     at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
> Caused by: io.debezium.DebeziumException: Failed to set field default value 
> for 'base_system.sys_department.tenant_id' of

Re: [PR] [FLINK-37021][state/forst] Fix incorrect paths when reusing files for checkpoints. [flink]

2025-02-05 Thread via GitHub


Zakelly merged PR #26040:
URL: https://github.com/apache/flink/pull/26040


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37021) Implement fast checkpoint/rescaling for ForStKeyedStateBackend

2025-02-05 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923962#comment-17923962
 ] 

Zakelly Lan commented on FLINK-37021:
-

Follow up PR:
* master: f975783ce9a688a99e2f74fe6f2f5ad204589fdc

> Implement fast checkpoint/rescaling for ForStKeyedStateBackend
> --
>
> Key: FLINK-37021
> URL: https://issues.apache.org/jira/browse/FLINK-37021
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Han Yin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37254) Flink CDC OceanBase Connector Throws Exception for Column Default Value DEFAULT '0'

2025-02-05 Thread caixiaowei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

caixiaowei updated FLINK-37254:
---
Description: 
When the table schema contains the following field:

`tenant_id` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT 'Tenant ID',

The following exception is thrown:

org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:180)
    at 
org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:277)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:268)
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:261)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:787)
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:764)
    at 
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
    at 
org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
    at 
org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
    at 
org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at 
org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at 
org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
    at java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
    at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: io.debezium.DebeziumException: Failed to set field default value for 
'base_system.sys_department.tenant_id' of type BIGINT UNSIGNED, the default 
value is 0 of type class java.lang.String
    at 
io.debezium.relational.TableSchemaBuilder.addField(TableSchemaBuilder.java:421)
    at 
io.debezium.relational.TableSchemaBuilder.lambda$create$2(TableSchemaBuilder.java:149)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at 
java.util

Re: [PR] [FLINK-37236] Flink 2.0 preview support [flink-kubernetes-operator]

2025-02-05 Thread via GitHub


afedulov commented on code in PR #938:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/938#discussion_r1942507092


##
flink-kubernetes-operator-api/pom.xml:
##
@@ -55,6 +55,18 @@ under the License.
 
 
 
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-core-api</artifactId>
+    <version>${flink.version}</version>
+</dependency>

Review Comment:
   Maybe a comment why such exclusion is needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37255] Fix unable to configure `SCAN_PARTITION_RECORD_SIZE` option [flink-connector-mongodb]

2025-02-05 Thread via GitHub


yuxiqian commented on PR #51:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/51#issuecomment-2636181379

   Thanks for @Jiabao-Sun's kind review! Addressed comments in latest commit.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36439][docs] Documents for Disaggregate State and new State APIs [flink]

2025-02-05 Thread via GitHub


Zakelly opened a new pull request, #26107:
URL: https://github.com/apache/flink/pull/26107

   ## What is the purpose of the change
   
   English documents for Disaggregate State and new State APIs
   
   
   ## Brief change log
   
   Still writing... will add commits when finished.
   
- Documents for State API V2
   
   
   ## Verifying this change
   
   This change is docs work without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency):  no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36439) Documentation for Disaggregated State Storage and Management

2025-02-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36439:
---
Labels: pull-request-available  (was: )

> Documentation for Disaggregated State Storage and Management 
> -
>
> Key: FLINK-36439
> URL: https://issues.apache.org/jira/browse/FLINK-36439
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Runtime / Checkpointing, Runtime / State 
> Backends, Runtime / Task
>Reporter: Zakelly Lan
>Assignee: Zakelly Lan
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [hotfix][table-planner] Use equals instead of '==' to compare digests in FlinkCalcMergeRule [flink]

2025-02-05 Thread via GitHub


lincoln-lil opened a new pull request, #26108:
URL: https://github.com/apache/flink/pull/26108

   ## What is the purpose of the change
   This is a trivial fix for a string comparison in `FlinkCalcMergeRule`.
   However, it doesn't actually affect the optimization results, so no new test 
cases were added for the fix.
   We can still verify this detail by debugging the calc rule's optimization process 
with the following case (added to `FlinkCalcMergeRuleTest`):
   ```scala
   @Test
   def testCalcMergeWithTrivialCalc(): Unit = {
 val testTable = "MyTable"
 val relBuilder = util.getPlanner.plannerContext.createRelBuilder()
 val flinkLogicalTraits = 
relBuilder.getCluster.traitSetOf(FlinkConventions.LOGICAL)
 val table = relBuilder.getRelOptSchema
   .asInstanceOf[CalciteCatalogReader]
   .getTable(ImmutableList.of(testTable))
   .asInstanceOf[FlinkPreparingTableBase]
 val flinkLogicalTableScan = new FlinkLogicalDataStreamTableScan(
   relBuilder.getCluster,
   flinkLogicalTraits,
   Collections.emptyList[RelHint](),
   table)
   
 relBuilder.scan(testTable)
 val logicScan = relBuilder.peek(0)
 val rowType = logicScan.getRowType
 val projects = (0 until (rowType.getFieldCount)).map(f => 
relBuilder.field(f)).toList
 val program = RexProgram.create(
   rowType,
   projects,
   null,
   rowType,
   relBuilder.getRexBuilder
 )
   
 val bottomCalc = FlinkLogicalCalc.create(flinkLogicalTableScan, program)
 // create a trivial calc
 val topCalc = FlinkLogicalCalc.create(bottomCalc, program)
   
 val optimizedRels = util.getPlanner.optimize(Array(topCalc))
 print(bottomCalc.getRelDigest, optimizedRels.get(0).getRelDigest)
 
assertThat(bottomCalc.getRelDigest).isEqualTo(optimizedRels.get(0).getRelDigest)
 //util.assertPlanEquals(Array(topCalc), Array.empty[ExplainDetail], 
withRowType = false, Array(PlanKind.OPT_REL), () => Unit)
   }
   ```
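   
   For context, a minimal illustration of why identity comparison is unreliable for digests (they are computed strings, so equal contents do not imply the same object):
   ```java
   String a = new String("Calc(select=[a, b])");
   String b = new String("Calc(select=[a, b])");
   System.out.println(a == b);      // false: different objects
   System.out.println(a.equals(b)); // true: equal contents
   ```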
   
   ## Brief change log
   fix string comparison
   
   ## Verifying this change
   existing tests
   
   ## Does this pull request potentially affect one of the following parts:
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
@Public(Evolving): (no)
 - The serializers: (no )
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
 - Does this pull request introduce a new feature? (no)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] Update copyright NOTICE year to 2025 [flink]

2025-02-05 Thread via GitHub


yangjf2019 closed pull request #25916: [hotfix] Update copyright NOTICE year to 
2025
URL: https://github.com/apache/flink/pull/25916


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37255] Fix unable to configure scan.partition.record-size option [flink-connector-mongodb]

2025-02-05 Thread via GitHub


Jiabao-Sun merged PR #51:
URL: https://github.com/apache/flink-connector-mongodb/pull/51


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923982#comment-17923982
 ] 

Jiabao Sun edited comment on FLINK-37255 at 2/5/25 11:11 AM:
-

Fixed via mongodb repo 
main: 123334b8a70e5b3cccfeab6b8e083fbcd7e617d2
v2.0: e9d484b1e040de9d9c4f667ac941a2081348d435


was (Author: JIRAUSER304154):
Fixed via mongodb repo 
main: 123334b8a70e5b3cccfeab6b8e083fbcd7e617d2

> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: yux
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.
> {code:sql}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Unsupported options found for 
> 'mongodb'.
> Unsupported options:
> scan.partition.record-size
> Supported options:
> collection
> connector
> database
> filter.handling.policy
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [BP-2.0][FLINK-37255] Fix unable to configure scan.partition.record-size option [flink-connector-mongodb]

2025-02-05 Thread via GitHub


Jiabao-Sun merged PR #52:
URL: https://github.com/apache/flink-connector-mongodb/pull/52


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread Jiabao Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiabao Sun resolved FLINK-37255.

Resolution: Fixed

> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: yux
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.
> {code:sql}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Unsupported options found for 
> 'mongodb'.
> Unsupported options:
> scan.partition.record-size
> Supported options:
> collection
> connector
> database
> filter.handling.policy
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36161][docs]Update Integration Test Example with Sink API. [flink]

2025-02-05 Thread via GitHub


RanJinh commented on PR #25590:
URL: https://github.com/apache/flink/pull/25590#issuecomment-2636439765

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37236] Flink 2.0 preview support [flink-kubernetes-operator]

2025-02-05 Thread via GitHub


gyfora commented on code in PR #938:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/938#discussion_r1942746157


##
flink-kubernetes-operator-api/pom.xml:
##
@@ -55,6 +55,18 @@ under the License.
 
 
 
+<dependency>
+    <groupId>org.apache.flink</groupId>
+    <artifactId>flink-core-api</artifactId>
+    <version>${flink.version}</version>
+</dependency>

Review Comment:
   This hasn't changed compared to what was before here, we already had the 
exclusions in the api package as we do not require the transitive dependencies 
for that. The operator core still pulls the transitive deps as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-35687) JSON_QUERY should return a well formatted nested objects/arrays for ARRAY

2025-02-05 Thread Dawid Wysakowicz (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-35687:
-
Fix Version/s: 2.0-preview
   (was: 1.20.0)

> JSON_QUERY should return a well formatted nested objects/arrays for 
> ARRAY
> -
>
> Key: FLINK-35687
> URL: https://issues.apache.org/jira/browse/FLINK-35687
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0-preview
>
>
> {code}
> SELECT JSON_QUERY('{"items": [{"itemId":1234, "count":10}]}', '$.items' 
> RETURNING ARRAY)
> {code}
> returns
> {code}
> ['{itemId=1234, count=10}']
> {code}
> but it should:
> {code}
> ['{"itemId":1234, "count":10}']
> {code}
> We should call jsonize for Collection types here: 
> https://github.com/apache/flink/blob/f6f88135b3a5fa5616fe905346e5ab6dce084555/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java#L268
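
For illustration, a schematic sketch of the proposed fix (Jackson is used here 
purely for demonstration; the actual patch would reuse the JSON serialization 
already available to {{SqlJsonUtils}}):

{code:java}
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.List;
import java.util.Map;

class JsonizeSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Serializes collection/map elements as JSON text instead of relying on toString(). */
    static String toJsonText(Object extracted) throws Exception {
        if (extracted instanceof Map || extracted instanceof List) {
            // Map#toString() yields "{itemId=1234, count=10}", whereas a JSON
            // serializer yields {"itemId":1234,"count":10}.
            return MAPPER.writeValueAsString(extracted);
        }
        return String.valueOf(extracted);
    }
}
{code}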



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37242] Add labels for flink-operator-webhook-service [flink-kubernetes-operator]

2025-02-05 Thread via GitHub


gyfora merged PR #939:
URL: https://github.com/apache/flink-kubernetes-operator/pull/939


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-37242) Enable setting custom labels for flink-operator-webhook-service

2025-02-05 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-37242.
--
  Assignee: thanh tung dao
Resolution: Fixed

merged to main 67aa78b1cc52acba4e73dc348d98196617464cb1

> Enable setting custom labels for flink-operator-webhook-service
> ---
>
> Key: FLINK-37242
> URL: https://issues.apache.org/jira/browse/FLINK-37242
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.10.0
>Reporter: thanh tung dao
>Assignee: thanh tung dao
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.11.0
>
>
> In the mutating webhook YAML of the Helm chart, it is not possible to create 
> custom labels for the flink-operator-webhook-service.
> Sometimes, a centralised platform team requires that the 
> Service resource be tagged with certain labels in order for it to pass 
> the Admission Controller (OPA Gatekeeper, for example).
>  
> My plan is to open a PR to enable this feature in the values.yaml file of the 
> Helm chart.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37258][runtime] Return Ordered Job list on Disptacher#requestMultipleJobDetails [flink]

2025-02-05 Thread via GitHub


flinkbot commented on PR #26111:
URL: https://github.com/apache/flink/pull/26111#issuecomment-2637257809

   
   ## CI report:
   
   * 94244da081bfe60e66ddf0bf49c176d7881f6983 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37253] Add state size in application status and deployment metrics [flink-kubernetes-operator]

2025-02-05 Thread via GitHub


mxm commented on code in PR #941:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943210819


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##
@@ -403,6 +403,28 @@ public static Long 
calculateClusterMemoryUsage(Configuration conf, int taskManag
 return tmTotalMemory + jmTotalMemory;
 }
 
+public static Long calculateClusterStateSize(Configuration conf, int 
taskManagerReplicas) {

Review Comment:
   I didn't see your comment in 
https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943008300,
 it was somehow hidden when I replied.
   
   Sorry, this was unused code. I have removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37258) Order Flink Job list in Flink UI with start time

2025-02-05 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-37258:
--
Fix Version/s: (was: 2.1.0)

> Order Flink Job list in Flink UI with start time
> 
>
> Key: FLINK-37258
> URL: https://issues.apache.org/jira/browse/FLINK-37258
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.19.1, 2.0-preview
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Minor
>  Labels: pull-request-available
> Attachments: Screen Recording 2025-02-05 at 12.33.53.mov
>
>
> h2. Description
> When there are multiple jobs, the Flink UI job list has non-deterministic 
> ordering. In fact, the ordering changes continuously, which is annoying. The 
> issue is demoed in the attached video.
> h2. Acceptance Criteria
> Jobs in Flink Dashboard UI are ordered by start time consistently



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37259) StreamCheckpointingITCase times out

2025-02-05 Thread Matthias Pohl (Jira)
Matthias Pohl created FLINK-37259:
-

 Summary: StreamCheckpointingITCase times out
 Key: FLINK-37259
 URL: https://issues.apache.org/jira/browse/FLINK-37259
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 2.0.0, 2.1.0
Reporter: Matthias Pohl


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=65800&view=logs&j=5c8e7682-d68f-54d1-16a2-a09310218a49&t=86f654fa-ab48-5c1a-25f4-7e7f6afb9bba&l=10103

{code}
"main" #1 prio=5 os_prio=0 cpu=3360.33ms elapsed=12292.07s 
tid=0x7fc4cd89c000 nid=0x7a6d waiting on condition  [0x7fc4cdeb3000]
   java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17.0.7/Native Method)
- parking to wait for  <0xa160fa30> (a 
java.util.concurrent.CompletableFuture$Signaller)
at 
java.util.concurrent.locks.LockSupport.park(java.base@17.0.7/LockSupport.java:211)
at 
java.util.concurrent.CompletableFuture$Signaller.block(java.base@17.0.7/CompletableFuture.java:1864)
at 
java.util.concurrent.ForkJoinPool.unmanagedBlock(java.base@17.0.7/ForkJoinPool.java:3463)
at 
java.util.concurrent.ForkJoinPool.managedBlock(java.base@17.0.7/ForkJoinPool.java:3434)
at 
java.util.concurrent.CompletableFuture.waitingGet(java.base@17.0.7/CompletableFuture.java:1898)
at 
java.util.concurrent.CompletableFuture.get(java.base@17.0.7/CompletableFuture.java:2072)
at 
org.apache.flink.test.util.TestUtils.submitJobAndWaitForResult(TestUtils.java:102)
at 
org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase.runCheckpointedProgram(StreamFaultToleranceTestBase.java:136)
[...]
{code}

Parsing the Maven output reveals that {{StreamCheckpointingITCase}} is the test 
run that times out:
{code}
cat 165.txt | grep "Running \|Tests run: " | grep -o '[^ ]*$' | sort | uniq -c 
| grep -v "  2"
  [...]
  1 org.apache.flink.test.checkpointing.StreamCheckpointingITCase
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37253] Add state size in application status and deployment metrics [flink-kubernetes-operator]

2025-02-05 Thread via GitHub


mxm commented on code in PR #941:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943120952


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -757,16 +778,35 @@ public Map<String, String> getClusterInfo(Configuration 
conf) throws Exception {
 DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
 dashboardConfiguration.getFlinkRevision());
 }
+}
 
-var taskManagerReplicas = 
getTaskManagersInfo(conf).getTaskManagerInfos().size();
-clusterInfo.put(
-FIELD_NAME_TOTAL_CPU,
-String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, 
taskManagerReplicas)));
-clusterInfo.put(
-FIELD_NAME_TOTAL_MEMORY,
-String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, 
taskManagerReplicas)));
+private void populateStateSize(
+Configuration conf, @Nullable String jobId, Map<String, String> 
clusterInfo)
+throws Exception {
+if (jobId != null) {
+try (RestClusterClient<String> clusterClient = 
getClusterClient(conf)) {
+var checkpointingStatisticsHeaders = 
CheckpointingStatisticsHeaders.getInstance();
+var parameters = 
checkpointingStatisticsHeaders.getUnresolvedMessageParameters();
+
parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
 
-return clusterInfo;
+CheckpointingStatistics checkpointingStatistics =
+clusterClient
+.sendRequest(
+checkpointingStatisticsHeaders,
+parameters,
+EmptyRequestBody.getInstance())
+.get();
+CheckpointStatistics.CompletedCheckpointStatistics 
completedCheckpointStatistics =
+checkpointingStatistics
+.getLatestCheckpoints()
+.getCompletedCheckpointStatistics();
+if (completedCheckpointStatistics != null) {
+clusterInfo.put(
+FIELD_NAME_STATE_SIZE,
+
String.valueOf(completedCheckpointStatistics.getCheckpointedSize()));

Review Comment:
   Good question. It is a bit tricky to find this out, even after studying the 
code. The good news is that there should be a metric that roughly approximates 
the actual full state / checkpoint size. It was introduced via 
https://issues.apache.org/jira/browse/FLINK-25557.
   
   Turns out, I was using the wrong metric because of a misleading JavaDoc in 
CheckpointStatistics. I've updated the PR. In the REST API, there are 
`stateSize` and `checkpointedSize`. The former is the full state size, while 
the latter is the checkpointed size, which varies depending on whether 
checkpointing is incremental or not. 
   
   StateSize is based on 
https://github.com/apache/flink/blob/d0d44d6196c02552179ec23c799d9769e128a8ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateObject.java#L66.
 I'm not 100% sure whether it always yields the exact state size but it should 
be close.
   
   CheckpointedSize is based on 
https://github.com/apache/flink/blob/d0d44d6196c02552179ec23c799d9769e128a8ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java#L62.
   
   I wonder whether we should expose both?
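   
   To make the distinction concrete, a short sketch of reading both values from the response obtained above (assuming `getStateSize` is the getter matching the REST `stateSize` field):
   
   ```java
   // Sketch: both sizes from the latest completed checkpoint statistics.
   CheckpointStatistics.CompletedCheckpointStatistics stats =
           checkpointingStatistics.getLatestCheckpoints().getCompletedCheckpointStatistics();
   if (stats != null) {
       long fullStateSize = stats.getStateSize();           // full size of the job state
       long checkpointedSize = stats.getCheckpointedSize(); // delta for incremental checkpoints
   }
   ```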
   
   



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java:
##
@@ -403,6 +403,28 @@ public static Long 
calculateClusterMemoryUsage(Configuration conf, int taskManag
 return tmTotalMemory + jmTotalMemory;
 }
 
+public static Long calculateClusterStateSize(Configuration conf, int 
taskManagerReplicas) {

Review Comment:
   State size or checkpoint size isn't directly related to the cluster memory 
size. For the heap memory backend, we would expect the state size to be lower 
than the overall memory. For RocksDB, it could even exceed the cluster memory.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37253] Add state size in application status and deployment metrics [flink-kubernetes-operator]

2025-02-05 Thread via GitHub


mxm commented on code in PR #941:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/941#discussion_r1943201542


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java:
##
@@ -757,16 +778,35 @@ public Map<String, String> getClusterInfo(Configuration 
conf) throws Exception {
 DashboardConfiguration.FIELD_NAME_FLINK_REVISION,
 dashboardConfiguration.getFlinkRevision());
 }
+}
 
-var taskManagerReplicas = 
getTaskManagersInfo(conf).getTaskManagerInfos().size();
-clusterInfo.put(
-FIELD_NAME_TOTAL_CPU,
-String.valueOf(FlinkUtils.calculateClusterCpuUsage(conf, 
taskManagerReplicas)));
-clusterInfo.put(
-FIELD_NAME_TOTAL_MEMORY,
-String.valueOf(FlinkUtils.calculateClusterMemoryUsage(conf, 
taskManagerReplicas)));
+private void populateStateSize(
+Configuration conf, @Nullable String jobId, Map<String, String> 
clusterInfo)
+throws Exception {
+if (jobId != null) {
+try (RestClusterClient<String> clusterClient = 
getClusterClient(conf)) {
+var checkpointingStatisticsHeaders = 
CheckpointingStatisticsHeaders.getInstance();
+var parameters = 
checkpointingStatisticsHeaders.getUnresolvedMessageParameters();
+
parameters.jobPathParameter.resolve(JobID.fromHexString(jobId));
 
-return clusterInfo;
+CheckpointingStatistics checkpointingStatistics =
+clusterClient
+.sendRequest(
+checkpointingStatisticsHeaders,
+parameters,
+EmptyRequestBody.getInstance())
+.get();
+CheckpointStatistics.CompletedCheckpointStatistics 
completedCheckpointStatistics =
+checkpointingStatistics
+.getLatestCheckpoints()
+.getCompletedCheckpointStatistics();
+if (completedCheckpointStatistics != null) {
+clusterInfo.put(
+FIELD_NAME_STATE_SIZE,
+
String.valueOf(completedCheckpointStatistics.getCheckpointedSize()));

Review Comment:
   Perfect, then the PR is good as-is.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-37258) Order Flink Job list in Flink UI with start time

2025-02-05 Thread Matthias Pohl (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl reassigned FLINK-37258:
-

Assignee: Ahmed Hamdy

> Order Flink Job list in Flink UI with start time
> 
>
> Key: FLINK-37258
> URL: https://issues.apache.org/jira/browse/FLINK-37258
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.19.1, 2.0-preview
>Reporter: Ahmed Hamdy
>Assignee: Ahmed Hamdy
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 2.1.0
>
> Attachments: Screen Recording 2025-02-05 at 12.33.53.mov
>
>
> h2. Description
> When there are multiple jobs, the Flink UI job list has non-deterministic 
> ordering. In fact, the ordering changes continuously, which is annoying. The 
> issue is demoed in the attached video.
> h2. Acceptance Criteria
> Jobs in Flink Dashboard UI are ordered by start time consistently



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37253] Add state size in application status and deployment metrics [flink-kubernetes-operator]

2025-02-05 Thread via GitHub


mxm commented on PR #941:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/941#issuecomment-2637573408

   Tried it out on a local k8s cluster with various Flink versions:
   
   https://github.com/user-attachments/assets/f6909e9d-5710-4965-9645-55ce8b06fe82";
 />
   https://github.com/user-attachments/assets/f2285783-4a02-4433-8dde-746ce9eb8368";
 />
   https://github.com/user-attachments/assets/4ff10809-e5c1-4aeb-a596-17310ba55e1c";
 />
   https://github.com/user-attachments/assets/fe95cb6d-4ee2-4b46-b850-507536f8cfed";
 />
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [tests][ci] Miscellaneous improvements on CI robustness [flink-cdc]

2025-02-05 Thread via GitHub


yuxiqian closed pull request #3884: [tests][ci] Miscellaneous improvements on 
CI robustness
URL: https://github.com/apache/flink-cdc/pull/3884


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [tests][ci] Miscellaneous improvements on CI robustness [flink-cdc]

2025-02-05 Thread via GitHub


yuxiqian commented on PR #3884:
URL: https://github.com/apache/flink-cdc/pull/3884#issuecomment-2638951176

   Closed in favor of #3911.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][ci] Use raise to fix uncaught throw [flink-cdc]

2025-02-05 Thread via GitHub


whhe closed pull request #3909: [hotfix][ci] Use raise to fix uncaught throw
URL: https://github.com/apache/flink-cdc/pull/3909


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (FLINK-37234) Add timeout for test case to avoid Infinite waiting

2025-02-05 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu resolved FLINK-37234.

Resolution: Fixed

Fixed via master(3.3-SNAPSHOT): 81e6a4a329498ddd9ff63d9db8ddbcf22882a7fc

> Add timeout for test case to avoid Infinite waiting 
> 
>
> Key: FLINK-37234
> URL: https://issues.apache.org/jira/browse/FLINK-37234
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Yanquan Lv
>Assignee: Yanquan Lv
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: jdbc-3.3.0
>
>
> Add a timeout for test cases to avoid infinite waiting.
> JdbcSourceStreamRelatedITCase#waitExpectation uses a while(true) loop to check 
> for an exception; however, this may lead to infinite waiting.
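
A schematic sketch of the bounded-wait pattern the ticket asks for (names are 
illustrative, not the actual test code):

{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class WaitUtil {
    /** Waits for the expectation, failing after the timeout instead of spinning forever. */
    static void waitUntil(BooleanSupplier expectation, Duration timeout) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!expectation.getAsBoolean()) { // previously: while (true) with no exit
            if (System.nanoTime() > deadline) {
                throw new AssertionError("Timed out after " + timeout + " waiting for expectation");
            }
            Thread.sleep(100);
        }
    }
}
{code}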



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37265) document of python elasticsearch connector example has a mistake

2025-02-05 Thread Zhang Hechuan (Jira)
Zhang Hechuan created FLINK-37265:
-

 Summary: document of python elasticsearch connector example has a 
mistake
 Key: FLINK-37265
 URL: https://issues.apache.org/jira/browse/FLINK-37265
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream, API / Python, Connectors / ElasticSearch
Affects Versions: 1.20.0, 1.19.0, 1.18.0, 1.17.0
Reporter: Zhang Hechuan


Elasticsearch 7 static index:
{code:python}
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every element,
# otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
    .set_hosts(['localhost:9200']) \
    .build()

input.sink_to(es7_sink).name('es7 sink')
{code}

"ElasticsearchEmitter" has no method named "static"; it should be "static_index".



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923982#comment-17923982
 ] 

Jiabao Sun commented on FLINK-37255:


Fixed via mongodb repo 
main: 123334b8a70e5b3cccfeab6b8e083fbcd7e617d2

> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: yux
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.
> {code:sql}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Unsupported options found for 
> 'mongodb'.
> Unsupported options:
> scan.partition.record-size
> Supported options:
> collection
> connector
> database
> filter.handling.policy
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [hotfix][Connector/JDBC] Remove unnecessary update fields of PostgreSQL upsert statement [flink-connector-jdbc]

2025-02-05 Thread via GitHub


boring-cyborg[bot] commented on PR #155:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/155#issuecomment-2636257302

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [hotfix][Connector/JDBC] Remove unnecessary update fields of PostgreSQL upsert statement [flink-connector-jdbc]

2025-02-05 Thread via GitHub


roseduan opened a new pull request, #155:
URL: https://github.com/apache/flink-connector-jdbc/pull/155

   In PostgreSQL, there is no need to specify the unique key fields in the 
update clause.
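   
   For illustration, a hedged sketch of the difference (hypothetical table and columns; standard PostgreSQL `INSERT ... ON CONFLICT` syntax):
   ```java
   // Hypothetical example: "id" is the unique key, "amount" a data column.
   // Before: the key column was redundantly re-assigned in the update clause.
   String before =
           "INSERT INTO orders (id, amount) VALUES (?, ?) "
                   + "ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id, amount = EXCLUDED.amount";
   // After: the conflict target needs no assignment; PostgreSQL keeps the key as-is.
   String after =
           "INSERT INTO orders (id, amount) VALUES (?, ?) "
                   + "ON CONFLICT (id) DO UPDATE SET amount = EXCLUDED.amount";
   ```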


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36439][docs] Documents for Disaggregate State and new State APIs [flink]

2025-02-05 Thread via GitHub


flinkbot commented on PR #26107:
URL: https://github.com/apache/flink/pull/26107#issuecomment-2636265648

   
   ## CI report:
   
   * c5c732493e7d6b3dbb877ebffd620695cef52c3d UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][table-planner] Use equals instead of '==' to compare digests in FlinkCalcMergeRule [flink]

2025-02-05 Thread via GitHub


flinkbot commented on PR #26108:
URL: https://github.com/apache/flink/pull/26108#issuecomment-2636265903

   
   ## CI report:
   
   * dcaa98389b7308ab184467a039163b466b560d53 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37255] Fix unable to configure scan.partition.record-size option [flink-connector-mongodb]

2025-02-05 Thread via GitHub


yuxiqian commented on PR #51:
URL: 
https://github.com/apache/flink-connector-mongodb/pull/51#issuecomment-2636249551

   > Thanks @yuxiqian for the quick update. LGTM.
   > 
   > Could you help make a backport to v2.0 branch?
   
   Sure, cherry-picked in #52


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread Jiabao Sun (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923955#comment-17923955
 ] 

Jiabao Sun commented on FLINK-37255:


Thanks [~xiqian_yu], assigned to you.

> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: yux
>Priority: Blocker
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.
> {code:sql}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Unsupported options found for 
> 'mongodb'.
> Unsupported options:
> scan.partition.record-size
> Supported options:
> collection
> connector
> database
> filter.handling.policy
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37255) Unsupported options: scan.partition.record-size

2025-02-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37255:
---
Labels: pull-request-available  (was: )

> Unsupported options: scan.partition.record-size
> ---
>
> Key: FLINK-37255
> URL: https://issues.apache.org/jira/browse/FLINK-37255
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / MongoDB
>Reporter: Jiabao Sun
>Assignee: yux
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: mongodb-2.0.0
>
>
> The new configuration option scan.partition.record-size was introduced by 
> FLINK-36075, but it was not added to the optional options, making this 
> configuration item unusable.
> {code:sql}
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.ValidationException: Unsupported options found for 
> 'mongodb'.
> Unsupported options:
> scan.partition.record-size
> Supported options:
> collection
> connector
> database
> filter.handling.policy
> ...
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-37256) Firing timers can block recovery process

2025-02-05 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-37256:
--

 Summary: Firing timers can block recovery process
 Key: FLINK-37256
 URL: https://issues.apache.org/jira/browse/FLINK-37256
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Task
Affects Versions: 1.20.0, 2.0.0
Reporter: Piotr Nowojski


Splittable/interruptible timers for checkpointing were introduced in FLINK-20217 
as part of 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-443%3A+Interruptible+timers+firing
 .

However, the exact same problem can happen during recovery. Usually (only?) due 
to a watermark that was caught along the in-flight data and is being 
processed during a subtask's "INITIALIZATION" phase. The problem is that 
while we are in the initialization phase, the job cannot perform any checkpoints. 
This issue is compounded if there is some data multiplication operator in the 
pipeline, downstream from the operator that has a lot of timers to fire. What 
can happen then is:
* some upstream operator A fires a lot of timers that produce a lot of 
data (for example, 100 000 records) while it's still INITIALIZING
* those records are multiplied downstream (operators B, C, ...) by, for example, 
a factor of 100x
* in the end, sinks have to accept ~100 000 * 100 = 10 000 000 records before that upstream 
operator A can finish processing in-flight data and switch to RUNNING

This can take hours.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36929][table] Add SQL connector for keyed savepoint data [flink]

2025-02-05 Thread via GitHub


gaborgsomogyi merged PR #26035:
URL: https://github.com/apache/flink/pull/26035


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36660] Bump to 2.0-preview1. [flink-connector-elasticsearch]

2025-02-05 Thread via GitHub


boring-cyborg[bot] commented on PR #115:
URL: 
https://github.com/apache/flink-connector-elasticsearch/pull/115#issuecomment-2636487184

   Thanks for opening this pull request! Please check out our contributing 
guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36660) Release flink-connector-elasticsearch vx.x.x for Flink 2.0

2025-02-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36660:
---
Labels: pull-request-available  (was: )

> Release flink-connector-elasticsearch vx.x.x for Flink 2.0
> --
>
> Key: FLINK-36660
> URL: https://issues.apache.org/jira/browse/FLINK-36660
> Project: Flink
>  Issue Type: Sub-task
>  Components: Connectors / ElasticSearch
>Affects Versions: 2.0.0
>Reporter: Yanquan Lv
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Release a version of flink-connector-elasticsearch connector that bumped 
> Flink 2.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36660] Bump to 2.0-preview1. [flink-connector-elasticsearch]

2025-02-05 Thread via GitHub


lvyanquan opened a new pull request, #115:
URL: https://github.com/apache/flink-connector-elasticsearch/pull/115

   Bump Flink version to 2.0-preview1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [BP-2.0][FLINK-37021][state/forst] Fix incorrect paths when reusing and creating files. [flink]

2025-02-05 Thread via GitHub


Zakelly merged PR #26106:
URL: https://github.com/apache/flink/pull/26106


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37222] Do not reuse views across TableEnvironments in SQL client [flink]

2025-02-05 Thread via GitHub


Zakelly commented on PR #26093:
URL: https://github.com/apache/flink/pull/26093#issuecomment-2636492172

   Thanks. Merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-37021) Implement fast checkpoint/rescaling for ForStKeyedStateBackend

2025-02-05 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17923962#comment-17923962
 ] 

Zakelly Lan edited comment on FLINK-37021 at 2/5/25 11:38 AM:
--

Follow up PR:
* master: f975783ce9a688a99e2f74fe6f2f5ad204589fdc
* 2.0: 5512299630e3d4d6e92c4e4ac25059e64c76ebca


was (Author: zakelly):
Follow up PR:
* master: f975783ce9a688a99e2f74fe6f2f5ad204589fdc

> Implement fast checkpoint/rescaling for ForStKeyedStateBackend
> --
>
> Key: FLINK-37021
> URL: https://issues.apache.org/jira/browse/FLINK-37021
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Yanfei Lei
>Assignee: Han Yin
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37222] Do not reuse views across TableEnvironments in SQL client [flink]

2025-02-05 Thread via GitHub


Zakelly merged PR #26093:
URL: https://github.com/apache/flink/pull/26093


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-37222) Table planner exception when sql client submit job

2025-02-05 Thread Zakelly Lan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-37222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924034#comment-17924034
 ] 

Zakelly Lan commented on FLINK-37222:
-

Master:
* ac4f5952d5dd47e09644681b8f0fbf2b46a76509
* b88a6cac675a6acf6e2e8a62d225f5db2057f709
2.0:
* 12493ead11f59697cda1baf0507b0ca8190f851a
* 0714e670e638e4e3f71086944550b6f4fc51b04e


> Table planner exception when sql client submit job
> --
>
> Key: FLINK-37222
> URL: https://issues.apache.org/jira/browse/FLINK-37222
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 2.0.0
>Reporter: Zakelly Lan
>Assignee: Dawid Wysakowicz
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 2.0.0
>
> Attachments: flink-root-sql-client-master-1-1.c-76646bdbb8bdab89.log
>
>
> When testing [Nexmark|https://github.com/nexmark/nexmark] on release-2.0 
> branch, a table planner related exception thrown by the sql client:
> {code:java}
> // Omit some stacktraces before 
> Caused by: 
> org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to 
> execute the operation d3516bc9-44c1-428e-ac10-2fb0ddd0c825.
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:415)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:268)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>   at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>   ... 1 more
> Caused by: java.lang.AssertionError: Sql optimization: Assertion error: 
> Relational expression rel#714:LogicalProject.NONE.any.None: 
> 0.[NONE].[NONE](input=LogicalAggregate#712,exprs=[$2, $1]) belongs to a 
> different planner than is currently being used.
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:79)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
>   at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196)
>   at 
> scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194)
>   at scala.collection.Iterator.foreach(Iterator.scala:943)
>   at scala.collection.Iterator.foreach$(Iterator.scala:943)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>   at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
>   at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
>   at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108)
>   at 
> org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:196)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeSinkBlocks(StreamCommonSubGraphBasedOptimizer.scala:83)
>   at 
> org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:118)
>   at 
> org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:390)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:183)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1352)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:930)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.callModifyOperations(OperationExecutor.java:662)
>   at 
> org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(Op

Re: [PR] [BP-2.0][FLINK-37222] Do not reuse views across TableEnvironments in SQL client [flink]

2025-02-05 Thread via GitHub


Zakelly merged PR #26104:
URL: https://github.com/apache/flink/pull/26104


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37258][runtime] Return Ordered Job list on Dispatcher#requestMultipleJobDetails [flink]

2025-02-05 Thread via GitHub


XComp commented on code in PR #26111:
URL: https://github.com/apache/flink/pull/26111#discussion_r1943202076


##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -878,8 +879,12 @@ public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Duration
 
         completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
         runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
+        Collection<JobDetails> orderedDeduplicatedJobs =
+                deduplicatedJobs.values().stream()
+                        .sorted(Comparator.comparingLong(JobDetails::getStartTime))

Review Comment:
   I know it's quite unlikely but should we use the job ID as a fallback if the 
start time is the same?
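   For illustration, a minimal, self-contained sketch of such a tie-breaker, using a 
stand-in `Job` record rather than Flink's `JobDetails`:
   
   ```java
   import java.util.Comparator;
   import java.util.List;
   
   public class SortSketch {
       // Stand-in for JobDetails, reduced to the two fields relevant here.
       record Job(String jobId, long startTime) {}
   
       public static void main(String[] args) {
           List<Job> ordered = List.of(
                           new Job("b", 100L), new Job("a", 100L), new Job("c", 50L))
                   .stream()
                   .sorted(Comparator.comparingLong(Job::startTime)
                           .thenComparing(Job::jobId)) // deterministic on equal start times
                   .toList();
           // Prints c (startTime=50) first, then a and b in job-ID order.
           System.out.println(ordered);
       }
   }
   ```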



##
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java:
##
@@ -878,8 +879,12 @@ public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Duration
 
         completedJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
         runningJobDetails.forEach(job -> deduplicatedJobs.put(job.getJobId(), job));
+        Collection<JobDetails> orderedDeduplicatedJobs =
+                deduplicatedJobs.values().stream()
+                        .sorted(Comparator.comparingLong(JobDetails::getStartTime))
+                        .collect(Collectors.toList());
 
-        return new MultipleJobsDetails(new HashSet<>(deduplicatedJobs.values()));
+        return new MultipleJobsDetails(new ArrayList<>(orderedDeduplicatedJobs));

Review Comment:
   ```suggestion
   return new MultipleJobsDetails(orderedDeduplicatedJobs);
   ```
   I guess, we don't need the explicit `ArrayList`, do we?



##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##
@@ -1225,7 +1257,16 @@ public void testOverridingJobVertexParallelisms() throws Exception {
 
     private JobManagerRunner runningJobManagerRunnerWithJobStatus(
             final JobStatus currentJobStatus) {
+        return runningJobManagerRunnerWithJobStatus(currentJobStatus, jobId, 0L);
+    }
+
+    private JobManagerRunner runningJobManagerRunnerWithJobStatus(
+            final JobStatus currentJobStatus, final JobID jobId, long startTime) {
         Preconditions.checkArgument(!currentJobStatus.isTerminalState());
+        long[] stateTimeStampsForRunningJob = new long[JobStatus.values().length];
+        stateTimeStampsForRunningJob[JobStatus.INITIALIZING.ordinal()] = startTime;

Review Comment:
   nit, you could add custom timestamps for `CREATED` and `RUNNING` that have 
inverse values for the two jobs to verify that the `INITIALIZING` timestamp is 
used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37258][runtime] Return Ordered Job list on Dispatcher#requestMultipleJobDetails [flink]

2025-02-05 Thread via GitHub


davidradl commented on code in PR #26111:
URL: https://github.com/apache/flink/pull/26111#discussion_r1943263854


##
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java:
##
@@ -1225,7 +1257,16 @@ public void testOverridingJobVertexParallelisms() throws Exception {
 
     private JobManagerRunner runningJobManagerRunnerWithJobStatus(
             final JobStatus currentJobStatus) {
+        return runningJobManagerRunnerWithJobStatus(currentJobStatus, jobId, 0L);
+    }
+
+    private JobManagerRunner runningJobManagerRunnerWithJobStatus(
+            final JobStatus currentJobStatus, final JobID jobId, long startTime) {
         Preconditions.checkArgument(!currentJobStatus.isTerminalState());
+        long[] stateTimeStampsForRunningJob = new long[JobStatus.values().length];

Review Comment:
   I assume that this method ends up being called and we want the order of the 
jobs to be sensible. Would the most sensible order for the UI be latest first?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-37234) Add timeout for test case to avoid Infinite waiting

2025-02-05 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-37234:
--

Assignee: Yanquan Lv

> Add timeout for test case to avoid Infinite waiting 
> 
>
> Key: FLINK-37234
> URL: https://issues.apache.org/jira/browse/FLINK-37234
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Yanquan Lv
>Assignee: Yanquan Lv
>Priority: Not a Priority
> Fix For: jdbc-3.3.0
>
>
> Add timeout for test case to avoid Infinite waiting.
> JdbcSourceStreamRelatedITCase#waitExpectation uses a while(true) loop to 
> check for an exception; this may lead to infinite waiting.
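A minimal sketch of a deadline-bounded replacement for such a loop (the utility name 
is illustrative, not the connector's actual test code):

{code:java}
import java.time.Duration;
import java.util.function.BooleanSupplier;

public final class WaitUtil {
    private WaitUtil() {}

    /** Polls the condition until it holds, failing once the timeout elapses. */
    public static void waitUntil(BooleanSupplier condition, Duration timeout)
            throws InterruptedException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() >= deadlineNanos) {
                throw new AssertionError("Condition not met within " + timeout);
            }
            Thread.sleep(100L); // back off instead of busy-spinning
        }
    }
}
{code}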



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37234] Add test timeout to avoid infinite waiting. [flink-connector-jdbc]

2025-02-05 Thread via GitHub


leonardBang merged PR #154:
URL: https://github.com/apache/flink-connector-jdbc/pull/154


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37234) Add timeout for test case to avoid Infinite waiting

2025-02-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37234:
---
Labels: pull-request-available  (was: )

> Add timeout for test case to avoid Infinite waiting 
> 
>
> Key: FLINK-37234
> URL: https://issues.apache.org/jira/browse/FLINK-37234
> Project: Flink
>  Issue Type: Technical Debt
>  Components: Connectors / JDBC
>Affects Versions: jdbc-3.2.0
>Reporter: Yanquan Lv
>Assignee: Yanquan Lv
>Priority: Not a Priority
>  Labels: pull-request-available
> Fix For: jdbc-3.3.0
>
>
> Add timeout for test case to avoid Infinite waiting.
> JdbcSourceStreamRelatedITCase#waitExpectation uses a while(true) loop to 
> check for an exception; this may lead to infinite waiting.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37234] Add test timeout to avoid infinite waiting. [flink-connector-jdbc]

2025-02-05 Thread via GitHub


boring-cyborg[bot] commented on PR #154:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/154#issuecomment-2638952636

   Awesome work, congrats on your first merged pull request!
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-37263) AggregateITCase.testConstantGroupKeyWithUpsertSink failed

2025-02-05 Thread Weijie Guo (Jira)
Weijie Guo created FLINK-37263:
--

 Summary: AggregateITCase.testConstantGroupKeyWithUpsertSink failed
 Key: FLINK-37263
 URL: https://issues.apache.org/jira/browse/FLINK-37263
 Project: Flink
  Issue Type: Bug
  Components: Build System / CI
Affects Versions: 2.1.0
Reporter: Weijie Guo


Feb 05 10:31:37 10:31:37.516 [ERROR]   Run 3: 
AggregateITCase.testConstantGroupKeyWithUpsertSink:1730 
Feb 05 10:31:37 expected: List(+I[A, 1], +I[B, 2], +I[C, 3])
Feb 05 10:31:37  but was: ArrayBuffer(+I[A, 1], +I[C, 3], +U[B, 2])

https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=65810&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4&l=12050



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [tests][ci] Miscellaneous improvements on CI robustness [flink-cdc]

2025-02-05 Thread via GitHub


yuxiqian commented on PR #3911:
URL: https://github.com/apache/flink-cdc/pull/3911#issuecomment-2638961294

   Would @leonardBang and @whhe like to take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37087][doc] Add docs for alter materialized table as query [flink]

2025-02-05 Thread via GitHub


lsyldliu commented on code in PR #26064:
URL: https://github.com/apache/flink/pull/26064#discussion_r1944182747


##
docs/content.zh/docs/dev/table/materialized-table/statements.md:
##
@@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH 
PARTITION (ds='2024-06-28
 注意
 - REFRESH 操作会启动批作业来刷新表的数据。
 
+## AS 
+```sql
+ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS 

+```
+
+`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询推导的 `schema` 更新表的 
`schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。
+
+具体修改流程取决于物化表的刷新模式:
+
+**全量模式:**
+
+1. 更新物化表的 `schema` 和查询定义。
+2. 在刷新作业下次触发执行时,将使用新的查询定义刷新数据:
+   - 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref 
"docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。
+   - 否则,将刷新整个表的数据。
+
+**持续模式:**
+
+1. 暂停当前的流式刷新作业。
+2. 更新物化表的 `schema` 和查询定义。
+3. 启动新的流式任务以刷新物化表:
+   - 新的流式任务会从头开始,而不会从之前的流式任务状态恢复。
+   - 数据源的起始位点会由到连接器的默认实现或查询中设置的 [dynamic hint]({{< ref 
"docs/dev/table/sql/queries/hints" >}}#dynamic-table-options) 决定。
+
+**示例:**
+
+```sql
+-- 原始物化表定义
+CREATE MATERIALIZED TABLE my_materialized_table
+FRESHNESS = INTERVAL '10' SECOND
+AS 
+SELECT 
+user_id,
+COUNT(*) AS event_count,
+SUM(amount) AS total_amount
+FROM 
+kafka_catalog.db1.events
+WHERE 
+event_type = 'purchase'
+GROUP BY 
+user_id;
+
+-- 修改现有物化表的查询
+ALTER MATERIALIZED TABLE my_materialized_table
+AS SELECT 
+user_id,
+COUNT(*) AS event_count,
+SUM(amount) AS total_amount,
+AVG(amount) AS avg_amount  -- 在末尾追加新的可为空列
+FROM
+kafka_catalog.db1.events
+WHERE
+event_type = 'purchase'
+GROUP BY
+user_id;
+```
+
+注意
+- Schema 演进当前仅支持在原表 schema 尾部追加`可空列`。
+- 在持续模式下,新的流式任务不会从原来的流式任务的状态恢复。这可能会导致短暂的数据重复或丢失。

Review Comment:
   流式任务 -> 流式作业 (i.e. "streaming task" -> "streaming job")



##
docs/content.zh/docs/dev/table/materialized-table/statements.md:
##
@@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH 
PARTITION (ds='2024-06-28
 注意
 - REFRESH 操作会启动批作业来刷新表的数据。
 
+## AS 
+```sql
+ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS 

+```
+
+`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询推导的 `schema` 更新表的 
`schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。
+
+具体修改流程取决于物化表的刷新模式:
+
+**全量模式:**
+
+1. 更新物化表的 `schema` 和查询定义。
+2. 在刷新作业下次触发执行时,将使用新的查询定义刷新数据:
+   - 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref 
"docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。
+   - 否则,将刷新整个表的数据。
+
+**持续模式:**
+
+1. 暂停当前的流式刷新作业。
+2. 更新物化表的 `schema` 和查询定义。
+3. 启动新的流式任务以刷新物化表:

Review Comment:
   流式任务 -> 流式作业 (i.e. "streaming task" -> "streaming job")



##
docs/content.zh/docs/dev/table/materialized-table/statements.md:
##
@@ -326,6 +328,67 @@ ALTER MATERIALIZED TABLE my_materialized_table REFRESH 
PARTITION (ds='2024-06-28
 注意
 - REFRESH 操作会启动批作业来刷新表的数据。
 
+## AS 
+```sql
+ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name AS 

+```
+
+`AS ` 子句用于修改刷新物化表的查询定义。它会先使用新查询推导的 `schema` 更新表的 
`schema`,然后使用新查询刷新表数据。需要特别强调的是,默认情况下,这不会影响历史数据。
+
+具体修改流程取决于物化表的刷新模式:
+
+**全量模式:**
+
+1. 更新物化表的 `schema` 和查询定义。
+2. 在刷新作业下次触发执行时,将使用新的查询定义刷新数据:
+   - 如果修改的物化表是分区表,且[partition.fields.#.date-formatter]({{< ref 
"docs/dev/table/config" >}}#partition-fields-date-formatter) 配置正确,则仅刷新最新分区。
+   - 否则,将刷新整个表的数据。
+
+**持续模式:**
+
+1. 暂停当前的流式刷新作业。
+2. 更新物化表的 `schema` 和查询定义。
+3. 启动新的流式任务以刷新物化表:
+   - 新的流式任务会从头开始,而不会从之前的流式任务状态恢复。

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake

2025-02-05 Thread Zhang Hechuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhang Hechuan updated FLINK-37265:
--
Description: 
"ElasticsearchEmitter" has not method named "static", it should be 
"static_index".

Elasticsearch 7 static index:
{code:java}
from pyflink.datastream.connectors.elasticsearch import 
Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.build()

input.sink_to(es7_sink).name('es7 sink'){code}

  was:
Elasticsearch 7 static index:
{code:java}
// code placeholder
from pyflink.datastream.connectors.elasticsearch import 
Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.build()

input.sink_to(es7_sink).name('es7 sink')


"ElasticsearchEmitter" has not method named "static", it should be 
"static_index"{code}


> document of python elasticsearch connector example has a mistake
> -
>
> Key: FLINK-37265
> URL: https://issues.apache.org/jira/browse/FLINK-37265
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, API / Python, Connectors / 
> ElasticSearch
>Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
>Reporter: Zhang Hechuan
>Priority: Minor
>
> "ElasticsearchEmitter" has not method named "static", it should be 
> "static_index".
> Elasticsearch 7 static index:
> {code:java}
> from pyflink.datastream.connectors.elasticsearch import 
> Elasticsearch7SinkBuilder, ElasticsearchEmitter
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
> input = ...
> # The set_bulk_flush_max_actions instructs the sink to emit after every 
> element, otherwise they would be buffered
> es7_sink = Elasticsearch7SinkBuilder() \
> .set_bulk_flush_max_actions(1) \
> .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
> .set_hosts(['localhost:9200']) \
> .build()
> input.sink_to(es7_sink).name('es7 sink'){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake

2025-02-05 Thread Zhang Hechuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhang Hechuan updated FLINK-37265:
--
Description: 
"ElasticsearchEmitter" has no method named "static", it should be 
"static_index".

Elasticsearch 7 static index:
{code:java}
from pyflink.datastream.connectors.elasticsearch import 
Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.build()

input.sink_to(es7_sink).name('es7 sink'){code}

  was:
"ElasticsearchEmitter" has not method named "static", it should be 
"static_index".

Elasticsearch 7 static index:
{code:java}
from pyflink.datastream.connectors.elasticsearch import 
Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.build()

input.sink_to(es7_sink).name('es7 sink'){code}


> document of python elasticsearch connector example has a mistake
> -
>
> Key: FLINK-37265
> URL: https://issues.apache.org/jira/browse/FLINK-37265
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, API / Python, Connectors / 
> ElasticSearch
>Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
>Reporter: Zhang Hechuan
>Priority: Minor
>
> "ElasticsearchEmitter" has no method named "static", it should be 
> "static_index".
> Elasticsearch 7 static index:
> {code:java}
> from pyflink.datastream.connectors.elasticsearch import 
> Elasticsearch7SinkBuilder, ElasticsearchEmitter
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
> input = ...
> # The set_bulk_flush_max_actions instructs the sink to emit after every 
> element, otherwise they would be buffered
> es7_sink = Elasticsearch7SinkBuilder() \
> .set_bulk_flush_max_actions(1) \
> .set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
> .set_hosts(['localhost:9200']) \
> .build()
> input.sink_to(es7_sink).name('es7 sink'){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-33555) LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:

2025-02-05 Thread Weijie Guo (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-33555?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17924371#comment-17924371
 ] 

Weijie Guo commented on FLINK-33555:


https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=65823&view=logs&j=8fd9202e-fd17-5b26-353c-ac1ff76c8f28&t=ea7cf968-e585-52cb-e0fc-f48de023a7ca&l=21525

> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:
> ---
>
> Key: FLINK-33555
> URL: https://issues.apache.org/jira/browse/FLINK-33555
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 2.0.0, 1.19.0, 1.20.0
>Reporter: Matthias Pohl
>Priority: Critical
>  Labels: test-stability
> Attachments: FLINK-33555.log
>
>
> https://github.com/XComp/flink/actions/runs/6868936761/job/18680977238#step:12:13492
> {code}
> Error: 21:44:15 21:44:15.144 [ERROR]   
> LocalRecoveryITCase.testRecoverLocallyFromProcessCrashWithWorkingDirectory:119
>  [The task was deployed to AllocationID(fcf411eadbae8beed895a78ea1653046) but 
> it should have been deployed to 
> AllocationID(dec337d82b9d960004ffd73be8a2c5d5) for local recovery., The task 
> was deployed to AllocationID(a61fd8a6bc5ef9d467f32f918bdfb385) but it should 
> have been deployed to AllocationID(fcf411eadbae8beed895a78ea1653046) for 
> local recovery., The task was deployed to 
> AllocationID(dec337d82b9d960004ffd73be8a2c5d5) but it should have been 
> deployed to AllocationID(a61fd8a6bc5ef9d467f32f918bdfb385) for local 
> recovery.] ==> expected:  but was: 
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [tests][ci] Miscellaneous improvements on CI robustness [flink-cdc]

2025-02-05 Thread via GitHub


yuxiqian opened a new pull request, #3911:
URL: https://github.com/apache/flink-cdc/pull/3911

   This PR mainly focuses on improving test case effectiveness by:
   
   * A single-stage CI failure should no longer cancel the whole matrix
   * Removed the vulnerable & unmaintained FastJson from test dependencies
   * Optimized the PolarDBX test case by using dynamic port bindings
   * Fixed unstable DataStream migration tests


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-37264) two bugs of DSv2

2025-02-05 Thread xuhuang (Jira)
xuhuang created FLINK-37264:
---

 Summary: two bugs of DSv2
 Key: FLINK-37264
 URL: https://issues.apache.org/jira/browse/FLINK-37264
 Project: Flink
  Issue Type: New Feature
  Components: API / DataStream
Affects Versions: 2.0.0
Reporter: xuhuang


Two bugs of DataStream V2:
# When joining two tuple DataStreams using JoinExtension, an exception is 
thrown "Tuple needs to be parameterized by using generics".
# DSv2 does not support adding a FileSink, even when the FileSink does not 
include committing topology.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37264] Fix two bugs of DataStream V2 [flink]

2025-02-05 Thread via GitHub


flinkbot commented on PR #26112:
URL: https://github.com/apache/flink/pull/26112#issuecomment-2639023195

   
   ## CI report:
   
   * e0f529506e5225fce4b103fcb81eabc903ebea4f UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37205][python] Correct the state cache behavior during bump beam version [flink]

2025-02-05 Thread via GitHub


dianfu merged PR #26058:
URL: https://github.com/apache/flink/pull/26058


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-37205][python] Correct the state cache behavior during bump be… [flink]

2025-02-05 Thread via GitHub


dianfu merged PR #26059:
URL: https://github.com/apache/flink/pull/26059


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake

2025-02-05 Thread Zhang Hechuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhang Hechuan updated FLINK-37265:
--
Description: 
"ElasticsearchEmitter" has no method named "ElasticsearchEmitter.static", it 
should be "ElasticsearchEmitter.static_index".

Here is the code in the document:

Elasticsearch 7 static index:
{code:java}
from pyflink.datastream.connectors.elasticsearch import 
Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.build()

input.sink_to(es7_sink).name('es7 sink'){code}

  was:
"ElasticsearchEmitter" has no method named "static", it should be 
"static_index".

Elasticsearch 7 static index:
{code:java}
from pyflink.datastream.connectors.elasticsearch import 
Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static_index('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.build()

input.sink_to(es7_sink).name('es7 sink'){code}


> document of python elasticsearch connector example has a mistake
> -
>
> Key: FLINK-37265
> URL: https://issues.apache.org/jira/browse/FLINK-37265
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, API / Python, Connectors / 
> ElasticSearch
>Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
>Reporter: Zhang Hechuan
>Priority: Minor
>
> "ElasticsearchEmitter" has no method named "ElasticsearchEmitter.static", it 
> should be "ElasticsearchEmitter.static_index".
> Here is the code in the document:
> Elasticsearch 7 static index:
> {code:java}
> from pyflink.datastream.connectors.elasticsearch import 
> Elasticsearch7SinkBuilder, ElasticsearchEmitter
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
> input = ...
> # The set_bulk_flush_max_actions instructs the sink to emit after every 
> element, otherwise they would be buffered
> es7_sink = Elasticsearch7SinkBuilder() \
> .set_bulk_flush_max_actions(1) \
> .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
> .set_hosts(['localhost:9200']) \
> .build()
> input.sink_to(es7_sink).name('es7 sink'){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake

2025-02-05 Thread Zhang Hechuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhang Hechuan updated FLINK-37265:
--
Description: 
There is no method named "ElasticsearchEmitter.static", it should be 
"ElasticsearchEmitter.static_index".

Here is the code in the document:

Elasticsearch 7 static index:
{code:java}
from pyflink.datastream.connectors.elasticsearch import 
Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.build()

input.sink_to(es7_sink).name('es7 sink'){code}

  was:
"ElasticsearchEmitter" has no method named "ElasticsearchEmitter.static", it 
should be "ElasticsearchEmitter.static_index".

Here is the code in the document:

Elasticsearch 7 static index:
{code:java}
from pyflink.datastream.connectors.elasticsearch import 
Elasticsearch7SinkBuilder, ElasticsearchEmitter

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)

input = ...

# The set_bulk_flush_max_actions instructs the sink to emit after every 
element, otherwise they would be buffered
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
.set_hosts(['localhost:9200']) \
.build()

input.sink_to(es7_sink).name('es7 sink'){code}


> document of python elasticsearch connector example has a mistake
> -
>
> Key: FLINK-37265
> URL: https://issues.apache.org/jira/browse/FLINK-37265
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, API / Python, Connectors / 
> ElasticSearch
>Affects Versions: 1.17.0, 1.18.0, 1.19.0, 1.20.0
>Reporter: Zhang Hechuan
>Priority: Minor
>
> There is no method named "ElasticsearchEmitter.static", it should be 
> "ElasticsearchEmitter.static_index".
> Here is the code in the document:
> Elasticsearch 7 static index:
> {code:java}
> from pyflink.datastream.connectors.elasticsearch import 
> Elasticsearch7SinkBuilder, ElasticsearchEmitter
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
> input = ...
> # The set_bulk_flush_max_actions instructs the sink to emit after every 
> element, otherwise they would be buffered
> es7_sink = Elasticsearch7SinkBuilder() \
> .set_bulk_flush_max_actions(1) \
> .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
> .set_hosts(['localhost:9200']) \
> .build()
> input.sink_to(es7_sink).name('es7 sink'){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-37265) document of python elasticsearch connector example has a mistake

2025-02-05 Thread Zhang Hechuan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhang Hechuan updated FLINK-37265:
--
Affects Version/s: (was: 1.17.0)
   (was: 1.18.0)
   (was: 1.19.0)
   (was: 1.20.0)

> document of python elasticsearch connector example has a mistake
> -
>
> Key: FLINK-37265
> URL: https://issues.apache.org/jira/browse/FLINK-37265
> Project: Flink
>  Issue Type: Bug
>  Components: API / DataStream, API / Python, Connectors / 
> ElasticSearch
>Affects Versions: 2.0-preview
>Reporter: Zhang Hechuan
>Priority: Minor
>
> There is no method named "ElasticsearchEmitter.static", it should be 
> "ElasticsearchEmitter.static_index".
> Here is the code in the document:
> Elasticsearch 7 static index:
> {code:java}
> from pyflink.datastream.connectors.elasticsearch import 
> Elasticsearch7SinkBuilder, ElasticsearchEmitter
> env = StreamExecutionEnvironment.get_execution_environment()
> env.add_jars(ELASTICSEARCH_SQL_CONNECTOR_PATH)
> input = ...
> # The set_bulk_flush_max_actions instructs the sink to emit after every 
> element, otherwise they would be buffered
> es7_sink = Elasticsearch7SinkBuilder() \
> .set_bulk_flush_max_actions(1) \
> .set_emitter(ElasticsearchEmitter.static('foo', 'id')) \
> .set_hosts(['localhost:9200']) \
> .build()
> input.sink_to(es7_sink).name('es7 sink'){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-37205][python] Correct the state cache behavior during bump be… [flink]

2025-02-05 Thread via GitHub


dianfu merged PR #26060:
URL: https://github.com/apache/flink/pull/26060


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Closed] (FLINK-37205) Correct the state cache behavior during bump beam version

2025-02-05 Thread Dian Fu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu closed FLINK-37205.
---
Fix Version/s: 2.0.0
   1.19.2
   1.20.2
   Resolution: Fixed

Fixed in:
- master via a498cf2c228680a44f71d81e7c08b39e1a968a1c
- release-2.0 via bfea5b52be951ca74e41bc066cf655f06f828376
- release-1.20 via 14e853ab832bd9aabc61219a9ce187c201e9603c
- release-1.19 via af041a2a883f31baf24537964e15e6679ba66dcd

> Correct the state cache behavior during bump beam version
> -
>
> Key: FLINK-37205
> URL: https://issues.apache.org/jira/browse/FLINK-37205
> Project: Flink
>  Issue Type: Bug
>  Components: API / Python
>Affects Versions: 1.17.0, 1.18.0, 1.17.1, 1.17.2, 1.19.0, 1.18.1, 1.20.0, 
> 1.19.1
>Reporter: Dian Fu
>Assignee: Dian Fu
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.2
>
>
> Beam changed its state cache strategy from size-based to bytes-based in 
> 2.42.0 (see https://github.com/apache/beam/pull/22924 for more details). 
> We should also support a memory-based state cache in the long term. Before 
> that, we should correct the behavior, which seems broken after bumping the 
> Beam version: it may cause the state cache to grow continuously and finally 
> cause the job to OOM.
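For illustration, a minimal, hypothetical sketch (neither Beam's nor PyFlink's actual 
cache) of the difference: evicting by estimated bytes rather than by entry count:

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

public final class BytesBoundedCache<K, V> {
    // Access-ordered map, so iteration starts at the least recently used entry.
    private final LinkedHashMap<K, V> map = new LinkedHashMap<>(16, 0.75f, true);
    private final ToLongFunction<V> sizer;
    private final long maxBytes;
    private long usedBytes;

    public BytesBoundedCache(long maxBytes, ToLongFunction<V> sizer) {
        this.maxBytes = maxBytes;
        this.sizer = sizer;
    }

    public void put(K key, V value) {
        V old = map.put(key, value);
        if (old != null) {
            usedBytes -= sizer.applyAsLong(old);
        }
        usedBytes += sizer.applyAsLong(value);
        // Evict LRU entries until the byte budget holds (a size-based cache
        // would instead compare map.size() against a maximum entry count).
        Iterator<Map.Entry<K, V>> it = map.entrySet().iterator();
        while (usedBytes > maxBytes && it.hasNext()) {
            usedBytes -= sizer.applyAsLong(it.next().getValue());
            it.remove();
        }
    }

    public V get(K key) {
        return map.get(key);
    }
}
{code}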



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-37264] Fix two bugs of DataStream V2 [flink]

2025-02-05 Thread via GitHub


codenohup opened a new pull request, #26112:
URL: https://github.com/apache/flink/pull/26112

   ## What is the purpose of the change
   
   Fix two bugs of DataStream V2:
   1. When joining two tuple DataStreams using JoinExtension, an exception is 
thrown "Tuple needs to be parameterized by using generics".
   2. DSv2 does not support adding a FileSink, even when the FileSink does not 
include committing topology.
   
   ## Brief change log
   1. Correct the usage of TypeExtractor in JoinExtension (see the sketch below)
   2. Support FileSink in DSv2 if the FileSink does not add a committing topology
   3. Add an IT test case
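   
   For context, a minimal sketch of the extractor behavior behind bug 1; the fix itself 
is in JoinExtension, this merely reproduces the message with plain TypeExtractor calls:
   
   ```java
   import org.apache.flink.api.common.functions.InvalidTypesException;
   import org.apache.flink.api.common.typeinfo.TypeHint;
   import org.apache.flink.api.common.typeinfo.TypeInformation;
   import org.apache.flink.api.java.tuple.Tuple2;
   import org.apache.flink.api.java.typeutils.TypeExtractor;
   
   public class TupleTypeSketch {
       public static void main(String[] args) {
           // A raw Tuple2.class carries no type arguments, so extraction fails.
           try {
               TypeExtractor.createTypeInfo(Tuple2.class);
           } catch (InvalidTypesException e) {
               System.out.println(e.getMessage()); // "Tuple needs to be parameterized ..."
           }
           // A TypeHint preserves the type arguments and extraction succeeds.
           TypeInformation<Tuple2<String, Integer>> info =
                   TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
           System.out.println(info);
       }
   }
   ```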
   
   ## Verifying this change
   
   Added a test case: 
org.apache.flink.test.streaming.api.datastream.extension.join.JoinITCase#testJoinWithTuple
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
 - The serializers: no
 - The runtime per-record code paths (performance sensitive): no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
 - The S3 file system connector: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-37264) two bugs of DSv2

2025-02-05 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-37264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-37264:
---
Labels: pull-request-available  (was: )

> two bugs of DSv2
> 
>
> Key: FLINK-37264
> URL: https://issues.apache.org/jira/browse/FLINK-37264
> Project: Flink
>  Issue Type: New Feature
>  Components: API / DataStream
>Affects Versions: 2.0.0
>Reporter: xuhuang
>Priority: Minor
>  Labels: pull-request-available
>
> Two bugs of DataStream V2:
> # When joining two tuple DataStreams using JoinExtension, an exception is 
> thrown "Tuple needs to be parameterized by using generics".
> # DSv2 does not support adding a FileSink, even when the FileSink does not 
> include committing topology.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] for test [flink-docker]

2025-02-05 Thread via GitHub


reswqa opened a new pull request, #210:
URL: https://github.com/apache/flink-docker/pull/210

   (no comment)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


