Hi Aaron,

Could you share the code of your custom function?
I am also adding Aljosha and Kostas to cc, who should be more helpful on that topic.

Best,

Dawid

On 19/10/2018 20:06, Aaron Levin wrote:
> Hi,
>
> I'm writing a custom `SourceFunction` which wraps an underlying
> `InputFormatSourceFunction`. When I try to use this `SourceFunction`
> in a stream (via `env.addSource` and a subsequent sink) I get errors
> related to the `InputSplitAssigner` not being initialized for a
> particular vertex ID. Full error here[1].
>
> I believe the underlying error is related to this[0] call to
> `instanceof InputFormatSourceFunction`.
>
> _My questions_:
>
> 1. how can I wrap a `InputFormatSourceFunction` which avoids this
>    error? Am I missing a chunk of the API covering this?
> 2. is the error I'm experience related to that casting call? If so,
>    would ya'll be open to a PR which adds an interface one can extend
>    which will set the input format in the stream graph? Or is there a
>    preferred way of achieving this?
>
> Thanks!
>
> Aaron Levin
>
> [0]
> https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
> [1]
> java.lang.RuntimeException: Could not retrieve next input split.
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>     at REDACTED
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>     ... 8 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>     ... 9 more
> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>     ...
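
For illustration only, here is a minimal sketch in plain Java of the suspicion raised in the quoted message. It assumes the check at [0] registers an input format only when the user function is itself an `InputFormatSourceFunction`. All class and method names below are hypothetical stand-ins, not Flink's API. If that assumption holds, a wrapper built by composition would be skipped by the check, while a subclass would pass it.

```java
public class WrapperCheckSketch {
    // Hypothetical stand-in for Flink's SourceFunction interface.
    interface Source {}

    // Hypothetical stand-in for InputFormatSourceFunction: it carries a format.
    static class FormatSource implements Source {
        private final String format;
        FormatSource(String format) { this.format = format; }
        String getFormat() { return format; }
    }

    // Composition: holds a FormatSource but is not a FormatSource itself.
    static class WrappingSource implements Source {
        final FormatSource inner;
        WrappingSource(FormatSource inner) { this.inner = inner; }
    }

    // Inheritance: still a FormatSource, so an instanceof check succeeds.
    static class ExtendingSource extends FormatSource {
        ExtendingSource(String format) { super(format); }
    }

    // Mimics the assumed behavior of the check: only direct instances
    // (or subclasses) of FormatSource would get their format registered.
    static boolean wouldRegisterFormat(Source source) {
        return source instanceof FormatSource;
    }

    public static void main(String[] args) {
        Source wrapped = new WrappingSource(new FormatSource("csv"));
        Source extended = new ExtendingSource("csv");
        System.out.println("wrapped: " + wouldRegisterFormat(wrapped));   // prints "wrapped: false"
        System.out.println("extended: " + wouldRegisterFormat(extended)); // prints "extended: true"
    }
}
```

Under that assumption, extending `InputFormatSourceFunction` rather than wrapping it might be one way to avoid the error, though whether that fits the custom function depends on the code that has not been shared yet.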