With our stream version, we update the offsets for only the partition we
operating on. We even break down the partition into smaller batches and
then update the offsets after each batch within the partition. With Spark
1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is
not necessarily a Spark issue since Kafka no longer allows you to simply
update the offsets for a given consumer group. You have to subscribe or
assign partitions to even do so.

As for storing the offsets in some other place like a DB, it don't find
this useful because you then can't use tools like Kafka Manager. In order
to do so you would have to store in a DB and the circle back and update
Kafka afterwards. This means you have to keep two sources in sync which is
not really a good idea.

It is a challenge in Spark to use the Kafka offsets since the drive keeps
subscribed to the topic(s) and consumer group, while the executors prepend
"spark-executor-" to the consumer group. The stream (driver) does allow you
to commit async after each RDD, which is not really viable also. I have not
of implementing an Akka actor system on the driver and send it messages
from the executor code to update the offsets, but then that is asynchronous
as well so not really a good solution.

I have no idea why Kafka made this change and also why in the parallel
KafkaRDD application we would be advised to use different consumer groups
for each RDD. That seems strange to me that different consumer groups would
be required or advised. There is no Kafka documentation that I know if that
states this. The biggest issue I see with the parallel KafkaRDD is the
timeouts. I have tried to set poll.ms to 30 seconds and still get the
issue. Something is not right here and just not seem right. As I mentioned
with the streaming application, with Spark 1.6 and Kafka 0.8.x we never saw
this issue. We have been running the same basic logic for over a year now
without one hitch at all.

Ivan


