Hi Cody,
Below is how I am consuming data with a 60-second batch interval. Do you
see anything wrong with the way the data is being consumed that could
cause the slow performance?
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "heartbeat.interval.ms" -> Integer.valueOf(20000),
  "session.timeout.ms" -> Integer.valueOf(60000),
  "request.timeout.ms" -> Integer.valueOf(90000),
  "enable.auto.commit" -> (false: java.lang.Boolean),
  // NOTE: this is a SparkConf setting, not a Kafka consumer property, so
  // putting it in kafkaParams has no effect; set it on the SparkConf instead.
  "spark.streaming.kafka.consumer.cache.enabled" -> "false",
  "group.id" -> "test1"
)
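
One thing worth sanity-checking in the timeouts above: Kafka expects
heartbeat.interval.ms to be at most a third of session.timeout.ms, and
request.timeout.ms to be larger than session.timeout.ms. A quick sketch
(plain Scala, values copied from the map above) to verify the ordering:

```scala
// Values copied from the kafkaParams map above.
val heartbeatIntervalMs = 20000
val sessionTimeoutMs    = 60000
val requestTimeoutMs    = 90000

// Kafka recommends heartbeat.interval.ms <= session.timeout.ms / 3, so a
// couple of missed heartbeats don't immediately trigger a group rebalance.
val heartbeatOk = heartbeatIntervalMs <= sessionTimeoutMs / 3

// request.timeout.ms must exceed session.timeout.ms, otherwise a consumer's
// own requests can time out before the broker considers it dead.
val requestOk = requestTimeoutMs > sessionTimeoutMs

println(s"heartbeatOk=$heartbeatOk, requestOk=$requestOk")
```

These values satisfy both constraints, so the timeouts themselves are
unlikely to be what is causing the slowness.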
val hubbleStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
)

val kafkaStreamRdd = hubbleStream.transform { rdd =>
  rdd.map(consumerRecord => (consumerRecord.key(), consumerRecord.value()))
}
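
To make the consumer-cache sizing question below concrete: with
LocationStrategies.PreferConsistent the 96 partitions are spread evenly
across executors, so each executor only caches its own share of consumers.
A rough sketch (the executor count of 4 is a hypothetical value, not from
this thread):

```scala
// Hypothetical sizing check: consumers cached per executor under PreferConsistent.
val partitions = 96
val executors  = 4  // assumed cluster size, not stated in the thread
val consumersPerExecutor = math.ceil(partitions.toDouble / executors).toInt

// spark.streaming.kafka.consumer.cache.maxCapacity defaults to 64, so unless a
// single executor can end up hosting all 96 partitions, raising it to 96
// should not be necessary.
println(consumersPerExecutor)
```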
On Mon, Aug 28, 2017 at 11:56 AM, swetha kasireddy <
[email protected]> wrote:
> There is no difference in performance even with Cache being enabled.
>
> On Mon, Aug 28, 2017 at 11:27 AM, swetha kasireddy <
> [email protected]> wrote:
>
>> There is no difference in performance even with Cache being disabled.
>>
>> On Mon, Aug 28, 2017 at 7:43 AM, Cody Koeninger <[email protected]>
>> wrote:
>>
>>> So if you can run with cache enabled for some time, does that
>>> significantly affect the performance issue you were seeing?
>>>
>>> Those settings seem reasonable enough. If preferred locations is
>>> behaving correctly you shouldn't need cached consumers for all 96
>>> partitions on any one executor, so that maxCapacity setting is
>>> probably unnecessary.
>>>
>>> On Fri, Aug 25, 2017 at 7:04 PM, swetha kasireddy
>>> <[email protected]> wrote:
>>> > Because I saw some posts saying that with the consumer cache enabled
>>> > you can hit a ConcurrentModificationException with
>>> > reduceByKeyAndWindow. I see those errors as well after running for some
>>> > time with the cache enabled, so I had to disable it. Please see the
>>> > tickets below. We have 96 partitions. So if I enable the cache, would
>>> > the following settings help improve performance?
>>> >
>>> > "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.valueOf(96),
>>> >
>>> > "spark.streaming.kafka.consumer.poll.ms" -> Integer.valueOf(1024),
>>> >
>>> >
>>> > http://markmail.org/message/n4cdxwurlhf44q5x
>>> >
>>> > https://issues.apache.org/jira/browse/SPARK-19185
>>> >
>>> > On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger <[email protected]>
>>> wrote:
>>> >>
>>> >> Why are you setting consumer.cache.enabled to false?
>>> >>
>>> >> On Fri, Aug 25, 2017 at 2:19 PM, SRK <[email protected]> wrote:
>>> >> > Hi,
>>> >> >
>>> >> > What would be the appropriate settings to run Spark with Kafka 10?
>>> >> > My job works fine with Spark against a Kafka 8 cluster, but it's
>>> >> > very slow with Kafka 10 using the Kafka Direct experimental APIs for
>>> >> > Kafka 10. I see the following error sometimes. Please see the Kafka
>>> >> > parameters and the consumer strategy for creating the stream below.
>>> >> > Any suggestions on how to run this with better performance would be
>>> >> > of great help.
>>> >> >
>>> >> > java.lang.AssertionError: assertion failed: Failed to get records
>>> >> > for test stream1 72 324027964 after polling for 120000
>>> >> >
>>> >> > val kafkaParams = Map[String, Object](
>>> >> >   "bootstrap.servers" -> kafkaBrokers,
>>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>>> >> >   "auto.offset.reset" -> "latest",
>>> >> >   "heartbeat.interval.ms" -> Integer.valueOf(20000),
>>> >> >   "session.timeout.ms" -> Integer.valueOf(60000),
>>> >> >   "request.timeout.ms" -> Integer.valueOf(90000),
>>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>>> >> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>>> >> >   "group.id" -> "test1"
>>> >> > )
>>> >> >
>>> >> > val hubbleStream = KafkaUtils.createDirectStream[String, String](
>>> >> >   ssc,
>>> >> >   LocationStrategies.PreferConsistent,
>>> >> >   ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>>> >> > )
>>> >> >
>>> >> > --
>>> >> > View this message in context:
>>> >> > http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>>> >> > Sent from the Apache Spark User List mailing list archive at
>>> >> > Nabble.com.
>>> >> >
>>> >> > ---------------------------------------------------------------------
>>> >> > To unsubscribe e-mail: [email protected]
>>> >> >
>>> >
>>> >
>>>
>>
>>
>