Thanks Cody. It turns out that there was an even simpler explanation (the flaw you pointed out was accurate too). I had mutable.Map instances being passed where KafkaUtils wants immutable ones.
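For anyone who hits the same error, here is a minimal sketch of the conversion, using only the Scala standard library. `needsImmutable` is a hypothetical stand-in for an API whose parameters are declared as `scala.collection.immutable.Map`, the way the Scala overload in the error quoted below declares them. The String keys and the broker address are placeholders, not values from my job.

```scala
import scala.collection.mutable

object MapConversionSketch {
  // Hypothetical stand-in: inside this object, plain `Map` is the default
  // scala.collection.immutable.Map, so a mutable.Map is not accepted here.
  def needsImmutable(
      params: Map[String, String],
      offsets: Map[String, Long]): Int = params.size + offsets.size

  def main(args: Array[String]): Unit = {
    // Placeholder values; a real job would key the offsets by TopicAndPartition.
    val kafkaParams = mutable.Map("metadata.broker.list" -> "localhost:9092")
    val fromOffsets = mutable.Map("offset-0" -> 11L)

    // needsImmutable(kafkaParams, fromOffsets)  // rejected: type mismatch

    // .toMap copies the entries into an immutable Map, which is accepted.
    println(needsImmutable(kafkaParams.toMap, fromOffsets.toMap))
  }
}
```

Going by the signature in the error message, the real call would presumably pass `kafkaParams.toMap` and `fromOffsets.toMap` in the same way and, per Cody's point, use `(String, String)` rather than `String` as the fifth type argument so that it matches what the message handler returns.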
On Fri, May 6, 2016 at 8:32 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Look carefully at the error message, the types you're passing in don't
> match. For instance, you're passing in a message handler that returns
> a tuple, but the rdd return type you're specifying (the 5th type
> argument) is just String.
>
> On Fri, May 6, 2016 at 9:49 AM, Eric Friedman <eric.d.fried...@gmail.com>
> wrote:
> > My build dependencies:
> >
> > compile 'org.scala-lang:scala-library:2.10.4'
> > compile 'org.apache.spark:spark-core_2.10:1.6.1'
> > compile 'org.apache.spark:spark-sql_2.10:1.6.1'
> > compile 'org.apache.spark:spark-hive_2.10:1.6.1'
> > compile 'org.apache.spark:spark-streaming_2.10:1.6.1'
> > compile 'com.databricks:spark-avro_2.10:2.0.1'
> >
> > compile 'org.apache.spark:spark-streaming-kafka_2.10:1.6.1'
> > compile 'org.apache.kafka:kafka-clients:0.8.2.1'
> > compile 'org.apache.kafka:kafka_2.10:0.8.2.1'
> > compile 'com.yammer.metrics:metrics-core:2.2.0'
> >
> > On Fri, May 6, 2016 at 7:47 AM, Eric Friedman <eric.d.fried...@gmail.com>
> > wrote:
> >>
> >> Hello,
> >>
> >> I've been using createDirectStream with Kafka and now need to switch to
> >> the version of that API that lets me supply offsets for my topics. I'm
> >> unable to get this to compile for some reason, even if I lift the very
> >> same usage from the Spark test suite.
> >>
> >> I'm calling it like this:
> >>
> >> val topic = "offset"
> >> val topicPartition = TopicAndPartition(topic, 0)
> >> val messageHandler = (mmd: MessageAndMetadata[String, String]) =>
> >>   (mmd.key, mmd.message)
> >> val stream = KafkaUtils.createDirectStream[String, String,
> >>   StringDecoder, StringDecoder, String](
> >>   ssc, kafkaParams, Map(topicPartition -> 11L), messageHandler)
> >>
> >> Error:
> >>
> >> MyCode.scala:97: overloaded method value createDirectStream with
> >> alternatives:
> >>
> >> (jssc: org.apache.spark.streaming.api.java.JavaStreamingContext,
> >> keyClass: Class[String],valueClass: Class[String],
> >> keyDecoderClass: Class[kafka.serializer.StringDecoder],
> >> valueDecoderClass: Class[kafka.serializer.StringDecoder],
> >> recordClass: Class[String],kafkaParams: java.util.Map[String,String],
> >> fromOffsets: java.util.Map[kafka.common.TopicAndPartition,java.lang.Long],
> >> messageHandler: org.apache.spark.api.java.function.Function[kafka.message.MessageAndMetadata[String,String],String])org.apache.spark.streaming.api.java.JavaInputDStream[String]
> >> <and>
> >>
> >> (ssc: org.apache.spark.streaming.StreamingContext,
> >> kafkaParams: scala.collection.immutable.Map[String,String],
> >> fromOffsets: scala.collection.immutable.Map[kafka.common.TopicAndPartition,scala.Long],
> >> messageHandler: kafka.message.MessageAndMetadata[String,String] => String)(implicit
> >> evidence$14: scala.reflect.ClassTag[String], implicit evidence$15:
> >> scala.reflect.ClassTag[String], implicit evidence$16:
> >> scala.reflect.ClassTag[kafka.serializer.StringDecoder], implicit
> >> evidence$17: scala.reflect.ClassTag[kafka.serializer.StringDecoder],
> >> implicit evidence$18:
> >> scala.reflect.ClassTag[String])org.apache.spark.streaming.dstream.InputDStream[String]
> >>
> >> cannot be applied to (org.apache.spark.streaming.StreamingContext,
> >> scala.collection.mutable.Map[String,String],
> >> scala.collection.mutable.Map[kafka.common.TopicAndPartition,scala.Long],
> >> kafka.message.MessageAndMetadata[String,String] => (String, String))
> >>
> >> val stream = KafkaUtils.createDirectStream[String, String,
> >> StringDecoder, StringDecoder, String](
> >>
> >> ^
> >>
> >> one error found