????????????????watermark????????????????????????watermark????????????????????????????

public void inputWatermark(Watermark watermark, int channelIndex) {
                // ignore the input watermark if its input channel, or all 
input channels are idle (i.e. overall the valve is idle).
                if (lastOutputStreamStatus.isActive() && 
channelStatuses[channelIndex].streamStatus.isActive()) {
                        long watermarkMillis = watermark.getTimestamp();


                        // if the input watermark's value is less than the last 
received watermark for its input channel, ignore it also.
                        if (watermarkMillis > 
channelStatuses[channelIndex].watermark) {
                                channelStatuses[channelIndex].watermark = 
watermarkMillis;


                                // previously unaligned input channels are now 
aligned if its watermark has caught up
                                if 
(!channelStatuses[channelIndex].isWatermarkAligned && watermarkMillis 
>= lastOutputWatermark) {
                                        
channelStatuses[channelIndex].isWatermarkAligned = true;
                                }


                                // now, attempt to find a new min watermark 
across all aligned channels
                                
findAndOutputNewMinWatermarkAcrossAlignedChannels();
                        }
                }
        }



private void findAndOutputNewMinWatermarkAcrossAlignedChannels() {
                long newMinWatermark = Long.MAX_VALUE;
                boolean hasAlignedChannels = false;


                // determine new overall watermark by considering only 
watermark-aligned channels across all channels
                for (InputChannelStatus channelStatus : channelStatuses) {
                        if (channelStatus.isWatermarkAligned) {
                                hasAlignedChannels = true;
                                newMinWatermark = 
Math.min(channelStatus.watermark, newMinWatermark);
                        }
                }


                // we acknowledge and output the new overall watermark if it 
really is aggregated
                // from some remaining aligned channel, and is also larger than 
the last output watermark
                if (hasAlignedChannels && newMinWatermark > 
lastOutputWatermark) {
                        lastOutputWatermark = newMinWatermark;
                        outputHandler.handleWatermark(new 
Watermark(lastOutputWatermark));
                }
        }



????????????????????????  channelIndex 
????????watermark???????????????????????????????????? channelIndex 
??????watermark????????????????????????

回复