Hi Cody,

An unrelated question: any idea when the Python version of the direct
receiver is expected? I'm personally looking forward to it :)

On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org> wrote:

> The solution you found is also in the docs:
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> Java uses an atomic reference because Java doesn't allow you to close over
> non-final local variables.
>
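To illustrate the point about closing over variables, here is a minimal sketch in plain Scala (no Spark). The names `ClosureCaptureDemo`, `withVar` and `withAtomicReference` are made up for this example. In Scala a closure may assign to an enclosing local `var`, because the compiler boxes the variable in a reference cell; the second method mirrors the holder-object style that Java code has to use.

```scala
import java.util.concurrent.atomic.AtomicReference

object ClosureCaptureDemo {
  // Scala style: the closure assigns to a local var directly.
  // The compiler lifts `latest` into a heap-allocated reference cell.
  def withVar(): String = {
    var latest = "none"
    val update: String => Unit = value => latest = value
    update("offsets-1")
    latest
  }

  // Java style: the local itself never changes, only the holder's content.
  def withAtomicReference(): String = {
    val latest = new AtomicReference[String]("none")
    val update: String => Unit = value => latest.set(value)
    update("offsets-1")
    latest.get()
  }

  def main(args: Array[String]): Unit = {
    println(withVar())
    println(withAtomicReference())
  }
}
```

Both methods return the updated value; the only difference is who creates the mutable holder, the compiler or the programmer.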
> I'm not clear on your other question.
>
> On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> The solution for sharing offsetRanges after a DirectKafkaInputDStream is
>> transformed is in:
>>
>> https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
>>
>> https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
>>
>> One thing I would like to understand is why the Scala version uses a plain
>> variable while the Java version uses an AtomicReference.
>>
>> Another thing I don't get concerns closure serialization. Why doesn't
>> logger in the code below throw an NPE, even though its instance isn't
>> copied the way offsetRanges is? When val offsets = offsetRanges is removed
>> from foreachRDD, mapPartitionsWithIndex throws on offsets(idx). I have
>> something like this code:
>>
>> import scala.util.{Success, Try}
>> import org.slf4j.LoggerFactory
>> import org.apache.spark.streaming.dstream.{DStream, InputDStream}
>> import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}
>>
>> object StreamOps {
>>
>>   val logger = LoggerFactory.getLogger("StreamOps")
>>   var offsetRanges = Array[OffsetRange]()
>>
>> def transform[F](stream: InputDStream[Array[Byte]]): DStream[F] = {
>>   stream transform { rdd =>
>>     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>     rdd flatMap { message =>
>>       Try(... decode Array[Byte] to F...) match {
>>         case Success(fact) => Some(fact)
>>         case _ => None
>>       }
>>     }
>>   }
>> }
>>
>> // Error handling removed for brevity
>> def save[F](stream: DStream[F]): Unit = {
>>   stream foreachRDD { rdd =>
>>     // The copy has to be made here, otherwise a NullPointerException is thrown
>>     val offsets = offsetRanges
>>
>>     rdd mapPartitionsWithIndex { (idx, facts) =>
>>       // Use offsets here
>>       val writer = new MyWriter[F](offsets(idx), ...)
>>
>>       facts foreach { fact =>
>>         writer.write(fact)
>>       }
>>
>>       writer.close()
>>
>>       // Why does logger work here without throwing a NullPointerException?
>>       logger.info(...)
>>
>>       Iterator.empty
>>     } foreach {
>>       (_: Nothing) =>
>>     }
>>   }
>> }
>> } // end of object StreamOps
>>
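A likely explanation for the logger question, not confirmed anywhere in this thread: fields of a Scala `object` are not captured by a closure. The closure only refers to the singleton, which each JVM initializes for itself, so `logger` is created again on the executor by its initializer, while `offsetRanges` there holds whatever that JVM's copy contains. A local `val`, in contrast, is captured and serialized with the closure. The sketch below simulates this in one JVM with plain Java serialization; `Holder`, `roundTrip` and `ClosureSerializationDemo` are hypothetical names, and changing the field after the round trip stands in for "a different JVM".

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for StreamOps: a singleton with mutable state.
object Holder {
  var offsets: Array[Int] = Array.empty[Int]
}

object ClosureSerializationDemo {
  // Serialize and deserialize a value, roughly what happens when a task is shipped.
  def roundTrip[A <: AnyRef](value: A): A = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }

  def run(): (Int, Int) = {
    Holder.offsets = Array(10, 20, 30)

    // Reads the singleton's field when invoked; nothing is captured.
    val viaField: Int => Int = idx => Holder.offsets(idx)

    // Captures the local array, so the data travels with the closure.
    val local = Holder.offsets
    val viaLocal: Int => Int = idx => local(idx)

    val shippedField = roundTrip(viaField)
    val shippedLocal = roundTrip(viaLocal)

    // Simulate a JVM in which the singleton holds different state.
    Holder.offsets = Array(0, 0, 0)

    (shippedField(1), shippedLocal(1))
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The closure that reads the field sees the changed state, while the one that captured the local copy still sees the values it was serialized with. This is why copying into `val offsets` inside `foreachRDD` makes the offsets available in the partition function.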
>> Many thanks for any advice, I'm sure it's a noob question.
>> Petr
>>
>> On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>>
>>> Or, more generally, can I create a new RDD from a transformation and
>>> enrich its partitions with some metadata, so that I could copy the
>>> OffsetRanges into my new RDD in the DStream?
>>>
>>> On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak <oss.mli...@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>> I need to transform a KafkaRDD into a new stream of deserialized case
>>>> classes. I want to use the new stream both to save it to a file and to
>>>> perform additional transformations on it.
>>>>
>>>> To save it I want to use offsets in the filenames, hence I need the
>>>> OffsetRanges in the transformed RDD. But KafkaRDD is private, so I don't
>>>> know how to do it.
>>>>
>>>> Alternatively, I could deserialize directly in messageHandler before the
>>>> KafkaRDD is created, but that seems to be a 1:1 transformation, while I
>>>> need to drop bad messages (KafkaRDD => RDD would be a flatMap).
>>>>
>>>> Is there a way to do this using messageHandler, or is there another
>>>> approach?
>>>>
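The difference between a 1:1 mapping and a flatMap that drops bad messages can be sketched on a plain Scala collection; an RDD's `flatMap` accepts the same kind of function. The decoder here is a made-up placeholder that parses a UTF-8 payload as an `Int`, and `DropBadMessagesDemo`, `decode` and `decodeAll` are hypothetical names.

```scala
import scala.util.Try

object DropBadMessagesDemo {
  // Hypothetical decoder: succeeds only if the payload is a UTF-8 integer.
  def decode(bytes: Array[Byte]): Try[Int] =
    Try(new String(bytes, "UTF-8").trim.toInt)

  // flatMap keeps the successfully decoded values and drops the failures,
  // so the output may be shorter than the input.
  def decodeAll(messages: Seq[Array[Byte]]): Seq[Int] =
    messages.flatMap(bytes => decode(bytes).toOption)

  def main(args: Array[String]): Unit = {
    val raw = Seq("1", "oops", "3").map(_.getBytes("UTF-8"))
    println(decodeAll(raw)) // List(1, 3)
  }
}
```

A plain `map` with the same decoder would instead yield one `Try` per input message, leaving the filtering to a later step.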
>>>> Many thanks for any help.
>>>> Petr
>>>>
>>>
>>>
>>
>


-- 
Best Regards,
Ayan Guha
