[ https://issues.apache.org/jira/browse/FLINK-7721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek reopened FLINK-7721: ------------------------------------- Reopen to fix git commit hash > StatusWatermarkValve should output a new min watermark only if it was > aggregated from aligned chhanels > ------------------------------------------------------------------------------------------------------ > > Key: FLINK-7721 > URL: https://issues.apache.org/jira/browse/FLINK-7721 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.2.1, 1.4.0, 1.3.2 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Tzu-Li (Gordon) Tai > Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > Context: > {code} > long newMinWatermark = Long.MAX_VALUE; > for (InputChannelStatus channelStatus : channelStatuses) { > if (channelStatus.isWatermarkAligned) { > newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark); > } > } > {code} > In the calculation of the new min watermark in > {{StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels()}}, > there is not verification that the calculated new min watermark > {{newMinWatermark}} really is aggregated from some aligned channel. > In the corner case where all input channels are currently not aligned but > actually some are active, we would then incorrectly determine that the final > aggregation of {{newMinWatermark}} is {{Long.MAX_VALUE}} and emit that. > The fix would simply be to only emit the aggregated watermark IFF it was > really calculated from some aligned input channel (as well as the already > existing constraint that it needs to be larger than the last emitted > watermark). This change should also safely cover the case that a > {{Long.MAX_VALUE}} was genuinely aggregated from one of the input channels. -- This message was sent by Atlassian JIRA (v6.4.14#64029)