Any RDD transformation, including union, returns a different RDD class
than the original, so the result no longer implements HasOffsetRanges.

The implication for casting to HasOffsetRanges is specifically called
out in the docs at

http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
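
For the two-stream case below, a minimal sketch of the pattern the docs
describe might look like the following (assuming the 0.8 direct API and
the names from the quoted code, i.e. ssc, kafkaParams1/kafkaParams2 and
topics1/topics2): capture the offset ranges as the first operation on
each direct stream, then union the results.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Hold the latest offset ranges for each stream so they can be read downstream.
var offsetRanges1 = Array.empty[OffsetRange]
var offsetRanges2 = Array.empty[OffsetRange]

val directKafkaStream = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
val directKafkaStream2 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams2, topics2.toSet)

// The cast only succeeds on the RDDs of the original direct stream,
// so grab the offsets in the first transformation applied to each stream.
val stream1 = directKafkaStream.transform { rdd =>
  offsetRanges1 = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}
val stream2 = directKafkaStream2.transform { rdd =>
  offsetRanges2 = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}

// The union can now be processed without casting its (Union)RDDs.
stream1.union(stream2).foreachRDD { rdd =>
  val offsets = offsetRanges1 ++ offsetRanges2
  // ... process rdd and store/commit the offsets as needed
}

The transform closures run on the driver once per batch, before the
corresponding foreachRDD, which is why reading the driver-side vars
there is safe.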

On Thu, Mar 10, 2016 at 10:56 AM, Guillermo Ortiz <konstt2...@gmail.com> wrote:
> I have a DirectStream and process data from Kafka,
>
>  val directKafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
>  directKafkaStream.foreachRDD { rdd =>
>       val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
> When I add a second DirectStream and union the two, it doesn't work. I
> thought the result was the same type, but I get a ClassCastException:
>
>
>      val directKafkaStream = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
>     val directKafkaStream2 = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams2, topics2.toSet)
>     val kafkaStream = directKafkaStream.union(directKafkaStream2)
>
>     kafkaStream.foreachRDD { rdd =>
>       val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> -->Exception
>
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.spark.rdd.UnionRDD cannot be cast to
> org.apache.spark.streaming.kafka.HasOffsetRanges
> at
> com.produban.metrics.MetricsSpark$$anonfun$main$1.apply(MetricsSpark.scala:72)
>
> I assumed that rdd.union(rdd2) would return the same type of RDD.
