Python version has been available since 1.4.  It should be close to feature
parity with the jvm version in 1.5

On Tue, Aug 18, 2015 at 9:36 AM, ayan guha <guha.a...@gmail.com> wrote:

> Hi Cody
>
> A non-related question. Any idea when Python-version of direct receiver is
> expected? Me personally looking forward to it :)
>
> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> The solution you found is also in the docs:
>>
>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>>
>> Java uses an atomic reference because Java doesn't allow you to close
>> over non-final references.
>>
>> I'm not clear on your other question.
>>
>> On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak <oss.mli...@gmail.com> wrote:
>>
>>> The solution how to share offsetRanges after DirectKafkaInputStream is
>>> transformed is in:
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
>>>
>>> https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
>>>
>>> One thing I would like to understand is why Scala version is using
>>> normal variable while Java version uses AtomicReference.
>>>
>>> Another thing which I don't get is about closure serialization. The
>>> question why logger in the below code doesn't throw NPE even its instance
>>> isn't copied like in the case of offsetRanges, when val offsets =
>>> offsetRanges is removed form foreachRDD then mapPratitionsWithIndex throws
>>> on offsets(idx). I have something like this code:
>>>
>>> object StreamOps {
>>>
>>>   val logger = LoggerFactory.getLogger("StreamOps")
>>>   var offsetRanges = Array[OffsetRange]()
>>>
>>> def transform[T](stream: InputDStream[Array[Byte]]): DStream[T] = {
>>>   stream transform { rdd =>
>>>     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>
>>>     rdd flatmap { message =>
>>>       Try(... decode Array[Byte] to F...) match {
>>>         case Success(fact) => Some(fact)
>>>         case _ => None
>>>     }
>>>   }
>>> }
>>>
>>> // Error handling removed for brevity
>>> def save[F](stream: DStream[F]): Unit {
>>>   stream foreachRDD { rdd =>
>>>     // It has to be here otherwise NullPointerException
>>>     val offsets = offsetRanges
>>>
>>>     rdd mapartitionWithIndex { (idx, facts) =>
>>>       // Use offsets here
>>>       val writer = new MyWriter[F](offsets(idx), ...)
>>>
>>>       facts foreach { fact =>
>>>         writer.write(fact)
>>>       }
>>>
>>>       writer.close()
>>>
>>>       // Why logger works and doesn't throw NullPointerException?
>>>       logger.info(...)
>>>
>>>       Iterator.empty
>>>     } foreach {
>>>       (_: Nothing) =>
>>>     }
>>>   }
>>> }
>>>
>>> Many thanks for any advice, I'm sure its a noob question.
>>> Petr
>>>
>>> On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak <oss.mli...@gmail.com>
>>> wrote:
>>>
>>>> Or can I generally create new RDD from transformation and enrich its
>>>> partitions with some metadata so that I would copy OffsetRanges in my new
>>>> RDD in DStream?
>>>>
>>>> On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak <oss.mli...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>> I need to transform KafkaRDD into a new stream of deserialized case
>>>>> classes. I want to use the new stream to save it to file and to perform
>>>>> additional transformations on it.
>>>>>
>>>>> To save it I want to use offsets in filenames, hence I need
>>>>> OffsetRanges in transformed RDD. But KafkaRDD is private, hence I don't
>>>>> know how to do it.
>>>>>
>>>>> Alternatively I could deserialize directly in messageHandler before
>>>>> KafkaRDD but it seems it is 1:1 transformation while I need to drop bad
>>>>> messages (KafkaRDD => RDD it would be flatMap).
>>>>>
>>>>> Is there a way how to do it using messageHandler, is there another
>>>>> approach?
>>>>>
>>>>> Many thanks for any help.
>>>>> Petr
>>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>

Reply via email to