Looking at the source code of
org.apache.spark.streaming.kafka.DirectKafkaInputDStream:

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }


But in DStream it is declared as def compute(validTime: Time): Option[RDD[T]].

So what should the return type of compute be in a custom DStream that extends
DirectKafkaInputDStream? I want the behaviour to be the same as
DirectKafkaInputDStream in normal scenarios and to return None in a specific
scenario.

And why did the same error not occur when DirectKafkaInputDStream itself
extended InputDStream? Since the new return type Option[KafkaRDD[K, V, U, T, R]]
is not a subclass of Option[RDD[T]], shouldn't that have failed as well?
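
A possible explanation, offered as a sketch rather than a confirmed answer from this thread: Scala declares Option as covariant (Option[+A]), so scalac accepts Option[KafkaRDD[...]] where Option[RDD[R]] is expected, while javac treats every generic type as invariant and therefore sees scala.Option<KafkaRDD<...>> and scala.Option<RDD<...>> as unrelated. The Java snippet below illustrates only that mechanism. Rdd, KafkaRdd, BaseStream, KafkaStream and BackoffStream are hypothetical stand-ins, not the Spark classes, and java.util.Optional stands in for scala.Option.

```java
import java.util.Optional;

public class ReturnTypeSketch {
    // Hypothetical stand-ins for RDD and KafkaRDD; these are not the Spark classes.
    static class Rdd {
        String name() { return "rdd"; }
    }

    static class KafkaRdd extends Rdd {
        @Override
        String name() { return "kafka-rdd"; }
    }

    // Stand-in for DStream. The wildcard is what makes the overrides below legal.
    // With a plain Optional<Rdd> here, javac rejects them with an
    // "incompatible return type" error, because Optional<KafkaRdd> is not a
    // subtype of Optional<Rdd> in Java.
    static class BaseStream {
        Optional<? extends Rdd> compute() {
            return Optional.of(new Rdd());
        }
    }

    // Stand-in for DirectKafkaInputDStream: narrows the return type.
    static class KafkaStream extends BaseStream {
        @Override
        Optional<KafkaRdd> compute() {
            return Optional.of(new KafkaRdd());
        }
    }

    // Stand-in for the custom stream: returns an empty result while backing off.
    static class BackoffStream extends KafkaStream {
        private final boolean backOff;

        BackoffStream(boolean backOff) {
            this.backOff = backOff;
        }

        @Override
        Optional<KafkaRdd> compute() {
            if (backOff) {
                return Optional.empty();
            }
            return super.compute();
        }
    }

    public static void main(String[] args) {
        BaseStream normal = new BackoffStream(false);
        BaseStream paused = new BackoffStream(true);
        System.out.println(normal.compute().map(Rdd::name).orElse("none"));
        System.out.println(paused.compute().map(Rdd::name).orElse("none"));
    }
}
```

The real DStream signature has no such wildcard as far as javac is concerned, so this shows why the Java override is rejected and not a fix for the Spark classes themselves.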




On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org> wrote:

> The superclass method in DStream is defined as returning an Option[RDD[T]]
>
> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantaror...@gmail.com
> > wrote:
>
>> I am getting a compilation error while overriding the compute method of
>> DirectKafkaInputDStream.
>>
>>
>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>> cannot override compute(org.apache.spark.streaming.Time) in
>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>> return type
>>
>> [ERROR] found   :
>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>
>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>
>>
>> class :
>>
>> public class CustomDirectKafkaInputDstream extends
>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>> kafka.serializer.DefaultDecoder, byte[][]>{
>>
>> @Override
>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>     byte[][]>> compute(Time validTime) {
>>   int processed = processedCounter.value();
>>   int failed = failedProcessingsCounter.value();
>>   if (processed == failed) {
>>     System.out.println("backing off since its 100 % failure");
>>     return Option.empty();
>>   } else {
>>     System.out.println("starting the stream ");
>>     return super.compute(validTime);
>>   }
>> }
>> }
>>
>>
>> What should the return type of the compute method be? The superclass
>> returns Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>> byte[][]>>, but the compiler expects
>> scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from the derived class. Is
>> there something wrong with the code?
>>
>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Look at the definitions of the java-specific
>>> KafkaUtils.createDirectStream methods (the ones that take a
>>> JavaStreamingContext)
>>>
>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> How do I create a ClassTag in Java? Also, the constructor of
>>>> DirectKafkaInputDStream takes a Function1, not a Function, but
>>>> KafkaUtils.createDirectStream accepts a Function.
>>>>
>>>> Below is my subclass of DirectKafkaInputDStream.
>>>>
>>>>
>>>> public class CustomDirectKafkaInputDstream extends
>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>
>>>> public CustomDirectKafkaInputDstream(
>>>> StreamingContext ssc_,
>>>> Map<String, String> kafkaParams,
>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>> ClassTag<DefaultDecoder> evidence$3,
>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>> evidence$2,
>>>> evidence$3, evidence$4, evidence$5);
>>>> }
>>>> @Override
>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>>>     byte[][]>> compute(Time validTime) {
>>>>   int processed = processedCounter.value();
>>>>   int failed = failedProcessingsCounter.value();
>>>>   if (processed == failed) {
>>>>     System.out.println("backing off since its 100 % failure");
>>>>     return Option.empty();
>>>>   } else {
>>>>     System.out.println("starting the stream ");
>>>>     return super.compute(validTime);
>>>>   }
>>>> }
>>>> }
>>>>
>>>>
>>>>
>>>> To create this stream I am using:
>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>> String>>conforms());
>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>> scalaktopicOffsetMap=
>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>> Long>>conforms());
>>>>
>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>> new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>         ..});
>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>>>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>
>>>>
>>>>
>>>> How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream?
>>>> And how can I use Function instead of Function1?
>>>>
>>>>
>>>>
>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> I'm not aware of an existing api per se, but you could create your own
>>>>> subclass of the DStream that returns None for compute() under certain
>>>>> conditions.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>> shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> Hi Cody
>>>>>>
>>>>>> Can you help here? Does streaming 1.3 have any API for not consuming any
>>>>>> messages in the next few runs?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> ---------- Forwarded message ----------
>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>>> To: user <user@spark.apache.org>
>>>>>>
>>>>>>
>>>>>> I can't make my streaming application's batch interval change at run
>>>>>> time. It is always fixed, and it always creates jobs at the specified batch
>>>>>> interval and enqueues them if an earlier batch has not finished.
>>>>>>
>>>>>> My requirement is to process the events and post them to some
>>>>>> external server. If the external server is down, I want to increase the
>>>>>> batch time. That is not possible, but can I make it not consume any messages
>>>>>> for, say, the next 5 successive runs?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
