Hi!
To work around FLINK-2491 <https://issues.apache.org/jira/browse/FLINK-2491>,
which causes checkpointing issues for us, I am trying to chain SourceFunctions
so that the first one never quits. The basic idea is as follows:
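
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

class WrappingSourceFunction(innerSourceFunction: SourceFunction[Inner])
    extends SourceFunction[Outer] {
  override def run(outerCtx: SourceContext[Outer]): Unit = {
    // Emit on the outer context first, then hand control to the inner source.
    outerCtx.collect(...)
    // SourceContextWrapper is my own SourceContext that delegates to outerCtx.
    val innerCtx: SourceContext[Inner] = new SourceContextWrapper(outerCtx)
    // Blocks until the inner source finishes or is cancelled.
    innerSourceFunction.run(innerCtx)
  }
  override def cancel(): Unit = innerSourceFunction.cancel()
}
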
Is it OK to call run() of a different SourceFunction inside run(), and to
implement my own SourceContext that delegates to another one? It works in a
small test running in a local Flink environment, but I am wondering whether
there could be any issues doing that in production.
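
To make the delegation idea concrete, here is a minimal sketch of what such a
delegating context could look like. It is an illustration under assumptions,
not the actual SourceContextWrapper: the SourceContext trait below is a
stand-in reduced to three methods so the example runs without Flink, and the
convert parameter, RecordingContext and WrapperDemo are hypothetical names
introduced only for this sketch. Depending on the Flink version, the real
interface has further methods (for example for timestamps and watermarks) that
a full wrapper would also have to delegate.

```scala
// Stand-in for Flink's SourceFunction.SourceContext, reduced to the calls
// used in this sketch. This is NOT the real Flink interface.
trait SourceContext[T] {
  def collect(element: T): Unit
  def getCheckpointLock: AnyRef
  def close(): Unit
}

// Hypothetical wrapper: converts each Inner element to Outer and forwards it.
// The convert function is an assumption; the original post does not show how
// Inner elements are mapped to Outer elements.
class SourceContextWrapper[Inner, Outer](
    outer: SourceContext[Outer],
    convert: Inner => Outer
) extends SourceContext[Inner] {
  override def collect(element: Inner): Unit = outer.collect(convert(element))

  // Return the outer lock object itself, so the inner source synchronizes on
  // the same lock as whoever owns the outer context.
  override def getCheckpointLock: AnyRef = outer.getCheckpointLock

  override def close(): Unit = outer.close()
}

// Helper for the demo: records what it receives instead of emitting it.
class RecordingContext[T] extends SourceContext[T] {
  private val lock = new Object
  val seen = scala.collection.mutable.ArrayBuffer.empty[T]
  var closed = false

  override def collect(element: T): Unit = seen += element
  override def getCheckpointLock: AnyRef = lock
  override def close(): Unit = closed = true
}

object WrapperDemo {
  def main(args: Array[String]): Unit = {
    val outer = new RecordingContext[String]
    val inner = new SourceContextWrapper[Int, String](outer, i => s"item-$i")
    inner.collect(1)
    inner.collect(2)
    println(outer.seen.mkString(", ")) // prints "item-1, item-2"
  }
}
```

The one design choice worth pointing out is that the wrapper hands out the
outer context's checkpoint lock rather than creating its own, so that an inner
source which synchronizes on the lock while emitting uses the same object as
the outer context.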
Thanks,
Jochen