Cody,

First off, thanks for your contributions and your blog post; I actually linked 
to it in my original question. You'll have to forgive me as I've only been 
using Spark and writing Scala for a few days. I'm aware that the RDD partitions 
are 1:1 with Kafka topic partitions and that you can get the offset ranges. But 
my understanding is that the code below would need to be executed after the 
stream has been processed.

Let's say we're storing our filters in a key/value map where the key is the 
topic name and the value is a string that a message within a partition of that 
topic must contain in order to match.

Is this the approach you're suggesting (using your example code)?

// This would get built up on the driver, likely fetching the topic and filters from ZK
val topicFilters = Map(
  "topic1" -> "this text must match",
  "topic2" -> "this other text must match")
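
A small map like this can just be closed over by the tasks, but if it ever got 
big it could instead be broadcast once from the driver. Rough sketch only, 
assuming the SparkContext is in scope as sc:

// Optional: ship the filter map to executors once as a broadcast variable
// (sketch; "sc" is assumed to be the SparkContext, and a plain closure over
// topicFilters works just as well for a small map)
val topicFiltersBc = sc.broadcast(topicFilters)
// executors would then look up a filter via topicFiltersBc.value(topic)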


import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val stream = KafkaUtils.createDirectStream(...)
...

stream.foreachRDD { rdd =>
  // Cast the rdd to an interface that lets us get an array of OffsetRange
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  rdd.foreachPartition { iter =>
    // Index to get the correct offset range for the rdd partition we're working on
    val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)

    // Get any needed data from the offset range
    val topic = osr.topic
    val kafkaPartitionId = osr.partition
    val begin = osr.fromOffset
    val end = osr.untilOffset

    // Now that we know the topic name, we can filter something, e.g.
    // (assuming the stream yields (key, value) tuples of Strings):
    val matched = iter.filter { case (_, value) => value.contains(topicFilters(topic)) }

    // Or could we have referenced the topic name from
    // offsetRanges(TaskContext.get.partitionId) earlier,
    // before we entered stream.foreachRDD...?
  }
}
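
To the question in that last comment: since offsetRanges is available on the 
driver as soon as foreachRDD hands over the rdd, I think the topic lookup could 
also happen once per partition, before any per-record work, with 
mapPartitionsWithIndex. Just a sketch of what I mean, untested:

// Alternative sketch: resolve the topic/filter by partition index up front
// (same assumptions as above: (String, String) messages, topicFilters map)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val filtered = rdd.mapPartitionsWithIndex { (i, iter) =>
    // rdd partition i corresponds to offsetRanges(i)
    val filterText = topicFilters(offsetRanges(i).topic)
    iter.filter { case (_, value) => value.contains(filterText) }
  }

  // filtered now holds only the messages that passed their topic's filter;
  // an output action (foreachPartition, produce back to Kafka, etc.) goes here
}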





From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Wednesday, October 21, 2015 3:01 PM
To: Dave Ariens
Cc: user@spark.apache.org
Subject: Re: Kafka Streaming and Filtering > 3000 partitions

The rdd partitions are 1:1 with kafka topic partitions, so you can use offset 
ranges to figure out which topic a given rdd partition is for and proceed 
accordingly.  See the kafka integration guide in the spark streaming docs for 
more details, or https://github.com/koeninger/kafka-exactly-once

As far as setting offsets in ZK goes, there's a private interface in the spark 
codebase that would make it a little easier for you to do that.  You can see 
that code for reference, or there's an outstanding ticket for making it public: 
https://issues.apache.org/jira/browse/SPARK-10963
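
For illustration only, writing the ranges back by hand might look roughly like 
the sketch below. It reuses the offsetRanges array from the snippet further up, 
assumes a Kafka 0.8.x ZkClient (created with ZKStringSerializer) named zkClient 
and a consumer group name groupId, and follows the old high-level consumer path 
convention; it is not the private interface mentioned above.

// Rough sketch: commit each partition's ending offset to ZK after a successful
// batch.  zkClient and groupId are assumed to be defined elsewhere.
import kafka.utils.ZkUtils

offsetRanges.foreach { osr =>
  val path = s"/consumers/$groupId/offsets/${osr.topic}/${osr.partition}"
  ZkUtils.updatePersistentPath(zkClient, path, osr.untilOffset.toString)
}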

On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens <dari...@blackberry.com> wrote:
Hey folks,

I have a very large number of Kafka topics (many thousands of partitions) that 
I want to consume, filter based on topic-specific filters, then produce back to 
filtered topics in Kafka.

Using the receiver-less approach with Spark 1.4.1 (described here: 
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md) I am 
able to use either KafkaUtils.createDirectStream or KafkaUtils.createRDD, 
consume from many topics, and filter them all with the same filters, but I 
can't seem to wrap my head around how to apply topic-specific filters, or how 
to finally produce to topic-specific destination topics.
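
For concreteness, the produce-back step might look something like the sketch 
below (purely illustrative: the ".filtered" destination-topic naming and the 
broker list are placeholders, and it assumes the (String, String) direct stream 
and topicFilters map shown earlier in this thread):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val osr = offsetRanges(TaskContext.get.partitionId)
    val filterText = topicFilters(osr.topic)

    // One producer per partition/task; settings are placeholders
    val props = new Properties()
    props.put("bootstrap.servers", "broker1:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // Keep only messages matching this topic's filter and send them on
    iter.filter { case (_, value) => value.contains(filterText) }
        .foreach { case (key, value) =>
          producer.send(new ProducerRecord[String, String](osr.topic + ".filtered", key, value))
        }
    producer.close()
  }
}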

Another point is that I will need to checkpoint the metadata after each 
successful batch and set the per-partition starting offsets back in ZK.  I 
expect I can do that on the final RDDs after casting them accordingly, but if 
anyone has any expertise/guidance on doing that and is willing to share, I'd be 
pretty grateful.
