For you or anyone else having issues with consumer rebalance: what are your settings for heartbeat.interval.ms, session.timeout.ms, and group.max.session.timeout.ms relative to your batch time?

On Tue, Oct 11, 2016 at 10:19 AM, static-max <flasha...@googlemail.com> wrote:

Hi,

I run into the same exception (org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced ...), but I only use one stream. I get the exception when trying to manually commit the offsets to Kafka:

    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream.dstream();
    cco.commitAsync(offsets);

I tried setting "max.poll.records" to 1000, but this did not help.

Any idea what could be wrong?

2016-10-11 15:36 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

The new underlying Kafka consumer prefetches data and is generally heavier weight, so it is cached on executors. The group id is part of the cache key. I assumed Kafka users would use different group ids for consumers they wanted to be distinct, since sharing one would cause problems even with the plain Kafka consumer, but that appears to be a poor assumption.

I'll figure out a way to make this more obvious.

On Oct 11, 2016 8:19 AM, "Matthias Niehoff" <matthias.nieh...@codecentric.de> wrote:

Good point. I changed the group id to be unique for the separate streams and now it works. Thanks!

Although changing this is fine for us, I am interested in the why :-) With the old connector this was not a problem, nor is it, AFAIK, with the plain Kafka consumer API.

2016-10-11 14:30 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

Just out of curiosity, have you tried using separate group ids for the separate streams?

On Oct 11, 2016 4:46 AM, "Matthias Niehoff" <matthias.nieh...@codecentric.de> wrote:

I stripped down the job to just consume the stream and print it, without Avro deserialization.
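Cody's point above about the cache key suggests what Matthias's fix amounts to in practice: derive a distinct group.id per stream instead of sharing one. A minimal sketch only; the helper name and the per-topic naming scheme are illustrative, not from the thread:

```scala
// Sketch: give each direct stream its own consumer group, since the new
// consumer is cached on the executors with group.id as part of the cache key.
// `base` stands in for the job's existing Kafka settings.
def kafkaParamsFor(topic: String, base: Map[String, String]): Map[String, String] =
  base + ("group.id" -> s"${base("group.id")}-$topic")

// Hypothetical usage: each topic's stream now joins its own group, e.g.
// createStream(ssc, topic_view, kafkaParamsFor(topic_view, kafkaParams))
```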
When I only consume one topic, everything is fine. As soon as I add a second stream it gets stuck after about 5 minutes. So it basically boils down to:

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> kafkaBrokers,
      "group.id" -> group,
      "key.deserializer" -> classOf[StringDeserializer].getName,
      "value.deserializer" -> classOf[BytesDeserializer].getName,
      "session.timeout.ms" -> s"${1 * 60 * 1000}",
      "request.timeout.ms" -> s"${2 * 60 * 1000}",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> "false"
    )

    def main(args: Array[String]) {

      def createStreamingContext(): StreamingContext = {

        val sparkConf = new SparkConf(true)
          .setAppName("Kafka Consumer Test")
        sparkConf.setMaster("local[*]")

        val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))

        // AD REQUESTS
        // ===========
        val serializedAdRequestStream = createStream(ssc, topic_adrequest)
        serializedAdRequestStream.map(record => record.value().get()).print(10)

        // VIEWS
        // =====
        val serializedViewStream = createStream(ssc, topic_view)
        serializedViewStream.map(record => record.value().get()).print(10)

        // CLICKS (disabled)
        // =================
        // val serializedClickStream = createStream(ssc, topic_click)
        // serializedClickStream.map(record => record.value().get()).print(10)

        ssc
      }

      val streamingContext = createStreamingContext()
      streamingContext.start()
      streamingContext.awaitTermination()
    }

And in the logs you see:

    16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
    16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
    16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
    16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
    16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms

2016-10-11 9:28 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de>:

This job will fail after about 5 minutes:

    object SparkJobMinimal {

      // read Avro schemas
      var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
      val avroSchemaAdRequest = scala.io.Source.fromInputStream(stream).getLines.mkString
      stream.close
      stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
      val avroSchemaEvent = scala.io.Source.fromInputStream(stream).getLines.mkString
      stream.close

      val kafkaBrokers = "broker-0.kafka.mesos:9092"

      val topic_adrequest = "adserving.log.ad_request"
      val topic_view = "adserving.log.view"
      val topic_click = "adserving.log.click"
      val group = UUID.randomUUID().toString
      val streaming_interval_seconds = 2

      val kafkaParams = Map[String, String](
        "bootstrap.servers" -> kafkaBrokers,
        "group.id" -> group,
        "key.deserializer" -> classOf[StringDeserializer].getName,
        "value.deserializer" -> classOf[BytesDeserializer].getName,
        "session.timeout.ms" -> s"${1 * 60 * 1000}",
        "request.timeout.ms" -> s"${2 * 60 * 1000}",
        "auto.offset.reset" -> "latest",
        "enable.auto.commit" -> "false"
      )

      def main(args: Array[String]) {

        def createStreamingContext(): StreamingContext = {

          val sparkConf = new SparkConf(true)
            .setAppName("Kafka Consumer Test")
          sparkConf.setMaster("local[*]")

          val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))

          // AD REQUESTS
          // ===========
          val serializedAdRequestStream = createStream(ssc, topic_adrequest)
          val adRequestStream = deserializeStream(serializedAdRequestStream, avroSchemaAdRequest, record => AdRequestLog(record)).cache()
          adRequestStream.print(10)

          // VIEWS
          // =====
          val serializedViewStream = createStream(ssc, topic_view)
          val viewStream = deserializeStream(serializedViewStream, avroSchemaEvent, record => Event(record, EventType.View)).cache()
          viewStream.print(10)

          // CLICKS
          // ======
          val serializedClickStream = createStream(ssc, topic_click)
          val clickEventStream = deserializeStream(serializedClickStream, avroSchemaEvent, record => Event(record, EventType.Click)).cache()
          clickEventStream.print(10)

          ssc
        }

        val streamingContext = createStreamingContext()
        streamingContext.start()
        streamingContext.awaitTermination()
      }

      def createStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, Bytes]] = {
        KafkaUtils.createDirectStream[String, Bytes](ssc, LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, Bytes](Set(topic), kafkaParams))
      }

      def deserializeStream[EventType: ClassTag](serializedAdRequestStream: InputDStream[ConsumerRecord[String, Bytes]],
                                                 avroSchema: String,
                                                 recordMapper: GenericRecord => EventType): DStream[EventType] = {
        serializedAdRequestStream.mapPartitions { iteratorOfMessages =>
          val schema: Schema = new Schema.Parser().parse(avroSchema)
          val recordInjection = GenericAvroCodecs.toBinary(schema)
          iteratorOfMessages.map(message => recordInjection.invert(message.value().get()))
            .filter(_.isSuccess)
            .map(_.get.asInstanceOf[GenericRecord])
            .map(recordMapper)
        }
      }
    }

2016-10-10 17:42 GMT+02:00 Matthias Niehoff <matthias.nieh...@codecentric.de>:
Yes, without committing the data the consumer rebalances. The job consumes 3 streams and processes them. When consuming only one stream it runs fine. But when consuming three streams, even without joining them, just deserializing the payload and triggering an output action, it fails.

I will prepare a code sample.

2016-10-07 3:35 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

OK, so at this point, even without involving commitAsync, you're seeing consumer rebalances after a particular batch takes longer than the session timeout?

Do you have a minimal code example you can share?

On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff <matthias.nieh...@codecentric.de> wrote:

Hi, sorry for the late reply; a public holiday in Germany.

Yes, it is using a unique group id which no other job or consumer group is using. I have increased session.timeout.ms to 1 minute and set max.poll.records to 1000. The processing takes ~1 second.

2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

Well, I'd start at the first thing suggested by the error, namely that the group has rebalanced.

Is that stream using a unique group id?

On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff <matthias.nieh...@codecentric.de> wrote:

Hi,

the stacktrace:

    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
        at scala.Option.orElse(Option.scala:289)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

But it seems like the commit is not the actual problem. The job also falls behind if I do not commit the offsets. The delay would be OK if the processing time were bigger than the batch interval, but that's not the case in any of the microbatches. IMHO, for some reason one of the microbatches falls behind by more than session.timeout.ms. Then the consumers regroup, which takes about 1 minute (see timestamps below). Now begins a cycle of slow batches, each one triggering a consumer regroup. Would this be possible?
    16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s)  --> the job for 08:13:34
    16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
    16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
    16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4, sapxm.adserving.log.view-1, sapxm.adserving.log.view-2, sapxm.adserving.log.view-0, sapxm.adserving.log.view-9, sapxm.adserving.log.view-7, sapxm.adserving.log.view-8, sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group spark_aggregation_job-kafka010
    16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010

2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:

What's the actual stacktrace / exception you're getting related to commit failure?
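The rebalance cycle described above comes back to the question at the top of the thread: how the consumer timeouts relate to the batch time. A rule-of-thumb sketch with illustrative values (not taken from the thread): session.timeout.ms should comfortably exceed the worst-case gap between poll() calls (batch interval plus any scheduling delay), heartbeat.interval.ms is conventionally no more than about a third of the session timeout, and the broker-side group.max.session.timeout.ms must be at least the client's session timeout for the client setting to be accepted.

```scala
// Illustrative sizing relative to a 2 s batch interval (values are examples).
// session.timeout.ms must cover the longest expected gap between polls;
// heartbeat.interval.ms should be well below it (~1/3 is a common rule);
// in the 0.10 consumer, request.timeout.ms must exceed session.timeout.ms.
val batchIntervalMs     = 2 * 1000
val sessionTimeoutMs    = 2 * 60 * 1000        // >> batch interval + worst-case delay
val heartbeatIntervalMs = sessionTimeoutMs / 3

val timeoutParams = Map(
  "session.timeout.ms"    -> sessionTimeoutMs.toString,
  "heartbeat.interval.ms" -> heartbeatIntervalMs.toString,
  "request.timeout.ms"    -> (sessionTimeoutMs + 30 * 1000).toString
)
```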
On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff <matthias.nieh...@codecentric.de> wrote:

Hi everybody,

I am using the new Kafka receiver for Spark Streaming for my job. When running with the old consumer it runs fine.

The job consumes 3 topics, saves the data to Cassandra, cogroups the topics, calls mapWithState, and stores the results in Cassandra. After that I manually commit the Kafka offsets using the commitAsync method of the KafkaDStream.

With the new consumer I experience the following problem:

After a certain amount of time (about 4-5 minutes, might be more or less) there are exceptions saying that the offset commit failed. The processing takes less than the batch interval. I also adjusted session.timeout and request.timeout as well as the max.poll.records setting, which did not help.

After the first offset commit fails, the time from Kafka until the microbatch starts keeps increasing, while the processing time stays constantly below the batch interval. Moreover, further offset commits also fail, and as a result the delay builds up.

Has anybody else experienced this?
Thank you.

Relevant Kafka parameters:

    "session.timeout.ms" -> s"${1 * 60 * 1000}",
    "request.timeout.ms" -> s"${2 * 60 * 1000}",
    "auto.offset.reset" -> "largest",
    "enable.auto.commit" -> "false",
    "max.poll.records" -> "1000"

--
Matthias Niehoff | IT-Consultant | Agile Software Factory | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0) 172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de | www.more4fi.de

Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
Board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Supervisory board: Patric Fedlmeier (chair) . Klaus Jäger . Jürgen Schütz

This e-mail, including any attached files, contains confidential and/or legally protected information. If you are not the intended recipient or have received this e-mail in error, please notify the sender immediately and delete this e-mail and any attached files. Unauthorized copying, use, or opening of attached files, as well as unauthorized forwarding of this e-mail, is not permitted.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org