I understand that I have to create 10 parallel streams. My code runs fine when the number of partitions is ~20, but when I increase the number of partitions I keep running into this issue.
Below is my code to create the Kafka streams, along with the configs used.

Map<String, String> kafkaConf = new HashMap<String, String>();
kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
kafkaConf.put("group.id", kafkaConsumerGroup);
kafkaConf.put("consumer.timeout.ms", "30000");
kafkaConf.put("auto.offset.reset", "largest");
kafkaConf.put("fetch.message.max.bytes", "20000000");
kafkaConf.put("zookeeper.session.timeout.ms", "6000");
kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
kafkaConf.put("zookeeper.sync.time.ms", "2000");
kafkaConf.put("rebalance.backoff.ms", "10000");
kafkaConf.put("rebalance.max.retries", "20");

String[] topics = kafkaTopicsList;
int numStreams = numKafkaThreads; // this is *10*
Map<String, Integer> topicMap = new HashMap<>();
for (String topic : topics) {
    topicMap.put(topic, numStreams);
}

List<JavaPairDStream<byte[], byte[]>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(sc, byte[].class, byte[].class,
            DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
            StorageLevel.MEMORY_ONLY_SER()));
}
JavaPairDStream<byte[], byte[]> ks = sc.union(kafkaStreams.remove(0), kafkaStreams);

On Wed, Jan 7, 2015 at 8:21 PM, Gerard Maas <gerard.m...@gmail.com> wrote:

> Hi,
>
> Could you add the code where you create the Kafka consumer?
>
> -kr, Gerard.
>
> On Wed, Jan 7, 2015 at 3:43 PM, <francois.garil...@typesafe.com> wrote:
>
>> Hi Mukesh,
>>
>> If my understanding is correct, each Stream only has a single Receiver.
>> So, if you have each receiver consuming 9 partitions, you need 10 input
>> DStreams to create 10 concurrent receivers:
>>
>> https://spark.apache.org/docs/latest/streaming-programming-guide.html#level-of-parallelism-in-data-receiving
>>
>> Would you mind sharing a bit more on how you achieve this?
>>
>> --
>> FG
>>
>> On Wed, Jan 7, 2015 at 3:00 PM, Mukesh Jha <me.mukesh....@gmail.com> wrote:
>>
>>> Hi Guys,
>>>
>>> I have a Kafka topic with 90 partitions and I am running Spark
>>> Streaming (1.2.0) to read from Kafka via KafkaUtils, creating 10
>>> kafka-receivers.
>>>
>>> My streaming job is running fine and there is no delay in processing;
>>> it's just that some partitions' data is never getting picked up. From
>>> the Kafka console I can see that each receiver is consuming data from
>>> 9 partitions, but the lag for some offsets keeps on increasing.
>>>
>>> Below are my kafka-consumer parameters.
>>>
>>> Have any of you faced this kind of issue? If so, do you have any
>>> pointers to fix it?
>>>
>>> Map<String, String> kafkaConf = new HashMap<String, String>();
>>> kafkaConf.put("zookeeper.connect", kafkaZkQuorum);
>>> kafkaConf.put("group.id", kafkaConsumerGroup);
>>> kafkaConf.put("consumer.timeout.ms", "30000");
>>> kafkaConf.put("auto.offset.reset", "largest");
>>> kafkaConf.put("fetch.message.max.bytes", "20000000");
>>> kafkaConf.put("zookeeper.session.timeout.ms", "6000");
>>> kafkaConf.put("zookeeper.connection.timeout.ms", "6000");
>>> kafkaConf.put("zookeeper.sync.time.ms", "2000");
>>> kafkaConf.put("rebalance.backoff.ms", "10000");
>>> kafkaConf.put("rebalance.max.retries", "20");
>>>
>>> --
>>> Thanks & Regards,
>>>
>>> Mukesh Jha <me.mukesh....@gmail.com>

--
Thanks & Regards,
Mukesh Jha <me.mukesh....@gmail.com>
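For reference, below is a minimal, self-contained sketch of the multi-receiver pattern from the streaming-guide section linked above. The class name, app name, batch interval, ZooKeeper quorum, group id and topic name are placeholders, not values from this thread. The one deliberate difference from the code above is that topicMap asks for a single consumer thread per topic per receiver, so the parallelism comes only from the number of receivers; if I'm reading the high-level consumer semantics correctly, topicMap.put(topic, numStreams) as used above would instead start 10 consumer threads in each of the 10 receivers, i.e. up to 100 consumer streams competing for 90 partitions within one group.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.serializer.DefaultDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class MultiReceiverSketch {
    public static void main(String[] args) {
        // Placeholder app/context setup.
        SparkConf conf = new SparkConf().setAppName("multi-receiver-sketch");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(10000));

        Map<String, String> kafkaConf = new HashMap<String, String>();
        kafkaConf.put("zookeeper.connect", "zkhost:2181"); // placeholder quorum
        kafkaConf.put("group.id", "my-consumer-group");    // placeholder group

        // One consumer thread per topic per receiver; the receivers
        // themselves provide the parallelism.
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put("my-topic", 1);                       // placeholder topic

        int numReceivers = 10;
        List<JavaPairDStream<byte[], byte[]>> streams =
                new ArrayList<JavaPairDStream<byte[], byte[]>>(numReceivers);
        for (int i = 0; i < numReceivers; i++) {
            streams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
                    DefaultDecoder.class, DefaultDecoder.class, kafkaConf, topicMap,
                    StorageLevel.MEMORY_ONLY_SER()));
        }

        // Union the receiver streams into a single DStream for downstream work.
        JavaPairDStream<byte[], byte[]> unioned = jssc.union(streams.remove(0), streams);
        unioned.count().print(); // trivial output action so the job runs

        jssc.start();
        jssc.awaitTermination();
    }
}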