Hey,

Thanks for reaching out! I'd love to take a step back and find a better
solution, so I'll try to be succinct about what I'm trying to accomplish:

We're trying to write a SourceFunction which:
* reads some Sequence files from S3 in a particular order (each task gets
files in a specific order).
* emits a watermark between sequence files
* when that's complete, starts reading from Kafka topics.
* (This is similar to the bootstrap problem Lyft has talked about; see:
https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink)
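
For a sense of the shape, here's a rough sketch of the phase-switching
source I'm after (readArchive, watermarkFor, and runKafkaPhase are
placeholders for the real S3/Kafka logic):

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.watermark.Watermark

class BootstrapSource(archives: Seq[String]) extends RichParallelSourceFunction[String] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // Phase 1: replay each archived file in order, emitting a watermark after each.
    archives.iterator.takeWhile(_ => running).foreach { path =>
      readArchive(path).foreach(ctx.collect)
      ctx.emitWatermark(new Watermark(watermarkFor(path)))
    }
    // Phase 2: hand off to live Kafka consumption.
    if (running) runKafkaPhase(ctx)
  }

  override def cancel(): Unit = running = false

  // Placeholders for the real implementations.
  private def readArchive(path: String): Iterator[String] = Iterator.empty
  private def watermarkFor(path: String): Long = 0L
  private def runKafkaPhase(ctx: SourceFunction.SourceContext[String]): Unit = ()
}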

The current solution I have involves a custom InputFormat, InputSplit, and
SplitAssignor. It achieves most of these requirements, except that I have to
extend InputFormatSourceFunction. I have a class that looks like:

class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: KafkaBase)
  extends InputFormatSourceFunction(s3Archives, typestuff) {...}

There's a lot I don't like about the existing solution:
* I have to extend InputFormatSourceFunction to ensure the graph is
initialized properly (the bug I wrote about)
* I had to replicate most of the implementation of
InputFormatSourceFunction so I could insert Watermarks between splits
(sketch below).
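
Concretely, the replicated run() loop differs from the one in
InputFormatSourceFunction only in the watermark it emits after each split;
a rough fragment (inside run(ctx), with watermarkFor a placeholder):

  while (isRunning) {
    val split = splitIterator.next()
    format.open(split)
    var readingSplit = true
    while (isRunning && readingSplit && !format.reachedEnd()) {
      nextElement = format.nextRecord(nextElement)
      if (nextElement != null) ctx.collect(nextElement) else readingSplit = false
    }
    format.close()
    ctx.emitWatermark(new Watermark(watermarkFor(split))) // the only addition
    if (isRunning) isRunning = splitIterator.hasNext
  }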

I'd love any suggestions around improving this!

Best,

Aaron Levin

On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi Aaron,
>
> I'd like to take a step back and understand why you're trying to wrap an
> InputFormatSourceFunction?
>
> In my opinion, InputFormatSourceFunction should not be used because it has
> some shortcomings, the most prominent being that it does not support
> checkpointing, i.e. in case of failure all data will (probably) be read
> again. I'm saying probably because the interaction of
> InputFormatSourceFunction with how InputSplits are generated (which relates
> to that code snippet with the cast you found) could be somewhat "spooky"
> and lead to weird results in some cases.
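>
> For what it's worth, a checkpointable alternative would track finished
> splits in operator state via CheckpointedFunction; a rough sketch (the
> actual reading logic is elided):
>
> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
> import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
>
> class CheckpointedSplitSource(numSplits: Int)
>     extends RichParallelSourceFunction[String] with CheckpointedFunction {
>
>   @volatile private var running = true
>   @transient private var completedSplits: ListState[Integer] = _
>   private val finished = new java.util.HashSet[Integer]()
>
>   override def initializeState(ctx: FunctionInitializationContext): Unit = {
>     completedSplits = ctx.getOperatorStateStore.getListState(
>       new ListStateDescriptor[Integer]("completed-splits", classOf[Integer]))
>     val it = completedSplits.get().iterator()
>     while (it.hasNext) finished.add(it.next())
>   }
>
>   override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
>     completedSplits.clear()
>     val it = finished.iterator()
>     while (it.hasNext) completedSplits.add(it.next())
>   }
>
>   override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
>     var split = 0
>     while (running && split < numSplits) {
>       if (!finished.contains(split)) {
>         // read the split, emitting records via ctx.collect(...) (elided)
>         ctx.getCheckpointLock.synchronized { finished.add(split) }
>       }
>       split += 1
>     }
>   }
>
>   override def cancel(): Unit = running = false
> }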
>
> The interface is a remnant of a very early version of the streaming API
> and should probably be removed soon. I hope we can find a better solution
> for your problem that fits better with Flink.
>
> Best,
> Aljoscha
>
> On 1. Nov 2018, at 15:30, Aaron Levin <aaronle...@stripe.com> wrote:
>
> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
> provide any insight or advice, that would be helpful!
>
> Thanks again.
>
> Best,
>
> Aaron Levin
>
> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronle...@stripe.com>
> wrote:
>
>> Hey,
>>
>> Not sure how convo threading works on this list, so in case the folks
>> CC'd missed my other response, here's some more info:
>>
>> First, I appreciate everyone's help! Thank you!
>>
>> I wrote several wrappers to try to debug this, including one that is an
>> exact copy of `InputFormatSourceFunction`. They all failed with the same
>> error I detailed above; I'll post two of them here. They all extended
>> `RichParallelSourceFunction` and, as far as I could tell, were properly
>> initialized (though I may have missed something!). Additionally, for the
>> two below, if I change `extends RichParallelSourceFunction` to `extends
>> InputFormatSourceFunction(...,...)`, I no longer receive the exception.
>> This is what led me to believe the cast was the source of the issue, and
>> it's how I found the line of code where the stream graph is given the
>> input format.
>>
>> Quick explanation of the wrappers:
>> 1. `WrappedInputFormat` does a basic wrap around
>> `InputFormatSourceFunction` and delegates all methods to the underlying
>> `InputFormatSourceFunction`
>> 2. `ClonedInputFormatSourceFunction` is a near-exact copy of the
>> `InputFormatSourceFunction` source.
>> 3. They're being used in a test which looks vaguely like:
>> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
>> InputFormatSourceFunction[String](source, implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>>
>> class WrappedInputFormat[A](
>>   inputFormat: InputFormatSourceFunction[A]
>> )(
>>   implicit typeInfo: TypeInformation[A]
>> ) extends RichParallelSourceFunction[A] {
>>
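>>   // Pure delegation: every method below forwards to the wrapped
>>   // InputFormatSourceFunction.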
>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>     inputFormat.run(sourceContext)
>>   }
>>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>>     inputFormat.setRuntimeContext(t)
>>   }
>>   override def equals(obj: scala.Any) = {
>>     inputFormat.equals(obj)
>>   }
>>   override def hashCode() = { inputFormat.hashCode() }
>>   override def toString = { inputFormat.toString }
>>   override def getRuntimeContext(): RuntimeContext = {
>>     inputFormat.getRuntimeContext
>>   }
>>   override def getIterationRuntimeContext = {
>>     inputFormat.getIterationRuntimeContext
>>   }
>>   override def open(parameters: Configuration): Unit = {
>>     inputFormat.open(parameters)
>>   }
>>   override def cancel(): Unit = {
>>     inputFormat.cancel()
>>   }
>>   override def close(): Unit = {
>>     inputFormat.close()
>>   }
>> }
>>
>> And the other one:
>>
>> class ClonedInputFormatSourceFunction[A](
>>   val format: InputFormat[A, InputSplit],
>>   val typeInfo: TypeInformation[A]
>> ) extends RichParallelSourceFunction[A] {
>>
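>>   // Mirrors InputFormatSourceFunction: these transient fields are rebuilt
>>   // in open() after the function is deserialized on the task managers.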
>>   @transient private var provider: InputSplitProvider = _
>>   @transient private var serializer: TypeSerializer[A] = _
>>   @transient private var splitIterator: Iterator[InputSplit] = _
>>   private var isRunning: Boolean = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
>>     if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>       format.asInstanceOf[RichInputFormat[_, _]].setRuntimeContext(context)
>>     }
>>     format.configure(parameters)
>>
>>     provider = context.getInputSplitProvider
>>     serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>>     splitIterator = getInputSplits()
>>     isRunning = splitIterator.hasNext
>>   }
>>
>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>     if (isRunning && format.isInstanceOf[RichInputFormat[_, _]]) {
>>       format.asInstanceOf[RichInputFormat[_, _]].openInputFormat()
>>     }
>>
>>     var nextElement: A = serializer.createInstance()
>>     try {
>>       while (isRunning) {
>>         format.open(splitIterator.next())
>>         // Scala has no `break`: a flag ends the split when nextRecord returns null.
>>         var readingSplit = true
>>         while (isRunning && readingSplit && !format.reachedEnd()) {
>>           nextElement = format.nextRecord(nextElement)
>>           if (nextElement != null) {
>>             sourceContext.collect(nextElement)
>>           } else {
>>             readingSplit = false
>>           }
>>         }
>>         format.close()
>>         if (isRunning) {
>>           isRunning = splitIterator.hasNext
>>         }
>>       }
>>     } finally {
>>       format.close()
>>       if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>         format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
>>       }
>>       isRunning = false
>>     }
>>   }
>>
>>   override def cancel(): Unit = {
>>     isRunning = false
>>   }
>>
>>   override def close(): Unit = {
>>     format.close()
>>     if(format.isInstanceOf[RichInputFormat[_,_]]) {
>>       format.asInstanceOf[RichInputFormat[_,_]].closeInputFormat()
>>     }
>>   }
>>
>>   private def getInputSplits(): Iterator[InputSplit] = {
>>     new Iterator[InputSplit] {
>>       private var nextSplit: InputSplit = _
>>       private var exhausted: Boolean = _
>>
>>       override def hasNext: Boolean = {
>>         if(exhausted) { return false }
>>         if(nextSplit != null) { return true }
>>         var split: InputSplit = null
>>
>>         try {
>>           split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>>         } catch {
>>           case e: InputSplitProviderException =>
>>             throw new RuntimeException("No InputSplit Provider", e)
>>         }
>>
>>         if(split != null) {
>>           nextSplit = split
>>           true
>>         } else {
>>           exhausted = true
>>           false
>>         }
>>       }
>>
>>       override def next(): InputSplit = {
>>         if(nextSplit == null && !hasNext) {
>>           throw new NoSuchElementException()
>>         }
>>         val tmp: InputSplit = nextSplit
>>         nextSplit = null
>>         tmp
>>       }
>>
>>     }
>>   }
>> }
>>
>>
>> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakow...@apache.org>
>> wrote:
>>
>>> Hi Aaron,
>>>
>>> Could you share the code of your custom function?
>>>
>>> I am also adding Aljoscha and Kostas to cc, who should be more helpful on
>>> that topic.
>>>
>>> Best,
>>>
>>> Dawid
>>> On 19/10/2018 20:06, Aaron Levin wrote:
>>>
>>> Hi,
>>>
>>> I'm writing a custom `SourceFunction` which wraps an underlying
>>> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
>>> stream (via `env.addSource` and a subsequent sink) I get errors related to
>>> the `InputSplitAssigner` not being initialized for a particular vertex ID.
>>> Full error here[1].
>>>
>>> I believe the underlying error is related to this[0] call to `instanceof
>>> InputFormatSourceFunction`.
>>>
>>> *My questions*:
>>>
>>> 1. how can I wrap an `InputFormatSourceFunction` in a way that avoids this
>>> error? Am I missing a chunk of the API covering this?
>>> 2. is the error I'm experiencing related to that casting call? If so,
>>> would y'all be open to a PR that adds an interface one can extend which
>>> will set the input format in the stream graph (rough sketch below)? Or is
>>> there a preferred way of achieving this?
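>>>
>>> As a hypothetical sketch of the kind of interface I mean (names made up):
>>>
>>> trait InputFormatProvider[A] {
>>>   def inputFormat: InputFormat[A, InputSplit]
>>> }
>>>
>>> The stream-graph builder could then match on this trait rather than
>>> casting to InputFormatSourceFunction directly.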
>>>
>>> Thanks!
>>>
>>> Aaron Levin
>>>
>>> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
>>> [1]
>>> java.lang.RuntimeException: Could not retrieve next input split.
>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>>>     at REDACTED
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>>>     ... 8 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>>>     ... 9 more
>>> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>> ...
>>>
>>>
>>
>
>
