For you or anyone else having issues with consumer rebalance, what are
your settings for

heartbeat.interval.ms
session.timeout.ms
group.max.session.timeout.ms

relative to your batch time?
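
For reference, the usual relationships between those settings (a sketch,
values purely illustrative): heartbeat.interval.ms should be well below
session.timeout.ms (the Kafka docs suggest no more than a third of it),
session.timeout.ms should be comfortably above the worst-case gap between
polls (for the direct stream, roughly the batch time plus any scheduling
delay), and the broker-side group.max.session.timeout.ms must be at least
as large as session.timeout.ms, or the consumer's setting is rejected.
For example, with a batch time of around 10 seconds:

  // illustrative values only, assuming a ~10 second batch time
  val kafkaParams = Map[String, String](
    "heartbeat.interval.ms" -> "3000",  // well below session.timeout.ms
    "session.timeout.ms"    -> "60000", // above worst-case gap between polls
    "request.timeout.ms"    -> "90000"  // must exceed session.timeout.ms
  )
  // broker side: group.max.session.timeout.ms >= 60000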

On Tue, Oct 11, 2016 at 10:19 AM, static-max <flasha...@googlemail.com> wrote:
> Hi,
>
> I am running into the same exception
> (org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced ...), but I only use one
> stream.
> I get the exception when trying to manually commit the offsets to Kafka:
>
> OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
> CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream.dstream();
> cco.commitAsync(offsets);
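>
> For context, that snippet runs inside a foreachRDD over the stream,
> roughly following the documented pattern (a Scala sketch, the processing
> step is a placeholder):
>
>   directKafkaStream.foreachRDD { rdd =>
>     val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>     // ... process the batch first, then commit ...
>     directKafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>   }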
>
> I tried setting "max.poll.records" to 1000 but this did not help.
>
> Any idea what could be wrong?
>
> 2016-10-11 15:36 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>
>> The new underlying Kafka consumer prefetches data and is generally heavier
>> weight, so it is cached on executors. The group id is part of the cache key.
>> I assumed Kafka users would use different group ids for consumers they
>> wanted to be distinct, since sharing a group id causes problems even with
>> the plain Kafka consumer, but that appears to be a poor assumption.
>>
>> I'll figure out a way to make this more obvious.
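>>
>> In the meantime, the workaround is what you did: a distinct group id per
>> stream. A sketch against the createStream helper quoted below (the group
>> suffix is my invention):
>>
>>   def createStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, Bytes]] = {
>>     // unique group.id per topic, so each stream gets its own cached consumer
>>     KafkaUtils.createDirectStream[String, Bytes](ssc, LocationStrategies.PreferConsistent,
>>       ConsumerStrategies.Subscribe[String, Bytes](Set(topic), kafkaParams + ("group.id" -> s"$group-$topic")))
>>   }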
>>
>>
>> On Oct 11, 2016 8:19 AM, "Matthias Niehoff"
>> <matthias.nieh...@codecentric.de> wrote:
>>
>> Good point, I changed the group id to be unique for the separate streams
>> and now it works. Thanks!
>>
>> Although changing this is fine for us, I am interested in the why :-) With
>> the old connector this was not a problem, nor is it, afaik, with the plain
>> Kafka consumer API.
>>
>> 2016-10-11 14:30 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>
>>> Just out of curiosity, have you tried using separate group ids for the
>>> separate streams?
>>>
>>>
>>> On Oct 11, 2016 4:46 AM, "Matthias Niehoff"
>>> <matthias.nieh...@codecentric.de> wrote:
>>>>
>>>> I stripped the job down to just consuming the stream and printing it,
>>>> without Avro deserialization. When I only consume one topic, everything
>>>> is fine. As soon as I add a second stream, it gets stuck after about 5
>>>> minutes. So it basically boils down to:
>>>>
>>>>
>>>>   val kafkaParams = Map[String, String](
>>>>     "bootstrap.servers" -> kafkaBrokers,
>>>>     "group.id" -> group,
>>>>     "key.deserializer" -> classOf[StringDeserializer].getName,
>>>>     "value.deserializer" -> classOf[BytesDeserializer].getName,
>>>>     "session.timeout.ms" -> s"${1 * 60 * 1000}",
>>>>     "request.timeout.ms" -> s"${2 * 60 * 1000}",
>>>>     "auto.offset.reset" -> "latest",
>>>>     "enable.auto.commit" -> "false"
>>>>   )
>>>>
>>>>   def main(args: Array[String]) {
>>>>
>>>>     def createStreamingContext(): StreamingContext = {
>>>>
>>>>       val sparkConf = new SparkConf(true)
>>>>         .setAppName("Kafka Consumer Test")
>>>>       sparkConf.setMaster("local[*]")
>>>>
>>>>       val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))
>>>>
>>>>       // AD REQUESTS
>>>>       // ===========
>>>>       val serializedAdRequestStream = createStream(ssc, topic_adrequest)
>>>>       serializedAdRequestStream.map(record => record.value().get()).print(10)
>>>>
>>>>       // VIEWS
>>>>       // ======
>>>>       val serializedViewStream = createStream(ssc, topic_view)
>>>>       serializedViewStream.map(record => record.value().get()).print(10)
>>>>
>>>> //      // CLICKS
>>>> //      // ======
>>>> //      val serializedClickStream = createStream(ssc, topic_click)
>>>> //      serializedClickStream.map(record => record.value().get()).print(10)
>>>>
>>>>       ssc
>>>>     }
>>>>
>>>>     val streamingContext = createStreamingContext
>>>>     streamingContext.start()
>>>>     streamingContext.awaitTermination()
>>>>   }
>>>>
>>>>
>>>> And in the logs you see:
>>>>
>>>>
>>>> 16/10/10 14:02:26 INFO JobScheduler: Finished job streaming job 1476100944000 ms.2 from job set of time 1476100944000 ms
>>>> 16/10/10 14:02:26 INFO JobScheduler: Total delay: 2,314 s for time 1476100944000 ms (execution: 0,698 s)
>>>> 16/10/10 14:03:26 INFO JobScheduler: Added jobs for time 1476100946000 ms
>>>> 16/10/10 14:03:26 INFO MapPartitionsRDD: Removing RDD 889 from persistence list
>>>> 16/10/10 14:03:26 INFO JobScheduler: Starting job streaming job 1476100946000 ms.0 from job set of time 1476100946000 ms
>>>>
>>>>
>>>> 2016-10-11 9:28 GMT+02:00 Matthias Niehoff
>>>> <matthias.nieh...@codecentric.de>:
>>>>>
>>>>> This job will fail after about 5 minutes:
>>>>>
>>>>>
>>>>> object SparkJobMinimal {
>>>>>
>>>>>   // read Avro schemas
>>>>>   var stream = getClass.getResourceAsStream("/avro/AdRequestLog.avsc")
>>>>>   val avroSchemaAdRequest = scala.io.Source.fromInputStream(stream).getLines.mkString
>>>>>   stream.close()
>>>>>   stream = getClass.getResourceAsStream("/avro/AbstractEventLogEntry.avsc")
>>>>>   val avroSchemaEvent = scala.io.Source.fromInputStream(stream).getLines.mkString
>>>>>   stream.close()
>>>>>
>>>>>
>>>>>   val kafkaBrokers = "broker-0.kafka.mesos:9092"
>>>>>
>>>>>   val topic_adrequest = "adserving.log.ad_request"
>>>>>   val topic_view = "adserving.log.view"
>>>>>   val topic_click = "adserving.log.click"
>>>>>   val group = UUID.randomUUID().toString
>>>>>   val streaming_interval_seconds = 2
>>>>>
>>>>>   val kafkaParams = Map[String, String](
>>>>>     "bootstrap.servers" -> kafkaBrokers,
>>>>>     "group.id" -> group,
>>>>>     "key.deserializer" -> classOf[StringDeserializer].getName,
>>>>>     "value.deserializer" -> classOf[BytesDeserializer].getName,
>>>>>     "session.timeout.ms" -> s"${1 * 60 * 1000}",
>>>>>     "request.timeout.ms" -> s"${2 * 60 * 1000}",
>>>>>     "auto.offset.reset" -> "latest",
>>>>>     "enable.auto.commit" -> "false"
>>>>>   )
>>>>>
>>>>>   def main(args: Array[String]) {
>>>>>
>>>>>     def createStreamingContext(): StreamingContext = {
>>>>>
>>>>>       val sparkConf = new SparkConf(true)
>>>>>         .setAppName("Kafka Consumer Test")
>>>>>       sparkConf.setMaster("local[*]")
>>>>>
>>>>>
>>>>>       val ssc = new StreamingContext(sparkConf, Seconds(streaming_interval_seconds))
>>>>>
>>>>>       // AD REQUESTS
>>>>>       // ===========
>>>>>       val serializedAdRequestStream = createStream(ssc, topic_adrequest)
>>>>>
>>>>>       val adRequestStream = deserializeStream(serializedAdRequestStream,
>>>>>         avroSchemaAdRequest, record => AdRequestLog(record)).cache()
>>>>>       adRequestStream.print(10)
>>>>>
>>>>>       // VIEWS
>>>>>       // ======
>>>>>
>>>>>       val serializedViewStream = createStream(ssc, topic_view)
>>>>>       val viewStream = deserializeStream(serializedViewStream,
>>>>>         avroSchemaEvent, record => Event(record, EventType.View)).cache()
>>>>>       viewStream.print(10)
>>>>>
>>>>>
>>>>>       // CLICKS
>>>>>       // ======
>>>>>       val serializedClickStream = createStream(ssc, topic_click)
>>>>>       val clickEventStream = deserializeStream(serializedClickStream,
>>>>>         avroSchemaEvent, record => Event(record, EventType.Click)).cache()
>>>>>       clickEventStream.print(10)
>>>>>
>>>>>       ssc
>>>>>     }
>>>>>
>>>>>     val streamingContext = createStreamingContext
>>>>>     streamingContext.start()
>>>>>     streamingContext.awaitTermination()
>>>>>   }
>>>>>
>>>>>   def createStream(ssc: StreamingContext, topic: String): InputDStream[ConsumerRecord[String, Bytes]] = {
>>>>>     KafkaUtils.createDirectStream[String, Bytes](ssc, LocationStrategies.PreferConsistent,
>>>>>       ConsumerStrategies.Subscribe[String, Bytes](Set(topic), kafkaParams))
>>>>>   }
>>>>>
>>>>>   def deserializeStream[EventType: ClassTag](
>>>>>       serializedStream: InputDStream[ConsumerRecord[String, Bytes]],
>>>>>       avroSchema: String,
>>>>>       recordMapper: GenericRecord => EventType): DStream[EventType] = {
>>>>>     serializedStream.mapPartitions { iteratorOfMessages =>
>>>>>       // the schema and injection are not serializable, so build them once per partition
>>>>>       val schema: Schema = new Schema.Parser().parse(avroSchema)
>>>>>       val recordInjection = GenericAvroCodecs.toBinary(schema)
>>>>>       iteratorOfMessages
>>>>>         .map(message => recordInjection.invert(message.value().get()))
>>>>>         .filter(_.isSuccess)
>>>>>         .map(_.get.asInstanceOf[GenericRecord])
>>>>>         .map(recordMapper)
>>>>>     }
>>>>>   }
>>>>> }
>>>>>
>>>>>
>>>>> 2016-10-10 17:42 GMT+02:00 Matthias Niehoff
>>>>> <matthias.nieh...@codecentric.de>:
>>>>>>
>>>>>> Yes, even without committing the offsets the consumer rebalances.
>>>>>> The job consumes 3 streams and processes them. When consuming only one
>>>>>> stream it runs fine. But when consuming three streams, even without
>>>>>> joining them, just deserializing the payload and triggering an output
>>>>>> action, it fails.
>>>>>>
>>>>>> I will prepare a code sample.
>>>>>>
>>>>>> 2016-10-07 3:35 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>
>>>>>>> OK, so at this point, even without involving commitAsync, you're
>>>>>>> seeing consumer rebalances after a particular batch takes longer than
>>>>>>> the session timeout?
>>>>>>>
>>>>>>> Do you have a minimal code example you can share?
>>>>>>>
>>>>>>> On Tue, Oct 4, 2016 at 2:18 AM, Matthias Niehoff
>>>>>>> <matthias.nieh...@codecentric.de> wrote:
>>>>>>> > Hi,
>>>>>>> > Sorry for the late reply, it was a public holiday in Germany.
>>>>>>> >
>>>>>>> > Yes, it is using a unique group id which no other job or consumer
>>>>>>> > group is using. I have increased session.timeout.ms to 1 minute and
>>>>>>> > set max.poll.records to 1000. The processing takes ~1 second.
>>>>>>> >
>>>>>>> > 2016-09-29 4:41 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>> >>
>>>>>>> >> Well, I'd start at the first thing suggested by the error, namely
>>>>>>> >> that
>>>>>>> >> the group has rebalanced.
>>>>>>> >>
>>>>>>> >> Is that stream using a unique group id?
>>>>>>> >>
>>>>>>> >> On Wed, Sep 28, 2016 at 5:17 AM, Matthias Niehoff
>>>>>>> >> <matthias.nieh...@codecentric.de> wrote:
>>>>>>> >> > Hi,
>>>>>>> >> >
>>>>>>> >> > the stacktrace:
>>>>>>> >> >
>>>>>>> >> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>>>>>>> >> > completed since the group has already rebalanced and assigned the partitions
>>>>>>> >> > to another member. This means that the time between subsequent calls to
>>>>>>> >> > poll() was longer than the configured session.timeout.ms, which typically
>>>>>>> >> > implies that the poll loop is spending too much time message processing.
>>>>>>> >> > You can address this either by increasing the session timeout or by reducing
>>>>>>> >> > the maximum size of batches returned in poll() with max.poll.records.
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>>>>>>> >> > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>>>>>>> >> > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
>>>>>>> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
>>>>>>> >> > at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>>>>>>> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:169)
>>>>>>> >> > at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:196)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>>>>> >> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>>>>>> >> > at scala.Option.orElse(Option.scala:289)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>>>>>> >> > at org.apache.spark.streaming.dstream.MapPartitionedDStream.compute(MapPartitionedDStream.scala:37)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
>>>>>>> >> > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
>>>>>>> >> > at scala.Option.orElse(Option.scala:289)
>>>>>>> >> > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
>>>>>>> >> > at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:48)
>>>>>>> >> > at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:117)
>>>>>>> >> > at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>>>>>>> >> > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>>>>>> >> > at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>>>>>>> >> > at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>> >> > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>>>>>> >> > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>>>>>>> >> > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>>>>>>> >> > at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
>>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
>>>>>>> >> > at scala.util.Try$.apply(Try.scala:192)
>>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
>>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
>>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88)
>>>>>>> >> > at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
>>>>>>> >> > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>>>>>> >> >
>>>>>>> >> > But it seems like the commit is not the actual problem. The job
>>>>>>> >> > also falls behind if I do not commit the offsets. The delay would
>>>>>>> >> > be understandable if the processing time were longer than the
>>>>>>> >> > batch interval, but that's not the case in any of the
>>>>>>> >> > microbatches. Imho, for some reason one of the microbatches falls
>>>>>>> >> > behind by more than session.timeout.ms. Then the consumers
>>>>>>> >> > regroup, which takes about 1 minute (see timestamps below). Now a
>>>>>>> >> > cycle of slow batches begins, each one triggering a consumer
>>>>>>> >> > regroup. Would this be possible?
>>>>>>> >> >
>>>>>>> >> >
>>>>>>> >> > 16/09/28 08:15:55 INFO JobScheduler: Total delay: 141.580 s for time 1475050414000 ms (execution: 0.360 s) --> the job for 08:13:34
>>>>>>> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: Successfully joined group spark_aggregation_job-kafka010 with generation 6
>>>>>>> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Setting newly assigned partitions [sapxm.adserving.log.ad_request-0, sapxm.adserving.log.ad_request-2, sapxm.adserving.log.ad_request-1, sapxm.adserving.log.ad_request-4, sapxm.adserving.log.ad_request-3, sapxm.adserving.log.ad_request-6, sapxm.adserving.log.ad_request-5, sapxm.adserving.log.ad_request-8, sapxm.adserving.log.ad_request-7, sapxm.adserving.log.ad_request-9] for group spark_aggregation_job-kafka010
>>>>>>> >> > 16/09/28 08:16:48 INFO ConsumerCoordinator: Revoking previously assigned partitions [sapxm.adserving.log.view-3, sapxm.adserving.log.view-4, sapxm.adserving.log.view-1, sapxm.adserving.log.view-2, sapxm.adserving.log.view-0, sapxm.adserving.log.view-9, sapxm.adserving.log.view-7, sapxm.adserving.log.view-8, sapxm.adserving.log.view-5, sapxm.adserving.log.view-6] for group spark_aggregation_job-kafka010
>>>>>>> >> > 16/09/28 08:16:48 INFO AbstractCoordinator: (Re-)joining group spark_aggregation_job-kafka010
>>>>>>> >> >
>>>>>>> >> > 2016-09-27 18:55 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>> >> >>
>>>>>>> >> >> What's the actual stacktrace / exception you're getting related
>>>>>>> >> >> to
>>>>>>> >> >> commit failure?
>>>>>>> >> >>
>>>>>>> >> >> On Tue, Sep 27, 2016 at 9:37 AM, Matthias Niehoff
>>>>>>> >> >> <matthias.nieh...@codecentric.de> wrote:
>>>>>>> >> >> > Hi everybody,
>>>>>>> >> >> >
>>>>>>> >> >> > I am using the new Kafka connector for Spark Streaming in my
>>>>>>> >> >> > job. When running with the old consumer it runs fine.
>>>>>>> >> >> >
>>>>>>> >> >> > The job consumes 3 topics, saves the data to Cassandra,
>>>>>>> >> >> > cogroups the topics, calls mapWithState, and stores the
>>>>>>> >> >> > results in Cassandra. After that I manually commit the Kafka
>>>>>>> >> >> > offsets using the commitAsync method of the Kafka DStream.
>>>>>>> >> >> >
>>>>>>> >> >> > With the new consumer I experience the following problem:
>>>>>>> >> >> >
>>>>>>> >> >> > After a certain amount of time (about 4-5 minutes, might be
>>>>>>> >> >> > more or less) there are exceptions saying the offset commit
>>>>>>> >> >> > failed. The processing takes less than the batch interval. I
>>>>>>> >> >> > also adjusted the session.timeout and request.timeout as well
>>>>>>> >> >> > as the max.poll.records setting, which did not help.
>>>>>>> >> >> >
>>>>>>> >> >> > After the first offset commit fails, the time between data
>>>>>>> >> >> > arriving in Kafka and the microbatch starting increases, while
>>>>>>> >> >> > the processing time stays constantly below the batch interval.
>>>>>>> >> >> > Moreover, further offset commits also fail, and as a result
>>>>>>> >> >> > the delay builds up.
>>>>>>> >> >> >
>>>>>>> >> >> > Has anybody experienced this as well?
>>>>>>> >> >> >
>>>>>>> >> >> > Thank you
>>>>>>> >> >> >
>>>>>>> >> >> > Relevant Kafka Parameters:
>>>>>>> >> >> >
>>>>>>> >> >> > "session.timeout.ms" -> s"${1 * 60 * 1000}",
>>>>>>> >> >> > "request.timeout.ms" -> s"${2 * 60 * 1000}",
>>>>>>> >> >> > "auto.offset.reset" -> "largest",
>>>>>>> >> >> > "enable.auto.commit" -> "false",
>>>>>>> >> >> > "max.poll.records" -> "1000"
