Hi,
I have a Kafka producer which sends messages transactionally to Kafka, and a
Spark Streaming job which should consume the read_committed messages from
Kafka. But Spark Streaming has a problem consuming the read_committed
messages: the count of messages sent transactionally by the Kafka producer
is not the same as the count of read_committed messages consumed by Spark
Streaming.
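For context, the producer side follows the standard Kafka transactional
pattern, roughly like the sketch below (broker address, topic name,
transactional.id, serializers, schema registry URL and the record-loading
helper are placeholders, not my exact code):

import java.util.List;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");                     // placeholder
props.put("transactional.id", "school-student-producer");          // placeholder
props.put("enable.idempotence", "true");
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); // illustrative
props.put("schema.registry.url", "http://schema-registry:8081");   // placeholder

KafkaProducer<Integer, GenericRecord> producer = new KafkaProducer<>(props);
producer.initTransactions();
try {
    producer.beginTransaction();
    // Placeholder helper: however the batch of avro records is built.
    List<GenericRecord> records = loadStudentRecords();
    for (int i = 0; i < records.size(); i++) {
        producer.send(new ProducerRecord<>("school-student", i, records.get(i)));
    }
    // Once committed, the batch becomes visible to read_committed consumers.
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Unrecoverable errors: close the producer.
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort, so the batch is never exposed to read_committed consumers.
    producer.abortTransaction();
}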
Some consumer properties of my Spark Streaming job are as follows.
auto.offset.reset=earliest
enable.auto.commit=false
isolation.level=read_committed
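For completeness, the kafkaParams map I pass to the consumer strategy is
built roughly like this (broker address, group id, deserializers and schema
registry URL are placeholders):

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "broker:9092");                   // placeholder
kafkaParams.put("group.id", "school-student-group");                   // placeholder
kafkaParams.put("key.deserializer", IntegerDeserializer.class);
kafkaParams.put("value.deserializer", KafkaAvroDeserializer.class);    // illustrative
kafkaParams.put("schema.registry.url", "http://schema-registry:8081"); // placeholder
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("isolation.level", "read_committed");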
I also added the following Spark Streaming configuration.
sparkConf.set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true");
sparkConf.set("spark.streaming.kafka.consumer.poll.ms",
String.valueOf(2 * 60 * 1000));
My Spark Streaming job creates a direct stream like this.
JavaInputDStream<ConsumerRecord<Integer, GenericRecord>> stream =
    KafkaUtils.createDirectStream(
        ssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<Integer, GenericRecord>Subscribe(topics, kafkaParams)
    );

stream.foreachRDD(rdd -> {
    // get offset ranges.
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // process something.

    // commit offsets.
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
I have also tested with a consumer written with the raw kafka-clients
library, and it consumes read_committed messages correctly without any
problem: the count of consumed read_committed messages is equal to the
count of messages sent by the Kafka producer.
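That standalone test consumer is essentially the following sketch (broker
address, group id, topic name and deserializers are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");                     // placeholder
props.put("group.id", "read-committed-check");                     // placeholder
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); // illustrative
props.put("schema.registry.url", "http://schema-registry:8081");   // placeholder
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
props.put("isolation.level", "read_committed");

long total = 0;
try (KafkaConsumer<Integer, GenericRecord> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("school-student"));   // placeholder topic
    while (true) {
        ConsumerRecords<Integer, GenericRecord> records = consumer.poll(Duration.ofSeconds(5));
        if (records.isEmpty()) {
            break;  // stop counting once no more committed records are returned
        }
        total += records.count();
    }
}
// This count matches the number of messages sent by the transactional producer.
System.out.println("read_committed count = " + total);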
And sometimes I get the following exception:
Job aborted due to stage failure: Task 0 in stage 324.0 failed 1 times,
most recent failure: Lost task 0.0 in stage 324.0 (TID 1674)
(chango-private-1.chango.private executor driver):
java.lang.IllegalArgumentException: requirement failed: Failed to get
records for compacted spark-executor-school-student-group school-student-7
after polling for 120000
    at scala.Predef$.require(Predef.scala:281)
    at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.compactedNext(KafkaDataConsumer.scala:186)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:60)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer.compactedNext$(KafkaDataConsumer.scala:59)
    at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.compactedNext(KafkaDataConsumer.scala:219)
I have run Spark Streaming jobs with non-transactional Kafka messages
without any issue, and I never encountered exceptions like the one above.
It seems that Spark Streaming does not handle Kafka transactions correctly
when consumer properties like isolation.level=read_committed and
enable.auto.commit=false are set.
Any help appreciated.
- Kidong.
--
이기동
Kidong Lee
Email: [email protected]
Chango: https://cloudcheflabs.github.io/chango-private-docs
Web Site: http://www.cloudchef-labs.com/
Mobile: +82 10 4981 7297