Hello,

I've been using createDirectStream with Kafka and now need to switch to the
version of that API that lets me supply offsets for my topics.  I'm unable
to get this to compile for some reason, even if I lift the very same usage
from the Spark test suite.

I'm calling it like this:

    val topic = "offset"

    val topicPartition = TopicAndPartition(topic, 0)

    val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
(mmd.key, mmd.message)

    val stream =  KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder, String](

        ssc, kafkaParams, Map(topicPartition -> 11L), messageHandler)



Error:

MyCode.scala:97: overloaded method value createDirectStream with
alternatives:

  (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,keyClass:
Class[String],valueClass: Class[String],keyDecoderClass:
Class[kafka.serializer.StringDecoder],valueDecoderClass:
Class[kafka.serializer.StringDecoder],recordClass:
Class[String],kafkaParams: java.util.Map[String,String],fromOffsets:
java.util.Map[kafka.common.TopicAndPartition,java.lang.Long],messageHandler:
org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])org.apache.spark.streaming.api.java.JavaInputDStream[String]
<and>

  (ssc: org.apache.spark.streaming.StreamingContext,kafkaParams:
scala.collection.immutable.Map[String,String],fromOffsets:
scala.collection.immutable.Map[kafka.common.TopicAndPartition,scala.Long],messageHandler:
kafka.message.MessageAndMetadata[String,String] => String)(implicit
evidence$14: scala.reflect.ClassTag[String], implicit evidence$15:
scala.reflect.ClassTag[String], implicit evidence$16:
scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit
evidence$17: scala.reflect.ClassTag[kafka.serializer.StringDecoder],
implicit evidence$18:
scala.reflect.ClassTag[String])org.apache.spark.streaming.dstream.InputDStream[String]

 cannot be applied to (org.apache.spark.streaming.StreamingContext,
scala.collection.mutable.Map[String,String],
scala.collection.mutable.Map[kafka.common.TopicAndPartition,scala.Long],
kafka.message.MessageAndMetadata[String,String] => (String, String))

    val stream =  KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder, String](

                                               ^

one error found

Reply via email to