You should not be getting consumer churn on executors at all, that's the whole point of the cache. How many partitions are you trying to process per executor?
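For reference, both the executor-side consumer cache capacity and the poll timeout are plain configuration settings; a minimal sketch, assuming the spark-streaming-kafka-0-10 keys described at the link below (values here are only illustrative):

  import org.apache.spark.SparkConf

  // Sketch only: raise the per-executor cached-consumer capacity and the poll timeout
  val conf = new SparkConf()
    .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128") // cache default is 64 per the linked docs
    .set("spark.streaming.kafka.consumer.poll.ms", "10000")         // timeout used when the executor consumer polls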
http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#locationstrategies gives instructions on the default size of the cache and how to increase it.

On Sat, Nov 12, 2016 at 2:15 PM, Ivan von Nagy <i...@vadio.com> wrote:

Hi Sean,

Thanks for responding. We have run our jobs with internal parallel processing for well over a year (Spark 1.5, 1.6 and Kafka 0.8.2.2) and did not encounter any of these issues until upgrading to Spark 2.0.1 and the Kafka 0.10 clients. If we process serially, then we sometimes get the errors, but far less often. Also, if done sequentially it sometimes takes more than 2x as long, which is not an option for this particular job.

I posted another example on Nov 10th, which is the example below. We basically iterate through a list in parallel, and sometimes the list can be upwards of a hundred elements. The parallelism in Scala/Spark limits this to about 8 at a time on our nodes. For performance reasons we process in parallel, and we also keep each channel separate since each channel has its own topic. We don't combine everything into one KafkaRDD because that would mean processing all or nothing if an error occurs. This way, if a couple of channels fail, we can re-run the job and it will only process those channels.

This has been perplexing, since we never encountered any of these errors for well over a year on the prior versions. At this point I am just looking for any configuration options or code changes we may be missing, or, at a lower level, fundamentally what changed in Spark 2 and Kafka 0.10 that surfaced these issues.

We will continue to use Spark 1.6 with the Kafka 0.8.x clients until this can be figured out; however, it is a deal breaker for us to upgrade to Spark 2.x with the Kafka 0.10 clients. On a side note, we have not encountered any issues with the Kafka producers; this is simply with the KafkaRDD and its use of CachedKafkaConsumer. Any help is much appreciated.

Thanks,

Ivan

Example usage with KafkaRDD:

  val channels = Seq("channel1", "channel2")

  channels.toParArray.foreach { channel =>
    val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

    // Get offsets for the given topic and the consumer group "$prefix-$topic"
    val offsetRanges = getOffsets(s"$prefix-$topic", channel)

    val ds = KafkaUtils.createRDD[K, V](context,
      kafkaParams.asJava,
      offsetRanges,
      PreferConsistent).toDS[V]

    // Do some aggregations
    ds.agg(...)
    // Save the data
    ds.write.mode(SaveMode.Append).parquet(somePath)
    // Save offsets using a KafkaConsumer
    consumer.commitSync(newOffsets.asJava)
    consumer.close()
  }

On Sat, Nov 12, 2016 at 11:46 AM, Sean McKibben <grap...@graphex.com> wrote:

How are you iterating through your RDDs in parallel? In the past (Spark 1.5.2), when I've had actions performed on multiple RDDs concurrently using futures, I've encountered some pretty bad behavior in Spark, especially during job retries: very difficult to explain things, like records from one RDD leaking into a totally different (though shared lineage) RDD during job retries. I'm not sure what documentation exists around parallelizing on top of Spark's existing parallelization approach, but I would venture a guess that it could be the source of your concurrent access problems, and potentially other hidden issues. Have you tried a version of your app which doesn't parallelize actions on RDDs, but instead serially processes each RDD? I'm sure it isn't ideal performance-wise, but it seems like a good choice for an A/B test.
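A rough sketch of that A/B switch, and of explicitly capping the fan-out of the Scala parallel collection (this assumes Scala 2.11 parallel collections; processChannel is a made-up stand-in for the per-channel work):

  import scala.collection.parallel.ForkJoinTaskSupport
  import scala.concurrent.forkjoin.ForkJoinPool

  val channels = Seq("channel1", "channel2")

  // (a) serial A/B test: plain foreach processes one channel at a time
  channels.foreach(processChannel)

  // (b) or keep the parallelism but bound it explicitly instead of relying on the default pool
  val parChannels = channels.par
  parChannels.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4)) // at most 4 channels at once
  parChannels.foreach(processChannel)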
The poll.ms issue could very well be settings or capability of your kafka cluster. I think other (non-Spark) approaches may have less consumer churn and be less susceptible to things like GC pauses or coordination latency. It could also be that the number of consumers being simultaneously created on each executor causes a thundering herd problem during initial phases (which then causes job retries, which then causes more consumer churn, etc.).

Sean

On Nov 12, 2016, at 11:14 AM, Ivan von Nagy <i...@vadio.com> wrote:

The code was changed to use a unique group for each KafkaRDD that is created (see the Nov 10 post). There is no KafkaRDD being reused. The basic logic (see the Nov 10 post for an example) is: get a list of channels, iterate through them in parallel, load a KafkaRDD using a given topic and a consumer group derived from the topic (each RDD uses a different topic and group), process the data and write it to Parquet files.

Per my Nov 10th post, we still get polling timeouts unless poll.ms is set to something like 10 seconds. We also get concurrent modification exceptions. I believe the key here is that processing the data in parallel is where we encounter issues, so we are looking for some possible answers surrounding this.

Thanks,

Ivan

On Fri, Nov 11, 2016 at 12:12 PM, Cody Koeninger <c...@koeninger.org> wrote:

It is already documented that you must use a different group id, which as far as I can tell you are still not doing.

On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixi...@databricks.com> wrote:

Yeah, the KafkaRDD cannot be reused. It's better to document it.

On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <i...@vadio.com> wrote:

Ok, I have split the KafkaRDD logic so that each RDD uses its own group, and bumped the poll.ms to 10 seconds. Anything less than 2 seconds on the poll.ms ends up with a timeout and exception, so I am still perplexed on that one. The new error I am getting now is a ConcurrentModificationException when Spark is trying to remove the CachedKafkaConsumer.

  java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
    at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)

Here is the basic logic:

Using KafkaRDD - This takes a list of channels and processes them in parallel using the KafkaRDD directly. They each use a distinct consumer group (s"$prefix-$topic"), each has its own topic, and each topic has 4 partitions. We routinely get timeout errors when polling for data when poll.ms is less than 2 seconds. This occurs whether we process in parallel or sequentially.
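For what it's worth, a distinct consumer group per topic is just a matter of how the params are built; a minimal sketch (the broker address and deserializers are placeholders, and prefix is assumed to be in scope as in the example that follows):

  import org.apache.kafka.common.serialization.StringDeserializer

  // Sketch: one set of params per topic, so every KafkaRDD (and its cached
  // executor consumer) gets its own group.id
  def kafkaParamsFor(topic: String): Map[String, Object] = Map[String, Object](
    "bootstrap.servers"  -> "somemachine:31000",       // placeholder
    "key.deserializer"   -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id"           -> s"$prefix-$topic",          // unique per topic/RDD
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )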
Example usage with KafkaRDD:

  val channels = Seq("channel1", "channel2")

  channels.toParArray.foreach { channel =>
    val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

    // Get offsets for the given topic and the consumer group "$prefix-$topic"
    val offsetRanges = getOffsets(s"$prefix-$topic", channel)

    val ds = KafkaUtils.createRDD[K, V](context,
      kafkaParams.asJava,
      offsetRanges,
      PreferConsistent).toDS[V]

    // Do some aggregations
    ds.agg(...)
    // Save the data
    ds.write.mode(SaveMode.Append).parquet(somePath)
    // Save offsets using a KafkaConsumer
    consumer.commitSync(newOffsets.asJava)
    consumer.close()
  }

I am not sure why the concurrency issue is there, as I have tried to debug it and also looked at the KafkaConsumer code, and everything looks like it should not occur. The things to figure out are why this occurs when running in parallel, and why the timeouts still happen.

Thanks,

Ivan

On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <c...@koeninger.org> wrote:

There definitely is Kafka documentation indicating that you should use a different consumer group for logically different subscribers, this is really basic to Kafka:

http://kafka.apache.org/documentation#intro_consumers

As for your comment that "commit async after each RDD, which is not really viable also", how is it not viable? Again, committing offsets to Kafka doesn't give you reliable delivery semantics unless your downstream data store is idempotent. If your downstream data store is idempotent, then it shouldn't matter to you when offset commits happen, as long as they happen within a reasonable time after the data is written.

Do you want to keep arguing with me, or follow my advice and proceed with debugging any remaining issues after you make the changes I suggested?

On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <i...@vadio.com> wrote:

With our stream version, we update the offsets for only the partition we are operating on. We even break down the partition into smaller batches and then update the offsets after each batch within the partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not necessarily a Spark issue, since Kafka no longer allows you to simply update the offsets for a given consumer group. You have to subscribe or assign partitions to even do so.

As for storing the offsets in some other place like a DB, I don't find this useful because you then can't use tools like Kafka Manager. In order to do so you would have to store them in a DB and then circle back and update Kafka afterwards. This means you have to keep two sources in sync, which is not really a good idea.

It is a challenge in Spark to use the Kafka offsets, since the driver stays subscribed to the topic(s) and consumer group, while the executors prepend "spark-executor-" to the consumer group.
The stream (driver) does allow you to commit async after each RDD, but that is not really viable either. I have thought of implementing an Akka actor system on the driver and sending it messages from the executor code to update the offsets, but that is asynchronous as well, so not really a good solution.

I have no idea why Kafka made this change, and also why in the parallel KafkaRDD application we would be advised to use different consumer groups for each RDD. It seems strange to me that different consumer groups would be required or advised; there is no Kafka documentation that I know of that states this. The biggest issue I see with the parallel KafkaRDD is the timeouts. I have tried setting poll.ms to 30 seconds and still get the issue. Something is just not right here. As I mentioned with the streaming application, with Spark 1.6 and Kafka 0.8.x we never saw this issue. We have been running the same basic logic for over a year now without one hitch at all.

Ivan

On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <c...@koeninger.org> wrote:

Someone can correct me, but I'm pretty sure Spark dstreams (in general, not just kafka) have been progressing on to the next batch after a given batch aborts for quite some time now. Yet another reason I put offsets in my database transactionally. My jobs throw exceptions if the offset in the DB isn't what I expected it to be.

On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com> wrote:

I've been encountering the same kinds of timeout issues as Ivan, using the "Kafka Stream" approach that he is using, except I'm storing my offsets manually from the driver to Zookeeper in the Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and therefore don't have the concurrency issues, but a very similar use case is coming up for me soon; it's just been backburnered until I can get streaming to be more reliable (I will definitely ensure unique group IDs when I do). Offset commits are certainly more painful in Kafka 0.10, and that doesn't have anything to do with Spark.

While I may be able to alleviate the timeout by just increasing it, I've noticed something else that is more worrying: when one task fails 4 times in a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts the Stage and Job with "Job aborted due to stage failure: Task _ in stage _ failed 4 times". That's fine, and it's the behavior I want, but instead of stopping the application there (as previous versions of Spark did), the next microbatch marches on and offsets are committed ahead of the failed microbatch. Suddenly my at-least-once app becomes more sometimes-at-least-once, which is no good.
In order for Spark to display that failure, I must be propagating the errors up to Spark, but the behavior of marching forward with the next microbatch seems to be new, and a big potential for data loss in streaming applications.

Am I perhaps missing a setting to stop the entire streaming application once spark.task.maxFailures is reached? Has anyone else seen this behavior of a streaming application skipping over failed microbatches?

Thanks,
Sean

On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org> wrote:

So basically what I am saying is:

- increase poll.ms
- use a separate group id everywhere
- stop committing offsets under the covers

That should eliminate all of those as possible causes, and then we can see if there are still issues.

As far as 0.8 vs 0.10: Spark doesn't require you to assign or subscribe to a topic in order to update offsets, Kafka does. If you don't like the new Kafka consumer api, the existing 0.8 simple consumer api should be usable with later brokers. As long as you don't need SSL or dynamic subscriptions, and it meets your needs, keep using it.

On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com> wrote:

Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a single distinct topic. For example, the group would be something like "storage-group", and the topics would be "storage-channel1" and "storage-channel2". In each thread a KafkaConsumer is started, the partitions are assigned, and then commit offsets are called after the RDD is processed. This should not interfere with the consumer group used by the executors, which would be "spark-executor-storage-group".

In the streaming example there is a single topic ("client-events") and group ("processing-group"). A single stream is created and offsets are manually updated from the executor after each partition is handled. This was a challenge, since Spark now requires one to assign or subscribe to a topic in order to even update the offsets. In 0.8.2.x you did not have to worry about that. This approach limits your exposure to duplicate data, since idempotent records are not entirely possible in our scenario, at least without a lot of re-running of logic to de-dup.

Thanks,

Ivan
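For what it's worth, the "offsets in the database transactionally" pattern Cody describes above looks roughly like this as a JDBC sketch; the connection URL, table and column names are invented, and topic/partition/fromOffset/untilOffset are assumed to come from the OffsetRange being processed:

  import java.sql.DriverManager

  // Sketch: write results and advance the offset in one transaction, and fail
  // loudly if the stored offset is not the one we expected to be moving from
  val conn = DriverManager.getConnection("jdbc:postgresql://dbhost/mydb") // placeholder
  conn.setAutoCommit(false)
  try {
    // 1) insert/upsert the processed records here (application specific)
    // 2) advance the offset, guarded by the expected current value
    val st = conn.prepareStatement(
      "UPDATE kafka_offsets SET until_offset = ? " +
      "WHERE topic = ? AND kafka_partition = ? AND until_offset = ?")
    st.setLong(1, untilOffset)
    st.setString(2, topic)
    st.setInt(3, partition)
    st.setLong(4, fromOffset)
    if (st.executeUpdate() != 1)
      throw new IllegalStateException(s"offset for $topic-$partition was not $fromOffset")
    conn.commit()
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }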
On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org> wrote:

So just to be clear, the answers to my questions are:

- you are not using different group ids, you're using the same group id everywhere
- you are committing offsets manually

Right?

If you want to eliminate network or kafka misbehavior as a source, tune poll.ms upwards even higher.

You must use different group ids for different rdds or streams. Kafka consumers won't behave the way you expect if they are all in the same group id, and the consumer cache is keyed by group id. Yes, the executor will tack "spark-executor-" on to the beginning, but if you give it the same base group id, it will be the same. And the driver will use the group id you gave it, unmodified.

Finally, I really can't help you if you're manually writing your own code to commit offsets directly to Kafka. Trying to minimize duplicates that way doesn't really make sense; your system must be able to handle duplicates if you're using kafka as an offsets store, it can't do transactional exactly once.

On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com> wrote:

Here are some examples and details of the scenarios. The KafkaRDD is the most error prone, with polling timeouts and concurrent modification errors.

*Using KafkaRDD* - This takes a list of channels and processes them in parallel using the KafkaRDD directly. They all use the same consumer group ('storage-group'), but each has its own topic and each topic has 4 partitions. We routinely get timeout errors when polling for data. This occurs whether we process in parallel or sequentially.
*Spark Kafka setting:*
spark.streaming.kafka.consumer.poll.ms=2000

*Kafka Consumer Params:*
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [somemachine:31000]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 1000
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
group.id = storage-group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

*Example usage with KafkaRDD:*

  val channels = Seq("channel1", "channel2")

  channels.toParArray.foreach { channel =>
    val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

    // Get offsets for the given topic and the consumer group 'storage-group'
    val offsetRanges = getOffsets("storage-group", channel)

    val ds = KafkaUtils.createRDD[K, V](context,
      kafkaParams.asJava,
      offsetRanges,
      PreferConsistent).toDS[V]

    // Do some aggregations
    ds.agg(...)
    // Save the data
    ds.write.mode(SaveMode.Append).parquet(somePath)
    // Save offsets using a KafkaConsumer
    consumer.commitSync(newOffsets.asJava)
    consumer.close()
  }

*Example usage with Kafka Stream:*
This creates a stream and processes events in each partition. At the end of processing for each partition, we update the offsets for that partition. This is challenging to do, but is better than calling commitAsync on the stream, because that occurs after the /entire/ RDD has been processed. This method minimizes duplicates in an exactly-once environment. Since the executors use their own custom group "spark-executor-processor-group" and the commit is buried in private functions, we are unable to use the executor's cached consumer to update the offsets. This requires us to go through multiple steps to update the Kafka offsets accordingly.

  val offsetRanges = getOffsets("processor-group", "my-topic")

  val stream = KafkaUtils.createDirectStream[K, V](context,
    PreferConsistent,
    Subscribe[K, V](Seq("my-topic").asJavaCollection,
      kafkaParams,
      offsetRanges))

  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    // Transform our data
    rdd.foreachPartition { events =>
      // Establish a consumer in the executor so we can update offsets after each partition.
      // This class is homegrown and uses the KafkaConsumer to help get/set offsets
      val consumer = new ConsumerUtils(kafkaParams)
      // do something with our data

      // Write the offsets that were updated in this partition
      consumer.setConsumerOffsets("processor-group",
        Map(TopicAndPartition(tp.topic, tp.partition) -> endOffset))
    }
  }

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Instability-issues-with-Spark-2-0-1-and-Kafka-0-10-tp28017p28020.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
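As a point of comparison for the commitAsync discussion above, the stream-level commit that ships with spark-streaming-kafka-0-10 looks roughly like this; as noted, it fires once for the whole batch rather than per partition (stream here is the DStream from the example above):

  import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

  stream.foreachRDD { rdd =>
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // process the batch first ...
    // then hand the ranges back to the stream; the commit is asynchronous and
    // covers the entire batch, not individual partitions
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }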
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org