Hi Aaron, I'd like to take a step back and understand why you're trying to wrap an InputFormatSourceFunction.
In my opinion, InputFormatSourceFunction should not be used because it has some shortcomings, the most prominent among them that it does not support checkpointing, i.e. in case of failure all data will (probably) be read again. I'm saying probably because the interaction of InputFormatSourceFunction with how InputSplits are generated (which relates to that code snippet with the cast you found) could be somewhat "spooky" and lead to weird results in some cases. The interface is a remnant of a very early version of the streaming API and should probably be removed soon. I hope we can find a better solution for your problem that fits better with Flink. Best, Aljoscha > On 1. Nov 2018, at 15:30, Aaron Levin <aaronle...@stripe.com> wrote: > > Hey Friends! Last ping and I'll move this over to a ticket. If anyone can > provide any insight or advice, that would be helpful! > > Thanks again. > > Best, > > Aaron Levin > > On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronle...@stripe.com > <mailto:aaronle...@stripe.com>> wrote: > Hey, > > Not sure how convo threading works on this list, so in case the folks CC'd > missed my other response, here's some more info: > > First, I appreciate everyone's help! Thank you! > > I wrote several wrappers to try and debug this, including one which is an > exact copy of `InputFormatSourceFunction` which also failed. They all failed > with the same error I detail above. I'll post two of them below. They all > extended `RichParallelSourceFunction` and, as far as I could tell, were > properly initialized (though I may have missed something!). Additionally, for > the two below, if I change `extends RichParallelSourceFunction` to `extends > InputFormatSourceFunction(...,...)`, I no longer receive the exception. This > is what led me to believe the source of the issue was casting and how I found > the line of code where the stream graph is given the input format. > > Quick explanation of the wrappers: > 1. 
`WrappedInputFormat` does a basic wrap around `InputFormatSourceFunction` > and delegates all methods to the underlying `InputFormatSourceFunction` > 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the > `InputFormatSourceFunction` source. > 3. They're being used in a test which looks vaguely like: > `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new > InputFormatSourceFunction[String](source, > implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq` > > class WrappedInputFormat[A]( > inputFormat: InputFormatSourceFunction[A] > )( > implicit typeInfo: TypeInformation[A] > ) extends RichParallelSourceFunction[A] { > > override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = { > inputFormat.run(sourceContext) > } > override def setRuntimeContext(t: RuntimeContext): Unit = { > inputFormat.setRuntimeContext(t) > } > override def equals(obj: scala.Any) = { > inputFormat.equals(obj) > } > override def hashCode() = { inputFormat.hashCode() } > override def toString = { inputFormat.toString } > override def getRuntimeContext(): RuntimeContext = { > inputFormat.getRuntimeContext } > override def getIterationRuntimeContext = { > inputFormat.getIterationRuntimeContext } > override def open(parameters: Configuration): Unit = { > inputFormat.open(parameters) > } > override def cancel(): Unit = { > inputFormat.cancel() > } > override def close(): Unit = { > inputFormat.close() > } > } > > And the other one: > > class ClonedInputFormatSourceFunction[A](val format: InputFormat[A, > InputSplit], val typeInfo: TypeInformation[A]) extends > RichParallelSourceFunction[A] { > > @transient private var provider: InputSplitProvider = _ > @transient private var serializer: TypeSerializer[A] = _ > @transient private var splitIterator: Iterator[InputSplit] = _ > private var isRunning: Boolean = _ > > override def open(parameters: Configuration): Unit = { > val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext] > 
if(format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context) > } > format.configure(parameters) > > provider = context.getInputSplitProvider > serializer = > typeInfo.createSerializer(getRuntimeContext.getExecutionConfig) > splitIterator = getInputSplits() > isRunning = splitIterator.hasNext > } > > override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = { > if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].openInputFormat() > } > > var nextElement: A = serializer.createInstance() > try { > while (isRunning) { > format.open(splitIterator.next()) > while (isRunning && !format.reachedEnd()) { > nextElement = format.nextRecord(nextElement) > if (nextElement != null) { > sourceContext.collect(nextElement) > } else { > break > } > format.close() > if (isRunning) { > isRunning = splitIterator.hasNext > } > } > } > } finally { > > format.close() > if (format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat() > } > isRunning = false > } > } > > override def cancel(): Unit = { > isRunning = false > } > > override def close(): Unit = { > format.close() > if(format.isInstanceOf[RichInputFormat[_,_]]) { > format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat() > } > } > > private def getInputSplits(): Iterator[InputSplit] = { > new Iterator[InputSplit] { > private var nextSplit: InputSplit = _ > private var exhausted: Boolean = _ > > override def hasNext: Boolean = { > if(exhausted) { return false } > if(nextSplit != null) { return true } > var split: InputSplit = null > > try { > split = > provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader) > } catch { > case e: InputSplitProviderException => > throw new RuntimeException("No InputSplit Provider", e) > } > > if(split != null) { > nextSplit = split > true > } else { > exhausted = true > false > } > } > > override def next(): 
InputSplit = { > if(nextSplit == null && !hasNext) { > throw new NoSuchElementException() > } > val tmp: InputSplit = nextSplit > nextSplit = null > tmp > } > > } > } > } > > > On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakow...@apache.org > <mailto:dwysakow...@apache.org>> wrote: > Hi Aaron, > > Could you share the code of you custom function? > > I am also adding Aljosha and Kostas to cc, who should be more helpful on that > topic. > > Best, > > Dawid > > On 19/10/2018 20:06, Aaron Levin wrote: >> Hi, >> >> I'm writing a custom `SourceFunction` which wraps an underlying >> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a >> stream (via `env.addSource` and a subsequent sink) I get errors related to >> the `InputSplitAssigner` not being initialized for a particular vertex ID. >> Full error here[1]. >> >> I believe the underlying error is related to this[0] call to `instanceof >> InputFormatSourceFunction`. >> >> My questions: >> >> 1. how can I wrap a `InputFormatSourceFunction` which avoids this error? Am >> I missing a chunk of the API covering this? >> 2. is the error I'm experience related to that casting call? If so, would >> ya'll be open to a PR which adds an interface one can extend which will set >> the input format in the stream graph? Or is there a preferred way of >> achieving this? >> >> Thanks! >> >> Aaron Levin >> >> [0] >> https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480 >> >> <https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480> >> [1] >> java.lang.RuntimeException: Could not retrieve next input split. 
>> at >> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157) >> at >> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71) >> at REDACTED >> at >> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) >> at >> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424) >> at >> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290) >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) >> at java.lang.Thread.run(Thread.java:748) >> Caused by: >> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: >> Requesting the next input split failed. >> at >> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69) >> at >> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155) >> ... 8 more >> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No >> InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2 >> at >> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) >> at >> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915) >> at >> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61) >> ... 
9 more >> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID >> cbc357ccb763df2852fee8c4fc7d55f2 >> at >> org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:498) >> at >> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247) >> ... > >