Re: [PR] [FLINK-36685][Kubernetes Operator] allow CREATE/UPDATE operation on flinkdeployments resource on webhook mutation endpoint [flink-kubernetes-operator]
gyfora merged PR #916: URL: https://github.com/apache/flink-kubernetes-operator/pull/916 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-35859) [flink-cdc] Fix: The assigner is not ready to offer finished split information, this should not be called
[ https://issues.apache.org/jira/browse/FLINK-35859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900028#comment-17900028 ] Xin Gong commented on FLINK-35859: -- [~loserwang1024] Users cannot immediately perceive task issues, so maybe we can make the fix more complete. I added a flag that triggers a restart when the status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, so that newly added tables will be synchronized. {code:java} /** Assigner for snapshot split. */ public class SnapshotSplitAssigner implements SplitAssigner { private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class); private boolean flagExceptionAssignerStatusWhenCheckpoint; private void captureNewlyAddedTables() { if (sourceConfig.isScanNewlyAddedTableEnabled() && AssignerStatus.isAssigningFinished(assignerStatus)) { // ... existing newly-added-table capture logic ... } else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) { // Remember that a checkpoint was taken while the assigner was in the intermediate status. flagExceptionAssignerStatusWhenCheckpoint = true; LOG.info("Set flagExceptionAssignerStatusWhenCheckpoint to true"); } } @Override public void notifyCheckpointComplete(long checkpointId) { if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus) && flagExceptionAssignerStatusWhenCheckpoint) { throw new FlinkRuntimeException("The previous assigner status was NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED and " + "the newly added table would keep failing the task on checkpoint, so we " + "trigger a restart for the newly added table after the assigner returns to the normal status"); } } } {code} > [flink-cdc] Fix: The assigner is not ready to offer finished split > information, this should not be called > - > > Key: FLINK-35859 > URL: https://issues.apache.org/jira/browse/FLINK-35859 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.1 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Minor > Fix For: cdc-3.2.0 > > > When using CDC with a newly added table, an error occurs: > {code:java} > The assigner is not ready to offer finished split information, this should > not be called. {code} > It's because: > 1. When the job is stopped and then restarted, the status is > NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED. > > 2. The Enumerator then sends each reader a > BinlogSplitUpdateRequestEvent to update the binlog split (see > org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator#syncWithReaders). > 3. The Reader suspends the binlog reader and then sends a > BinlogSplitMetaRequestEvent to the Enumerator. > 4. The Enumerator finds that the finished split info of some tables has not > been collected, so an error occurs: > {code:java} > private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent > requestEvent) { > // initialize once > if (binlogSplitMeta == null) { > final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos = > splitAssigner.getFinishedSplitInfos(); > if (finishedSnapshotSplitInfos.isEmpty()) { > LOG.error( > "The assigner offers empty finished split information, > this should not happen"); > throw new FlinkRuntimeException( > "The assigner offers empty finished split information, > this should not happen"); > } > binlogSplitMeta = > Lists.partition( > finishedSnapshotSplitInfos, > sourceConfig.getSplitMetaGroupSize()); > } > }{code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36535) Optimize the scale down logic based on historical parallelism
[ https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900029#comment-17900029 ] Gyula Fora commented on FLINK-36535: I think this would be a nice improvement (y) > Optimize the scale down logic based on historical parallelism > - > > Key: FLINK-36535 > URL: https://issues.apache.org/jira/browse/FLINK-36535 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > This is a follow-up to FLINK-36018. FLINK-36018 supported the lazy scale > down to avoid frequent rescaling. > h1. Proposed Change > Treat scale-down.interval as a window: > * Record the scale down trigger time when the recommended parallelism < > current parallelism > ** When the recommended parallelism >= current parallelism, cancel the > triggered scale down > * The scale down will be executed when currentTime - triggerTime > > scale-down.interval > ** {color:#de350b}Change1{color}: Use the maximum parallelism within the > window instead of the latest parallelism when scaling down. > * {color:#de350b}Change2{color}: Never scale down when currentTime - > triggerTime < scale-down.interval > ** In FLINK-36018, the scale down may be executed when currentTime - > triggerTime < scale-down.interval. > ** For example: taskA may be scaled down when taskB needs to scale up. > h1. Background > Some critical Flink jobs need to scale up in time, but only scale down on a > daily basis. In other words, Flink users do not want Flink jobs to be scaled > down multiple times within 24 hours, and the jobs run at the same parallelism as > during the peak hours of each day. > Note: Users hope the scale down only happens when the parallelism during peak > hours still wastes resources. This is a trade-off between downtime and > resource waste for a critical job. > h1. Current solution > In general, this requirement could be met after setting > {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with > parallelism 100, and the recommended parallelism is 100 during the peak hours of each > day. We hope taskA never rescales, because the triggered scale down > will be canceled once the recommended parallelism >= current parallelism > within 24 hours (this is exactly what FLINK-36018 does). > h1. Unexpected Scenario & how to solve? > But I found that the critical production job is still rescaled about 10 times > every day (when scale-down.interval is set to 24 hours). > Root cause: There may be many sources in a job, and the traffic peaks of > these sources may occur at different times. When taskA triggers a scale down, > the scale down of taskA will not be actively executed within 24 hours, but it > may be executed when other tasks are scaled up. > For example: > * The scale down of sourceB and sourceC may be executed when SourceA scales > up. > * After a while, the scale down of sourceA and sourceC may be executed when > SourceB scales up. > * After a while, the scale down of sourceA and sourceB may be executed when > SourceC scales up. > * When there are many tasks, the above 3 steps will be executed repeatedly. > That's why the job is rescaled about 10 times every day. > {color:#de350b}Change2{color} of the proposed change could solve this issue: > Never scale down when currentTime - triggerTime < scale-down.interval. > > {color:#de350b}Change1{color}: Use the maximum parallelism within the > window instead of the latest parallelism when scaling down.
> * It can ensure that the parallelism after scaling down is the parallelism > at yesterday's peak. -- This message was sent by Atlassian Jira (v8.20.10#820010)
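For reference, a minimal, illustrative sketch of the windowed scale-down decision described in the proposal above. The class, field, and method names are hypothetical and not the actual autoscaler code; it only captures Change1 (use the maximum recommendation observed inside the window) and Change2 (never scale down before the window elapses). {code:java}
import java.time.Duration;
import java.time.Instant;

/** Hypothetical per-vertex tracker for the proposed scale-down window. */
class DelayedScaleDownWindow {

    private Instant triggerTime;            // when the first scale-down recommendation arrived
    private int maxRecommendedParallelism;  // maximum recommendation seen inside the window

    /** Returns the parallelism to apply now, or -1 if no rescale should happen yet. */
    int onRecommendation(
            int currentParallelism, int recommendedParallelism, Instant now, Duration scaleDownInterval) {
        if (recommendedParallelism >= currentParallelism) {
            // Scale ups are applied immediately; any pending scale down is canceled (FLINK-36018 behaviour).
            triggerTime = null;
            return recommendedParallelism > currentParallelism ? recommendedParallelism : -1;
        }
        if (triggerTime == null) {
            // Open the window on the first scale-down recommendation.
            triggerTime = now;
            maxRecommendedParallelism = recommendedParallelism;
        } else {
            // Change1: remember the maximum parallelism recommended within the window.
            maxRecommendedParallelism = Math.max(maxRecommendedParallelism, recommendedParallelism);
        }
        // Change2: never scale down before the window has fully elapsed,
        // even if another vertex triggers a rescale in the meantime.
        if (Duration.between(triggerTime, now).compareTo(scaleDownInterval) < 0) {
            return -1;
        }
        triggerTime = null;
        return maxRecommendedParallelism; // scale down to the peak of the window, not the latest value
    }
}
{code}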
[jira] [Updated] (FLINK-36535) Optimize the scale down logic based on historical parallelism
[ https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-36535: Description: This is a follow-up to FLINK-36018. FLINK-36018 supported the lazy scale down to avoid frequent rescaling. h1. Proposed Change Treat scale-down.interval as a window: * Record the scale down trigger time when the recommended parallelism < current parallelism ** When the recommended parallelism >= current parallelism, cancel the triggered scale down * The scale down will be executed when currentTime - triggerTime > scale-down.interval ** {color:#de350b}Change1{color}: Use the maximum parallelism within the window instead of the latest parallelism when scaling down. * {color:#de350b}Change2{color}: Never scale down when currentTime - triggerTime < scale-down.interval ** In FLINK-36018, the scale down may be executed when currentTime - triggerTime < scale-down.interval. ** For example: taskA may be scaled down when taskB needs to scale up. h1. Background Some critical Flink jobs need to scale up in time, but only scale down on a daily basis. In other words, Flink users do not want Flink jobs to be scaled down multiple times within 24 hours, and the jobs run at the same parallelism as during the peak hours of each day. Note: Users hope the scale down only happens when the parallelism during peak hours is still a waste of resources. This is a trade-off between downtime and resource waste for a critical job. h1. Current solution In general, this requirement could be met after setting {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with parallelism 100, and the recommended parallelism is 100 during the peak hours of each day. We hope taskA never rescales, because the triggered scale down will be canceled once the recommended parallelism >= current parallelism within 24 hours (this is exactly what FLINK-36018 does). h1. Unexpected Scenario & how to solve? But I found that the critical production job is still rescaled about 10 times every day (when scale-down.interval is set to 24 hours). Root cause: There may be many sources in a job, and the traffic peaks of these sources may occur at different times. When taskA triggers a scale down, the scale down of taskA will not be actively executed within 24 hours, but it may be executed when other tasks are scaled up. For example: * The scale down of sourceB and sourceC may be executed when SourceA scales up. * After a while, the scale down of sourceA and sourceC may be executed when SourceB scales up. * After a while, the scale down of sourceA and sourceB may be executed when SourceC scales up. * When there are many tasks, the above 3 steps will be executed repeatedly. That's why the job is rescaled about 10 times every day. {color:#de350b}Change2{color} of the proposed change could solve this issue: Never scale down when currentTime - triggerTime < scale-down.interval. {color:#de350b}Change1{color}: Use the maximum parallelism within the window instead of the latest parallelism when scaling down. * It can ensure that the parallelism after scaling down is the parallelism at yesterday's peak. was: This is a follow-up to FLINK-36018. FLINK-36018 supported the lazy scale down to avoid frequent rescaling. h1. Background Some critical Flink jobs need to scale up in time, but only scale down on a daily basis.
In other words, Flink users do not want Flink jobs to be scaled down multiple times within 24 hours, and the jobs run at the same parallelism as during the peak hours of each day. Note: Users hope the scale down only happens when the parallelism during peak hours is still a waste of resources. This is a trade-off between downtime and resource waste for a critical job. h1. Current solution In general, this requirement could be met after setting {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. For example, vertex1 runs with parallelism=100, and the following is the parallelism that the autoscaler recommends for vertex1: * 100 (2024-10-13 20:00:00, peak hour) * 90 (2024-10-13 21:00:00, trigger delayed scale down) * 80 (2024-10-13 22:00:00) * 70 (2024-10-14 00:00:00) * 60 (2024-10-14 01:00:00) * 50 (2024-10-14 02:00:00) * 40 (2024-10-14 04:00:00) * 50 (2024-10-14 06:00:00) * 60 (2024-10-14 08:00:00) * ... * 90 (2024-10-14 19:00:00) * 100 (2024-10-14 20:00:00, peak hour, the delayed scale down is canceled) All recommended parallelisms are delayed, and the recommended parallelism goes back to 100 within 24 hours, so the scale down request is canceled. It means that if the recommended parallelism for vertex1 during peak hours is 100 every day, vertex1 will never be scaled down or scaled up. This is very friendly to critical jobs, and reducing the rescale frequency can greatly reduce the downtime. h1. Some scenarios do no
[jira] [Updated] (FLINK-36535) Optimize the scale down logic based on historical parallelism
[ https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rui Fan updated FLINK-36535: Description: This is a follow-up to FLINK-36018. FLINK-36018 supported the lazy scale down to avoid frequent rescaling. h1. Proposed Change Treat scale-down.interval as a window: * Record the scale down trigger time when the recommended parallelism < current parallelism ** When the recommended parallelism >= current parallelism, cancel the triggered scale down * The scale down will be executed when currentTime - triggerTime > scale-down.interval ** {color:#de350b}Change1{color}: Use the maximum parallelism within the window instead of the latest parallelism when scaling down. * {color:#de350b}Change2{color}: Never scale down when currentTime - triggerTime < scale-down.interval ** In FLINK-36018, the scale down may be executed when currentTime - triggerTime < scale-down.interval. ** For example: taskA may be scaled down when taskB needs to scale up. h1. Background Some critical Flink jobs need to scale up in time, but only scale down on a daily basis. In other words, Flink users do not want Flink jobs to be scaled down multiple times within 24 hours, and the jobs run at the same parallelism as during the peak hours of each day. Note: Users hope the scale down only happens when the parallelism during peak hours still wastes resources. This is a trade-off between downtime and resource waste for a critical job. h1. Current solution In general, this requirement could be met after setting {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with parallelism 100, and the recommended parallelism is 100 during the peak hours of each day. We hope taskA never rescales, because the triggered scale down will be canceled once the recommended parallelism >= current parallelism within 24 hours (this is exactly what FLINK-36018 does). h1. Unexpected Scenario & how to solve? But I found that the critical production job is still rescaled about 10 times every day (when scale-down.interval is set to 24 hours). Root cause: There may be many sources in a job, and the traffic peaks of these sources may occur at different times. When taskA triggers a scale down, the scale down of taskA will not be actively executed within 24 hours, but it may be executed when other tasks are scaled up. For example: * The scale down of sourceB and sourceC may be executed when SourceA scales up. * After a while, the scale down of sourceA and sourceC may be executed when SourceB scales up. * After a while, the scale down of sourceA and sourceB may be executed when SourceC scales up. * When there are many tasks, the above 3 steps will be executed repeatedly. That's why the job is rescaled about 10 times every day. {color:#de350b}Change2{color} of the proposed change could solve this issue: Never scale down when currentTime - triggerTime < scale-down.interval. {color:#de350b}Change1{color}: Use the maximum parallelism within the window instead of the latest parallelism when scaling down. * It can ensure that the parallelism after scaling down is the parallelism at yesterday's peak. was: This is a follow-up to FLINK-36018. FLINK-36018 supported the lazy scale down to avoid frequent rescaling. h1.
Proposed Change Treat scale-down.interval as a window: * Record the scale down trigger time when the recommended parallelism < current parallelism ** When the recommended parallelism >= current parallelism, cancel the triggered scale down * The scale down will be executed when currentTime - triggerTime > scale-down.interval ** {color:#de350b}Change1{color}: Use the maximum parallelism within the window instead of the latest parallelism when scaling down. * {color:#de350b}Change2{color}: Never scale down when currentTime - triggerTime < scale-down.interval ** In FLINK-36018, the scale down may be executed when currentTime - triggerTime < scale-down.interval. ** For example: taskA may be scaled down when taskB needs to scale up. h1. Background Some critical Flink jobs need to scale up in time, but only scale down on a daily basis. In other words, Flink users do not want Flink jobs to be scaled down multiple times within 24 hours, and the jobs run at the same parallelism as during the peak hours of each day. Note: Users hope the scale down only happens when the parallelism during peak hours is still a waste of resources. This is a trade-off between downtime and resource waste for a critical job. h1. Current solution In general, this requirement could be met after setting {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with parallelism 100, and the recommended parallelism is 100 during the peak hours of each day. We hope taskA never rescales, because the triggered scale down will be canceled once the recomm
[jira] [Commented] (FLINK-36535) Optimize the scale down logic based on historical parallelism
[ https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900024#comment-17900024 ] Rui Fan commented on FLINK-36535: - cc [~gyfora] [~mxm] > Optimize the scale down logic based on historical parallelism > - > > Key: FLINK-36535 > URL: https://issues.apache.org/jira/browse/FLINK-36535 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > This is a follow-up to FLINK-36018. FLINK-36018 supported the lazy scale > down to avoid frequent rescaling. > h1. Proposed Change > Treat scale-down.interval as a window: > * Record the scale down trigger time when the recommended parallelism < > current parallelism > ** When the recommended parallelism >= current parallelism, cancel the > triggered scale down > * The scale down will be executed when currentTime - triggerTime > > scale-down.interval > ** {color:#de350b}Change1{color}: Use the maximum parallelism within the > window instead of the latest parallelism when scaling down. > * {color:#de350b}Change2{color}: Never scale down when currentTime - > triggerTime < scale-down.interval > ** In FLINK-36018, the scale down may be executed when currentTime - > triggerTime < scale-down.interval. > ** For example: taskA may be scaled down when taskB needs to scale up. > h1. Background > Some critical Flink jobs need to scale up in time, but only scale down on a > daily basis. In other words, Flink users do not want Flink jobs to be scaled > down multiple times within 24 hours, and the jobs run at the same parallelism as > during the peak hours of each day. > Note: Users hope the scale down only happens when the parallelism during peak > hours still wastes resources. This is a trade-off between downtime and > resource waste for a critical job. > h1. Current solution > In general, this requirement could be met after setting > {color:#de350b}job.autoscaler.scale-down.interval = 24 hours{color}. Suppose taskA runs with > parallelism 100, and the recommended parallelism is 100 during the peak hours of each > day. We hope taskA never rescales, because the triggered scale down > will be canceled once the recommended parallelism >= current parallelism > within 24 hours (this is exactly what FLINK-36018 does). > h1. Unexpected Scenario & how to solve? > But I found that the critical production job is still rescaled about 10 times > every day (when scale-down.interval is set to 24 hours). > Root cause: There may be many sources in a job, and the traffic peaks of > these sources may occur at different times. When taskA triggers a scale down, > the scale down of taskA will not be actively executed within 24 hours, but it > may be executed when other tasks are scaled up. > For example: > * The scale down of sourceB and sourceC may be executed when SourceA scales > up. > * After a while, the scale down of sourceA and sourceC may be executed when > SourceB scales up. > * After a while, the scale down of sourceA and sourceB may be executed when > SourceC scales up. > * When there are many tasks, the above 3 steps will be executed repeatedly. > That's why the job is rescaled about 10 times every day. > {color:#de350b}Change2{color} of the proposed change could solve this issue: > Never scale down when currentTime - triggerTime < scale-down.interval. > > {color:#de350b}Change1{color}: Use the maximum parallelism within the > window instead of the latest parallelism when scaling down.
> * It can ensure that the parallelism after scaling down is the parallelism > at yesterday's peak. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36743) Rescale from unaligend checkpoint failed
[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feifan Wang updated FLINK-36743: Attachment: image-2024-11-21-20-20-20-536.png > Rescale from unaligend checkpoint failed > > > Key: FLINK-36743 > URL: https://issues.apache.org/jira/browse/FLINK-36743 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Feifan Wang >Priority: Major > Attachments: > Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch, > image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, > image-2024-11-19-17-30-14-816.png, image-2024-11-21-20-20-20-536.png, > image-2024-11-21-20-20-41-644.png > > > We encountered the following exception when scaling down a job from 5600 to > 4200: > {code:java} > 2024-11-12 19:20:54,308 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: > xx (1358/1400) > (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) > switched from RUNNING to FAILED on > container_e33_1725519807238_6894116_01_000825 @ yg- > java.lang.IllegalStateException: Cannot select > SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; > known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=4200}] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code} > * Flink version : 1.16.1 > * unaligned checkpoint : enabled > 
* log-based checkpoint : enabled > The exception was encountered when restoring from chk-2718336, while the job can > successfully restore from chk-2718333. I checked the metadata files of > chk-2718336 and chk-2718333, and both of them have in-flight data. It looks like > there is something wrong with how the unaligned checkpoint reassigns > in-flight data. Could you please take a look? [~arvid] , [~pnowojski] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36743) Rescale from unaligend checkpoint failed
[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900025#comment-17900025 ] Feifan Wang commented on FLINK-36743: - Thanks [~arvid] for helping investigate the issue. I cherry-picked the fix of FLINK-31963 and tested it, but it doesn't work. The exception is the same. !image-2024-11-21-20-20-20-536.png|width=1566,height=453! !image-2024-11-21-20-20-41-644.png|width=981,height=226! So I am reopening the issue. > Rescale from unaligend checkpoint failed > > > Key: FLINK-36743 > URL: https://issues.apache.org/jira/browse/FLINK-36743 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Feifan Wang >Priority: Major > Attachments: > Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch, > image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, > image-2024-11-19-17-30-14-816.png, image-2024-11-21-20-20-20-536.png, > image-2024-11-21-20-20-41-644.png > > > We encountered the following exception when scaling down a job from 5600 to > 4200: > {code:java} > 2024-11-12 19:20:54,308 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: > xx (1358/1400) > (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) > switched from RUNNING to FAILED on > container_e33_1725519807238_6894116_01_000825 @ yg- > java.lang.IllegalStateException: Cannot select > SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; > known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=4200}] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) >
~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code} > * Flink version : 1.16.1 > * unaligned checkpoint : enabled > * log-based checkpoint : enabled > The exception was encountered when restoring from chk-2718336, while the job can > successfully restore from chk-2718333. I checked the metadata files of > chk-2718336 and chk-2718333, and both of them have in-flight data. It looks like > there is something wrong with how the unaligned checkpoint reassigns > in-flight data. Could you please take a look? [~arvid] , [~pnowojski] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (FLINK-36743) Rescale from unaligend checkpoint failed
[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feifan Wang reopened FLINK-36743: - > Rescale from unaligend checkpoint failed > > > Key: FLINK-36743 > URL: https://issues.apache.org/jira/browse/FLINK-36743 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Feifan Wang >Priority: Major > Attachments: > Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch, > image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, > image-2024-11-19-17-30-14-816.png, image-2024-11-21-20-20-20-536.png, > image-2024-11-21-20-20-41-644.png > > > We encountered the following exception when scaling down a job from 5600 to > 4200: > {code:java} > 2024-11-12 19:20:54,308 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: > xx (1358/1400) > (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) > switched from RUNNING to FAILED on > container_e33_1725519807238_6894116_01_000825 @ yg- > java.lang.IllegalStateException: Cannot select > SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; > known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=4200}] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code} > * Flink version : 1.16.1 > * unaligned checkpoint : enabled > * log-based checkpoint : enabled > The 
exception was encountered when restoring from chk-2718336, while the job can > successfully restore from chk-2718333. I checked the metadata files of > chk-2718336 and chk-2718333, and both of them have in-flight data. It looks like > there is something wrong with how the unaligned checkpoint reassigns > in-flight data. Could you please take a look? [~arvid] , [~pnowojski] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36739] [WebFrontend] Update the NodeJS to v22.11.0 (LTS) [flink]
davidradl commented on PR #25670: URL: https://github.com/apache/flink/pull/25670#issuecomment-2490393427 Reviewed by Chi on 21/11/24. Asked submitter questions @mehdid93 Looks good - but why are the CI tests failing? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36745][docs] Make FixedSizeSplitFetcherManager and FixedFetcherSizeSourceReader examples match Flink 2.0-preview API [flink]
davidradl commented on PR #25667: URL: https://github.com/apache/flink/pull/25667#issuecomment-2490395429 Reviewed by Chi on 21/11/24 Need a committer to review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36745][docs] Make FixedSizeSplitFetcherManager and FixedFetcherSizeSourceReader examples match Flink 2.0-preview API [flink]
davidradl commented on PR #25667: URL: https://github.com/apache/flink/pull/25667#issuecomment-2490395974 Reviewed by Chi on 21/11/24 Need a committer to review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] [docs] Update table environment variable name in common.md [flink]
davidradl commented on PR #25669: URL: https://github.com/apache/flink/pull/25669#issuecomment-2490393823 Reviewed by Chi on 21/11/24. Asked submitter questions -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Event latency does not equal window triggering [flink]
davidradl commented on PR #25664: URL: https://github.com/apache/flink/pull/25664#issuecomment-2490402911 Reviewed by Chi on 21/11/24. Approve - looking for committer to merge. Notice the tests are failing - but this is a docs change! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36704] Update TypeInference with StaticArgument and StateTypeStrategy [flink]
davidradl commented on PR #25665: URL: https://github.com/apache/flink/pull/25665#issuecomment-2490399670 Reviewed by Chi on 21/11/24 Need a committer / subject area expert to review. Notice that the Tests are failing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36066][runtime] Introducing the AdaptiveGraphManager component [flink]
noorall commented on code in PR #25414: URL: https://github.com/apache/flink/pull/25414#discussion_r1851601660 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java: ## @@ -1969,8 +2032,12 @@ private static void setManagedMemoryFractionForSlotSharingGroup( final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); final Map> vertexChainedConfigs = jobVertexBuildContext.getChainedConfigs(); -final Set groupOperatorIds = +final Set jobVertexIds = slotSharingGroup.getJobVertexIds().stream() +.filter(vertexOperators::containsKey) Review Comment: > In which case the `slotSharingGroup` will contain a job vertex which is not included in the `vertexOperators`? Could you add some comments to explain it as it may not be obvious to other developers. In the progressive job graph generation algorithm, if the user specified the `SlotSharingGroupResource` or the `AllVerticesInSameSlotSharingGroupByDefault` is set to true, job vertices generated in different phases may be assigned to the same `slotSharingGroup`. Therefore, we need to keep only the job vertices that belong to the current phase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] fix:Event latency does not equal window triggering [flink]
davidradl commented on PR #25663: URL: https://github.com/apache/flink/pull/25663#issuecomment-2490406949 Reviewed by Chi on 21/11/24 Close if duplicate -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36455] Sinks retry synchronously [1.20] [flink]
davidradl commented on PR #25661: URL: https://github.com/apache/flink/pull/25661#issuecomment-2490409340 Reviewed by Chi on 21/11/24. Looks in hand, code conflicts and test failures currently -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36379] Improve (Global)Committer with UC disabled [1.20] [flink]
davidradl commented on PR #25660: URL: https://github.com/apache/flink/pull/25660#issuecomment-2490412290 Reviewed by Chi on 21/11/24. Looks in hand, test failures currently -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] Add release announcement for Flink CDC 3.2.1 [flink-web]
ruanhang1993 opened a new pull request, #764: URL: https://github.com/apache/flink-web/pull/764 This PR adds release announcement for Flink CDC 3.2.1. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36764) Add checkpoint type to checkpoint trace
[ https://issues.apache.org/jira/browse/FLINK-36764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36764: --- Labels: pull-request-available (was: ) > Add checkpoint type to checkpoint trace > --- > > Key: FLINK-36764 > URL: https://issues.apache.org/jira/browse/FLINK-36764 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > Fix For: 2.0.0 > > > Currently it's impossible to distinguish checkpoints from savepoints. Also it > would be handy to distinguish aligned and unaligned checkpoints. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36704] Update TypeInference with StaticArgument and StateTypeStrategy [flink]
davidradl commented on PR #25665: URL: https://github.com/apache/flink/pull/25665#issuecomment-2490397302 Reviewed by Chi on 21/11/24 Need a committer/ subject area expert to review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35825][hive] HiveTableSource supports report statistics for text file [flink]
reswqa commented on PR #25078: URL: https://github.com/apache/flink/pull/25078#issuecomment-2490379694 Thanks @xuyangzhong for the review, updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]
davidradl commented on code in PR #25656: URL: https://github.com/apache/flink/pull/25656#discussion_r1851611252 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -1029,6 +1029,8 @@ private TableResultInternal executeInternal( defaultJobName, jobStatusHookList); try { +ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader(); Review Comment: Please add comments, refer to the v2 implementation in the comments, and note that the v2 refactor is not going to be backported to 1.20. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]
davidradl commented on code in PR #25656: URL: https://github.com/apache/flink/pull/25656#discussion_r1851611671 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -1069,8 +1072,11 @@ private TableResultInternal executeQueryOperation( Pipeline pipeline = generatePipelineFromQueryOperation(operation, transformations); try { +ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader(); Review Comment: please add unit tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-16077][docs] Translate "Custom State Serialization" page into Chinese [flink]
davidradl commented on PR #25648: URL: https://github.com/apache/flink/pull/25648#issuecomment-2490427387 Reviewed by Chi on 21/11/24. Unable to review the translation. Notice the tests are failing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36764) Add checkpoint type to checkpoint trace
Piotr Nowojski created FLINK-36764: -- Summary: Add checkpoint type to checkpoint trace Key: FLINK-36764 URL: https://issues.apache.org/jira/browse/FLINK-36764 Project: Flink Issue Type: Improvement Components: Runtime / Checkpointing, Runtime / Metrics Reporter: Piotr Nowojski Fix For: 2.0.0 Currently it's impossible to distinguish checkpoints from savepoints. Also it would be handy to distinguish aligned and unaligned checkpoints. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]
davidradl commented on code in PR #25656: URL: https://github.com/apache/flink/pull/25656#discussion_r1851610106 ## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java: ## @@ -1029,6 +1029,8 @@ private TableResultInternal executeInternal( defaultJobName, jobStatusHookList); try { +ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader(); Review Comment: please could you change the variable name to be something like originalContextClassLoader -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
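For context, the review comments above are about saving and restoring the thread context ClassLoader around job submission. A minimal sketch of that pattern follows; it is illustrative only and not the code in this PR, the helper class name is hypothetical, and the variable name follows the review suggestion. {code:java}
import java.util.concurrent.Callable;

public final class ContextClassLoaderExample {

    /** Runs the action with the given user ClassLoader and always restores the original one. */
    public static <T> T runWithUserClassLoader(ClassLoader userClassLoader, Callable<T> action) throws Exception {
        final Thread thread = Thread.currentThread();
        final ClassLoader originalContextClassLoader = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(userClassLoader);
            return action.call();
        } finally {
            // Restore the original loader even if the action throws,
            // so later operations on this thread are not affected.
            thread.setContextClassLoader(originalContextClassLoader);
        }
    }
}
{code}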
Re: [PR] [FLINK-28897] [TABLE-SQL] Fail to use udf in added jar when enabling checkpoint [flink]
davidradl commented on PR #25656: URL: https://github.com/apache/flink/pull/25656#issuecomment-2490421787 Reviewed by Chi on 21/11/24. Asked submitter questions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Upgrade com.squareup.okio:okio [flink]
davidradl commented on PR #25649: URL: https://github.com/apache/flink/pull/25649#issuecomment-2490425525 Reviewed by Chi on 21/11/24. Asked submitter questions. Please could you raise a Jira detailing the reason you want to upgrade this component (e.g. is there a particular bug that this would fix)? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]
davidradl commented on code in PR #25647: URL: https://github.com/apache/flink/pull/25647#discussion_r1851618895 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -707,6 +707,12 @@ public class TaskManagerOptions { "The %s mode tries to spread out the slots evenly across all available %s.", code(TaskManagerLoadBalanceMode.SLOTS.name()), code("TaskManagers")), +text( Review Comment: NOT: its' -> it's ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -707,6 +707,12 @@ public class TaskManagerOptions { "The %s mode tries to spread out the slots evenly across all available %s.", code(TaskManagerLoadBalanceMode.SLOTS.name()), code("TaskManagers")), +text( Review Comment: NIT: its' -> it's -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-36764) Add checkpoint type to checkpoint trace
[ https://issues.apache.org/jira/browse/FLINK-36764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Piotr Nowojski reassigned FLINK-36764: -- Assignee: Piotr Nowojski > Add checkpoint type to checkpoint trace > --- > > Key: FLINK-36764 > URL: https://issues.apache.org/jira/browse/FLINK-36764 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing, Runtime / Metrics >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Fix For: 2.0.0 > > > Currently it's impossible to distinguish checkpoints from savepoints. Also it > would be handy to distinguish aligned and unaligned checkpoints. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]
davidradl commented on code in PR #25647: URL: https://github.com/apache/flink/pull/25647#discussion_r1851620823 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -707,6 +707,12 @@ public class TaskManagerOptions { "The %s mode tries to spread out the slots evenly across all available %s.", code(TaskManagerLoadBalanceMode.SLOTS.name()), code("TaskManagers")), +text( +"The %s mode tries to schedule evenly all tasks based on its' number across all available %s. " Review Comment: I don't understand this sentence. I am not sure what "schedule evenly all tasks based on its' number" means. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36066][runtime] Introducing the AdaptiveGraphManager component [flink]
zhuzhurk commented on code in PR #25414: URL: https://github.com/apache/flink/pull/25414#discussion_r1851265617 ## flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java: ## @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.runtime.jobgraph.IntermediateDataSet; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil; +import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext; +import org.apache.flink.streaming.api.graph.util.OperatorChainInfo; +import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.addVertexIndexPrefixInVertexName; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.connect; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createAndInitializeJobGraph; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createSourceChainInfo; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isSourceChainable; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.markSupportingConcurrentExecutionAttempts; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.serializeOperatorCoordinatorsAndStreamConfig; 
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setManagedMemoryFraction; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setPhysicalEdges; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharingAndCoLocation; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexDescription; +import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.validateHybridShuffleExecuteInBatchMode; + +/** Default implementation for {@link AdaptiveGraphGenerator}. */ +@Internal +public class AdaptiveGraphManager implements AdaptiveGraphGenerator { + +private final StreamGraph streamGraph; + +private final JobGraph jobGraph; + +private final StreamGraphHasher defaultStreamGraphHasher; + +private final List legacyStreamGraphHasher; + +private final Executor serializationExecutor; + +private final AtomicInteger vertexIndexId; + +private final StreamGraphContext streamGraphContext; + +private final Map hashes; + +private final List> legacyHashes; + +// Records the id of stream node which job vertex is created. +private final Map frozenNodeToStartNodeMap; + +// When the downstream vertex is not cr
Re: [PR] [FLINK-34545][cdc-pipeline-connector]Add OceanBase pipeline connector to Flink CDC [flink-cdc]
yuanoOo commented on PR #3360: URL: https://github.com/apache/flink-cdc/pull/3360#issuecomment-2490158237 @lvyanquan I fixed the above comments, please take a look again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-36763) Support schema inference and evolution with single-table-mutliple-partition sources
[ https://issues.apache.org/jira/browse/FLINK-36763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Leonard Xu reassigned FLINK-36763: -- Assignee: yux > Support schema inference and evolution with single-table-mutliple-partition > sources > --- > > Key: FLINK-36763 > URL: https://issues.apache.org/jira/browse/FLINK-36763 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Major > > The current schema evolution implementation implicitly assumes that there can't > be two partitions with the same TableID being consumed on different subTasks. > This is not always true for some weakly structured sources like Kafka and > MongoDB, where table schemas are not centralized and might evolve > independently in various parallelized tasks. > It's unlikely to be a trivial change, and might involve modifying most > parts of the pipeline implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36762) Add E2ECase to run sql client in application mode
Shengkai Fang created FLINK-36762: - Summary: Add E2ECase to run sql client in application mode Key: FLINK-36762 URL: https://issues.apache.org/jira/browse/FLINK-36762 Project: Flink Issue Type: Sub-task Components: Table SQL / Gateway Reporter: Shengkai Fang -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35282][python] Upgrade Apache Beam > 2.54 [flink]
snuyanzin commented on PR #25541: URL: https://github.com/apache/flink/pull/25541#issuecomment-2490142517 @dianfu , @HuangXingBo could you please have a look here, I think you are more experienced with python in Flink -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36760) Support to deploy script via sql client
Shengkai Fang created FLINK-36760: - Summary: Support to deploy script via sql client Key: FLINK-36760 URL: https://issues.apache.org/jira/browse/FLINK-36760 Project: Flink Issue Type: Sub-task Components: Table SQL / Client Reporter: Shengkai Fang -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]
davidradl commented on code in PR #25647: URL: https://github.com/apache/flink/pull/25647#discussion_r1851640555 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java: ## @@ -40,13 +41,16 @@ public SlotSharingExecutionSlotAllocatorFactory( PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely, PhysicalSlotRequestBulkChecker bulkChecker, -Duration allocationTimeout) { +Duration allocationTimeout, +TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) { this( slotProvider, slotWillBeOccupiedIndefinitely, bulkChecker, allocationTimeout, -new LocalInputPreferredSlotSharingStrategy.Factory()); +taskManagerLoadBalanceMode == TaskManagerOptions.TaskManagerLoadBalanceMode.TASKS +? new TaskBalancedPreferredSlotSharingStrategy.Factory() Review Comment: I see that specifying TASKS means this factory will be used to load balance. I do not see the works task number in the factory implementation. It seems to be trying to colocate tasks in a slot. I am not sure when this would be preferable or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36688) table.optimizer.reuse-source-enabled may cause disordered metadata columns when reading from Kafka.
[ https://issues.apache.org/jira/browse/FLINK-36688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17899969#comment-17899969 ] xuyang commented on FLINK-36688: You're right. There is a bug in ScanReuser. I'll fix it. > table.optimizer.reuse-source-enabled may cause disordered metadata columns > when reading from Kafka. > - > > Key: FLINK-36688 > URL: https://issues.apache.org/jira/browse/FLINK-36688 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.20.0, 1.19.1, 2.0-preview >Reporter: Yanquan Lv >Priority: Major > > Metadata columns in Kafka need to maintain a fixed order: the metadata for > the format needs to be at the beginning, while the metadata for Kafka > itself (partition/offset and so on) needs to be at the end. The Kafka connector > adds the format's fields first, and then adds Kafka's own fields. > However, the reused source does not maintain this order, which may cause a > ClassCastException. > How to reproduce: > {code:java} > create temporary table `message_channel_task_record` > ( > origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL > ,`partition` INT METADATA VIRTUAL > ,`offset` BIGINT METADATA VIRTUAL > ,id BIGINT comment '自增ID' > ,PRIMARY KEY (`id`) NOT ENFORCED > ) > with ( > 'connector'='kafka' > xxx > ) > ; > create temporary table `sink` > ( > origin_ts TIMESTAMP(3) > ,`partition` INT > ,`offset` BIGINT > ,id BIGINT > ) > WITH ( > 'connector'='print' > ) > ; > create temporary table `sr_sink` > ( > id BIGINT comment '自增ID' > ) > WITH ( > 'connector'='print' > ) > ; > -- EXPLAIN STATEMENT SET BEGIN > BEGIN STATEMENT SET; > INSERT INTO sink > SELECT > origin_ts > ,`partition` > ,`offset` > ,id > FROM message_channel_task_record > ; > INSERT INTO `sr_sink` > SELECT > id > FROM `message_channel_task_record` > ; > END > ; > {code} > Explained plan: > {code:java} > [558]:TableSourceScan(table=[[vvp, default, message_channel_task_record, > project=[id], metadata=[partition, value.ingestion-timestamp, offset]]], > fields=[id, partition, origin_ts, offset]) > :- [559]:Calc(select=[CAST(origin_ts AS TIMESTAMP(3)) AS origin_ts, > CAST(partition AS INTEGER) AS partition, CAST(offset AS BIGINT) AS offset, > id]) > : +- [560]:Sink(table=[vvp.default.sink], fields=[origin_ts, partition, > offset, id]) >+- [562]:Sink(table=[vvp.default.sr_sink], fields=[id]) {code} > Expected metadata column order is: value.ingestion-timestamp(format), > partition, offset; > The actual metadata column order is: partition, > value.ingestion-timestamp(format), offset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
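To illustrate the ordering convention described in FLINK-36688 above, here is a minimal, hypothetical Java sketch (the class name and metadata key list are invented for the example; this is not the ScanReuser fix): metadata keys contributed by the value format are expected to come before the Kafka connector's own metadata keys when the projected schema is rebuilt for a reused source.

{code:java}
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of the ordering rule from FLINK-36688:
// metadata provided by the value format must precede the Kafka connector's
// own metadata (partition, offset, ...) in the projected schema.
public class MetadataOrderSketch {

    private static final List<String> CONNECTOR_METADATA =
            Arrays.asList("partition", "offset", "timestamp");

    // Format metadata (e.g. value.ingestion-timestamp) sorts before connector metadata;
    // the sort is stable, so the relative order inside each group is preserved.
    static Comparator<String> formatMetadataFirst() {
        return Comparator.comparingInt(key -> CONNECTOR_METADATA.contains(key) ? 1 : 0);
    }

    public static void main(String[] args) {
        // Order produced by the reused source in the explained plan above.
        List<String> reusedOrder =
                Arrays.asList("partition", "value.ingestion-timestamp", "offset");
        List<String> expectedOrder =
                reusedOrder.stream().sorted(formatMetadataFirst()).collect(Collectors.toList());
        // Prints [value.ingestion-timestamp, partition, offset], the order the connector expects.
        System.out.println(expectedOrder);
    }
}
{code}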
Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]
davidradl commented on code in PR #25647: URL: https://github.com/apache/flink/pull/25647#discussion_r1851640826 ## docs/layouts/shortcodes/generated/all_taskmanager_section.html: ## @@ -90,7 +90,7 @@ taskmanager.load-balance.mode NONE Enum -Mode for the load-balance allocation strategy across all available TaskManagers.The SLOTS mode tries to spread out the slots evenly across all available TaskManagers.The NONE mode is the default mode without any specified strategy.Possible values:"NONE""SLOTS" +Mode for the load-balance allocation strategy across all available TaskManagers.The SLOTS mode tries to spread out the slots evenly across all available TaskManagers.The TASKS mode tries to schedule evenly all tasks based on its' number across all available TaskManagers. Note: Currently, enabling this parameter can only achieve the balancing effect of the slot level dimension of DefaultScheduler.The NONE mode is the default mode without any specified strategy.Possible values:"NONE""SLOTS""TASKS" Review Comment: I am not sure what this sentence means - it would be really useful to have a diagram, and maybe an example showing the SLOT and TASK modes and how they differ. It would also be useful to detail why you would choose TASK mode or SLOT mode. I was looking for more information in the Jira - it should point to the FLIP. I see some of the conversations in the PR as to why this might be needed. Could we summarize these considerations in the docs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]
pnowojski opened a new pull request, #25671: URL: https://github.com/apache/flink/pull/25671 ## What is the purpose of the change Add checkpoint type and unaligned flag to the checkpoint trace ## Verifying this change Expanded unit test. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (**yes** / no) - If yes, how is the feature documented? (not applicable / **docs** / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]
flinkbot commented on PR #25671: URL: https://github.com/apache/flink/pull/25671#issuecomment-2490481251 ## CI report: * fe598c5420a55cc01a1e8fc92f6cbb4907ac9d49 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]
davidradl commented on PR #25647: URL: https://github.com/apache/flink/pull/25647#issuecomment-2490489348 Reviewed by Chi on 21/11/24. Asked the submitter questions, mostly to make it clear when to configure this new option by bringing appropriate FLIP content and reasoning into the docs, ideally including diagrams to show the need for the new option. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36685) Enable update/create operation on flinkdeployment resource in mutation webhook
[ https://issues.apache.org/jira/browse/FLINK-36685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-36685: --- Priority: Minor (was: Blocker) > Enable update/create operation on flinkdeployment resource in mutation webhook > -- > > Key: FLINK-36685 > URL: https://issues.apache.org/jira/browse/FLINK-36685 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.9.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Minor > Labels: pull-request-available > > In mutation webhook yaml of the helm chart, UPDATE/CREATE operation is not > allowed on > flinkdeployments. We use mutation webhook to inject platform secrets to the > flink pipeline CRD. Planned to add a PR to enable UPDATE/CREATE operation on > flinkdeployments resource. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36525] Support for AI Model Integration for Data Processing [flink-cdc]
lvyanquan opened a new pull request, #3753: URL: https://github.com/apache/flink-cdc/pull/3753 The goal is to extend [flink-cdc](https://issues.apache.org/jira/browse/FLINK-cdc) with the capability to invoke AI models during the data stream processing workflow, with a particular focus on supporting array data structures. This feature will allow users to easily integrate embedding models to handle array-based data, such as lists of text, numeric data, or other multi-dimensional arrays, within their Flink jobs. Co-authored with @proletarians. Based on https://github.com/apache/flink-cdc/pull/3642 and https://github.com/apache/flink-cdc/pull/3434 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36646] Test different versions of the JDK in the Flink image [flink-kubernetes-operator]
gyfora commented on PR #910: URL: https://github.com/apache/flink-kubernetes-operator/pull/910#issuecomment-2490732713 Closing this as it was merged in another PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36646] Test different versions of the JDK in the Flink image [flink-kubernetes-operator]
gyfora closed pull request #910: [FLINK-36646] Test different versions of the JDK in the Flink image URL: https://github.com/apache/flink-kubernetes-operator/pull/910 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-36765) How to Handle Multi-Type Maps in Avro Schema with Flink Table API?
[ https://issues.apache.org/jira/browse/FLINK-36765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900049#comment-17900049 ] david radley edited comment on FLINK-36765 at 11/21/24 2:07 PM: I was trying to find where the Avro specification says that multiple values are allowed for the map values, do you know where this is documented as part of the specification. Could you use Record here instead of a map? was (Author: JIRAUSER300523): I was trying to find where the Avro specification says that multiple values are allowed for the map values, do you know where this is documented as part of the specification. > How to Handle Multi-Type Maps in Avro Schema with Flink Table API? > -- > > Key: FLINK-36765 > URL: https://issues.apache.org/jira/browse/FLINK-36765 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Formats (JSON, Avro, > Parquet, ORC, SequenceFile) >Reporter: Maneendra >Priority: Major > > > I have a Map with multiple data types in my Avro schema, which I am trying to > use in the Flink Table API to read data from Kafka. However, I’m encountering > the following exception because the Flink AvroSchemaConverter does not > support Maps with mixed data types. Could someone assist me in parsing this > schema using the Table API? > FLink Code: String avroSchema=""; > DataType s = AvroSchemaConverter.convertToDataType(avroSchema); > Schema schema1 = Schema.newBuilder().fromRowDataType(s).build(); > > TableDescriptor descriptor = TableDescriptor.forConnector("kafka") > .schema(schema) > .comment("simple comment") > .option("topic", "") > .option("properties.application.id", "") > .option("properties.security.protocol", "") > .option("properties.bootstrap.servers", "") > .option("properties.group.id", "") > .option("properties.auto.offset.reset", "earliest") > .option("format", "avro") > .build(); > Avro Schema: > { > "name":"standByProperties", > "type":[ > "null", > { > "type":"map", > "values":[ > "null", > "boolean", > "int" > ] > } > ] > }, > Output: standByProperties MAP NULL> Exception: Exception in thread "main" > java.lang.UnsupportedOperationException: Unsupported to derive Schema for > type: RAW('java.lang.Object', ?) NOT NULL at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:580) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:568) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:549) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416) > What I Tried: I defined an Avro schema that includes a Map field with values > of mixed data types. Used the Flink Table API to read data from Kafka and > attempted to use AvroSchemaConverter to map the schema to a Flink table. > During execution, I encountered an exception because the AvroSchemaConverter > does not support Maps with multiple value types. What I Was Expecting: I was > expecting Flink to handle the Map field and correctly parse the data into a > table format, with proper support for the mixed data types within the Map. -- This message was sent by Atlassian Jira (v8.20.10#820010)
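As a concrete illustration of the record question above, a hedged sketch (the record and field names are invented, and it assumes the map keys are known up front, which may not hold for the reporter's source): a record with nullable fields converts cleanly via the same AvroSchemaConverter call used in the issue, whereas a map whose values are a union of boolean and int falls back to a RAW type.

{code:java}
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;

public class RecordInsteadOfMapSketch {
    public static void main(String[] args) {
        // Hypothetical alternative schema: the known keys become optional record fields
        // instead of map values typed as a union of boolean and int.
        String recordSchema =
                "{\"type\":\"record\",\"name\":\"StandByProperties\",\"fields\":["
                        + "{\"name\":\"enabled\",\"type\":[\"null\",\"boolean\"],\"default\":null},"
                        + "{\"name\":\"retryCount\",\"type\":[\"null\",\"int\"],\"default\":null}]}";

        // Validate the Avro schema first, then derive the Flink type as in the reporter's snippet.
        Schema parsed = new Schema.Parser().parse(recordSchema);
        DataType dataType = AvroSchemaConverter.convertToDataType(parsed.toString());

        // Expected: a ROW type with nullable BOOLEAN and INT fields, rather than a
        // MAP whose value type degenerates to RAW('java.lang.Object', ?).
        System.out.println(dataType);
    }
}
{code}

Whether the keys can be enumerated ahead of time is an assumption; if the producing schema cannot change, this sketch does not apply and the conversion limitation remains on the Flink side.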
Re: [PR] [FLINK-36769] support fury serializer for pyflink [flink]
flinkbot commented on PR #25672: URL: https://github.com/apache/flink/pull/25672#issuecomment-2491305074 ## CI report: * c8a1d5309bca19c7d3880fd8b86b1b6b65bd4a07 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35966][runtime] Introduce the TASKS for TaskManagerLoadBalanceMode enum [flink]
RocMarshal commented on code in PR #25647: URL: https://github.com/apache/flink/pull/25647#discussion_r1852202449 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java: ## @@ -40,13 +41,16 @@ public SlotSharingExecutionSlotAllocatorFactory( PhysicalSlotProvider slotProvider, boolean slotWillBeOccupiedIndefinitely, PhysicalSlotRequestBulkChecker bulkChecker, -Duration allocationTimeout) { +Duration allocationTimeout, +TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode) { this( slotProvider, slotWillBeOccupiedIndefinitely, bulkChecker, allocationTimeout, -new LocalInputPreferredSlotSharingStrategy.Factory()); +taskManagerLoadBalanceMode == TaskManagerOptions.TaskManagerLoadBalanceMode.TASKS +? new TaskBalancedPreferredSlotSharingStrategy.Factory() Review Comment: In fact, the phenomenon of uneven task scheduling results is mainly caused by two stages: 1. tasks->slot 2. slots->taskmanager Because prior to the initial feature freeze, we anticipated that the entire FLIP would be at risk. Our original intention in initiating this PR was to release this change as early as possible to address the imbalance issue in the task ->slot dimension. But now there is no time risk with this feature, so we plan to release both dimensions of functionality together. BTW, the optimizations and changes mentioned above are highly likely not to be fixed in the current PR as it may be abandoned. But we will place the corresponding optimization in the appropriate PR position in the future. If you are willing to help with the review, I would greatly appreciate it! :) ## docs/layouts/shortcodes/generated/all_taskmanager_section.html: ## @@ -90,7 +90,7 @@ taskmanager.load-balance.mode NONE Enum -Mode for the load-balance allocation strategy across all available TaskManagers.The SLOTS mode tries to spread out the slots evenly across all available TaskManagers.The NONE mode is the default mode without any specified strategy.Possible values:"NONE""SLOTS" +Mode for the load-balance allocation strategy across all available TaskManagers.The SLOTS mode tries to spread out the slots evenly across all available TaskManagers.The TASKS mode tries to schedule evenly all tasks based on its' number across all available TaskManagers. Note: Currently, enabling this parameter can only achieve the balancing effect of the slot level dimension of DefaultScheduler.The NONE mode is the default mode without any specified strategy.Possible values:"NONE""SLOTS""TASKS" Review Comment: Good idea~, we do need a more detailed and precise description, as well as traceable design documents. How about making the following changes based on your suggestion? 1. Optimize the configuration description section in the code, including adding FLIP addresses 2. The explanation of how to use and the differences between it and other configurations, as well as the creation of charts, can be placed in https://issues.apache.org/jira/browse/FLINK-33392 Completed in the middle ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -707,6 +707,12 @@ public class TaskManagerOptions { "The %s mode tries to spread out the slots evenly across all available %s.", code(TaskManagerLoadBalanceMode.SLOTS.name()), code("TaskManagers")), +text( +"The %s mode tries to schedule evenly all tasks based on its' number across all available %s. 
" Review Comment: The meaning expressed here is that the scheduler will try its best to ensure that the scheduling results are concentrated and the number of tasks on each Taskmanager is similar. I will add a more detailed description -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36769) Suport fury Serializer for pyflink
[ https://issues.apache.org/jira/browse/FLINK-36769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36769: --- Labels: pull-request-available (was: ) > Suport fury Serializer for pyflink > -- > > Key: FLINK-36769 > URL: https://issues.apache.org/jira/browse/FLINK-36769 > Project: Flink > Issue Type: New Feature > Components: API / Python >Affects Versions: 1.16.1 > Environment: flink 1.16.1 >Reporter: xingyuan cheng >Priority: Major > Labels: pull-request-available > > Hi, community. Currently, in the batch verification scenario of our algorithm > data, we use pyflink and encounter low transmission efficiency caused by low > performance of pickle4-based encoding. After research, we decided to adopt > Apache fury, a serialization framework based on pickle5 encoding. The > implementation of fury in python will define the transmission buffer size in > the protocol for transmission to improve the performance of large data > transmission. To this end, I Prepared a draft pull request. What do friends > in the community think about this? > > Pickle protocol 5 with out-of-band data: https://peps.python.org/pep-0574/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36765) How to Handle Multi-Type Maps in Avro Schema with Flink Table API?
[ https://issues.apache.org/jira/browse/FLINK-36765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900079#comment-17900079 ] Maneendra commented on FLINK-36765: --- [~davidradl] The existing schema, which contains map values with multiple data types, works with the Flink DataStream API. However, while migrating the codebase to the Flink Table API, we encountered an issue with this schema. We need to resolve this issue because multiple teams consume the data, making it impractical to modify the source schema. > How to Handle Multi-Type Maps in Avro Schema with Flink Table API? > -- > > Key: FLINK-36765 > URL: https://issues.apache.org/jira/browse/FLINK-36765 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Formats (JSON, Avro, > Parquet, ORC, SequenceFile) >Reporter: Maneendra >Priority: Major > > > I have a Map with multiple data types in my Avro schema, which I am trying to > use in the Flink Table API to read data from Kafka. However, I’m encountering > the following exception because the Flink AvroSchemaConverter does not > support Maps with mixed data types. Could someone assist me in parsing this > schema using the Table API? > FLink Code: String avroSchema=""; > DataType s = AvroSchemaConverter.convertToDataType(avroSchema); > Schema schema1 = Schema.newBuilder().fromRowDataType(s).build(); > > TableDescriptor descriptor = TableDescriptor.forConnector("kafka") > .schema(schema) > .comment("simple comment") > .option("topic", "") > .option("properties.application.id", "") > .option("properties.security.protocol", "") > .option("properties.bootstrap.servers", "") > .option("properties.group.id", "") > .option("properties.auto.offset.reset", "earliest") > .option("format", "avro") > .build(); > Avro Schema: > { > "name":"standByProperties", > "type":[ > "null", > { > "type":"map", > "values":[ > "null", > "boolean", > "int" > ] > } > ] > }, > Output: standByProperties MAP NULL> Exception: Exception in thread "main" > java.lang.UnsupportedOperationException: Unsupported to derive Schema for > type: RAW('java.lang.Object', ?) NOT NULL at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:580) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:568) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:549) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416) > What I Tried: I defined an Avro schema that includes a Map field with values > of mixed data types. Used the Flink Table API to read data from Kafka and > attempted to use AvroSchemaConverter to map the schema to a Flink table. > During execution, I encountered an exception because the AvroSchemaConverter > does not support Maps with multiple value types. What I Was Expecting: I was > expecting Flink to handle the Map field and correctly parse the data into a > table format, with proper support for the mixed data types within the Map. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36766) Use pyflink to create remote env
William Que created FLINK-36766: --- Summary: Use pyflink to create remote env Key: FLINK-36766 URL: https://issues.apache.org/jira/browse/FLINK-36766 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.19.1, 1.20.0 Environment: Ubuntu 24 LTSC Flink : 1.19.1 or 1.20.0 Reporter: William Que I use the following codes to connect remote flink cluster and then create a remote flink env. After adding jar files to the streamExecutionEnvironment, evary time when executing flink sql, error will be reported, something like error of parsing yaml file. {code:java} import os from pyflink.datastream import StreamExecutionEnvironment from pyflink.java_gateway import get_gateway from pyflink.table import StreamTableEnvironment gateway = get_gateway() string_class = gateway.jvm.String string_array = gateway.new_array(string_class, 0) stream_env = gateway.jvm.org.apache.flink.streaming.api.environment.StreamExecutionEnvironment j_stream_exection_environment = stream_env.createRemoteEnvironment("master",8081,string_array) env = StreamExecutionEnvironment(j_stream_exection_environment) jars_path = "F:/jars/flink-1.19.1/" jar_files = ["file:///" + jars_path + f for f in os.listdir(jars_path) if f.endswith('.jar')] jar_files_str = ';'.join(jar_files) env.add_jars(*jar_files) ## Cause Error t_env = StreamTableEnvironment.create(env) {code} Then I trace the error, and find it caused by a static method in configuration.py of pyflink package. after env.add_jars(*jar_files) , the value parameter will be like this, which caused the above error. value = '{color:#FF}[];{color}file:///F:/software/jars-flink3/flink-clients-1.20.0.jar;file:///F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar;..' {code:java} @staticmethod def parse_jars_value(value: str, jvm): is_standard_yaml = jvm.org.apache.flink.configuration.GlobalConfiguration.isStandardYaml() if is_standard_yaml: from ruamel.yaml import YAML yaml = YAML(typ='safe') jar_urls_list = yaml.load(value) # ERROR if isinstance(jar_urls_list, list): return jar_urls_list return value.split(";") {code} I once tried to fix it by the way of removing "[];" part from the value of value parameter, one problem solved but another then came out, it seems jar files have been added twice in classpath at some place. 
{code:java} Caused by: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job: old:[file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar, file:/F:/software/jars-flink3/flink-json-1.20.0.jar, file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar] new:[file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, file:/F:/software/jars-flink3/flink-json-1.20.0.jar, file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar, file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar, file:/F:/software/jars-flink3/flink-clients-1.20.0.jar, file:/F:/software/jars-flink3/flink-connector-jdbc-3.2.0-1.19.jar, file:/F:/software/jars-flink3/flink-connector-kafka-3.3.0-1.20.jar, file:/F:/software/jars-flink3/flink-json-1.20.0.jar, file:/F:/software/jars-flink3/kafka-clients-3.6.2.jar, file:/F:/software/jars-flink3/mysql-connector-java-8.0.28.jar]{code} Please check it carefullly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36767) Bump cyclonedx-maven-plugin from 2.7.9 to 2.9.0
Siddharth R created FLINK-36767: --- Summary: Bump cyclonedx-maven-plugin from 2.7.9 to 2.9.0 Key: FLINK-36767 URL: https://issues.apache.org/jira/browse/FLINK-36767 Project: Flink Issue Type: Improvement Components: Kubernetes Operator Affects Versions: kubernetes-operator-1.11.0 Reporter: Siddharth R Bump cyclonedx-maven-plugin from 2.7.9 to 2.9.0 to remediate the findings in the dependent packages. Vulnerabilities from dependencies: [CVE-2024-38374|https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-38374] Package details: [https://mvnrepository.com/artifact/org.cyclonedx/cyclonedx-maven-plugin/2.9.0] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36686) allow customizing env variables for flink-webhook container
[ https://issues.apache.org/jira/browse/FLINK-36686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36686: --- Labels: pull-request-available (was: ) > allow customizing env variables for flink-webhook container > --- > > Key: FLINK-36686 > URL: https://issues.apache.org/jira/browse/FLINK-36686 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.9.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Blocker > Labels: pull-request-available > > In current helm chart of the operator, there is no way to pass in custom env > variables into the flink-webhook container. I can see a few options to fix > the problem: > * make Values.operatorPod.env pod-level envs in flink-operator.yaml > * create a new webhoodPod.env in values.yaml and apply it only to the > flink-webhook container env setup > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]
davidradl commented on code in PR #25671: URL: https://github.com/apache/flink/pull/25671#discussion_r1851895450 ## flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTrackerTest.java: ## @@ -385,11 +384,10 @@ public void addSpan(SpanBuilder spanBuilder) { tracker.reportCompletedCheckpoint(pending.toCompletedCheckpointStats(null)); assertThat(reportedSpans.size()).isEqualTo(1); -assertThat( -reportedSpans.stream() -.map(span -> span.getAttributes().get("checkpointId")) -.collect(Collectors.toList())) -.containsExactly(42L); +Span reportedSpan = Iterables.getOnlyElement(reportedSpans); + assertThat(reportedSpan.getAttributes().get("checkpointId")).isEqualTo(42L); + assertThat(reportedSpan.getAttributes().get("checkpointType")).isEqualTo("Checkpoint"); + assertThat(reportedSpan.getAttributes().get("isUnaligned")).isEqualTo("false"); Review Comment: can we add a test for` isUnaligned ` true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36743) Rescale from unaligend checkpoint failed
[ https://issues.apache.org/jira/browse/FLINK-36743?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feifan Wang updated FLINK-36743: Attachment: image-2024-11-21-20-20-41-644.png > Rescale from unaligend checkpoint failed > > > Key: FLINK-36743 > URL: https://issues.apache.org/jira/browse/FLINK-36743 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Reporter: Feifan Wang >Priority: Major > Attachments: > Allow-user-to-set-whether-restore-forward-rescale-broadcast-from-unaligned-checkpoint-with-parallelism-change.patch, > image-2024-11-19-14-58-22-975.png, image-2024-11-19-17-27-55-387.png, > image-2024-11-19-17-30-14-816.png, image-2024-11-21-20-20-20-536.png, > image-2024-11-21-20-20-41-644.png > > > We encountered the following exception when scaling down a job from 5600 to > 4200: > {code:java} > 2024-11-12 19:20:54,308 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: > xx (1358/1400) > (80ea0855521cb3249d011e3166823e47_56a38c81905da002db3a9d8f9d395f2b_1357_0) > switched from RUNNING to FAILED on > container_e33_1725519807238_6894116_01_000825 @ yg- > java.lang.IllegalStateException: Cannot select > SubtaskConnectionDescriptor{inputSubtaskIndex=0, outputSubtaskIndex=4071}; > known channels are [SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=0}, SubtaskConnectionDescriptor{inputSubtaskIndex=1357, > outputSubtaskIndex=4200}] > at > org.apache.flink.streaming.runtime.io.recovery.DemultiplexingRecordDeserializer.select(DemultiplexingRecordDeserializer.java:121) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.recovery.RescalingStreamTaskNetworkInput.processEvent(RescalingStreamTaskNetworkInput.java:181) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:118) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:937) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:916) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:730) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) > ~[flink-dist-1.16.1-mt001-SNAPSHOT.jar:1.16.1-mt001-SNAPSHOT] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312] {code} > * Flink version : 1.16.1 > * unaligned checkpoint : enabled > 
* log-based checkpoint : enabled > The exception was encountered when restoring from chk-2718336, and the job can > successfully restore from chk-2718333. I checked the metadata files of > chk-2718336 and chk-2718333; both of them contain in-flight data. It looks like > there is something wrong with how the unaligned checkpoint reassigns > in-flight data. Could you please take a look? [~arvid], [~pnowojski] -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36066][runtime] Introducing the AdaptiveGraphManager component [flink]
zhuzhurk commented on code in PR #25414: URL: https://github.com/apache/flink/pull/25414#discussion_r1851918943 ## flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManagerTest.java: ## @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.graph; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.connector.source.lib.NumberSequenceSource; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; + +import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link AdaptiveGraphManager}. 
*/ +public class AdaptiveGraphManagerTest extends JobGraphGeneratorTestBase { +@Override +JobGraph createJobGraph(StreamGraph streamGraph) { +return generateJobGraphInLazilyMode(streamGraph); +} + +@Test +@Override +void testManagedMemoryFractionForUnknownResourceSpec() throws Exception { +final ResourceSpec resource = ResourceSpec.UNKNOWN; +final List resourceSpecs = +Arrays.asList(resource, resource, resource, resource); + +final Configuration taskManagerConfig = +new Configuration() { +{ +set( + TaskManagerOptions.MANAGED_MEMORY_CONSUMER_WEIGHTS, +new HashMap() { +{ +put( +TaskManagerOptions + .MANAGED_MEMORY_CONSUMER_NAME_OPERATOR, +"6"); +put( +TaskManagerOptions + .MANAGED_MEMORY_CONSUMER_NAME_PYTHON, +"4"); +} +}); +} +}; + +final List> operatorScopeManagedMemoryUseCaseWeights = +new ArrayList<>(); +final List> slotScopeManagedMemoryUseCases = new ArrayList<>(); + +// source: batch +operatorScopeManagedMemoryUseCaseWeights.add( +Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1)); +slotScopeManagedMemoryUseCases.add(Collections.emptySet()); + +// map1: batch, python +operatorScopeManagedMemoryUseCaseWeights.add( +Collections.singletonMap(ManagedMemoryUseCase.OPERATOR, 1)); + slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON)); + +// map3: python +operatorScopeManagedMemoryUseCaseWeights.add(Collections.emptyMap()); + slotScopeManagedMemoryUseCases.add(Collections.singleton(ManagedMemoryUseCase.PYTHON)); + +
Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]
pnowojski commented on code in PR #25671: URL: https://github.com/apache/flink/pull/25671#discussion_r1851984375 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java: ## @@ -256,7 +256,13 @@ private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) { .setAttribute("checkpointId", checkpointStats.getCheckpointId()) .setAttribute("fullSize", checkpointStats.getStateSize()) .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize()) -.setAttribute("checkpointStatus", checkpointStats.getStatus().name())); +.setAttribute("checkpointStatus", checkpointStats.getStatus().name()) +.setAttribute( +"isUnaligned", + Boolean.toString(checkpointStats.isUnalignedCheckpoint())) +.setAttribute( +"checkpointType", + checkpointStats.getProperties().getCheckpointType().getName())); Review Comment: The potential values here are: ``` public static final CheckpointType CHECKPOINT = new CheckpointType("Checkpoint", SharingFilesStrategy.FORWARD_BACKWARD); public static final CheckpointType FULL_CHECKPOINT = new CheckpointType("Full Checkpoint", SharingFilesStrategy.FORWARD); public static SavepointType savepoint(SavepointFormatType formatType) { return new SavepointType("Savepoint", PostCheckpointAction.NONE, formatType); } public static SavepointType terminate(SavepointFormatType formatType) { return new SavepointType("Terminate Savepoint", PostCheckpointAction.TERMINATE, formatType); } public static SavepointType suspend(SavepointFormatType formatType) { return new SavepointType("Suspend Savepoint", PostCheckpointAction.SUSPEND, formatType); } ``` Savepoints do end up here, and unfortunately the `Checkpoint` name here is a bit overloaded. However this is also consistent with other places in the code, especially metrics/logs. There savepoints are also reported/logged as checkpoints. So I'm not sure that I would be fine the to change the trace's `Checkpoint` name here to `Snapshot`. I think `SharingFilesStrategy` is superfluous. Each snapshot type is named uniquely, so you can deduce sharing strategy based on the name/type. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (FLINK-36768) Empty Checkpoint Directory created when older checkpoint fails due to timeout and the checkpoint interval is same
Eaugene Thomas created FLINK-36768: -- Summary: Empty Checkpoint Directory created when older checkpoint fails due to timeout and the checkpoint interval is same Key: FLINK-36768 URL: https://issues.apache.org/jira/browse/FLINK-36768 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.12.1 Reporter: Eaugene Thomas When the checkpoint interval equals the checkpoint timeout, and the failure threshold is reached, the asynchronous checkpoint trigger starts. However, due to an internal restart, the tombstone directory ({{{}chk-{}}}) is not cleared. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-31836][table] Upgrade Calcite version to 1.34.0 [flink]
twalthr commented on code in PR #24256: URL: https://github.com/apache/flink/pull/24256#discussion_r1852228055 ## docs/content.zh/docs/dev/table/sql/queries/deduplication.md: ## @@ -32,13 +32,10 @@ Flink 使用 `ROW_NUMBER()` 去除重复数据,就像 Top-N 查询一样。其 下面的例子展示了去重语句的语法: ```sql -SELECT [column_list] -FROM ( - SELECT [column_list], - ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] - ORDER BY time_attr [asc|desc]) AS rownum - FROM table_name) -WHERE rownum = 1 +SELECT [column_list], Review Comment: Can we use this instead for deduplication? The Top-N makes sense. ``` SELECT [column_list] FROM table_name QUALIFY ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) = 1 ``` ## docs/content/docs/dev/table/concepts/versioned_tables.md: ## @@ -163,13 +163,10 @@ table usable in subsequent queries. In general, the results of a query with the following format produces a versioned table: ```sql -SELECT [column_list] -FROM ( - SELECT [column_list], - ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] - ORDER BY time_attr DESC) AS rownum - FROM table_name) -WHERE rownum = 1 +SELECT [column_list], +ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum +FROM table_name) +QUALIFY rownum = 1 Review Comment: use same as for dedup ## docs/content.zh/docs/dev/table/sql/queries/topn.md: ## @@ -32,13 +32,11 @@ Flink 使用 `OVER` 窗口子句和过滤条件的组合来表达一个 Top-N 下面展示了 Top-N 的语法: ```sql -SELECT [column_list] -FROM ( - SELECT [column_list], - ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] - ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum - FROM table_name) -WHERE rownum <= N [AND conditions] +SELECT [column_list], +ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] ORDER BY time_attr [asc|desc]) AS rownum +FROM table_name) Review Comment: ``` FROM table_name ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36764] Add checkpoint type and unaligned flag to the checkpoint trace [flink]
davidradl commented on code in PR #25671: URL: https://github.com/apache/flink/pull/25671#discussion_r1851894006 ## flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointStatsTracker.java: ## @@ -256,7 +256,13 @@ private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) { .setAttribute("checkpointId", checkpointStats.getCheckpointId()) .setAttribute("fullSize", checkpointStats.getStateSize()) .setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize()) -.setAttribute("checkpointStatus", checkpointStats.getStatus().name())); +.setAttribute("checkpointStatus", checkpointStats.getStatus().name()) +.setAttribute( +"isUnaligned", + Boolean.toString(checkpointStats.isUnalignedCheckpoint())) +.setAttribute( +"checkpointType", + checkpointStats.getProperties().getCheckpointType().getName())); Review Comment: in this case I assume that the name will always be "Checkpoint". I assume savepoints do not come through here. If save points can come through here then the span.builder first line should probable be "Snapshot" rather than "Checkpoint". I am thinking that the SnapshotType's SharingFilesStrategy - would be good to include here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36535) Optimize the scale down logic based on historical parallelism
[ https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900022#comment-17900022 ] Rui Fan commented on FLINK-36535: - Thanks [~heigebupahei] for the comment! {quote} I think we only need to change the strategy to max of window and abandon the previous last value of window. {quote} I think about it again, your suggestion is reasonable, I have updated the Jira description. Also I have finished the POC in our production environment. It works as expected: * The rescale frequency is extremely low: Previously, the job was rescaled 10 times a day, but now it is rescaled less than once a day on average. * Jobs run at the same parallelism as during the peak hours of each day. * Scale down only happens when the parallelism during peak hours still wastes resources. > Optimize the scale down logic based on historical parallelism > - > > Key: FLINK-36535 > URL: https://issues.apache.org/jira/browse/FLINK-36535 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > This is a follow-up to FLINK-36018 . FLINK-36018 supported the lazy scale > down to avoid frequent rescaling. > h1. Proposed Change > Treat scale-down.interval as a window: > * Recording the scale down trigger time when the recommended parallelism < > current parallelism > ** When the recommended parallelism >= current parallelism, cancel the > triggered scale down > * The scale down will be executed when currentTime - triggerTime > > scale-down.interval > ** {color:#de350b}Change1{color}: Using the maximum parallelism within the > window instead of the latest parallelism when scaling down. > * {color:#de350b}Change2{color}: Never scale down when currentTime - > triggerTime < scale-down.interval > ** In the FLINK-36018, the scale down may be executed when currentTime - > triggerTime < scale-down.interval. > ** For example: the taskA may scale down when taskB needs to scale up. > h1. Background > Some critical Flink jobs need to scale up in time, but only scale down on a > daily basis. In other words, Flink users do not want Flink jobs to be scaled > down multiple times within 24 hours, and the jobs run at the same parallelism > as during the peak hours of each day. > Note: Users hope to scale down only happens when the parallelism during peak > hours is still a waste of resources. This is a trade-off between downtime and > resource waste for a critical job. > h1. Current solution > In general, this requirement could be met after setting{color:#de350b} > job.autoscaler.scale-down.interval= 24 hour{color}. When taskA runs with 100 > parallelism, and recommended parallelism is 100 during the peak hours of each > day. We hope taskA doesn't rescale forever, because the triggered scale down > will be canceled once the recommended parallelism >= current parallelism > within 24 hours (It‘s exactly what FLINK-36018 does). > h1. Unexpected Scenario & how to solve? > But I found the critical production job is still rescaled about 10 times > every day (when scale-down.interval is set to 24 hours). > Root cause: There may be many sources in a job, and the traffic peaks of > these sources may occur at different times. When taskA triggers scale down, > the scale down of taskA will not be actively executed within 24 hours, but it > may be executed when other tasks are scaled up. > For example: > * The scale down of sourceB and sourceC may be executed when SourceA scales > up. 
> * After a while, the scale down of sourceA and sourceC may be executed when > SourceB scales up. > * After a while, the scale down of sourceA and sourceB may be executed when > SourceC scales up. > * When there are many tasks, the above 3 steps will be executed repeatedly. > That's why the job is rescaled about 10 times every day, the > {color:#de350b}change2{color} of proposed change could solve this issue: > Never scale down when currentTime - triggerTime < scale-down.interval. > > {color:#de350b}Change1{color}: Using the maximum parallelism within the > window instead of the latest parallelism when scaling down. > * It can ensure that the parallelism after scaling down is the parallelism > at yesterday's peak. -- This message was sent by Atlassian Jira (v8.20.10#820010)
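To make the proposed window semantics concrete, here is a minimal sketch of the bookkeeping described above (Change1: scale down to the maximum recommendation observed inside the window; Change2: never scale down before the window elapses). The class and method names are illustrative assumptions, not the actual autoscaler code.

{code:java}
import java.time.Duration;
import java.time.Instant;

/** Illustrative sketch of the proposed scale-down window; not the actual autoscaler classes. */
class ScaleDownWindow {

    private final Duration scaleDownInterval; // e.g. 24h, job.autoscaler.scale-down.interval
    private Instant triggerTime;               // set when recommended < current parallelism
    private int maxRecommendedParallelism;     // Change1: max recommendation seen in the window

    ScaleDownWindow(Duration scaleDownInterval) {
        this.scaleDownInterval = scaleDownInterval;
    }

    /** Returns the parallelism to scale down to, or null if no scale down should happen yet. */
    Integer onEvaluation(Instant now, int currentParallelism, int recommendedParallelism) {
        if (recommendedParallelism >= currentParallelism) {
            // The vertex still needs its current parallelism: cancel any triggered scale down.
            triggerTime = null;
            return null;
        }
        if (triggerTime == null) {
            // First recommendation below the current parallelism: open the window.
            triggerTime = now;
            maxRecommendedParallelism = recommendedParallelism;
            return null;
        }
        maxRecommendedParallelism = Math.max(maxRecommendedParallelism, recommendedParallelism);
        if (Duration.between(triggerTime, now).compareTo(scaleDownInterval) < 0) {
            // Change2: never scale down before the full interval has elapsed.
            return null;
        }
        // Change1: scale down to the maximum recommendation seen in the window,
        // i.e. roughly the parallelism needed at the previous peak.
        triggerTime = null;
        return maxRecommendedParallelism;
    }
}
{code}

With a 24-hour interval, a vertex whose recommendation dips during off-peak hours is rescaled at most once the full window has passed, and then only down to the peak-hour recommendation observed within that window.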
[jira] [Updated] (FLINK-36739) Update NodeJS to v22 (LTS)
[ https://issues.apache.org/jira/browse/FLINK-36739?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mehdi updated FLINK-36739: -- Summary: Update NodeJS to v22 (LTS) (was: Update NodeJS to v18.20.5 (LTS)) > Update NodeJS to v22 (LTS) > -- > > Key: FLINK-36739 > URL: https://issues.apache.org/jira/browse/FLINK-36739 > Project: Flink > Issue Type: Sub-task > Components: Runtime / Web Frontend >Reporter: Mehdi >Assignee: Mehdi >Priority: Major > Labels: pull-request-available > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36765) How to Handle Multi-Type Maps in Avro Schema with Flink Table API?
Maneendra created FLINK-36765: - Summary: How to Handle Multi-Type Maps in Avro Schema with Flink Table API? Key: FLINK-36765 URL: https://issues.apache.org/jira/browse/FLINK-36765 Project: Flink Issue Type: Bug Components: API / Type Serialization System, Formats (JSON, Avro, Parquet, ORC, SequenceFile) Reporter: Maneendra I have a Map with multiple data types in my Avro schema, which I am trying to use in the Flink Table API to read data from Kafka. However, I’m encountering the following exception because the Flink AvroSchemaConverter does not support Maps with mixed data types. Could someone assist me in parsing this schema using the Table API? FLink Code: String avroSchema=""; DataType s = AvroSchemaConverter.convertToDataType(avroSchema); Schema schema1 = Schema.newBuilder().fromRowDataType(s).build(); TableDescriptor descriptor = TableDescriptor.forConnector("kafka") .schema(schema) .comment("simple comment") .option("topic", "") .option("properties.application.id", "") .option("properties.security.protocol", "") .option("properties.bootstrap.servers", "") .option("properties.group.id", "") .option("properties.auto.offset.reset", "earliest") .option("format", "avro") .build(); Avro Schema: { "name":"standByProperties", "type":[ "null", { "type":"map", "values":[ "null", "boolean", "int" ] } ] }, Output: standByProperties MAP Exception: Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported to derive Schema for type: RAW('java.lang.Object', ?) NOT NULL at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:580) at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416) at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:568) at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:549) at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416) What I Tried: I defined an Avro schema that includes a Map field with values of mixed data types. Used the Flink Table API to read data from Kafka and attempted to use AvroSchemaConverter to map the schema to a Flink table. During execution, I encountered an exception because the AvroSchemaConverter does not support Maps with multiple value types. What I Was Expecting: I was expecting Flink to handle the Map field and correctly parse the data into a table format, with proper support for the mixed data types within the Map. -- This message was sent by Atlassian Jira (v8.20.10#820010)
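For background on the schema itself: Avro allows a map's values to be declared as a union of several types, and plain Avro deserializes such values as java.lang.Object, which is why AvroSchemaConverter can only model them as a RAW type. Below is a small, hedged illustration of building the schema above with the standard Avro Java API; it explains the behavior rather than working around the converter limitation.

{code:java}
import java.util.Arrays;

import org.apache.avro.Schema;

public class UnionValuedMapSchema {

    public static void main(String[] args) {
        // Equivalent to: ["null", {"type": "map", "values": ["null", "boolean", "int"]}]
        Schema valueUnion =
                Schema.createUnion(
                        Arrays.asList(
                                Schema.create(Schema.Type.NULL),
                                Schema.create(Schema.Type.BOOLEAN),
                                Schema.create(Schema.Type.INT)));
        Schema standByProperties =
                Schema.createUnion(
                        Arrays.asList(
                                Schema.create(Schema.Type.NULL), Schema.createMap(valueUnion)));

        System.out.println(standByProperties.toString(true));
        // A GenericDatumReader deserializes such a field as Map<Utf8, Object>, where each value
        // is null, Boolean or Integer. There is no single SQL type covering that union, which is
        // why AvroSchemaConverter falls back to RAW('java.lang.Object', ?) and schema derivation fails.
    }
}
{code}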
[jira] (FLINK-36535) Optimize the scale down logic based on historical parallelism
[ https://issues.apache.org/jira/browse/FLINK-36535 ] Rui Fan deleted comment on FLINK-36535: - was (Author: fanrui): Thanks [~heigebupahei] for the comment! {quote}that we need to treat job.autoscaler.scale-down.interval as a window. The default is last value of window, but in some cases it is max in window(>24hour)? {quote} For solution2, yes. For solution1, the default is last value of window. And users could set an option to choose the max as the result. {quote}But it seems to me like we just need to change the default behavior to max in window? Just imagine, if you follow the default 1hour configuration, the result of using max in window is not much different from before, so I think we only need to change the strategy to max of window and abandon the previous last value of window. {quote} It's a good point. As I understand, you mean: * We choose the max as the result by default * Don't need any additional option, and autoscaler doesn't support choose the latest parallelism as the result for scale down Is my understanding correct? If yes, I think it makes sense for most of scenarios. However I'm afraid it could not meet all requirements. For example: * when the job owners are very familiar with the daily traffic changes of the job, they could set a appropriate job.autoscaler.scale-down.interval, and job will only scale down once a day when autoscaler choose the latest parallelism. * IIUC, autoscaler needs at least 2 scales down a day if autoscaler choose the max parallelism in window. ** Because the max is not the latest parallelism, and after scaling down, max is most likely not the latest appropriate parallelism for vertex. So job needs scale down again later. Please correct me if I misunderstand anything, thanks~ > Optimize the scale down logic based on historical parallelism > - > > Key: FLINK-36535 > URL: https://issues.apache.org/jira/browse/FLINK-36535 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > This is a follow-up to FLINK-36018 . FLINK-36018 supported the lazy scale > down to avoid frequent rescaling. > h1. Proposed Change > Treat scale-down.interval as a window: > * Recording the scale down trigger time when the recommended parallelism < > current parallelism > ** When the recommended parallelism >= current parallelism, cancel the > triggered scale down > * The scale down will be executed when currentTime - triggerTime > > scale-down.interval > ** {color:#de350b}Change1{color}: Using the maximum parallelism within the > window instead of the latest parallelism when scaling down. > * {color:#de350b}Change2{color}: Never scale down when currentTime - > triggerTime < scale-down.interval > ** In the FLINK-36018, the scale down may be executed when currentTime - > triggerTime < scale-down.interval. > ** For example: the taskA may scale down when taskB needs to scale up. > h1. Background > Some critical Flink jobs need to scale up in time, but only scale down on a > daily basis. In other words, Flink users do not want Flink jobs to be scaled > down multiple times within 24 hours, and the jobs run at the same parallelism > as during the peak hours of each day. > Note: Users hope to scale down only happens when the parallelism during peak > hours is still a waste of resources. This is a trade-off between downtime and > resource waste for a critical job. > h1. Current solution > In general, this requirement could be met after setting{color:#de350b} > job.autoscaler.scale-down.interval= 24 hour{color}. 
When taskA runs with 100 > parallelism, and recommended parallelism is 100 during the peak hours of each > day. We hope taskA doesn't rescale forever, because the triggered scale down > will be canceled once the recommended parallelism >= current parallelism > within 24 hours (It‘s exactly what FLINK-36018 does). > h1. Unexpected Scenario & how to solve? > But I found the critical production job is still rescaled about 10 times > every day (when scale-down.interval is set to 24 hours). > Root cause: There may be many sources in a job, and the traffic peaks of > these sources may occur at different times. When taskA triggers scale down, > the scale down of taskA will not be actively executed within 24 hours, but it > may be executed when other tasks are scaled up. > For example: > * The scale down of sourceB and sourceC may be executed when SourceA scales > up. > * After a while, the scale down of sourceA and sourceC may be executed when > SourceB scales up. > * After a while, the scale down of sourceA and sourceB may be executed when > SourceC scales up. > * When there are many tasks, the above 3 steps will be executed repeatedly. > That's why the job is re
[jira] [Updated] (FLINK-36769) Suport fury Serializer for pyflink
[ https://issues.apache.org/jira/browse/FLINK-36769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xingyuan cheng updated FLINK-36769: --- Description: Hi, community. Currently, in the batch verification scenario of our algorithm data, we use pyflink and encounter low transmission efficiency caused by low performance of pickle4-based encoding. After research, we decided to adopt Apache fury, a serialization framework based on pickle5 encoding. The implementation of fury in python will define the transmission buffer size in the protocol for transmission to improve the performance of large data transmission. To this end, I Prepared a draft pull request. What do friends in the community think about this? Pickle protocol 5 with out-of-band data: https://peps.python.org/pep-0574/ was:Hi, community. Currently, in the batch verification scenario of our algorithm data, we use pyflink and encounter low transmission efficiency caused by low performance of pickle4-based encoding. After research, we decided to adopt Apache fury, a serialization framework based on pickle5 encoding. The implementation of fury in python will define the transmission buffer size in the protocol for transmission to improve the performance of large data transmission. To this end, I Prepared a draft pull request. What do friends in the community think about this? > Suport fury Serializer for pyflink > -- > > Key: FLINK-36769 > URL: https://issues.apache.org/jira/browse/FLINK-36769 > Project: Flink > Issue Type: New Feature > Components: API / Python >Affects Versions: 1.16.1 > Environment: flink 1.16.1 >Reporter: xingyuan cheng >Priority: Major > > Hi, community. Currently, in the batch verification scenario of our algorithm > data, we use pyflink and encounter low transmission efficiency caused by low > performance of pickle4-based encoding. After research, we decided to adopt > Apache fury, a serialization framework based on pickle5 encoding. The > implementation of fury in python will define the transmission buffer size in > the protocol for transmission to improve the performance of large data > transmission. To this end, I Prepared a draft pull request. What do friends > in the community think about this? > > Pickle protocol 5 with out-of-band data: https://peps.python.org/pep-0574/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36690][runtime] Fix schema operator hanging under extreme parallelized pressure [flink-cdc]
yuxiqian commented on PR #3680: URL: https://github.com/apache/flink-cdc/pull/3680#issuecomment-2490188267 @leonardBang Will this PR be reviewed soon? I'm planning to implement FLINK-36763 based on this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36765) How to Handle Multi-Type Maps in Avro Schema with Flink Table API?
[ https://issues.apache.org/jira/browse/FLINK-36765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900049#comment-17900049 ] david radley commented on FLINK-36765: -- I was trying to find where the Avro specification says that multiple values are allowed for the map values, do you know where this is documented as part of the specification. > How to Handle Multi-Type Maps in Avro Schema with Flink Table API? > -- > > Key: FLINK-36765 > URL: https://issues.apache.org/jira/browse/FLINK-36765 > Project: Flink > Issue Type: Bug > Components: API / Type Serialization System, Formats (JSON, Avro, > Parquet, ORC, SequenceFile) >Reporter: Maneendra >Priority: Major > > > I have a Map with multiple data types in my Avro schema, which I am trying to > use in the Flink Table API to read data from Kafka. However, I’m encountering > the following exception because the Flink AvroSchemaConverter does not > support Maps with mixed data types. Could someone assist me in parsing this > schema using the Table API? > FLink Code: String avroSchema=""; > DataType s = AvroSchemaConverter.convertToDataType(avroSchema); > Schema schema1 = Schema.newBuilder().fromRowDataType(s).build(); > > TableDescriptor descriptor = TableDescriptor.forConnector("kafka") > .schema(schema) > .comment("simple comment") > .option("topic", "") > .option("properties.application.id", "") > .option("properties.security.protocol", "") > .option("properties.bootstrap.servers", "") > .option("properties.group.id", "") > .option("properties.auto.offset.reset", "earliest") > .option("format", "avro") > .build(); > Avro Schema: > { > "name":"standByProperties", > "type":[ > "null", > { > "type":"map", > "values":[ > "null", > "boolean", > "int" > ] > } > ] > }, > Output: standByProperties MAP NULL> Exception: Exception in thread "main" > java.lang.UnsupportedOperationException: Unsupported to derive Schema for > type: RAW('java.lang.Object', ?) NOT NULL at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:580) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:568) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:549) > at > org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:416) > What I Tried: I defined an Avro schema that includes a Map field with values > of mixed data types. Used the Flink Table API to read data from Kafka and > attempted to use AvroSchemaConverter to map the schema to a Flink table. > During execution, I encountered an exception because the AvroSchemaConverter > does not support Maps with multiple value types. What I Was Expecting: I was > expecting Flink to handle the Map field and correctly parse the data into a > table format, with proper support for the mixed data types within the Map. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36769) Suport fury Serializer for pyflink
xingyuan cheng created FLINK-36769: -- Summary: Suport fury Serializer for pyflink Key: FLINK-36769 URL: https://issues.apache.org/jira/browse/FLINK-36769 Project: Flink Issue Type: New Feature Components: API / Python Affects Versions: 1.16.1 Reporter: xingyuan cheng Hi, community. Currently, in the batch verification scenario of our algorithm data, we use pyflink and encounter low transmission efficiency caused by low performance of pickle4-based encoding. After research, we decided to adopt Apache fury, a serialization framework based on pickle5 encoding. The implementation of fury in python will define the transmission buffer size in the protocol for transmission to improve the performance of large data transmission. To this end, I Prepared a draft pull request. What do friends in the community think about this? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36769) Suport fury Serializer for pyflink
[ https://issues.apache.org/jira/browse/FLINK-36769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xingyuan cheng updated FLINK-36769: --- External issue URL: https://github.com/apache/flink/pull/25672 > Suport fury Serializer for pyflink > -- > > Key: FLINK-36769 > URL: https://issues.apache.org/jira/browse/FLINK-36769 > Project: Flink > Issue Type: New Feature > Components: API / Python >Affects Versions: 1.16.1 >Reporter: xingyuan cheng >Priority: Major > > Hi, community. Currently, in the batch verification scenario of our algorithm > data, we use pyflink and encounter low transmission efficiency caused by low > performance of pickle4-based encoding. After research, we decided to adopt > Apache fury, a serialization framework based on pickle5 encoding. The > implementation of fury in python will define the transmission buffer size in > the protocol for transmission to improve the performance of large data > transmission. To this end, I Prepared a draft pull request. What do friends > in the community think about this? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [ISSUE#36769] support fury serializer for pyflink [flink]
kaori-seasons opened a new pull request, #25672: URL: https://github.com/apache/flink/pull/25672 ## What is the purpose of the change Hi, community. Currently, in the batch verification scenario of our algorithm data, we use pyflink and encounter low transmission efficiency caused by low performance of pickle4-based encoding. After research, we decided to adopt Apache fury, a serialization framework based on pickle5 encoding. The implementation of fury in python will define the transmission buffer size in the protocol for transmission to improve the performance of large data transmission. Related communications with fury community members can be found [here](https://github.com/apache/fury/issues/1919) ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change Please make sure both new and modified tests in this PR follow [the conventions for tests defined in our code quality guide](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#7-testing). *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36769) Suport fury Serializer for pyflink
[ https://issues.apache.org/jira/browse/FLINK-36769?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] xingyuan cheng updated FLINK-36769: --- Docs Text: https://github.com/apache/flink/pull/25672 External issue URL: (was: https://github.com/apache/flink/pull/25672) Environment: flink 1.16.1 > Suport fury Serializer for pyflink > -- > > Key: FLINK-36769 > URL: https://issues.apache.org/jira/browse/FLINK-36769 > Project: Flink > Issue Type: New Feature > Components: API / Python >Affects Versions: 1.16.1 > Environment: flink 1.16.1 >Reporter: xingyuan cheng >Priority: Major > > Hi, community. Currently, in the batch verification scenario of our algorithm > data, we use pyflink and encounter low transmission efficiency caused by low > performance of pickle4-based encoding. After research, we decided to adopt > Apache fury, a serialization framework based on pickle5 encoding. The > implementation of fury in python will define the transmission buffer size in > the protocol for transmission to improve the performance of large data > transmission. To this end, I Prepared a draft pull request. What do friends > in the community think about this? -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36686) allow customizing env variables for flink-webhook container
[ https://issues.apache.org/jira/browse/FLINK-36686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-36686. -- Fix Version/s: kubernetes-operator-1.11.0 Resolution: Fixed merged to main 14ded7e8b9f8aa12f54adeec9d777f8e2253d814 > allow customizing env variables for flink-webhook container > --- > > Key: FLINK-36686 > URL: https://issues.apache.org/jira/browse/FLINK-36686 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.9.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.11.0 > > > In current helm chart of the operator, there is no way to pass in custom env > variables into the flink-webhook container. I can see a few options to fix > the problem: > * make Values.operatorPod.env pod-level envs in flink-operator.yaml > * create a new webhoodPod.env in values.yaml and apply it only to the > flink-webhook container env setup > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36686) allow customizing env variables for flink-webhook container
[ https://issues.apache.org/jira/browse/FLINK-36686?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora updated FLINK-36686: --- Priority: Minor (was: Blocker) > allow customizing env variables for flink-webhook container > --- > > Key: FLINK-36686 > URL: https://issues.apache.org/jira/browse/FLINK-36686 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.9.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Minor > Labels: pull-request-available > > In current helm chart of the operator, there is no way to pass in custom env > variables into the flink-webhook container. I can see a few options to fix > the problem: > * make Values.operatorPod.env pod-level envs in flink-operator.yaml > * create a new webhoodPod.env in values.yaml and apply it only to the > flink-webhook container env setup > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Closed] (FLINK-36685) Enable update/create operation on flinkdeployment resource in mutation webhook
[ https://issues.apache.org/jira/browse/FLINK-36685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-36685. -- Fix Version/s: kubernetes-operator-1.11.0 Resolution: Fixed merged to main 5d29554a179632028ccd182d48da5f825b52b976 > Enable update/create operation on flinkdeployment resource in mutation webhook > -- > > Key: FLINK-36685 > URL: https://issues.apache.org/jira/browse/FLINK-36685 > Project: Flink > Issue Type: Improvement > Components: Kubernetes Operator >Affects Versions: kubernetes-operator-1.9.0 >Reporter: Shuyi Chen >Assignee: Shuyi Chen >Priority: Minor > Labels: pull-request-available > Fix For: kubernetes-operator-1.11.0 > > > In mutation webhook yaml of the helm chart, UPDATE/CREATE operation is not > allowed on > flinkdeployments. We use mutation webhook to inject platform secrets to the > flink pipeline CRD. Planned to add a PR to enable UPDATE/CREATE operation on > flinkdeployments resource. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [FLINK-36469] Bump commons-io from 2.11.0 to 2.17.0 [flink-kubernetes-operator]
r-sidd opened a new pull request, #917: URL: https://github.com/apache/flink-kubernetes-operator/pull/917 ## What is the purpose of the change Bump commons-io from 2.11.0 to 2.17.0 ## Brief change log Bump cyclonedx-maven-plugin from 2.7.9 to 2.9.0 to remediate the findings in the dependant packages. **Vulnerabilities from dependencies:** [CVE-2024-38374](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2024-38374) **Package details:** https://mvnrepository.com/artifact/org.cyclonedx/cyclonedx-maven-plugin/2.9.0 ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): yes - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36770) Support Request Timeout for AWS sinks
[ https://issues.apache.org/jira/browse/FLINK-36770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900114#comment-17900114 ] Ahmed Hamdy commented on FLINK-36770: - [~hlteoh37] I would love your feedback here, also could you assign this to me? > Support Request Timeout for AWS sinks > - > > Key: FLINK-36770 > URL: https://issues.apache.org/jira/browse/FLINK-36770 > Project: Flink > Issue Type: Improvement > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > Fix For: aws-connector-5.1.0 > > > ## Description > in > [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] > we introduced request timeout for Async Sink which was released in 1.20, > Ideally we want to support that in AWS connectors. > ## Acceptance Criteria > - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated > ## Notes > This is a breaking change which means we expect the next version (5.1) to not > be compatible with 1.19 anymore. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36770) Support Request Timeout for AWS sinks
Ahmed Hamdy created FLINK-36770: --- Summary: Support Request Timeout for AWS sinks Key: FLINK-36770 URL: https://issues.apache.org/jira/browse/FLINK-36770 Project: Flink Issue Type: Improvement Components: Connectors / AWS Reporter: Ahmed Hamdy Fix For: aws-connector-5.1.0 ## Description in [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] we introduced request timeout for Async Sink which was released in 1.20, Ideally we want to support that in AWS connectors. ## Acceptance Criteria - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated ## Notes This is a breaking change which means we expect the next version (5.1) to not be compatible with 1.19 anymore. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36770) Support Request Timeout for AWS sinks
[ https://issues.apache.org/jira/browse/FLINK-36770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy updated FLINK-36770: Description: h2. Description in [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] we introduced request timeout for Async Sink which was released in 1.20, Ideally we want to support that in AWS connectors. h2. Acceptance Criteria - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated h2. Notes This is a breaking change regarding flink version compatibility which means we expect the next version (5.1) to not be compatible with 1.19 anymore. was: h2. Description in [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] we introduced request timeout for Async Sink which was released in 1.20, Ideally we want to support that in AWS connectors. h2. Acceptance Criteria - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated h2. Notes This is a breaking change which means we expect the next version (5.1) to not be compatible with 1.19 anymore. > Support Request Timeout for AWS sinks > - > > Key: FLINK-36770 > URL: https://issues.apache.org/jira/browse/FLINK-36770 > Project: Flink > Issue Type: Improvement > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > Fix For: aws-connector-5.1.0 > > > h2. Description > in > [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] > we introduced request timeout for Async Sink which was released in 1.20, > Ideally we want to support that in AWS connectors. > h2. Acceptance Criteria > - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated > h2. Notes > This is a breaking change regarding flink version compatibility which means > we expect the next version (5.1) to not be compatible with 1.19 anymore. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36770) Support Request Timeout for AWS sinks
[ https://issues.apache.org/jira/browse/FLINK-36770?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ahmed Hamdy updated FLINK-36770: Description: h2. Description in [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] we introduced request timeout for Async Sink which was released in 1.20, Ideally we want to support that in AWS connectors. h2. Acceptance Criteria - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated h2. Notes This is a breaking change which means we expect the next version (5.1) to not be compatible with 1.19 anymore. was: ## Description in [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] we introduced request timeout for Async Sink which was released in 1.20, Ideally we want to support that in AWS connectors. ## Acceptance Criteria - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated ## Notes This is a breaking change which means we expect the next version (5.1) to not be compatible with 1.19 anymore. > Support Request Timeout for AWS sinks > - > > Key: FLINK-36770 > URL: https://issues.apache.org/jira/browse/FLINK-36770 > Project: Flink > Issue Type: Improvement > Components: Connectors / AWS >Reporter: Ahmed Hamdy >Priority: Major > Fix For: aws-connector-5.1.0 > > > h2. Description > in > [FLIP-451|https://cwiki.apache.org/confluence/display/FLINK/FLIP-451%3A+Introduce+timeout+configuration+to+AsyncSink+API] > we introduced request timeout for Async Sink which was released in 1.20. > Ideally we want to support that in the AWS connectors. > h2. Acceptance Criteria > - Kinesis Sink, Firehose Sink, DDB Sink, SQS Sink updated > h2. Notes > This is a breaking change which means we expect the next version (5.1) to not > be compatible with 1.19 anymore. -- This message was sent by Atlassian Jira (v8.20.10#820010)
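As a rough, hedged illustration of the request-timeout semantics FLIP-451 describes (this is not the actual AsyncSink or AWS connector API; the class, fields, and retry behaviour below are assumptions): a per-request deadline either retries or fails the in-flight batch once it expires.

{code:java}
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/** Conceptual sketch only: how a request timeout around an async batch write could behave. */
class TimedBatchWriter<T> {

    private final Duration requestTimeout; // analogous to the FLIP-451 request timeout
    private final boolean failOnTimeout;   // assumed knob: fail the sink or retry the batch

    TimedBatchWriter(Duration requestTimeout, boolean failOnTimeout) {
        this.requestTimeout = requestTimeout;
        this.failOnTimeout = failOnTimeout;
    }

    CompletableFuture<Void> write(List<T> batch, Function<List<T>, CompletableFuture<Void>> client) {
        return client.apply(batch)
                // Complete exceptionally with TimeoutException if the service does not answer in time.
                .orTimeout(requestTimeout.toMillis(), TimeUnit.MILLISECONDS)
                .exceptionallyCompose(
                        err -> {
                            Throwable cause =
                                    err instanceof CompletionException && err.getCause() != null
                                            ? err.getCause()
                                            : err;
                            if (cause instanceof TimeoutException && !failOnTimeout) {
                                // Retry the whole batch instead of failing the sink.
                                return write(batch, client);
                            }
                            return CompletableFuture.<Void>failedFuture(err);
                        });
    }
}
{code}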
[PR] [FLINK-36529] Allow flink version configs to be set to greater than given version [flink-kubernetes-operator]
tomncooper opened a new pull request, #918: URL: https://github.com/apache/flink-kubernetes-operator/pull/918 ## What is the purpose of the change The operator currently allows the following syntax for defining flink version specific defaults: `kubernetes.operator.default-configuration.flink-version.v1_18.key: value` The problem with this is that, in many cases, these defaults should be applied to newer Flink versions as well, forcing config duplications. This PR introduces a new "greater than" syntax for config defaults, indicating that they should be applied to a given version and above: `kubernetes.operator.default-configuration.flink-version.v1_18+.key: value` In this case key:value would be applied to all Flink version greater or equal to 1.18, unless overridden for specific versions. ## Brief change log - Adds a new method `getRelevantVersionPrefixes`, to the `org.apache.flink.kubernetes.operator.config.FlinkConfigManager`, which identifies all Flink version default config prefix which are relevant to the currently specified Flink version. - Refactored the `FlinkVersion` enum to specify major and minor semver integers to facilitate quick look up of relevant Flink versions when parsing version strings. ## Verifying this change This change added tests and can be verified as follows: - Added additional tests in the `FlinkConfigManagerTest` class to cover the new Regex and `getRelevantVersionPrefixes` methods. - Updated the `testVersionNamespaceDefaultConfs` test in `FlinkConfigManagerTest` to test the greater than version behaviour. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changes to the `CustomResourceDescriptors`: no - Core observer or reconciler logic that is regularly executed: yes ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? Update the configuration docs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
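A minimal sketch of the proposed "greater than or equal" matching follows (the real logic lives in FlinkConfigManager's getRelevantVersionPrefixes; the regex and helper here are illustrative assumptions, not the PR's code).

{code:java}
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative sketch of the "v1_18+" matching; the real logic lives in FlinkConfigManager. */
class VersionPrefixMatcher {

    // Matches version tokens such as "v1_18" or "v1_18+" inside keys like
    // kubernetes.operator.default-configuration.flink-version.v1_18+.key
    private static final Pattern VERSION_TOKEN = Pattern.compile("v(\\d+)_(\\d+)(\\+)?");

    /** Should a default keyed with this version token apply to the deployed Flink version? */
    static boolean applies(String token, int deployedMajor, int deployedMinor) {
        Matcher m = VERSION_TOKEN.matcher(token);
        if (!m.matches()) {
            return false;
        }
        int major = Integer.parseInt(m.group(1));
        int minor = Integer.parseInt(m.group(2));
        boolean greaterOrEqual = m.group(3) != null;
        if (greaterOrEqual) {
            return deployedMajor > major || (deployedMajor == major && deployedMinor >= minor);
        }
        return deployedMajor == major && deployedMinor == minor;
    }
}
{code}

Under these assumptions, applies("v1_18+", 1, 20) is true while applies("v1_18", 1, 20) is false, and exact-version keys can still override the "+" defaults, matching the semantics described in the PR.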
Re: [PR] [FLINK-36529] Allow flink version configs to be set to greater than given version [flink-kubernetes-operator]
tomncooper commented on PR #918: URL: https://github.com/apache/flink-kubernetes-operator/pull/918#issuecomment-2491802004 I probably need to look at adding an end to end test for this but want to make sure I am on the right track before doing that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-36529] Allow flink version configs to be set to greater than given version [flink-kubernetes-operator]
tomncooper commented on PR #918: URL: https://github.com/apache/flink-kubernetes-operator/pull/918#issuecomment-2491803380 CC @gyfora -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (FLINK-36529) Support greater or equals logic for operator flink version default configs
[ https://issues.apache.org/jira/browse/FLINK-36529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-36529: --- Labels: pull-request-available (was: ) > Support greater or equals logic for operator flink version default configs > -- > > Key: FLINK-36529 > URL: https://issues.apache.org/jira/browse/FLINK-36529 > Project: Flink > Issue Type: New Feature > Components: Kubernetes Operator >Reporter: Gyula Fora >Priority: Major > Labels: pull-request-available > > The operator currently allows the following syntax for defining flink version > specific defaults: > kubernetes.operator.default-configuration.flink-version.v1_18.key: value > The problem with this is that usually these defaults should be applied to > newer Flink versions as well in many cases, forcing config duplications. > We should introduce a + syntax for configs applied to a version and above: > kubernetes.operator.default-configuration.flink-version.v1_18+.key: value > in this case key:value would be applied to all Flink version greater or equal > to 1.18 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] [hotfix] [docs]Fix miss semicolon on SELECT & WHERE clause example sql [flink]
camilesing opened a new pull request, #25673: URL: https://github.com/apache/flink/pull/25673 ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* Fix missing semicolons in the SELECT & WHERE clause example SQL. ## Brief change log Add the missing semicolons to the example SQL statements. ## Verifying this change Since the changes only affect the doc examples, there are no tests; I copied the snippets and verified (compiled) them locally. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [hotfix] [docs]Fix miss semicolon on SELECT & WHERE clause example sql [flink]
flinkbot commented on PR #25673: URL: https://github.com/apache/flink/pull/25673#issuecomment-2492789712 ## CI report: * 4740ab5775e6045fbc62736e2a60f75ea4463812 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (FLINK-35859) [flink-cdc] Fix: The assigner is not ready to offer finished split information, this should not be called
[ https://issues.apache.org/jira/browse/FLINK-35859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900028#comment-17900028 ] Xin Gong edited comment on FLINK-35859 at 11/22/24 3:19 AM: [~loserwang1024] This fix just ignore some newly added table. It can cause some UT timeout fail. For example, mongo NewlyAddedTableITCase#testRemoveAndAddCollectionsOneByOne, pg NewlyAddedTableITCase#testRemoveAndAddTablesOneByOne. Users cannot immediately perceive task issues in production applications. Maybe we can fix it to more perfect. I add a flag to trigger restart when status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED. newly table will be synchronized. {code:java} // code placeholder /** Assigner for snapshot split. */ public class SnapshotSplitAssigner implements SplitAssigner { private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class); private boolean flagExceptionAssignerStatusWhenCheckpoint; private void captureNewlyAddedTables() { if (sourceConfig.isScanNewlyAddedTableEnabled() && !AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) { .. } else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) { flagExceptionAssignerStatusWhenCheckpoint = true; LOG.info("exceptionAssignerStatusCheckpointFlag to true"); } } @Override public void notifyCheckpointComplete(long checkpointId) { if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus) && flagExceptionAssignerStatusWhenCheckpoint) { throw new FlinkRuntimeException("Previous assigner status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED and " + "newly add table will cause task always be exception from checkpoint, so we " + "trigger restart for newly table after assigner to normal status"); } } }{code} was (Author: JIRAUSER292212): [~loserwang1024] Users cannot immediately perceive task issues. Maybe we can fix it to more perfect. I add a flag to trigger restart when status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED. newly table will be synchronized. {code:java} // code placeholder /** Assigner for snapshot split. */ public class SnapshotSplitAssigner implements SplitAssigner { private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class); private boolean flagExceptionAssignerStatusWhenCheckpoint; private void captureNewlyAddedTables() { if (sourceConfig.isScanNewlyAddedTableEnabled() && !AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) { .. 
} else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) { flagExceptionAssignerStatusWhenCheckpoint = true; LOG.info("exceptionAssignerStatusCheckpointFlag to true"); } } @Override public void notifyCheckpointComplete(long checkpointId) { if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus) && flagExceptionAssignerStatusWhenCheckpoint) { throw new FlinkRuntimeException("Previous assigner status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED and " + "newly add table will cause task always be exception from checkpoint, so we " + "trigger restart for newly table after assigner to normal status"); } } } {code} > [flink-cdc] Fix: The assigner is not ready to offer finished split > information, this should not be called > - > > Key: FLINK-35859 > URL: https://issues.apache.org/jira/browse/FLINK-35859 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.1 >Reporter: Hongshun Wang >Assignee: Hongshun Wang >Priority: Minor > Fix For: cdc-3.2.0 > > > When use CDC with newly added table, an error occurs: > {code:java} > The assigner is not ready to offer finished split information, this should > not be called. {code} > It's because: > 1. when stop then restart the job , the status is > NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED. > > 2. Then Enumerator will send each reader with > BinlogSplitUpdateRequestEvent to update binlog. (see > org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator#syncWithReaders). > 3. The Reader will suspend binlog reader then send > BinlogSplitMetaRequestEvent to Enumerator. > 4. The Enumerator found that some tables are not sent, an error will occur > {code:java} > private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent > requestEvent) { > // initialize once >
[jira] [Commented] (FLINK-36768) Empty Checkpoint Directory created when older checkpoint fails due to timeout and the checkpoint interval is same
[ https://issues.apache.org/jira/browse/FLINK-36768?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900228#comment-17900228 ] Rui Xia commented on FLINK-36768: - Hi Thomas, thank you for reporting this. From your description, I am not sure which chk- directory is not properly deleted. Could you provide a more concrete example of your situation? For example: chk-1 triggers -> chk-1 times out -> at the same time chk-2 triggers, and the chk-xxx checkpoint directory is not cleaned up. BTW, 1.12.1 is an old Flink version (2 years ago). You may want to try the latest version and check whether the same problem still exists. > Empty Checkpoint Directory created when older checkpoint fails due to timeout > and the checkpoint interval is same > -- > > Key: FLINK-36768 > URL: https://issues.apache.org/jira/browse/FLINK-36768 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.12.1 >Reporter: Eaugene Thomas >Priority: Minor > > When the checkpoint interval equals the checkpoint timeout, and the failure > threshold is reached, the asynchronous checkpoint trigger starts. However, > due to an internal restart, the tombstone directory ({{{}chk-{}}}) is not > cleared. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (FLINK-36773) Introduce new Group Agg Operator with Async State API
[ https://issues.apache.org/jira/browse/FLINK-36773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-36773: --- Assignee: Yang Xu > Introduce new Group Agg Operator with Async State API > - > > Key: FLINK-36773 > URL: https://issues.apache.org/jira/browse/FLINK-36773 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: xuyang >Assignee: Yang Xu >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36772][mysql][cdc-base] Fix error placeholder for errorMessageTemplate of Preconditions [flink-cdc]
gong commented on PR #3754: URL: https://github.com/apache/flink-cdc/pull/3754#issuecomment-2492953554 > LGTM, could you also check all Preconditions.checkState method call? @leonardBang I checked all Preconditions.checkState method calls. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (FLINK-36773) Introduce new Group Agg Operator with Async State API
[ https://issues.apache.org/jira/browse/FLINK-36773?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lincoln lee reassigned FLINK-36773: --- Assignee: xuyang (was: Yang Xu) > Introduce new Group Agg Operator with Async State API > - > > Key: FLINK-36773 > URL: https://issues.apache.org/jira/browse/FLINK-36773 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Runtime >Reporter: xuyang >Assignee: xuyang >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36774) SQL gateway should support executing multiple SQL statements once
tim yu created FLINK-36774: -- Summary: SQL gateway should support executing multiple SQL statements once Key: FLINK-36774 URL: https://issues.apache.org/jira/browse/FLINK-36774 Project: Flink Issue Type: New Feature Components: Table SQL / Gateway Reporter: tim yu I have a Flink SQL file example.sql that contains the following SQL statements: {code:java} set 'state.checkpoints.dir' = 'hdfs:///tmp/flink-checkpoints/datagen_blackhole'; set 'execution.checkpointing.interval' = '3min'; set 'execution.checkpointing.timeout' = '20min'; set 'parallelism.default' = '1'; CREATE TABLE t1 (id BIGINT, info STRING) with ('connector' = 'datagen', 'rows-per-second' = '1', 'fields.id.min' = '1', 'fields.id.max' = '10'); CREATE TABLE black_hole_1 WITH ('connector' = 'blackhole') LIKE t1 (EXCLUDING ALL); insert into black_hole_1 select * from t1;{code} I hope to submit this file to the SQL gateway using the following command: {code:java} curl -d '@example.sql' -H 'Content-Type: application/json' http://gateway:8083/v1/sessions/{session_id}/statements {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
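Until such script submission is supported natively, one hedged client-side workaround is to split the file on statement-terminating semicolons and submit each statement to the existing per-statement endpoint quoted above. The JSON field name ("statement") and the naive splitting are assumptions; check the SQL Gateway REST API docs before relying on them.

{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hedged sketch: split example.sql on semicolons and submit each statement separately. */
public class SubmitSqlScript {

    public static void main(String[] args) throws Exception {
        String sessionHandle = args[0]; // an already opened SQL Gateway session
        String statementsUrl = "http://gateway:8083/v1/sessions/" + sessionHandle + "/statements";
        String script = Files.readString(Path.of("example.sql"));

        HttpClient client = HttpClient.newHttpClient();
        // Naive split: assumes no semicolons inside string literals or comments.
        for (String statement : script.split(";")) {
            if (statement.isBlank()) {
                continue;
            }
            String sql =
                    statement.trim().replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", " ");
            String body = "{\"statement\": \"" + sql + "\"}"; // field name is an assumption
            HttpRequest request =
                    HttpRequest.newBuilder(URI.create(statementsUrl))
                            .header("Content-Type", "application/json")
                            .POST(HttpRequest.BodyPublishers.ofString(body))
                            .build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + ": " + response.body());
        }
    }
}
{code}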
[PR] Add blogpost for new KinesisStreamsSource and DynamoDbStreamsSource [flink-web]
hlteoh37 opened a new pull request, #765: URL: https://github.com/apache/flink-web/pull/765 Add a blogpost illustrating in detail the KDS and DDB streams sources. Image of blogpost included:  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-36535) Optimize the scale down logic based on historical parallelism
[ https://issues.apache.org/jira/browse/FLINK-36535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900226#comment-17900226 ] yuanfenghu commented on FLINK-36535: nice improvement! > Optimize the scale down logic based on historical parallelism > - > > Key: FLINK-36535 > URL: https://issues.apache.org/jira/browse/FLINK-36535 > Project: Flink > Issue Type: Improvement > Components: Autoscaler >Reporter: Rui Fan >Assignee: Rui Fan >Priority: Major > > This is a follow-up to FLINK-36018 . FLINK-36018 supported the lazy scale > down to avoid frequent rescaling. > h1. Proposed Change > Treat scale-down.interval as a window: > * Recording the scale down trigger time when the recommended parallelism < > current parallelism > ** When the recommended parallelism >= current parallelism, cancel the > triggered scale down > * The scale down will be executed when currentTime - triggerTime > > scale-down.interval > ** {color:#de350b}Change1{color}: Using the maximum parallelism within the > window instead of the latest parallelism when scaling down. > * {color:#de350b}Change2{color}: Never scale down when currentTime - > triggerTime < scale-down.interval > * > ** In the FLINK-36018, the scale down may be executed when currentTime - > triggerTime < scale-down.interval. > ** For example: the taskA may scale down when taskB needs to scale up. > h1. Background > Some critical Flink jobs need to scale up in time, but only scale down on a > daily basis. In other words, Flink users do not want Flink jobs to be scaled > down multiple times within 24 hours, and jobs run at the same parallelism as > during the peak hours of each day. > Note: Users hope to scale down only happens when the parallelism during peak > hours still wastes resources. This is a trade-off between downtime and > resource waste for a critical job. > h1. Current solution > In general, this requirement could be met after setting{color:#de350b} > job.autoscaler.scale-down.interval= 24 hour{color}. When taskA runs with 100 > parallelism, and recommended parallelism is 100 during the peak hours of each > day. We hope taskA doesn't rescale forever, because the triggered scale down > will be canceled once the recommended parallelism >= current parallelism > within 24 hours (It‘s exactly what FLINK-36018 does). > h1. Unexpected Scenario & how to solve? > But I found the critical production job is still rescaled about 10 times > every day (when scale-down.interval is set to 24 hours). > Root cause: There may be many sources in a job, and the traffic peaks of > these sources may occur at different times. When taskA triggers scale down, > the scale down of taskA will not be actively executed within 24 hours, but it > may be executed when other tasks are scaled up. > For example: > * The scale down of sourceB and sourceC may be executed when SourceA scales > up. > * After a while, the scale down of sourceA and sourceC may be executed when > SourceB scales up. > * After a while, the scale down of sourceA and sourceB may be executed when > SourceC scales up. > * When there are many tasks, the above 3 steps will be executed repeatedly. > That's why the job is rescaled about 10 times every day, the > {color:#de350b}change2{color} of proposed change could solve this issue: > Never scale down when currentTime - triggerTime < scale-down.interval. > > {color:#de350b}Change1{color}: Using the maximum parallelism within the > window instead of the latest parallelism when scaling down. 
> * It can ensure that the parallelism after scaling down is the parallelism > at yesterday's peak. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] [FLINK-36315][cdc-base]The flink-cdc-base module supports source metric statistics [flink-cdc]
liuxiao2shf commented on code in PR #3619: URL: https://github.com/apache/flink-cdc/pull/3619#discussion_r1853230011 ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java: ## @@ -397,6 +491,27 @@ && allSnapshotSplitsFinished()) { } LOG.info("Snapshot split assigner is turn into finished status."); } + +if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) { +Iterator> iterator = +splitFinishedCheckpointIds.entrySet().iterator(); +while (iterator.hasNext()) { +Map.Entry splitFinishedCheckpointId = iterator.next(); +String splitId = splitFinishedCheckpointId.getKey(); +Long splitCheckpointId = splitFinishedCheckpointId.getValue(); +if (splitCheckpointId != UNDEFINED_CHECKPOINT_ID +&& checkpointId >= splitCheckpointId) { +// record table-level splits metrics +TableId tableId = SnapshotSplit.parseTableId(splitId); + enumeratorMetrics.getTableMetrics(tableId).addFinishedSplit(splitId); +iterator.remove(); +} +} +LOG.info( Review Comment: Adjustment completed ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java: ## @@ -359,11 +432,31 @@ public void addSplits(Collection splits) { // because they are failed assignedSplits.remove(split.splitId()); splitFinishedOffsets.remove(split.splitId()); + +enumeratorMetrics +.getTableMetrics(split.asSnapshotSplit().getTableId()) +.reprocessSplit(split.splitId()); +TableId tableId = split.asSnapshotSplit().getTableId(); + + enumeratorMetrics.getTableMetrics(tableId).removeFinishedSplit(split.splitId()); } } @Override public SnapshotPendingSplitsState snapshotState(long checkpointId) { +if (splitFinishedCheckpointIds != null && !splitFinishedCheckpointIds.isEmpty()) { +for (Map.Entry splitFinishedCheckpointId : +splitFinishedCheckpointIds.entrySet()) { +if (splitFinishedCheckpointId.getValue() == UNDEFINED_CHECKPOINT_ID) { +splitFinishedCheckpointId.setValue(checkpointId); +} +} +} +LOG.info( +"SnapshotSplitAssigner snapshotState on checkpoint {} with splitFinishedCheckpointIds size {}.", +checkpointId, +splitFinishedCheckpointIds == null ? 0 : splitFinishedCheckpointIds.size()); Review Comment: Adjustment completed ## flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/test/java/org/apache/flink/cdc/connectors/base/source/assigner/state/PendingSplitsStateSerializerTest.java: ## @@ -55,14 +55,14 @@ public void testPendingSplitsStateSerializerAndDeserialize() throws IOException new PendingSplitsStateSerializer(constructSourceSplitSerializer()); PendingSplitsState streamSplitsStateAfter = pendingSplitsStateSerializer.deserializePendingSplitsState( -6, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore)); +7, pendingSplitsStateSerializer.serialize(streamPendingSplitsStateBefore)); Assert.assertEquals(streamPendingSplitsStateBefore, streamSplitsStateAfter); SnapshotPendingSplitsState snapshotPendingSplitsStateBefore = constructSnapshotPendingSplitsState(AssignerStatus.NEWLY_ADDED_ASSIGNING); PendingSplitsState snapshotPendingSplitsStateAfter = pendingSplitsStateSerializer.deserializePendingSplitsState( -6, Review Comment: Adjustment completed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org