Hi, I am trying to setup FlinkKafkaConsumer which reads from two partitions in local mode, using setParallelism=2. The producer writes to two partition (as it is shown in metrics report). But the consumer seems to read always from one partition only. Am I missing something in partition configuration?
Code: Producer setup: Configuration localConfig = new Configuration(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig); env.setParallelism(2); String kafkaPort = parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName()); SingleOutputStreamOperator<String> fixMsgSource = env.addSource(srcMsgProvider.getFixMsgSource(), TypeInformation.of(String.class)).name(sourceGenerationType.getValue()); fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:" + kafkaPort, TOPIC_NAME, new SimpleStringSchema())) .name("fix_topic"); env.execute("MsgSimulatorJob"); Consumer setup: String topicName = "fix"; Configuration conf = new Configuration(); StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf); env.setParallelism(2); env.getConfig().setGlobalJobParameters(configParams); // make parameters available in the web interface DeserializationSchema<Tuple2<Long, String>> deserializationSchema = new SimpleStringAndTimestampDeserializationSchema (); FlinkKafkaConsumer010<Tuple2<Long, String>> kafkaConsumer = new FlinkKafkaConsumer010<>(topicName, deserializationSchema, kafkaParams.getProperties()); DataStream<Tuple2<Long, String>> fixMessagesStream = env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2); As you can see in output, only 1 consumer partition seems to be used: Producer output: 2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0 2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: fix_topic.1.numRecordsInPerSecond: 19836.033333333333 2017-09-19 14:40:45,818 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: fix_topic.0.numRecordsInPerSecond: 20337.933333333334 2017-09-19 14:40:45,819 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0 Consumer output: 2017-09-19 14:40:45,116 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate: 982.0051413881748 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.266666666666 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesOutPerSecond: 10.5 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numBytesInLocalPerSecond: 0.0 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.1.numBytesInRemotePerSecond: 0.0 2017-09-19 14:40:45,117 INFO - [Flink-MetricRegistry-1] 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming Job.Source: fix_topic.0.numRecordsOutPerSecond: 0.0 Thanks and regards, Tovi