图片好像发送不出去,这里贴一下代码
StreamSourceContexts#getSourceContext
public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
TimeCharacteristic timeCharacteristic,
ProcessingTimeService processingTimeService,
Object checkpointLock,
StreamStatusMaintainer streamStatusMaintainer,
Output<StreamRecord<OUT>> output,
long watermarkInterval,
long idleTimeout) {
StreamSource#run
this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);
Akisaya <[email protected]> 于2021年1月11日周一 下午7:17写道:
> flink 1.12 的 watermark strategy 重构之后,添加了一个 withIdleness 方法能将一个流在一定时间没有数据产生时设置为
> idle 流
> 但是在 1.10 中没有这样的方法可以设置。
>
> 看了下 1.10 的代码发现在 StreamSourceContext 里是可以根据参数设置 idleTimeout 的,但是在
> StreamSource 的 run 方法里实际使用该方法的时候直接将 idleTimeout 写死成 -1 了
> [image: image.png]
>
> StreamSource
> [image: image.png]
>
> 请问下,为啥不考虑将这个参数开放出来供用户使用。
>
> 我通过继承 flink 内置的 kakfa connector,使用反射修改了 idleTimeout 参数,经验证是可以实现自动检测 kafka
> 流是否有数据并设置其为 idle
>
>