Thank you for your quick answer.
It helped me to find an implicit conversion for JavaInputDStream which
takes implicit ClassTag.

Cheers.

On Thu, Dec 17, 2015 at 3:11 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:

> Actually this is a Scala problem. createDirectStream actually requires
> implicit values, which is implied as context bound, Java does not have the
> equivalence, so here change the java class to the ClassTag, and make it as
> implicit value, it will be used by createDirectStream.
>
>
> Thanks
> Saisai
>
>
> On Thu, Dec 17, 2015 at 9:49 PM, Hao Ren <inv...@gmail.com> wrote:
>
>> Hi,
>>
>> I am reading spark streaming Kafka code.
>>
>> In org.apache.spark.streaming.kafka.KafkaUtils file,
>> the function "createDirectStream" takes key class, value class, etc to
>> create classTag.
>> However, they are all implicit. I don't understand why they are implicit.
>>
>> In fact, I can not find any other overloaded "createDirectStream" take
>> implicit parameters.
>>
>> So what are these implicit ClassTags are used for ? Thank you.
>>
>> def createDirectStream[K, V, KD <: Decoder[K], VD <: Decoder[V], R](
>>     jssc: JavaStreamingContext,
>>     keyClass: Class[K],
>>     valueClass: Class[V],
>>     keyDecoderClass: Class[KD],
>>     valueDecoderClass: Class[VD],
>>     recordClass: Class[R],
>>     kafkaParams: JMap[String, String],
>>     fromOffsets: JMap[TopicAndPartition, JLong],
>>     messageHandler: JFunction[MessageAndMetadata[K, V], R]
>>   ): JavaInputDStream[R] = {
>>   implicit val keyCmt: ClassTag[K] = ClassTag(keyClass)
>>   implicit val valueCmt: ClassTag[V] = ClassTag(valueClass)
>>   implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass)
>>   implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass)
>>   implicit val recordCmt: ClassTag[R] = ClassTag(recordClass)
>>   val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _)
>>   createDirectStream[K, V, KD, VD, R](
>>     jssc.ssc,
>>     Map(kafkaParams.toSeq: _*),
>>     Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*),
>>     cleanedHandler
>>   )
>> }
>>
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France

Reply via email to