It is already documented that you must use a different group id, which as far as I can tell you are still not doing.
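For illustration, a minimal sketch of what "a different group id" per logical consumer might look like; the group prefix and channel names here are assumptions, following the examples later in this thread:

val channels = Seq("channel1", "channel2")
val baseParams = Map[String, Object](
  "bootstrap.servers" -> "somemachine:31000",
  "key.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getName,
  "value.deserializer" -> classOf[org.apache.kafka.common.serialization.StringDeserializer].getName,
  "enable.auto.commit" -> (false: java.lang.Boolean))

channels.foreach { channel =>
  // Each logically distinct consumer gets its own group.id, so neither the
  // driver consumer nor the cached "spark-executor-..." consumers ever share one.
  val params = baseParams + ("group.id" -> s"storage-group-$channel")
  // ... create the KafkaRDD / stream for `channel` using `params` ...
}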
On Nov 10, 2016 7:43 PM, "Shixiong(Ryan) Zhu" <shixi...@databricks.com> wrote:

Yeah, the KafkaRDD cannot be reused. It's better to document it.

On Thu, Nov 10, 2016 at 8:26 AM, Ivan von Nagy <i...@vadio.com> wrote:

Ok, I have split the KafkaRDD logic so that each use has its own group, and bumped poll.ms to 10 seconds. Anything less than 2 seconds on poll.ms ends up with a timeout and an exception, so I am still perplexed on that one. The new error I am getting now is a ConcurrentModificationException when Spark is trying to remove the CachedKafkaConsumer:

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
  at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
  at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1361)
  at org.apache.spark.streaming.kafka010.CachedKafkaConsumer$$anon$1.removeEldestEntry(CachedKafkaConsumer.scala:128)
  at java.util.LinkedHashMap.afterNodeInsertion(LinkedHashMap.java:299)

Here is the basic logic:

*Using KafkaRDD* - This takes a list of channels and processes them in parallel using the KafkaRDD directly. They each use a distinct consumer group (s"$prefix-$topic"), each has its own topic, and each topic has 4 partitions. We routinely get timeout errors when polling for data when poll.ms is less than 2 seconds. This occurs whether we process in parallel or sequentially.

*Example usage with KafkaRDD:*
val channels = Seq("channel1", "channel2")

channels.toParArray.foreach { channel =>
  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

  // Get offsets for the given topic and the consumer group "$prefix-$topic"
  val offsetRanges = getOffsets(s"$prefix-$topic", channel)

  val ds = KafkaUtils.createRDD[K, V](context,
    kafkaParams.asJava,
    offsetRanges,
    PreferConsistent).toDS[V]

  // Do some aggregations
  ds.agg(...)
  // Save the data
  ds.write.mode(SaveMode.Append).parquet(somePath)
  // Save offsets using a KafkaConsumer
  consumer.commitSync(newOffsets.asJava)
  consumer.close()
}

I am not sure why the concurrency issue is there, as I have tried to debug it and have also looked at the KafkaConsumer code, and everything looks like it should not occur. The things to figure out are why this occurs when running in parallel, and why the timeouts still occur.

Thanks,

Ivan

On Mon, Nov 7, 2016 at 11:55 AM, Cody Koeninger <c...@koeninger.org> wrote:

There definitely is Kafka documentation indicating that you should use a different consumer group for logically different subscribers; this is really basic to Kafka:

http://kafka.apache.org/documentation#intro_consumers

As for your comment that "commit async after each RDD, which is not really viable also", how is it not viable? Again, committing offsets to Kafka doesn't give you reliable delivery semantics unless your downstream data store is idempotent. If your downstream data store is idempotent, then it shouldn't matter to you when offset commits happen, as long as they happen within a reasonable time after the data is written.

Do you want to keep arguing with me, or follow my advice and proceed with debugging any remaining issues after you make the changes I suggested?
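A sketch of the ordering Cody describes, assuming an idempotent sink and that `stream` is a direct stream from the kafka-0-10 integration (CanCommitOffsets is the public driver-side commit hook; the write itself is elided):

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Idempotent write first -- replaying this batch after a failure is harmless.
  // ... write rdd to the downstream store ...
  // Then let the offsets go to Kafka. With an idempotent sink, the exact commit
  // timing only affects how much is replayed on restart, not correctness.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}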
On Mon, Nov 7, 2016 at 1:35 PM, Ivan von Nagy <i...@vadio.com> wrote:

With our stream version, we update the offsets for only the partition we are operating on. We even break the partition down into smaller batches and then update the offsets after each batch within the partition. With Spark 1.6 and Kafka 0.8.x this was not an issue, and as Sean pointed out, this is not necessarily a Spark issue, since Kafka no longer allows you to simply update the offsets for a given consumer group. You have to subscribe or assign partitions to even do so.

As for storing the offsets in some other place like a DB, I don't find this useful, because you then can't use tools like Kafka Manager. In order to do so you would have to store them in a DB and then circle back and update Kafka afterwards. This means you have to keep two sources in sync, which is not really a good idea.

It is a challenge in Spark to use the Kafka offsets, since the driver stays subscribed to the topic(s) and consumer group, while the executors prepend "spark-executor-" to the consumer group. The stream (driver) does allow you to commit async after each RDD, which is not really viable either. I have thought of implementing an Akka actor system on the driver and sending it messages from the executor code to update the offsets, but that is asynchronous as well, so not really a good solution.

I have no idea why Kafka made this change, or why, in the parallel KafkaRDD application, we would be advised to use different consumer groups for each RDD. It seems strange to me that different consumer groups would be required or advised; there is no Kafka documentation that I know of that states this. The biggest issue I see with the parallel KafkaRDD is the timeouts. I have tried setting poll.ms to 30 seconds and still get the issue. Something is not right here. As I mentioned, with the streaming application on Spark 1.6 and Kafka 0.8.x we never saw this issue. We have been running the same basic logic for over a year now without one hitch at all.

Ivan

On Mon, Nov 7, 2016 at 11:16 AM, Cody Koeninger <c...@koeninger.org> wrote:

Someone can correct me, but I'm pretty sure Spark dstreams (in general, not just Kafka) have been progressing on to the next batch after a given batch aborts for quite some time now. Yet another reason I put offsets in my database transactionally. My jobs throw exceptions if the offset in the DB isn't what I expected it to be.
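A minimal sketch of that transactional pattern, assuming a JDBC-compatible store and a hypothetical offsets(topic, partition, until_offset) table; the row-writing step is elided:

import java.sql.DriverManager
import org.apache.spark.streaming.kafka010.OffsetRange

def writePartition(range: OffsetRange, rows: Iterator[String], jdbcUrl: String): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    // Fail fast if someone else moved the offset out from under us.
    val check = conn.prepareStatement(
      "SELECT until_offset FROM offsets WHERE topic = ? AND partition = ?")
    check.setString(1, range.topic)
    check.setInt(2, range.partition)
    val rs = check.executeQuery()
    if (rs.next() && rs.getLong(1) != range.fromOffset)
      throw new IllegalStateException(s"Offset mismatch for $range")
    // Results and the new offset commit or roll back together.
    // ... insert `rows` here ...
    val update = conn.prepareStatement(
      "UPDATE offsets SET until_offset = ? WHERE topic = ? AND partition = ?")
    update.setLong(1, range.untilOffset)
    update.setString(2, range.topic)
    update.setInt(3, range.partition)
    update.executeUpdate()
    conn.commit()
  } finally conn.close()
}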
On Mon, Nov 7, 2016 at 1:08 PM, Sean McKibben <grap...@graphex.com> wrote:

I've been encountering the same kinds of timeout issues as Ivan, using the "Kafka Stream" approach that he is using, except I'm storing my offsets manually from the driver to ZooKeeper in the Kafka 8 format. I haven't yet implemented the KafkaRDD approach, and therefore don't have the concurrency issues, but a very similar use case is coming up for me soon. It's just been backburnered until I can get streaming to be more reliable (I will definitely ensure unique group IDs when I do). Offset commits are certainly more painful in Kafka 0.10, and that doesn't have anything to do with Spark.

While I may be able to alleviate the timeout by just increasing it, I've noticed something else that is more worrying: when one task fails 4 times in a row (i.e. "Failed to get records for _ after polling for _"), Spark aborts the stage and job with "Job aborted due to stage failure: Task _ in stage _ failed 4 times". That's fine, and it's the behavior I want, but instead of stopping the application there (as previous versions of Spark did), the next microbatch marches on and offsets are committed ahead of the failed microbatch. Suddenly my at-least-once app becomes sometimes-at-least-once, which is no good. For Spark to display that failure, I must be propagating the errors up to Spark, but the behavior of marching forward with the next microbatch seems to be new, and a big potential for data loss in streaming applications.

Am I perhaps missing a setting to stop the entire streaming application once spark.task.maxFailures is reached? Has anyone else seen this behavior of a streaming application skipping over failed microbatches?

Thanks,
Sean

On Nov 4, 2016, at 2:48 PM, Cody Koeninger <c...@koeninger.org> wrote:

So basically what I am saying is:

- increase poll.ms
- use a separate group id everywhere
- stop committing offsets under the covers

That should eliminate all of those as possible causes, and then we can see if there are still issues.

As far as 0.8 vs 0.10: Spark doesn't require you to assign or subscribe to a topic in order to update offsets, Kafka does. If you don't like the new Kafka consumer API, the existing 0.8 simple consumer API should be usable with later brokers. As long as you don't need SSL or dynamic subscriptions, and it meets your needs, keep using it.
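Concretely, those three changes might look like the following sketch (the values are illustrative, not prescriptive):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  // 1. Raise the poll timeout well above the 2s that was timing out.
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")

// 2. and 3. A distinct group per logical consumer, and no hidden commits.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "somemachine:31000",
  "group.id" -> "storage-group-channel1", // unique per RDD/stream, e.g. one per channel
  "enable.auto.commit" -> (false: java.lang.Boolean)) // nothing committed under the covers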
On Fri, Nov 4, 2016 at 3:37 PM, Ivan von Nagy <i...@vadio.com> wrote:

Yes, the parallel KafkaRDD uses the same consumer group, but each RDD uses a single distinct topic. For example, the group would be something like "storage-group", and the topics would be "storage-channel1" and "storage-channel2". In each thread a KafkaConsumer is started and assigned its partitions, and then the offsets are committed after the RDD is processed. This should not interfere with the consumer group used by the executors, which would be "spark-executor-storage-group".

In the streaming example there is a single topic ("client-events") and group ("processing-group"). A single stream is created, and offsets are manually updated from the executor after each partition is handled. This was a challenge, since Spark now requires one to assign or subscribe to a topic in order to even update the offsets; in 0.8.2.x you did not have to worry about that. This approach limits our exposure to duplicate data, since idempotent records are not entirely possible in our scenario, at least without a lot of re-running of logic to de-dup.

Thanks,

Ivan

On Fri, Nov 4, 2016 at 1:24 PM, Cody Koeninger <c...@koeninger.org> wrote:

So just to be clear, the answers to my questions are:

- you are not using different group ids, you're using the same group id everywhere
- you are committing offsets manually

Right?

If you want to eliminate network or Kafka misbehavior as a source, tune poll.ms upwards even higher.

You must use different group ids for different RDDs or streams. Kafka consumers won't behave the way you expect if they are all in the same group id, and the consumer cache is keyed by group id. Yes, the executor will tack "spark-executor-" on to the beginning, but if you give it the same base group id, it will be the same. And the driver will use the group id you gave it, unmodified.

Finally, I really can't help you if you're manually writing your own code to commit offsets directly to Kafka. Trying to minimize duplicates that way doesn't really make sense; your system must be able to handle duplicates if you're using Kafka as an offsets store, it can't do transactional exactly-once.
On Fri, Nov 4, 2016 at 1:48 PM, vonnagy <i...@vadio.com> wrote:

Here are some examples and details of the scenarios. The KafkaRDD is the most error-prone, with polling timeouts and concurrent modification errors.

*Using KafkaRDD* - This takes a list of channels and processes them in parallel using the KafkaRDD directly. They all use the same consumer group ("storage-group"), but each has its own topic, and each topic has 4 partitions. We routinely get timeout errors when polling for data. This occurs whether we process in parallel or sequentially.

*Spark Kafka setting:*
spark.streaming.kafka.consumer.poll.ms=2000

*Kafka Consumer Params:*
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [somemachine:31000]
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 1000
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class com.vadio.analytics.spark.storage.ClientEventJsonOptionDeserializer
group.id = storage-group
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = earliest

*Example usage with KafkaRDD:*
val channels = Seq("channel1", "channel2")

channels.toParArray.foreach { channel =>
  val consumer = new KafkaConsumer[K, V](kafkaParams.asJava)

  // Get offsets for the given topic and the consumer group "storage-group"
  val offsetRanges = getOffsets("storage-group", channel)

  val ds = KafkaUtils.createRDD[K, V](context,
    kafkaParams.asJava,
    offsetRanges,
    PreferConsistent).toDS[V]

  // Do some aggregations
  ds.agg(...)
  // Save the data
  ds.write.mode(SaveMode.Append).parquet(somePath)
  // Save offsets using a KafkaConsumer
  consumer.commitSync(newOffsets.asJava)
  consumer.close()
}
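The getOffsets helper is not shown anywhere in this thread; a hedged sketch of what such a helper might look like, reading the group's committed offsets and the current log-end offsets (endOffsets requires Kafka 0.10.1+; everything here is an assumption about the homegrown code):

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.OffsetRange
import scala.collection.JavaConverters._

def getOffsets(groupId: String, topic: String,
               baseParams: Map[String, Object]): Array[OffsetRange] = {
  val consumer = new KafkaConsumer[String, String](
    (baseParams + ("group.id" -> groupId)).asJava)
  try {
    val parts = consumer.partitionsFor(topic).asScala
      .map(p => new TopicPartition(topic, p.partition))
    val ends = consumer.endOffsets(parts.asJava) // Kafka 0.10.1+ API
    parts.map { tp =>
      // Start from the group's committed position, or the beginning if none.
      val from = Option(consumer.committed(tp)).map(_.offset).getOrElse(0L)
      OffsetRange(tp.topic, tp.partition, from, ends.get(tp))
    }.toArray
  } finally consumer.close()
}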
*Example usage with Kafka Stream:*
This creates a stream and processes events in each partition. At the end of processing for each partition, we update the offsets for that partition. This is challenging to do, but is better than calling commitAsync on the stream, because that occurs after the /entire/ RDD has been processed. This method minimizes duplicates when aiming for exactly-once behavior. Since the executors use their own custom group "spark-executor-processor-group" and the commit is buried in private functions, we are unable to use the executor's cached consumer to update the offsets. This requires us to go through multiple steps to update the Kafka offsets accordingly.

val offsetRanges = getOffsets("processor-group", "my-topic")

val stream = KafkaUtils.createDirectStream[K, V](context,
  PreferConsistent,
  Subscribe[K, V](Seq("my-topic").asJavaCollection,
    kafkaParams,
    offsetRanges))

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Transform our data
  rdd.foreachPartition { events =>
    // Establish a consumer in the executor so we can update offsets
    // after each partition. This class is homegrown and uses the
    // KafkaConsumer to help get/set offsets.
    val consumer = new ConsumerUtils(kafkaParams)
    // Look up the offset range that this partition corresponds to
    val tp = offsetRanges(TaskContext.get.partitionId)
    // do something with our data

    // Write the offsets that were updated in this partition
    consumer.setConsumerOffsets("processor-group",
      Map(TopicAndPartition(tp.topic, tp.partition) -> tp.untilOffset))
  }
}
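On Sean's earlier question about halting the whole application once a batch fails: if there is no setting for it, one hedged workaround is to catch the failure at the output action and stop the context from a separate thread (stopping from inside foreachRDD directly risks deadlocking the batch-processing thread). A sketch, assuming `ssc` is the application's StreamingContext:

stream.foreachRDD { rdd =>
  try {
    // ... process and write rdd ...
  } catch {
    case e: Exception =>
      // Stop from another thread so the batch thread isn't waiting on itself;
      // non-graceful stop so no further microbatch runs and commits offsets.
      new Thread(new Runnable {
        def run(): Unit = ssc.stop(stopSparkContext = true, stopGracefully = false)
      }).start()
      throw e
  }
}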