Sorry, I realized I probably didn't fully answer your question about my blog
post, as opposed to Michael Noll's.

The direct stream is really blunt: a given RDD partition is just a Kafka
topic/partition plus lower and upper bounds on the range of offsets.  When an
executor computes that partition, it connects to Kafka, pulls only those
messages, then closes the connection.  There's no long-running receiver at
all, and no caching of connections (I found that caching sockets didn't
matter much).
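
In code terms, computing one such partition amounts to a bounded read.  Here
is a rough sketch of the idea in isolation.  It is not the actual KafkaRDD
code (which used Kafka's SimpleConsumer at the time); it just illustrates
"pull exactly the offsets [fromOffset, untilOffset) for one topic/partition,
then disconnect", written against the newer KafkaConsumer API for brevity:

```
// Conceptual sketch only, not the real implementation.
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.mutable.ArrayBuffer

def readRange(brokers: String, topic: String, partition: Int,
              fromOffset: Long, untilOffset: Long): Seq[String] = {
  val props = new Properties()
  props.put("bootstrap.servers", brokers)
  props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("enable.auto.commit", "false")

  val consumer = new KafkaConsumer[String, String](props)
  val tp = new TopicPartition(topic, partition)
  val buf = ArrayBuffer[String]()
  try {
    consumer.assign(Collections.singletonList(tp))
    consumer.seek(tp, fromOffset)                // lower bound of the partition
    while (consumer.position(tp) < untilOffset) {
      val it = consumer.poll(500).records(tp).iterator()
      while (it.hasNext) {
        val r = it.next()
        if (r.offset < untilOffset) buf += r.value  // upper bound is exclusive
      }
    }
    buf
  } finally {
    consumer.close()                             // nothing long-running remains
  }
}
```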

You get much better cluster utilization that way, because if a partition is
relatively small compared to the others in the RDD, the executor finishes it
quickly and gets scheduled another one to work on.  With long-running
receivers, Spark acts as if the receiver takes up a core even when it isn't
doing much.  Look at the CPU graph on slide 13 of the link I posted.

On Thu, May 14, 2015 at 4:21 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If the transformation you're trying to do really is per-partition, it
> shouldn't matter whether you're using Scala methods or Spark methods.  The
> parallel speedup you're getting comes entirely from doing the work on
> multiple machines; shuffle, caching, and the other benefits of Spark aren't
> a factor.
>
> If using Scala methods bothers you, do all of your transformations using
> Spark methods, collect the results back to the driver, and save them along
> with the offsets there:
>
> stream.foreachRDD { rdd =>
>   val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>   val results = rdd.some.chain.of.spark.calls.collect
>   save(offsets, results)
> }
>
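
(To make the save(offsets, results) call above concrete: below is a minimal
sketch of doing it as a single database transaction, so a batch's results and
its offsets become visible together or not at all.  The table names, schema,
JDBC connection details, and the Result type are all made up for
illustration.)

```
// Hypothetical sketch: results and offsets committed in one JDBC transaction.
import java.sql.DriverManager
import org.apache.spark.streaming.kafka.OffsetRange

case class Result(key: String, count: Long)

def save(offsets: Array[OffsetRange], results: Array[Result]): Unit = {
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://db-host/mydb", "user", "pass")  // made-up connection
  try {
    conn.setAutoCommit(false)

    val insertResult = conn.prepareStatement(
      "INSERT INTO results (key, count) VALUES (?, ?)")
    results.foreach { r =>
      insertResult.setString(1, r.key)
      insertResult.setLong(2, r.count)
      insertResult.addBatch()
    }
    insertResult.executeBatch()

    val updateOffset = conn.prepareStatement(
      "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND part = ?")
    offsets.foreach { o =>
      updateOffset.setLong(1, o.untilOffset)
      updateOffset.setString(2, o.topic)
      updateOffset.setInt(3, o.partition)
      updateOffset.addBatch()
    }
    updateOffset.executeBatch()

    conn.commit()       // results and offsets land together
  } catch {
    case e: Exception =>
      conn.rollback()   // ...or neither does
      throw e
  } finally {
    conn.close()
  }
}
```
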
> My work-in-progress slides for my talk at the upcoming Spark conference
> are here:
>
> http://koeninger.github.io/kafka-exactly-once/
>
> See if that clarifies the point a little bit (slides 20 vs. 21).
>
> The direct stream doesn't use long-running receivers, so the concerns that
> blog post is trying to address don't really apply.
>
> Under normal operation, a given partition of an RDD is only going to be
> handled by a single executor at a time (as long as you don't turn on
> speculative execution... though I suppose it might also happen in some kind
> of network partition situation).  Transactionality should save you even if
> something weird happens, though.
>
> On Thu, May 14, 2015 at 3:44 PM, will-ob <will.obr...@tapjoy.com> wrote:
>
>> Hey Cody (et al.),
>>
>> A few more questions related to this.  It sounds like our missing-data
>> issues are fixed with this approach.  Could you shed some light on a few
>> questions that came up?
>>
>> ---------------------
>>
>> Processing our data inside a single foreachPartition function appears to
>> be
>> very different from the pattern seen in the programming guide. Does this
>> become problematic with additional, interleaved reduce/filter/map steps?
>>
>> ```
>> // typical?  (schematic; chaining Spark methods on the stream)
>> stream
>>   .map { ... }
>>   .reduce { ... }
>>   .filter { ... }
>>   .reduce { ... }
>>   .foreachRDD { writeToDb }
>>
>> // with foreachPartition (schematic; plain Scala iterator methods)
>> rdd.foreachPartition { iter =>
>>   iter
>>     .map { ... }
>>     .reduce { ... }
>>     .filter { ... }
>>     .reduce { ... }
>> }
>>
>> ```
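
(Replying inline: it doesn't have to be either/or.  Since the mapping from
Kafka partition to RDD partition is 1:1, you can keep each partition's
offsets lined up with per-partition processing by indexing offsetRanges with
the task's partition id.  A sketch follows; stream, parse, keep, and saveToDb
are stand-ins for your own code.)

```
import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // grab the offsets on the driver, before the closure ships to executors
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // one RDD partition == one Kafka partition, so index by partition id
    val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    val transformed = iter.map(parse).filter(keep)  // plain Scala iterator methods
    saveToDb(osr, transformed)  // commit results + this partition's offsets together
  }
}
```
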
>> ---------------------------------
>>
>> Could the above be simplified by having one Kafka partition per DStream,
>> rather than one Kafka partition per RDD partition?
>>
>> That way, we wouldn't need to do our processing inside each partition, as
>> there would only be one set of Kafka metadata to commit.
>>
>> Presumably, one could `join` DStreams when topic-level aggregates were
>> needed.
>>
>> It seems this was the approach Michael Noll took in his blog post
>> (
>> http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
>> ).
>> Although his primary motivation appears to be maintaining high throughput /
>> parallelism rather than handling Kafka metadata.
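
(Inline note: if you really did want one direct stream per Kafka partition,
the createDirectStream variant that takes explicit fromOffsets lets you build
one, since you choose exactly which TopicAndPartitions it covers.  The topic
name, broker, and starting offset below are made up.  That said, see the
utilization point at the top of this mail.)

```
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// a direct stream covering only partition 0 of "events", starting at offset 0
val fromOffsets = Map(TopicAndPartition("events", 0) -> 0L)
val singlePartitionStream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
    ssc,                                            // your StreamingContext
    Map("metadata.broker.list" -> "broker1:9092"),
    fromOffsets,
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
```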
>>
>> ---------------------------------
>>
>> From the blog post:
>>
>> "... there is no long-running receiver task that occupies a core per
>> stream
>> regardless of what the message volume is."
>>
>> Is this because data is retrieved by polling rather than by maintaining a
>> socket?  Is it still the case that there is only one receiver process per
>> DStream?  If so, maybe it is wise to keep DStreams and Kafka partitions 1:1
>> ... or else risk running into the machine's NIC limit?
>>
>> Can you think of a reason not to do this? Cluster utilization, or the
>> like,
>> perhaps?
>>
>> --------------------------------
>>
>> And this may seem a silly question, but does `foreachPartition` guarantee
>> that a single worker will process the passed function?  Or might two
>> workers split the work?
>>
>> E.g. foreachPartition(f):
>>
>> Worker 1:     f( Iterator[partition 1 records 1 - 50] )
>> Worker 2:     f( Iterator[partition 1 records 51 - 100] )
>>
>> It is unclear from the scaladocs
>> (
>> https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD
>> ).
>> But you can imagine that, if it is critical for this data to be committed
>> in a single transaction, two workers would cause issues.
>>
>>
>>
>> -- Will O
>>
>>
>>
>>
>>
>>
>
