[ 
https://issues.apache.org/jira/browse/FLINK-35076?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17836238#comment-17836238
 ] 

elon_X edited comment on FLINK-35076 at 4/11/24 3:23 PM:
---------------------------------------------------------

[~kkrugler] 

Thank you for your reply.

Setting an idle timeout does not give precise control over the timing. 
For example, with a timeout of 10 seconds, the minimum watermark still does 
not change during those 10 seconds, so the timeout would have to be set as 
small as possible. I'm not sure whether this would solve the problem; further 
testing is needed.

Regarding the suggestion to shuffle the stream, I didn't quite understand it. 
In the Flink API:
DataStream xx = env.fromSource(Source<OUT, ?, ?> source, WatermarkStrategy<OUT> 
timestampsAndWatermarks, String sourceName)
Only a DataStream supports rebalance; a Source cannot be rebalanced. I'm not 
sure how to shuffle the data source before {{fromSource}}.
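
The idle-timeout concern above can be illustrated with a small model. This is a sketch in plain Java, not Flink's implementation: the class {{IdleAwareMinimum}} and the method {{minOverActive}} are hypothetical names, and the rule "a subtask is excluded from the minimum once it has seen no record for the idle timeout" is an assumption about how a timeout such as the one set through {{WatermarkStrategy#withIdleness}} interacts with the aggregated minimum.

```java
import java.util.OptionalLong;

public class IdleAwareMinimum {

    /**
     * Minimum watermark over the subtasks that received a record less than
     * idleTimeoutMs ago. Returns empty when every subtask counts as idle.
     * Hypothetical helper for illustration; not a Flink API.
     */
    public static OptionalLong minOverActive(
            long[] watermarks, long[] lastRecordAtMs, long nowMs, long idleTimeoutMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            boolean idle = nowMs - lastRecordAtMs[i] >= idleTimeoutMs;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        long idleTimeoutMs = 10_000L;
        // Subtask 0 saw its last record at t=0 and is stuck at watermark 1_000.
        // Subtask 1 keeps receiving records and keeps advancing.
        long[] watermarks = {1_000L, 0L};
        long[] lastRecordAtMs = {0L, 0L};
        for (long nowMs = 2_000L; nowMs <= 12_000L; nowMs += 2_000L) {
            watermarks[1] = 1_000L + nowMs;
            lastRecordAtMs[1] = nowMs;
            System.out.println("t=" + nowMs + "ms min="
                    + minOverActive(watermarks, lastRecordAtMs, nowMs, idleTimeoutMs));
        }
    }
}
```

In this model the minimum stays at 1,000 until the stalled subtask has been silent for the full 10 seconds, and only then jumps to the active subtask's watermark. A shorter timeout shortens the stall but does not remove it.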



> Watermark alignment will cause data flow to experience serious shake
> --------------------------------------------------------------------
>
>                 Key: FLINK-35076
>                 URL: https://issues.apache.org/jira/browse/FLINK-35076
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>    Affects Versions: 1.16.1
>            Reporter: elon_X
>            Priority: Major
>         Attachments: image-2024-04-10-20-15-05-731.png, 
> image-2024-04-10-20-23-13-872.png, image-2024-04-10-20-25-59-387.png, 
> image-2024-04-10-20-29-13-835.png
>
>
> In our company, we have a scenario that requires multi-stream join 
> operations. While making modifications based on Flink watermark alignment, 
> I found that the final join output experiences severe jitter ("shake").
> I analyzed the cause: an upstream topic has more than 300 partitions. 
> The number of partitions requested for this topic is too large, so some 
> partitions frequently see intermittent periods with no writes (QPS=0). This 
> is most pronounced between 2 am and 5 am. However, writes to the topic as a 
> whole are very smooth.
> !image-2024-04-10-20-29-13-835.png!
> The final join output jitters severely, as shown in the following 
> diagram:
> !image-2024-04-10-20-15-05-731.png!
> Root cause:
>  # {{SourceOperator#emitLatestWatermark}} reports the 
> lastEmittedWatermark to the SourceCoordinator.
>  # If a partition receives no writes during a certain period, the 
> lastEmittedWatermark sent by the subtask corresponding to that partition 
> remains unchanged.
>  # The SourceCoordinator aggregates the watermarks of all subtasks in each 
> watermark group and takes the smallest one. This means that the 
> maxAllowedWatermark may remain unchanged for some time, even though the 
> overall upstream data flow is moving forward. Only when that minimum value 
> is updated does everything advance, which manifests as severe jitter in the 
> output data stream.
> I think choosing the global minimum might not be a good option. Using the 
> min or max is more likely to run into edge cases. Perhaps choosing a median 
> value would be more appropriate? Or a more complex selection strategy?
> If the minimum is replaced with the median, the overall data flow becomes 
> very smooth:
> !image-2024-04-10-20-23-13-872.png!
>  
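
The difference between the two aggregation strategies discussed in the issue can be sketched as follows. This is a minimal illustration in plain Java, not the SourceCoordinator code: the class {{WatermarkAggregation}}, its methods, and the watermark values are all hypothetical, and the choice of the lower median for even-sized groups is an assumption.

```java
import java.util.Arrays;

public class WatermarkAggregation {

    /** Global minimum, the strategy described in the root-cause list. */
    public static long min(long[] watermarks) {
        return Arrays.stream(watermarks).min().orElse(Long.MIN_VALUE);
    }

    /** Lower median, the alternative proposed in the issue (hypothetical helper). */
    public static long median(long[] watermarks) {
        if (watermarks.length == 0) {
            return Long.MIN_VALUE;
        }
        long[] sorted = watermarks.clone();
        Arrays.sort(sorted);
        return sorted[(sorted.length - 1) / 2];
    }

    public static void main(String[] args) {
        // Five hypothetical subtasks; subtask 0 reads a partition with no writes.
        long[] reported = {1_000L, 1_000L, 1_000L, 1_000L, 1_000L};
        for (int tick = 1; tick <= 3; tick++) {
            for (int i = 1; i < reported.length; i++) {
                reported[i] += 100L; // the active subtasks advance
            }
            System.out.printf("tick=%d min=%d median=%d%n",
                    tick, min(reported), median(reported));
        }
    }
}
```

With these made-up numbers the minimum stays at 1,000 on every tick because of the single stalled subtask, while the median advances by 100 per tick. The trade-off, under the same assumptions, is that a median no longer bounds how far the other subtasks run ahead of the slowest ones, which is the drift that watermark alignment is meant to limit.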



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
