You should not be getting consumer churn on executors at all; avoiding
that is the whole point of the cache.  How many partitions are you trying to
process per executor?
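
To illustrate why the partition count matters, here is a minimal sketch of
an LRU cache built on java.util.LinkedHashMap, the JDK class that shows up
in the stack trace quoted below. It is not Spark's code: FakeConsumer,
CacheChurnSketch and the capacity of 8 are made up for the example. It only
shows that once the number of distinct keys exceeds the capacity, every
lookup evicts (and closes) an entry.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a cached consumer; not a Spark or Kafka class.
final class FakeConsumer(val key: String) {
  var closed = false
  def close(): Unit = { closed = true }
}

object CacheChurnSketch {
  // Touches every key once per round through an access-ordered LRU cache and
  // returns the keys that were evicted (and closed), in eviction order.
  def evictions(maxCapacity: Int, keys: Seq[String], rounds: Int): Seq[String] = {
    val evicted = ArrayBuffer.empty[String]
    val cache = new JLinkedHashMap[String, FakeConsumer](16, 0.75f, true) {
      override def removeEldestEntry(eldest: JMap.Entry[String, FakeConsumer]): Boolean =
        if (this.size > maxCapacity) {
          eldest.getValue.close()
          evicted += eldest.getKey
          true
        } else false
    }
    for (_ <- 1 to rounds; key <- keys) {
      // A miss creates a new consumer, which may push out the eldest one.
      if (cache.get(key) == null) cache.put(key, new FakeConsumer(key))
    }
    evicted.toList
  }

  def main(args: Array[String]): Unit = {
    val fits = (0 until 8).map(i => s"topic-$i")
    val tooMany = (0 until 9).map(i => s"topic-$i")
    println(s"8 keys, capacity 8: ${evictions(8, fits, 3).size} evictions")
    println(s"9 keys, capacity 8: ${evictions(8, tooMany, 3).size} evictions")
  }
}
```

With 8 keys nothing is ever evicted; with 9 keys cycled in order, every
lookup after the cache fills is a miss. If an evicted consumer is still in
use by another task thread when it gets closed, that could plausibly explain
the ConcurrentModificationException, but that is a guess, not something
confirmed here.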

http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies

gives instructions on the default size of the cache and how to increase it.
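
As a rough sizing sketch, assuming one cached consumer per (group id, topic,
partition) and partitions spread evenly over executors: the helper names
below are made up, and the executor count of 4 is purely illustrative. The
default of 64 and the spark.streaming.kafka.consumer.cache.maxCapacity
setting are the ones described on that page; the channel and partition
counts are the ones mentioned in this thread.

```scala
object CacheSizingSketch {
  // Documented default for spark.streaming.kafka.consumer.cache.maxCapacity.
  val DefaultMaxCapacity = 64

  // Rough number of cached consumers one executor may need, assuming each
  // topic-partition gets its own consumer and partitions are spread evenly.
  def consumersPerExecutor(topics: Int, partitionsPerTopic: Int, executors: Int): Int = {
    require(executors > 0, "executors must be positive")
    val total = topics * partitionsPerTopic
    (total + executors - 1) / executors // ceiling division
  }

  // The spark-submit flag to pass when the default cache is too small.
  def suggestedConf(needed: Int): Option[String] =
    if (needed > DefaultMaxCapacity)
      Some(s"--conf spark.streaming.kafka.consumer.cache.maxCapacity=$needed")
    else None

  def main(args: Array[String]): Unit = {
    // About a hundred channels with 4 partitions each; 4 executors is assumed.
    val needed = consumersPerExecutor(topics = 100, partitionsPerTopic = 4, executors = 4)
    println(s"consumers per executor: $needed")
    println(suggestedConf(needed).getOrElse("default cache size is enough"))
  }
}
```

Treat the result as a lower bound: retries or uneven scheduling can put more
partitions on one executor than the even split suggests.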

On Sat, Nov 12, 2016 at 2:15 PM, Ivan von Nagy <i...@vadio.com> wrote:
> Hi Sean,
>
> Thanks for responding. We have run our jobs with internal parallel
> processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2.) and did
> not encounter any of these issues until upgrading to Spark 2.0.1 and Kafka
> 0.10 clients. If we process serially, then we sometimes get the errors, but
> far less often. Also, if done sequentially it sometimes takes more than 2x as
> long, which is not an option for this particular job.
>
> I posted another example on Nov 10th which is the example below. We
> basically iterate through a list in parallel and sometimes the list could be
> upwards of a hundred elements. The parallelism in Scala/Spark limits to
> about 8 at a time on our nodes. For performance reasons we process in
> parallel and we also separate each since each channel has its own topic.
> We don't combine all into one KafkaRDD because that means we have to process
> all or nothing if an error occurs. This way if a couple of channels fail, we
> can re-run the job and it will only process those channels.
>
> This has just been perplexing since we had never encountered any errors for
> well over a year using the prior versions. At this time, I am just seeking
> any configuration options or code changes that we may be missing or even at
> a lower level, fundamentally what changed in Spark 2 and Kafka 0.10 that
> surfaced these issues.
>
> We continue to use Spark 1.6 with the Kafka 0.8.x clients until this can be
> figured out; however, it is a deal breaker for us to upgrade to Spark 2.x
> with Kafka 0.10 clients. On a side note, we have not encountered any issues
> with the Kafka Producers, this is simply with the KafkaRDD and its use of
> CachedKafkaConsumer. Any help is much appreciated.
>
> Thanks,
>
> Ivan
>
> Example usage with KafkaRDD:
> val channels = Seq("channel1", "channel2")
>
> channels.toParArray.foreach { channel =>
>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>
>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>
>   val ds = KafkaUtils.createRDD[K, V](context,
>         kafkaParams asJava,
>         offsetRanges,
>         PreferConsistent).toDS[V]
>
>   // Do some aggregations
>   ds.agg(...)
>   // Save the data
>   ds.write.mode(SaveMode.Append).parquet(somePath)
>   // Save offsets using a KafkaConsumer
>   consumer.commitSync(newOffsets.asJava)
>   consumer.close()
> }
>
> On Sat, Nov 12, 2016 at 11:46 AM, Sean McKibben <grap...@graphex.com> wrote:
>>
>> How are you iterating through your RDDs in parallel? In the past (Spark
>> 1.5.2) when I've had actions being performed on multiple RDDs concurrently
>> using futures, I've encountered some pretty bad behavior in Spark,
>> especially during job retries. Very difficult to explain things, like
>> records from one RDD leaking into a totally different (though shared
>> lineage) RDD during job retries. I'm not sure what documentation exists
>> around parallelizing on top of Spark's existing parallelization approach,
>> but I would venture a guess that that could be the source of your concurrent
>> access problems, and potentially other hidden issues. Have you tried a
>> version of your app which doesn't parallelize actions on RDDs, but instead
>> serially processes each RDD? I'm sure it isn't ideal performance-wise, but
>> it seems like a good choice for an A/B test.
>>
>> The poll.ms issue could very well be settings or capability of your kafka
>> cluster. I think other (non-Spark) approaches may have less consumer churn
>> and be less susceptible to things like GC pauses or coordination latency. It
>> could also be that the number of consumers being simultaneously created on
>> each executor causes a thundering herd problem during initial phases (which
>> then causes job retries, which then causes more consumer churn, etc.).
>>
>> Sean
>>
>>
>>
>> On Nov 12, 2016, at 11:14 AM, Ivan von Nagy <i...@vadio.com> wrote:
>>
>> The code was changed to use a unique group for each KafkaRDD that was
>> created (see Nov 10 post). There is no KafkaRDD being reused. The basic
>> logic (see Nov 10 post for example) is get a list of channels, iterate
>> through them in parallel, load a KafkaRDD using a given topic and a consumer
>> group that is made from the topic (each RDD uses a different topic and
>> group), process the data and write to Parquet files.
>>
>> Per my Nov 10th post, we still get polling timeouts unless the poll.ms is
>> set to something like 10 seconds. We also get concurrent modification
>> exceptions. I believe the key here is that processing the data in
>> parallel is where we encounter issues, so we are looking for some possible
>> answers surrounding this.
>>
>> Thanks,
>>
>> Ivan
>>
>>
>> On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>>
>>> It is already documented that you must use a different group id, which as
>>> far as I can tell you are still not doing.
>>>
>>>
>>> On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixi...@databricks.com>
>>> wrote:
>>>>
>>>> Yeah, the KafkaRDD cannot be reused. It's better to document it.
>>>>
>>>> On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <i...@vadio.com> wrote:
>>>>>
>>>>> Ok, I have split the KafkaRDD logic so that each uses its own group and
>>>>> bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the
>>>>> poll.ms ends up with a timeout and exception, so I am still perplexed on
>>>>> that one. The new error I am getting now is a `ConcurrentModificationException`
>>>>> when Spark is trying to remove the CachedKafkaConsumer.
>>>>>
>>>>> java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
>>>>>   at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
>>>>>   at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)
>>>>>
>>>>> Here is the basic logic:
>>>>>
>>>>> Using KafkaRDD - This takes a list of channels and processes them in
>>>>> parallel using the KafkaRDD directly. They each use a distinct consumer
>>>>> group (s"$prefix-$topic"), and each has its own topic and each topic has
>>>>> 4 partitions. We routinely get timeout errors when polling for data when the
>>>>> poll.ms is less than 2 seconds. This occurs whether we process in
>>>>> parallel or sequentially.
>>>>>
>>>>> Example usage with KafkaRDD:
>>>>> val channels = Seq("channel1", "channel2")
>>>>>
>>>>> channels.toParArray.foreach { channel =>
>>>>>   val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>>
>>>>>   // Get offsets for the given topic and the consumer group "$prefix-$topic"
>>>>>   val offsetRanges = getOffsets(s"$prefix-$topic", channel)
>>>>>
>>>>>   val ds = KafkaUtils.createRDD[K, V](context,
>>>>>         kafkaParams asJava,
>>>>>         offsetRanges,
>>>>>         PreferConsistent).toDS[V]
>>>>>
>>>>>   // Do some aggregations
>>>>>   ds.agg(...)
>>>>>   // Save the data
>>>>>   ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>>   // Save offsets using a KafkaConsumer
>>>>>   consumer.commitSync(newOffsets.asJava)
>>>>>   consumer.close()
>>>>> }
>>>>>
>>>>> I am not sure why the concurrency issue is there, as I have tried to
>>>>> debug and also looked at the KafkaConsumer code, and everything looks
>>>>> like it should not occur. The things to figure out are why this occurs
>>>>> when running in parallel and why the timeouts still occur.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Ivan
>>>>>
>>>>> On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>>
>>>>>> There definitely is Kafka documentation indicating that you should use
>>>>>> a different consumer group for logically different subscribers, this
>>>>>> is really basic to Kafka:
>>>>>>
>>>>>> http://kafka.apache.org/documentation#intro_consumers
>>>>>>
>>>>>> As for your comment that "commit async after each RDD, which is not
>>>>>> really viable also", how is it not viable?  Again, committing offsets
>>>>>> to Kafka doesn't give you reliable delivery semantics unless your
>>>>>> downstream data store is idempotent.  If your downstream data store is
>>>>>> idempotent, then it shouldn't matter to you when offset commits
>>>>>> happen, as long as they happen within a reasonable time after the data
>>>>>> is written.
>>>>>>
>>>>>> Do you want to keep arguing with me, or follow my advice and proceed
>>>>>> with debugging any remaining issues after you make the changes I
>>>>>> suggested?
>>>>>>
>>>>>> On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <i...@vadio.com> wrote:
>>>>>> > With our stream version, we update the offsets for only the partition we
>>>>>> > are operating on. We even break down the partition into smaller batches
>>>>>> > and then update the offsets after each batch within the partition. With
>>>>>> > Spark 1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out,
>>>>>> > this is not necessarily a Spark issue since Kafka no longer allows you to
>>>>>> > simply update the offsets for a given consumer group. You have to
>>>>>> > subscribe or assign partitions to even do so.
>>>>>> >
>>>>>> > As for storing the offsets in some other place like a DB, I don't find
>>>>>> > this useful because you then can't use tools like Kafka Manager. In order
>>>>>> > to do so you would have to store in a DB and then circle back and update
>>>>>> > Kafka afterwards. This means you have to keep two sources in sync, which
>>>>>> > is not really a good idea.
>>>>>> >
>>>>>> > It is a challenge in Spark to use the Kafka offsets since the driver
>>>>>> > stays subscribed to the topic(s) and consumer group, while the executors
>>>>>> > prepend "spark-executor-" to the consumer group. The stream (driver) does
>>>>>> > allow you to commit async after each RDD, which is not really viable also.
>>>>>> > I have thought of implementing an Akka actor system on the driver and
>>>>>> > sending it messages from the executor code to update the offsets, but
>>>>>> > that is asynchronous as well, so not really a good solution.
>>>>>> >
>>>>>> > I have no idea why Kafka made this change and also why in the parallel
>>>>>> > KafkaRDD application we would be advised to use different consumer groups
>>>>>> > for each RDD. It seems strange to me that different consumer groups would
>>>>>> > be required or advised. There is no Kafka documentation that I know of
>>>>>> > that states this. The biggest issue I see with the parallel KafkaRDD is
>>>>>> > the timeouts. I have tried to set poll.ms to 30 seconds and still get the
>>>>>> > issue. Something is just not right here. As I mentioned with the
>>>>>> > streaming application, with Spark 1.6 and Kafka 0.8.x we never saw this
>>>>>> > issue. We have been running the same basic logic for over a year now
>>>>>> > without one hitch at all.
>>>>>> >
>>>>>> > Ivan
>>>>>> >
>>>>>> >
>>>>>> > On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <c...@koeninger.org>
>>>>>> > wrote:
>>>>>> >>
>>>>>> >> Someone can correct me, but I'm pretty sure Spark dstreams (in
>>>>>> >> general, not just kafka) have been progressing on to the next batch
>>>>>> >> after a given batch aborts for quite some time now.  Yet another
>>>>>> >> reason I put offsets in my database transactionally.  My jobs throw
>>>>>> >> exceptions if the offset in the DB isn't what I expected it to be.
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >>
>>>>>> >> On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com>
>>>>>> >> wrote:
>>>>>> >> > I've been encountering the same kinds of timeout issues as Ivan,
>>>>>> >> > using the "Kafka Stream" approach that he is using, except I'm
>>>>>> >> > storing my offsets manually from the driver to Zookeeper in the
>>>>>> >> > Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and
>>>>>> >> > therefore don't have the concurrency issues, but a very similar use
>>>>>> >> > case is coming up for me soon; it's just been backburnered until I
>>>>>> >> > can get streaming to be more reliable (I will definitely ensure
>>>>>> >> > unique group IDs when I do). Offset commits are certainly more
>>>>>> >> > painful in Kafka 0.10, and that doesn't have anything to do with
>>>>>> >> > Spark.
>>>>>> >> >
>>>>>> >> > While I may be able to alleviate the timeout by just increasing it,
>>>>>> >> > I've noticed something else that is more worrying: When one task
>>>>>> >> > fails 4 times in a row (i.e. "Failed to get records for _ after
>>>>>> >> > polling for _"), Spark aborts the Stage and Job with "Job aborted
>>>>>> >> > due to stage failure: Task _ in stage _ failed 4 times". That's
>>>>>> >> > fine, and it's the behavior I want, but instead of stopping the
>>>>>> >> > Application there (as previous versions of Spark did) the next
>>>>>> >> > microbatch marches on and offsets are committed ahead of the failed
>>>>>> >> > microbatch. Suddenly my at-least-once app becomes more
>>>>>> >> > sometimes-at-least-once, which is no good. In order for Spark to
>>>>>> >> > display that failure, I must be propagating the errors up to Spark,
>>>>>> >> > but the behavior of marching forward with the next microbatch seems
>>>>>> >> > to be new, and a big potential for data loss in streaming
>>>>>> >> > applications.
>>>>>> >> >
>>>>>> >> > Am I perhaps missing a setting to stop the entire streaming
>>>>>> >> > application once spark.task.maxFailures is reached? Has anyone else
>>>>>> >> > seen this behavior of a streaming application skipping over failed
>>>>>> >> > microbatches?
>>>>>> >> >
>>>>>> >> > Thanks,
>>>>>> >> > Sean
>>>>>> >> >
>>>>>> >> >
>>>>>> >> >> On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org>
>>>>>> >> >> wrote:
>>>>>> >> >>
>>>>>> >> >> So basically what I am saying is
>>>>>> >> >>
>>>>>> >> >> - increase poll.ms
>>>>>> >> >> - use a separate group id everywhere
>>>>>> >> >> - stop committing offsets under the covers
>>>>>> >> >>
>>>>>> >> >> That should eliminate all of those as possible causes, and then we
>>>>>> >> >> can see if there are still issues.
>>>>>> >> >>
>>>>>> >> >> As far as 0.8 vs 0.10, Spark doesn't require you to assign or
>>>>>> >> >> subscribe to a topic in order to update offsets, Kafka does.  If
>>>>>> >> >> you don't like the new Kafka consumer api, the existing 0.8 simple
>>>>>> >> >> consumer api should be usable with later brokers.  As long as you
>>>>>> >> >> don't need SSL or dynamic subscriptions, and it meets your needs,
>>>>>> >> >> keep using it.
>>>>>> >> >>
>>>>>> >> >> On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com>
>>>>>> >> >> wrote:
>>>>>> >> >>> Yes, the parallel KafkaRDD uses the same consumer group, but each
>>>>>> >> >>> RDD uses a single distinct topic. For example, the group would be
>>>>>> >> >>> something like "storage-group", and the topics would be
>>>>>> >> >>> "storage-channel1" and "storage-channel2". In each thread a
>>>>>> >> >>> KafkaConsumer is started and assigned its partitions, and then
>>>>>> >> >>> offsets are committed after the RDD is processed. This should not
>>>>>> >> >>> interfere with the consumer group used by the executors, which
>>>>>> >> >>> would be "spark-executor-storage-group".
>>>>>> >> >>>
>>>>>> >> >>> In the streaming example there is a single topic ("client-events")
>>>>>> >> >>> and group ("processing-group"). A single stream is created and
>>>>>> >> >>> offsets are manually updated from the executor after each partition
>>>>>> >> >>> is handled. This was a challenge since Spark now requires one to
>>>>>> >> >>> assign or subscribe to a topic in order to even update the offsets.
>>>>>> >> >>> In 0.8.2.x you did not have to worry about that. This approach
>>>>>> >> >>> limits your exposure to duplicate data since idempotent records are
>>>>>> >> >>> not entirely possible in our scenario, at least not without a lot
>>>>>> >> >>> of re-running of logic to de-dup.
>>>>>> >> >>>
>>>>>> >> >>> Thanks,
>>>>>> >> >>>
>>>>>> >> >>> Ivan
>>>>>> >> >>>
>>>>>> >> >>> On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger
>>>>>> >> >>> <c...@koeninger.org>
>>>>>> >> >>> wrote:
>>>>>> >> >>>>
>>>>>> >> >>>> So just to be clear, the answers to my questions are
>>>>>> >> >>>>
>>>>>> >> >>>> - you are not using different group ids, you're using the same
>>>>>> >> >>>>   group id everywhere
>>>>>> >> >>>>
>>>>>> >> >>>> - you are committing offsets manually
>>>>>> >> >>>>
>>>>>> >> >>>> Right?
>>>>>> >> >>>>
>>>>>> >> >>>> If you want to eliminate network or kafka misbehavior as a source,
>>>>>> >> >>>> tune poll.ms upwards even higher.
>>>>>> >> >>>>
>>>>>> >> >>>> You must use different group ids for different rdds or streams.
>>>>>> >> >>>> Kafka consumers won't behave the way you expect if they are all in
>>>>>> >> >>>> the same group id, and the consumer cache is keyed by group id.
>>>>>> >> >>>> Yes, the executor will tack "spark-executor-" on to the beginning,
>>>>>> >> >>>> but if you give it the same base group id, it will be the same.
>>>>>> >> >>>> And the driver will use the group id you gave it, unmodified.
>>>>>> >> >>>>
>>>>>> >> >>>> Finally, I really can't help you if you're manually writing your
>>>>>> >> >>>> own code to commit offsets directly to Kafka.  Trying to minimize
>>>>>> >> >>>> duplicates that way doesn't really make sense; your system must be
>>>>>> >> >>>> able to handle duplicates if you're using kafka as an offsets
>>>>>> >> >>>> store, since it can't do transactional exactly once.
>>>>>> >> >>>>
>>>>>> >> >>>> On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com>
>>>>>> >> >>>> wrote:
>>>>>> >> >>>>> Here are some examples and details of the scenarios. The KafkaRDD
>>>>>> >> >>>>> is the most error prone to polling timeouts and concurrent
>>>>>> >> >>>>> modification errors.
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Using KafkaRDD* - This takes a list of channels and processes
>>>>>> >> >>>>> them in parallel using the KafkaRDD directly. They all use the same
>>>>>> >> >>>>> consumer group ('storage-group'), but each has its own topic and
>>>>>> >> >>>>> each topic has 4 partitions. We routinely get timeout errors when
>>>>>> >> >>>>> polling for data. This occurs whether we process in parallel or
>>>>>> >> >>>>> sequentially.
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Spark Kafka setting:*
>>>>>> >> >>>>> spark.streaming.kafka.consumer.poll.ms=2000
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Kafka Consumer Params:*
>>>>>> >> >>>>> metric.reporters = []
>>>>>> >> >>>>> metadata.max.age.ms = 300000
>>>>>> >> >>>>> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>>>>>> >> >>>>> reconnect.backoff.ms = 50
>>>>>> >> >>>>> sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>>> >> >>>>> max.partition.fetch.bytes = 1048576
>>>>>> >> >>>>> bootstrap.servers = [somemachine:31000]
>>>>>> >> >>>>> ssl.keystore.type = JKS
>>>>>> >> >>>>> enable.auto.commit = false
>>>>>> >> >>>>> sasl.mechanism = GSSAPI
>>>>>> >> >>>>> interceptor.classes = null
>>>>>> >> >>>>> exclude.internal.topics = true
>>>>>> >> >>>>> ssl.truststore.password = null
>>>>>> >> >>>>> client.id =
>>>>>> >> >>>>> ssl.endpoint.identification.algorithm = null
>>>>>> >> >>>>> max.poll.records = 1000
>>>>>> >> >>>>> check.crcs = true
>>>>>> >> >>>>> request.timeout.ms = 40000
>>>>>> >> >>>>> heartbeat.interval.ms = 3000
>>>>>> >> >>>>> auto.commit.interval.ms = 5000
>>>>>> >> >>>>> receive.buffer.bytes = 65536
>>>>>> >> >>>>> ssl.truststore.type = JKS
>>>>>> >> >>>>> ssl.truststore.location = null
>>>>>> >> >>>>> ssl.keystore.password = null
>>>>>> >> >>>>> fetch.min.bytes = 1
>>>>>> >> >>>>> send.buffer.bytes = 131072
>>>>>> >> >>>>> value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
>>>>>> >> >>>>> group.id = storage-group
>>>>>> >> >>>>> retry.backoff.ms = 100
>>>>>> >> >>>>> sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>>> >> >>>>> sasl.kerberos.service.name = null
>>>>>> >> >>>>> sasl.kerberos.ticket.renew.jitter = 0.05
>>>>>> >> >>>>> ssl.trustmanager.algorithm = PKIX
>>>>>> >> >>>>> ssl.key.password = null
>>>>>> >> >>>>> fetch.max.wait.ms = 500
>>>>>> >> >>>>> sasl.kerberos.min.time.before.relogin = 60000
>>>>>> >> >>>>> connections.max.idle.ms = 540000
>>>>>> >> >>>>> session.timeout.ms = 30000
>>>>>> >> >>>>> metrics.num.samples = 2
>>>>>> >> >>>>> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>>>>> >> >>>>> ssl.protocol = TLS
>>>>>> >> >>>>> ssl.provider = null
>>>>>> >> >>>>> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>>> >> >>>>> ssl.keystore.location = null
>>>>>> >> >>>>> ssl.cipher.suites = null
>>>>>> >> >>>>> security.protocol = PLAINTEXT
>>>>>> >> >>>>> ssl.keymanager.algorithm = SunX509
>>>>>> >> >>>>> metrics.sample.window.ms = 30000
>>>>>> >> >>>>> auto.offset.reset = earliest
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Example usage with KafkaRDD:*
>>>>>> >> >>>>> val channels = Seq("channel1", "channel2")
>>>>>> >> >>>>>
>>>>>> >> >>>>> channels.toParArray.foreach { channel =>
>>>>>> >> >>>>>  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)
>>>>>> >> >>>>>
>>>>>> >> >>>>>  // Get offsets for the given topic and the consumer group 'storage-group'
>>>>>> >> >>>>>  val offsetRanges = getOffsets("storage-group", channel)
>>>>>> >> >>>>>
>>>>>> >> >>>>>  val ds = KafkaUtils.createRDD[K, V](context,
>>>>>> >> >>>>>        kafkaParams asJava,
>>>>>> >> >>>>>        offsetRanges,
>>>>>> >> >>>>>        PreferConsistent).toDS[V]
>>>>>> >> >>>>>
>>>>>> >> >>>>>  // Do some aggregations
>>>>>> >> >>>>>  ds.agg(...)
>>>>>> >> >>>>>  // Save the data
>>>>>> >> >>>>>  ds.write.mode(SaveMode.Append).parquet(somePath)
>>>>>> >> >>>>>  // Save offsets using a KafkaConsumer
>>>>>> >> >>>>>  consumer.commitSync(newOffsets.asJava)
>>>>>> >> >>>>>  consumer.close()
>>>>>> >> >>>>> }
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>> *Example usage with Kafka Stream:*
>>>>>> >> >>>>> This creates a stream and processes events in each partition. At
>>>>>> >> >>>>> the end of processing for each partition, we update the offsets for
>>>>>> >> >>>>> that partition. This is challenging to do, but is better than
>>>>>> >> >>>>> calling commitAsync on the stream, because that occurs after the
>>>>>> >> >>>>> /entire/ RDD has been processed. This method minimizes duplicates
>>>>>> >> >>>>> in an exactly once environment. Since the executors use their own
>>>>>> >> >>>>> custom group "spark-executor-processor-group" and the commit is
>>>>>> >> >>>>> buried in private functions, we are unable to use the executors'
>>>>>> >> >>>>> cached consumer to update the offsets. This requires us to go
>>>>>> >> >>>>> through multiple steps to update the Kafka offsets accordingly.
>>>>>> >> >>>>>
>>>>>> >> >>>>> val offsetRanges = getOffsets("processor-group", "my-topic")
>>>>>> >> >>>>>
>>>>>> >> >>>>> val stream = KafkaUtils.createDirectStream[K, V](context,
>>>>>> >> >>>>>      PreferConsistent,
>>>>>> >> >>>>>      Subscribe[K, V](Seq("my-topic") asJavaCollection,
>>>>>> >> >>>>>        kafkaParams,
>>>>>> >> >>>>>        offsetRanges))
>>>>>> >> >>>>>
>>>>>> >> >>>>> stream.foreachRDD { rdd =>
>>>>>> >> >>>>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>> >> >>>>>
>>>>>> >> >>>>>   // Transform our data
>>>>>> >> >>>>>   rdd.foreachPartition { events =>
>>>>>> >> >>>>>     // Establish a consumer in the executor so we can update
>>>>>> >> >>>>>     // offsets after each partition.
>>>>>> >> >>>>>     // This class is homegrown and uses the KafkaConsumer to help
>>>>>> >> >>>>>     // get/set offsets
>>>>>> >> >>>>>     val consumer = new ConsumerUtils(kafkaParams)
>>>>>> >> >>>>>     // do something with our data
>>>>>> >> >>>>>
>>>>>> >> >>>>>     // Write the offsets that were updated in this partition
>>>>>> >> >>>>>     consumer.setConsumerOffsets("processor-group",
>>>>>> >> >>>>>       Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
>>>>>> >> >>>>>   }
>>>>>> >> >>>>> }
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>> --
>>>>>> >> >>>>> View this message in context:
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
>>>>>> >> >>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> >> >>>>> Nabble.com.
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>> ---------------------------------------------------------------------
>>>>>> >> >>>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>>>> >> >>>>>