Hello Nicolas,

Thank you for reporting this.

It looks like we have an issue when the number of Kafka topic partitions is less than 
the parallelism value (number of task slots).
So, a workaround for now is to set parallelism <= the number of topic 
partitions; for example, if parallelism=2, the topic should have at least 2 partitions.
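To illustrate, here is a simplified, hypothetical Java model of how this can happen. It is not the actual Beam or Flink runner code. It assumes that the Kafka source produces at most one split per partition, and that split i goes to the subtask whose index is i % parallelism. `SplitAssignmentSketch` and its methods are made-up names. Under those assumptions, a subtask that receives no split ends up with an empty reader list, which matches the `Index: 0, Size: 0` error. Capping parallelism at the partition count avoids that case.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of spreading source splits over Flink subtasks.
// This is NOT the real Beam/Flink runner code; it only illustrates the idea.
final class SplitAssignmentSketch {

    private SplitAssignmentSketch() {}

    // Assumption: the Kafka source yields at most one split per partition.
    static int numSplits(int desiredSplits, int numPartitions) {
        return Math.min(desiredSplits, numPartitions);
    }

    // Assumption: split i is assigned to the subtask with index i % parallelism.
    static List<Integer> splitsForSubtask(int subtaskIndex, int parallelism, int numPartitions) {
        List<Integer> local = new ArrayList<>();
        int splits = numSplits(parallelism, numPartitions);
        for (int i = 0; i < splits; i++) {
            if (i % parallelism == subtaskIndex) {
                local.add(i);
            }
        }
        return local;
    }

    // Workaround: never request more parallel instances than there are partitions.
    static int safeParallelism(int requestedParallelism, int numPartitions) {
        if (requestedParallelism < 1 || numPartitions < 1) {
            throw new IllegalArgumentException("parallelism and partition count must be >= 1");
        }
        return Math.min(requestedParallelism, numPartitions);
    }

    public static void main(String[] args) {
        // 1 partition, parallelism 2: subtask 1 gets no split at all, so
        // calling get(0) on its list would throw IndexOutOfBoundsException.
        System.out.println(splitsForSubtask(0, 2, 1)); // prints [0]
        System.out.println(splitsForSubtask(1, 2, 1)); // prints []

        // With the capped parallelism, every subtask receives a split.
        int p = safeParallelism(2, 1);
        System.out.println(p);                         // prints 1
        System.out.println(splitsForSubtask(0, p, 1)); // prints [0]
    }
}
```

In a real pipeline, the capped value would be passed to `options.as(FlinkPipelineOptions.class).setParallelism(...)`; the partition count could be looked up, for example, with Kafka's `KafkaConsumer#partitionsFor(topic).size()`.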

I created a new Jira issue to track and work on this:
https://issues.apache.org/jira/browse/BEAM-4798 


Alexey

> On 12 Jul 2018, at 16:08, Nicolas Viard <[email protected]> wrote:
> 
> Hello,
> 
> I'm trying to use Flink in streaming mode and read data from a Kafka topic.
> It works without parallelism, but when I set parallelism > 1, I get this 
> exception:
> 
> java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
>       at java.util.ArrayList.rangeCheck(ArrayList.java:657)
>       at java.util.ArrayList.get(ArrayList.java:433)
>       at 
> org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:277)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>       at 
> org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
>       at java.lang.Thread.run(Thread.java:748)
> 
> I'm using Beam 2.5.0, so this list should contain the local readers, but I don't 
> know why it is empty.
> Does anyone have an idea how I can fix this?
> 
> This is the beginning of my pipeline:
> 
> PipelineOptions options = PipelineOptionsFactory.create();
> 
> options.setRunner(FlinkRunner.class);
> options.as(FlinkPipelineOptions.class).setParallelism(-1);
> 
> Pipeline pipeline = Pipeline.create(options);
> 
> pipeline
>     .apply(
>         KafkaIO.<String, Data>read()
>             .withBootstrapServers("*****")
>             .withTopic("*****")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(DataDeserializer.class)
>             .withoutMetadata()
>     );
> 
> pipeline.run();
> 
> Best Regards,
> 
> Nicolas Viard
