Hello Nicolas,

Thank you for reporting this.
It looks like we have an issue when the number of Kafka topic partitions is less than the parallelism (the number of task slots). So, a workaround for now is to set parallelism <= the number of topic partitions; for example, if parallelism=2, the topic should have at least 2 partitions.

I created a new Jira issue to track and work on this:
https://issues.apache.org/jira/browse/BEAM-4798

Alexey

> On 12 Jul 2018, at 16:08, Nicolas Viard <[email protected]> wrote:
>
> Hello,
>
> I'm trying to use Flink in streaming mode and get data from a Kafka topic.
> It works without parallelism, but it doesn't when I set parallelism > 1, and I
> get this exception:
>
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>     at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>     at java.util.ArrayList.get(ArrayList.java:433)
>     at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>     at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>     at java.lang.Thread.run(Thread.java:748)
>
> I'm using Beam 2.5.0, so this list must contain local readers, but I don't
> know why it is empty.
> Does anyone have an idea about how I can fix this?
>
> This is the beginning of my pipeline:
>
> PipelineOptions options = PipelineOptionsFactory.create();
>
> options.setRunner(FlinkRunner.class);
> options.as(FlinkPipelineOptions.class).setParallelism(-1);
>
> pipeline = Pipeline.create(options);
>
> pipeline
>     .apply(
>         KafkaIO.<String, Data>read()
>             .withBootstrapServers("*****")
>             .withTopic("*****")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(DataDeserializer.class)
>             .withoutMetadata()
>     );
>
> pipeline.run();
>
> Best Regards,
>
> Nicolas Viard
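
The failure mode Alexey describes (fewer partitions than parallel source instances) can be sketched with a small, stdlib-only Java model. This is not Beam's actual code: it assumes the Kafka source yields at most one split per partition and that splits are handed to parallel source subtasks round-robin. The class and method names `SplitAssignment`, `localSplits` and `safeParallelism` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitAssignment {

    // Hypothetical model (not Beam's code): split i is assigned to
    // subtask (i % parallelism), so some subtasks may receive nothing.
    static List<Integer> localSplits(int numSplits, int parallelism, int subtaskIndex) {
        List<Integer> local = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            if (i % parallelism == subtaskIndex) {
                local.add(i);
            }
        }
        return local;
    }

    // The workaround from the thread: never run more source subtasks than
    // there are topic partitions. Expects a concrete positive parallelism;
    // it does not model -1 ("use Flink's configured default").
    static int safeParallelism(int requested, int numPartitions) {
        return Math.max(1, Math.min(requested, numPartitions));
    }

    public static void main(String[] args) {
        int partitions = 1;  // assumed: topic with a single partition
        int parallelism = 2; // assumed: two task slots / parallel subtasks
        for (int subtask = 0; subtask < parallelism; subtask++) {
            List<Integer> local = localSplits(partitions, parallelism, subtask);
            System.out.println("subtask " + subtask + " -> " + local);
            // For subtask 1 the list is empty, so local.get(0) would throw
            // an IndexOutOfBoundsException, as in the reported trace.
        }
        System.out.println("safe parallelism: " + safeParallelism(parallelism, partitions));
    }
}
```

Running `main` prints `subtask 1 -> []`: the second subtask gets no split, which would explain an empty reader list if the wrapper reads element 0 of it. `safeParallelism` simply caps parallelism at the partition count, which is the workaround suggested above.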
