图片好像发送不出去,这里贴一下代码


StreamSourceContexts#getSourceContext

public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
      TimeCharacteristic timeCharacteristic,
      ProcessingTimeService processingTimeService,
      Object checkpointLock,
      StreamStatusMaintainer streamStatusMaintainer,
      Output<StreamRecord<OUT>> output,
      long watermarkInterval,
      long idleTimeout) {




StreamSource#run

this.ctx = StreamSourceContexts.getSourceContext(
   timeCharacteristic,
   getProcessingTimeService(),
   lockingObject,
   streamStatusMaintainer,
   collector,
   watermarkInterval,
   -1);


Akisaya <[email protected]> 于2021年1月11日周一 下午7:17写道:

> flink 1.12 的 watermark strategy 重构之后,添加了一个 withIdleness 方法能将一个流在一定时间没有数据产生时设置为
> idle 流
> 但是在 1.10 中没有这样的方法可以设置。
>
> 看了下 1.10 的代码发现在 StreamSourceContext 里是可以根据参数设置 idleTimeout 的,但是在
> StreamSource 的 run 方法里实际使用该方法的时候直接将 idleTimeout 写死成 -1 了
> [image: image.png]
>
> StreamSource
> [image: image.png]
>
> 请问下,为啥不考虑将这个参数开放出来供用户使用。
>
> 我通过继承 flink 内置的 kakfa connector,使用反射修改了 idleTimeout 参数,经验证是可以实现自动检测 kafka
> 流是否有数据并设置其为 idle
>
>

回复