Thank you for your quick answer. It helped me find the implicit conversion for JavaInputDStream, which takes an implicit ClassTag.
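
For anyone reading the archive, here is a minimal sketch of the pattern as I understand it from Saisai's explanation. It does not use Spark at all: ClassTagBridge and makeArray are made-up names for illustration, standing in for KafkaUtils and createDirectStream. The assumption is that the Scala-facing method declares a context bound, and the Java-facing overload turns a Class[T] into an implicit ClassTag[T] so that the compiler can pass it along.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for KafkaUtils; not part of Spark.
object ClassTagBridge {

  // Scala-facing method. The context bound [T: ClassTag] is sugar for an
  // extra parameter list: (implicit tag: ClassTag[T]). That hidden
  // parameter is why no "implicit" keyword shows up in the signature.
  def makeArray[T: ClassTag](size: Int): Array[T] = new Array[T](size)

  // Java-facing overload. Java callers cannot supply implicits, so they
  // pass a Class[T] and the ClassTag is built by hand. Declaring it as an
  // implicit val puts it in scope for the call to the method above.
  def makeArray[T](cls: Class[T], size: Int): Array[T] = {
    implicit val tag: ClassTag[T] = ClassTag(cls)
    makeArray[T](size) // the compiler passes `tag` here
  }
}
```

Calling ClassTagBridge.makeArray(classOf[String], 3) should go through the Java-style overload and end up in the context-bound version with the locally defined tag. If the implicit val line were removed, I would expect the inner call to fail to compile with a "No ClassTag available for T" error, which seems to be the whole reason the vals in createDirectStream are marked implicit.
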

Cheers.

On Thu, Dec 17, 2015 at 3:11 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Actually this is a Scala issue. createDirectStream requires implicit
> values, which are expressed as context bounds. Java has no equivalent,
> so here the Java class is converted to a ClassTag and made an implicit
> value, which is then used by createDirectStream.
>
> Thanks
> Saisai
>
> On Thu, Dec 17, 2015 at 9:49 PM, Hao Ren <inv...@gmail.com> wrote:
>
>> Hi,
>>
>> I am reading the Spark Streaming Kafka code.
>>
>> In the org.apache.spark.streaming.kafka.KafkaUtils file, the function
>> "createDirectStream" takes the key class, value class, etc. to create
>> ClassTags. However, they are all implicit. I don't understand why they
>> are implicit.
>>
>> In fact, I cannot find any other overloaded "createDirectStream" that
>> takes implicit parameters.
>>
>> So what are these implicit ClassTags used for? Thank you.
>>
>> def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
>>     jssc: JavaStreamingContext,
>>     keyClass: Class[K],
>>     valueClass: Class[V],
>>     keyDecoderClass: Class[KD],
>>     valueDecoderClass: Class[VD],
>>     recordClass: Class[R],
>>     kafkaParams: JMap[String, String],
>>     fromOffsets: JMap[TopicAndPartition, JLong],
>>     messageHandler: JFunction[MessageAndMetadata[K, V], R]
>>   ): JavaInputDStream[R] = {
>>   implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
>>   implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
>>   implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
>>   implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
>>   implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
>>   val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
>>   createDirectStream[K, V, KD, VD, R](
>>     jssc.ssc,
>>     Map(kafkaParams.toSeq: _*),
>>     Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
>>     cleanedHandler
>>   )
>> }
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>

--
Hao Ren

Data Engineer @ leboncoin

Paris, France