It is already documented that you must use a different group id, which as
far as I can tell you are still not doing.
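
A minimal sketch of one way to do that, assuming the shared kafkaParams map still carries a single group.id (as in the consumer params posted earlier in this thread). The names GroupIds, paramsFor, GroupIdsDemo and the "storage" prefix are hypothetical and only for illustration. The point is that the map handed to both the KafkaConsumer and KafkaUtils.createRDD needs a per-channel group.id, not only the string passed to getOffsets:

```scala
// Sketch only: GroupIds, paramsFor and the "storage" prefix are hypothetical names.
object GroupIds {
  // Return a copy of the shared consumer params with a group.id unique to this channel.
  def paramsFor(base: Map[String, Object], prefix: String, channel: String): Map[String, Object] =
    base.updated("group.id", s"$prefix-$channel")
}

object GroupIdsDemo {
  def main(args: Array[String]): Unit = {
    // Shared settings; the values are taken from the params posted in this thread.
    val shared: Map[String, Object] = Map(
      "bootstrap.servers" -> "somemachine:31000",
      "group.id" -> "storage-group")

    Seq("channel1", "channel2").foreach { channel =>
      // Each channel gets its own params, and therefore its own group.id.
      val params = GroupIds.paramsFor(shared, "storage", channel)
      println(s"$channel -> ${params("group.id")}")
    }
  }
}
```

The per-channel map would then be used (via .asJava) everywhere kafkaParams appears inside the foreach.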

On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
wrote:

> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>
> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <i...@vadio.com> wrote:
>
>> Ok, I have split the KafkaRDD logic so that each uses its own group and bumped
>> the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms
>> ends up with a timeout and exception so I am still perplexed on that one.
>> The new error I am getting now is a `ConcurrentModificationException`
>> when Spark is trying to remove the CachedKafkaConsumer.
>>
>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>> at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>> at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>> at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>> at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>
>> Here is the basic logic:
>>
>> *Using KafkaRDD* - This takes a list of channels and processes them in
>> parallel using the KafkaRDD directly. They each use a distinct consumer
>> group (s"$prefix-$topic"), and each has its own topic and each topic
>> has 4 partitions. We routinely get timeout errors when polling for data
>> when the poll.ms is less than 2 seconds. This occurs whether or not we
>> process in parallel.
>>
>> *Example usage with KafkaRDD:*
>> val channels = Seq("channel1", "channel2")
>>
>> channels.toParArray.foreach { channel =>
>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>
>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>>
>>   val ds = KafkaUtils.createRDD[K, V](context,
>>         kafkaParams asJava,
>>         offsetRanges,
>>         PreferConsistent).toDS[V]
>>
>>   // Do some aggregations
>>   ds.agg(...)
>>   // Save the data
>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>   // Save offsets using a KafkaConsumer
>>   consumer.commitSync(newOffsets.asJava)
>>   consumer.close()
>> }
>>
>> I am not sure why the concurrency issue is there. I have tried to debug it
>> and also looked at the KafkaConsumer code, and as far as I can see it
>> should not occur. What remains to figure out is why this occurs when
>> running in parallel, and also why the timeouts still occur.
>>
>> Thanks,
>>
>> Ivan
>>
>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> There definitely is Kafka documentation indicating that you should use
>>> a different consumer group for logically different subscribers, this
>>> is really basic to Kafka:
>>>
>>> http://kafka.apache.org/documentation#intro_consumers
>>>
>>> As for your comment that "commit async after each RDD, which is not
>>> really viable also", how is it not viable?  Again, committing offsets
>>> to Kafka doesn't give you reliable delivery semantics unless your
>>> downstream data store is idempotent.  If your downstream data store is
>>> idempotent, then it shouldn't matter to you when offset commits
>>> happen, as long as they happen within a reasonable time after the data
>>> is written.
>>>
>>> Do you want to keep arguing with me, or follow my advice and proceed
>>> with debugging any remaining issues after you make the changes I
>>> suggested?
>>>
>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <i...@vadio.com> wrote:
>>> > With our stream version, we update the offsets for only the partition we
>>> > are operating on. We even break down the partition into smaller batches and
>>> > then update the offsets after each batch within the partition. With Spark 1.6
>>> > and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not
>>> > necessarily a Spark issue since Kafka no longer allows you to simply update
>>> > the offsets for a given consumer group. You have to subscribe or assign
>>> > partitions to even do so.
>>> >
>>> > As for storing the offsets in some other place like a DB, I don't find this
>>> > useful because you then can't use tools like Kafka Manager. In order to do
>>> > so you would have to store in a DB and then circle back and update Kafka
>>> > afterwards. This means you have to keep two sources in sync, which is not
>>> > really a good idea.
>>> >
>>> > It is a challenge in Spark to use the Kafka offsets since the driver keeps
>>> > subscribed to the topic(s) and consumer group, while the executors prepend
>>> > "spark-executor-" to the consumer group. The stream (driver) does allow you
>>> > to commit async after each RDD, which is not really viable also. I have
>>> > thought of implementing an Akka actor system on the driver and sending it
>>> > messages from the executor code to update the offsets, but then that is
>>> > asynchronous as well, so not really a good solution.
>>> >
>>> > I have no idea why Kafka made this change and also why in the parallel
>>> > KafkaRDD application we would be advised to use different consumer groups
>>> > for each RDD. It seems strange to me that different consumer groups would
>>> > be required or advised. There is no Kafka documentation that I know of that
>>> > states this. The biggest issue I see with the parallel KafkaRDD is the
>>> > timeouts. I have tried to set poll.ms to 30 seconds and still get the issue.
>>> > Something is not right here. As I mentioned with the streaming
>>> > application, with Spark 1.6 and Kafka 0.8.x we never saw this issue. We
>>> > have been running the same basic logic for over a year now without
>>> > one hitch at all.
>>> >
>>> > Ivan
>>> >
>>> >
>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>> >>
>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>>> >> general, not just kafka) have been progressing on to the next batch
>>> >> after a given batch aborts for quite some time now.  Yet another
>>> >> reason I put offsets in my database transactionally.  My jobs throw
>>> >> exceptions if the offset in the DB isn't what I expected it to be.
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com> wrote:
>>> >> > I've been encountering the same kinds of timeout issues as Ivan, using
>>> >> > the "Kafka Stream" approach that he is using, except I'm storing my offsets
>>> >> > manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet
>>> >> > implemented the KafkaRDD approach, and therefore don't have the concurrency
>>> >> > issues, but a very similar use case is coming up for me soon; it's just been
>>> >> > backburnered until I can get streaming to be more reliable (I will
>>> >> > definitely ensure unique group IDs when I do). Offset commits are certainly
>>> >> > more painful in Kafka 0.10, and that doesn't have anything to do with Spark.
>>> >> >
>>> >> > While I may be able to alleviate the timeout by just increasing it, I've
>>> >> > noticed something else that is more worrying: When one task fails 4 times in
>>> >> > a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts
>>> >> > the Stage and Job with "Job aborted due to stage failure: Task _ in stage _
>>> >> > failed 4 times". That's fine, and it's the behavior I want, but instead of
>>> >> > stopping the Application there (as previous versions of Spark did) the next
>>> >> > microbatch marches on and offsets are committed ahead of the failed
>>> >> > microbatch. Suddenly my at-least-once app becomes more
>>> >> > sometimes-at-least-once which is no good. In order for Spark to display that
>>> >> > failure, I must be propagating the errors up to Spark, but the behavior of
>>> >> > marching forward with the next microbatch seems to be new, and a big
>>> >> > potential for data loss in streaming applications.
>>> >> >
>>> >> > Am I perhaps missing a setting to stop the entire streaming application
>>> >> > once spark.task.maxFailures is reached? Has anyone else seen this behavior
>>> >> > of a streaming application skipping over failed microbatches?
>>> >> >
>>> >> > Thanks,
>>> >> > Sean
>>> >> >
>>> >> >
>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>> >> >>
>>> >> >> So basically what I am saying is
>>> >> >>
>>> >> >> - increase poll.ms
>>> >> >> - use a separate group id everywhere
>>> >> >> - stop committing offsets under the covers
>>> >> >>
>>> >> >> That should eliminate all of those as possible causes, and then we can
>>> >> >> see if there are still issues.
>>> >> >>
>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If you
>>> >> >> don't like the new Kafka consumer api, the existing 0.8 simple
>>> >> >> consumer api should be usable with later brokers.  As long as you
>>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs, keep
>>> >> >> using it.
>>> >> >>
>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com> wrote:
>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD
>>> >> >>> uses a single distinct topic. For example, the group would be something
>>> >> >>> like "storage-group", and the topics would be "storage-channel1" and
>>> >> >>> "storage-channel2". In each thread a KafkaConsumer is started, assigned
>>> >> >>> the partitions, and then used to commit offsets after the RDD is
>>> >> >>> processed. This should not interfere with the consumer group used by the
>>> >> >>> executors, which would be "spark-executor-storage-group".
>>> >> >>>
>>> >> >>> In the streaming example there is a single topic ("client-events") and
>>> >> >>> group ("processing-group"). A single stream is created and offsets are
>>> >> >>> manually updated from the executor after each partition is handled. This
>>> >> >>> was a challenge since Spark now requires one to assign or subscribe to a
>>> >> >>> topic in order to even update the offsets. In 0.8.2.x you did not have to
>>> >> >>> worry about that. This approach limits your exposure to duplicate data
>>> >> >>> since idempotent records are not entirely possible in our scenario. At
>>> >> >>> least not without a lot of re-running of logic to de-dup.
>>> >> >>>
>>> >> >>> Thanks,
>>> >> >>>
>>> >> >>> Ivan
>>> >> >>>
>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org>
>>> >> >>> wrote:
>>> >> >>>>
>>> >> >>>> So just to be clear, the answers to my questions are
>>> >> >>>>
>>> >> >>>> - you are not using different group ids, you're using the same group
>>> >> >>>> id everywhere
>>> >> >>>>
>>> >> >>>> - you are committing offsets manually
>>> >> >>>>
>>> >> >>>> Right?
>>> >> >>>>
>>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
>>> >> >>>> tune poll.ms upwards even higher.
>>> >> >>>>
>>> >> >>>> You must use different group ids for different rdds or streams.
>>> >> >>>> Kafka consumers won't behave the way you expect if they are all in the
>>> >> >>>> same group id, and the consumer cache is keyed by group id. Yes, the
>>> >> >>>> executor will tack "spark-executor-" on to the beginning, but if you
>>> >> >>>> give it the same base group id, it will be the same.  And the driver
>>> >> >>>> will use the group id you gave it, unmodified.
>>> >> >>>>
>>> >> >>>> Finally, I really can't help you if you're manually writing your own
>>> >> >>>> code to commit offsets directly to Kafka.  Trying to minimize
>>> >> >>>> duplicates that way doesn't really make sense, your system must be
>>> >> >>>> able to handle duplicates if you're using kafka as an offsets store,
>>> >> >>>> it can't do transactional exactly once.
>>> >> >>>>
>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com> wrote:
>>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD is
>>> >> >>>>> the most error prone to polling timeouts and concurrent modification
>>> >> >>>>> errors.
>>> >> >>>>>
>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes them
>>> >> >>>>> in parallel using the KafkaRDD directly. They all use the same consumer
>>> >> >>>>> group ('storage-group'), but each has its own topic and each topic has 4
>>> >> >>>>> partitions. We routinely get timeout errors when polling for data.
>>> >> >>>>> This occurs whether we process in parallel or sequentially.
>>> >> >>>>>
>>> >> >>>>> *Spark Kafka setting:*
>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>>> >> >>>>>
>>> >> >>>>> *Kafka Consumer Params:*
>>> >> >>>>> metric.reporters = []
>>> >> >>>>> metadata.max.age.ms = 300000
>>> >> >>>>> partition.assignment.strategy =
>>> >> >>>>> [org.apache.kafka.clients.consumer.RangeAssignor]
>>> >> >>>>> reconnect.backoff.ms = 50
>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>> >> >>>>> max.partition.fetch.bytes = 1048576
>>> >> >>>>> bootstrap.servers = [somemachine:31000]
>>> >> >>>>> ssl.keystore.type = JKS
>>> >> >>>>> enable.auto.commit = false
>>> >> >>>>> sasl.mechanism = GSSAPI
>>> >> >>>>> interceptor.classes = null
>>> >> >>>>> exclude.internal.topics = true
>>> >> >>>>> ssl.truststore.password = null
>>> >> >>>>> client.id =
>>> >> >>>>> ssl.endpoint.identification.algorithm = null
>>> >> >>>>> max.poll.records = 1000
>>> >> >>>>> check.crcs = true
>>> >> >>>>> request.timeout.ms = 40000
>>> >> >>>>> heartbeat.interval.ms = 3000
>>> >> >>>>> auto.commit.interval.ms = 5000
>>> >> >>>>> receive.buffer.bytes = 65536
>>> >> >>>>> ssl.truststore.type = JKS
>>> >> >>>>> ssl.truststore.location = null
>>> >> >>>>> ssl.keystore.password = null
>>> >> >>>>> fetch.min.bytes = 1
>>> >> >>>>> send.buffer.bytes = 131072
>>> >> >>>>> value.deserializer = class
>>> >> >>>>> com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>>> >> >>>>> group.id = storage-group
>>> >> >>>>> retry.backoff.ms = 100
>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>> >> >>>>> sasl.kerberos.service.name = null
>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>>> >> >>>>> ssl.key.password = null
>>> >> >>>>> fetch.max.wait.ms = 500
>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>>> >> >>>>> connections.max.idle.ms = 540000
>>> >> >>>>> session.timeout.ms = 30000
>>> >> >>>>> metrics.num.samples = 2
>>> >> >>>>> key.deserializer = class
>>> >> >>>>> org.apache.kafka.common.serialization.StringDeserializer
>>> >> >>>>> ssl.protocol = TLS
>>> >> >>>>> ssl.provider = null
>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>> >> >>>>> ssl.keystore.location = null
>>> >> >>>>> ssl.cipher.suites = null
>>> >> >>>>> security.protocol = PLAINTEXT
>>> >> >>>>> ssl.keymanager.algorithm = SunX509
>>> >> >>>>> metrics.sample.window.ms = 30000
>>> >> >>>>> auto.offset.reset = earliest
>>> >> >>>>>
>>> >> >>>>> *Example usage with KafkaRDD:*
>>> >> >>>>> val channels = Seq("channel1", "channel2")
>>> >> >>>>>
>>> >> >>>>> channels.toParArray.foreach { channel =>
>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>> >> >>>>>
>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>>> >> >>>>>
>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>>> >> >>>>>        kafkaParams asJava,
>>> >> >>>>>        offsetRanges,
>>> >> >>>>>        PreferConsistent).toDS[V]
>>> >> >>>>>
>>> >> >>>>>  // Do some aggregations
>>> >> >>>>>  ds.agg(...)
>>> >> >>>>>  // Save the data
>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>>> >> >>>>>  // Save offsets using a KafkaConsumer
>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>>> >> >>>>>  consumer.close()
>>> >> >>>>> }
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>> *Example usage with Kafka Stream:*
>>> >> >>>>> This creates a stream and processes events in each partition. At the
>>> >> >>>>> end of processing for each partition, we update the offsets for that
>>> >> >>>>> partition. This is challenging to do, but is better than calling
>>> >> >>>>> commitAsync on the stream, because that occurs after the /entire/ RDD
>>> >> >>>>> has been processed. This method minimizes duplicates in an exactly once
>>> >> >>>>> environment. Since the executors use their own custom group
>>> >> >>>>> "spark-executor-processor-group" and the commit is buried in private
>>> >> >>>>> functions we are unable to use the executor's cached consumer to update
>>> >> >>>>> the offsets. This requires us to go through multiple steps to update the
>>> >> >>>>> Kafka offsets accordingly.
>>> >> >>>>>
>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>>> >> >>>>>
>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>>> >> >>>>>      PreferConsistent,
>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
>>> >> >>>>>        kafkaParams,
>>> >> >>>>>        offsetRanges))
>>> >> >>>>>
>>> >> >>>>> stream.foreachRDD { rdd =>
>>> >> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>> >> >>>>>
>>> >> >>>>>    // Transform our data
>>> >> >>>>>   rdd.foreachPartition { events =>
>>> >> >>>>>       // Establish a consumer in the executor so we can update offsets
>>> >> >>>>>       // after each partition.
>>> >> >>>>>       // This class is homegrown and uses the KafkaConsumer to help
>>> >> >>>>>       // get/set offsets
>>> >> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
>>> >> >>>>>       // do something with our data
>>> >> >>>>>
>>> >> >>>>>       // Write the offsets that were updated in this partition
>>> >> >>>>>       consumer.setConsumerOffsets("processor-group",
>>> >> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>>> >> >>>>>   }
>>> >> >>>>> }
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>> --
>>> >> >>>>> View this message in context:
>>> >> >>>>>
>>> >> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
>>> >> >>>>> Sent from the Apache Spark User List mailing list archive at
>>> >> >>>>> Nabble.com.
>>> >> >>>>>
>>> >> >>>>>
>>> >> >>>>> ---------------------------------------------------------------------
>>> >> >>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>> >> >>>>>
>>> >> >>>
>>> >> >>>
>>> >> >>
>>> >> >>
>>> >> >
>>> >
>>> >
>>>
>>
>>
>
