Hey,

Not sure how convo threading works on this list, so in case the folks CC'd missed my other response, here's some more info:
First, I appreciate everyone's help! Thank you!

I wrote several wrappers to try to debug this, including one which is an exact copy of `InputFormatSourceFunction`, which also failed. They all failed with the same error I detail above. I'll post two of them below. They all extend `RichParallelSourceFunction` and, as far as I could tell, were properly initialized (though I may have missed something!). Additionally, for the two below, if I change `extends RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`, I no longer receive the exception. This is what led me to believe the source of the issue was the `instanceof` check and cast, and it's how I found the line of code where the stream graph is given the input format.

Quick explanation of the wrappers:

1. `WrappedInputFormat` does a basic wrap around `InputFormatSourceFunction` and delegates all methods to the underlying `InputFormatSourceFunction`.
2. `ClonedInputFormatSourceFunction` is a ~exact copy of the `InputFormatSourceFunction` source.
3.
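To make the `instanceof` reasoning above concrete, here is a tiny self-contained Scala model of it. All names in it (`InstanceofModel`, `SourceFn`, `FormatSource`, `Wrapper`, `Subclass`, `registeredFormat`) are hypothetical stand-ins, not Flink APIs; `registeredFormat` only mimics, as I understand it, the check in `StreamGraphGenerator` that hands the input format to the stream graph when the user function is an `InputFormatSourceFunction`. A delegating wrapper fails that check even though it holds a format source internally, while a subclass passes it, which would be consistent with the exception disappearing when I switch to `extends InputFormatSourceFunction(...,...)`.

```scala
// Simplified, hypothetical model of the type check in StreamGraphGenerator.
// None of these names are Flink APIs.
object InstanceofModel {
  trait SourceFn[A]

  // Stands in for InputFormatSourceFunction: it carries the input format.
  class FormatSource[A](val format: String) extends SourceFn[A]

  // Stands in for WrappedInputFormat: delegates to a FormatSource but is not one.
  class Wrapper[A](val inner: FormatSource[A]) extends SourceFn[A]

  // Stands in for `extends InputFormatSourceFunction(...,...)`: it IS a FormatSource.
  class Subclass[A](fmt: String) extends FormatSource[A](fmt)

  // Mirrors "if the user function is an InputFormatSourceFunction, register its format".
  // Only the runtime class matters; what the object wraps internally is never inspected.
  def registeredFormat(src: SourceFn[_]): Option[String] = src match {
    case fs: FormatSource[_] => Some(fs.format)
    case _                   => None // nothing registered for this source
  }
}
```

With this model, `registeredFormat(new Wrapper(new FormatSource[String]("csv")))` returns `None`, while the `FormatSource` and `Subclass` cases both return `Some("csv")`.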
They're being used in a test which looks vaguely like:

    DataStreamUtils.collect(
      env.addSource(
        new WrappedInputFormat(
          new InputFormatSourceFunction[String](source, implicitly[TypeInformation[String]])
        )
      ).javaStream
    ).asScala.toSeq

    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.api.common.io.{InputFormat, RichInputFormat}
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.api.common.typeutils.TypeSerializer
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.io.InputSplit
    import org.apache.flink.runtime.jobgraph.tasks.{InputSplitProvider, InputSplitProviderException}
    import org.apache.flink.streaming.api.functions.source.{InputFormatSourceFunction, RichParallelSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext

    class WrappedInputFormat[A](
        inputFormat: InputFormatSourceFunction[A]
    )(
        implicit typeInfo: TypeInformation[A]
    ) extends RichParallelSourceFunction[A] {

      override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
        inputFormat.run(sourceContext)
      }

      override def setRuntimeContext(t: RuntimeContext): Unit = {
        inputFormat.setRuntimeContext(t)
      }

      override def equals(obj: scala.Any) = {
        inputFormat.equals(obj)
      }

      override def hashCode() = {
        inputFormat.hashCode()
      }

      override def toString = {
        inputFormat.toString
      }

      override def getRuntimeContext(): RuntimeContext = {
        inputFormat.getRuntimeContext
      }

      override def getIterationRuntimeContext = {
        inputFormat.getIterationRuntimeContext
      }

      override def open(parameters: Configuration): Unit = {
        inputFormat.open(parameters)
      }

      override def cancel(): Unit = {
        inputFormat.cancel()
      }

      override def close(): Unit = {
        inputFormat.close()
      }
    }

And the other one:

    class ClonedInputFormatSourceFunction[A](
        val format: InputFormat[A, InputSplit],
        val typeInfo: TypeInformation[A]
    ) extends RichParallelSourceFunction[A] {

      @transient private var provider: InputSplitProvider = _
      @transient private var serializer: TypeSerializer[A] = _
      @transient private var splitIterator: Iterator[InputSplit] = _
      // cancel() writes this from another thread, so it must be @volatile.
      @volatile private var isRunning: Boolean = _

      override def open(parameters: Configuration): Unit = {
        val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
        if (format.isInstanceOf[RichInputFormat[_, _]]) {
          format.asInstanceOf[RichInputFormat[_, _]].setRuntimeContext(context)
        }
        format.configure(parameters)
        provider = context.getInputSplitProvider
        serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
        splitIterator = getInputSplits()
        isRunning = splitIterator.hasNext
      }

      override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
        if (isRunning && format.isInstanceOf[RichInputFormat[_, _]]) {
          format.asInstanceOf[RichInputFormat[_, _]].openInputFormat()
        }
        var nextElement: A = serializer.createInstance()
        try {
          while (isRunning) {
            format.open(splitIterator.next())
            // Scala has no `break` keyword, so a flag ends the current split.
            var splitDone = false
            while (isRunning && !splitDone && !format.reachedEnd()) {
              nextElement = format.nextRecord(nextElement)
              if (nextElement != null) {
                sourceContext.collect(nextElement)
              } else {
                splitDone = true
              }
            }
            // Close the split once it is fully read, then move to the next one.
            format.close()
            if (isRunning) {
              isRunning = splitIterator.hasNext
            }
          }
        } finally {
          format.close()
          if (format.isInstanceOf[RichInputFormat[_, _]]) {
            format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
          }
          isRunning = false
        }
      }

      override def cancel(): Unit = {
        isRunning = false
      }

      override def close(): Unit = {
        format.close()
        if (format.isInstanceOf[RichInputFormat[_, _]]) {
          format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
        }
      }

      private def getInputSplits(): Iterator[InputSplit] = {
        new Iterator[InputSplit] {
          private var nextSplit: InputSplit = _
          private var exhausted: Boolean = _

          override def hasNext: Boolean = {
            if (exhausted) {
              return false
            }
            if (nextSplit != null) {
              return true
            }
            var split: InputSplit = null
            try {
              split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
            } catch {
              case e: InputSplitProviderException =>
                throw new RuntimeException("No InputSplit Provider", e)
            }
            if (split != null) {
              nextSplit = split
              true
            } else {
              exhausted = true
              false
            }
          }

          override def next(): InputSplit = {
            if (nextSplit == null && !hasNext) {
              throw new NoSuchElementException()
            }
            val tmp: InputSplit = nextSplit
            nextSplit = null
            tmp
          }
        }
      }
    }

On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:

> Hi Aaron,
>
> Could you share the code of your custom function?
>
> I am also adding Aljoscha and Kostas to cc, who should be more helpful on
> that topic.
>
> Best,
>
> Dawid
>
> On 19/10/2018 20:06, Aaron Levin wrote:
>
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
> stream (via `env.addSource` and a subsequent sink) I get errors related to
> the `InputSplitAssigner` not being initialized for a particular vertex ID.
> Full error here[1].
>
> I believe the underlying error is related to this[0] call to `instanceof
> InputFormatSourceFunction`.
>
> *My questions*:
>
> 1. How can I wrap an `InputFormatSourceFunction` in a way which avoids this error?
>    Am I missing a chunk of the API covering this?
> 2. Is the error I'm experiencing related to that casting call? If so, would
>    y'all be open to a PR which adds an interface one can extend which will set
>    the input format in the stream graph? Or is there a preferred way of
>    achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1]
> java.lang.RuntimeException: Could not retrieve next input split.
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>     at REDACTED
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>     ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>     ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>     ...