You can have offsetRanges on workers, e.g.:

```scala
object Something {
  var offsetRanges = Array[OffsetRange]()

  def create[F: ClassTag](stream: InputDStream[Array[Byte]])(implicit codec: Codec[F]): DStream[F] = {
    stream transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd flatMap { message =>
        Try(codec.decode(message)) match {
          case Success(fact) => Some(fact)
          case Failure(e)    => None
        }
      }
    }
  }
}
```

Call `create` and use the returned stream downstream. Or something like:

```scala
// See https://issues.apache.org/jira/browse/SPARK-5569 for why I map OffsetRanges to a custom class
case class TopicMetadata(name: String, partition: Int, fromOffset: Long, untilOffset: Long)

object KafkaContext {
  private[this] var state = Array[TopicMetadata]()

  def captureTopicMetadata(offsetRanges: Array[OffsetRange]): Unit = {
    state = offsetRanges.map { o =>
      TopicMetadata(o.topic, o.partition, o.fromOffset, o.untilOffset)
    }
  }

  def topics: Array[TopicMetadata] = state
}
```

Then somewhere:

```scala
def run(stream: InputDStream[Array[Byte]]) = {
  stream.transform { rdd =>
    KafkaContext.captureTopicMetadata(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    rdd
    ...
  }
  .foreachRDD { rdd =>
    val s = KafkaContext.topics.map { x =>
      s"${x.name}_${x.partition}_${x.fromOffset}-${x.untilOffset}"
    }
    ...
  }
}
```

So they can be available on the driver. Sorry for the imprecise code, I'm in a hurry; there are probably mistakes, but you get the idea.

Petr

On Fri, Sep 25, 2015 at 7:50 PM, Neelesh <neele...@gmail.com> wrote:

> Hi,
> We are using DirectKafkaInputDStream and store completed consumer
> offsets in Kafka (0.8.2). However, some of our use cases require that
> offsets not be written if processing of a partition fails with certain
> exceptions. This allows us to build various backoff strategies for that
> partition, instead of either blindly committing consumer offsets regardless
> of errors (because KafkaRDD as HasOffsetRanges is available only on the
> driver) or relying on Spark's retry logic and continuing without remedial
> action.
>
> I was playing with SparkListener and found that while one can listen on
> taskCompletedEvent on the driver, and even figure out that there was an
> error, there is no way of mapping this task back to the partition and
> retrieving the offset range, topic & Kafka partition #, etc.
>
> Any pointers appreciated!
>
> Thanks!
> -neelesh
>
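On the task-to-partition mapping: with the direct stream, each RDD partition corresponds 1:1 to a Kafka topic-partition (as long as no shuffle has changed the partitioning), so inside `foreachPartition` the executor can look up its own offset range via `TaskContext.get.partitionId` and commit, or skip committing, per partition. A minimal sketch following that pattern; `process` and `commit` are hypothetical placeholders for your record handling and your own offset store:

```scala
import org.apache.spark.TaskContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

object SelectiveCommit {
  // hypothetical helpers: your record processing and your offset store
  def process(msg: (Array[Byte], Array[Byte])): Unit = ???
  def commit(o: OffsetRange): Unit = ???

  var offsetRanges = Array.empty[OffsetRange]

  def run(stream: InputDStream[(Array[Byte], Array[Byte])]): Unit = {
    stream.transform { rdd =>
      // capture on the driver, before any transformation changes partitioning
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd
    }.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // direct-stream RDD partitions map 1:1 to Kafka partitions,
        // so partitionId indexes the captured offset ranges
        val o = offsetRanges(TaskContext.get.partitionId)
        try {
          iter.foreach(process)
          commit(o) // commit only if the whole partition succeeded
        } catch {
          case e: Exception =>
            // skip the commit for just this partition; it will be
            // reprocessed from o.fromOffset per your backoff policy
        }
      }
    }
  }
}
```

This sidesteps SparkListener entirely: the failing partition knows its own offset range, so the backoff decision can be made right where the exception is caught.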