Re: [PR] [FLINK-36685][Kubernetes Operator] allow CREATE/UPDATE operation on flinkdeployments resource on webhook mutation endpoint [flink-kubernetes-operator]

2024-11-21 Thread via GitHub


gyfora merged PR #916:
URL: https://github.com/apache/flink-kubernetes-operator/pull/916


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-35859) [flink-cdc] Fix: The assigner is not ready to offer finished split information, this should not be called

2024-11-21 Thread Xin Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900028#comment-17900028
 ] 

Xin Gong commented on FLINK-35859:
--

[~loserwang1024] Users cannot immediately perceive task issues, so maybe we can 
make the fix more complete. I added a flag that triggers a restart when the 
status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, so that newly added tables 
will be synchronized.
{code:java}
/** Assigner for snapshot splits. */
public class SnapshotSplitAssigner implements SplitAssigner {
    private static final Logger LOG =
            LoggerFactory.getLogger(SnapshotSplitAssigner.class);

    // Set when a checkpoint is taken while the assigner status is still
    // NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED; used to force a restart later.
    private boolean flagExceptionAssignerStatusWhenCheckpoint;

    private void captureNewlyAddedTables() {
        if (sourceConfig.isScanNewlyAddedTableEnabled()
                && AssignerStatus.isAssigningFinished(assignerStatus)) {
            // ...
        } else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) {
            flagExceptionAssignerStatusWhenCheckpoint = true;
            LOG.info("Set flagExceptionAssignerStatusWhenCheckpoint to true");
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus)
                && flagExceptionAssignerStatusWhenCheckpoint) {
            throw new FlinkRuntimeException(
                    "Previous assigner status was NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, and "
                            + "restoring from this checkpoint would keep failing for the newly "
                            + "added table, so we trigger a restart after the assigner has "
                            + "returned to a normal status");
        }
    }
}
{code}

> [flink-cdc] Fix: The assigner is not ready to offer finished split 
> information, this should not be called
> -
>
> Key: FLINK-35859
> URL: https://issues.apache.org/jira/browse/FLINK-35859
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Minor
> Fix For: cdc-3.2.0
>
>
> When using CDC with a newly added table, an error occurs:
> {code:java}
> The assigner is not ready to offer finished split information, this should 
> not be called. {code}
> It's because:
> 1. When the job is stopped and then restarted, the status is 
> NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED.
>  
> 2. Then the Enumerator sends each reader a BinlogSplitUpdateRequestEvent to 
> update the binlog split (see 
> org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator#syncWithReaders).
> 3. The reader suspends the binlog reader and then sends a 
> BinlogSplitMetaRequestEvent to the Enumerator.
> 4. The Enumerator finds that some tables have not been sent, and an error 
> occurs:
> {code:java}
> private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
>     // initialize once
>     if (binlogSplitMeta == null) {
>         final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
>                 splitAssigner.getFinishedSplitInfos();
>         if (finishedSnapshotSplitInfos.isEmpty()) {
>             LOG.error(
>                     "The assigner offers empty finished split information, this should not happen");
>             throw new FlinkRuntimeException(
>                     "The assigner offers empty finished split information, this should not happen");
>         }
>         binlogSplitMeta =
>                 Lists.partition(
>                         finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
>     }
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36535) Optimize the scale down logic based on historical parallelism

2024-11-21 Thread Gyula Fora (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900029#comment-17900029
 ] 

Gyula Fora commented on FLINK-36535:


I think this would be a nice improvement (y)

> Optimize the scale down logic based on historical parallelism
> -
>
> Key: FLINK-36535
> URL: https://issues.apache.org/jira/browse/FLINK-36535
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> This is a follow-up to FLINK-36018, which supported lazy scale down to avoid 
> frequent rescaling.
> h1. Proposed Change
> Treat scale-down.interval as a window:
>  * Record the scale down trigger time when the recommended parallelism < 
> current parallelism
>  ** When the recommended parallelism >= current parallelism, cancel the 
> triggered scale down
>  * The scale down will be executed when currentTime - triggerTime > 
> scale-down.interval
>  ** {color:#de350b}Change1{color}: Use the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * {color:#de350b}Change2{color}: Never scale down when currentTime - 
> triggerTime < scale-down.interval
>  ** In FLINK-36018, the scale down may be executed even when currentTime - 
> triggerTime < scale-down.interval.
>  ** For example, taskA may be scaled down when taskB needs to scale up.
> h1. Background
> Some critical Flink jobs need to scale up in time, but only scale down on a 
> daily basis. In other words, Flink users do not want Flink jobs to be scaled 
> down multiple times within 24 hours, and the jobs run at the same parallelism 
> as during the peak hours of each day. 
> Note: Users hope the scale down only happens when the parallelism during peak 
> hours still wastes resources. This is a trade-off between downtime and 
> resource waste for a critical job.
> h1. Current solution
> In general, this requirement could be met by setting {color:#de350b} 
> job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with 
> parallelism 100, and the recommended parallelism is 100 during the peak hours 
> of each day. We hope taskA never rescales, because the triggered scale down 
> will be canceled once the recommended parallelism >= current parallelism 
> within 24 hours (it's exactly what FLINK-36018 does).
> h1. Unexpected Scenario & how to solve it?
> But I found that a critical production job is still rescaled about 10 times 
> every day (when scale-down.interval is set to 24 hours).
> Root cause: There may be many sources in a job, and the traffic peaks of 
> these sources may occur at different times. When taskA triggers scale down, 
> the scale down of taskA will not be actively executed within 24 hours, but it 
> may be executed when other tasks are scaled up.
> For example:
>  * The scale down of sourceB and sourceC may be executed when sourceA scales 
> up.
>  * After a while, the scale down of sourceA and sourceC may be executed when 
> sourceB scales up.
>  * After a while, the scale down of sourceA and sourceB may be executed when 
> sourceC scales up.
>  * When there are many tasks, the above 3 steps will be executed repeatedly.
> That's why the job is rescaled about 10 times every day. 
> {color:#de350b}Change2{color} of the proposed change solves this issue: never 
> scale down when currentTime - triggerTime < scale-down.interval.
>  
> {color:#de350b}Change1{color}: Use the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * It ensures that the parallelism after scaling down is the parallelism at 
> yesterday's peak.
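
To make the proposed window semantics concrete, here is a minimal, 
self-contained sketch of the tracking logic; the class and method names are 
illustrative only, not the autoscaler's actual API:
{code:java}
import java.time.Duration;
import java.time.Instant;

// Sketch of the proposed scale-down window (Change1 + Change2), per vertex.
public class DelayedScaleDownWindow {

    private final Duration scaleDownInterval;
    private Instant triggerTime;        // null while no scale down is pending
    private int maxParallelismInWindow; // Change1: running maximum in the window

    public DelayedScaleDownWindow(Duration scaleDownInterval) {
        this.scaleDownInterval = scaleDownInterval;
    }

    /** Returns the parallelism to scale down to, or -1 if no rescale should happen now. */
    public int onRecommendation(Instant now, int currentParallelism, int recommendedParallelism) {
        if (recommendedParallelism >= currentParallelism) {
            triggerTime = null; // cancel the triggered scale down
            return -1;
        }
        if (triggerTime == null) {
            triggerTime = now; // record the scale down trigger time
            maxParallelismInWindow = recommendedParallelism;
        } else {
            maxParallelismInWindow = Math.max(maxParallelismInWindow, recommendedParallelism);
        }
        // Change2: never scale down before the full interval has elapsed,
        // even if some other vertex is being rescaled at this moment.
        if (Duration.between(triggerTime, now).compareTo(scaleDownInterval) < 0) {
            return -1;
        }
        // Change1: scale down to the window maximum, i.e. the last peak.
        int target = maxParallelismInWindow;
        triggerTime = null;
        return target;
    }
}
{code}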



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36535) Optimize the scale down logic based on historical parallelism

2024-11-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-36535:

Description: 
This is a follow-up to FLINK-36018, which supported lazy scale down to avoid 
frequent rescaling.
h1. Proposed Change

Treat scale-down.interval as a window:
 * Record the scale down trigger time when the recommended parallelism < 
current parallelism
 ** When the recommended parallelism >= current parallelism, cancel the 
triggered scale down
 * The scale down will be executed when currentTime - triggerTime > 
scale-down.interval
 ** {color:#de350b}Change1{color}: Use the maximum parallelism within the 
window instead of the latest parallelism when scaling down.
 * {color:#de350b}Change2{color}: Never scale down when currentTime - 
triggerTime < scale-down.interval

 ** In FLINK-36018, the scale down may be executed even when currentTime - 
triggerTime < scale-down.interval.
 ** For example, taskA may be scaled down when taskB needs to scale up.

h1. Background

Some critical Flink jobs need to scale up in time, but only scale down on a 
daily basis. In other words, Flink users do not want Flink jobs to be scaled 
down multiple times within 24 hours, and the jobs run at the same parallelism 
as during the peak hours of each day. 

Note: Users hope the scale down only happens when the parallelism during peak 
hours still wastes resources. This is a trade-off between downtime and 
resource waste for a critical job.
h1. Current solution

In general, this requirement could be met by setting {color:#de350b} 
job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with 
parallelism 100, and the recommended parallelism is 100 during the peak hours 
of each day. We hope taskA never rescales, because the triggered scale down 
will be canceled once the recommended parallelism >= current parallelism within 
24 hours (it's exactly what FLINK-36018 does).
h1. Unexpected Scenario & how to solve it?

But I found that a critical production job is still rescaled about 10 times 
every day (when scale-down.interval is set to 24 hours).

Root cause: There may be many sources in a job, and the traffic peaks of these 
sources may occur at different times. When taskA triggers scale down, the scale 
down of taskA will not be actively executed within 24 hours, but it may be 
executed when other tasks are scaled up.

For example:
 * The scale down of sourceB and sourceC may be executed when sourceA scales up.
 * After a while, the scale down of sourceA and sourceC may be executed when 
sourceB scales up.
 * After a while, the scale down of sourceA and sourceB may be executed when 
sourceC scales up.
 * When there are many tasks, the above 3 steps will be executed repeatedly.

That's why the job is rescaled about 10 times every day. 
{color:#de350b}Change2{color} of the proposed change solves this issue: never 
scale down when currentTime - triggerTime < scale-down.interval.

 

{color:#de350b}Change1{color}: Use the maximum parallelism within the window 
instead of the latest parallelism when scaling down.
 * It ensures that the parallelism after scaling down is the parallelism at 
yesterday's peak.

  was:
This is a follow-up to FLINK-36018, which supported lazy scale down to avoid 
frequent rescaling. 
h1. Background

Some critical Flink jobs need to scale up in time, but only scale down on a 
daily basis. In other words, Flink users do not want Flink jobs to be scaled 
down multiple times within 24 hours, and the jobs run at the same parallelism 
as during the peak hours of each day. 

Note: Users hope the scale down only happens when the parallelism during peak 
hours is still a waste of resources. This is a trade-off between downtime and 
resource waste for a critical job.
h1. Current solution

In general, this requirement could be met by setting {color:#de350b} 
job.autoscaler.scale-down.interval = 24 hours{color}. For example, vertex1 
runs with parallelism=100, and the following is the parallelism that the 
autoscaler recommends for vertex1:
 * 100 (2024-10-13 20:00:00, peak hour)
 * 90   (2024-10-13 21:00:00, trigger delayed scale down)
 * 80   (2024-10-13 22:00:00)
 * 70   (2024-10-14 00:00:00)
 * 60   (2024-10-14 01:00:00)
 * 50   (2024-10-14 02:00:00)
 * 40   (2024-10-14 04:00:00)
 * 50   (2024-10-14 06:00:00)
 * 60   (2024-10-14 08:00:00)
 * ...
 * 90   (2024-10-14 19:00:00)
 * 100 (2024-10-14 20:00:00, peak hour, the delayed scale down is canceled)

All recommended parallelisms are delayed, and the recommended parallelism goes 
back to 100 within 24 hours, so the scale down request is canceled.

It means that if the recommended parallelism for vertex1 during peak hours is 
100 every day, vertex1 is never scaled down or up. This is very friendly to 
critical jobs, and reducing the rescale frequency can greatly reduce the 
downtime.
h1. Some scenarios do no

[jira] [Updated] (FLINK-36535) Optimize the scale down logic based on historical parallelism

2024-11-21 Thread Rui Fan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan updated FLINK-36535:

Description: 
This is a follow-up to FLINK-36018, which supported lazy scale down to avoid 
frequent rescaling.
h1. Proposed Change

Treat scale-down.interval as a window:
 * Record the scale down trigger time when the recommended parallelism < 
current parallelism
 ** When the recommended parallelism >= current parallelism, cancel the 
triggered scale down
 * The scale down will be executed when currentTime - triggerTime > 
scale-down.interval
 ** {color:#de350b}Change1{color}: Use the maximum parallelism within the 
window instead of the latest parallelism when scaling down.
 * {color:#de350b}Change2{color}: Never scale down when currentTime - 
triggerTime < scale-down.interval

 ** In FLINK-36018, the scale down may be executed even when currentTime - 
triggerTime < scale-down.interval.
 ** For example, taskA may be scaled down when taskB needs to scale up.

h1. Background

Some critical Flink jobs need to scale up in time, but only scale down on a 
daily basis. In other words, Flink users do not want Flink jobs to be scaled 
down multiple times within 24 hours, and the jobs run at the same parallelism 
as during the peak hours of each day. 

Note: Users hope the scale down only happens when the parallelism during peak 
hours still wastes resources. This is a trade-off between downtime and resource 
waste for a critical job.
h1. Current solution

In general, this requirement could be met by setting {color:#de350b} 
job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with 
parallelism 100, and the recommended parallelism is 100 during the peak hours 
of each day. We hope taskA never rescales, because the triggered scale down 
will be canceled once the recommended parallelism >= current parallelism within 
24 hours (it's exactly what FLINK-36018 does).
h1. Unexpected Scenario & how to solve it?

But I found that a critical production job is still rescaled about 10 times 
every day (when scale-down.interval is set to 24 hours).

Root cause: There may be many sources in a job, and the traffic peaks of these 
sources may occur at different times. When taskA triggers scale down, the scale 
down of taskA will not be actively executed within 24 hours, but it may be 
executed when other tasks are scaled up.

For example:
 * The scale down of sourceB and sourceC may be executed when sourceA scales up.
 * After a while, the scale down of sourceA and sourceC may be executed when 
sourceB scales up.
 * After a while, the scale down of sourceA and sourceB may be executed when 
sourceC scales up.
 * When there are many tasks, the above 3 steps will be executed repeatedly.

That's why the job is rescaled about 10 times every day. 
{color:#de350b}Change2{color} of the proposed change solves this issue: never 
scale down when currentTime - triggerTime < scale-down.interval.

 

{color:#de350b}Change1{color}: Use the maximum parallelism within the window 
instead of the latest parallelism when scaling down.
 * It ensures that the parallelism after scaling down is the parallelism at 
yesterday's peak.

  was:
This is a follow-up to FLINK-36018, which supported lazy scale down to avoid 
frequent rescaling.
h1. Proposed Change

Treat scale-down.interval as a window:
 * Record the scale down trigger time when the recommended parallelism < 
current parallelism
 ** When the recommended parallelism >= current parallelism, cancel the 
triggered scale down
 * The scale down will be executed when currentTime - triggerTime > 
scale-down.interval
 ** {color:#de350b}Change1{color}: Use the maximum parallelism within the 
window instead of the latest parallelism when scaling down.
 * {color:#de350b}Change2{color}: Never scale down when currentTime - 
triggerTime < scale-down.interval

 ** In FLINK-36018, the scale down may be executed even when currentTime - 
triggerTime < scale-down.interval.
 ** For example, taskA may be scaled down when taskB needs to scale up.

h1. Background

Some critical Flink jobs need to scale up in time, but only scale down on a 
daily basis. In other words, Flink users do not want Flink jobs to be scaled 
down multiple times within 24 hours, and the jobs run at the same parallelism 
as during the peak hours of each day. 

Note: Users hope the scale down only happens when the parallelism during peak 
hours is still a waste of resources. This is a trade-off between downtime and 
resource waste for a critical job.
h1. Current solution

In general, this requirement could be met by setting {color:#de350b} 
job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with 
parallelism 100, and the recommended parallelism is 100 during the peak hours 
of each day. We hope taskA never rescales, because the triggered scale down 
will be canceled once the recomm

[jira] [Commented] (FLINK-36535) Optimize the scale down logic based on historical parallelism

2024-11-21 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900024#comment-17900024
 ] 

Rui Fan commented on FLINK-36535:
-

cc [~gyfora] [~mxm] 

> Optimize the scale down logic based on historical parallelism
> -
>
> Key: FLINK-36535
> URL: https://issues.apache.org/jira/browse/FLINK-36535
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> This is a follow-up to FLINK-36018, which supported lazy scale down to avoid 
> frequent rescaling.
> h1. Proposed Change
> Treat scale-down.interval as a window:
>  * Record the scale down trigger time when the recommended parallelism < 
> current parallelism
>  ** When the recommended parallelism >= current parallelism, cancel the 
> triggered scale down
>  * The scale down will be executed when currentTime - triggerTime > 
> scale-down.interval
>  ** {color:#de350b}Change1{color}: Use the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * {color:#de350b}Change2{color}: Never scale down when currentTime - 
> triggerTime < scale-down.interval
>  ** In FLINK-36018, the scale down may be executed even when currentTime - 
> triggerTime < scale-down.interval.
>  ** For example, taskA may be scaled down when taskB needs to scale up.
> h1. Background
> Some critical Flink jobs need to scale up in time, but only scale down on a 
> daily basis. In other words, Flink users do not want Flink jobs to be scaled 
> down multiple times within 24 hours, and the jobs run at the same parallelism 
> as during the peak hours of each day. 
> Note: Users hope the scale down only happens when the parallelism during peak 
> hours still wastes resources. This is a trade-off between downtime and 
> resource waste for a critical job.
> h1. Current solution
> In general, this requirement could be met by setting {color:#de350b} 
> job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with 
> parallelism 100, and the recommended parallelism is 100 during the peak hours 
> of each day. We hope taskA never rescales, because the triggered scale down 
> will be canceled once the recommended parallelism >= current parallelism 
> within 24 hours (it's exactly what FLINK-36018 does).
> h1. Unexpected Scenario & how to solve it?
> But I found that a critical production job is still rescaled about 10 times 
> every day (when scale-down.interval is set to 24 hours).
> Root cause: There may be many sources in a job, and the traffic peaks of 
> these sources may occur at different times. When taskA triggers scale down, 
> the scale down of taskA will not be actively executed within 24 hours, but it 
> may be executed when other tasks are scaled up.
> For example:
>  * The scale down of sourceB and sourceC may be executed when sourceA scales 
> up.
>  * After a while, the scale down of sourceA and sourceC may be executed when 
> sourceB scales up.
>  * After a while, the scale down of sourceA and sourceB may be executed when 
> sourceC scales up.
>  * When there are many tasks, the above 3 steps will be executed repeatedly.
> That's why the job is rescaled about 10 times every day. 
> {color:#de350b}Change2{color} of the proposed change solves this issue: 
> never scale down when currentTime - triggerTime < scale-down.interval.
>  
> {color:#de350b}Change1{color}: Use the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * It ensures that the parallelism after scaling down is the parallelism 
> at yesterday's peak.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36743) Rescale from unaligned checkpoint failed

2024-11-21 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-36743:

Attachment: image-2024-11-21-20-20-20-536.png

> Rescale from unaligned checkpoint failed
> 
>
> Key: FLINK-36743
> URL: https://issues.apache.org/jira/browse/FLINK-36743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
> Attachments: 
> Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch,
>  image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, 
> image-2024-11-19-17-30-14-816.png, image-2024-11-21-20-20-20-536.png, 
> image-2024-11-21-20-20-41-644.png
>
>
> We encountered the following exception when scaling down a job from 5600 to 
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> xx (1358/1400) 
> (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) 
> switched from RUNNING to FAILED on 
> container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select 
> SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; 
> known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=4200}]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code}
>  * Flink version : 1.16.1
>  * unaligned checkpoint : enabled
>  * log-based checkpoint : enabled
> The exception was encountered when restoring from chk-2718336, while the job 
> can successfully restore from chk-2718333. I checked the metadata files of 
> chk-2718336 and chk-2718333, and both of them contain in-flight data. It looks 
> like there is something wrong with how the unaligned checkpoint reassigns 
> in-flight data. Could you please take a look? [~arvid], [~pnowojski] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36743) Rescale from unaligned checkpoint failed

2024-11-21 Thread Feifan Wang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900025#comment-17900025
 ] 

Feifan Wang commented on FLINK-36743:
-

Thanks [~arvid] for helping investigate the issue. I cherry-picked the fix of 
FLINK-31963 and tested it, but it doesn't work. The exception is the same.

!image-2024-11-21-20-20-20-536.png|width=1566,height=453!

!image-2024-11-21-20-20-41-644.png|width=981,height=226!

So I reopened the issue.

> Rescale from unaligned checkpoint failed
> 
>
> Key: FLINK-36743
> URL: https://issues.apache.org/jira/browse/FLINK-36743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
> Attachments: 
> Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch,
>  image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, 
> image-2024-11-19-17-30-14-816.png, image-2024-11-21-20-20-20-536.png, 
> image-2024-11-21-20-20-41-644.png
>
>
> We encountered the following exception when scaling down a job from 5600 to 
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> xx (1358/1400) 
> (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) 
> switched from RUNNING to FAILED on 
> container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select 
> SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; 
> known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=4200}]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code}
>  * Flink version : 1.16.1
>  * unaligned checkpoint : enabled
>  * log-based checkpoint : enabled
> The exception was encountered when restoring from chk-2718336, while the job 
> can successfully restore from chk-2718333. I checked the metadata files of 
> chk-2718336 and chk-2718333, and both of them contain in-flight data. It looks 
> like there is something wrong with how the unaligned checkpoint reassigns 
> in-flight data. Could you please take a look? [~arvid], [~pnowojski] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (FLINK-36743) Rescale from unaligned checkpoint failed

2024-11-21 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang reopened FLINK-36743:
-

> Rescale from unaligned checkpoint failed
> 
>
> Key: FLINK-36743
> URL: https://issues.apache.org/jira/browse/FLINK-36743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
> Attachments: 
> Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch,
>  image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, 
> image-2024-11-19-17-30-14-816.png, image-2024-11-21-20-20-20-536.png, 
> image-2024-11-21-20-20-41-644.png
>
>
> We encountered the following exception when scaling down a job from 5600 to 
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> xx (1358/1400) 
> (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) 
> switched from RUNNING to FAILED on 
> container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select 
> SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; 
> known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=4200}]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code}
>  * Flink version : 1.16.1
>  * unaligned checkpoint : enabled
>  * log-based checkpoint : enabled
> The exception was encountered when restoring from chk-2718336, while the job 
> can successfully restore from chk-2718333. I checked the metadata files of 
> chk-2718336 and chk-2718333, and both of them contain in-flight data. It looks 
> like there is something wrong with how the unaligned checkpoint reassigns 
> in-flight data. Could you please take a look? [~arvid], [~pnowojski] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25670:
URL: https://github.com/apache/flink/pull/25670#issuecomment-2490393427

   Reviewed by Chi on 21/11/24. Asked submitter questions
   
   @mehdid93 Looks good - but why are the CI tests failing?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36745][docs] Make FixedSizeSplitFetcherManager and FixedFetcherSizeSourceReader examples match Flink 2.0-preview API [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25667:
URL: https://github.com/apache/flink/pull/25667#issuecomment-2490395429

   Reviewed by Chi on 21/11/24. Needs a committer to review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36745][docs] Make FixedSizeSplitFetcherManager and FixedFetcherSizeSourceReader examples match Flink 2.0-preview API [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25667:
URL: https://github.com/apache/flink/pull/25667#issuecomment-2490395974

   Reviewed by Chi on 21/11/24. Needs a committer to review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix] [docs] Update table environment variable name in common.md [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25669:
URL: https://github.com/apache/flink/pull/25669#issuecomment-2490393823

   Reviewed by Chi on 21/11/24. Asked submitter questions


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Event latency does not equal window triggering [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25664:
URL: https://github.com/apache/flink/pull/25664#issuecomment-2490402911

   Reviewed by Chi on 21/11/24. Approve - looking for a committer to merge. 
Note the tests are failing - but this is a docs change!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36704] Update TypeInference with StaticArgument and StateTypeStrategy [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25665:
URL: https://github.com/apache/flink/pull/25665#issuecomment-2490399670

   Reviewed by Chi on 21/11/24. Needs a committer / subject area expert to 
review. Note that the tests are failing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36066][runtime] Introducing the AdaptiveGraphManager component [flink]

2024-11-21 Thread via GitHub


noorall commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1851601660


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##
@@ -1969,8 +2032,12 @@ private static void 
setManagedMemoryFractionForSlotSharingGroup(
 final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
 final Map> vertexChainedConfigs =
 jobVertexBuildContext.getChainedConfigs();
-final Set groupOperatorIds =
+final Set jobVertexIds =
 slotSharingGroup.getJobVertexIds().stream()
+.filter(vertexOperators::containsKey)

Review Comment:
   > In which case the `slotSharingGroup` will contain a job vertex which is 
not included in the `vertexOperators`? Could you add some comments to explain 
it as it may not be obvious to other developers.
   
   In the progressive job graph generation algorithm, if the user specified a 
`SlotSharingGroupResource` or `AllVerticesInSameSlotSharingGroupByDefault` is 
set to true, job vertices generated in different phases may be assigned to the 
same `slotSharingGroup`. Therefore, we need to retain only the job vertices 
that belong to the current phase, as shown in the sketch below.
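
   A sketch of that filtering step with the explanatory comment the review asks 
for; the names are taken from the diff above, while the method shape and the 
value type of `vertexOperators` are assumptions:
{code:java}
private static Set<JobVertexID> jobVerticesOfCurrentPhase(
        SlotSharingGroup slotSharingGroup, Map<JobVertexID, ?> vertexOperators) {
    // With progressive job graph generation, job vertices created in earlier
    // phases may share a SlotSharingGroup with vertices of the current phase
    // (e.g. when a SlotSharingGroupResource is specified, or when
    // AllVerticesInSameSlotSharingGroupByDefault is true). Only vertices built
    // in the current phase appear in vertexOperators, so keep exactly those.
    return slotSharingGroup.getJobVertexIds().stream()
            .filter(vertexOperators::containsKey)
            .collect(Collectors.toSet());
}
{code}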



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] fix:Event latency does not equal window triggering [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25663:
URL: https://github.com/apache/flink/pull/25663#issuecomment-2490406949

   Reviewed by Chi on 21/11/24. Close if duplicate


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36455] Sinks retry synchronously [1.20] [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25661:
URL: https://github.com/apache/flink/pull/25661#issuecomment-2490409340

   Reviewed by Chi on 21/11/24. Looks in hand, code conflicts and test failures 
currently


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36379] Improve (Global)Committer with UC disabled [1.20] [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25660:
URL: https://github.com/apache/flink/pull/25660#issuecomment-2490412290

   Reviewed by Chi on 21/11/24. Looks in hand, test failures currently


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] Add release announcement for Flink CDC 3.2.1 [flink-web]

2024-11-21 Thread via GitHub


ruanhang1993 opened a new pull request, #764:
URL: https://github.com/apache/flink-web/pull/764

   This PR adds the release announcement for Flink CDC 3.2.1.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36764) Add checkpoint type to checkpoint trace

2024-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36764:
---
Labels: pull-request-available  (was: )

> Add checkpoint type to checkpoint trace
> ---
>
> Key: FLINK-36764
> URL: https://issues.apache.org/jira/browse/FLINK-36764
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.0.0
>
>
> Currently it's impossible to distinguish checkpoints from savepoints. Also it 
> would be handy to distinguish aligned and unaligned checkpoints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36704] Update TypeInference with StaticArgument and StateTypeStrategy [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25665:
URL: https://github.com/apache/flink/pull/25665#issuecomment-2490397302

   Reviewed by Chi on 21/11/24. Needs a committer / subject area expert to review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35825][hive] HiveTableSource supports report statistics for text file [flink]

2024-11-21 Thread via GitHub


reswqa commented on PR #25078:
URL: https://github.com/apache/flink/pull/25078#issuecomment-2490379694

   Thanks @xuyangzhong for the review, updated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25656:
URL: https://github.com/apache/flink/pull/25656#discussion_r1851611252


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -1029,6 +1029,8 @@ private TableResultInternal executeInternal(
 defaultJobName,
 jobStatusHookList);
 try {
+ClassLoader userClassLoader = 
Thread.currentThread().getContextClassLoader();

Review Comment:
   Please add comments that refer to the v2 implementation and note that the 
v2 refactor is not going to be backported to 1.20. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25656:
URL: https://github.com/apache/flink/pull/25656#discussion_r1851611671


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -1069,8 +1072,11 @@ private TableResultInternal executeQueryOperation(
 
 Pipeline pipeline = generatePipelineFromQueryOperation(operation, 
transformations);
 try {
+ClassLoader userClassLoader = 
Thread.currentThread().getContextClassLoader();

Review Comment:
   please add unit tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-16077][docs] Translate "Custom State Serialization" page into Chinese [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25648:
URL: https://github.com/apache/flink/pull/25648#issuecomment-2490427387

   Reviewed by Chi on 21/11/24. Unable to review the translation.
   
   Notice the tests are failing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36764) Add checkpoint type to checkpoint trace

2024-11-21 Thread Piotr Nowojski (Jira)
Piotr Nowojski created FLINK-36764:
--

 Summary: Add checkpoint type to checkpoint trace
 Key: FLINK-36764
 URL: https://issues.apache.org/jira/browse/FLINK-36764
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing, Runtime / Metrics
Reporter: Piotr Nowojski
 Fix For: 2.0.0


Currently it's impossible to distinguish checkpoints from savepoints. Also it 
would be handy to distinguish aligned and unaligned checkpoints.
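
A rough sketch of what this could look like with the span reporting API from 
FLIP-384; the attribute names, variables, and call site are illustrative 
assumptions, not the final design:
{code:java}
// Illustrative only: attach the snapshot type as a span attribute so that
// checkpoints, savepoints, and aligned/unaligned checkpoints can be told apart.
Span checkpointSpan =
        Span.builder(CheckpointStatsTracker.class, "Checkpoint")
                .setStartTsMillis(checkpointStartTs)
                .setEndTsMillis(checkpointEndTs)
                // e.g. "Checkpoint", "Unaligned Checkpoint", or "Savepoint"
                .setAttribute("checkpointType", snapshotTypeName)
                .setAttribute("checkpointId", checkpointId)
                .build();
{code}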



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25656:
URL: https://github.com/apache/flink/pull/25656#discussion_r1851610106


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java:
##
@@ -1029,6 +1029,8 @@ private TableResultInternal executeInternal(
 defaultJobName,
 jobStatusHookList);
 try {
+ClassLoader userClassLoader = 
Thread.currentThread().getContextClassLoader();

Review Comment:
   please could you change the variable name to be something like 
originalContextClassLoader 
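
   For context, a minimal sketch of the save-and-restore pattern the suggested 
name reflects; `userClassLoader` comes from the diff above, while the 
try/finally shape is an assumption about the intended fix:
{code:java}
final ClassLoader originalContextClassLoader =
        Thread.currentThread().getContextClassLoader();
try {
    // Run with the user class loader so UDF classes from added jars resolve.
    Thread.currentThread().setContextClassLoader(userClassLoader);
    // ... execute the pipeline ...
} finally {
    // Restore the original loader even if execution throws.
    Thread.currentThread().setContextClassLoader(originalContextClassLoader);
}
{code}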



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25656:
URL: https://github.com/apache/flink/pull/25656#issuecomment-2490421787

   Reviewed by Chi on 21/11/24. Asked submitter questions. 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Upgrade com.squareup.okio:okio [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25649:
URL: https://github.com/apache/flink/pull/25649#issuecomment-2490425525

   Reviewed by Chi on 21/11/24. Asked submitter questions
   
   Please could you raise a Jira detailing the reason you want to upgrade this 
component (e.g., is there a particular bug that this would fix)?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25647:
URL: https://github.com/apache/flink/pull/25647#discussion_r1851618895


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -707,6 +707,12 @@ public class TaskManagerOptions {
 "The %s mode tries to 
spread out the slots evenly across all available %s.",
 
code(TaskManagerLoadBalanceMode.SLOTS.name()),
 code("TaskManagers")),
+text(

Review Comment:
   NIT: its' -> it's



##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -707,6 +707,12 @@ public class TaskManagerOptions {
 "The %s mode tries to 
spread out the slots evenly across all available %s.",
 
code(TaskManagerLoadBalanceMode.SLOTS.name()),
 code("TaskManagers")),
+text(

Review Comment:
   NIT: its' -> it's



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36764) Add checkpoint type to checkpoint trace

2024-11-21 Thread Piotr Nowojski (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-36764:
--

Assignee: Piotr Nowojski

> Add checkpoint type to checkpoint trace
> ---
>
> Key: FLINK-36764
> URL: https://issues.apache.org/jira/browse/FLINK-36764
> Project: Flink
>  Issue Type: Improvement
>  Components: Runtime / Checkpointing, Runtime / Metrics
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently it's impossible to distinguish checkpoints from savepoints. Also it 
> would be handy to distinguish aligned and unaligned checkpoints.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25647:
URL: https://github.com/apache/flink/pull/25647#discussion_r1851620823


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -707,6 +707,12 @@ public class TaskManagerOptions {
 "The %s mode tries to 
spread out the slots evenly across all available %s.",
 
code(TaskManagerLoadBalanceMode.SLOTS.name()),
 code("TaskManagers")),
+text(
+"The %s mode tries to 
schedule evenly all tasks based on its' number across all available %s. "

Review Comment:
   I don't understand this sentence. I am not sure what "schedule evenly all 
tasks based on its' number" means. 
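
   For reference, enabling the new mode would presumably look like this; the 
option constant and the nested enum location are assumptions based on 
`TaskManagerOptions`:
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;

Configuration config = new Configuration();
// TASKS is the value this PR introduces; it spreads tasks (not just slots)
// evenly across all available TaskManagers.
config.set(
        TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE,
        TaskManagerLoadBalanceMode.TASKS);
{code}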



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36066][runtime] Introducing the AdaptiveGraphManager component [flink]

2024-11-21 Thread via GitHub


zhuzhurk commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1851265617


##
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
+import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.addVertexIndexPrefixInVertexName;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.connect;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createAndInitializeJobGraph;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createSourceChainInfo;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isSourceChainable;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.markSupportingConcurrentExecutionAttempts;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.serializeOperatorCoordinatorsAndStreamConfig;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setManagedMemoryFraction;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setPhysicalEdges;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharingAndCoLocation;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexDescription;
+import static 
org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.validateHybridShuffleExecuteInBatchMode;
+
+/** Default implementation for {@link AdaptiveGraphGenerator}. */
+@Internal
+public class AdaptiveGraphManager implements AdaptiveGraphGenerator {
+
+private final StreamGraph streamGraph;
+
+private final JobGraph jobGraph;
+
+private final StreamGraphHasher defaultStreamGraphHasher;
+
+private final List<StreamGraphHasher> legacyStreamGraphHasher;
+
+private final Executor serializationExecutor;
+
+private final AtomicInteger vertexIndexId;
+
+private final StreamGraphContext streamGraphContext;
+
+private final Map<Integer, byte[]> hashes;
+
+private final List<Map<Integer, byte[]>> legacyHashes;
+
+// Records the ids of stream nodes whose job vertices have been created.
+private final Map<Integer, Integer> frozenNodeToStartNodeMap;
+
+// When the downstream vertex is not cr

Re: [PR] [FLINK-34545][cdc-pipeline-connector]Add OceanBase pipeline connector to Flink CDC [flink-cdc]

2024-11-21 Thread via GitHub


yuanoOo commented on PR #3360:
URL: https://github.com/apache/flink-cdc/pull/3360#issuecomment-2490158237

   @lvyanquan I fixed the above comments, please take a look again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36763) Support schema inference and evolution with single-table-multiple-partition sources

2024-11-21 Thread Leonard Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Leonard Xu reassigned FLINK-36763:
--

Assignee: yux

> Support schema inference and evolution with single-table-multiple-partition 
> sources
> ---
>
> Key: FLINK-36763
> URL: https://issues.apache.org/jira/browse/FLINK-36763
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Major
>
> The current schema evolution implementation implicitly assumes that there can't 
> be two partitions with the same TableID being consumed on different subTasks. 
> This is not always true for some weakly structured sources like Kafka and 
> MongoDB, where table schemas are not centralized and might evolve 
> independently in various parallelized tasks.
> It's very unlikely to be a trivial change, and might involve modifying most 
> parts of the pipeline implementation.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36762) Add E2ECase to run sql client in application mode

2024-11-21 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-36762:
-

 Summary: Add E2ECase to run sql client in application mode
 Key: FLINK-36762
 URL: https://issues.apache.org/jira/browse/FLINK-36762
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Gateway
Reporter: Shengkai Fang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35282][python] Upgrade Apache Beam > 2.54 [flink]

2024-11-21 Thread via GitHub


snuyanzin commented on PR #25541:
URL: https://github.com/apache/flink/pull/25541#issuecomment-2490142517

   @dianfu , @HuangXingBo could you please have a look here, I think you are 
more experienced with python in Flink


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36760) Support to deploy script via sql client

2024-11-21 Thread Shengkai Fang (Jira)
Shengkai Fang created FLINK-36760:
-

 Summary: Support to deploy script via sql client
 Key: FLINK-36760
 URL: https://issues.apache.org/jira/browse/FLINK-36760
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / Client
Reporter: Shengkai Fang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25647:
URL: https://github.com/apache/flink/pull/25647#discussion_r1851640555


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java:
##
@@ -40,13 +41,16 @@ public SlotSharingExecutionSlotAllocatorFactory(
 PhysicalSlotProvider slotProvider,
 boolean slotWillBeOccupiedIndefinitely,
 PhysicalSlotRequestBulkChecker bulkChecker,
-Duration allocationTimeout) {
+Duration allocationTimeout,
+TaskManagerOptions.TaskManagerLoadBalanceMode 
taskManagerLoadBalanceMode) {
 this(
 slotProvider,
 slotWillBeOccupiedIndefinitely,
 bulkChecker,
 allocationTimeout,
-new LocalInputPreferredSlotSharingStrategy.Factory());
+taskManagerLoadBalanceMode == 
TaskManagerOptions.TaskManagerLoadBalanceMode.TASKS
+? new 
TaskBalancedPreferredSlotSharingStrategy.Factory()

Review Comment:
   I see that specifying TASKS means this factory will be used to load balance. 
I do not see the words "task number" in the factory implementation. It seems to 
be trying to colocate tasks in a slot. I am not sure when this would be 
preferable or not.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36688) table.optimizer.reuse-source-enabled may cause disordered metadata columns when reading from Kafka.

2024-11-21 Thread xuyang (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899969#comment-17899969
 ] 

xuyang commented on FLINK-36688:


You're right. There is a bug in ScanReuser. I'll fix it.

> table.optimizer.reuse-source-enabled may cause disordered metadata columns 
> when reading from Kafka.  
> -
>
> Key: FLINK-36688
> URL: https://issues.apache.org/jira/browse/FLINK-36688
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.20.0, 1.19.1, 2.0-preview
>Reporter: Yanquan Lv
>Priority: Major
>
> Metadata columns in Kafka need to maintain a fixed order: the metadata for 
> the format needs to be at the beginning, while the metadata for Kafka 
> itself (partition/offset and so on) needs to be at the end. The Kafka connector 
> will add the format's fields first, and then Kafka's fields later.
> However, the reused Source did not maintain this order, which may cause a 
> ClassCastException.
> How to reproduce:
> {code:java}
> create temporary table `message_channel_task_record`
> (
> origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL
> ,`partition` INT METADATA VIRTUAL
> ,`offset` BIGINT METADATA VIRTUAL
> ,id BIGINT comment 'auto-increment ID'
> ,PRIMARY KEY (`id`) NOT ENFORCED
> )
> with (
> 'connector'='kafka'
> xxx
> )
> ;
> create temporary table `sink`
> (
> origin_ts TIMESTAMP(3)
> ,`partition` INT
> ,`offset` BIGINT
> ,id BIGINT
> )
> WITH (
> 'connector'='print'
> )
> ;
> create temporary table `sr_sink`
> (
> id BIGINT comment 'auto-increment ID'
> )
> WITH (
> 'connector'='print'
> )
> ;
> -- EXPLAIN STATEMENT SET BEGIN
> BEGIN STATEMENT SET;
> INSERT INTO sink
> SELECT
> origin_ts
> ,`partition`
> ,`offset`
> ,id
> FROM message_channel_task_record
> ;
> INSERT INTO `sr_sink`
> SELECT
> id
> FROM `message_channel_task_record`
> ;
> END
> ;
>  {code}
> Explained plan:
> {code:java}
>  [558]:TableSourceScan(table=[[vvp, default, message_channel_task_record, 
> project=[id], metadata=[partition, value.ingestion-timestamp, offset]]], 
> fields=[id, partition, origin_ts, offset])
> :- [559]:Calc(select=[CAST(origin_ts AS TIMESTAMP(3)) AS origin_ts, 
> CAST(partition AS INTEGER) AS partition, CAST(offset AS BIGINT) AS offset, 
> id])
> :  +- [560]:Sink(table=[vvp.default.sink], fields=[origin_ts, partition, 
> offset, id])
>+- [562]:Sink(table=[vvp.default.sr_sink], fields=[id]) {code}
> Expected metadata column order is: value.ingestion-timestamp(format), 
> partition, offset;
> The actual metadata column order is: partition, 
> value.ingestion-timestamp(format), offset.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25647:
URL: https://github.com/apache/flink/pull/25647#discussion_r1851640826


##
docs/layouts/shortcodes/generated/all_taskmanager_section.html:
##
@@ -90,7 +90,7 @@
 taskmanager.load-balance.mode
 NONE
 Enum
-Mode for the load-balance allocation strategy across all 
available TaskManagers.The SLOTS mode tries to spread out the slots 
evenly across all available TaskManagers.The NONE mode is the default mode without any 
specified strategy.Possible 
values:"NONE""SLOTS"
+Mode for the load-balance allocation strategy across all 
available TaskManagers.The SLOTS mode tries to spread out the slots 
evenly across all available TaskManagers.The TASKS mode tries to schedule evenly all tasks 
based on its' number across all available TaskManagers. Note: Currently, enabling this 
parameter can only achieve the balancing effect of the slot level dimension of 
DefaultScheduler.The NONE mode is the default mode without any 
specified strategy.Possible 
values:"NONE""SLOTS""TASKS"

Review Comment:
   I am not sure what this sentence means - it would be really useful to have a 
diagram, and maybe an example showing the SLOT and TASK modes and how they 
differ.
   
   It would also be useful to detail why you would choose TASK mode over SLOT 
mode.
   
   I was looking for more information in the Jira - it should point to the 
FLIP.
   
   I see some of the conversations in the PR as to why this might be needed. 
Could we summarize these considerations in the docs?
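   
   For illustration, a minimal sketch of enabling the new mode programmatically 
(assuming the option constant is named `TASK_MANAGER_LOAD_BALANCE_MODE` in 
`TaskManagerOptions`; equivalent to setting `taskmanager.load-balance.mode: 
TASKS` in the configuration file):
   
   ```java
   import org.apache.flink.configuration.Configuration;
   import org.apache.flink.configuration.TaskManagerOptions;
   
   public class LoadBalanceModeExample {
       public static void main(String[] args) {
           Configuration conf = new Configuration();
           // Spread tasks (rather than just slots) evenly across TaskManagers.
           conf.set(
                   TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE,
                   TaskManagerOptions.TaskManagerLoadBalanceMode.TASKS);
           System.out.println(conf);
       }
   }
   ```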



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]

2024-11-21 Thread via GitHub


pnowojski opened a new pull request, #25671:
URL: https://github.com/apache/flink/pull/25671

   
   ## What is the purpose of the change
   
   Add checkpoint type and unaligned flag to the checkpoint trace
   
   ## Verifying this change
   
   Expanded unit test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (not applicable / **docs** / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]

2024-11-21 Thread via GitHub


flinkbot commented on PR #25671:
URL: https://github.com/apache/flink/pull/25671#issuecomment-2490481251

   
   ## CI report:
   
   * fe598c5420a55cc01a1e8fc92f6cbb4907ac9d49 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]

2024-11-21 Thread via GitHub


davidradl commented on PR #25647:
URL: https://github.com/apache/flink/pull/25647#issuecomment-2490489348

   Reviewed by Chi on 21/11/24. Asked the submitter questions, mostly to make it 
clear when to configure this new option, by bringing appropriate FLIP 
content and reasoning into the docs, ideally including diagrams to show the 
need for the new option.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36685) Enable update/create operation on flinkdeployment resource in mutation webhook

2024-11-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-36685:
---
Priority: Minor  (was: Blocker)

> Enable update/create operation on flinkdeployment resource in mutation webhook
> --
>
> Key: FLINK-36685
> URL: https://issues.apache.org/jira/browse/FLINK-36685
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.9.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Minor
>  Labels: pull-request-available
>
> In mutation webhook yaml of the helm chart, UPDATE/CREATE operation is not 
> allowed on 
> flinkdeployments. We use mutation webhook to inject platform secrets to the 
> flink pipeline CRD. Planned to add a PR to enable UPDATE/CREATE operation on 
> flinkdeployments resource. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]

2024-11-21 Thread via GitHub


lvyanquan opened a new pull request, #3753:
URL: https://github.com/apache/flink-cdc/pull/3753

   The goal is to extend 
[flink-cdc](https://issues.apache.org/jira/browse/FLINK-cdc) with the 
capability to invoke AI models during the data stream processing workflow, with 
a particular focus on supporting array data structures. This feature will allow 
users to easily integrate embedding models to handle array-based data, such as 
lists of text, numeric data, or other multi-dimensional arrays, within their 
Flink jobs.
   
   Co-authored with @proletarians.
   And based on https://github.com/apache/flink-cdc/pull/3642 and 
https://github.com/apache/flink-cdc/pull/3434


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36646] Test different versions of the JDK in the Flink image [flink-kubernetes-operator]

2024-11-21 Thread via GitHub


gyfora commented on PR #910:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/910#issuecomment-2490732713

   Closing this as it was merged in another PR


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36646] Test different versions of the JDK in the Flink image [flink-kubernetes-operator]

2024-11-21 Thread via GitHub


gyfora closed pull request #910: [FLINK-36646] Test different versions of the 
JDK in the Flink image
URL: https://github.com/apache/flink-kubernetes-operator/pull/910


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-36765) How to Handle Multi-Type Maps in Avro Schema with Flink Table API?

2024-11-21 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900049#comment-17900049
 ] 

david radley edited comment on FLINK-36765 at 11/21/24 2:07 PM:


I was trying to find where the Avro specification says that multiple value 
types are allowed for map values; do you know where this is documented as part 
of the specification? 

Could you use a Record here instead of a map? 


was (Author: JIRAUSER300523):
I was trying to find where the Avro specification says that multiple values are 
allowed for the map values, do you know where this is documented as part of the 
specification. 
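
For illustration of the Record suggestion above: a minimal sketch with 
hypothetical field names, where each known key becomes a nullable, concretely 
typed record field instead of a map entry with a union value type:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class StandBySchemaExample {
    public static void main(String[] args) {
        // Hypothetical record replacing the multi-type map: one nullable,
        // concretely typed field per known key.
        Schema standByProperties = SchemaBuilder.record("StandByProperties")
                .fields()
                .name("enabled").type().unionOf().nullType().and().booleanType().endUnion().nullDefault()
                .name("retryCount").type().unionOf().nullType().and().intType().endUnion().nullDefault()
                .endRecord();
        System.out.println(standByProperties.toString(true));
    }
}
{code}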

> How to Handle Multi-Type Maps in Avro Schema with Flink Table API?
> --
>
> Key: FLINK-36765
> URL: https://issues.apache.org/jira/browse/FLINK-36765
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Formats (JSON, Avro, 
> Parquet, ORC, SequenceFile)
>Reporter: Maneendra
>Priority: Major
>
>  
> I have a Map with multiple data types in my Avro schema, which I am trying to 
> use in the Flink Table API to read data from Kafka. However, I’m encountering 
> the following exception because the Flink AvroSchemaConverter does not 
> support Maps with mixed data types. Could someone assist me in parsing this 
> schema using the Table API?
> Flink Code: String avroSchema="";
> DataType s = AvroSchemaConverter.convertToDataType(avroSchema);
>     Schema schema1 = Schema.newBuilder().fromRowDataType(s).build();
>     
>     TableDescriptor descriptor = TableDescriptor.forConnector("kafka")
>             .schema(schema)
>             .comment("simple comment")
>             .option("topic", "")
>             .option("properties.application.id", "")
>             .option("properties.security.protocol", "")
>             .option("properties.bootstrap.servers", "")
>             .option("properties.group.id", "")
>             .option("properties.auto.offset.reset", "earliest")
>             .option("format", "avro")
>             .build();
> Avro Schema:
> {
>    "name":"standByProperties",
>    "type":[
>       "null",
>       {
>          "type":"map",
>          "values":[
>             "null",
>             "boolean",
>             "int"
>          ]
>       }
>    ]
> },
> Output: standByProperties MAP NULL> Exception: Exception in thread "main" 
> java.lang.UnsupportedOperationException: Unsupported to derive Schema for 
> type: RAW('java.lang.Object', ?) NOT NULL at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:580)
>  at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416)
>  at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:568)
>  at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:549)
>  at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416)
> What I Tried: I defined an Avro schema that includes a Map field with values 
> of mixed data types. Used the Flink Table API to read data from Kafka and 
> attempted to use AvroSchemaConverter to map the schema to a Flink table. 
> During execution, I encountered an exception because the AvroSchemaConverter 
> does not support Maps with multiple value types. What I Was Expecting: I was 
> expecting Flink to handle the Map field and correctly parse the data into a 
> table format, with proper support for the mixed data types within the Map.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36769] support fury serializer for pyflink [flink]

2024-11-21 Thread via GitHub


flinkbot commented on PR #25672:
URL: https://github.com/apache/flink/pull/25672#issuecomment-2491305074

   
   ## CI report:
   
   * c8a1d5309bca19c7d3880fd8b86b1b6b65bd4a07 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]

2024-11-21 Thread via GitHub


RocMarshal commented on code in PR #25647:
URL: https://github.com/apache/flink/pull/25647#discussion_r1852202449


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java:
##
@@ -40,13 +41,16 @@ public SlotSharingExecutionSlotAllocatorFactory(
 PhysicalSlotProvider slotProvider,
 boolean slotWillBeOccupiedIndefinitely,
 PhysicalSlotRequestBulkChecker bulkChecker,
-Duration allocationTimeout) {
+Duration allocationTimeout,
+TaskManagerOptions.TaskManagerLoadBalanceMode 
taskManagerLoadBalanceMode) {
 this(
 slotProvider,
 slotWillBeOccupiedIndefinitely,
 bulkChecker,
 allocationTimeout,
-new LocalInputPreferredSlotSharingStrategy.Factory());
+taskManagerLoadBalanceMode == 
TaskManagerOptions.TaskManagerLoadBalanceMode.TASKS
+? new 
TaskBalancedPreferredSlotSharingStrategy.Factory()

Review Comment:
   In fact, uneven task scheduling results mainly arise in two stages:
   1. tasks -> slots
   2. slots -> TaskManagers
   Prior to the initial feature freeze, we anticipated that the entire 
FLIP would be at risk, so our original intention in opening this PR was to 
release this change as early as possible to address the imbalance in the 
task -> slot dimension. But now there is no schedule risk for this feature, so 
we plan to release both dimensions of functionality together.
   BTW, the optimizations and changes mentioned above will most likely not be 
done in the current PR, as it may be abandoned; we will place the 
corresponding optimizations in an appropriate future PR. If you are willing 
to help with the review, I would greatly appreciate it! :)



##
docs/layouts/shortcodes/generated/all_taskmanager_section.html:
##
@@ -90,7 +90,7 @@
 taskmanager.load-balance.mode
 NONE
 Enum
-Mode for the load-balance allocation strategy across all 
available TaskManagers.The SLOTS mode tries to spread out the slots 
evenly across all available TaskManagers.The NONE mode is the default mode without any 
specified strategy.Possible 
values:"NONE""SLOTS"
+Mode for the load-balance allocation strategy across all 
available TaskManagers.The SLOTS mode tries to spread out the slots 
evenly across all available TaskManagers.The TASKS mode tries to schedule evenly all tasks 
based on its' number across all available TaskManagers. Note: Currently, enabling this 
parameter can only achieve the balancing effect of the slot level dimension of 
DefaultScheduler.The NONE mode is the default mode without any 
specified strategy.Possible 
values:"NONE""SLOTS""TASKS"

Review Comment:
   Good idea~, we do need a more detailed and precise description, as well as 
traceable design documents.
   How about making the following changes based on your suggestion?
   1. Optimize the configuration description in the code, including adding the 
FLIP address.
   2. The explanation of how to use it, its differences from other 
configurations, and the diagrams can be completed in 
https://issues.apache.org/jira/browse/FLINK-33392



##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -707,6 +707,12 @@ public class TaskManagerOptions {
 "The %s mode tries to 
spread out the slots evenly across all available %s.",
 
code(TaskManagerLoadBalanceMode.SLOTS.name()),
 code("TaskManagers")),
+text(
+"The %s mode tries to 
schedule evenly all tasks based on its' number across all available %s. "

Review Comment:
   The meaning expressed here is that the scheduler will try its best to ensure 
that the scheduling results are concentrated and the number of tasks on each 
TaskManager is similar.
   I will add a more detailed description.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36769) Support fury Serializer for pyflink

2024-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36769:
---
Labels: pull-request-available  (was: )

> Support fury Serializer for pyflink
> --
>
> Key: FLINK-36769
> URL: https://issues.apache.org/jira/browse/FLINK-36769
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.16.1
> Environment: flink 1.16.1
>Reporter: xingyuan cheng
>Priority: Major
>  Labels: pull-request-available
>
> Hi, community. Currently, in the batch verification scenario for our algorithm 
> data, we use PyFlink and encounter low transmission efficiency caused by the 
> low performance of pickle-protocol-4-based encoding. After research, we decided 
> to adopt Apache Fury, a serialization framework based on pickle protocol 5 
> encoding. The Python implementation of Fury defines the transmission buffer 
> size in the protocol to improve the performance of large data transfers. To 
> this end, I prepared a draft pull request. What do friends in the community 
> think about this?
>  
> Pickle protocol 5 with out-of-band data:  https://peps.python.org/pep-0574/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36765) How to Handle Multi-Type Maps in Avro Schema with Flink Table API?

2024-11-21 Thread Maneendra (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900079#comment-17900079
 ] 

Maneendra commented on FLINK-36765:
---

[~davidradl] 
The existing schema, which contains map values with multiple data types, works 
with the Flink DataStream API. However, while migrating the codebase to the 
Flink Table API, we encountered an issue with this schema. We need to resolve 
this issue because multiple teams consume the data, making it impractical to 
modify the source schema.
 
 

> How to Handle Multi-Type Maps in Avro Schema with Flink Table API?
> --
>
> Key: FLINK-36765
> URL: https://issues.apache.org/jira/browse/FLINK-36765
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Formats (JSON, Avro, 
> Parquet, ORC, SequenceFile)
>Reporter: Maneendra
>Priority: Major
>
>  
> I have a Map with multiple data types in my Avro schema, which I am trying to 
> use in the Flink Table API to read data from Kafka. However, I’m encountering 
> the following exception because the Flink AvroSchemaConverter does not 
> support Maps with mixed data types. Could someone assist me in parsing this 
> schema using the Table API?
> Flink Code: String avroSchema="";
> DataType s = AvroSchemaConverter.convertToDataType(avroSchema);
>     Schema schema1 = Schema.newBuilder().fromRowDataType(s).build();
>     
>     TableDescriptor descriptor = TableDescriptor.forConnector("kafka")
>             .schema(schema)
>             .comment("simple comment")
>             .option("topic", "")
>             .option("properties.application.id", "")
>             .option("properties.security.protocol", "")
>             .option("properties.bootstrap.servers", "")
>             .option("properties.group.id", "")
>             .option("properties.auto.offset.reset", "earliest")
>             .option("format", "avro")
>             .build();
> Avro Schema:
> {
>    "name":"standByProperties",
>    "type":[
>       "null",
>       {
>          "type":"map",
>          "values":[
>             "null",
>             "boolean",
>             "int"
>          ]
>       }
>    ]
> },
> Output: standByProperties MAP NULL> Exception: Exception in thread "main" 
> java.lang.UnsupportedOperationException: Unsupported to derive Schema for 
> type: RAW('java.lang.Object', ?) NOT NULL at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:580)
>  at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416)
>  at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:568)
>  at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:549)
>  at 
> org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416)
> What I Tried: I defined an Avro schema that includes a Map field with values 
> of mixed data types. Used the Flink Table API to read data from Kafka and 
> attempted to use AvroSchemaConverter to map the schema to a Flink table. 
> During execution, I encountered an exception because the AvroSchemaConverter 
> does not support Maps with multiple value types. What I Was Expecting: I was 
> expecting Flink to handle the Map field and correctly parse the data into a 
> table format, with proper support for the mixed data types within the Map.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36766) Use pyflink to create remote env

2024-11-21 Thread William Que (Jira)
William Que created FLINK-36766:
---

 Summary: Use pyflink to create remote env
 Key: FLINK-36766
 URL: https://issues.apache.org/jira/browse/FLINK-36766
 Project: Flink
  Issue Type: Bug
  Components: API / Python
Affects Versions: 1.19.1, 1.20.0
 Environment: Ubuntu 24 LTSC

Flink : 1.19.1  or 1.20.0

 
Reporter: William Que


I use the following code to connect to a remote Flink cluster and then create a 
remote Flink env. After adding jar files to the StreamExecutionEnvironment, 
every time Flink SQL is executed, an error is reported, something like an 
error parsing a YAML file.
{code:java}
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.java_gateway import get_gateway
from pyflink.table import StreamTableEnvironment

gateway = get_gateway()
string_class = gateway.jvm.String
string_array = gateway.new_array(string_class, 0)
stream_env = 
gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
j_stream_exection_environment = 
stream_env.createRemoteEnvironment("master",8081,string_array)
env = StreamExecutionEnvironment(j_stream_exection_environment) 

jars_path = "F:/jars/flink-1.19.1/"
jar_files = ["file:///" + jars_path + f for f in os.listdir(jars_path) if 
f.endswith('.jar')]
jar_files_str = ';'.join(jar_files)
env.add_jars(*jar_files)   ## Cause Error

t_env = StreamTableEnvironment.create(env)
{code}
 

Then I traced the error and found it is caused by a static method in 
configuration.py of the pyflink package. After env.add_jars(*jar_files), the value 
parameter will look like this, which causes the above error.

value = 
'[];file:///F:/software/jars-flink3/flink-clients-1.20.0.jar;file:///F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar;..'
{code:java}
    @staticmethod
    def parse_jars_value(value: str, jvm):
        is_standard_yaml = 
jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml()
        if is_standard_yaml:
            from ruamel.yaml import YAML
            yaml = YAML(typ='safe')
            jar_urls_list = yaml.load(value)  # ERROR 
            if isinstance(jar_urls_list, list):
                return jar_urls_list
        return value.split(";") {code}
 

I once tried to fix it by removing the "[];" part from the value 
parameter; one problem was solved but another then came out: it seems the jar 
files have been added twice to the classpath somewhere. 
{code:java}
Caused by: java.lang.IllegalStateException: The library registration references 
a different set of library BLOBs than previous registrations for this job:
old:[file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, 
file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, 
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar, 
file:/F:/software/jars-flink3/flink-json-1.20.0.jar, 
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, 
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar]
new:[file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, 
file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, 
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, 
file:/F:/software/jars-flink3/flink-json-1.20.0.jar, 
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar, 
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar, 
file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, 
file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, 
file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, 
file:/F:/software/jars-flink3/flink-json-1.20.0.jar, 
file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar, 
file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar]{code}
 

Please check it carefully.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36767) Bump cyclonedx-maven-plugin from 2.7.9 to 2.9.0

2024-11-21 Thread Siddharth R (Jira)
Siddharth R created FLINK-36767:
---

 Summary: Bump cyclonedx-maven-plugin from 2.7.9 to 2.9.0
 Key: FLINK-36767
 URL: https://issues.apache.org/jira/browse/FLINK-36767
 Project: Flink
  Issue Type: Improvement
  Components: Kubernetes Operator
Affects Versions: kubernetes-operator-1.11.0
Reporter: Siddharth R


Bump cyclonedx-maven-plugin from 2.7.9 to 2.9.0 to remediate the findings in 
the dependent packages.

Vulnerabilities from dependencies:
[CVE-2024-38374|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-38374]

 

Package details:

[https://mvnrepository.com/artifact/org.cyclonedx/cyclonedx-maven-plugin/2.9.0]

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36686) allow customizing env variables for flink-webhook container

2024-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36686:
---
Labels: pull-request-available  (was: )

> allow customizing env variables for flink-webhook container
> ---
>
> Key: FLINK-36686
> URL: https://issues.apache.org/jira/browse/FLINK-36686
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.9.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Blocker
>  Labels: pull-request-available
>
> In the current helm chart of the operator, there is no way to pass custom env 
> variables into the flink-webhook container. I can see a few options to fix 
> the problem:
>  * make Values.operatorPod.env pod-level envs in flink-operator.yaml
>  * create a new webhookPod.env in values.yaml and apply it only to the 
> flink-webhook container env setup
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25671:
URL: https://github.com/apache/flink/pull/25671#discussion_r1851895450


##
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java:
##
@@ -385,11 +384,10 @@ public void addSpan(SpanBuilder spanBuilder) {
 
tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null));
 
 assertThat(reportedSpans.size()).isEqualTo(1);
-assertThat(
-reportedSpans.stream()
-.map(span -> 
span.getAttributes().get("checkpointId"))
-.collect(Collectors.toList()))
-.containsExactly(42L);
+Span reportedSpan = Iterables.getOnlyElement(reportedSpans);
+
assertThat(reportedSpan.getAttributes().get("checkpointId")).isEqualTo(42L);
+
assertThat(reportedSpan.getAttributes().get("checkpointType")).isEqualTo("Checkpoint");
+
assertThat(reportedSpan.getAttributes().get("isUnaligned")).isEqualTo("false");

Review Comment:
   Can we add a test for `isUnaligned` true?
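   
   For illustration, the true case might look like this (a minimal sketch; 
`pendingUnaligned` is a hypothetical stats object registered with unaligned 
checkpoint properties in a separate test case):
   
   ```java
   // Hypothetical: pendingUnaligned was created with unaligned
   // checkpoint properties before being completed.
   tracker.reportCompletedCheckpoint(pendingUnaligned.toCompletedCheckpointStats(null));
   
   Span reportedSpan = Iterables.getOnlyElement(reportedSpans);
   assertThat(reportedSpan.getAttributes().get("isUnaligned")).isEqualTo("true");
   ```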



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36743) Rescale from unaligend checkpoint failed

2024-11-21 Thread Feifan Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Feifan Wang updated FLINK-36743:

Attachment: image-2024-11-21-20-20-41-644.png

> Rescale from unaligend checkpoint failed
> 
>
> Key: FLINK-36743
> URL: https://issues.apache.org/jira/browse/FLINK-36743
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Reporter: Feifan Wang
>Priority: Major
> Attachments: 
> Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch,
>  image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, 
> image-2024-11-19-17-30-14-816.png, image-2024-11-21-20-20-20-536.png, 
> image-2024-11-21-20-20-41-644.png
>
>
> We encountered the following exception when scaling down a job from 5600 to 
> 4200:
> {code:java}
> 2024-11-12 19:20:54,308 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> xx (1358/1400) 
> (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) 
> switched from RUNNING to FAILED on 
> container_e33_1725519807238_6894116_01_000825 @ yg-
> java.lang.IllegalStateException: Cannot select 
> SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; 
> known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, 
> outputSubtaskIndex=4200}]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937)
>  ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) 
> ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code}
>  * Flink version : 1.16.1
>  * unaligned checkpoint : enabled
>  * log-based checkpoint : enabled
> The exception was encountered when restoring from chk-2718336, while the job can 
> successfully restore from chk-2718333. I checked the metadata files of 
> chk-2718336 and chk-2718333; both of them have in-flight data. It looks like 
> there is something wrong with how the unaligned checkpoint reassigns 
> in-flight data. Could you please take a look? [~arvid] , [~pnowojski] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36066][runtime] Introducing the AdaptiveGraphManager component [flink]

2024-11-21 Thread via GitHub


zhuzhurk commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1851918943


##
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest.java:
##
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
+
+import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link AdaptiveGraphManager}. */
+public class AdaptiveGraphManagerTest extends JobGraphGeneratorTestBase {
+@Override
+JobGraph createJobGraph(StreamGraph streamGraph) {
+return generateJobGraphInLazilyMode(streamGraph);
+}
+
+@Test
+@Override
+void testManagedMemoryFractionForUnknownResourceSpec() throws Exception {
+final ResourceSpec resource = ResourceSpec.UNKNOWN;
+final List resourceSpecs =
+Arrays.asList(resource, resource, resource, resource);
+
+final Configuration taskManagerConfig =
+new Configuration() {
+{
+set(
+
TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS,
+new HashMap() {
+{
+put(
+TaskManagerOptions
+
.MANAGED_MEMORY_CONSUMER_NAME_OPERATOR,
+"6");
+put(
+TaskManagerOptions
+
.MANAGED_MEMORY_CONSUMER_NAME_PYTHON,
+"4");
+}
+});
+}
+};
+
+final List> 
operatorScopeManagedMemoryUseCaseWeights =
+new ArrayList<>();
+final List> slotScopeManagedMemoryUseCases = 
new ArrayList<>();
+
+// source: batch
+operatorScopeManagedMemoryUseCaseWeights.add(
+Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
+slotScopeManagedMemoryUseCases.add(Collections.emptySet());
+
+// map1: batch, python
+operatorScopeManagedMemoryUseCaseWeights.add(
+Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1));
+
slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
+
+// map3: python
+operatorScopeManagedMemoryUseCaseWeights.add(Collections.emptyMap());
+
slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON));
+
+   

Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]

2024-11-21 Thread via GitHub


pnowojski commented on code in PR #25671:
URL: https://github.com/apache/flink/pull/25671#discussion_r1851984375


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java:
##
@@ -256,7 +256,13 @@ private void 
logCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
 .setAttribute("checkpointId", 
checkpointStats.getCheckpointId())
 .setAttribute("fullSize", 
checkpointStats.getStateSize())
 .setAttribute("checkpointedSize", 
checkpointStats.getCheckpointedSize())
-.setAttribute("checkpointStatus", 
checkpointStats.getStatus().name()));
+.setAttribute("checkpointStatus", 
checkpointStats.getStatus().name())
+.setAttribute(
+"isUnaligned",
+
Boolean.toString(checkpointStats.isUnalignedCheckpoint()))
+.setAttribute(
+"checkpointType",
+
checkpointStats.getProperties().getCheckpointType().getName()));

Review Comment:
   The potential values here are:
   ```
   public static final CheckpointType CHECKPOINT =
   new CheckpointType("Checkpoint", 
SharingFilesStrategy.FORWARD_BACKWARD);
   
   public static final CheckpointType FULL_CHECKPOINT =
   new CheckpointType("Full Checkpoint", 
SharingFilesStrategy.FORWARD);
   
   public static SavepointType savepoint(SavepointFormatType formatType) {
   return new SavepointType("Savepoint", PostCheckpointAction.NONE, 
formatType);
   }
   
   public static SavepointType terminate(SavepointFormatType formatType) {
   return new SavepointType("Terminate Savepoint", 
PostCheckpointAction.TERMINATE, formatType);
   }
   
   public static SavepointType suspend(SavepointFormatType formatType) {
   return new SavepointType("Suspend Savepoint", 
PostCheckpointAction.SUSPEND, formatType);
   }
   ```
   
   Savepoints do end up here, and unfortunately the `Checkpoint` name here is a 
bit overloaded. However, this is also consistent with other places in the code, 
especially metrics/logs, where savepoints are also reported/logged as 
checkpoints. So I'm not sure that I would be fine with changing the trace's 
`Checkpoint` name here to `Snapshot`.
   
   I think `SharingFilesStrategy` is superfluous. Each snapshot type is named 
uniquely, so you can deduce the sharing strategy based on the name/type.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-36768) Empty Checkpoint Directory created when older checkpoint fails due to timeout and the checkpoint interval is the same

2024-11-21 Thread Eaugene Thomas (Jira)
Eaugene Thomas created FLINK-36768:
--

 Summary: Empty Checkpoint Directory created when older checkpoint 
fails due to timeout and the checkpoint interval is the same 
 Key: FLINK-36768
 URL: https://issues.apache.org/jira/browse/FLINK-36768
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.12.1
Reporter: Eaugene Thomas


When the checkpoint interval equals the checkpoint timeout, and the failure 
threshold is reached, the asynchronous checkpoint trigger starts. However, due 
to an internal restart, the tombstone directory ({{chk-}}) is not 
cleared.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-31836][table] Upgrade Calcite version to 1.34.0 [flink]

2024-11-21 Thread via GitHub


twalthr commented on code in PR #24256:
URL: https://github.com/apache/flink/pull/24256#discussion_r1852228055


##
docs/content.zh/docs/dev/table/sql/queries/deduplication.md:
##
@@ -32,13 +32,10 @@ Flink 使用 `ROW_NUMBER()` 去除重复数据,就像 Top-N 查询一样。其
 下面的例子展示了去重语句的语法:
 
 ```sql
-SELECT [column_list]
-FROM (
-   SELECT [column_list],
- ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
-   ORDER BY time_attr [asc|desc]) AS rownum
-   FROM table_name)
-WHERE rownum = 1
+SELECT [column_list],

Review Comment:
   Can we use this instead for deduplication? The Top-N makes sense.
   ```
   SELECT [column_list]
   FROM table_name
   QUALIFY ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr 
[asc|desc]) = 1
   ```



##
docs/content/docs/dev/table/concepts/versioned_tables.md:
##
@@ -163,13 +163,10 @@ table usable in subsequent queries.
 In general, the results of a query with the following format produces a 
versioned table:
 
 ```sql
-SELECT [column_list]
-FROM (
-   SELECT [column_list],
- ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
-   ORDER BY time_attr DESC) AS rownum
-   FROM table_name)
-WHERE rownum = 1
+SELECT [column_list],
+ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr 
[asc|desc]) AS rownum
+FROM table_name)
+QUALIFY rownum = 1

Review Comment:
   use same as for dedup



##
docs/content.zh/docs/dev/table/sql/queries/topn.md:
##
@@ -32,13 +32,11 @@ Flink 使用 `OVER` 窗口子句和过滤条件的组合来表达一个 Top-N 
 下面展示了 Top-N 的语法:
 
 ```sql
-SELECT [column_list]
-FROM (
-   SELECT [column_list],
- ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
-   ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
-   FROM table_name)
-WHERE rownum <= N [AND conditions]
+SELECT [column_list],
+ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr 
[asc|desc]) AS rownum
+FROM table_name)

Review Comment:
   ```
   FROM table_name
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]

2024-11-21 Thread via GitHub


davidradl commented on code in PR #25671:
URL: https://github.com/apache/flink/pull/25671#discussion_r1851894006


##
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java:
##
@@ -256,7 +256,13 @@ private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
                         .setAttribute("checkpointId", checkpointStats.getCheckpointId())
                         .setAttribute("fullSize", checkpointStats.getStateSize())
                         .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize())
-                        .setAttribute("checkpointStatus", checkpointStats.getStatus().name()));
+                        .setAttribute("checkpointStatus", checkpointStats.getStatus().name())
+                        .setAttribute(
+                                "isUnaligned",
+                                Boolean.toString(checkpointStats.isUnalignedCheckpoint()))
+                        .setAttribute(
+                                "checkpointType",
+                                checkpointStats.getProperties().getCheckpointType().getName()));

Review Comment:
   In this case I assume that the name will always be "Checkpoint", i.e. that 
savepoints do not come through here. If savepoints can come through here, then 
the span builder's first line should probably be "Snapshot" rather than 
"Checkpoint".
   I am also thinking that the SnapshotType's SharingFilesStrategy would be good 
to include here.
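   
   If the sharing strategy were added, a minimal sketch could look like the 
following. This is only a hedged illustration: it assumes the runtime's 
`SnapshotType` (implemented by the checkpoint and savepoint types) exposes 
`getSharingFilesStrategy()`, and the helper class/method names are made up.
   ```java
   import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
   
   final class CheckpointTraceAttributes {
       /** Derives a "sharingFilesStrategy" attribute value for the span. */
       static String sharingFilesStrategyOf(AbstractCheckpointStats stats) {
           return stats.getProperties()
                   .getCheckpointType()
                   .getSharingFilesStrategy()
                   .name();
       }
   }
   ```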



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36535) Optimize the scale down logic based on historical parallelism

2024-11-21 Thread Rui Fan (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900022#comment-17900022
 ] 

Rui Fan commented on FLINK-36535:
-

Thanks [~heigebupahei] for the comment!
{quote} I think we only need to change the strategy to max of window and 
abandon the previous last value of window.
{quote}
I thought about it again; your suggestion is reasonable, and I have updated the 
Jira description. I have also finished the POC in our production environment, 
and it works as expected:
 * The rescale frequency is extremely low: previously the job was rescaled 10 
times a day, but now it is rescaled less than once a day on average.
 * Jobs run at the same parallelism as during the peak hours of each day.
 * Scale down only happens when the parallelism during peak hours still wastes 
resources.
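
For illustration, here is a minimal, self-contained sketch of the windowed 
decision described above (all class and method names are made up; this is not 
the actual autoscaler code):
{code:java}
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;

/** Sketch of the proposed scale-down window (illustrative names only). */
class ScaleDownWindow {
    private final Duration interval; // job.autoscaler.scale-down.interval
    private Instant triggerTime;     // set when recommended < current
    private final Deque<Integer> recommendedInWindow = new ArrayDeque<>();

    ScaleDownWindow(Duration interval) {
        this.interval = interval;
    }

    /** Returns the parallelism to scale down to, or -1 for "do nothing". */
    int onRecommendation(Instant now, int current, int recommended) {
        if (recommended >= current) {
            // Cancel the triggered scale down.
            triggerTime = null;
            recommendedInWindow.clear();
            return -1;
        }
        if (triggerTime == null) {
            triggerTime = now; // record the scale down trigger time
        }
        recommendedInWindow.add(recommended);
        if (Duration.between(triggerTime, now).compareTo(interval) < 0) {
            return -1; // Change2: never scale down before the window elapses
        }
        // Change1: scale down to the maximum parallelism seen in the window.
        int max =
                recommendedInWindow.stream().mapToInt(Integer::intValue).max().orElse(current);
        triggerTime = null;
        recommendedInWindow.clear();
        return max;
    }
}
{code}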

> Optimize the scale down logic based on historical parallelism
> -
>
> Key: FLINK-36535
> URL: https://issues.apache.org/jira/browse/FLINK-36535
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> This is a follow-up to FLINK-36018 . FLINK-36018 supported the lazy scale 
> down to avoid frequent rescaling.
> h1. Proposed Change
> Treat scale-down.interval as a window:
>  * Recording the scale down trigger time when the recommended parallelism < 
> current parallelism
>  ** When the recommended parallelism >= current parallelism, cancel the 
> triggered scale down
>  * The scale down will be executed when currentTime - triggerTime > 
> scale-down.interval
>  ** {color:#de350b}Change1{color}: Using the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * {color:#de350b}Change2{color}: Never scale down when currentTime - 
> triggerTime < scale-down.interval
>  ** In the FLINK-36018, the scale down may be executed when currentTime - 
> triggerTime < scale-down.interval.
>  ** For example: the taskA may scale down when taskB needs to scale up.
> h1. Background
> Some critical Flink jobs need to scale up in time, but only scale down on a 
> daily basis. In other words, Flink users do not want Flink jobs to be scaled 
> down multiple times within 24 hours, and the jobs run at the same parallelism 
> as during the peak hours of each day. 
> Note: Users hope to scale down only happens when the parallelism during peak 
> hours is still a waste of resources. This is a trade-off between downtime and 
> resource waste for a critical job.
> h1. Current solution
> In general, this requirement could be met after setting{color:#de350b} 
> job.autoscaler.scale-down.interval= 24 hour{color}. When taskA runs with 100 
> parallelism, and recommended parallelism is 100 during the peak hours of each 
> day. We hope taskA doesn't rescale forever, because the triggered scale down 
> will be canceled once the recommended parallelism >= current parallelism 
> within 24 hours (It's exactly what FLINK-36018 does).
> h1. Unexpected Scenario & how to solve?
> But I found the critical production job is still rescaled about 10 times 
> every day (when scale-down.interval is set to 24 hours).
> Root cause: There may be many sources in a job, and the traffic peaks of 
> these sources may occur at different times. When taskA triggers scale down, 
> the scale down of taskA will not be actively executed within 24 hours, but it 
> may be executed when other tasks are scaled up.
> For example:
>  * The scale down of sourceB and sourceC may be executed when SourceA scales 
> up.
>  * After a while, the scale down of sourceA and sourceC may be executed when 
> SourceB scales up.
>  * After a while, the scale down of sourceA and sourceB may be executed when 
> SourceC scales up.
>  * When there are many tasks, the above 3 steps will be executed repeatedly.
> That's why the job is rescaled about 10 times every day, the 
> {color:#de350b}change2{color} of proposed change could solve this issue: 
> Never scale down when currentTime - triggerTime < scale-down.interval.
>  
> {color:#de350b}Change1{color}: Using the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * It can ensure that the parallelism after scaling down is the parallelism 
> at yesterday's peak.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36739) Update NodeJS to v22 (LTS)

2024-11-21 Thread Mehdi (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mehdi updated FLINK-36739:
--
Summary: Update NodeJS to v22 (LTS)  (was: Update NodeJS to v18.20.5 (LTS))

> Update NodeJS to v22 (LTS)
> --
>
> Key: FLINK-36739
> URL: https://issues.apache.org/jira/browse/FLINK-36739
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / Web Frontend
>Reporter: Mehdi
>Assignee: Mehdi
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36765) How to Handle Multi-Type Maps in Avro Schema with Flink Table API?

2024-11-21 Thread Maneendra (Jira)
Maneendra created FLINK-36765:
-

 Summary: How to Handle Multi-Type Maps in Avro Schema with Flink 
Table API?
 Key: FLINK-36765
 URL: https://issues.apache.org/jira/browse/FLINK-36765
 Project: Flink
  Issue Type: Bug
  Components: API / Type Serialization System, Formats (JSON, Avro, 
Parquet, ORC, SequenceFile)
Reporter: Maneendra


 


I have a Map with multiple data types in my Avro schema, which I am trying to 
use in the Flink Table API to read data from Kafka. However, I’m encountering 
the following exception because the Flink AvroSchemaConverter does not support 
Maps with mixed data types. Could someone assist me in parsing this schema 
using the Table API?

Flink code: String avroSchema="";

DataType s = AvroSchemaConverter.convertToDataType(avroSchema);
    Schema schema1 = Schema.newBuilder().fromRowDataType(s).build();
    
    TableDescriptor descriptor = TableDescriptor.forConnector("kafka")
            .schema(schema1)
            .comment("simple comment")
            .option("topic", "")
            .option("properties.application.id", "")
            .option("properties.security.protocol", "")
            .option("properties.bootstrap.servers", "")
            .option("properties.group.id", "")
            .option("properties.auto.offset.reset", "earliest")
            .option("format", "avro")
            .build();
Avro Schema:
{
   "name":"standByProperties",
   "type":[
      "null",
      {
         "type":"map",
         "values":[
            "null",
            "boolean",
            "int"
         ]
      }
   ]
},

Output: standByProperties MAP<STRING, RAW('java.lang.Object', ?) NOT NULL>

Exception: Exception in thread "main" 
java.lang.UnsupportedOperationException: Unsupported to derive Schema for type: 
RAW('java.lang.Object', ?) NOT NULL
 at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:580)
 at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416)
 at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:568)
 at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:549)
 at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416)

What I tried: I defined an Avro schema that includes a Map field with values of 
mixed data types, then used the Flink Table API to read data from Kafka and 
attempted to use AvroSchemaConverter to map the schema to a Flink table. During 
execution, I encountered an exception because the AvroSchemaConverter does not 
support Maps with multiple value types. What I was expecting: Flink to handle 
the Map field and correctly parse the data into a table format, with proper 
support for the mixed data types within the Map.
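
For what it's worth, the schema side is reproducible with the plain Avro Java 
API, which accepts a union as a map's values schema; the failure only happens 
later, when Flink tries to find a SQL type for that union. A minimal sketch 
(not taken from the reporter's job):
{code:java}
import org.apache.avro.Schema;

public class UnionValuedMapDemo {
    public static void main(String[] args) {
        String json = "{\"type\":\"map\",\"values\":[\"null\",\"boolean\",\"int\"]}";
        Schema schema = new Schema.Parser().parse(json);
        // Avro deserializes such map values as java.lang.Object; Flink's
        // AvroSchemaConverter maps that to RAW('java.lang.Object', ?), which
        // convertToSchema cannot turn back into an Avro schema.
        System.out.println(schema.getValueType()); // ["null","boolean","int"]
    }
}
{code}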



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] (FLINK-36535) Optimize the scale down logic based on historical parallelism

2024-11-21 Thread Rui Fan (Jira)


[ https://issues.apache.org/jira/browse/FLINK-36535 ]


Rui Fan deleted comment on FLINK-36535:
-

was (Author: fanrui):
Thanks [~heigebupahei] for the comment!
{quote}that we need to treat job.autoscaler.scale-down.interval as a window. 
The default is the last value of the window, but in some cases it is the max in 
the window (>24 hours)? 
{quote}
For solution 2, yes.

For solution 1, the default is the last value of the window, and users could set 
an option to choose the max as the result.
{quote}But it seems to me like we just need to change the default behavior to 
max in window? 

Just imagine, if you follow the default 1hour configuration, the result of 
using max in window is not much different from before, so I think we only need 
to change the strategy to max of window and abandon the previous last value of 
window.
{quote}
It's a good point.

As I understand it, you mean:
 * We choose the max as the result by default
 * No additional option is needed, and the autoscaler doesn't support choosing 
the latest parallelism as the result for scale down

Is my understanding correct?

If yes, I think it makes sense for most scenarios. However, I'm afraid it 
could not meet all requirements. For example:
 * When the job owners are very familiar with the daily traffic changes of the 
job, they could set an appropriate job.autoscaler.scale-down.interval, and the 
job will only scale down once a day when the autoscaler chooses the latest 
parallelism.
 * IIUC, the autoscaler needs at least 2 scale-downs a day if it chooses the 
max parallelism in the window.
 ** Because the max is not the latest parallelism, after scaling down the max is 
most likely no longer the appropriate parallelism for the vertex, so the job 
needs to scale down again later.

Please correct me if I misunderstand anything, thanks~

> Optimize the scale down logic based on historical parallelism
> -
>
> Key: FLINK-36535
> URL: https://issues.apache.org/jira/browse/FLINK-36535
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> This is a follow-up to FLINK-36018 . FLINK-36018 supported the lazy scale 
> down to avoid frequent rescaling.
> h1. Proposed Change
> Treat scale-down.interval as a window:
>  * Recording the scale down trigger time when the recommended parallelism < 
> current parallelism
>  ** When the recommended parallelism >= current parallelism, cancel the 
> triggered scale down
>  * The scale down will be executed when currentTime - triggerTime > 
> scale-down.interval
>  ** {color:#de350b}Change1{color}: Using the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * {color:#de350b}Change2{color}: Never scale down when currentTime - 
> triggerTime < scale-down.interval
>  ** In the FLINK-36018, the scale down may be executed when currentTime - 
> triggerTime < scale-down.interval.
>  ** For example: the taskA may scale down when taskB needs to scale up.
> h1. Background
> Some critical Flink jobs need to scale up in time, but only scale down on a 
> daily basis. In other words, Flink users do not want Flink jobs to be scaled 
> down multiple times within 24 hours, and the jobs run at the same parallelism 
> as during the peak hours of each day. 
> Note: Users hope to scale down only happens when the parallelism during peak 
> hours is still a waste of resources. This is a trade-off between downtime and 
> resource waste for a critical job.
> h1. Current solution
> In general, this requirement could be met after setting{color:#de350b} 
> job.autoscaler.scale-down.interval= 24 hour{color}. When taskA runs with 100 
> parallelism, and recommended parallelism is 100 during the peak hours of each 
> day. We hope taskA doesn't rescale forever, because the triggered scale down 
> will be canceled once the recommended parallelism >= current parallelism 
> within 24 hours (It's exactly what FLINK-36018 does).
> h1. Unexpected Scenario & how to solve?
> But I found the critical production job is still rescaled about 10 times 
> every day (when scale-down.interval is set to 24 hours).
> Root cause: There may be many sources in a job, and the traffic peaks of 
> these sources may occur at different times. When taskA triggers scale down, 
> the scale down of taskA will not be actively executed within 24 hours, but it 
> may be executed when other tasks are scaled up.
> For example:
>  * The scale down of sourceB and sourceC may be executed when SourceA scales 
> up.
>  * After a while, the scale down of sourceA and sourceC may be executed when 
> SourceB scales up.
>  * After a while, the scale down of sourceA and sourceB may be executed when 
> SourceC scales up.
>  * When there are many tasks, the above 3 steps will be executed repeatedly.
> That's why the job is re

[jira] [Updated] (FLINK-36769) Support fury Serializer for pyflink

2024-11-21 Thread xingyuan cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xingyuan cheng updated FLINK-36769:
---
Description: 
Hi, community. Currently, in the batch verification scenario for our algorithm 
data, we use PyFlink and encounter low transmission efficiency caused by the 
poor performance of pickle protocol 4 based encoding. After research, we decided 
to adopt Apache Fury, a serialization framework built on pickle protocol 5 
encoding. Fury's Python implementation defines the transmission buffer size in 
the protocol, which improves the performance of large data transmission. To this 
end, I prepared a draft pull request. What does the community think about this?

 

Pickle protocol 5 with out-of-band data: https://peps.python.org/pep-0574/

  was:Hi, community. Currently, in the batch verification scenario for our 
algorithm data, we use PyFlink and encounter low transmission efficiency caused 
by the poor performance of pickle protocol 4 based encoding. After research, we 
decided to adopt Apache Fury, a serialization framework built on pickle protocol 
5 encoding. Fury's Python implementation defines the transmission buffer size in 
the protocol, which improves the performance of large data transmission. To this 
end, I prepared a draft pull request. What does the community think about this?


> Support fury Serializer for pyflink
> --
>
> Key: FLINK-36769
> URL: https://issues.apache.org/jira/browse/FLINK-36769
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.16.1
> Environment: flink 1.16.1
>Reporter: xingyuan cheng
>Priority: Major
>
> Hi, community. Currently, in the batch verification scenario for our algorithm 
> data, we use PyFlink and encounter low transmission efficiency caused by the 
> poor performance of pickle protocol 4 based encoding. After research, we 
> decided to adopt Apache Fury, a serialization framework built on pickle 
> protocol 5 encoding. Fury's Python implementation defines the transmission 
> buffer size in the protocol, which improves the performance of large data 
> transmission. To this end, I prepared a draft pull request. What does the 
> community think about this?
>  
> Pickle protocol 5 with out-of-band data:  https://peps.python.org/pep-0574/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36690][runtime] Fix schema operator hanging under extreme parallelized pressure [flink-cdc]

2024-11-21 Thread via GitHub


yuxiqian commented on PR #3680:
URL: https://github.com/apache/flink-cdc/pull/3680#issuecomment-2490188267

   @leonardBang Will this PR be reviewed soon? I'm planning to implement 
FLINK-36763 based on this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36765) How to Handle Multi-Type Maps in Avro Schema with Flink Table API?

2024-11-21 Thread david radley (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900049#comment-17900049
 ] 

david radley commented on FLINK-36765:
--

I was trying to find where the Avro specification says that multiple value 
types are allowed for map values. Do you know where this is documented in the 
specification?

> How to Handle Multi-Type Maps in Avro Schema with Flink Table API?
> --
>
> Key: FLINK-36765
> URL: https://issues.apache.org/jira/browse/FLINK-36765
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System, Formats (JSON, Avro, 
> Parquet, ORC, SequenceFile)
>Reporter: Maneendra
>Priority: Major
>
>  
> I have a Map with multiple data types in my Avro schema, which I am trying to 
> use in the Flink Table API to read data from Kafka. However, I’m encountering 
> the following exception because the Flink AvroSchemaConverter does not 
> support Maps with mixed data types. Could someone assist me in parsing this 
> schema using the Table API?
> Flink code: String avroSchema="";
> DataType s = AvroSchemaConverter.convertToDataType(avroSchema);
>     Schema schema1 = Schema.newBuilder().fromRowDataType(s).build();
>     
>     TableDescriptor descriptor = TableDescriptor.forConnector("kafka")
>             .schema(schema1)
>             .comment("simple comment")
>             .option("topic", "")
>             .option("properties.application.id", "")
>             .option("properties.security.protocol", "")
>             .option("properties.bootstrap.servers", "")
>             .option("properties.group.id", "")
>             .option("properties.auto.offset.reset", "earliest")
>             .option("format", "avro")
>             .build();
> Avro Schema:
> {
>    "name":"standByProperties",
>    "type":[
>       "null",
>       {
>          "type":"map",
>          "values":[
>             "null",
>             "boolean",
>             "int"
>          ]
>       }
>    ]
> },
> Output: standByProperties MAP<STRING, RAW('java.lang.Object', ?) NOT NULL>
> Exception: Exception in thread "main" 
> java.lang.UnsupportedOperationException: Unsupported to derive Schema for 
> type: RAW('java.lang.Object', ?) NOT NULL
>  at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:580)
>  at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416)
>  at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:568)
>  at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:549)
>  at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416)
> What I Tried: I defined an Avro schema that includes a Map field with values 
> of mixed data types. Used the Flink Table API to read data from Kafka and 
> attempted to use AvroSchemaConverter to map the schema to a Flink table. 
> During execution, I encountered an exception because the AvroSchemaConverter 
> does not support Maps with multiple value types. What I Was Expecting: I was 
> expecting Flink to handle the Map field and correctly parse the data into a 
> table format, with proper support for the mixed data types within the Map.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36769) Support fury Serializer for pyflink

2024-11-21 Thread xingyuan cheng (Jira)
xingyuan cheng created FLINK-36769:
--

 Summary: Support fury Serializer for pyflink
 Key: FLINK-36769
 URL: https://issues.apache.org/jira/browse/FLINK-36769
 Project: Flink
  Issue Type: New Feature
  Components: API / Python
Affects Versions: 1.16.1
Reporter: xingyuan cheng


Hi, community. Currently, in the batch verification scenario for our algorithm 
data, we use PyFlink and encounter low transmission efficiency caused by the 
poor performance of pickle protocol 4 based encoding. After research, we decided 
to adopt Apache Fury, a serialization framework built on pickle protocol 5 
encoding. Fury's Python implementation defines the transmission buffer size in 
the protocol, which improves the performance of large data transmission. To this 
end, I prepared a draft pull request. What does the community think about this?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36769) Support fury Serializer for pyflink

2024-11-21 Thread xingyuan cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xingyuan cheng updated FLINK-36769:
---
External issue URL: https://github.com/apache/flink/pull/25672

> Support fury Serializer for pyflink
> --
>
> Key: FLINK-36769
> URL: https://issues.apache.org/jira/browse/FLINK-36769
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.16.1
>Reporter: xingyuan cheng
>Priority: Major
>
> Hi, community. Currently, in the batch verification scenario for our algorithm 
> data, we use PyFlink and encounter low transmission efficiency caused by the 
> poor performance of pickle protocol 4 based encoding. After research, we 
> decided to adopt Apache Fury, a serialization framework built on pickle 
> protocol 5 encoding. Fury's Python implementation defines the transmission 
> buffer size in the protocol, which improves the performance of large data 
> transmission. To this end, I prepared a draft pull request. What does the 
> community think about this?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [ISSUE#36769] support fury serializer for pyflink [flink]

2024-11-21 Thread via GitHub


kaori-seasons opened a new pull request, #25672:
URL: https://github.com/apache/flink/pull/25672

   ## What is the purpose of the change
   
   Hi, community. Currently, in the batch verification scenario for our 
algorithm data, we use PyFlink and encounter low transmission efficiency caused 
by the poor performance of pickle protocol 4 based encoding. After research, we 
decided to adopt Apache Fury, a serialization framework built on pickle 
protocol 5 encoding. Fury's Python implementation defines the transmission 
buffer size in the protocol, which improves the performance of large data 
transmission.
   
   Related communications with fury community members can be found 
[here](https://github.com/apache/fury/issues/1919)
   
   ## Brief change log
   
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow [the 
conventions for tests defined in our code quality 
guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing).
   
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36769) Support fury Serializer for pyflink

2024-11-21 Thread xingyuan cheng (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

xingyuan cheng updated FLINK-36769:
---
 Docs Text: https://github.com/apache/flink/pull/25672
External issue URL:   (was: https://github.com/apache/flink/pull/25672)
   Environment: flink 1.16.1

> Support fury Serializer for pyflink
> --
>
> Key: FLINK-36769
> URL: https://issues.apache.org/jira/browse/FLINK-36769
> Project: Flink
>  Issue Type: New Feature
>  Components: API / Python
>Affects Versions: 1.16.1
> Environment: flink 1.16.1
>Reporter: xingyuan cheng
>Priority: Major
>
> Hi, community. Currently, in the batch verification scenario for our algorithm 
> data, we use PyFlink and encounter low transmission efficiency caused by the 
> poor performance of pickle protocol 4 based encoding. After research, we 
> decided to adopt Apache Fury, a serialization framework built on pickle 
> protocol 5 encoding. Fury's Python implementation defines the transmission 
> buffer size in the protocol, which improves the performance of large data 
> transmission. To this end, I prepared a draft pull request. What does the 
> community think about this?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36686) allow customizing env variables for flink-webhook container

2024-11-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-36686.
--
Fix Version/s: kubernetes-operator-1.11.0
   Resolution: Fixed

merged to main 14ded7e8b9f8aa12f54adeec9d777f8e2253d814

> allow customizing env variables for flink-webhook container
> ---
>
> Key: FLINK-36686
> URL: https://issues.apache.org/jira/browse/FLINK-36686
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.9.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.11.0
>
>
> In the current Helm chart of the operator, there is no way to pass custom env 
> variables into the flink-webhook container. I can see a few options to fix 
> the problem:
>  * make Values.operatorPod.env pod-level envs in flink-operator.yaml
>  * create a new webhookPod.env in values.yaml and apply it only to the 
> flink-webhook container env setup
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36686) allow customizing env variables for flink-webhook container

2024-11-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora updated FLINK-36686:
---
Priority: Minor  (was: Blocker)

> allow customizing env variables for flink-webhook container
> ---
>
> Key: FLINK-36686
> URL: https://issues.apache.org/jira/browse/FLINK-36686
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.9.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Minor
>  Labels: pull-request-available
>
> In the current Helm chart of the operator, there is no way to pass custom env 
> variables into the flink-webhook container. I can see a few options to fix 
> the problem:
>  * make Values.operatorPod.env pod-level envs in flink-operator.yaml
>  * create a new webhookPod.env in values.yaml and apply it only to the 
> flink-webhook container env setup
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-36685) Enable update/create operation on flinkdeployment resource in mutation webhook

2024-11-21 Thread Gyula Fora (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-36685.
--
Fix Version/s: kubernetes-operator-1.11.0
   Resolution: Fixed

merged to main 5d29554a179632028ccd182d48da5f825b52b976

> Enable update/create operation on flinkdeployment resource in mutation webhook
> --
>
> Key: FLINK-36685
> URL: https://issues.apache.org/jira/browse/FLINK-36685
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Affects Versions: kubernetes-operator-1.9.0
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.11.0
>
>
> In the mutation webhook yaml of the Helm chart, the UPDATE/CREATE operations 
> are not allowed on flinkdeployments. We use the mutation webhook to inject 
> platform secrets into the Flink pipeline CRD. Planned to add a PR to enable 
> the UPDATE/CREATE operations on the flinkdeployments resource. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36469] Bump commons-io from 2.11.0 to 2.17.0 [flink-kubernetes-operator]

2024-11-21 Thread via GitHub


r-sidd opened a new pull request, #917:
URL: https://github.com/apache/flink-kubernetes-operator/pull/917

   ## What is the purpose of the change
   
   Bump commons-io from 2.11.0 to 2.17.0
   
   ## Brief change log
   
   Bump cyclonedx-maven-plugin from 2.7.9 to 2.9.0 to remediate the findings in 
the dependent packages.
   
   **Vulnerabilities from dependencies:**
   
[CVE-2024-38374](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-38374)
   
   **Package details:**
   https://mvnrepository.com/artifact/org.cyclonedx/cyclonedx-maven-plugin/2.9.0
   
   ## Verifying this change
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): yes
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? no
 - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36770) Support Request Timeout for AWS sinks

2024-11-21 Thread Ahmed Hamdy (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900114#comment-17900114
 ] 

Ahmed Hamdy commented on FLINK-36770:
-

[~hlteoh37] I would love your feedback here. Also, could you assign this to me?

> Support Request Timeout for AWS sinks
> -
>
> Key: FLINK-36770
> URL: https://issues.apache.org/jira/browse/FLINK-36770
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
> Fix For: aws-connector-5.1.0
>
>
> ## Description
> in 
> [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
>  we introduced request timeout for Async Sink, which was released in 1.20. 
> Ideally we want to support that in AWS connectors. 
> ## Acceptance Criteria
> - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated 
> ## Notes
> This is a breaking change which means we expect the next version (5.1) to not 
> be compatible with 1.19 anymore. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36770) Support Request Timeout for AWS sinks

2024-11-21 Thread Ahmed Hamdy (Jira)
Ahmed Hamdy created FLINK-36770:
---

 Summary: Support Request Timeout for AWS sinks
 Key: FLINK-36770
 URL: https://issues.apache.org/jira/browse/FLINK-36770
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / AWS
Reporter: Ahmed Hamdy
 Fix For: aws-connector-5.1.0


## Description

in 
[FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
 we introduced request timeout for Async Sink, which was released in 1.20. 
Ideally we want to support that in AWS connectors. 

## Acceptance Criteria
- Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated 

## Notes
This is a breaking change which means we expect the next version (5.1) to not 
be compatible with 1.19 anymore. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36770) Support Request Timeout for AWS sinks

2024-11-21 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy updated FLINK-36770:

Description: 
h2. Description

in 
[FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
 we introduced request timeout for Async Sink, which was released in 1.20. 
Ideally we want to support that in AWS connectors.
h2. Acceptance Criteria
 - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated

h2. Notes

This is a breaking change regarding flink version compatibility which means we 
expect the next version (5.1) to not be compatible with 1.19 anymore. 

  was:
h2. Description

in 
[FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
 we introduced request timeout for Async Sink, which was released in 1.20. 
Ideally we want to support that in AWS connectors.
h2. Acceptance Criteria
 - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated

h2. Notes

This is a breaking change which means we expect the next version (5.1) to not 
be compatible with 1.19 anymore.


> Support Request Timeout for AWS sinks
> -
>
> Key: FLINK-36770
> URL: https://issues.apache.org/jira/browse/FLINK-36770
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
> Fix For: aws-connector-5.1.0
>
>
> h2. Description
> in 
> [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
>  we introduced request timeout for Async Sink, which was released in 1.20. 
> Ideally we want to support that in AWS connectors.
> h2. Acceptance Criteria
>  - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated
> h2. Notes
> This is a breaking change regarding flink version compatibility which means 
> we expect the next version (5.1) to not be compatible with 1.19 anymore. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36770) Support Request Timeout for AWS sinks

2024-11-21 Thread Ahmed Hamdy (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ahmed Hamdy updated FLINK-36770:

Description: 
h2. Description

in 
[FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
 we introduced request timeout for Async Sink, which was released in 1.20. 
Ideally we want to support that in AWS connectors.
h2. Acceptance Criteria
 - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated

h2. Notes

This is a breaking change which means we expect the next version (5.1) to not 
be compatible with 1.19 anymore.

  was:
## Description

in 
[FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
 we introduced request timeout for Async Sink, which was released in 1.20. 
Ideally we want to support that in AWS connectors. 

## Acceptance Criteria
- Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated 

## Notes
This is a breaking change which means we expect the next version (5.1) to not 
be compatible with 1.19 anymore. 


> Support Request Timeout for AWS sinks
> -
>
> Key: FLINK-36770
> URL: https://issues.apache.org/jira/browse/FLINK-36770
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / AWS
>Reporter: Ahmed Hamdy
>Priority: Major
> Fix For: aws-connector-5.1.0
>
>
> h2. Description
> in 
> [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API]
>  we introduced request timeout for Async Sink, which was released in 1.20. 
> Ideally we want to support that in AWS connectors.
> h2. Acceptance Criteria
>  - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated
> h2. Notes
> This is a breaking change which means we expect the next version (5.1) to not 
> be compatible with 1.19 anymore.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [FLINK-36529] Allow flink version configs to be set to greater than given version [flink-kubernetes-operator]

2024-11-21 Thread via GitHub


tomncooper opened a new pull request, #918:
URL: https://github.com/apache/flink-kubernetes-operator/pull/918

   ## What is the purpose of the change
   
   The operator currently allows the following syntax for defining flink 
version specific defaults:
   
   `kubernetes.operator.default-configuration.flink-version.v1_18.key: value`
   
   The problem with this is that, in many cases, these defaults should be 
applied to newer Flink versions as well, forcing config duplications.
   
   This PR introduces a new "greater than" syntax for config defaults, 
indicating that they should be applied to a given version and above:
   `kubernetes.operator.default-configuration.flink-version.v1_18+.key: value`
   
   In this case key:value would be applied to all Flink versions greater than or 
equal to 1.18, unless overridden for specific versions.
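   
   To make the matching semantics concrete, a standalone sketch (names here are 
illustrative, not the actual `FlinkConfigManager` code):
   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;
   
   /** Illustrative matching for version-default prefixes like v1_18 and v1_18+. */
   final class VersionPrefixMatcher {
       private static final Pattern VERSION = Pattern.compile("v(\\d+)_(\\d+)(\\+?)");
   
       /** True if a default keyed with the given prefix applies to major.minor. */
       static boolean applies(String prefix, int major, int minor) {
           Matcher m = VERSION.matcher(prefix);
           if (!m.matches()) {
               return false;
           }
           int pMajor = Integer.parseInt(m.group(1));
           int pMinor = Integer.parseInt(m.group(2));
           if (!m.group(3).isEmpty()) {
               // "v1_18+" applies to 1.18 and every newer version.
               return major > pMajor || (major == pMajor && minor >= pMinor);
           }
           return major == pMajor && minor == pMinor;
       }
   
       public static void main(String[] args) {
           System.out.println(applies("v1_18+", 1, 19)); // true
           System.out.println(applies("v1_18", 1, 19));  // false
       }
   }
   ```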
   
   ## Brief change log
   
   - Adds a new method `getRelevantVersionPrefixes` to the 
`org.apache.flink.kubernetes.operator.config.FlinkConfigManager`, which 
identifies all Flink version default config prefixes that are relevant to the 
currently specified Flink version.
   - Refactored the `FlinkVersion` enum to specify major and minor semver 
integers to facilitate quick lookup of relevant Flink versions when parsing 
version strings.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
   - Added additional tests in the `FlinkConfigManagerTest` class to cover the 
new Regex and `getRelevantVersionPrefixes` methods.
   - Updated the `testVersionNamespaceDefaultConfs` test in 
`FlinkConfigManagerTest` to test the greater than version behaviour.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): no
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
no
 - Core observer or reconciler logic that is regularly executed: yes
   
   ## Documentation
   
 - Does this pull request introduce a new feature? yes
 - If yes, how is the feature documented? Update the configuration docs 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36529] Allow flink version configs to be set to greater than given version [flink-kubernetes-operator]

2024-11-21 Thread via GitHub


tomncooper commented on PR #918:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/918#issuecomment-2491802004

   I probably need to look at adding an end-to-end test for this, but I want to 
make sure I am on the right track before doing that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-36529] Allow flink version configs to be set to greater than given version [flink-kubernetes-operator]

2024-11-21 Thread via GitHub


tomncooper commented on PR #918:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/918#issuecomment-2491803380

   CC @gyfora 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-36529) Support greater or equals logic for operator flink version default configs

2024-11-21 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-36529:
---
Labels: pull-request-available  (was: )

> Support greater or equals logic for operator flink version default configs
> --
>
> Key: FLINK-36529
> URL: https://issues.apache.org/jira/browse/FLINK-36529
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Gyula Fora
>Priority: Major
>  Labels: pull-request-available
>
> The operator currently allows the following syntax for defining flink version 
> specific defaults:
> kubernetes.operator.default-configuration.flink-version.v1_18.key: value
> The problem with this is that these defaults should usually be applied to 
> newer Flink versions as well, forcing config duplications.
> We should introduce a + syntax for configs applied to a version and above:
> kubernetes.operator.default-configuration.flink-version.v1_18+.key: value
> In this case key:value would be applied to all Flink versions greater than or 
> equal to 1.18



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [hotfix][docs] Fix missing semicolon in SELECT & WHERE clause example SQL [flink]

2024-11-21 Thread via GitHub


camilesing opened a new pull request, #25673:
URL: https://github.com/apache/flink/pull/25673

   
   
   ## What is the purpose of the change
   
   Fix missing semicolons in the SELECT & WHERE clause example SQL.
   
   ## Brief change log
   Add the missing semicolons to the example SQL.
   
   
   ## Verifying this change
   
   Since the changes only affect the doc examples, there are no tests. I copied 
the snippets and verified (compiled) them locally.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [hotfix][docs] Fix missing semicolon in SELECT & WHERE clause example SQL [flink]

2024-11-21 Thread via GitHub


flinkbot commented on PR #25673:
URL: https://github.com/apache/flink/pull/25673#issuecomment-2492789712

   
   ## CI report:
   
   * 4740ab5775e6045fbc62736e2a60f75ea4463812 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (FLINK-35859) [flink-cdc] Fix: The assigner is not ready to offer finished split information, this should not be called

2024-11-21 Thread Xin Gong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900028#comment-17900028
 ] 

Xin Gong edited comment on FLINK-35859 at 11/22/24 3:19 AM:


[~loserwang1024] This fix just ignores some newly added tables, which can cause 
some UTs to fail with timeouts, for example 
mongo NewlyAddedTableITCase#testRemoveAndAddCollectionsOneByOne and 
pg NewlyAddedTableITCase#testRemoveAndAddTablesOneByOne.
Users cannot immediately perceive task issues in production applications. Maybe 
we can make the fix more complete: I add a flag to trigger a restart when the 
status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, so newly added tables will be 
synchronized.
{code:java}
/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {

    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);

    private boolean flagExceptionAssignerStatusWhenCheckpoint;

    private void captureNewlyAddedTables() {
        if (sourceConfig.isScanNewlyAddedTableEnabled()
                && !AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) {
            ..
        } else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) {
            flagExceptionAssignerStatusWhenCheckpoint = true;
            LOG.info("exceptionAssignerStatusCheckpointFlag to true");
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus)
                && flagExceptionAssignerStatusWhenCheckpoint) {
            throw new FlinkRuntimeException(
                    "Previous assigner status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED and "
                            + "newly add table will cause task always be exception from checkpoint, so we "
                            + "trigger restart for newly table after assigner to normal status");
        }
    }
}
{code}


was (Author: JIRAUSER292212):
[~loserwang1024] Users cannot immediately perceive task issues. Maybe we can 
make the fix more complete: I add a flag to trigger a restart when the status is 
NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, so newly added tables will be 
synchronized.
{code:java}
/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {

    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);

    private boolean flagExceptionAssignerStatusWhenCheckpoint;

    private void captureNewlyAddedTables() {
        if (sourceConfig.isScanNewlyAddedTableEnabled()
                && !AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) {
            ..
        } else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) {
            flagExceptionAssignerStatusWhenCheckpoint = true;
            LOG.info("exceptionAssignerStatusCheckpointFlag to true");
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus)
                && flagExceptionAssignerStatusWhenCheckpoint) {
            throw new FlinkRuntimeException(
                    "Previous assigner status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED and "
                            + "newly add table will cause task always be exception from checkpoint, so we "
                            + "trigger restart for newly table after assigner to normal status");
        }
    }
}
{code}

> [flink-cdc] Fix: The assigner is not ready to offer finished split 
> information, this should not be called
> -
>
> Key: FLINK-35859
> URL: https://issues.apache.org/jira/browse/FLINK-35859
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Hongshun Wang
>Assignee: Hongshun Wang
>Priority: Minor
> Fix For: cdc-3.2.0
>
>
> When use CDC with newly added table,  an error occurs: 
> {code:java}
> The assigner is not ready to offer finished split information, this should 
> not be called. {code}
> It's because:
> 1. when stop then restart the job , the status is 
> NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED.
>  
> 2. Then Enumerator will send each reader with 
> BinlogSplitUpdateRequestEvent to update binlog. (see 
> org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator#syncWithReaders).
> 3. The Reader will suspend binlog reader then send 
> BinlogSplitMetaRequestEvent to Enumerator.
> 4. The Enumerator found that some tables are not sent, an error will occur
> {code:java}
> private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent 
> requestEvent) {
> // initialize once
>

[jira] [Commented] (FLINK-36768) Empty Checkpoint Directory created when older checkpoint fails due to timeout and the checkpoint interval is the same

2024-11-21 Thread Rui Xia (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900228#comment-17900228
 ] 

Rui Xia commented on FLINK-36768:
-

Hi Thomas, thank you for reporting this. From your description, I am not sure 
which chk- directory is not properly deleted. Could you provide a more concrete 
example of your situation? For example: chk-1 triggers -> chk-1 times out -> 
chk-2 triggers at the same time -> the chk-xxx checkpoint directory is not 
cleaned.

BTW, 1.12.1 is an old Flink version (more than 2 years old). You may want to use 
the latest version and check whether the same problem still exists.

> Empty Checkpoint Directory created when older checkpoint fails due to timeout 
> and the checkpoint interval is the same 
> --
>
> Key: FLINK-36768
> URL: https://issues.apache.org/jira/browse/FLINK-36768
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Checkpointing
>Affects Versions: 1.12.1
>Reporter: Eaugene Thomas
>Priority: Minor
>
> When the checkpoint interval equals the checkpoint timeout, and the failure 
> threshold is reached, the asynchronous checkpoint trigger starts. However, 
> due to an internal restart, the tombstone directory ({{{}chk-{}}}) is not 
> cleared.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-36773) Introduce new Group Agg Operator with Async State API

2024-11-21 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36773:
---

Assignee: Yang Xu

> Introduce new Group Agg Operator with Async State API
> -
>
> Key: FLINK-36773
> URL: https://issues.apache.org/jira/browse/FLINK-36773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: xuyang
>Assignee: Yang Xu
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36772][mysql][cdc-base] Fix error placeholder for errorMessageTemplate of Preconditions [flink-cdc]

2024-11-21 Thread via GitHub


gong commented on PR #3754:
URL: https://github.com/apache/flink-cdc/pull/3754#issuecomment-2492953554

   > LGTM, could you also check all Preconditions.checkState method calls?
   
   @leonardBang I checked all Preconditions.checkState method calls.
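   
   For context: Flink's `Preconditions` (like Guava's) substitutes `%s` 
placeholders in the `errorMessageTemplate`, so an SLF4J-style `{}` placeholder 
is never filled in. A minimal illustration, not taken from the PR itself:
   ```java
   import org.apache.flink.util.Preconditions;
   
   final class PlaceholderExample {
       static void check(int splitCount) {
           // Wrong: "{}" is not substituted, so the argument never shows up.
           // Preconditions.checkState(splitCount > 0, "Unexpected split count {}", splitCount);
   
           // Right: the template expects %s placeholders.
           Preconditions.checkState(splitCount > 0, "Unexpected split count %s", splitCount);
       }
   }
   ```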


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (FLINK-36773) Introduce new Group Agg Operator with Async State API

2024-11-21 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee reassigned FLINK-36773:
---

Assignee: xuyang  (was: Yang Xu)

> Introduce new Group Agg Operator with Async State API
> -
>
> Key: FLINK-36773
> URL: https://issues.apache.org/jira/browse/FLINK-36773
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / Runtime
>Reporter: xuyang
>Assignee: xuyang
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36774) SQL gateway should support executing multiple SQL statements at once

2024-11-21 Thread tim yu (Jira)
tim yu created FLINK-36774:
--

 Summary: SQL gateway should support executing multiple SQL 
statements at once
 Key: FLINK-36774
 URL: https://issues.apache.org/jira/browse/FLINK-36774
 Project: Flink
  Issue Type: New Feature
  Components: Table SQL / Gateway
Reporter: tim yu


I have a Flink SQL file example.sql that contains the following SQL statements:
{code:java}
set 'state.checkpoints.dir' = 'hdfs:///tmp/flink-checkpoints/datagen_blackhole';
set 'execution.checkpointing.interval' = '3min';
set 'execution.checkpointing.timeout' = '20min';
set 'parallelism.default' = '1';
CREATE TABLE t1 (id BIGINT, info STRING) with ('connector' = 'datagen', 
'rows-per-second' = '1', 'fields.id.min' = '1', 'fields.id.max' = '10');
CREATE TABLE black_hole_1 WITH ('connector' = 'blackhole') LIKE t1 (EXCLUDING 
ALL);
insert into black_hole_1 select * from t1;{code}
I hope to submit this whole file to the SQL Gateway using the following command:
{code:java}
curl -d '@example.sql' -H 'Content-Type: application/json' http://gateway:8083/v1/sessions/{session_id}/statements
{code}
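Today the statements endpoint accepts a single statement per request, so a client has to split the script and submit the statements one by one. Below is a rough, hypothetical Java sketch of that workaround (the v1 request body shape with a "statement" field is an assumption to verify against the REST docs, and the naive semicolon split breaks on semicolons inside string literals):
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class SubmitStatementsOneByOne {
    public static void main(String[] args) throws Exception {
        String sessionId = args[0]; // session handle from POST /v1/sessions
        String script = Files.readString(Path.of("example.sql"));
        HttpClient client = HttpClient.newHttpClient();
        // Naive split on ';' -- breaks on semicolons inside string literals.
        for (String stmt : script.split(";")) {
            if (stmt.isBlank()) {
                continue;
            }
            // Minimal JSON escaping of quotes, backslashes and newlines.
            String escaped = stmt.trim()
                    .replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n");
            String body = "{\"statement\": \"" + escaped + "\"}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://gateway:8083/v1/sessions/" + sessionId + "/statements"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
}
{code}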



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] Add blogpost for new KinesisStreamsSource and DynamoDbStreamsSource [flink-web]

2024-11-21 Thread via GitHub


hlteoh37 opened a new pull request, #765:
URL: https://github.com/apache/flink-web/pull/765

   Add a blogpost illustrating the new KDS and DDB streams sources in detail.
   
   Image of the blogpost included: 
   
![KDS_DB_blog](https://github.com/user-attachments/assets/8f9778cd-8ae8-4119-9623-d80a95b1c132)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-36535) Optimize the scale down logic based on historical parallelism

2024-11-21 Thread yuanfenghu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900226#comment-17900226
 ] 

yuanfenghu commented on FLINK-36535:


nice improvement! 

> Optimize the scale down logic based on historical parallelism
> -
>
> Key: FLINK-36535
> URL: https://issues.apache.org/jira/browse/FLINK-36535
> Project: Flink
>  Issue Type: Improvement
>  Components: Autoscaler
>Reporter: Rui Fan
>Assignee: Rui Fan
>Priority: Major
>
> This is a follow-up to FLINK-36018, which supported lazy scale down to 
> avoid frequent rescaling.
> h1. Proposed Change
> Treat scale-down.interval as a window:
>  * Recording the scale down trigger time when the recommended parallelism < 
> current parallelism
>  ** When the recommended parallelism >= current parallelism, cancel the 
> triggered scale down
>  * The scale down will be executed when currentTime - triggerTime > 
> scale-down.interval
>  ** {color:#de350b}Change1{color}: Using the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * {color:#de350b}Change2{color}: Never scale down when currentTime - 
> triggerTime < scale-down.interval
>  ** In FLINK-36018, the scale down may be executed when currentTime - 
> triggerTime < scale-down.interval.
>  ** For example: the taskA may scale down when taskB needs to scale up.
> h1. Background
> Some critical Flink jobs need to scale up in time, but only scale down on a 
> daily basis. In other words, Flink users do not want Flink jobs to be scaled 
> down multiple times within 24 hours, and jobs run at the same parallelism as 
> during the peak hours of each day. 
> Note: Users hope that scale down only happens when the parallelism during peak 
> hours still wastes resources. This is a trade-off between downtime and 
> resource waste for a critical job.
> h1. Current solution
> In general, this requirement could be met by setting 
> {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. Suppose 
> taskA runs with parallelism 100, and the recommended parallelism is 100 
> during the peak hours of each day. We hope taskA never rescales, because the 
> triggered scale down will be canceled once the recommended parallelism >= 
> current parallelism within 24 hours (it's exactly what FLINK-36018 does).
> h1. Unexpected Scenario & how to solve?
> But I found the critical production job is still rescaled about 10 times 
> every day (when scale-down.interval is set to 24 hours).
> Root cause: There may be many sources in a job, and the traffic peaks of 
> these sources may occur at different times. When taskA triggers scale down, 
> the scale down of taskA will not be actively executed within 24 hours, but it 
> may be executed when other tasks are scaled up.
> For example:
>  * The scale down of sourceB and sourceC may be executed when SourceA scales 
> up.
>  * After a while, the scale down of sourceA and sourceC may be executed when 
> SourceB scales up.
>  * After a while, the scale down of sourceA and sourceB may be executed when 
> SourceC scales up.
>  * When there are many tasks, the above 3 steps will be executed repeatedly.
> That's why the job is rescaled about 10 times every day. The 
> {color:#de350b}change2{color} of the proposed change solves this issue: 
> never scale down when currentTime - triggerTime < scale-down.interval.
>  
> {color:#de350b}Change1{color}: Using the maximum parallelism within the 
> window instead of the latest parallelism when scaling down.
>  * This ensures that the parallelism after scaling down matches the 
> parallelism at yesterday's peak (see the sketch below).
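
A rough sketch of the proposed window semantics for a single vertex, using hypothetical names (the actual autoscaler implementation will differ):
{code:java}
import java.time.Duration;
import java.time.Instant;

/** Hypothetical sketch of the proposed scale-down window for one vertex. */
public class ScaleDownWindow {
    private final Duration interval; // job.autoscaler.scale-down.interval
    private Instant triggerTime; // set when recommended < current was first seen
    private int maxRecommendedInWindow;

    public ScaleDownWindow(Duration interval) {
        this.interval = interval;
    }

    /** Returns the new parallelism to apply, or -1 when no rescale should happen. */
    public int onEvaluation(Instant now, int current, int recommended) {
        if (recommended >= current) {
            triggerTime = null; // cancel any triggered scale down
            return recommended > current ? recommended : -1; // scale up is immediate
        }
        if (triggerTime == null) {
            triggerTime = now;
            maxRecommendedInWindow = recommended;
        } else {
            // Change1: remember the maximum recommendation within the window.
            maxRecommendedInWindow = Math.max(maxRecommendedInWindow, recommended);
        }
        // Change2: never scale down before the full interval has elapsed.
        if (Duration.between(triggerTime, now).compareTo(interval) < 0) {
            return -1;
        }
        triggerTime = null;
        return maxRecommendedInWindow;
    }
}
{code}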



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [FLINK-36315][cdc-base]The flink-cdc-base module supports source metric statistics [flink-cdc]

2024-11-21 Thread via GitHub


liuxiao2shf commented on code in PR #3619:
URL: https://github.com/apache/flink-cdc/pull/3619#discussion_r1853230011


##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##
@@ -397,6 +491,27 @@ && allSnapshotSplitsFinished()) {
 }
 LOG.info("Snapshot split assigner is turn into finished status.");
 }
+
+if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) {
+Iterator<Map.Entry<String, Long>> iterator =
+splitFinishedCheckpointIds.entrySet().iterator();
+while (iterator.hasNext()) {
+Map.Entry<String, Long> splitFinishedCheckpointId = iterator.next();
+String splitId = splitFinishedCheckpointId.getKey();
+Long splitCheckpointId = splitFinishedCheckpointId.getValue();
+if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID
+&& checkpointId >= splitCheckpointId) {
+// record table-level splits metrics
+TableId tableId = SnapshotSplit.parseTableId(splitId);
+enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId);
+iterator.remove();
+}
+}
+LOG.info(

Review Comment:
   Adjustment completed



##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java:
##
@@ -359,11 +432,31 @@ public void addSplits(Collection splits) {
 // because they are failed
 assignedSplits.remove(split.splitId());
 splitFinishedOffsets.remove(split.splitId());
+
+enumeratorMetrics
+.getTableMetrics(split.asSnapshotSplit().getTableId())
+.reprocessSplit(split.splitId());
+TableId tableId = split.asSnapshotSplit().getTableId();
+
+enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId());
 }
 }
 
 @Override
 public SnapshotPendingSplitsState snapshotState(long checkpointId) {
+if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) {
+for (Map.Entry<String, Long> splitFinishedCheckpointId :
+splitFinishedCheckpointIds.entrySet()) {
+if (splitFinishedCheckpointId.getValue() == UNDEFINED_CHECKPOINT_ID) {
+splitFinishedCheckpointId.setValue(checkpointId);
+}
+}
+}
+LOG.info(
+"SnapshotSplitAssigner snapshotState on checkpoint {} with splitFinishedCheckpointIds size {}.",
+checkpointId,
+splitFinishedCheckpointIds == null ? 0 : splitFinishedCheckpointIds.size());

Review Comment:
   Adjustment completed



##
flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java:
##
@@ -55,14 +55,14 @@ public void testPendingSplitsStateSerializerAndDeserialize() throws IOException
 new PendingSplitsStateSerializer(constructSourceSplitSerializer());
 PendingSplitsState streamSplitsStateAfter =
 pendingSplitsStateSerializer.deserializePendingSplitsState(
-6, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
+7, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore));
 Assert.assertEquals(streamPendingSplitsStateBefore, streamSplitsStateAfter);
 
 SnapshotPendingSplitsState snapshotPendingSplitsStateBefore =
 constructSnapshotPendingSplitsState(AssignerStatus.NEWLY_ADDED_ASSIGNING);
 PendingSplitsState snapshotPendingSplitsStateAfter =
 pendingSplitsStateSerializer.deserializePendingSplitsState(
-6,

Review Comment:
   Adjustment completed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


