Hi Shao,

Thank you for your prompt reply.
That is disappointing.
I will try window operations with the receiver-based
approach (KafkaUtils.createStream).

Thank you again,
ZIGEN


On 2015/06/12 17:18, Saisai Shao <[email protected]> wrote:

> I don't think you can use offsetRanges that way. When you transform a
> DirectKafkaInputDStream into a WindowedDStream, the KafkaRDD is internally
> converted into a normal RDD. offsetRanges is an attribute specific to
> KafkaRDD, so casting a normal RDD to HasOffsetRanges throws this
> exception.
> 
> You can only do something like:
> 
> directKafkaInputDStream.foreachRDD { rdd =>
>    rdd.asInstanceOf[HasOffsetRanges]....
>   ...
> }
> 
> That is, apply foreachRDD directly to the DirectKafkaInputDStream.
> 
> 2015-06-12 16:10 GMT+08:00 ZIGEN <[email protected]>:
>> Hi, I'm using Spark Streaming (1.3.1).
>> I want exactly-once message processing from Kafka together with DStream
>> window operations.
>> 
>> When I use a window operation (e.g. DStream#reduceByKeyAndWindow) with the
>> Kafka direct API, the following java.lang.ClassCastException occurs.
>> 
>> --- stacktrace --
>> java.lang.ClassCastException: org.apache.spark.rdd.MapPartitionsRDD cannot
>> be cast to org.apache.spark.streaming.kafka.HasOffsetRanges
>>         at
>> org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:146)
>>         at
>> org.apache.spark.examples.streaming.JavaDirectKafkaWordCountWithReduceByKeyAndWindow$3.call(JavaDirectKafkaWordCountWithReduceByKeyAndWindow.java:1)
>>         at
>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>>         at
>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:311)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>>         at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>         at
>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>         at scala.util.Try$.apply(Try.scala:161)
>>         at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>         at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
>>         at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>>         at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
>>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>         at
>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>>         at java.lang.Thread.run(Thread.java:722)
>> 
>> 
>> --- my source ---
>> 
>> JavaStreamingContext jssc = new JavaStreamingContext(_ctx, batchInterval);
>> jssc.checkpoint("checkpoint");
>> 
>> JavaPairInputDStream<String, String> messages =
>> KafkaUtils.createDirectStream
>>  (jssc, String.class, String.class, StringDecoder.class,
>> StringDecoder.class, kafkaParams, topicsSet);
>> 
>> JavaPairDStream<String, List<String>> pairDS = messages.mapToPair(...);
>> 
>> JavaPairDStream<String, List<String>> windowDs =
>> pairDS.reduceByKeyAndWindow(new Function2<List<String>, List<String>, List<String>>() {
>>         @Override
>>         public List<String> call(List<String> list1, List<String> list2) throws Exception {
>>                 ...
>>         }
>> }, windowDuration, slideDuration);
>> 
>> windowDs.foreachRDD(new Function<JavaPairRDD<String, List<String>>, Void>() {
>> 
>>         @Override
>>         public Void call(JavaPairRDD<String, List<String>> rdd) throws Exception {
>> 
>>                 OffsetRange[] offsetsList = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>>                 // ClassCastException occurred on the line above
>> 
>>                 KafkaCluster kc = new KafkaCluster(toScalaMap(kafkaParams));
>>                 for (OffsetRange offsets : offsetsList) {
>> 
>>                         TopicAndPartition topicAndPartition =
>>                                 new TopicAndPartition(offsets.topic(), offsets.partition());
>> 
>>                         HashMap<TopicAndPartition, Object> map =
>>                                 new HashMap<TopicAndPartition, Object>();
>>                         map.put(topicAndPartition, offsets.untilOffset());
>>                         kc.setConsumerOffsets("group1", toScalaMap(map));
>>                 }
>> 
>>                 return null;
>>         }
>> });
>> 
>> Thanks!
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-use-Window-Operations-with-kafka-Direct-API-tp23293.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 

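For anyone reading this thread later: the explanation above comes down to a plain Java type relationship, so it can be reproduced without Spark or Kafka. The sketch below is only a model under stated assumptions. OffsetAware, Batch and SourceBatch are hypothetical stand-ins (for HasOffsetRanges, a normal RDD and KafkaRDD respectively) and are not Spark classes. It shows that the cast succeeds on the source batch and fails once a transformation has produced a plain batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins, NOT Spark classes: they only model the type relationship.
class OffsetCastSketch {

    /** Stand-in for an interface such as HasOffsetRanges. */
    interface OffsetAware {
        long[] offsetRange(); // {fromOffset, untilOffset}
    }

    /** Stand-in for a normal RDD: holds records, knows nothing about offsets. */
    static class Batch {
        final List<String> records;

        Batch(List<String> records) {
            this.records = records;
        }

        /** Any transformation returns a plain Batch, so offset information is lost. */
        Batch mapToUpper() {
            List<String> out = new ArrayList<>();
            for (String r : records) {
                out.add(r.toUpperCase());
            }
            return new Batch(out);
        }
    }

    /** Stand-in for a KafkaRDD: the only type that carries offsets. */
    static class SourceBatch extends Batch implements OffsetAware {
        final long from;
        final long until;

        SourceBatch(List<String> records, long from, long until) {
            super(records);
            this.from = from;
            this.until = until;
        }

        @Override
        public long[] offsetRange() {
            return new long[] {from, until};
        }
    }

    /** Returns true if casting the batch to OffsetAware throws ClassCastException. */
    static boolean castFails(Batch batch) {
        try {
            OffsetAware ignored = (OffsetAware) batch;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        records.add("a");
        records.add("b");
        Batch source = new SourceBatch(records, 10L, 12L);

        // Reading offsets directly from the source batch works.
        long[] range = ((OffsetAware) source).offsetRange();
        System.out.println("until offset = " + range[1]);

        // After a transformation the same cast fails, as in the stack trace.
        System.out.println("cast fails after transform: " + castFails(source.mapToUpper()));
    }
}
```

In the real job the same reasoning suggests reading the offsets in a step applied directly to the stream returned by KafkaUtils.createDirectStream, before mapToPair or reduceByKeyAndWindow. Note that a windowed RDD spans several batches, so offsets read from the latest batch would not describe the whole window.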