Similarly a Java DStream has a dstream method you can call to get the
underlying dstream.

On Oct 11, 2016 2:54 AM, "static-max" <flasha...@googlemail.com> wrote:

> Hi Cody,
>
> thanks, rdd.rdd() did the trick. I now have the offsets via
> OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
>
> But how can you commit the offset to Kafka?
> Casting the JavaInputDStream throws an CCE:
>
> CanCommitOffsets cco = (CanCommitOffsets) directKafkaStream; // Throws CCE
> cco.commitAsync(offsets);
>
> java.lang.ClassCastException: 
> org.apache.spark.streaming.api.java.JavaInputDStream
> cannot be cast to org.apache.spark.streaming.kafka010.CanCommitOffsets
> at SparkTest.lambda$0(SparkTest.java:103)
>
> Best regards,
> Max
>
>
> 2016-10-10 20:18 GMT+02:00 Cody Koeninger <c...@koeninger.org>:
>
>> This should give you hints on the necessary cast:
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-0-8-inte
>> gration.html#tab_java_2
>>
>> The main ugly thing there is that the java rdd is wrapping the scala
>> rdd, so you need to unwrap one layer via rdd.rdd()
>>
>> If anyone wants to work on a PR to update the java examples in the
>> docs for the 0-10 version, I'm happy to help.
>>
>> On Mon, Oct 10, 2016 at 10:34 AM, static-max <flasha...@googlemail.com>
>> wrote:
>> > Hi,
>> >
>> > by following this article I managed to consume messages from Kafka 0.10
>> in
>> > Spark 2.0:
>> > http://spark.apache.org/docs/latest/streaming-kafka-0-10-int
>> egration.html
>> >
>> > However, the Java examples are missing and I would like to commit the
>> offset
>> > myself after processing the RDD. Does anybody have a working example
>> for me?
>> > "offsetRanges" seems to be a trait and not available after casting the
>> RDD
>> > to "HasOffsetRanges"
>> >
>> > Thanks a lot!
>> >
>> > Scala example:
>> >
>> > val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>> > stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
>>
>
>

Reply via email to