You mentioned version 0.11; the latest version of the org.apache.kafka kafka-clients artifact supported by DStreams is 0.10.0.1, and the connector declares an appropriate dependency on it. Are you manually depending on a different version of the kafka-clients artifact?
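Worth checking your build for a transitive or explicit kafka-clients
override. As a sketch of what I mean (assuming an sbt build; adjust if
you use Maven), the safe setup is to let the connector bring in its own
client version and not declare kafka-clients yourself:

    // build.sbt (sketch): the 0-10 connector already pulls in
    // kafka-clients 0.10.0.1 transitively
    libraryDependencies +=
      "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
    // do NOT also declare a newer org.apache.kafka kafka-clients here

You can verify what actually lands on the classpath with
mvn dependency:tree (or the sbt-dependency-graph plugin's dependencyTree
task) and look for any kafka-clients entry other than 0.10.0.1. Note
that a 0.11 broker will happily serve a 0.10.0.1 client, so you don't
need to bump the client to match the broker.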
On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <meven...@gmail.com> wrote:

> Version: 2.2 with Kafka010
>
> Hi,
>
> We are running Spark Streaming on AWS and trying to process incoming
> messages on Kafka topics. All was well. Recently we wanted to migrate
> from the 0.8 to the 0.11 version of the Spark Kafka library, and to
> the 0.11 version of the Kafka server.
>
> With the new versions we are facing an intermittent issue: 'No
> assignment to partition' for a topic. I construct four DStreams with
> different group.ids as suggested.
>
> The main piece of code that's causing the issue is this one (the
> failing seek is highlighted):
>
>     if (!toSeek.isEmpty) {
>       // work around KAFKA-3370 when reset is none
>       // poll will throw if no position, i.e. auto offset reset none and no explicit position
>       // but cant seek to a position before poll, because poll is what gets subscription partitions
>       // So, poll, suppress the first exception, then seek
>       val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>       val shouldSuppress =
>         aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
>       try {
>         consumer.poll(0)
>       } catch {
>         case x: NoOffsetForPartitionException if shouldSuppress =>
>           logWarning("Catching NoOffsetForPartitionException since " +
>             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
>       }
>       toSeek.asScala.foreach { case (topicPartition, offset) =>
>         *consumer.seek(topicPartition, offset)*
>       }
>     }
>
> At the start of the job, I also ensure we are supplying all required
> offsets correctly:
>
>     private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>       Map<TopicPartition, Long> offsets = new HashMap<>();
>       List<TopicPartition> topicPartitions =
>           consumer.partitionsFor(topic).stream()
>               .map(partitionInfo ->
>                   new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
>               .collect(Collectors.toList());
>       Map<TopicPartition, Long> earliestOffsets =
>           consumer.beginningOffsets(topicPartitions);
>       // pick committed offsets
>       for (TopicPartition topicAndPartition : topicPartitions) {
>         final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
>         Long earliestOffset = earliestOffsets.get(topicAndPartition);
>         if (committed != null && committed.offset() > earliestOffset) {
>           logger.warn(
>               "Committed offset found for: {} offset:{} -> Hence adding committed offset",
>               topicAndPartition, committed.offset());
>           offsets.put(topicAndPartition, committed.offset());
>         } else {
>           logger.warn(
>               "New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
>               topicAndPartition, earliestOffset);
>           offsets.put(topicAndPartition, earliestOffset);
>         }
>       }
>       return offsets;
>     }
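The offsets map itself looks reasonable; the other thing worth
double-checking is how it is handed to the stream. For reference, a
minimal sketch of the usual wiring in Scala (names like ssc, topics,
kafkaParams, and the conversion of your getCommittedOffsets result are
placeholders, not your actual code):

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010._

    // offsets: Map[TopicPartition, Long], e.g. built the way your
    // getCommittedOffsets does, converted to a Scala map
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))

Those offsets end up as the toSeek map in the block you quoted, which
is why a partition that is in the map but not in the consumer's
assignment blows up exactly at the seek you highlighted.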
> The actual stack trace:
>
>     Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
>       at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
>       at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
>       at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
>       at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
>       at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
>       at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
>       at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>       at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>       at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>       at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>       at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>       at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>       at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>       at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
>       at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
>       at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
>       at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>       at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
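The trace is informative: by the time Subscribe.onStart calls
consumer.seek, the consumer has no assignment for genericEvents-1. With
a subscribe()-based consumer, assignment only exists once the group
rebalance completes, and poll(0) can return before that has happened,
which would make this intermittent, exactly as you describe. If you
want to confirm that race, something along these lines in a patched
copy of Subscribe.onStart (a diagnostic sketch, not the shipped
connector code) would show it:

    // after the existing consumer.poll(0) call:
    // give the rebalance a bounded chance to finish before seeking
    var attempts = 0
    while (consumer.assignment().isEmpty && attempts < 10) {
      consumer.poll(100)
      attempts += 1
    }
    val assigned = consumer.assignment()
    toSeek.asScala.foreach { case (topicPartition, offset) =>
      if (assigned.contains(topicPartition)) {
        consumer.seek(topicPartition, offset)
      } else {
        logWarning(s"No current assignment for $topicPartition, skipping seek")
      }
    }

If the warnings fire and the job then starts cleanly, the problem is
waiting for assignment (or rebalance churn across your four consumer
groups), not the offsets you compute.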