Yes, the consumer rebalances even without committing the offsets.
The job consumes three streams and processes them. When consuming only one
stream it runs fine. But when consuming three streams, even without joining
them, just deserializing the payload and triggering an output action, it fails.

I will prepare a code sample.
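
For reference, the consumer settings quoted further down in this thread can
be collected into a single parameter map. This is only a sketch under
assumptions, not the job's actual code: the object name ConsumerSettings is
made up for illustration, the group id is the one that appears in the log
output, and "latest" replaces "largest" on the assumption that the new
consumer only accepts "latest", "earliest" or "none" for auto.offset.reset.

```scala
// Sketch only: consumer settings from this thread gathered in one place.
// ConsumerSettings is a hypothetical name, not part of Spark or Kafka.
object ConsumerSettings {
  val sessionTimeoutMs: Int = 1 * 60 * 1000 // 1 minute, as stated in the thread
  val requestTimeoutMs: Int = 2 * 60 * 1000 // 2 minutes, as stated in the thread

  val kafkaParams: Map[String, Object] = Map(
    // Group id as it appears in the quoted log lines.
    "group.id"           -> "spark_aggregation_job-kafka010",
    "session.timeout.ms" -> sessionTimeoutMs.toString,
    "request.timeout.ms" -> requestTimeoutMs.toString,
    // Assumption: the thread shows "largest" (the old consumer's spelling);
    // the new consumer's equivalent value is "latest".
    "auto.offset.reset"  -> "latest",
    "enable.auto.commit" -> "false",
    "max.poll.records"   -> "1000"
  )
}
```

A map of this shape is what ConsumerStrategies.Subscribe in the
spark-streaming-kafka-0-10 integration takes as its kafkaParams argument.
bootstrap.servers and the key/value deserializers are left out here because
the thread does not state them.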

2016-10-07 3:35 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

> OK, so at this point, even without involving commitAsync, you're
> seeing consumer rebalances after a particular batch takes longer than
> the session timeout?
>
> Do you have a minimal code example you can share?
>
> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
> <matthias.nieh...@codecentric.de> wrote:
> > Hi,
> > sorry for the late reply; there was a public holiday in Germany.
> >
> > Yes, it's using a unique group id which no other job or consumer group is
> > using. I have increased session.timeout.ms to 1 minute and set
> > max.poll.records to 1000. The processing takes ~1 second.
> >
> > 2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >>
> >> Well, I'd start at the first thing suggested by the error, namely that
> >> the group has rebalanced.
> >>
> >> Is that stream using a unique group id?
> >>
> >> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
> >> <matthias.nieh...@codecentric.de> wrote:
> >> > Hi,
> >> >
> >> > the stacktrace:
> >> >
> >> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> >> > completed since the group has already rebalanced and assigned the partitions
> >> > to another member. This means that the time between subsequent calls to
> >> > poll() was longer than the configured session.timeout.ms, which typically
> >> > implies that the poll loop is spending too much time message processing. You
> >> > can address this either by increasing the session timeout or by reducing the
> >> > maximum size of batches returned in poll() with max.poll.records.
> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
> >> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
> >> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
> >> >     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
> >> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
> >> >     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
> >> >     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> >> >     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
> >> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
> >> >     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
> >> >     at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> >> >     at scala.Option.orElse(Option.scala:289)
> >> >     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> >> >     at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
> >> >     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
> >> >     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
> >> >     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
> >> >     at scala.Option.orElse(Option.scala:289)
> >> >     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
> >> >     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
> >> >     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
> >> >     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
> >> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> >> >     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> >> >     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> >     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> >> >     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> >> >     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> >> >     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
> >> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
> >> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
> >> >     at scala.util.Try$.apply(Try.scala:192)
> >> >     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
> >> >     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
> >> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
> >> >     at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
> >> >     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> >> >
> >> > But it seems like the commit is not the actual problem. The job also falls
> >> > behind if I do not commit the offsets. The delay would be OK if the
> >> > processing time were longer than the batch interval, but that's not the
> >> > case in any of the microbatches. In my opinion, for some reason one of the
> >> > microbatches falls behind by more than session.timeout.ms. Then the
> >> > consumer regroups, which takes about 1 minute (see timestamps below). Now
> >> > a cycle of slow batches begins, each triggering a consumer regroup. Would
> >> > this be possible?
> >> >
> >> >
> >> > 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions
> >> >     [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2,
> >> >     sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4,
> >> >     sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6,
> >> >     sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8,
> >> >     sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9]
> >> >     for group spark_aggregation_job-kafka010
> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions
> >> >     [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4,
> >> >     sapxm.adserving.log.view-1, sapxm.adserving.log.view-2,
> >> >     sapxm.adserving.log.view-0, sapxm.adserving.log.view-9,
> >> >     sapxm.adserving.log.view-7, sapxm.adserving.log.view-8,
> >> >     sapxm.adserving.log.view-5, sapxm.adserving.log.view-6]
> >> >     for group spark_aggregation_job-kafka010
> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010
> >> >
> >> > 2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
> >> >>
> >> >> What's the actual stacktrace / exception you're getting related to
> >> >> commit failure?
> >> >>
> >> >> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
> >> >> <matthias.nieh...@codecentric.de> wrote:
> >> >> > Hi everybody,
> >> >> >
> >> >> > I am using the new Kafka receiver for Spark Streaming for my job. When
> >> >> > running with the old consumer it runs fine.
> >> >> >
> >> >> > The job consumes 3 topics, saves the data to Cassandra, cogroups the
> >> >> > topics, calls mapWithState and stores the results in Cassandra. After
> >> >> > that I manually commit the Kafka offsets using the commitAsync method
> >> >> > of the KafkaDStream.
> >> >> >
> >> >> > With the new consumer I experience the following problem:
> >> >> >
> >> >> > After a certain amount of time (about 4-5 minutes, might be more or
> >> >> > less) there are exceptions saying that the offset commit failed. The
> >> >> > processing takes less than the batch interval. I also adjusted
> >> >> > session.timeout.ms and request.timeout.ms as well as the
> >> >> > max.poll.records setting, which did not help.
> >> >> >
> >> >> > After the first offset commit has failed, the time between a message
> >> >> > arriving in Kafka and the start of its microbatch increases, while the
> >> >> > processing time stays constantly below the batch interval. Moreover,
> >> >> > further offset commits also fail, and as a result the delay builds up.
> >> >> >
> >> >> > Has anybody experienced this as well?
> >> >> >
> >> >> > Thank you
> >> >> >
> >> >> > Relevant Kafka Parameters:
> >> >> >
> >> >> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
> >> >> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
> >> >> > "auto.offset.reset" -> "largest",
> >> >> > "enable.auto.commit" -> "false",
> >> >> > "max.poll.records" -> "1000"
> >> >> >
> >> >
> >
>



-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal (district court)
Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen Schütz

This e-mail, including any attached files, contains confidential and/or
legally protected information. If you are not the intended recipient or have
received this e-mail in error, please inform the sender immediately and
delete this e-mail and any attached files without delay. Unauthorized
copying, use, or opening of any attached files, as well as unauthorized
forwarding of this e-mail, is not permitted.
