The Python version has been available since 1.4. It should be close to feature parity with the JVM version in 1.5.
On Tue, Aug 18, 2015 at 9:36 AM, ayan guha <guha.a...@gmail.com> wrote:

> Hi Cody
>
> An unrelated question: any idea when the Python version of the direct
> receiver is expected? I'm personally looking forward to it :)
>
> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> The solution you found is also in the docs:
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>
>> Java uses an atomic reference because Java doesn't allow you to close
>> over non-final references.
>>
>> I'm not clear on your other question.
>>
>> On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak <oss.mli...@gmail.com> wrote:
>>
>>> The solution for sharing offsetRanges after the DirectKafkaInputDStream
>>> is transformed is in:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
>>>
>>> One thing I would like to understand is why the Scala version uses a
>>> normal variable while the Java version uses an AtomicReference.
>>>
>>> Another thing I don't get is closure serialization. The question is why
>>> the logger in the code below doesn't throw an NPE even though its
>>> instance isn't copied the way offsetRanges is: when val offsets =
>>> offsetRanges is removed from foreachRDD, then mapPartitionsWithIndex
>>> throws on offsets(idx). I have something like this code:
>>>
>>> object StreamOps {
>>>
>>>   val logger = LoggerFactory.getLogger("StreamOps")
>>>   var offsetRanges = Array[OffsetRange]()
>>>
>>>   def transform[F](stream: InputDStream[Array[Byte]]): DStream[F] = {
>>>     stream transform { rdd =>
>>>       offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>
>>>       rdd flatMap { message =>
>>>         Try(... decode Array[Byte] to F ...) match {
>>>           case Success(fact) => Some(fact)
>>>           case _ => None
>>>         }
>>>       }
>>>     }
>>>   }
>>>
>>>   // Error handling removed for brevity
>>>   def save[F](stream: DStream[F]): Unit = {
>>>     stream foreachRDD { rdd =>
>>>       // It has to be here, otherwise NullPointerException
>>>       val offsets = offsetRanges
>>>
>>>       rdd mapPartitionsWithIndex { (idx, facts) =>
>>>         // Use offsets here
>>>         val writer = new MyWriter[F](offsets(idx), ...)
>>>
>>>         facts foreach { fact =>
>>>           writer.write(fact)
>>>         }
>>>
>>>         writer.close()
>>>
>>>         // Why does logger work and not throw a NullPointerException?
>>>         logger.info(...)
>>>
>>>         Iterator.empty
>>>       } foreach {
>>>         (_: Nothing) =>
>>>       }
>>>     }
>>>   }
>>> }
>>>
>>> Many thanks for any advice; I'm sure it's a noob question.
>>> Petr
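For reference, here is a minimal sketch of the pattern from those docs and
test suites (Spark 1.x direct API; directKafkaStream is a placeholder for the
stream returned by KafkaUtils.createDirectStream):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Driver-side holder, overwritten once per batch.
var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  // Grab the ranges before any other transformation: only the RDDs
  // produced directly by the Kafka stream implement HasOffsetRanges.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  // This block runs on the driver, so the var can be read directly.
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

Copying the var into a local val only becomes necessary when the offsets are
used inside a closure that ships to the executors (like the
mapPartitionsWithIndex above), because the serialized closure then carries
the driver-side value with it.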
>>> On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak <oss.mli...@gmail.com>
>>> wrote:
>>>
>>>> Or can I generally create a new RDD from a transformation and enrich
>>>> its partitions with some metadata, so that I could copy OffsetRanges
>>>> into my new RDD in the DStream?
>>>>
>>>> On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak <oss.mli...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>> I need to transform a KafkaRDD into a new stream of deserialized case
>>>>> classes. I want to use the new stream to save it to files and to
>>>>> perform additional transformations on it.
>>>>>
>>>>> To save it I want to use the offsets in the filenames, hence I need
>>>>> the OffsetRanges in the transformed RDD. But KafkaRDD is private,
>>>>> hence I don't know how to do it.
>>>>>
>>>>> Alternatively, I could deserialize directly in messageHandler before
>>>>> the KafkaRDD, but that seems to be a 1:1 transformation, while I need
>>>>> to drop bad messages (as a KafkaRDD => RDD it would be a flatMap).
>>>>>
>>>>> Is there a way to do it using messageHandler, or is there another
>>>>> approach?
>>>>>
>>>>> Many thanks for any help.
>>>>> Petr
>
> --
> Best Regards,
> Ayan Guha
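On the messageHandler question at the bottom of the thread, one possible
approach: the handler itself stays 1:1 by returning an Option, and the bad
messages are dropped by a flatMap in the first transform. An untested sketch;
Fact, decodeFact, ssc, kafkaParams, and fromOffsets are placeholders, not
names from the original code:

import scala.util.Try
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

case class Fact(value: String)                 // placeholder case class
def decodeFact(bytes: Array[Byte]): Fact = ??? // placeholder decoder

// Map undecodable messages to None instead of throwing; the handler
// remains a 1:1 MessageAndMetadata => Option[Fact] function.
val messageHandler =
  (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) =>
    Try(decodeFact(mmd.message())).toOption

// ssc, kafkaParams, and fromOffsets are assumed to exist in scope.
val stream = KafkaUtils.createDirectStream[
  Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Option[Fact]](
  ssc, kafkaParams, fromOffsets, messageHandler)

var offsetRanges = Array[OffsetRange]()

val facts = stream.transform { rdd =>
  // Capture the ranges while the RDD still implements HasOffsetRanges,
  // then flatten away the Nones.
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.flatMap(_.toSeq)
}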