Hi, I'm using Spark Streaming (1.3.1).
I want exactly-once message processing from Kafka while also using the window
operations of DStream.

When I use a window operation (e.g. DStream#reduceByKeyAndWindow) with the
Kafka direct API,
a java.lang.ClassCastException occurs, as follows.

--- stacktrace ---
java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
        at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
        at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)
        
 
--- my source ---

JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
jssc.checkpoint("checkpoint");

JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);

JavaPairDStream<String, List<String>> windowDs =
        pairDS.reduceByKeyAndWindow(new Function2<List<String>, List<String>, List<String>>() {
            @Override
            public List<String> call(List<String> list1, List<String> list2) throws Exception {
                ...
            }
        }, windowDuration, slideDuration);

windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {

    @Override
    public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {

        OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        // ClassCastException occurred

        KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
        for (OffsetRange offsets : offsetsList) {

            TopicAndPartition topicAndPartition =
                    new TopicAndPartition(offsets.topic(), offsets.partition());

            HashMap<TopicAndPartition, Object> map = new HashMap<TopicAndPartition, Object>();
            map.put(topicAndPartition, offsets.untilOffset());
            kc.setConsumerOffsets("group1", toScalaMap(map));
        }

        return null;
    }
});
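
To show how I read the exception, here is a small plain-Java sketch with no Spark dependency. As far as I understand, only the RDD produced directly by the direct stream implements HasOffsetRanges, and the RDD that reaches foreachRDD after mapToPair and reduceByKeyAndWindow is a different class (MapPartitionsRDD in the trace). All names below (OffsetCastSketch, OffsetCarrier, Rdd, SourceRdd, DerivedRdd) are made up for illustration and are not Spark APIs.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetCastSketch {

    // Hypothetical stand-in for HasOffsetRanges.
    interface OffsetCarrier {
        List<String> offsetRanges();
    }

    // Hypothetical stand-in for a generic RDD.
    static class Rdd {
    }

    // Stand-in for the RDD created directly by the direct stream:
    // it is the only class here that carries offset information.
    static class SourceRdd extends Rdd implements OffsetCarrier {
        @Override
        public List<String> offsetRanges() {
            return Arrays.asList("topicA, partition 0, offsets 0 to 10");
        }
    }

    // Stand-in for an RDD produced by a transformation. It wraps its parent
    // but does not implement OffsetCarrier itself.
    static class DerivedRdd extends Rdd {
        final Rdd parent;

        DerivedRdd(Rdd parent) {
            this.parent = parent;
        }
    }

    // Tries the same kind of cast as in my foreachRDD callback and reports
    // whether it succeeded.
    static boolean canReadOffsets(Rdd rdd) {
        try {
            ((OffsetCarrier) rdd).offsetRanges();
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Rdd source = new SourceRdd();
        Rdd derived = new DerivedRdd(source);
        System.out.println("source RDD:  " + canReadOffsets(source));
        System.out.println("derived RDD: " + canReadOffsets(derived));
    }
}
```

If this reading is right, the offsets would have to be read from the RDDs of the direct stream itself, before any transformation or window is applied. What I don't know is how to do that and still use the window.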

Thanks!




