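This is really a Scala matter. The Scala version of createDirectStream requires implicit ClassTag values, which it declares as context bounds on its type parameters. Java has no equivalent, so the Java-facing overload takes Class objects instead, converts each one to a ClassTag, and declares it as an implicit val. Those implicit vals are then picked up by the call to the Scala createDirectStream at the end of the method.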
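To illustrate the pattern outside of Spark, here is a minimal sketch. The names ClassTagBridge, makeArray and makeArrayFromClass are made up for this example and are not part of Spark; the only library call used is ClassTag.apply from the Scala standard library. The first method plays the role of the Scala createDirectStream (context bound), the second plays the role of the Java-facing overload (explicit Class argument turned into an implicit ClassTag).

```scala
import scala.reflect.ClassTag

object ClassTagBridge {
  // Scala-style API (hypothetical): the context bound `T: ClassTag` is sugar
  // for an extra parameter list `(implicit tag: ClassTag[T])`.
  def makeArray[T: ClassTag](size: Int): Array[T] = new Array[T](size)

  // Java-friendly API (hypothetical): Java callers cannot supply implicits,
  // so they pass a Class object explicitly.
  def makeArrayFromClass[T](cls: Class[T], size: Int): Array[T] = {
    // Build the ClassTag from the Class and put it in implicit scope ...
    implicit val tag: ClassTag[T] = ClassTag(cls)
    // ... so that this call finds it without naming it.
    makeArray[T](size)
  }

  def main(args: Array[String]): Unit = {
    val fromScala = makeArray[String](2)                   // tag supplied by the compiler
    val fromJavaStyle = makeArrayFromClass(classOf[String], 3) // tag built from the Class
    println(fromScala.getClass.getComponentType.getName)
    println(fromJavaStyle.getClass.getComponentType.getName)
  }
}
```

If the implicit val were removed, the call to makeArray[T] would not compile, because the compiler cannot synthesize a ClassTag for an abstract type parameter T on its own.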
Thanks
Saisai

On Thu, Dec 17, 2015 at 9:49 PM, Hao Ren <inv...@gmail.com> wrote:

> Hi,
>
> I am reading spark streaming Kafka code.
>
> In org.apache.spark.streaming.kafka.KafkaUtils file,
> the function "createDirectStream" takes key class, value class, etc to
> create classTag.
> However, they are all implicit. I don't understand why they are implicit.
>
> In fact, I can not find any other overloaded "createDirectStream" take
> implicit parameters.
>
> So what are these implicit ClassTags are used for ? Thank you.
>
> def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
>     jssc: JavaStreamingContext,
>     keyClass: Class[K],
>     valueClass: Class[V],
>     keyDecoderClass: Class[KD],
>     valueDecoderClass: Class[VD],
>     recordClass: Class[R],
>     kafkaParams: JMap[String, String],
>     fromOffsets: JMap[TopicAndPartition, JLong],
>     messageHandler: JFunction[MessageAndMetadata[K, V], R]
>   ): JavaInputDStream[R] = {
>   implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
>   implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
>   implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
>   implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
>   implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
>   val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
>   createDirectStream[K, V, KD, VD, R](
>     jssc.ssc,
>     Map(kafkaParams.toSeq: _*),
>     Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
>     cleanedHandler
>   )
> }
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France