Cody,
First off, thanks for your contributions and the blog post, which I actually
linked to in my original question. You'll have to forgive me, as I've only been
using Spark and writing Scala for a few days. I'm aware that the RDD partitions
are 1:1 with Kafka topic-partitions and that you can get the offset ranges. But
my understanding is that the code below would need to be executed after the
stream has been processed.
Let's say we're storing our filters in a key-value map where the key is the
topic name and the value is a string that a message within a partition of that
topic must contain to match.
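A minimal sketch of that filter map and the match test, in plain Scala. The topic names and filter strings are placeholders, and treating a topic with no configured filter as "no match" is an assumption; pass-through would be just as reasonable.

```scala
// Filters keyed by topic name; in practice this would be loaded on the driver (e.g. from ZK).
val topicFilters: Map[String, String] = Map(
  "topic1" -> "this text must match",
  "topic2" -> "this other text must match"
)

// True if the message contains the filter string configured for its topic.
// Topics without a configured filter never match (an assumption, see above).
def matches(filters: Map[String, String], topic: String, message: String): Boolean =
  filters.get(topic).exists(required => message.contains(required))
```

Using Map.get plus Option.exists avoids a NoSuchElementException when a message arrives from a topic that has no entry in the map.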
Is this the approach you're suggesting (using your example code)?
    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

    // This would get built up on the driver, likely fetching the topics and filters from ZK
    val topicFilters = Map(
      "topic1" -> "this text must match",
      "topic2" -> "this other text must match")

    val stream = KafkaUtils.createDirectStream(...)
    ...
    stream.foreachRDD { rdd =>
      // Cast the rdd to an interface that lets us get an array of OffsetRange
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        // Index to get the correct offset range for the rdd partition we're working on
        val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        // Get any needed data from the offset range
        val topic = osr.topic
        val kafkaPartitionId = osr.partition
        val begin = osr.fromOffset
        val end = osr.untilOffset
        // Now we know the topic name, we can filter something.
        // Or could we have referenced the topic name from
        // offsetRanges(TaskContext.get.partitionId) earlier,
        // before we entered stream.foreachRDD...?
      }
    }
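A minimal sketch of the missing filter step, modeled in plain Scala so it runs without a cluster. OffsetRangeLike is a hypothetical stand-in for Spark's OffsetRange (only the fields used above), and the partitionId parameter plays the role of TaskContext.get.partitionId. On the question in the last comment: as far as I know, TaskContext.get only returns a context inside a running task, and the offset ranges are different for every batch, so the lookup has to happen inside foreachRDD/foreachPartition rather than before it.

```scala
// Hypothetical stand-in for org.apache.spark.streaming.kafka.OffsetRange.
final case class OffsetRangeLike(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Body of a foreachPartition/mapPartitions closure, modeled as a plain function.
// Because rdd partitions are 1:1 with Kafka topic-partitions, partitionId indexes
// straight into offsetRanges to recover the topic for this partition.
def filterPartition(
    offsetRanges: Array[OffsetRangeLike],
    filters: Map[String, String],
    partitionId: Int,
    messages: Iterator[String]): Iterator[String] = {
  val topic = offsetRanges(partitionId).topic
  filters.get(topic) match {
    case Some(required) => messages.filter(_.contains(required))
    case None           => Iterator.empty // no filter configured: drop everything (an assumption)
  }
}
```

The index only lines up with offsetRanges while the RDD has not been shuffled or repartitioned, so the lookup should be done on the RDD that comes straight from the stream (or after partition-preserving steps such as mapPartitions).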
From: Cody Koeninger [mailto:[email protected]]
Sent: Wednesday, October 21, 2015 3:01 PM
To: Dave Ariens
Cc: [email protected]
Subject: Re: Kafka Streaming and Filtering > 3000 partitions
The rdd partitions are 1:1 with Kafka topic-partitions, so you can use offset
ranges to figure out which topic a given rdd partition is for and proceed
accordingly. See the Kafka integration guide in the Spark Streaming docs for
more details, or https://github.com/koeninger/kafka-exactly-once
As for setting offsets in ZK, there's a private interface in the Spark
codebase that would make that a little easier. You can look at that code for
reference; there's also an outstanding ticket for making it public:
https://issues.apache.org/jira/browse/SPARK-10963
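A minimal sketch of what would be written back to ZK after a batch, computed from the batch's offset ranges. The path layout is the one used by the Kafka 0.8 high-level consumer (/consumers/<group>/offsets/<topic>/<partition>); ConsumedRange is a hypothetical stand-in for Spark's OffsetRange, the group name is a placeholder, and the actual ZooKeeper write (for example via the private helper mentioned above) is deliberately left out.

```scala
// Hypothetical stand-in for Spark's OffsetRange, keeping only what is needed here.
final case class ConsumedRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// ZK node holding the committed offset for one topic-partition (Kafka 0.8 consumer layout).
def offsetPath(group: String, topic: String, partition: Int): String =
  s"/consumers/$group/offsets/$topic/$partition"

// The next read for each partition should start at untilOffset, so that is the value to store.
def offsetsToCommit(group: String, ranges: Seq[ConsumedRange]): Map[String, Long] =
  ranges.map(r => offsetPath(group, r.topic, r.partition) -> r.untilOffset).toMap
```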
On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens <[email protected]> wrote:
Hey folks,
I have a very large number of Kafka topics (many thousands of partitions) that
I want to consume, filter based on topic-specific filters, then produce back to
filtered topics in Kafka.
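A minimal sketch of the consume, filter, produce-to-filtered-topic flow as a pure function over (topic, value) records, so the routing logic can be checked without Kafka. Msg and the destinations map (source topic to filtered topic) are hypothetical; the real source and destination topic names are not given here.

```scala
// Hypothetical record type: a message plus the topic it was read from.
final case class Msg(topic: String, value: String)

// For each message that passes its topic's filter, emit (destinationTopic, value).
// Messages whose topic has no filter or no destination are dropped.
def route(
    filters: Map[String, String],
    destinations: Map[String, String],
    msgs: Seq[Msg]): Seq[(String, String)] =
  msgs.flatMap { m =>
    for {
      required <- filters.get(m.topic)
      if m.value.contains(required)
      dest <- destinations.get(m.topic)
    } yield dest -> m.value
  }
```

In Spark this would run per partition, with the resulting pairs handed to a Kafka producer. Producers are not serializable, so they are typically created on the executor inside foreachPartition rather than on the driver.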
Using the receiver-less (direct) approach with Spark 1.4.1 (described here:
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md),
I am able to use either KafkaUtils.createDirectStream or KafkaUtils.createRDD,
consume from many topics, and filter them all with the same filters, but I can't
seem to wrap my head around how to apply topic-specific filters, or how to then
produce to topic-specific destination topics.
I will also need to checkpoint the metadata after each successful batch and
write the starting offsets per partition back to ZK. I expect I can do that on
the final RDDs after casting them accordingly, but if anyone has expertise or
guidance on doing that and is willing to share, I'd be very grateful.
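A minimal sketch of "advance the stored offsets only after a successful batch", with offsets kept in an in-memory map keyed by (topic, partition). BatchRange is a hypothetical stand-in for Spark's OffsetRange, and the succeeded flag stands in for whatever signals that filtering and producing finished for the batch.

```scala
// Hypothetical stand-in for Spark's OffsetRange.
final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// On success, move each partition's stored offset to the batch's untilOffset.
// On failure, keep the old offsets so the same ranges are read again on restart.
def advanceOffsets(
    stored: Map[(String, Int), Long],
    batch: Seq[BatchRange],
    succeeded: Boolean): Map[(String, Int), Long] =
  if (!succeeded) stored
  else stored ++ batch.map(r => (r.topic, r.partition) -> r.untilOffset)
```

Committing after producing gives at-least-once delivery: if the job dies between the produce and the commit, that batch is replayed and its messages are produced again. Avoiding those duplicates needs idempotent or transactional output, which is the topic of the exactly-once blog post linked above.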