This is really a Scala matter. The Scala version of createDirectStream
requires implicit ClassTag values, which it declares through context bounds.
Java has no equivalent, so the Java-facing overload takes Class objects,
converts each one to a ClassTag, and declares the results as implicit values
so that the Scala createDirectStream it calls can pick them up.
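
To illustrate the pattern outside of Spark, here is a minimal sketch. The
names ClassTagBridge, describeScala and describeFromJava are hypothetical and
are not part of KafkaUtils; only scala.reflect.ClassTag is a real API. It
shows that a context bound is shorthand for an implicit parameter, and how a
Java-friendly wrapper can supply that parameter from a Class object.

```scala
import scala.reflect.ClassTag

// Hypothetical helper object, for illustration only (not part of Spark).
object ClassTagBridge {

  // Scala-facing method. The context bounds [K: ClassTag, V: ClassTag] are
  // shorthand for an extra parameter list:
  //   (implicit kt: ClassTag[K], vt: ClassTag[V])
  def describeScala[K: ClassTag, V: ClassTag](): String = {
    val k = implicitly[ClassTag[K]].runtimeClass.getSimpleName
    val v = implicitly[ClassTag[V]].runtimeClass.getSimpleName
    s"$k -> $v"
  }

  // Java-facing method. Java callers cannot pass implicit ClassTags, so they
  // pass Class objects instead. Declaring the converted tags as implicit
  // vals puts them in scope for the call to describeScala.
  def describeFromJava[K, V](keyClass: Class[K], valueClass: Class[V]): String = {
    implicit val keyTag: ClassTag[K] = ClassTag(keyClass)
    implicit val valueTag: ClassTag[V] = ClassTag(valueClass)
    describeScala[K, V]()
  }
}
```

If the two implicit vals were removed, the call to describeScala[K, V]()
would not compile, because no ClassTag for the abstract types K and V could
be found in scope.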


Thanks
Saisai


On Thu, Dec 17, 2015 at 9:49 PM, Hao Ren <inv...@gmail.com> wrote:

> Hi,
>
> I am reading spark streaming Kafka code.
>
> In org.apache.spark.streaming.kafka.KafkaUtils file,
> the function "createDirectStream" takes key class, value class, etc to
> create classTag.
> However, they are all implicit. I don't understand why they are implicit.
>
> In fact, I cannot find any other overloaded "createDirectStream" that takes
> implicit parameters.
>
> So what are these implicit ClassTags used for? Thank you.
>
> def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
>     jssc: JavaStreamingContext,
>     keyClass: Class[K],
>     valueClass: Class[V],
>     keyDecoderClass: Class[KD],
>     valueDecoderClass: Class[VD],
>     recordClass: Class[R],
>     kafkaParams: JMap[String, String],
>     fromOffsets: JMap[TopicAndPartition, JLong],
>     messageHandler: JFunction[MessageAndMetadata[K, V], R]
>   ): JavaInputDStream[R] = {
>   implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
>   implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
>   implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
>   implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
>   implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
>   val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
>   createDirectStream[K, V, KD, VD, R](
>     jssc.ssc,
>     Map(kafkaParams.toSeq: _*),
>     Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
>     cleanedHandler
>   )
> }
>
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>
