I disabled it because I saw posts saying that enabling the consumer cache causes a ConcurrentModificationException with reduceByKeyAndWindow, and I saw those errors myself after running for some time with the cache enabled. Please see the tickets below. We have 96 partitions. If I enable the cache, would the following settings help improve performance?
"spark.streaming.kafka.consumer.cache.initialCapacity" -> Integer.valueOf(12),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(15),
"spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),

http://markmail.org/message/n4cdxwurlhf44q5x
https://issues.apache.org/jira/browse/SPARK-19185

Also, I have a batch interval of 60 seconds. What do you suggest for the following? session.timeout.ms, heartbeat.interval.ms

On Fri, Aug 25, 2017 at 5:04 PM, swetha kasireddy <swethakasire...@gmail.com> wrote:

> I disabled it because I saw posts saying that enabling the consumer cache
> causes a ConcurrentModificationException with reduceByKeyAndWindow, and I
> saw those errors myself after running for some time with the cache enabled.
> Please see the tickets below. We have 96 partitions. If I enable the cache,
> would the following settings help improve performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>
> http://markmail.org/message/n4cdxwurlhf44q5x
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK <swethakasire...@gmail.com> wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My job
>> > works fine with Spark with Kafka 8 and a Kafka 8 cluster, but it is very
>> > slow with Kafka 10 using the experimental Kafka Direct APIs for Kafka 10.
>> > I sometimes see the following error. Please see the Kafka parameters and
>> > the consumer strategy for creating the stream below. Any suggestions on
>> > how to run this with better performance would be of great help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> > test stream1 72 324027964 after polling for 120000
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> kafkaBrokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "auto.offset.reset" -> "latest",
>> >   "heartbeat.interval.ms" -> Integer.valueOf(20000),
>> >   "session.timeout.ms" -> Integer.valueOf(60000),
>> >   "request.timeout.ms" -> Integer.valueOf(90000),
>> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >   "group.id" -> "test1"
>> > )
>> >
>> > val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> >   ssc,
>> >   LocationStrategies.PreferConsistent,
>> >   ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>> > )
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>> >
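For reference, here is one way the settings discussed in this thread could be combined for a 60-second batch with 96 partitions. This is only a sketch: the broker list is a placeholder, the timeout and poll values are illustrative assumptions rather than tested recommendations, and it assumes the `spark.streaming.kafka.consumer.cache.*` properties are read from SparkConf (not from kafkaParams). The general guidance is that on pre-0.10.1 clients heartbeats are sent only from poll() (KIP-62 added a background heartbeat thread later), so session.timeout.ms should comfortably exceed the batch interval, heartbeat.interval.ms should be no more than roughly a third of session.timeout.ms, and request.timeout.ms should be larger than session.timeout.ms.

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf

object TunedKafkaSettings {
  // Placeholder broker list; substitute your own.
  val kafkaBrokers = "broker1:9092"

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> kafkaBrokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean),
    "group.id" -> "test1",
    // With a 60 s batch on a pre-KIP-62 client, heartbeats only go out on
    // poll(), so keep session.timeout.ms well above the batch interval and
    // heartbeat.interval.ms at about a third of it. Exact values here are
    // assumptions, and session.timeout.ms must also fall within the broker's
    // group.min/max.session.timeout.ms bounds.
    "session.timeout.ms" -> Integer.valueOf(90000),
    "heartbeat.interval.ms" -> Integer.valueOf(30000),
    // request.timeout.ms must be larger than session.timeout.ms.
    "request.timeout.ms" -> Integer.valueOf(120000)
  )

  // The consumer-cache settings are Spark properties, so they belong on the
  // SparkConf rather than in kafkaParams. With 96 partitions, sizing the
  // cache at 96 means an executor hosting every partition would still never
  // evict a consumer it needs; poll.ms of 10 s is again just an assumption.
  val conf = new SparkConf()
    .set("spark.streaming.kafka.consumer.cache.initialCapacity", "96")
    .set("spark.streaming.kafka.consumer.cache.maxCapacity", "96")
    .set("spark.streaming.kafka.consumer.poll.ms", "10000")
}
```

Note that this leaves the cache enabled (the default), which only makes sense once the ConcurrentModificationException tracked in SPARK-19185 is not an issue for the Spark version in use.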