Yeah, don't mix multiple versions of kafka clients. That's not 100% certain to be the cause of your problem, but it can't be helping.
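For example, with sbt you'd declare only the integration artifact and let it
pull in the kafka-clients version it was built against (a minimal sketch;
version numbers illustrative for Spark 2.2 / Scala 2.11):

    // build.sbt -- spark-streaming-kafka-0-10 brings in kafka-clients 0.10.0.1 transitively
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.2.0"
    // do NOT also declare "org.apache.kafka" % "kafka-clients" explicitly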
As for your comments about async commits, read
https://issues.apache.org/jira/browse/SPARK-22486 and, if you think your use
case is still relevant to others given those constraints, then share it.

On Fri, Dec 1, 2017 at 4:11 AM, Qiao, Richard <richard.q...@capitalone.com> wrote:

> In your case, it looks like two versions of the Kafka client exist in the
> same JVM at runtime, so there is a version conflict.
>
> About "I don't find the spark async commit useful for our needs", do you
> mean code like the below?
>
>     kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
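>
> A fuller sketch of that pattern, for reference (assuming the stream was
> created with KafkaUtils.createDirectStream, per the integration guide;
> the per-batch processing is elided):
>
>     import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}
>
>     kafkaDStream.foreachRDD { rdd =>
>       // the cast only works on the RDD produced directly by the stream,
>       // before any transformation that shuffles or repartitions
>       val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       // ... process this batch ...
>       // asynchronously commit this batch's offsets back to Kafka
>       kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
>     }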
>
> Best Regards
> Richard
>
> From: venkat <meven...@gmail.com>
> Date: Thursday, November 30, 2017 at 8:16 PM
> To: Cody Koeninger <c...@koeninger.org>
> Cc: "user@spark.apache.org" <user@spark.apache.org>
> Subject: Re: [Spark streaming] No assigned partition error during seek
>
> After your query, I noticed the documentation note: "Do not manually add
> dependencies on org.apache.kafka artifacts (e.g. kafka-clients). The
> spark-streaming-kafka-0-10 artifact has the appropriate transitive
> dependencies already, and different versions may be incompatible in hard
> to diagnose ways."
>
> Does this imply that we should not be adding kafka-clients to our jars?
>
> Thanks
> Venkat
>
> On Fri, 1 Dec 2017 at 06:45 venkat <meven...@gmail.com> wrote:
>
> Yes, I use the latest Kafka clients (0.11) to determine beginning offsets
> without seek, and I also commit Kafka offsets externally.
>
> I don't find the Spark async commit useful for our needs.
>
> Thanks
> Venkat
>
> On Fri, 1 Dec 2017 at 02:39 Cody Koeninger <c...@koeninger.org> wrote:
>
> You mentioned the 0.11 version; the latest version of the org.apache.kafka
> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it has
> an appropriate dependency.
>
> Are you manually depending on a different version of the kafka-clients
> artifact?
>
> On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <meven...@gmail.com> wrote:
>> Version: 2.2 with Kafka010
>>
>> Hi,
>>
>> We are running Spark Streaming on AWS and processing incoming messages on
>> Kafka topics. All was well until we recently migrated from the 0.8 to the
>> 0.11 version of the Spark Kafka library, and to the 0.11 version of the
>> Kafka server.
>>
>> With the new versions we intermittently get "No current assignment for
>> partition" errors. I construct four DStreams with different group.ids,
>> as suggested.
>>
>> The code that triggers the issue is this (from Spark's
>> ConsumerStrategy.scala):
>>
>>     if (!toSeek.isEmpty) {
>>       // work around KAFKA-3370 when reset is none
>>       // poll will throw if no position, i.e. auto offset reset none and
>>       // no explicit position, but can't seek to a position before poll,
>>       // because poll is what gets subscription partitions.
>>       // So, poll, suppress the first exception, then seek
>>       val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>>       val shouldSuppress =
>>         aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
>>       try {
>>         consumer.poll(0)
>>       } catch {
>>         case x: NoOffsetForPartitionException if shouldSuppress =>
>>           logWarning("Catching NoOffsetForPartitionException since " +
>>             ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
>>       }
>>       toSeek.asScala.foreach { case (topicPartition, offset) =>
>>         consumer.seek(topicPartition, offset)
>>       }
>>     }
>>
>> At the start of the job, I also ensure we are supplying all required
>> offsets correctly:
>>
>>     private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>>       Map<TopicPartition, Long> offsets = new HashMap<>();
>>       List<TopicPartition> topicPartitions =
>>           consumer.partitionsFor(topic).stream()
>>               .map(partitionInfo -> new TopicPartition(
>>                   partitionInfo.topic(), partitionInfo.partition()))
>>               .collect(Collectors.toList());
>>       Map<TopicPartition, Long> earliestOffsets =
>>           consumer.beginningOffsets(topicPartitions);
>>       // pick committed offsets where they exist, else fall back to earliest
>>       for (TopicPartition topicAndPartition : topicPartitions) {
>>         final OffsetAndMetadata committed =
>>             consumer.committed(topicAndPartition);
>>         Long earliestOffset = earliestOffsets.get(topicAndPartition);
>>         if (committed != null && committed.offset() > earliestOffset) {
>>           logger.warn(
>>               "Committed offset found for: {} offset:{} -> Hence adding committed offset",
>>               topicAndPartition, committed.offset());
>>           offsets.put(topicAndPartition, committed.offset());
>>         } else {
>>           logger.warn(
>>               "New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
>>               topicAndPartition, earliestOffset);
>>           offsets.put(topicAndPartition, earliestOffset);
>>         }
>>       }
>>       return offsets;
>>     }
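>>
>> For reference, the offsets map is then handed to the direct stream roughly
>> like this (a sketch, not our exact code; assumes String keys and values,
>> and that topics, kafkaParams and the result of getCommittedOffsets are in
>> scope as Scala collections):
>>
>>     import org.apache.spark.streaming.kafka010._
>>     import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
>>     import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
>>
>>     val stream = KafkaUtils.createDirectStream[String, String](
>>       streamingContext,
>>       PreferConsistent,
>>       // start each partition at the offset chosen by getCommittedOffsets
>>       Subscribe[String, String](topics, kafkaParams, offsets)
>>     )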
>>
>> The actual stack trace:
>>
>>     Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
>>       at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
>>       at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
>>       at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
>>       at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
>>       at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
>>       at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>       at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
>>       at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
>>       at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
>>       at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>>       at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>>       at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>>       at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>>       at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>>       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>>       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>       at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>       at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>>       at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>>       at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
>>       at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
>>       at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
>>       at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>>       at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>> --
>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/