Hi, I think your understanding is correct. The watermark of the current subtask is simply the minimum of the watermarks across its input channels.
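
To make the "minimum across channels" behaviour concrete, here is a minimal sketch. It is a simplified stand-in for the quoted valve logic, not Flink's actual class: `MinWatermarkSketch` and `emitted()` are hypothetical names, idle channels and the alignment flag are left out, and it assumes every channel's watermark starts at `Long.MIN_VALUE`. Under that assumption there is no explicit waiting: the minimum simply cannot advance until every channel has reported a watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for the quoted valve logic; not Flink's class. */
public class MinWatermarkSketch {
    // Last watermark seen on each input channel (assumed to start at Long.MIN_VALUE).
    private final long[] channelWatermarks;
    private long lastOutputWatermark = Long.MIN_VALUE;
    // Watermarks that would be forwarded downstream, recorded for inspection.
    private final List<Long> emitted = new ArrayList<>();

    public MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    public void inputWatermark(long timestamp, int channelIndex) {
        // Ignore watermarks that do not advance this channel.
        if (timestamp <= channelWatermarks[channelIndex]) {
            return;
        }
        channelWatermarks[channelIndex] = timestamp;

        // Recompute the minimum over all channels on every arrival.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }

        // Emit only if the overall minimum actually moved forward.
        if (min > lastOutputWatermark) {
            lastOutputWatermark = min;
            emitted.add(min);
        }
    }

    public List<Long> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        MinWatermarkSketch valve = new MinWatermarkSketch(2);
        valve.inputWatermark(10, 0); // channel 1 has not reported yet: nothing emitted
        valve.inputWatermark(5, 1);  // min(10, 5) = 5: emit 5
        valve.inputWatermark(20, 1); // min(10, 20) = 10: emit 10
        valve.inputWatermark(15, 0); // min(15, 20) = 15: emit 15
        System.out.println(valve.emitted()); // prints [5, 10, 15]
    }
}
```

The first call emits nothing because channel 1 is still at its initial value, which is the only "waiting" that takes place in this sketch.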

1193216154 <[email protected]> wrote on Monday, May 11, 2020 at 3:17 PM:

> Hi all, I have been reading the source code for watermark propagation recently and have some doubts about the watermark alignment logic. The code is as follows:
>
> public void inputWatermark(Watermark watermark, int channelIndex) {
>     // ignore the input watermark if its input channel, or all input channels are idle (i.e. overall the valve is idle).
>     if (lastOutputStreamStatus.isActive() && channelStatuses[channelIndex].streamStatus.isActive()) {
>         long watermarkMillis = watermark.getTimestamp();
>
>         // if the input watermark's value is less than the last received watermark for its input channel, ignore it also.
>         if (watermarkMillis > channelStatuses[channelIndex].watermark) {
>             channelStatuses[channelIndex].watermark = watermarkMillis;
>
>             // previously unaligned input channels are now aligned if its watermark has caught up
>             if (!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis >= lastOutputWatermark) {
>                 channelStatuses[channelIndex].isWatermarkAligned = true;
>             }
>
>             // now, attempt to find a new min watermark across all aligned channels
>             findAndOutputNewMinWatermarkAcrossAlignedChannels();
>         }
>     }
> }
>
> private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
>     long newMinWatermark = Long.MAX_VALUE;
>     boolean hasAlignedChannels = false;
>
>     // determine new overall watermark by considering only watermark-aligned channels across all channels
>     for (InputChannelStatus channelStatus : channelStatuses) {
>         if (channelStatus.isWatermarkAligned) {
>             hasAlignedChannels = true;
>             newMinWatermark = Math.min(channelStatus.watermark, newMinWatermark);
>         }
>     }
>
>     // we acknowledge and output the new overall watermark if it really is aggregated
>     // from some remaining aligned channel, and is also larger than the last output watermark
>     if (hasAlignedChannels && newMinWatermark > lastOutputWatermark) {
>         lastOutputWatermark = newMinWatermark;
>         outputHandler.handleWatermark(new Watermark(lastOutputWatermark));
>     }
> }
>
> This code does not seem to contain any logic where the different input channels (channelIndex) wait for each other's watermarks to arrive. Does it really just take the minimum over the watermarks that arrive on different channels at different times?

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]
