This is the parent ticket for the new source interface: FLINK-10740
<https://issues.apache.org/jira/browse/FLINK-10740>
This is the parent ticket for the reworked watermark generators: FLINK-17653
<https://issues.apache.org/jira/browse/FLINK-17653>
On 1/11/2021 5:16 PM, Akisaya wrote:
Thank you, @chesnay.
I tried in vain to find the issue about the introduction of the new
watermark strategy. Can you provide some details about it?
On Mon, Jan 11, 2021 at 9:43 PM, Chesnay Schepler <ches...@apache.org> wrote:
The idleTimeout you found is from an earlier attempt at
implementing idleness, but making it configurable was aborted
midway through as there were some API issues. The effort was
subsumed by a new source interface and watermark generators that
were introduced in 1.12.
Some more details can be found in FLINK-5018
<https://issues.apache.org/jira/browse/FLINK-5018>.
On 1/11/2021 12:40 PM, Akisaya wrote:
Hey there,
Recently I had to join two streams, one of which may be idle for a
long time. In Flink 1.12, the watermark strategy has a method
`withIdleness` that detects whether a stream is idle, so that the
operator can still advance its watermark based on the other, active
stream. Otherwise the watermark stalls and the state of this
operator grows continuously.
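To illustrate the behaviour described above, here is a minimal, self-contained sketch of how a two-input operator could combine watermarks while skipping idle inputs. It is not Flink code: the class `IdleAwareWatermarkCombiner` and its methods are hypothetical names chosen for this example, and the real implementation may well differ.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch (not Flink's implementation): the combined watermark
 * is the minimum over all inputs that are currently active. Idle inputs
 * are ignored, so one silent stream does not hold back the operator.
 */
final class IdleAwareWatermarkCombiner {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    IdleAwareWatermarkCombiner(int numInputs) {
        watermarks = new long[numInputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
        idle = new boolean[numInputs];
    }

    /** A watermark from an input also marks that input as active again. */
    long onWatermark(int input, long watermark) {
        idle[input] = false;
        watermarks[input] = Math.max(watermarks[input], watermark);
        return recompute();
    }

    /** Called when an input has been silent for longer than its idle timeout. */
    long markIdle(int input) {
        idle[input] = true;
        return recompute();
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // The combined watermark never moves backwards.
        if (anyActive) {
            combined = Math.max(combined, min);
        }
        return combined;
    }
}
```

Without the `markIdle` call, the combined watermark stays at its initial value for as long as the second input sends nothing, which is why the join state cannot be cleaned up.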
But in Flink 1.10 there is no such `withIdleness` method.
The Flink 1.10 docs mention a workaround in
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission,
but this doesn't work well.
After walking through the code, I found that
StreamSourceContexts#getSourceContext takes an idleTimeout
parameter, which is hard-coded to -1 in StreamSource#run.
StreamSourceContexts#getSourceContext

    public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
            TimeCharacteristic timeCharacteristic,
            ProcessingTimeService processingTimeService,
            Object checkpointLock,
            StreamStatusMaintainer streamStatusMaintainer,
            Output<StreamRecord<OUT>> output,
            long watermarkInterval,
            long idleTimeout) {

StreamSource#run

    this.ctx = StreamSourceContexts.getSourceContext(
            timeCharacteristic,
            getProcessingTimeService(),
            lockingObject,
            streamStatusMaintainer,
            collector,
            watermarkInterval,
            -1);
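As a rough illustration of what an idle timeout does, the sketch below assumes that -1 acts as a "disabled" sentinel, which is what the hard-coded value suggests. The class `IdleTimeoutTracker` and its methods are hypothetical and are not part of Flink; time is passed in explicitly to keep the example deterministic.

```java
/**
 * Hypothetical sketch (not Flink code): decides whether a source is idle
 * based on the time since its last record. A timeout of -1 is assumed to
 * mean that idleness detection is switched off.
 */
final class IdleTimeoutTracker {
    static final long DISABLED = -1L;

    private final long idleTimeoutMillis;
    private long lastRecordMillis;

    IdleTimeoutTracker(long idleTimeoutMillis, long startMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.lastRecordMillis = startMillis;
    }

    /** Every emitted record resets the idle clock. */
    void onRecord(long nowMillis) {
        lastRecordMillis = nowMillis;
    }

    /** With the timeout disabled, the source is never reported as idle. */
    boolean isIdle(long nowMillis) {
        if (idleTimeoutMillis == DISABLED) {
            return false;
        }
        return nowMillis - lastRecordMillis >= idleTimeoutMillis;
    }
}
```

Under this assumption, passing -1 means the source is never marked idle, no matter how long it stays silent, while any positive value enables the check.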
After extending a Flink Kafka connector and setting the idle timeout
using reflection, I found it works as I expected!
I'm curious why Flink does not expose this parameter so that users
can determine whether a stream is idle, and what the side effects
of setting it would be.
Thanks.