thank you @chesnay I tried in vain to find the issue about introduction of new watermark strategy, can you provide some details about it ?
Chesnay Schepler <ches...@apache.org> 于2021年1月11日周一 下午9:43写道: > The idleTimeout you found is from an earlier attempt at implementing > idleness, but making it configurable was aborted midway through as there > were some API issues. The effort was subsumed by a new source interface and > watermark generators that were introduced in 1.12. > > Some more details can be found in FLINK-5018 > <https://issues.apache.org/jira/browse/FLINK-5018>. > > On 1/11/2021 12:40 PM, Akisaya wrote: > > Hey there, > recently I have to join two streams while one of it may be idle for a long > time, in flink 1.12, the Watermark Generator has a method `withIdleness` > to detect if a stream is idle or not so that the operator can still advance > its watermark by another active stream, and the state of this operator will > continuously grow up. > > But in flink 1.10, there's no such withIdleness method > flink 1.10 docs mention a workaround in > https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission > ,but this doesn't work well. > > After walking through the code,I found StreamSourceContexts#getSourceContext > provides a param idleness which is hard coded to -1 in StreamSource#run. > > StreamSourceContexts#getSourceContext > > public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext( > TimeCharacteristic timeCharacteristic, ProcessingTimeService > processingTimeService, Object checkpointLock, > StreamStatusMaintainer streamStatusMaintainer, Output<StreamRecord<OUT>> > output, long watermarkInterval, long idleTimeout) { > > > > StreamSource#run > > this.ctx = StreamSourceContexts.getSourceContext( > timeCharacteristic, getProcessingTimeService(), lockingObject, > streamStatusMaintainer, collector, watermarkInterval, -1); > > > After extending a flink KafkaConnector and setting idleness using > reflection, I found it works as I expected! > > > > I'm very curious that why flink does not provide this param to user to > determine if a stream is idle and what will be the side effect. > > thx. > > > > > >