Hi, I'm using Spark Streaming (1.3.1).
I want exactly-once messaging from Kafka, and I also want to use the window
operations of DStream.
When I use a window operation (e.g. DStream#reduceByKeyAndWindow) with the
Kafka direct API, a java.lang.ClassCastException occurs, as follows.
--- stacktrace ---
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
    at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
    at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
--- my source ---
JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
jssc.checkpoint("checkpoint");

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(jssc, String.class, String.class,
        StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);

JavaPairDStream<String, List<String>> windowDs =
    pairDS.reduceByKeyAndWindow(
        new Function2<List<String>, List<String>, List<String>>() {
            @Override
            public List<String> call(List<String> list1, List<String> list2)
                    throws Exception {
                ...
            }
        }, windowDuration, slideDuration);

windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {
        OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // ClassCastException occurred

        KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
        for (OffsetRange offsets : offsetsList) {
            TopicAndPartition topicAndPartition =
                new TopicAndPartition(offsets.topic(), offsets.partition());
            HashMap<TopicAndPartition, Object> map =
                new HashMap<TopicAndPartition, Object>();
            map.put(topicAndPartition, offsets.untilOffset());
            kc.setConsumerOffsets("group1", toScalaMap(map));
        }
        return null;
    }
});
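
To make clearer what I think the trace is telling me, here is a minimal stand-alone sketch with no Spark involved. All class names in it (Rdd, SourceRdd, HasRanges, OffsetCastSketch) are hypothetical stand-ins, not Spark's API. It assumes, going only by the exception message, that the RDD handed to foreachRDD after mapToPair and reduceByKeyAndWindow is a plain derived RDD (MapPartitionsRDD in my trace) that does not implement the offset interface, while the RDD coming straight from the direct stream does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetCastSketch {
    // Hypothetical stand-in for an interface like HasOffsetRanges.
    interface HasRanges {
        List<String> ranges();
    }

    // Hypothetical stand-in for a generic RDD.
    static class Rdd {
        final List<String> data;

        Rdd(List<String> data) {
            this.data = data;
        }

        // Any transformation returns a plain Rdd, so the source type is lost.
        Rdd transformed() {
            return new Rdd(new ArrayList<String>(data));
        }
    }

    // Hypothetical stand-in for the RDD produced directly by the source stream.
    static class SourceRdd extends Rdd implements HasRanges {
        private final List<String> ranges;

        SourceRdd(List<String> data, List<String> ranges) {
            super(data);
            this.ranges = ranges;
        }

        @Override
        public List<String> ranges() {
            return ranges;
        }
    }

    // Attempts the same kind of cast as in my foreachRDD callback.
    static boolean castSucceeds(Rdd rdd) {
        try {
            HasRanges withRanges = (HasRanges) rdd;
            return withRanges.ranges() != null;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SourceRdd source =
            new SourceRdd(Arrays.asList("a", "b"), Arrays.asList("topic-0:0-2"));
        System.out.println(castSucceeds(source));               // prints "true"
        System.out.println(castSucceeds(source.transformed())); // prints "false"
    }
}
```

If that assumption holds, the cast in my code can only work on the RDD of the direct stream itself, before any transformation, and I do not see how to get the offsets once the window operation has been applied.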
Thanks!
--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.