[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15713476#comment-15713476 ]
Heji Kim commented on SPARK-18506:
----------------------------------
Breaking news!!!! I finally found the source of the problem. Our driver jars
have a lot of dependencies, and alongside spark-streaming_2.11 (2.0.2) we also
include the kafka-clients jar directly. Our data architect says our code uses it:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.1.0</version>
</dependency>
{code}
If I downgrade kafka-clients to 0.10.0.1, "earliest" works exactly as expected.
(I'll update the issue name with this jar name...)
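For reference, this is the pinned dependency that makes "earliest" behave
correctly for us; only the version changes from the block above:
{code}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.0.1</version>
</dependency>
{code}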
> kafka 0.10 with Spark 2.0.2 auto.offset.reset=earliest will only read from a
> single partition on a multi-partition topic
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark
> standalone mode 2.0.2
> with Kafka 0.10.1.0.
> Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v2.0.2), and we cannot get our
> drivers to read all partitions of a single stream when Kafka
> auto.offset.reset=earliest is set, running on a real cluster (separate VM nodes).
> When we run our drivers with auto.offset.reset=latest, ingesting from a single
> Kafka topic with multiple partitions (usually 10, but the problem shows up with
> only 3 partitions), the driver reads correctly from all partitions.
> Unfortunately, we need "earliest" for exactly-once semantics.
> In the same Kafka 0.10.1.0/Spark 2.x setup, our legacy driver using
> spark-streaming-kafka-0-8_2.11 with the prior setting
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations while trying to isolate the problem,
> but it is only auto.offset.reset=earliest on a real multi-machine cluster
> that causes it.
> 1. Ran with a Spark standalone cluster (4 Debian nodes, 8 vCPU/30GB each)
> instead of YARN 2.7.3. The single-partition read problem persists in both cases.
> Please note this problem occurs on an actual cluster of separate VM nodes
> (but not when our engineer runs it as a local cluster on his own Mac).
> 2. Ran with the Spark 2.1 nightly builds for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy settings, RangeAssignor and
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with
> both. (See the sketch after this list for how 5 and 6 were wired in.)
> 7. Tried the simplest Scala driver that only logs. (Our team uses Java.)
> Still broken.
> 8. Tried increasing GCE capacity for the cluster, though we were already highly
> overprovisioned for cores and memory. Also tried ramping up executors and
> cores. Since the driver works with auto.offset.reset=latest, we have ruled out
> GCP cloud infrastructure issues.
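> For items 5 and 6, a minimal sketch of the variants we mean; the assignor is
> passed through the consumer params, and the host mapping here is made up for
> illustration:
> {code}
> import org.apache.kafka.common.TopicPartition
> import org.apache.spark.streaming.kafka010.LocationStrategies
>
> object TriedVariants {
>   // (5) alternate assignor, added to the same kafkaParams map the driver uses
>   val roundRobin: (String, Object) =
>     "partition.assignment.strategy" ->
>       "org.apache.kafka.clients.consumer.RoundRobinAssignor"
>
>   // (6) pin partitions to specific executor hosts instead of PreferConsistent
>   // (the addresses below are placeholders)
>   val preferFixed = LocationStrategies.PreferFixed(
>     Map(new TopicPartition("simple_test", 0) -> "10.240.0.10",
>         new TopicPartition("simple_test", 1) -> "10.240.0.11"))
> }
> {code}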
> When we turn on the debug logs, we sometimes see partitions being set to
> different offset configurations even though the consumer config correctly
> indicates auto.offset.reset=earliest. In the excerpt below, partition 8 is
> reset to earliest (ListOffsetRequest timestamp=-2) while partition 9 is reset
> to latest (timestamp=-1):
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset.
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset.
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]}
> to broker 10.102.20.12:9092 (id: 12 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest
> {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]}
> to broker 10.102.20.13:9092 (id: 13 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse
> {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]}
> from broker 10.102.20.12:9092 (id: 12 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse
> {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]}
> from broker 10.102.20.13:9092 (id: 13 rack: null)
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9
> (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
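> As a cross-check outside the Spark integration, here is a minimal sketch of a
> bare kafka-clients consumer with the same params (broker, topic, and group
> below are placeholders) that prints each partition's starting position after
> the reset; with "earliest" we would expect 0 everywhere:
> {code}
> import java.util.{Collections, Properties}
> import org.apache.kafka.clients.consumer.KafkaConsumer
> import scala.collection.JavaConverters._
>
> object ResetCheck {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "10.102.20.11:9092")  // placeholder broker
>     props.put("group.id", "reset_check_group")           // hypothetical group
>     props.put("auto.offset.reset", "earliest")
>     props.put("enable.auto.commit", "false")
>     props.put("key.deserializer",
>       "org.apache.kafka.common.serialization.StringDeserializer")
>     props.put("value.deserializer",
>       "org.apache.kafka.common.serialization.StringDeserializer")
>
>     val consumer = new KafkaConsumer[String, String](props)
>     consumer.subscribe(Collections.singletonList("simple_test"))
>     consumer.poll(5000)  // triggers assignment and the offset reset
>     for (tp <- consumer.assignment().asScala.toSeq.sortBy(_.partition)) {
>       println(s"$tp starts at ${consumer.position(tp)}")
>     }
>     consumer.close()
>   }
> }
> {code}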
> I've enclosed below the completely stripped-down trivial test driver that
> shows this behavior. After spending 2 weeks trying all combinations with this
> driver, we think either there is a bug in the Kafka/Spark integration, or, if
> the Kafka 0.10/Spark upgrade needs special configuration, it would be
> fantastic if that were clearer in the documentation. But currently we cannot
> upgrade.
> {code}
> package com.xxxxx.labs.analytics.diagnostics.spark.drivers
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010._
>
> /**
>  * This driver only pulls data from the stream and logs the offset ranges,
>  * just to isolate the single-partition bug.
>  */
> object SimpleKafkaLoggingDriver {
>   def main(args: Array[String]): Unit = {
>     if (args.length != 4) {
>       System.err.println(
>         "Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
>       System.exit(1)
>     }
>     val Array(brokers, topic, groupId, offsetReset) = args
>     val preferredHosts = LocationStrategies.PreferConsistent
>     val topics = List(topic)
>     val kafkaParams = Map(
>       "bootstrap.servers" -> brokers,
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> groupId,
>       "auto.offset.reset" -> offsetReset,
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>
>     val sparkConf = new SparkConf().setAppName("SimpleTestDriver_" + topic)
>     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>     val dstream = KafkaUtils.createDirectStream[String, String](
>       streamingContext,
>       preferredHosts,
>       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>
>     dstream.foreachRDD { rdd =>
>       // Get the offset ranges in the RDD and log them
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       for (o <- offsetRanges) {
>         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
>       }
>     }
>
>     streamingContext.start()
>     streamingContext.awaitTermination()
>   }
> }
> {code}
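> For completeness, a hypothetical launch command for the driver above (master
> URL and jar name are placeholders; the four arguments match the Usage string):
> {noformat}
> spark-submit \
>   --class com.xxxxx.labs.analytics.diagnostics.spark.drivers.SimpleKafkaLoggingDriver \
>   --master spark://<master-host>:7077 \
>   simple-test-driver.jar \
>   "10.102.22.11:9092,10.102.22.12:9092" simple_test simple_test_group earliest
> {noformat}
> The ConsumerConfig logged at startup confirms the settings were picked up: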
> {noformat}
> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = simple_test_group
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class
> org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.StringDeserializer
> {noformat}
> Below is the output of the above driver for a 5-partition topic. Offsets always
> remain 0 for all but a single partition, in this case partition 3:
> {noformat}
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531
> {noformat}
> The producer is posting messages evenly into each partition (GetOffsetShell
> with --time -2 shows the earliest offsets, with --time -1 the latest):
> {noformat}
> devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$
> kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> '10.102.22.11:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$
> kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> '10.102.22.11:9092' --topic simple_logtest --time -1
> simple_logtest:2:722964
> simple_logtest:4:722864
> simple_logtest:1:722957
> simple_logtest:3:722960
> simple_logtest:0:723021
> {noformat}
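> A further sanity check outside Spark, if needed: the console consumer can
> confirm whether every partition is readable from the beginning (broker
> address as above):
> {noformat}
> kafka-console-consumer.sh --bootstrap-server '10.102.22.11:9092' \
>   --topic simple_logtest --from-beginning
> {noformat}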