On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Someone can correct me, but I'm pretty sure Spark dstreams (in
> general, not just kafka) have been progressing on to the next batch
> after a given batch aborts for quite some time now.  Yet another
> reason I put offsets in my database transactionally.  My jobs throw
> exceptions if the offset in the DB isn't what I expected it to be.
>
>
>
>
> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com> wrote:
> > I've been encountering the same kinds of timeout issues as Ivan, using
> the "Kafka Stream" approach that he is using, except I'm storing my offsets
> manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet
> implemented the KafkaRDD approach, and therefore don't have the concurrency
> issues, but a very similar use case is coming up for me soon, it's just
> been backburnered until I can get streaming to be more reliable (I will
> definitely ensure unique group IDs when I do). Offset commits are certainly
> more painful in Kafka 0.10, and that doesn't have anything to do with Spark.
> >
> > While i may be able to alleviate the timeout by just increasing it, I've
> noticed something else that is more worrying: When one task fails 4 times
> in a row (i.e. "Failed to get records for _ after polling for _"), Spark
> aborts the Stage and Job with "Job aborted due to stage failure: Task _ in
> stage _ failed 4 times". That's fine, and it's the behavior I want, but
> instead of stopping the Application there (as previous versions of Spark
> did) the next microbatch marches on and offsets are committed ahead of the
> failed microbatch. Suddenly my at-least-once app becomes more
> sometimes-at-least-once which is no good. In order for spark to display
> that failure, I must be propagating the errors up to Spark, but the
> behavior of marching forward with the next microbatch seems to be new, and
> a big potential for data loss in streaming applications.
> >
> > Am I perhaps missing a setting to stop the entire streaming application
> once spark.task.maxFailures is reached? Has anyone else seen this behavior
> of a streaming application skipping over failed microbatches?
> >
> > Thanks,
> > Sean
> >
> >
> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org> wrote:
> >>
> >> So basically what I am saying is
> >>
> >> - increase poll.ms
> >> - use a separate group id everywhere
> >> - stop committing offsets under the covers
> >>
> >> That should eliminate all of those as possible causes, and then we can
> >> see if there are still issues.
> >>
> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
> >> subscribe to a topic in order to update offsets, Kafka does.  If you
> >> don't like the new Kafka consumer api, the existing 0.8 simple
> >> consumer api should be usable with later brokers.  As long as you
> >> don't need SSL or dynamic subscriptions, and it meets your needs, keep
> >> using it.
> >>
> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com> wrote:
> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each RDD
> uses a
> >>> single distinct topic. For example, the group would be something like
> >>> "storage-group", and the topics would be "storage-channel1", and
> >>> "storage-channel2". In each thread a KafkaConsumer is started,
> assigned the
> >>> partitions assigned, and then commit offsets are called after the RDD
> is
> >>> processed. This should not interfere with the consumer group used by
> the
> >>> executors which would be "spark-executor-storage-group".
> >>>
> >>> In the streaming example there is a single topic ("client-events") and
> group
> >>> ("processing-group"). A single stream is created and offsets are
> manually
> >>> updated from the executor after each partition is handled. This was a
> >>> challenge since Spark now requires one to assign or subscribe to a
> topic in
> >>> order to even update the offsets. In 0.8.2.x you did not have to worry
> about
> >>> that. This approach limits your exposure to duplicate data since
> idempotent
> >>> records are not entirely possible in our scenario. At least without a
> lot of
> >>> re-running of logic to de-dup.
> >>>
> >>> Thanks,
> >>>
> >>> Ivan
> >>>
> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
> >>>>
> >>>> So just to be clear, the answers to my questions are
> >>>>
> >>>> - you are not using different group ids, you're using the same group
> >>>> id everywhere
> >>>>
> >>>> - you are committing offsets manually
> >>>>
> >>>> Right?
> >>>>
> >>>> If you want to eliminate network or kafka misbehavior as a source,
> >>>> tune poll.ms upwards even higher.
> >>>>
> >>>> You must use different group ids for different rdds or streams.
> >>>> Kafka consumers won't behave the way you expect if they are all in the
> >>>> same group id, and the consumer cache is keyed by group id. Yes, the
> >>>> executor will tack "spark-executor-" on to the beginning, but if you
> >>>> give it the same base group id, it will be the same.  And the driver
> >>>> will use the group id you gave it, unmodified.
> >>>>
> >>>> Finally, I really can't help you if you're manually writing your own
> >>>> code to commit offsets directly to Kafka.  Trying to minimize
> >>>> duplicates that way doesn't really make sense, your system must be
> >>>> able to handle duplicates if you're using kafka as an offsets store,
> >>>> it can't do transactional exactly once.
> >>>>
> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com> wrote:
> >>>>> Here are some examples and details of the scenarios. The KafkaRDD is
> the
> >>>>> most
> >>>>> error prone to polling
> >>>>> timeouts and concurrentm modification errors.
> >>>>>
> >>>>> *Using KafkaRDD* - This takes a list of channels and processes them
> in
> >>>>> parallel using the KafkaRDD directly. they all use the same consumer
> >>>>> group
> >>>>> ('storage-group'), but each has it's own topic and each topic has 4
> >>>>> partitions. We routinely get timeout errors when polling for data.
> This
> >>>>> occurs whether we process in parallel or sequentially.
> >>>>>
> >>>>> *Spark Kafka setting:*
> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
> >>>>>
> >>>>> *Kafka Consumer Params:*
> >>>>> metric.reporters = []
> >>>>> metadata.max.age.ms = 300000
> >>>>> partition.assignment.strategy =
> >>>>> [org.apache.kafka.clients.consumer.RangeAssignor]
> >>>>> reconnect.backoff.ms = 50
> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
> >>>>> max.partition.fetch.bytes = 1048576
> >>>>> bootstrap.servers = [somemachine:31000]
> >>>>> ssl.keystore.type = JKS
> >>>>> enable.auto.commit = false
> >>>>> sasl.mechanism = GSSAPI
> >>>>> interceptor.classes = null
> >>>>> exclude.internal.topics = true
> >>>>> ssl.truststore.password = null
> >>>>> client.id =
> >>>>> ssl.endpoint.identification.algorithm = null
> >>>>> max.poll.records = 1000
> >>>>> check.crcs = true
> >>>>> request.timeout.ms = 40000
> >>>>> heartbeat.interval.ms = 3000
> >>>>> auto.commit.interval.ms = 5000
> >>>>> receive.buffer.bytes = 65536
> >>>>> ssl.truststore.type = JKS
> >>>>> ssl.truststore.location = null
> >>>>> ssl.keystore.password = null
> >>>>> fetch.min.bytes = 1
> >>>>> send.buffer.bytes = 131072
> >>>>> value.deserializer = class
> >>>>> com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
> >>>>> group.id = storage-group
> >>>>> retry.backoff.ms = 100
> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >>>>> sasl.kerberos.service.name = null
> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
> >>>>> ssl.trustmanager.algorithm = PKIX
> >>>>> ssl.key.password = null
> >>>>> fetch.max.wait.ms = 500
> >>>>> sasl.kerberos.min.time.before.relogin = 60000
> >>>>> connections.max.idle.ms = 540000
> >>>>> session.timeout.ms = 30000
> >>>>> metrics.num.samples = 2
> >>>>> key.deserializer = class
> >>>>> org.apache.kafka.common.serialization.StringDeserializer
> >>>>> ssl.protocol = TLS
> >>>>> ssl.provider = null
> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >>>>> ssl.keystore.location = null
> >>>>> ssl.cipher.suites = null
> >>>>> security.protocol = PLAINTEXT
> >>>>> ssl.keymanager.algorithm = SunX509
> >>>>> metrics.sample.window.ms = 30000
> >>>>> auto.offset.reset = earliest
> >>>>>
> >>>>> *Example usage with KafkaRDD:*
> >>>>> val channels = Seq("channel1", "channel2")
> >>>>>
> >>>>> channels.toParArray.foreach { channel =>
> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
> >>>>>
> >>>>>  // Get offsets for the given topic and the consumer group
> >>>>> 'storage-group'
> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
> >>>>>
> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
> >>>>>        kafkaParams asJava,
> >>>>>        offsetRanges,
> >>>>>        PreferConsistent).toDS[V]
> >>>>>
> >>>>>  // Do some aggregations
> >>>>>  ds.agg(...)
> >>>>>  // Save the data
> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
> >>>>>  // Save offsets using a KafkaConsumer
> >>>>>  consumer.commitSync(newOffsets.asJava)
> >>>>>  consumer.close()
> >>>>> }
> >>>>>
> >>>>>
> >>>>> *Example usage with Kafka Stream:*
> >>>>> This creates a stream and processes events in each partition. At the
> end
> >>>>> of
> >>>>> processing for
> >>>>> each partition, we updated the offsets for each partition. This is
> >>>>> challenging to do, but is better
> >>>>> then calling commitAysnc on the stream, because that occurs after the
> >>>>> /entire/ RDD has been
> >>>>> processed. This method minimizes duplicates in an exactly once
> >>>>> environment.
> >>>>> Since the executors
> >>>>> use their own custom group "spark-executor-processor-group" and the
> >>>>> commit
> >>>>> is buried in private
> >>>>> functions we are unable to use the executors cached consumer to
> update
> >>>>> the
> >>>>> offsets. This requires us
> >>>>> to go through multiple steps to update the Kafka offsets accordingly.
> >>>>>
> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
> >>>>>
> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
> >>>>>      PreferConsistent,
> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
> >>>>>        kafkaParams,
> >>>>>        offsetRanges))
> >>>>>
> >>>>> stream.foreachRDD { rdd =>
> >>>>>    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >>>>>
> >>>>>    // Transform our data
> >>>>>   rdd.foreachPartition { events =>
> >>>>>       // Establish a consumer in the executor so we can update
> offsets
> >>>>> after each partition.
> >>>>>       // This class is homegrown and uses the KafkaConsumer to help
> >>>>> get/set
> >>>>> offsets
> >>>>>       val consumer = new ConsumerUtils(kafkaParams)
> >>>>>       // do something with our data
> >>>>>
> >>>>>       // Write the offsets that were updated in this partition
> >>>>>       kafkaConsumer.setConsumerOffsets("processor-group",
> >>>>>          Map(TopicAndPartition(tp.topic, tp.partition) ->
> endOffset))
> >>>>>   }
> >>>>> }
> >>>>>
> >>>>>
> >>>>>
> >>>>> --
> >>>>> View this message in context:
> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/
> Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
> >>>>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>>>
> >>>>> ------------------------------------------------------------
> ---------
> >>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>>>>
> >>>
> >>>
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >>
> >
>

Reply via email to