Hi! To work around FLINK-2491 (https://issues.apache.org/jira/browse/FLINK-2491), which causes checkpointing issues for us, I am trying to chain SourceFunctions so that the first one never quits. The basic idea is as follows:

    class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner])
        extends SourceFunction[Outer] {

      override def run(outerCtx: SourceContext[Outer]): Unit = {
        outerCtx.collect(...)
        val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
        innerSourceFunction.run(innerCtx)
      }

      override def cancel(): Unit = innerSourceFunction.cancel()
    }

Is it OK to call run() of a different SourceFunction inside run(), and to implement my own SourceContext that delegates to another one? It works in a small test running on a local Flink environment, but I am wondering whether doing this could cause any issues in production.

Thanks,
Jochen
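
P.S. In case it helps to see what I mean by the delegating context, here is a minimal sketch rather than my exact code. The SourceContext trait below is a simplified stand-in for Flink's interface so that the snippet compiles on its own (the real one has more methods, e.g. for timestamps and watermarks), and the `convert` parameter is a hypothetical addition for turning Inner elements into Outer ones.

```scala
// Simplified stand-in for Flink's SourceContext (assumption: only the
// methods needed for this sketch are modelled here).
trait SourceContext[T] {
  def collect(element: T): Unit
  def getCheckpointLock: AnyRef
  def close(): Unit
}

// Hypothetical wrapper: converts each Inner element to Outer via `convert`
// and forwards every other call to the outer context unchanged.
class SourceContextWrapper[Inner, Outer](
    outerCtx: SourceContext[Outer],
    convert: Inner => Outer
) extends SourceContext[Inner] {

  override def collect(element: Inner): Unit =
    outerCtx.collect(convert(element))

  // Hand out the outer lock so the inner source synchronizes on the same
  // object as the outer one.
  override def getCheckpointLock: AnyRef = outerCtx.getCheckpointLock

  override def close(): Unit = outerCtx.close()
}
```

The point of the sketch is that the wrapper holds no state of its own: the inner source sees the outer context's lock and lifecycle, and only the element type changes.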