Hi Cody,
An unrelated question: any idea when the Python version of the direct receiver is expected? I'm personally looking forward to it :)
On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org> wrote:

> The solution you found is also in the docs:
>
> http://spark.apache.org/docs/latest/streaming-kafka-integration.html
>
> Java uses an atomic reference because Java doesn't allow you to close over
> non-final references.
>
> I'm not clear on your other question.
>
> On Tue, Aug 18, 2015 at 3:43 AM, Petr Novak <oss.mli...@gmail.com> wrote:
>
>> The solution for sharing offsetRanges after the DirectKafkaInputDStream is
>> transformed is in:
>>
>> https://github.com/apache/spark/blob/master/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
>> https://github.com/apache/spark/blob/master/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java
>>
>> One thing I would like to understand is why the Scala version uses a plain
>> variable while the Java version uses an AtomicReference.
>>
>> Another thing I don't get is about closure serialization: why does the
>> logger in the code below not throw an NPE, even though its instance isn't
>> copied the way offsetRanges is? When `val offsets = offsetRanges` is removed
>> from foreachRDD, mapPartitionsWithIndex throws on offsets(idx). I have
>> something like this code:
>>
>> object StreamOps {
>>
>>   val logger = LoggerFactory.getLogger("StreamOps")
>>   var offsetRanges = Array[OffsetRange]()
>>
>>   def transform[F](stream: InputDStream[Array[Byte]]): DStream[F] = {
>>     stream transform { rdd =>
>>       offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>       rdd flatMap { message =>
>>         Try(... decode Array[Byte] to F ...) match {
>>           case Success(fact) => Some(fact)
>>           case _ => None
>>         }
>>       }
>>     }
>>   }
>>
>>   // Error handling removed for brevity
>>   def save[F](stream: DStream[F]): Unit = {
>>     stream foreachRDD { rdd =>
>>       // It has to be here, otherwise NullPointerException
>>       val offsets = offsetRanges
>>
>>       rdd mapPartitionsWithIndex { (idx, facts) =>
>>         // Use offsets here
>>         val writer = new MyWriter[F](offsets(idx), ...)
>>
>>         facts foreach { fact =>
>>           writer.write(fact)
>>         }
>>
>>         writer.close()
>>
>>         // Why does logger work and not throw a NullPointerException?
>>         logger.info(...)
>>
>>         Iterator.empty
>>       } foreach {
>>         (_: Nothing) =>
>>       }
>>     }
>>   }
>> }
>>
>> Many thanks for any advice; I'm sure it's a noob question.
>> Petr
>>
>> On Mon, Aug 17, 2015 at 1:12 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>>
>>> Or can I generally create a new RDD from a transformation and enrich its
>>> partitions with some metadata, so that I could copy OffsetRanges into my
>>> new RDD in the DStream?
>>>
>>> On Mon, Aug 17, 2015 at 1:08 PM, Petr Novak <oss.mli...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>> I need to transform KafkaRDD into a new stream of deserialized case
>>>> classes. I want to use the new stream to save it to files and to perform
>>>> additional transformations on it.
>>>>
>>>> To save it I want to use offsets in the filenames, hence I need
>>>> OffsetRanges in the transformed RDD. But KafkaRDD is private, so I don't
>>>> know how to do it.
>>>>
>>>> Alternatively, I could deserialize directly in messageHandler before
>>>> KafkaRDD, but that seems to be a 1:1 transformation, while I need to drop
>>>> bad messages (KafkaRDD => RDD would be a flatMap).
>>>>
>>>> Is there a way to do it using messageHandler? Is there another approach?
>>>>
>>>> Many thanks for any help.
>>>> Petr

--
Best Regards,
Ayan Guha
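
For reference, a minimal sketch of the pattern Cody points to in the docs and in DirectKafkaStreamSuite: capture offsetRanges on the driver inside transform, then copy them into a local val so the closure shipped by foreachRDD carries the current batch's ranges. The Fact case class, the decoding, and the write step are hypothetical stand-ins; this assumes the Spark 1.x spark-streaming-kafka (0.8 direct stream) API.

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    import scala.util.{Success, Try}

    // Hypothetical decoded record type, standing in for the real case classes.
    case class Fact(payload: String)

    object OffsetCaptureSketch {

      // Driver-side holder, as in DirectKafkaStreamSuite. A plain var works in
      // Scala because the transform body below always runs on the driver.
      @volatile var offsetRanges = Array.empty[OffsetRange]

      def decode(stream: InputDStream[Array[Byte]]): DStream[Fact] = {
        stream.transform { rdd =>
          // Grab the ranges from the first RDD in the chain, before any shuffle.
          offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          // flatMap lets bad messages be dropped, unlike the 1:1 messageHandler.
          rdd.flatMap { bytes =>
            Try(Fact(new String(bytes, "UTF-8"))) match {
              case Success(fact) => Some(fact)
              case _             => None
            }
          }
        }
      }

      def save(stream: DStream[Fact]): Unit = {
        stream.foreachRDD { rdd =>
          // Copy into a local val so the task closure captures this batch's
          // ranges instead of the executor-side copy of the object's var.
          val offsets = offsetRanges
          rdd.foreachPartition { facts =>
            // Partition index lines up with the Kafka partition only while the
            // RDD has not been repartitioned.
            val range = offsets(TaskContext.get.partitionId)
            facts.foreach { fact =>
              // write fact, e.g. to a file named after range.fromOffset
            }
          }
        }
      }
    }

On the Scala/Java difference: the Scala side can get away with a plain var because, as Cody notes, the Java version needs an AtomicReference only since Java doesn't allow closing over non-final references.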