Hi all, I’m converting several batch Flink workflows to streaming, with bounded sources.
Some of our sources read Hadoop sequence files via StreamExecutionEnvironment.createInput(HadoopInputFormat).

The problem is that StreamGraphGenerator.existsUnboundedSource returns true, because the LegacySourceTransformation for this source reports its boundedness as CONTINUOUS_UNBOUNDED. Since I’ve set the execution mode to batch, the workflow fails to run.

The root cause is that StreamExecutionEnvironment.createInput() checks whether the input format extends FileInputFormat, and only sets up a bounded source if it does. HadoopInputFormat doesn’t extend FileInputFormat, so its boundedness gets set to CONTINUOUS_UNBOUNDED, which is wrong for a bounded source.

This looks like a bug in StreamExecutionEnvironment.createInput(), though I’m not sure how best to fix it. Relying on class checks feels brittle.

Regards,

-- Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch
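
P.S. To make the class-check concern concrete, here is a stripped-down model of the behavior described above. It is only a sketch and not Flink's actual code: the nested types are simplified stand-ins that borrow the Flink names, and byExplicitHint is a hypothetical alternative I made up for illustration, not an existing Flink API.

```java
// Simplified model of the boundedness decision described in this mail.
// None of these types are Flink's real classes; they are stand-ins.
public class BoundednessSketch {

    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }

    // Stand-in for a generic input format.
    interface InputFormat {}

    // Stand-in for the file-based base class that the check looks for.
    static class FileInputFormat implements InputFormat {}

    // Stand-in for the Hadoop wrapper: its input is finite,
    // but it does not extend FileInputFormat.
    static class HadoopInputFormat implements InputFormat {}

    // Models the described behavior: only FileInputFormat subclasses
    // are treated as bounded; everything else is assumed unbounded.
    static Boundedness byClassCheck(InputFormat format) {
        return (format instanceof FileInputFormat)
                ? Boundedness.BOUNDED
                : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    // Hypothetical alternative: let the caller state boundedness
    // explicitly, and fall back to the class check only when no hint is given.
    static Boundedness byExplicitHint(InputFormat format, Boundedness hint) {
        return (hint != null) ? hint : byClassCheck(format);
    }

    public static void main(String[] args) {
        System.out.println(byClassCheck(new FileInputFormat()));
        System.out.println(byClassCheck(new HadoopInputFormat()));
        System.out.println(byExplicitHint(new HadoopInputFormat(), Boundedness.BOUNDED));
    }
}
```

In this model the Hadoop stand-in is classified as CONTINUOUS_UNBOUNDED by the class check even though its input is finite, while an explicit hint would let the caller override that. Whether something along those lines is the right fix for createInput() is an open question.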