Hi Shao,

Thank you for your quick reply. That is disappointing. I will try window operations with the receiver-based approach (KafkaUtils.createStream) instead.
Thank you again,
ZIGEN

On 2015/06/12 17:18, Saisai Shao <[email protected]> wrote:

> I think you cannot use offsetRanges that way. When you transform a
> DirectKafkaInputDStream into a WindowedDStream, the KafkaRDD is internally
> converted into a normal RDD, but offsetRanges is an attribute specific to
> KafkaRDD. So when you cast a normal RDD to HasOffsetRanges, you will hit
> this exception.
>
> You can only do something like:
>
> directKafkaInputDStream.foreachRDD { rdd =>
>   rdd.asInstanceOf[HasOffsetRanges]....
>   ...
> }
>
> Apply foreachRDD directly on the DirectKafkaInputDStream.
>
> 2015-06-12 16:10 GMT+08:00 ZIGEN <[email protected]>:
>> Hi, I'm using Spark Streaming (1.3.1).
>> I want to get exactly-once messaging from Kafka and use the window
>> operations of DStream.
>>
>> When I use window operations (e.g. DStream#reduceByKeyAndWindow) with the
>> Kafka direct API, a java.lang.ClassCastException occurs as follows.
>>
>> --- stacktrace ---
>> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>     at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
>>     at org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>     at scala.util.Try$.apply(Try.scala:161)
>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>     at java.lang.Thread.run(Thread.java:722)
>>
>> --- my source ---
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
>> jssc.checkpoint("checkpoint");
>>
>> JavaPairInputDStream<String, String> messages =
>>     KafkaUtils.createDirectStream(jssc, String.class, String.class,
>>         StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>>
>> JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);
>>
>> JavaPairDStream<String, List<String>> windowDs =
>>     pairDS.reduceByKeyAndWindow(
>>         new Function2<List<String>, List<String>, List<String>>() {
>>             @Override
>>             public List<String> call(List<String> list1, List<String> list2)
>>                     throws Exception {
>>                 ...
>>             }
>>         }, windowDuration, slideDuration);
>>
>> windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {
>>     @Override
>>     public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {
>>         OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>         // ClassCastException occurred
>>
>>         KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
>>         for (OffsetRange offsets : offsetsList) {
>>             TopicAndPartition topicAndPartition =
>>                 new TopicAndPartition(offsets.topic(), offsets.partition());
>>
>>             HashMap<TopicAndPartition, Object> map =
>>                 new HashMap<TopicAndPartition, Object>();
>>             map.put(topicAndPartition, offsets.untilOffset());
>>             kc.setConsumerOffsets("group1", toScalaMap(map));
>>         }
>>
>>         return null;
>>     }
>> });
>>
>> Thanks!
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.

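For readers following the explanation in this thread (the cast works only on the RDD that the direct stream itself produces, and any transformation yields an ordinary RDD without offset information), here is a minimal plain-Java model of that behavior. It does not use Spark at all: `OffsetAware`, `Batch`, `SourceBatch`, `DerivedBatch`, `upperCase` and `readOffset` are hypothetical stand-ins invented for illustration, and the sketch only assumes that a transformation returns a new object of a different runtime class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the ClassCastException discussed in the thread.
// Nothing here is a Spark class; every name is a hypothetical stand-in.
final class OffsetCastSketch {

    /** Stand-in for an interface that only the source batch implements. */
    interface OffsetAware {
        long untilOffset();
    }

    /** Stand-in for any batch of records. */
    interface Batch {
        List<String> records();
    }

    /** Stand-in for the batch handed out directly by the source stream. */
    static final class SourceBatch implements Batch, OffsetAware {
        private final List<String> records;
        private final long untilOffset;

        SourceBatch(List<String> records, long untilOffset) {
            this.records = records;
            this.untilOffset = untilOffset;
        }

        @Override public List<String> records() { return records; }
        @Override public long untilOffset() { return untilOffset; }
    }

    /** Stand-in for a derived batch: it carries data but no offset metadata. */
    static final class DerivedBatch implements Batch {
        private final List<String> records;

        DerivedBatch(List<String> records) { this.records = records; }

        @Override public List<String> records() { return records; }
    }

    /** A transformation returns a new plain batch, so the offsets are lost. */
    static Batch upperCase(Batch in) {
        List<String> out = new ArrayList<>();
        for (String r : in.records()) {
            out.add(r.toUpperCase());
        }
        return new DerivedBatch(out);
    }

    /** Reads the offset by casting, in the same style as the failing code. */
    static long readOffset(Batch b) {
        return ((OffsetAware) b).untilOffset();
    }

    public static void main(String[] args) {
        Batch source = new SourceBatch(Arrays.asList("a", "b"), 42L);

        // Casting the source batch itself succeeds.
        long offset = readOffset(source);
        System.out.println("offset read from source batch: " + offset);

        // Casting the transformed batch fails at runtime.
        try {
            readOffset(upperCase(source));
        } catch (ClassCastException e) {
            System.out.println("cast failed on derived batch");
        }
    }
}
```

The same reasoning is why the offsets have to be read in the first operation applied to the direct stream, before any map or window step replaces the original RDD.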