This is the parent ticket for the new source interface: FLINK-10740 <https://issues.apache.org/jira/browse/FLINK-10740>
This is the parent ticket for the reworked watermark generators: FLINK-17653 <https://issues.apache.org/jira/browse/FLINK-17653>

On 1/11/2021 5:16 PM, Akisaya wrote:
Thank you, @Chesnay.

I tried in vain to find the issue about the introduction of the new watermark strategy. Can you provide some details about it?

On Mon, Jan 11, 2021 at 9:43 PM, Chesnay Schepler <ches...@apache.org> wrote:

    The idleTimeout you found is from an earlier attempt at
    implementing idleness, but making it configurable was aborted
    midway through as there were some API issues. The effort was
    subsumed by a new source interface and watermark generators that
    were introduced in 1.12.

    Some more details can be found in FLINK-5018
    <https://issues.apache.org/jira/browse/FLINK-5018>.

    On 1/11/2021 12:40 PM, Akisaya wrote:
    Hey there,
    recently I had to join two streams, one of which may be idle for
    a long time. In Flink 1.12, `WatermarkStrategy` has a method
    `withIdleness` that detects whether a stream is idle, so that the
    operator can still advance its watermark based on the other,
    active stream. Without it, the watermark stalls and the state of
    this operator keeps growing.
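To make the join problem concrete, here is a minimal, JDK-only Java model of how a two-input operator might combine its input watermarks. The class `TwoInputWatermarkModel` and its methods are hypothetical, and the rule it assumes (the operator's watermark is the minimum over its non-idle inputs and never moves backwards) is a simplification of what Flink does internally, not Flink's code.

```java
/**
 * Hypothetical, simplified model of watermark combination in a two-input
 * operator such as a join. Not Flink's implementation.
 */
final class TwoInputWatermarkModel {
    private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};
    private final boolean[] idle = {false, false};
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a watermark from one input; an input that emits becomes active again. */
    long onWatermark(int input, long watermark) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        idle[input] = false;
        return recompute();
    }

    /** Marks an input as idle, e.g. after it emitted nothing for the idle timeout. */
    long markIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    /** Combined watermark = minimum over non-idle inputs; it never moves backwards. */
    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < inputWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, inputWatermarks[i]);
            }
        }
        // If every input is idle, hold the current watermark instead of jumping ahead.
        if (anyActive && min > combinedWatermark) {
            combinedWatermark = min;
        }
        return combinedWatermark;
    }
}
```

If input 1 never emits and is never marked idle, `recompute` keeps returning `Long.MIN_VALUE`, so event-time timers never fire and the join state is never cleaned up. Marking input 1 idle lets input 0 alone drive the watermark.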

    But in Flink 1.10 there is no such `withIdleness` method. The
    Flink 1.10 docs mention a workaround, but it doesn't work well:
    https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission

    After walking through the code, I found that
    StreamSourceContexts#getSourceContext takes an idleTimeout
    parameter, which is hard-coded to -1 in StreamSource#run.

    StreamSourceContexts#getSourceContext:
    public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
            TimeCharacteristic timeCharacteristic,
            ProcessingTimeService processingTimeService,
            Object checkpointLock,
            StreamStatusMaintainer streamStatusMaintainer,
            Output<StreamRecord<OUT>> output,
            long watermarkInterval,
            long idleTimeout) {
        // ...
    }


    StreamSource#run:
    this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic, getProcessingTimeService(), lockingObject,
            streamStatusMaintainer, collector, watermarkInterval,
            -1); // idleTimeout hard-coded to -1, i.e. idleness detection is off
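The `idleTimeout` parameter above enables a check of roughly the following shape. This is a hypothetical JDK-only sketch, not Flink's code: `IdlenessDetector` and its methods are invented for illustration, time is passed in explicitly in milliseconds to keep it deterministic, and -1 is assumed to mean "disabled", mirroring the value hard-coded in `StreamSource#run`. Flink's source contexts run the real check from their own timer and, as assumed here, report idleness through the `streamStatusMaintainer` they are given.

```java
/**
 * Hypothetical, simplified idle-timeout check. Times are in milliseconds
 * and supplied by the caller. Not Flink's implementation.
 */
final class IdlenessDetector {
    /** Sentinel meaning "idleness detection disabled" (mirrors the -1 in StreamSource#run). */
    static final long DISABLED = -1L;

    private final long idleTimeoutMs;
    private long lastActivityMs;
    private boolean idle;

    IdlenessDetector(long idleTimeoutMs, long nowMs) {
        if (idleTimeoutMs != DISABLED && idleTimeoutMs < 1) {
            throw new IllegalArgumentException("idleTimeout must be -1 (disabled) or >= 1 ms");
        }
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastActivityMs = nowMs;
    }

    /** Called whenever the source emits a record or watermark; the stream is active again. */
    void onActivity(long nowMs) {
        lastActivityMs = nowMs;
        idle = false;
    }

    /** Periodic check (e.g. from a timer); returns true once the stream is considered idle. */
    boolean checkIdle(long nowMs) {
        if (idleTimeoutMs != DISABLED && nowMs - lastActivityMs >= idleTimeoutMs) {
            idle = true;
        }
        return idle;
    }
}
```

With `DISABLED`, `checkIdle` never returns true, which matches the behavior the hard-coded -1 produces: a silent source is never declared idle.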

    After extending the Flink Kafka connector and setting the idle
    timeout via reflection, I found that it works as I expected!
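Overriding a hard-coded private setting via reflection, as described above, generally looks like the following JDK-only sketch. `LegacySource`, its `idleTimeout` field, and `ReflectionOverride` are hypothetical stand-ins; the actual Flink classes and fields involved are not named in this thread and may differ between versions, which is one reason this approach is fragile.

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for a class that hard-codes a setting in a private field. */
class LegacySource {
    private long idleTimeout = -1L; // hard-coded, no setter

    long idleTimeout() {
        return idleTimeout;
    }
}

final class ReflectionOverride {
    /**
     * Sets a private long field by name, searching the target's class and its
     * superclasses; throws NoSuchFieldException if no class declares it.
     */
    static void setLongField(Object target, String fieldName, long value)
            throws NoSuchFieldException, IllegalAccessException {
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Field field = c.getDeclaredField(fieldName);
                field.setAccessible(true); // bypass the private modifier
                field.setLong(target, value);
                return;
            } catch (NoSuchFieldException notHere) {
                // Not declared on this class; try the superclass.
            }
        }
        throw new NoSuchFieldException(fieldName);
    }
}
```

Walking up `getSuperclass()` matters when the target is a subclass (for example, a class extending a connector), because `getDeclaredField` only looks at the exact class it is called on.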



    I'm very curious why Flink does not expose this parameter so that
    users can decide when a stream counts as idle, and what the side
    effects of doing so would be.

    thx.