Cody,

First off, thanks for your contributions and blog post; I actually linked to it in my original question. You'll have to forgive me as I've only been using Spark and writing Scala for a few days. I'm aware that the RDD partitions are 1:1 with Kafka topic partitions and that you can get the offset ranges, but my understanding is that the code below would need to be executed after the stream has been processed.
Let's say we're storing our filters in a key-value map where the key is the topic name and the value is a string that a message within a partition of that topic must contain in order to match. Is this the approach you're suggesting (using your example code)? (A fleshed-out version of this sketch follows at the very bottom of this email.)

    // This would get built up on the driver, likely fetching the topics
    // and filters from ZK
    val topicFilters = Map("topic1" -> "this text must match",
                           "topic2" -> "this other text must match")

    val stream = KafkaUtils.createDirectStream(...)
    ...
    stream.foreachRDD { rdd =>
      // Cast the rdd to an interface that lets us get an array of OffsetRange
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        // Index to get the correct offset range for the rdd partition
        // we're working on
        val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)

        // Get any needed data from the offset range
        val topic = osr.topic
        val kafkaPartitionId = osr.partition
        val begin = osr.fromOffset
        val end = osr.untilOffset

        // Now that we know the topic name, we can filter on something.
        // Or could we have referenced the topic name from
        // offsetRanges(TaskContext.get.partitionId) earlier,
        // before we entered stream.foreachRDD...?
      }
    }

From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Wednesday, October 21, 2015 3:01 PM
To: Dave Ariens
Cc: user@spark.apache.org
Subject: Re: Kafka Streaming and Filtering > 3000 partitions

The RDD partitions are 1:1 with Kafka topic-partitions, so you can use the offset ranges to figure out which topic a given RDD partition is for and proceed accordingly. See the Kafka integration guide in the Spark Streaming docs for more details, or https://github.com/koeninger/kafka-exactly-once

As far as setting offsets in ZK, there's a private interface in the Spark codebase that would make it a little easier for you to do that. You can see that code for reference, or there's an outstanding ticket for making it public: https://issues.apache.org/jira/browse/SPARK-10963

On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens <dari...@blackberry.com> wrote:

Hey folks,

I have a very large number of Kafka topics (many thousands of partitions) that I want to consume, filter based on topic-specific filters, and then produce back to filtered topics in Kafka.

Using the receiver-less approach with Spark 1.4.1 (described here: https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md) I am able to use either KafkaUtils.createDirectStream or KafkaUtils.createRDD, consume from many topics, and filter them with the same filters, but I can't seem to wrap my head around how to apply topic-specific filters or how to finally produce to topic-specific destination topics.

Another point is that I will need to checkpoint the metadata after each successful batch and set the starting offsets per partition back in ZK. I expect I can do that on the final RDDs after casting them accordingly, but if anyone has expertise/guidance doing that and is willing to share, I'd be pretty grateful.
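For reference, below is a minimal, untested sketch of how the per-topic filtering and the produce-back step discussed above might look. It assumes the stream from the snippet in the reply (with String keys and values), a made-up broker address ("broker1:9092"), a hypothetical "<topic>-filtered" destination naming convention, and that every consumed topic has an entry in topicFilters; the ZK offset commit is left as a comment since the helper Cody mentions is still private (SPARK-10963).

    import java.util.Properties

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Built on the driver, e.g. fetched from ZK
    val topicFilters = Map("topic1" -> "this text must match",
                           "topic2" -> "this other text must match")

    stream.foreachRDD { rdd =>
      // Capture the offset ranges on the driver before doing anything
      // that would change the partitioning
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      rdd.foreachPartition { iter =>
        // The RDD partition index doubles as the index into offsetRanges
        val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        // Assumes every consumed topic has a filter entry
        val mustContain = topicFilters(osr.topic)

        // One producer per partition; the broker address is an assumption
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092")
        props.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String, String](props)

        iter.foreach { case (key, message) =>
          if (message.contains(mustContain)) {
            // "<topic>-filtered" is a made-up destination naming convention
            producer.send(
              new ProducerRecord[String, String](osr.topic + "-filtered", key, message))
          }
        }
        producer.close()

        // osr.fromOffset / osr.untilOffset could be written back to ZK here
        // once the partition has been handled (see SPARK-10963).
      }
    }

Note that indexing offsetRanges by the partition id only works when foreachPartition runs on the Kafka RDD directly, before any transformation that shuffles or repartitions the data.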