Similarly a Java DStream has a dstream method you can call to get the underlying dstream.
On Oct 11, 2016 2:54 AM, "static-max" <flasha...@googlemail.com> wrote: > Hi Cody, > > thanks, rdd.rdd() did the trick. I now have the offsets via > OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); > > But how can you commit the offset to Kafka? > Casting the JavaInputDStream throws an CCE: > > CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream; // Throws CCE > cco.commitAsync(offsets); > > java.lang.ClassCastException: > org.apache.spark.streaming.api.java.JavaInputDStream > cannot be cast to org.apache.spark.streaming.kafka010.CanCommitOffsets > at SparkTest.lambda$0(SparkTest.java:103) > > Best regards, > Max > > > 2016-10-10 20:18 GMT+02:00 Cody Koeninger <c...@koeninger.org>: > >> This should give you hints on the necessary cast: >> >> http://spark.apache.org/docs/latest/streaming-kafka-0-8-inte >> gration.html#tab_java_2 >> >> The main ugly thing there is that the java rdd is wrapping the scala >> rdd, so you need to unwrap one layer via rdd.rdd() >> >> If anyone wants to work on a PR to update the java examples in the >> docs for the 0-10 version, I'm happy to help. >> >> On Mon, Oct 10, 2016 at 10:34 AM, static-max <flasha...@googlemail.com> >> wrote: >> > Hi, >> > >> > by following this article I managed to consume messages from Kafka 0.10 >> in >> > Spark 2.0: >> > http://spark.apache.org/docs/latest/streaming-kafka-0-10-int >> egration.html >> > >> > However, the Java examples are missing and I would like to commit the >> offset >> > myself after processing the RDD. Does anybody have a working example >> for me? >> > "offsetRanges" seems to be a trait and not available after casting the >> RDD >> > to "HasOffsetRanges" >> > >> > Thanks a lot! >> > >> > Scala example: >> > >> > val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges >> > stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets) >> > >