Hello, I've been using createDirectStream with Kafka and now need to switch to the version of that API that lets me supply offsets for my topics. I'm unable to get this to compile for some reason, even if I lift the very same usage from the Spark test suite.
I'm calling it like this:

    val topic = "offset"
    val topicPartition = TopicAndPartition(topic, 0)
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
      ssc, kafkaParams, Map(topicPartition -> 11L), messageHandler)

Error:

    MyCode.scala:97: overloaded method value createDirectStream with alternatives:
      (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,
       keyClass: Class[String],
       valueClass: Class[String],
       keyDecoderClass: Class[kafka.serializer.StringDecoder],
       valueDecoderClass: Class[kafka.serializer.StringDecoder],
       recordClass: Class[String],
       kafkaParams: java.util.Map[String,String],
       fromOffsets: java.util.Map[kafka.common.TopicAndPartition,java.lang.Long],
       messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])
        org.apache.spark.streaming.api.java.JavaInputDStream[String]
      <and>
      (ssc: org.apache.spark.streaming.StreamingContext,
       kafkaParams: scala.collection.immutable.Map[String,String],
       fromOffsets: scala.collection.immutable.Map[kafka.common.TopicAndPartition,scala.Long],
       messageHandler: kafka.message.MessageAndMetadata[String,String] => String)
       (implicit evidence$14: scala.reflect.ClassTag[String],
        implicit evidence$15: scala.reflect.ClassTag[String],
        implicit evidence$16: scala.reflect.ClassTag[kafka.serializer.StringDecoder],
        implicit evidence$17: scala.reflect.ClassTag[kafka.serializer.StringDecoder],
        implicit evidence$18: scala.reflect.ClassTag[String])
        org.apache.spark.streaming.dstream.InputDStream[String]
     cannot be applied to
      (org.apache.spark.streaming.StreamingContext,
       scala.collection.mutable.Map[String,String],
       scala.collection.mutable.Map[kafka.common.TopicAndPartition,scala.Long],
       kafka.message.MessageAndMetadata[String,String] => (String, String))
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
                                ^
    one error found
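
Reading the error message literally, two things differ between what the Scala overload wants and what the call passes: the two maps arrive as scala.collection.mutable.Map where scala.collection.immutable.Map is expected (possibly because scala.collection.mutable.Map is imported somewhere in the file, which is a guess), and the handler returns (String, String) while the last type parameter is String. Below is a minimal sketch of those two mismatches and one possible way around them, written without Spark or Kafka so it compiles on its own. TopicAndPartition, MessageAndMetadata and createDirectStreamLike are hypothetical stand-ins that only mirror the parameter shapes shown in the error, not the real classes or the real method, and the broker address is illustrative.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the Kafka types, so the sketch needs no extra libraries.
final case class TopicAndPartition(topic: String, partition: Int)
final case class MessageAndMetadata[K, V](key: K, message: V)

object OffsetsSketch {
  // Hypothetical stub, NOT the real KafkaUtils.createDirectStream. It only copies
  // the parameter shapes of the Scala overload in the error message: immutable
  // Maps, and a handler whose result type is the type parameter R.
  def createDirectStreamLike[R](
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      messageHandler: MessageAndMetadata[String, String] => R): Seq[R] =
    fromOffsets.toSeq.map { case (tp, offset) =>
      // Fake one record per partition so the handler has something to process.
      messageHandler(MessageAndMetadata(tp.topic, s"offset=$offset"))
    }

  def run(): Seq[(String, String)] = {
    // Mutable maps, as the "cannot be applied to" part of the error reports.
    val kafkaParams = mutable.Map("metadata.broker.list" -> "localhost:9092")
    val fromOffsets = mutable.Map(TopicAndPartition("offset", 0) -> 11L)

    // The handler returns a pair, so R has to be (String, String), not String.
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    // .toMap turns each mutable map into the immutable Map the stub expects.
    createDirectStreamLike[(String, String)](
      kafkaParams.toMap, fromOffsets.toMap, messageHandler)
  }
}
```

If the same reading holds for the real call, the equivalent changes would be to pass immutable maps (for example via .toMap) and to use (String, String) as the fifth type argument. I have only checked that against the stub above, not against Spark itself.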