[ https://issues.apache.org/jira/browse/SPARK-18506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15684995#comment-15684995 ]
Heji Kim commented on SPARK-18506:
----------------------------------
Just confirming that when I use ConsumerStrategy.Assign with all partitions
starting at 0, everything works as expected.
[2016-11-21 22:46:15,016] INFO OFFSET: null KafkaRDD[4] at createDirectStream at SimpleKafkaLoggingDriverAssignedOffset.java:62 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 3 1169577 1174129 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 0 1169615 1174109 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 1 1169561 1174125 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 2 1169567 1174132 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
[2016-11-21 22:46:15,016] INFO OFFSET: simple_logtest 4 1169628 1174202 (com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka.SimpleKafkaLoggingDriverAssignedOffset)
Code below.
package com.dematic.labs.analytics.ingestion.spark.drivers.diagnostics.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Simplest possible Java driver that assigns the starting offset.
 */
public final class SimpleKafkaLoggingDriverAssignedOffset {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(SimpleKafkaLoggingDriverAssignedOffset.class);
    public static final String APP_NAME = "TEST_ASSIGNED_OFFSET";

    public static void main(final String[] args) throws Exception {
        if (args.length != 4) {
            throw new IllegalArgumentException("Driver passed in incorrect parameters. "
                    + "Usage: SimpleKafkaLoggingDriverAssignedOffset "
                    + "<broker bootstrap servers> <topic> <groupId> <partitionSize>");
        }
        final String kafkaTopic = args[1];

        final Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", args[0]);
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", args[2]);
        // auto.offset.reset is deliberately not set; starting offsets are assigned below.
        // kafkaParams.put("auto.offset.reset", args[3]);
        kafkaParams.put("enable.auto.commit", false);

        final SparkConf sparkConfiguration =
                new SparkConf().setAppName(APP_NAME + "_" + kafkaTopic);
        final int totalPartitions = Integer.parseInt(args[3]);

        // Create the streaming context.
        // NOTE: as posted, the batch interval (in seconds) is also taken from args[3],
        // i.e. it equals the partition count.
        final JavaStreamingContext streamingContext =
                new JavaStreamingContext(sparkConfiguration, Durations.seconds(totalPartitions));

        // Force debug logging.
        streamingContext.ssc().sc().setLogLevel("DEBUG");

        // Assign fixed topic partitions, each starting at offset 0.
        final Map<TopicPartition, Long> partitionStart = new HashMap<>();
        for (int i = 0; i < totalPartitions; i++) {
            partitionStart.put(new TopicPartition(kafkaTopic, i), 0L);
        }
        final Assign<String, String> fixedAssignment =
                new Assign<>(partitionStart.keySet(), kafkaParams, partitionStart);

        final JavaInputDStream<ConsumerRecord<String, String>> directStream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        fixedAssignment);

        // Log the offset range of every partition in each batch.
        directStream.foreachRDD(rdd -> {
            LOGGER.info("OFFSET: " + rdd.rdd().name() + " " + rdd.toString());
            for (final OffsetRange offset : ((HasOffsetRanges) rdd.rdd()).offsetRanges()) {
                LOGGER.info("OFFSET: " + offset.topic() + ' ' + offset.partition() + ' '
                        + offset.fromOffset() + ' ' + offset.untilOffset());
            }
        });

        // Start the streaming context and await termination.
        LOGGER.info("KCP: starting SimpleKafkaLoggingDriverAssignedOffset Driver with master URL >{}<",
                streamingContext.sparkContext().master());
        streamingContext.start();
        LOGGER.info("KCP: spark state: {}", streamingContext.getState().name());
        streamingContext.awaitTermination();
    }
}
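
For anyone reading the OFFSET log lines above: each line carries the topic, the partition, fromOffset and untilOffset, so the number of records in a batch for a partition is untilOffset minus fromOffset. Below is a minimal sketch of that arithmetic in plain Java. The class OffsetLogLine and its methods are hypothetical helpers written for this illustration; they are not part of Spark or of the driver above, and the sketch assumes the log payload has exactly the four space-separated fields that the driver prints.

```java
/** Hypothetical helper (not part of Spark): one "OFFSET:" log payload from the driver above. */
final class OffsetLogLine {
    final String topic;
    final int partition;
    final long fromOffset;
    final long untilOffset;

    OffsetLogLine(String topic, int partition, long fromOffset, long untilOffset) {
        this.topic = topic;
        this.partition = partition;
        this.fromOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    /** Number of records the batch covers for this partition. */
    long count() {
        return untilOffset - fromOffset;
    }

    /** Parses the text after "OFFSET: ", e.g. "simple_logtest 3 1169577 1174129". */
    static OffsetLogLine parse(String payload) {
        String[] fields = payload.trim().split("\\s+");
        if (fields.length != 4) {
            throw new IllegalArgumentException("Expected 4 fields but got: " + payload);
        }
        return new OffsetLogLine(
                fields[0],
                Integer.parseInt(fields[1]),
                Long.parseLong(fields[2]),
                Long.parseLong(fields[3]));
    }

    public static void main(String[] args) {
        // Payloads copied from the log output in this comment.
        String[] payloads = {
            "simple_logtest 3 1169577 1174129",
            "simple_logtest 0 1169615 1174109",
            "simple_logtest 1 1169561 1174125",
            "simple_logtest 2 1169567 1174132",
            "simple_logtest 4 1169628 1174202",
        };
        long total = 0;
        for (String payload : payloads) {
            OffsetLogLine line = parse(payload);
            total += line.count();
            System.out.println(line.topic + " partition " + line.partition
                    + ": " + line.count() + " records");
        }
        System.out.println("total records in batch: " + total);
    }
}
```

For the batch logged above, partition 3 covers 1174129 - 1169577 = 4552 records, and the other four partitions cover similar amounts, which is what "works as expected" looks like: every partition advances in each batch.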
> kafka 0.10 with Spark 2.02 auto.offset.reset=earliest will only read from a
> single partition on a multi partition topic
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-18506
> URL: https://issues.apache.org/jira/browse/SPARK-18506
> Project: Spark
> Issue Type: Bug
> Components: DStreams
> Affects Versions: 2.0.2
> Environment: Problem occurs both in Hadoop/YARN 2.7.3 and Spark
> standalone mode 2.0.2
> with Kafka 0.10.1.0.
> Reporter: Heji Kim
>
> Our team is trying to upgrade to Spark 2.0.2/Kafka
> 0.10.1.0/spark-streaming-kafka-0-10_2.11 (v 2.0.2), and we cannot get our
> drivers to read all partitions of a single stream when Kafka
> auto.offset.reset=earliest is set and the driver runs on a real cluster
> (separate VM nodes).
> When we run our drivers with auto.offset.reset=latest, ingesting from a single
> Kafka topic with multiple partitions (usually 10, but the problem shows up with
> only 3 partitions), the driver reads correctly from all partitions.
> Unfortunately, we need "earliest" for exactly-once semantics.
> In the same Kafka 0.10.1.0/Spark 2.x setup, our legacy driver using
> spark-streaming-kafka-0-8_2.11 with the prior setting
> auto.offset.reset=smallest runs correctly.
> We have tried the following configurations to isolate the problem, but only
> auto.offset.reset=earliest on a "real multi-machine cluster" causes it.
> 1. Ran with a Spark standalone cluster (4 Debian nodes, 8 vCPU/30 GB each)
> instead of YARN 2.7.3. The single-partition read problem persists in both
> cases. Please note this problem occurs on an actual cluster of separate VM
> nodes (but not when our engineer runs it as a cluster on his own Mac).
> 2. Ran with Spark 2.1 nightly builds for the last 10 days. Problem persists.
> 3. Turned off checkpointing. Problem persists with or without checkpointing.
> 4. Turned off backpressure. Problem persists with or without backpressure.
> 5. Tried both partition.assignment.strategy RangeAssignor and
> RoundRobinAssignor. Broken with both.
> 6. Tried both LocationStrategies (PreferConsistent/PreferFixed). Broken with
> both.
> 7. Tried the simplest Scala driver that only logs. (Our team uses Java.)
> Broken with both the Scala and the Java driver.
> 8. Tried increasing GCE capacity for the cluster, but we were already highly
> overprovisioned for cores and memory. Also tried ramping up executors and
> cores. Since the driver works with auto.offset.reset=latest, we have ruled out
> GCP cloud infrastructure issues.
> When we turn on the debug logs, we sometimes see partitions being reset to
> different offsets (some to earliest, some to latest) even though the consumer
> config correctly indicates auto.offset.reset=earliest.
> {noformat}
> 8 DEBUG Resetting offset for partition simple_test-8 to earliest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Resetting offset for partition simple_test-9 to latest offset. (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=8,timestamp=-2}]}]} to broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Sending ListOffsetRequest {replica_id=-1,topics=[{topic=simple_test,partitions=[{partition=9,timestamp=-1}]}]} to broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=8,error_code=0,timestamp=-1,offset=0}]}]} from broker 10.102.20.12:9092 (id: 12 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 TRACE Received ListOffsetResponse {responses=[{topic=simple_test,partition_responses=[{partition=9,error_code=0,timestamp=-1,offset=66724}]}]} from broker 10.102.20.13:9092 (id: 13 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
> 8 DEBUG Fetched {timestamp=-1, offset=0} for partition simple_test-8 (org.apache.kafka.clients.consumer.internals.Fetcher)
> 9 DEBUG Fetched {timestamp=-1, offset=66724} for partition simple_test-9 (org.apache.kafka.clients.consumer.internals.Fetcher)
> {noformat}
> I've enclosed below the completely stripped-down, trivial test driver that
> shows this behavior. After spending 2 weeks trying all combinations with this
> stripped-down driver, we think that either there is a bug in the Kafka/Spark
> integration, or the Kafka 0.10/Spark upgrade needs special configuration, in
> which case it would be fantastic if the documentation were clearer about it.
> Currently we cannot upgrade.
> {code}
> package com.xxxxx.labs.analytics.diagnostics.spark.drivers
>
> import org.apache.kafka.common.serialization.StringDeserializer
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import org.apache.spark.streaming.kafka010._
> import org.apache.spark.streaming.kafka010.LocationStrategies
> import org.apache.spark.streaming.kafka010.ConsumerStrategies
>
> /**
>  * This driver only pulls data from the stream and logs the offsets,
>  * just to isolate the single-partition bug.
>  */
> object SimpleKafkaLoggingDriver {
>   def main(args: Array[String]): Unit = {
>     if (args.length != 4) {
>       System.err.println(
>         "Usage: SimpleTestDriver <broker bootstrap servers> <topic> <groupId> <offsetReset>")
>       System.exit(1)
>     }
>     val Array(brokers, topic, groupId, offsetReset) = args
>     val preferredHosts = LocationStrategies.PreferConsistent
>     val topics = List(topic)
>     val kafkaParams = Map(
>       "bootstrap.servers" -> brokers,
>       "key.deserializer" -> classOf[StringDeserializer],
>       "value.deserializer" -> classOf[StringDeserializer],
>       "group.id" -> groupId,
>       "auto.offset.reset" -> offsetReset,
>       "enable.auto.commit" -> (false: java.lang.Boolean)
>     )
>     val sparkConf = new SparkConf().setAppName("SimpleTestDriver" + "_" + topic)
>     val streamingContext = new StreamingContext(sparkConf, Seconds(5))
>     val dstream = KafkaUtils.createDirectStream[String, String](
>       streamingContext,
>       preferredHosts,
>       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
>     dstream.foreachRDD { rdd =>
>       // Get the offset ranges in the RDD and log them
>       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>       for (o <- offsetRanges) {
>         println(s"${o.topic} ${o.partition} offsets: ${o.fromOffset} to ${o.untilOffset}")
>       }
>     }
>     streamingContext.start()
>     streamingContext.awaitTermination()
>   }
> }
> {code}
> {noformat}
> 16/11/17 23:08:21 INFO ConsumerConfig: ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [10.102.22.11:9092, 10.102.22.12:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 540000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = simple_test_group
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 500
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 30000
> partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.ms = 50
> request.timeout.ms = 305000
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> {noformat}
> Below is the output of the above driver for a 5-partition topic. Offsets
> always remain 0 for all but a single partition, in this case partition 3:
> {noformat}
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531
> simple_logtest 0 offsets: 0 to 0
> simple_logtest 1 offsets: 0 to 0
> simple_logtest 2 offsets: 0 to 0
> simple_logtest 4 offsets: 0 to 0
> simple_logtest 3 offsets: 1623531 to 1623531
> {noformat}
> Producer is posting messages evenly into each partition:
> {noformat}
> devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -2
> simple_logtest:2:0
> simple_logtest:4:0
> simple_logtest:1:0
> simple_logtest:3:0
> simple_logtest:0:0
> devops@kafka-devops-zookeeper-10-102-22-10:/opt/kafka_latest/bin$ kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list '10.102.22.11:9092' --topic simple_logtest --time -1
> simple_logtest:2:722964
> simple_logtest:4:722864
> simple_logtest:1:722957
> simple_logtest:3:722960
> simple_logtest:0:723021
> {noformat}
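
A note on reading the GetOffsetShell output quoted above: --time -2 asks for the earliest offset of each partition and --time -1 for the latest, the same sentinels that appear as timestamp=-2 and timestamp=-1 in the quoted ListOffsetRequest trace. Subtracting the two per partition gives the number of records a consumer starting from "earliest" should be able to read. The following is only a sketch of that check in plain Java; OffsetShellOutput and its methods are hypothetical names made up for this illustration, not part of Kafka, and the parser assumes every line has the topic:partition:offset shape shown in the quoted output.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not part of Kafka): reads GetOffsetShell-style output. */
final class OffsetShellOutput {

    /** Parses lines of the form "topic:partition:offset" into a partition -> offset map. */
    static Map<Integer, Long> parse(String output) {
        Map<Integer, Long> offsets = new TreeMap<>();
        for (String line : output.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] fields = trimmed.split(":");
            if (fields.length != 3) {
                throw new IllegalArgumentException("Unexpected line: " + trimmed);
            }
            offsets.put(Integer.parseInt(fields[1]), Long.parseLong(fields[2]));
        }
        return offsets;
    }

    /**
     * Records available per partition: latest offset minus earliest offset.
     * Assumption of this sketch: a partition missing from the earliest map starts at 0.
     */
    static Map<Integer, Long> available(Map<Integer, Long> earliest, Map<Integer, Long> latest) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> entry : latest.entrySet()) {
            long start = earliest.getOrDefault(entry.getKey(), 0L);
            result.put(entry.getKey(), entry.getValue() - start);
        }
        return result;
    }

    public static void main(String[] args) {
        // Output copied from the two GetOffsetShell runs quoted above.
        String earliest = "simple_logtest:2:0\nsimple_logtest:4:0\nsimple_logtest:1:0\n"
                + "simple_logtest:3:0\nsimple_logtest:0:0\n";
        String latest = "simple_logtest:2:722964\nsimple_logtest:4:722864\n"
                + "simple_logtest:1:722957\nsimple_logtest:3:722960\nsimple_logtest:0:723021\n";

        available(parse(earliest), parse(latest)).forEach((partition, count) ->
                System.out.println("partition " + partition + ": " + count + " records available"));
    }
}
```

With the quoted numbers, every partition has roughly 723,000 records available from offset 0, which is why a driver reporting "0 to 0" for four of the five partitions is not simply waiting on an empty partition.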