Hi,

I think a model similar to how the streaming file source works should be a good fit for this case. Have a look at ContinuousFileMonitoringFunction and ContinuousFileReaderOperator: the first emits the splits that should be processed, and the second is responsible for reading those splits. A generic version of that model is what I'm proposing for the refactoring of our source interface [1], which also comes with a prototype implementation [2].
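Very roughly, and only as a sketch, the two-stage model above can be written down in plain Scala without any Flink dependency. Everything here is an assumption made for illustration: `Split`, `FileSplit`, `PartitionSplit`, `SplitEnumerator` and `SplitReader` are hypothetical names, not the classes from the prototype, and the reader returns placeholder strings instead of doing real I/O. The second split kind is only there to show that one reader can handle more than one kind of split.

```scala
// Plain-Scala sketch of the enumerator/reader model; no Flink dependency.
// All names below are hypothetical and exist only for illustration.
sealed trait Split
final case class FileSplit(path: String) extends Split
final case class PartitionSplit(topic: String, partition: Int) extends Split

// Stage 1: decides WHAT should be read and in which order.
// It never touches the data itself.
final class SplitEnumerator(files: Seq[String], partitions: Seq[(String, Int)]) {
  def splits: Iterator[Split] = {
    val fileSplits: Iterator[Split] = files.iterator.map(FileSplit(_))
    val partitionSplits: Iterator[Split] =
      partitions.iterator.map { case (topic, p) => PartitionSplit(topic, p) }
    // All file splits are emitted before any partition split.
    fileSplits ++ partitionSplits
  }
}

// Stage 2: knows HOW to read each kind of split.
// The returned records are stand-ins for real file or Kafka reads.
final class SplitReader {
  def read(split: Split): Iterator[String] = split match {
    case FileSplit(path)          => Iterator(s"record from file $path")
    case PartitionSplit(topic, p) => Iterator(s"record from $topic-$p")
  }
}

object SplitModelDemo {
  def run(): List[String] = {
    val enumerator = new SplitEnumerator(
      files = Seq("archive-0.seq", "archive-1.seq"),
      partitions = Seq(("events", 0))
    )
    val reader = new SplitReader
    // Splits are consumed strictly in the order the enumerator emits them.
    enumerator.splits.flatMap(s => reader.read(s)).toList
  }
}
```

Calling `SplitModelDemo.run()` reads the two file splits first and the partition split last. The sketch leaves out checkpointing and watermarks entirely.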
I think something like this should be adaptable to your case. The split enumerator would at first emit only file splits downstream; after that, it would emit the Kafka partitions that should be read. The split reader would understand both file splits and Kafka partitions and could read from both. There are still some kinks to work out when it comes to watermarks, since FLIP-27 is not finished.

What do you think?

Best,
Aljoscha

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
[2] https://github.com/aljoscha/flink/commits/refactor-source-interface

> On 1. Nov 2018, at 16:50, Aaron Levin <aaronle...@stripe.com> wrote:
>
> Hey,
>
> Thanks for reaching out! I'd love to take a step back and find a better
> solution, so I'll try to be succinct about what I'm trying to accomplish:
>
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets
>   files in a specific order).
> * sends a watermark between each sequence file
> * when that's complete, starts reading from Kafka topics.
> * (This is similar to the bootstrap problem which Lyft has talked about; see:
>   https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink)
>
> The current solution I have involves a custom InputFormat, InputSplit, and
> SplitAssignor. It achieves most of these requirements, except I have to
> extend InputFormatSourceFunction.
> I have a class that looks like:
>
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: KafkaBase)
>   extends InputFormatSourceFunction(s3Archives, typestuff) {...}
>
> There is a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is
>   initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of InputFormatSourceFunction
>   so I could insert Watermarks between splits.
>
> I'd love any suggestions around improving this!
>
> Best,
>
> Aaron Levin
>
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi Aaron,
>
> I'd like to take a step back and understand why you're trying to wrap an
> InputFormatSourceFunction.
>
> In my opinion, InputFormatSourceFunction should not be used because it has
> some shortcomings, the most prominent among them that it does not support
> checkpointing, i.e. in case of failure all data will (probably) be read
> again. I'm saying probably because the interaction of
> InputFormatSourceFunction with how InputSplits are generated (which relates
> to that code snippet with the cast you found) could be somewhat "spooky" and
> lead to weird results in some cases.
>
> The interface is a remnant of a very early version of the streaming API and
> should probably be removed soon. I hope we can find a better solution for
> your problem that fits better with Flink.
>
> Best,
> Aljoscha
>
>> On 1. Nov 2018, at 15:30, Aaron Levin <aaronle...@stripe.com> wrote:
>>
>> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
>> provide any insight or advice, that would be helpful!
>>
>> Thanks again.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronle...@stripe.com> wrote:
>> Hey,
>>
>> Not sure how convo threading works on this list, so in case the folks CC'd
>> missed my other response, here's some more info:
>>
>> First, I appreciate everyone's help! Thank you!
>>
>> I wrote several wrappers to try and debug this, including one which is an
>> exact copy of `InputFormatSourceFunction` which also failed. They all failed
>> with the same error I detail above. I'll post two of them below. They all
>> extended `RichParallelSourceFunction` and, as far as I could tell, were
>> properly initialized (though I may have missed something!). Additionally,
>> for the two below, if I change `extends RichParallelSourceFunction` to
>> `extends InputFormatSourceFunction(...,...)`, I no longer receive the
>> exception. This is what led me to believe the source of the issue was
>> casting and how I found the line of code where the stream graph is given the
>> input format.
>>
>> Quick explanation of the wrappers:
>> 1. `WrappedInputFormat` does a basic wrap around `InputFormatSourceFunction`
>>    and delegates all methods to the underlying `InputFormatSourceFunction`
>> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
>>    `InputFormatSourceFunction` source.
>> 3. They're being used in a test which looks vaguely like:
>>    `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
>>    InputFormatSourceFunction[String](source,
>>    implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>>
>> class WrappedInputFormat[A](
>>   inputFormat: InputFormatSourceFunction[A]
>> )(
>>   implicit typeInfo: TypeInformation[A]
>> ) extends RichParallelSourceFunction[A] {
>>
>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>     inputFormat.run(sourceContext)
>>   }
>>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>>     inputFormat.setRuntimeContext(t)
>>   }
>>   override def equals(obj: scala.Any) = {
>>     inputFormat.equals(obj)
>>   }
>>   override def hashCode() = { inputFormat.hashCode() }
>>   override def toString = { inputFormat.toString }
>>   override def getRuntimeContext(): RuntimeContext = {
>>     inputFormat.getRuntimeContext
>>   }
>>   override def getIterationRuntimeContext = {
>>     inputFormat.getIterationRuntimeContext
>>   }
>>   override def open(parameters: Configuration): Unit = {
>>     inputFormat.open(parameters)
>>   }
>>   override def cancel(): Unit = {
>>     inputFormat.cancel()
>>   }
>>   override def close(): Unit = {
>>     inputFormat.close()
>>   }
>> }
>>
>> And the other one:
>>
>> class ClonedInputFormatSourceFunction[A](
>>   val format: InputFormat[A, InputSplit],
>>   val typeInfo: TypeInformation[A]
>> ) extends RichParallelSourceFunction[A] {
>>
>>   @transient private var provider: InputSplitProvider = _
>>   @transient private var serializer: TypeSerializer[A] = _
>>   @transient private var splitIterator: Iterator[InputSplit] = _
>>   private var isRunning: Boolean = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
>>     if(format.isInstanceOf[RichInputFormat[_,_]]) {
>>       format.asInstanceOf[RichInputFormat[_,_]].setRuntimeContext(context)
>>     }
>>     format.configure(parameters)
>>
>>     provider = context.getInputSplitProvider
>>     serializer =
>>       typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>>     splitIterator = getInputSplits()
>>     isRunning = splitIterator.hasNext
>>   }
>>
>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>     if(isRunning && format.isInstanceOf[RichInputFormat[_,_]]) {
>>       format.asInstanceOf[RichInputFormat[_,_]].openInputFormat()
>>     }
>>
>>     var nextElement: A = serializer.createInstance()
>>     try {
>>       while (isRunning) {
>>         format.open(splitIterator.next())
>>         // Scala has no `break`; a flag ends the split once nextRecord returns null.
>>         var splitDone = false
>>         while (isRunning && !splitDone && !format.reachedEnd()) {
>>           nextElement = format.nextRecord(nextElement)
>>           if (nextElement != null) {
>>             sourceContext.collect(nextElement)
>>           } else {
>>             splitDone = true
>>           }
>>         }
>>         // Close the split only after it has been read completely.
>>         format.close()
>>         if (isRunning) {
>>           isRunning = splitIterator.hasNext
>>         }
>>       }
>>     } finally {
>>       format.close()
>>       if (format.isInstanceOf[RichInputFormat[_,_]]) {
>>         format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>>       }
>>       isRunning = false
>>     }
>>   }
>>
>>   override def cancel(): Unit = {
>>     isRunning = false
>>   }
>>
>>   override def close(): Unit = {
>>     format.close()
>>     if(format.isInstanceOf[RichInputFormat[_,_]]) {
>>       format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>>     }
>>   }
>>
>>   private def getInputSplits(): Iterator[InputSplit] = {
>>     new Iterator[InputSplit] {
>>       private var nextSplit: InputSplit = _
>>       private var exhausted: Boolean = _
>>
>>       override def hasNext: Boolean = {
>>         if(exhausted) { return false }
>>         if(nextSplit != null) { return true }
>>         var split: InputSplit = null
>>
>>         try {
>>           split =
>>             provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>>         } catch {
>>           case e: InputSplitProviderException =>
>>             throw new RuntimeException("No InputSplit Provider", e)
>>         }
>>
>>         if(split != null) {
>>           nextSplit = split
>>           true
>>         } else {
>>           exhausted = true
>>           false
>>         }
>>       }
>>
>>       override def next(): InputSplit = {
>>         if(nextSplit == null && !hasNext) {
>>           throw new NoSuchElementException()
>>         }
>>         val tmp: InputSplit = nextSplit
>>         nextSplit = null
>>         tmp
>>       }
>>
>>     }
>>   }
>> }
>>
>>
>> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakow...@apache.org> wrote:
>> Hi Aaron,
>>
>> Could you share the code of your custom function?
>>
>> I am also adding Aljoscha and Kostas to cc, who should be more helpful on
>> that topic.
>>
>> Best,
>>
>> Dawid
>>
>> On 19/10/2018 20:06, Aaron Levin wrote:
>>> Hi,
>>>
>>> I'm writing a custom `SourceFunction` which wraps an underlying
>>> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
>>> stream (via `env.addSource` and a subsequent sink) I get errors related to
>>> the `InputSplitAssigner` not being initialized for a particular vertex ID.
>>> Full error here[1].
>>>
>>> I believe the underlying error is related to this[0] call to `instanceof
>>> InputFormatSourceFunction`.
>>>
>>> My questions:
>>>
>>> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids this
>>>    error? Am I missing a chunk of the API covering this?
>>> 2. Is the error I'm experiencing related to that casting call? If so, would
>>>    y'all be open to a PR which adds an interface one can extend which will
>>>    set the input format in the stream graph? Or is there a preferred way of
>>>    achieving this?
>>>
>>> Thanks!
>>>
>>> Aaron Levin
>>>
>>> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
>>> [1]
>>> java.lang.RuntimeException: Could not retrieve next input split.
>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>>>     at REDACTED
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>>>     ... 8 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>>>     ... 9 more
>>> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>     ...