Hi,

I'm writing a custom `SourceFunction` which wraps an underlying
`InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
stream (via `env.addSource` and a subsequent sink) I get errors related to
the `InputSplitAssigner` not being initialized for a particular vertex ID.
Full error here[1].

I believe the underlying error is related to this[0] call to `instanceof
InputFormatSourceFunction`.

*My questions*:

1. how can I wrap a `InputFormatSourceFunction` which avoids this error? Am
I missing a chunk of the API covering this?
2. is the error I'm experience related to that casting call? If so, would
ya'll be open to a PR which adds an interface one can extend which will set
the input format in the stream graph? Or is there a preferred way of
achieving this?

Thanks!

Aaron Levin

[0]
https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
[1]
java.lang.RuntimeException: Could not retrieve next input split.
    at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
    at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
    at REDACTED
    at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException:
Requesting the next input split failed.
    at
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
    at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
    ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No
InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
    at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
    at
org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
    ... 9 more
Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID
cbc357ccb763df2852fee8c4fc7d55f2
    at
org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
...

Reply via email to