Hi,
Since InputFormatSourceFunction is a subclass of
RichParallelSourceFunction, your wrapper should also extend
RichParallelSourceFunction.
In addition, remember to override the methods defined in
AbstractRichFunction (such as open, close, and setRuntimeContext) and
proxy each call to the underlying InputFormatSourceFunction, so that the
underlying source is initialized correctly.
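To make the proxying idea concrete, here is a minimal, self-contained sketch of the delegation pattern. It does not use Flink's classes: `RuntimeContext`, `SourceContext`, `RichSource`, `InnerSource` and `WrappingSource` are simplified, hypothetical stand-ins I made up so the example runs with plain Java. In a real job the wrapper would extend RichParallelSourceFunction and the inner source would be the InputFormatSourceFunction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Flink's types.
interface RuntimeContext { String taskName(); }

interface SourceContext<T> { void collect(T element); }

// Mirrors the lifecycle shape of a rich source: context, open, run, cancel, close.
abstract class RichSource<T> {
    private RuntimeContext ctx;
    public void setRuntimeContext(RuntimeContext ctx) { this.ctx = ctx; }
    public RuntimeContext getRuntimeContext() { return ctx; }
    public void open() {}
    public void close() {}
    public abstract void run(SourceContext<T> out);
    public abstract void cancel();
}

// Stand-in for the wrapped source: it refuses to open without a runtime context.
class InnerSource extends RichSource<String> {
    private boolean opened = false;
    private volatile boolean running = true;

    @Override public void open() {
        if (getRuntimeContext() == null) {
            throw new IllegalStateException("runtime context was not set");
        }
        opened = true;
    }

    @Override public void run(SourceContext<String> out) {
        if (!opened) {
            throw new IllegalStateException("open() was not called");
        }
        for (String s : new String[] {"a", "b"}) {
            if (!running) break;
            out.collect(s);
        }
    }

    @Override public void cancel() { running = false; }
    @Override public void close() { opened = false; }
}

// The wrapper forwards every lifecycle call to the inner source.
class WrappingSource extends RichSource<String> {
    private final RichSource<String> inner;

    WrappingSource(RichSource<String> inner) { this.inner = inner; }

    @Override public void setRuntimeContext(RuntimeContext ctx) {
        super.setRuntimeContext(ctx);
        inner.setRuntimeContext(ctx); // the inner source needs the context too
    }

    @Override public void open() { inner.open(); }

    @Override public void run(SourceContext<String> out) {
        // Custom logic goes here; this sketch just upper-cases each element.
        inner.run(element -> out.collect(element.toUpperCase()));
    }

    @Override public void cancel() { inner.cancel(); }
    @Override public void close() { inner.close(); }
}

class WrapperDemo {
    public static void main(String[] args) {
        List<String> got = new ArrayList<>();
        WrappingSource source = new WrappingSource(new InnerSource());
        source.setRuntimeContext(() -> "demo-task");
        source.open();
        source.run(got::add);
        source.close();
        System.out.println(got);
    }
}
```

If the wrapper skipped the setRuntimeContext or open forwarding, the inner source in this sketch would throw when opened or run. Note that this only illustrates the delegation itself; it does not address the `instanceof InputFormatSourceFunction` check raised in the quoted message below.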
Best regards,
Kien
On 10/20/2018 1:06 AM, Aaron Levin wrote:
Hi,
I'm writing a custom `SourceFunction` which wraps an underlying
`InputFormatSourceFunction`. When I try to use this `SourceFunction`
in a stream (via `env.addSource` and a subsequent sink) I get errors
related to the `InputSplitAssigner` not being initialized for a
particular vertex ID. Full error here[1].
I believe the underlying error is related to this[0]
`instanceof InputFormatSourceFunction` check.
_My questions_:
1. How can I wrap an `InputFormatSourceFunction` in a way that avoids
this error? Am I missing a chunk of the API covering this?
2. Is the error I'm experiencing related to that `instanceof` check? If
so, would y'all be open to a PR which adds an interface one can extend
which will set the input format in the stream graph? Or is there a
preferred way of achieving this?
Thanks!
Aaron Levin
[0]
https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
[1]
java.lang.RuntimeException: Could not retrieve next input split.
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
    at REDACTED
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
    ... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
    ...