Hi,
We are running Spark Streaming on AWS, processing incoming messages from Kafka topics. All was well until we recently migrated from the 0.8 to the 0.11 version of the Spark Kafka integration library, together with Kafka 0.11 on the server side. With the new versions we are intermittently hitting "No current assignment for partition" errors for a topic. I construct four DStreams with different group.ids, as suggested.

The code that triggers the issue is this block from Spark's ConsumerStrategy.scala (the failing line is marked):

if (!toSeek.isEmpty) {
  // work around KAFKA-3370 when reset is none
  // poll will throw if no position, i.e. auto offset reset none and no explicit position
  // but can't seek to a position before poll, because poll is what gets subscription partitions
  // So, poll, suppress the first exception, then seek
  val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
  val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
  try {
    consumer.poll(0)
  } catch {
    case x: NoOffsetForPartitionException if shouldSuppress =>
      logWarning("Catching NoOffsetForPartitionException since " +
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
  }
  toSeek.asScala.foreach { case (topicPartition, offset) =>
    consumer.seek(topicPartition, offset)  // <-- this is the call that throws
  }
}

At the start of the job I also make sure we supply all the required offsets correctly:

private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
  Map<TopicPartition, Long> offsets = new HashMap<>();
  List<TopicPartition> topicPartitions =
      consumer.partitionsFor(topic).stream()
          .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
          .collect(Collectors.toList());
  Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
  // pick committed offsets
  for (TopicPartition topicAndPartition : topicPartitions) {
    final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
    Long earliestOffset = earliestOffsets.get(topicAndPartition);
    if (committed != null && committed.offset() > earliestOffset) {
      logger.warn("Committed offset found for: {} offset:{} -> Hence adding committed offset",
          topicAndPartition, committed.offset());
      offsets.put(topicAndPartition, committed.offset());
    } else {
      logger.warn("New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
          topicAndPartition, earliestOffset);
      offsets.put(topicAndPartition, earliestOffset);
    }
  }
  return offsets;
}
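For context, these offsets feed the ConsumerStrategy of each stream. A minimal sketch of how one of the four streams is wired up with them (the variable names and the jssc context below are illustrative, not our exact code):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// Inside the driver's setup code; jssc is the JavaStreamingContext.
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "172.16.202.58:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "tp-preprocessor-venkat"); // a distinct group.id per stream
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

// Start the stream from the offsets computed by getCommittedOffsets(...)
Map<TopicPartition, Long> fromOffsets = getCommittedOffsets("genericEvents");

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(
            Collections.singletonList("genericEvents"), kafkaParams, fromOffsets));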
The actual stack trace (timestamps trimmed):

Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
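My current reading of this (an assumption on my part, not confirmed): subscribe() is lazy, so the consumer only receives its partition assignment inside poll(0); if the assignment changes, or has not fully arrived, between poll(0) and the subsequent seek(), seek() throws exactly this IllegalStateException. A defensive sketch of the poll-then-seek pattern that guards each seek with the live assignment (illustrative only, this is not the Spark source):

import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper: seek only to partitions the consumer actually owns.
static void seekSafely(Consumer<String, String> consumer, Map<TopicPartition, Long> toSeek) {
  consumer.poll(0); // triggers the group join; the assignment is only known after this
  for (Map.Entry<TopicPartition, Long> e : toSeek.entrySet()) {
    if (consumer.assignment().contains(e.getKey())) {
      consumer.seek(e.getKey(), e.getValue());
    } else {
      // A rebalance between poll() and seek() leaves the partition unassigned;
      // seeking it anyway is what raises "No current assignment for partition".
      System.err.printf("Skipping seek, %s is not currently assigned%n", e.getKey());
    }
  }
}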
Consumer properties:

auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [172.16.202.58:9092, 172.16.201.212:9092, 172.16.202.57:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = tp-preprocessor-venkat
heartbeat.interval.ms = 100000
interceptor.classes = null
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 300000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS

Thanks,
Venkat