thank you @chesnay

I tried in vain to find the issue about introduction of  new watermark
strategy, can you provide some details about it ?

Chesnay Schepler <ches...@apache.org> 于2021年1月11日周一 下午9:43写道:

> The idleTimeout you found is from an earlier attempt at implementing
> idleness, but making it configurable was aborted midway through as there
> were some API issues. The effort was subsumed by a new source interface and
> watermark generators that were introduced in 1.12.
>
> Some more details can be found in FLINK-5018
> <https://issues.apache.org/jira/browse/FLINK-5018>.
>
> On 1/11/2021 12:40 PM, Akisaya wrote:
>
> Hey there,
> recently I have to join two streams while one of it may be idle for a long
> time, in flink 1.12, the Watermark Generator has a method `withIdleness`
> to detect if a stream is idle or not so that the operator can still advance
> its watermark by another active stream, and the state of this operator will
> continuously grow up.
>
> But in flink 1.10, there's no such withIdleness method
> flink 1.10 docs mention a workaround in
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission
> ,but this doesn't work well.
>
> After walking through the code,I found StreamSourceContexts#getSourceContext
> provides a param idleness which is hard coded to -1 in StreamSource#run.
>
> StreamSourceContexts#getSourceContext
>
> public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
>       TimeCharacteristic timeCharacteristic,      ProcessingTimeService 
> processingTimeService,      Object checkpointLock,      
> StreamStatusMaintainer streamStatusMaintainer,      Output<StreamRecord<OUT>> 
> output,      long watermarkInterval,      long idleTimeout) {
>
>
>
> StreamSource#run
>
> this.ctx = StreamSourceContexts.getSourceContext(
>    timeCharacteristic,   getProcessingTimeService(),   lockingObject,   
> streamStatusMaintainer,   collector,   watermarkInterval,   -1);
>
>
> After extending a flink KafkaConnector and setting idleness using
> reflection, I found it works as I expected!
>
>
>
> I'm very curious that why flink does not provide this param to user to
> determine if a stream is idle and what will be the side effect.
>
> thx.
>
>
>
>
>
>

Reply via email to