You can have the offsetRanges on the workers, e.g.:

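import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

object Something {
  // Updated once per batch, when the transform closure below runs
  var offsetRanges = Array[OffsetRange]()

  def create[F : ClassTag](stream: InputDStream[Array[Byte]])
      (implicit codec: Codec[F]): DStream[F] = {
    stream transform { rdd =>
      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Messages that fail to decode are dropped
      rdd flatMap { message =>
        Try(codec.decode(message)) match {
          case Success(fact) => Some(fact)
          case Failure(e) => None
        }
      }
    }
  }
}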

Call create and use the returned stream downstream.

Or something like:

// See https://issues.apache.org/jira/browse/SPARK-5569 for why I map
// OffsetRanges to a custom class


case class TopicMetadata(name: String, partition: Int,
                         fromOffset: Long, untilOffset: Long)

object KafkaContext {
  private[this] var state = Array[TopicMetadata]()

  def captureTopicMetadata(offsetRanges: Array[OffsetRange]): Unit = {
    state = offsetRanges.map { o =>
      TopicMetadata(o.topic, o.partition, o.fromOffset, o.untilOffset)
    }
  }

  def topics: Array[TopicMetadata] = state
}

//then somewhere

def run(stream) = {
  stream.transform { rdd =>
    KafkaContext.captureTopicMetadata(
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges)

    rdd
      ....
      ....
      ....
  }
  .foreachRDD { rdd =>
    val s = KafkaContext.topics.map { x =>
      s"${x.name}_${x.partition}_${x.fromOffset}-${x.untilOffset}"
    }
    ...
  }
}
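
To tie this back to the original question (only commit offsets for partitions that did not fail), here is a minimal, Spark-free sketch of the driver-side bookkeeping. It is an illustration under assumptions, not a tested recipe: CommitPlanner, committable, label and the outcomes map are hypothetical names, and how you collect a per-partition outcome from your job is not shown here.

```scala
import scala.util.{Failure, Success, Try}

// Same shape as the TopicMetadata case class above, repeated so the sketch compiles on its own.
case class TopicMetadata(name: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical helper object, not part of Spark or Kafka.
object CommitPlanner {
  // Keep only the ranges whose (topic, partition) processed successfully.
  // A partition with no recorded outcome is treated as not safe to commit.
  def committable(
      ranges: Seq[TopicMetadata],
      outcomes: Map[(String, Int), Try[Unit]]): Seq[TopicMetadata] =
    ranges.filter { r =>
      outcomes.get((r.name, r.partition)).exists(_.isSuccess)
    }

  // Same label format as in the foreachRDD snippet above.
  def label(x: TopicMetadata): String =
    s"${x.name}_${x.partition}_${x.fromOffset}-${x.untilOffset}"
}
```

In foreachRDD you would pass KafkaContext.topics as ranges and write offsets only for what committable returns. Ranges that are filtered out stay uncommitted, so a backoff strategy can pick them up again.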


Either way, the offset ranges are then available on the driver. Sorry
for the imprecise code, I'm in a hurry. There are probably mistakes,
but you should get the idea.
Petr

On Fri, Sep 25, 2015 at 7:50 PM, Neelesh <neele...@gmail.com> wrote:

> Hi,
>    We are using DirectKafkaInputDStream and store completed consumer
> offsets in Kafka (0.8.2). However, some of our use cases require that
> offsets not be written if processing of a partition fails with certain
> exceptions. This allows us to build various backoff strategies for that
> partition, instead of either blindly committing consumer offsets regardless
> of errors (because KafkaRDD as HasOffsetRanges is available only on the
> driver)  or relying on Spark's retry logic and continuing without remedial
> action.
>
> I was playing with SparkListener and found that while one can listen on
> taskCompletedEvent on the driver and even figure out that there was an
> error, there is no way of mapping this task back to the partition and
> retrieving offset range, topic & kafka partition # etc.
>
> Any pointers appreciated!
>
> Thanks!
> -neelesh
>
