Hey,

Thanks for reaching out! I'd love to take a step back and find a better
solution, so I'll try to be succinct about what I'm trying to accomplish.

We're trying to write a SourceFunction which:

* reads some Sequence files from S3 in a particular order (each task gets
files in a specific order);
* sends a watermark between each sequence file;
* when that's complete, starts reading from Kafka topics.

(This is similar to the bootstrap problem which Lyft has talked about; see:
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink)

The current solution I have involves a custom InputFormat, InputSplit, and
SplitAssigner. It achieves most of these requirements, except that I have to
extend InputFormatSourceFunction. I have a class that looks like:

class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: KafkaBase)
  extends InputFormatSourceFunction(s3Archives, typestuff) {...}

There's a lot I don't like about the existing solution:

* I have to extend InputFormatSourceFunction to ensure the graph is
initialized properly (the bug I wrote about).
* I had to replicate most of the implementation of
InputFormatSourceFunction so I could insert Watermarks between splits.
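For concreteness, here's a rough sketch of the shape I'm after (untested;
`ArchiveSpec` and `readArchive` are placeholders for our S3 sequence-file
reading, the Kafka phase is elided, and checkpointing is ignored entirely,
which is part of the problem):

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.watermark.Watermark

case class ArchiveSpec(owner: Int, maxTimestamp: Long)

class BootstrapThenKafkaSource(archives: Seq[ArchiveSpec])
  extends RichParallelSourceFunction[String] {

  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // Phase 1: replay the archives assigned to this subtask, in order,
    // emitting a watermark after each file.
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    for (archive <- archives if isRunning && archive.owner == subtask) {
      readArchive(archive).foreach(ctx.collect)
      ctx.emitWatermark(new Watermark(archive.maxTimestamp))
    }
    // Phase 2: hand off to Kafka (elided; in practice we'd delegate to a
    // wrapped Kafka consumer's run()).
  }

  override def cancel(): Unit = { isRunning = false }

  // Placeholder for reading one sequence file from S3.
  private def readArchive(a: ArchiveSpec): Iterator[String] = ???
}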
I'd love any suggestions around improving this!

Best,

Aaron Levin

On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi Aaron,
>
> I'd like to take a step back and understand why you're trying to wrap an
> InputFormatSourceFunction.
>
> In my opinion, InputFormatSourceFunction should not be used because it has
> some shortcomings, the most prominent being that it does not support
> checkpointing, i.e. in case of failure all data will (probably) be read
> again. I'm saying probably because the interaction of
> InputFormatSourceFunction with how InputSplits are generated (which
> relates to that code snippet with the cast you found) can be somewhat
> "spooky" and lead to weird results in some cases.
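>
> For comparison, a source that does support checkpointing implements
> CheckpointedFunction and snapshots its read position. A minimal sketch
> (untested; the actual reading, and what "position" means for your format,
> are left abstract):
>
> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
> import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
>
> class CheckpointedFileSource extends RichParallelSourceFunction[String]
>     with CheckpointedFunction {
>
>   @volatile private var isRunning = true
>   private var position: Long = 0L // read offset within this subtask's input
>   @transient private var positionState: ListState[java.lang.Long] = _
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     positionState = context.getOperatorStateStore.getListState(
>       new ListStateDescriptor[java.lang.Long]("position", classOf[java.lang.Long]))
>     val restored = positionState.get().iterator()
>     if (restored.hasNext) position = restored.next() // resume after failure
>   }
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {
>     positionState.clear()
>     positionState.add(position)
>   }
>
>   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
>     while (isRunning) {
>       // Emit records and advance `position` under the checkpoint lock so
>       // snapshots never observe a half-updated position.
>       ctx.getCheckpointLock.synchronized {
>         // ... read the record at `position`, ctx.collect(...), position += 1
>       }
>     }
>   }
>
>   override def cancel(): Unit = { isRunning = false }
> }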
> The interface is a remnant of a very early version of the streaming API
> and should probably be removed soon. I hope we can find a better solution
> for your problem that fits better with Flink.
>
> Best,
> Aljoscha
>
> On 1. Nov 2018, at 15:30, Aaron Levin <aaronle...@stripe.com> wrote:
>
> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
> provide any insight or advice, that would be helpful!
>
> Thanks again.
>
> Best,
>
> Aaron Levin
>
> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronle...@stripe.com> wrote:
>
>> Hey,
>>
>> Not sure how convo threading works on this list, so in case the folks
>> CC'd missed my other response, here's some more info:
>>
>> First, I appreciate everyone's help! Thank you!
>>
>> I wrote several wrappers to try and debug this, including one which is a
>> near-exact copy of `InputFormatSourceFunction`; it also failed. They all
>> failed with the same error I detail above. I'll post two of them below.
>> They all extended `RichParallelSourceFunction` and, as far as I could
>> tell, were properly initialized (though I may have missed something!).
>> Additionally, for the two below, if I change `extends
>> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
>> I no longer receive the exception. This is what led me to believe the
>> source of the issue was the casting, and it's how I found the line of
>> code where the stream graph is given the input format.
>>
>> Quick explanation of the wrappers:
>> 1. `WrappedInputFormat` does a basic wrap around
>> `InputFormatSourceFunction` and delegates all methods to the underlying
>> `InputFormatSourceFunction`.
>> 2. `ClonedInputFormatSourceFunction` is a near-exact copy of the
>> `InputFormatSourceFunction` source.
>> 3. They're being used in a test which looks vaguely like:
>> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
>> InputFormatSourceFunction[String](source,
>> implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>>
>> class WrappedInputFormat[A](
>>   inputFormat: InputFormatSourceFunction[A]
>> )(
>>   implicit typeInfo: TypeInformation[A]
>> ) extends RichParallelSourceFunction[A] {
>>
>>   // Pure delegation: every lifecycle method forwards to the wrapped source.
>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>     inputFormat.run(sourceContext)
>>   }
>>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>>     inputFormat.setRuntimeContext(t)
>>   }
>>   override def equals(obj: scala.Any) = { inputFormat.equals(obj) }
>>   override def hashCode() = { inputFormat.hashCode() }
>>   override def toString = { inputFormat.toString }
>>   override def getRuntimeContext(): RuntimeContext = { inputFormat.getRuntimeContext }
>>   override def getIterationRuntimeContext = { inputFormat.getIterationRuntimeContext }
>>   override def open(parameters: Configuration): Unit = { inputFormat.open(parameters) }
>>   override def cancel(): Unit = { inputFormat.cancel() }
>>   override def close(): Unit = { inputFormat.close() }
>> }
>>
>> And the other one:
>>
>> class ClonedInputFormatSourceFunction[A](
>>   val format: InputFormat[A, InputSplit],
>>   val typeInfo: TypeInformation[A]
>> ) extends RichParallelSourceFunction[A] {
>>
>>   @transient private var provider: InputSplitProvider = _
>>   @transient private var serializer: TypeSerializer[A] = _
>>   @transient private var splitIterator: Iterator[InputSplit] = _
>>   private var isRunning: Boolean = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
>>     if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>       format.asInstanceOf[RichInputFormat[_, _]].setRuntimeContext(context)
>>     }
>>     format.configure(parameters)
>>
>>     provider = context.getInputSplitProvider
>>     serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>>     splitIterator = getInputSplits()
>>     isRunning = splitIterator.hasNext
>>   }
>>
>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>     if (isRunning && format.isInstanceOf[RichInputFormat[_, _]]) {
>>       format.asInstanceOf[RichInputFormat[_, _]].openInputFormat()
>>     }
>>
>>     var nextElement: A = serializer.createInstance()
>>     try {
>>       while (isRunning) {
>>         format.open(splitIterator.next())
>>         // Drain the current split; a null record signals its end.
>>         var endOfSplit = false
>>         while (isRunning && !endOfSplit && !format.reachedEnd()) {
>>           nextElement = format.nextRecord(nextElement)
>>           if (nextElement != null) {
>>             sourceContext.collect(nextElement)
>>           } else {
>>             endOfSplit = true
>>           }
>>         }
>>         format.close()
>>         if (isRunning) {
>>           isRunning = splitIterator.hasNext
>>         }
>>       }
>>     } finally {
>>       format.close()
>>       if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>         format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
>>       }
>>       isRunning = false
>>     }
>>   }
>>
>>   override def cancel(): Unit = {
>>     isRunning = false
>>   }
>>
>>   override def close(): Unit = {
>>     format.close()
>>     if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>       format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
>>     }
>>   }
>>
>>   private def getInputSplits(): Iterator[InputSplit] = {
>>     new Iterator[InputSplit] {
>>       private var nextSplit: InputSplit = _
>>       private var exhausted: Boolean = _
>>
>>       override def hasNext: Boolean = {
>>         if (exhausted) { return false }
>>         if (nextSplit != null) { return true }
>>         var split: InputSplit = null
>>
>>         try {
>>           split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>>         } catch {
>>           case e: InputSplitProviderException =>
>>             throw new RuntimeException("No InputSplit Provider", e)
>>         }
>>
>>         if (split != null) {
>>           nextSplit = split
>>           true
>>         } else {
>>           exhausted = true
>>           false
>>         }
>>       }
>>
>>       override def next(): InputSplit = {
>>         if (nextSplit == null && !hasNext) {
>>           throw new NoSuchElementException()
>>         }
>>         val tmp: InputSplit = nextSplit
>>         nextSplit = null
>>         tmp
>>       }
>>     }
>>   }
>> }
>>
>> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>>
>>> Hi Aaron,
>>>
>>> Could you share the code of your custom function?
>>>
>>> I am also adding Aljoscha and Kostas to cc, who should be more helpful
>>> on that topic.
>>>
>>> Best,
>>>
>>> Dawid
>>>
>>> On 19/10/2018 20:06, Aaron Levin wrote:
>>>
>>> Hi,
>>>
>>> I'm writing a custom `SourceFunction` which wraps an underlying
>>> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in
>>> a stream (via `env.addSource` and a subsequent sink) I get errors
>>> related to the `InputSplitAssigner` not being initialized for a
>>> particular vertex ID. Full error here[1].
>>>
>>> I believe the underlying error is related to this[0] call to
>>> `instanceof InputFormatSourceFunction`.
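>>>
>>> Paraphrasing that code: the stream graph only records the InputFormat
>>> (and therefore only sets up an `InputSplitAssigner`) when the user
>>> function is literally an `InputFormatSourceFunction`. Roughly:
>>>
>>> // Rough paraphrase of [0] in Scala, not the literal source:
>>> sourceFunction match {
>>>   case fs: InputFormatSourceFunction[_] =>
>>>     streamGraph.setInputFormat(source.getId, fs.getFormat)
>>>   case _ => // wrappers fall through: no format, hence no InputSplitAssigner
>>> }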
>>>
>>> *My questions*:
>>>
>>> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids
>>> this error? Am I missing a chunk of the API covering this?
>>> 2. Is the error I'm experiencing related to that casting call? If so,
>>> would y'all be open to a PR which adds an interface one can extend which
>>> will set the input format in the stream graph (strawman sketch below)?
>>> Or is there a preferred way of achieving this?
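>>>
>>> For example (a strawman only; this trait does not exist in Flink today
>>> and the name is invented):
>>>
>>> trait ProvidesInputFormat[OUT] {
>>>   def getInputFormat: InputFormat[OUT, _ <: InputSplit]
>>> }
>>> // StreamGraphGenerator could check for this trait instead of the
>>> // concrete InputFormatSourceFunction class, so wrappers could opt in.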
>>>
>>> Thanks!
>>>
>>> Aaron Levin
>>>
>>> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
>>> [1]
>>> java.lang.RuntimeException: Could not retrieve next input split.
>>>   at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>>>   at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>>>   at REDACTED
>>>   at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>   at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>   at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>>>   at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>>   at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>>>   ... 8 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>   at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>   at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>   at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>>>   ... 9 more
>>> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>   at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>   ...