Hi, sorry for the late reply; there was a public holiday in Germany. Yes, it's using a unique group id which no other job or consumer group uses. I have increased session.timeout.ms to 1 minute and set max.poll.records to 1000. The processing takes ~1 second.
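For reference, this is roughly what the configuration looks like with the Kafka 0.10 direct stream. It is only a sketch, not the job's actual code: the broker list, the batch interval, the deserializers and the third topic are placeholders, and auto.offset.reset is written as "latest" because the new consumer expects "latest"/"earliest" instead of the old "largest"/"smallest":

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // batch interval assumed here, the real job uses its own
    val ssc = new StreamingContext(
      new SparkConf().setAppName("spark_aggregation_job-kafka010"), Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092",   // placeholder
      "key.deserializer" -> classOf[StringDeserializer],    // assumed; the job may use others
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark_aggregation_job-kafka010",       // unique, used by no other job
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "auto.offset.reset" -> "latest",                      // new-consumer equivalent of "largest"
      "session.timeout.ms" -> s"${1 * 60 * 1000}",          // raised to 1 minute
      "request.timeout.ms" -> s"${2 * 60 * 1000}",          // must stay above session.timeout.ms
      "max.poll.records" -> "1000"
    )

    // two of the three topics are visible in the logs below; the third is omitted here
    val topics = Seq("sapxm.adserving.log.ad_request", "sapxm.adserving.log.view")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

If I understand the 0.10.0 consumer correctly, heartbeats are only sent from poll(), so whenever the driver does not poll within session.timeout.ms the group rebalances, which would match the log excerpt further down.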
2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> Well, I'd start at the first thing suggested by the error, namely that
> the group has rebalanced.
>
> Is that stream using a unique group id?
>
> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
> <matthias.nieh...@codecentric.de> wrote:
> > Hi,
> >
> > the stacktrace:
> >
> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > completed since the group has already rebalanced and assigned the partitions
> > to another member. This means that the time between subsequent calls to
> > poll() was longer than the configured session.timeout.ms, which typically
> > implies that the poll loop is spending too much time message processing. You
> > can address this either by increasing the session timeout or by reducing the
> > maximum size of batches returned in poll() with max.poll.records.
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> > at scala.Option.orElse(Option.scala:289)
> > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> > at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> > at scala.Option.orElse(Option.scala:289)
> > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> > at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> > at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> > at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> > at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> > at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> > at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
> > at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
> > at scala.util.Try$.apply(Try.scala:192)
> > at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
> > at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
> > at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> > at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> >
> > But it seems like the commit is not the actual problem. The job also falls
> > behind if I do not commit the offsets. The delay would be ok if the
> > processing time were bigger than the batch interval, but that's not the case
> > in any of the micro-batches. Imho, for some reason one of the micro-batches
> > falls behind by more than session.timeout.ms. Then the consumers regroup,
> > which takes about 1 minute (see timestamps below). Now a cycle of slow
> > batches begins, each one triggering another consumer regroup. Would this be
> > possible?
> >
> > 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
> > 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4, sapxm.adserving.log.view-1, sapxm.adserving.log.view-2, sapxm.adserving.log.view-0, sapxm.adserving.log.view-9, sapxm.adserving.log.view-7, sapxm.adserving.log.view-8, sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group spark_aggregation_job-kafka010
> > 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010
> >
> > 2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >> What's the actual stacktrace / exception you're getting related to
> >> commit failure?
> >>
> >> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
> >> <matthias.nieh...@codecentric.de> wrote:
> >> > Hi everybody,
> >> >
> >> > I am using the new Kafka consumer for Spark Streaming in my job. When
> >> > running with the old consumer it runs fine.
> >> >
> >> > The job consumes 3 topics, saves the data to Cassandra, cogroups the
> >> > topics, calls mapWithState and stores the results in Cassandra. After
> >> > that I manually commit the Kafka offsets using the commitAsync method
> >> > of the KafkaDStream.
> >> >
> >> > With the new consumer I experience the following problem:
> >> >
> >> > After a certain amount of time (about 4-5 minutes, might be more or
> >> > less) there are exceptions that the offset commit failed. The processing
> >> > takes less than the batch interval. I also adjusted the session.timeout
> >> > and request.timeout as well as the max.poll.records setting, which did
> >> > not help.
> >> >
> >> > After the first offset commit failed, the time from Kafka until the
> >> > micro-batch is started increases, while the processing time stays
> >> > constantly below the batch interval. Moreover, further offset commits
> >> > also fail, and as a result the delay builds up.
> >> >
> >> > Has anybody experienced this as well?
> >> >
> >> > Thank you
> >> >
> >> > Relevant Kafka parameters:
> >> >
> >> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >> > "auto.offset.reset" -> "largest",
> >> > "enable.auto.commit" -> "false",
> >> > "max.poll.records" -> "1000"

--
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de
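For completeness, the manual offset commit mentioned above looks roughly like this with the kafka010 API. Again only a sketch, not the job's actual code: `stream` is the direct stream from the snippet further up, and the actual processing (Cassandra writes, cogroup, mapWithState) is elided.

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

    stream.foreachRDD { rdd =>
      // capture the offset ranges of this micro-batch before any shuffle/repartition
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... save to Cassandra, cogroup, mapWithState, store results ...

      // commit only after the batch's processing has succeeded
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

As far as I can tell, commitAsync only queues the offsets on the driver and the actual commit is issued with a later batch, so the committed offsets always trail the processed ones slightly.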