Hi Spark Community,
I need help with the following issue. I have been researching it for the
last 2 weeks, and as a last and best resort I want to ask the Spark
community.
I am running the following code in Spark:
  import org.apache.kafka.clients.consumer.ConsumerRecord
  import org.apache.kafka.common.serialization.StringDeserializer
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils, OffsetRange}
  import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
  import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

  val sparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("KafkaTest")
    .set("spark.streaming.kafka.maxRatePerPartition", "10")
    .set("spark.default.parallelism", "10")
    .set("spark.streaming.backpressure.enabled", "true")
    .set("spark.scheduler.mode", "FAIR")
  lazy val sparkContext = new SparkContext(sparkConf)
  val sparkJob = new SparkLocal
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "kafka-270894369.spark.google.com:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "stream_group1",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> "false",
    "heartbeat.interval.ms" -> "130000", //3000
    "request.timeout.ms" -> "150000", //40000
    "session.timeout.ms" -> "140000", //30000
    "max.poll.interval.ms" -> "140000", //isn't a known config
    "max.poll.records" -> "100" //2147483647
  )
  val streamingContext = new StreamingContext(sparkContext, Seconds(120))
  val topics = Array("topicname")
  val kafkaStream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )
  def messageTuple(tuple: ConsumerRecord[String, String]): (String) = {
    (null) // Removed the code
  }
  var offset: Array[OffsetRange] = null
  kafkaStream.foreachRDD { rdd =>
    // Offset ranges consumed by this batch
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offset = offsetRanges
    rdd.map(row => messageTuple(row))
      .foreachPartition { partition =>
        partition.map(row => null)
          .foreach { record =>
            print("")
            Thread.sleep(5)
          }
      }
    // Commit the offsets once the batch has been processed
    kafkaStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  }
  streamingContext.start()
  streamingContext.awaitTerminationOrTimeout(6000000)
  sys.ShutdownHookThread {
    println("Gracefully shutting down App")
    streamingContext.stop(true, true)
    println("Application stopped")
  }
With the above code, I am observing that multiple commits are being sent to
Kafka, and I am not sure why.
(I got the information below from the Kafka __consumer_offsets topic.)
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000011,
  expireTimestamp=Some(1577816400011))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864006531,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000012,
  expireTimestamp=Some(1577816400012))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864005827,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730000079,
  expireTimestamp=Some(1577816400079))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120008,
  expireTimestamp=Some(1577816520008))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120010,
  expireTimestamp=Some(1577816520010))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008524,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730120077,
  expireTimestamp=Some(1577816520077))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240010,
  expireTimestamp=Some(1577816640010))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240015,
  expireTimestamp=Some(1577816640015))
[stream_group1,topicname,59]::OffsetAndMetadata(offset=864008959,
  leaderEpoch=Optional.empty, metadata=, commitTimestamp=1577730240137,
  expireTimestamp=Some(1577816640137))
Ideally, we should see only one commit every 2 minutes, given my batch
interval of 120 seconds, but in our case we are observing 3 commits per batch.
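To make the count concrete, here is a minimal sketch (plain Scala, no Spark or Kafka needed) that buckets the commitTimestamp values listed above into windows of the batch interval and counts the commits in each. The object name `CommitWindows` and the helper `commitsPerWindow` are made up for illustration; only the timestamps and the 120-second interval come from the output and code above.

```scala
object CommitWindows {
  // Batch interval from the StreamingContext above: Seconds(120).
  val batchIntervalMs: Long = 120000L

  // commitTimestamp values copied from the __consumer_offsets output above.
  val commitTimestamps: Seq[Long] = Seq(
    1577730000011L, 1577730000012L, 1577730000079L,
    1577730120008L, 1577730120010L, 1577730120077L,
    1577730240010L, 1577730240015L, 1577730240137L
  )

  // Hypothetical helper: number of commits per window, keyed by the
  // window start time in milliseconds.
  def commitsPerWindow(timestamps: Seq[Long], windowMs: Long): Map[Long, Int] =
    timestamps
      .groupBy(ts => (ts / windowMs) * windowMs)
      .map { case (start, group) => start -> group.size }

  def main(args: Array[String]): Unit =
    commitsPerWindow(commitTimestamps, batchIntervalMs).toSeq
      .sortBy(_._1)
      .foreach { case (start, n) =>
        println(s"window starting at $start ms: $n commits")
      }
}
```

For the nine timestamps above, this gives three windows with three commits each. In the first window the three commits do not even carry the same offset (864006531 twice, then 864005827).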
Also, when the application restarts, we are losing data because of the
above issue (commit mismatch).
Could you please help me with your inputs?
Thanks in advance,
Raghu