If you do any RDD transformation, it's going to return a different RDD than the original, so the result of the union is a UnionRDD rather than a KafkaRDD and can't be cast to HasOffsetRanges.
The implication for casting to HasOffsetRanges is specifically called out in the docs at
http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
A sketch of how that documented pattern applies to your two streams follows below the quoted message.

On Thu, Mar 10, 2016 at 10:56 AM, Guillermo Ortiz <konstt2...@gmail.com> wrote:
> I have a DirectStream and process data from Kafka:
>
> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
> directKafkaStream.foreachRDD { rdd =>
>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>
> When I added a new DirectStream and did a union of the two, it didn't work.
> I thought they were the same type, but I got a ClassCastException:
>
> val directKafkaStream = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
> val directKafkaStream2 = KafkaUtils.createDirectStream[String, String,
>   StringDecoder, StringDecoder](ssc, kafkaParams2, topics2.toSet)
> val kafkaStream = directKafkaStream.union(directKafkaStream2)
>
> kafkaStream.foreachRDD { rdd =>
>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  --> Exception
>
> Exception in thread "main" java.lang.ClassCastException:
> org.apache.spark.rdd.UnionRDD cannot be cast to
> org.apache.spark.streaming.kafka.HasOffsetRanges
>     at com.produban.metrics.MetricsSpark$$anonfun$main$1.apply(MetricsSpark.scala:72)
>
> I assumed that rdd.union(rdd2) would give the same type of RDD.
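
For reference, a minimal sketch of the documented pattern applied to two streams. It assumes the same ssc, kafkaParams1/kafkaParams2 and topics1/topics2 from your snippet; the tagged1/tagged2 and offsetRanges1/offsetRanges2 names are just illustrative. The point is that the cast has to happen inside transform(), on the RDD that comes straight out of createDirectStream, before the union:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

// Capture offsets per stream while each batch RDD is still a KafkaRDD,
// i.e. before union or any other transformation.
var offsetRanges1 = Array.empty[OffsetRange]
var offsetRanges2 = Array.empty[OffsetRange]

val stream1 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams1, topics1.toSet)
val stream2 = KafkaUtils.createDirectStream[String, String,
  StringDecoder, StringDecoder](ssc, kafkaParams2, topics2.toSet)

val tagged1 = stream1.transform { rdd =>
  offsetRanges1 = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}
val tagged2 = stream2.transform { rdd =>
  offsetRanges2 = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}

tagged1.union(tagged2).foreachRDD { rdd =>
  // process the unioned batch here; offsetRanges1 / offsetRanges2 hold this
  // batch's offsets for each source stream (the transform closure runs on
  // the driver when the batch job is generated, before the output action)
}

Untested, but that's the shape of the example in the integration guide linked above.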