Because I saw some posts that say that consumer cache  enabled will have
concurrentModification exception with reduceByKeyAndWIndow. I see those
errors as well after running for sometime with cache being enabled. So, I
had to disable it. Please see the tickets below.  We have 96 partitions. So
if I enable cache, would teh following settings help to improve
performance?

"spark.streaming.kafka.consumer.cache.initialCapacity" -> Integer.*valueOf*
(12),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(15),

"spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),

http://markmail.org/message/n4cdxwurlhf44q5x

https://issues.apache.org/jira/browse/SPARK-19185


Also, I have a batch of 60 seconds. What do you suggest the following  to
be?

 session.timeout.ms, heartbeat.interval.ms

On Fri, Aug 25, 2017 at 5:04 PM, swetha kasireddy <swethakasire...@gmail.com
> wrote:

> Because I saw some posts that say that consumer cache  enabled will have
> concurrentModification exception with reduceByKeyAndWIndow. I see those
> errors as well after running for sometime with cache being enabled. So, I
> had to disable it. Please see the tickets below.  We have 96 partitions. So
> if I enable cache, would teh following settings help to improve
> performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*
> (96),
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*
> (96),
>
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),
>
> http://markmail.org/message/n4cdxwurlhf44q5x
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My
>> job
>> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> very
>> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka
>> 10 . I
>> > see the following error sometimes . Please see the kafka parameters and
>> the
>> > consumer strategy for creating the stream below. Any suggestions on how
>> to
>> > run this with better performance would be of great help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> test
>> > stream1 72 324027964 after polling for 120000
>> >
>> > val kafkaParams = Map[String, Object](
>> >       "bootstrap.servers" -> kafkaBrokers,
>> >       "key.deserializer" -> classOf[StringDeserializer],
>> >       "value.deserializer" -> classOf[StringDeserializer],
>> >       "auto.offset.reset" -> "latest",
>> >       "heartbeat.interval.ms" -> Integer.valueOf(20000),
>> >       "session.timeout.ms" -> Integer.valueOf(60000),
>> >       "request.timeout.ms" -> Integer.valueOf(90000),
>> >       "enable.auto.commit" -> (false: java.lang.Boolean),
>> >       "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >       "group.id" -> "test1"
>> >     )
>> >
>> >       val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> >         ssc,
>> >         LocationStrategies.PreferConsistent,
>> >         ConsumerStrategies.Subscribe[String, String](topicsSet,
>> kafkaParams)
>> >       )
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Slower-performance-while-running-Spark
>> -Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>

Reply via email to