Getting a compilation error while overriding the compute method of DirectKafkaInputDStream.
[ERROR] CustomDirectKafkaInputDstream.java:[51,83] compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream cannot override compute(org.apache.spark.streaming.Time) in org.apache.spark.streaming.dstream.DStream; attempting to use incompatible return type
[ERROR] found   : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
[ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>

The class:

public class CustomDirectKafkaInputDstream extends
    DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
        kafka.serializer.DefaultDecoder, byte[][]> {

  @Override
  public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
      byte[][]>> compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      System.out.println("backing off since it's 100% failure");
      return Option.empty();
    } else {
      System.out.println("starting the stream");
      return super.compute(validTime);
    }
  }
}

What should the return type of the compute method be? The superclass returns
Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>,
but scala.Option<org.apache.spark.rdd.RDD<byte[][]>> is expected from the
derived class. Is there something wrong with the code?
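Regarding the return type: Java generics are invariant and javac cannot see
scala.Option's declaration-site covariance, so no parameterized return type
satisfies both inherited signatures at once: DStream.compute, which Java
reads as Option<RDD<byte[][]>>, and DirectKafkaInputDStream.compute, which
Java reads as Option<KafkaRDD<...>>. A minimal sketch of one way out,
assuming a raw scala.Option return type is acceptable here (it is
erasure-compatible with both signatures, at the cost of an unchecked
warning):

import org.apache.spark.streaming.Time;
import scala.Option;

  // Inside CustomDirectKafkaInputDstream (constructor as in the thread
  // below); processedCounter and failedProcessingsCounter are assumed to
  // be counters defined on this class, as in the original code.
  @SuppressWarnings({ "rawtypes", "unchecked" })
  @Override
  public Option compute(Time validTime) {
    int processed = processedCounter.value();
    int failed = failedProcessingsCounter.value();
    if (processed == failed) {
      // Every prior batch failed: emit no RDD for this interval.
      System.out.println("backing off since it's 100% failure");
      return Option.empty();
    }
    System.out.println("starting the stream");
    return super.compute(validTime);
  }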
On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Look at the definitions of the java-specific KafkaUtils.createDirectStream
> methods (the ones that take a JavaStreamingContext)
>
> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com>
> wrote:
>
>> How to create a ClassTag in Java? Also, the constructor of
>> DirectKafkaInputDStream takes a Function1, not a Function, but
>> KafkaUtils.createDirectStream allows a Function.
>>
>> I have the below as the overridden DirectKafkaInputDStream:
>>
>> public class CustomDirectKafkaInputDstream extends
>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>
>>   public CustomDirectKafkaInputDstream(
>>       StreamingContext ssc_,
>>       Map<String, String> kafkaParams,
>>       Map<TopicAndPartition, Object> fromOffsets,
>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>       ClassTag<DefaultDecoder> evidence$3,
>>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>         evidence$2, evidence$3, evidence$4, evidence$5);
>>   }
>>
>>   @Override
>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>>       byte[][]>> compute(Time validTime) {
>>     int processed = processedCounter.value();
>>     int failed = failedProcessingsCounter.value();
>>     if (processed == failed) {
>>       System.out.println("backing off since it's 100% failure");
>>       return Option.empty();
>>     } else {
>>       System.out.println("starting the stream");
>>       return super.compute(validTime);
>>     }
>>   }
>> }
>>
>> To create this stream I am using:
>>
>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>
>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>       ..});
>>
>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>>
>> How to pass a ClassTag to the constructor in CustomDirectKafkaInputDstream?
>> And how to use Function instead of Function1? (A sketch follows at the end
>> of this thread.)
>>
>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> I'm not aware of an existing api per se, but you could create your own
>>> subclass of the DStream that returns None for compute() under certain
>>> conditions.
>>>
>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Hi Cody
>>>>
>>>> Can you help here: does streaming 1.3 have any api for not consuming
>>>> any messages in the next few runs?
>>>>
>>>> Thanks
>>>>
>>>> ---------- Forwarded message ----------
>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>> Subject: spark streaming 1.3 doubts (force it to not consume anything)
>>>> To: user <user@spark.apache.org>
>>>>
>>>> I can't make my streaming application's batch interval change at run
>>>> time. It is always fixed, and it always creates jobs at the specified
>>>> batch interval and enqueues them if the earlier batch is not finished.
>>>>
>>>> My requirement is to process the events and post them to an external
>>>> server, and if the external server is down I want to increase the batch
>>>> time. That is not possible, but can I make it not consume any messages
>>>> in, say, the next 5 successive runs?
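For the ClassTag and Function1 questions above, a minimal interop sketch.
The class names (KafkaInteropSketch, ByteArrayMessageHandler) are
illustrative, not part of Spark; the handler is made Serializable because
it is shipped to and invoked on the executors:

import java.io.Serializable;

import kafka.message.MessageAndMetadata;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;

public class KafkaInteropSketch {

  // ClassTags are built in Java through the companion object's apply(Class).
  // This mirrors what the java-specific KafkaUtils.createDirectStream does
  // internally with the Class arguments it receives.
  static final ClassTag<byte[]> BYTES_TAG =
      ClassTag$.MODULE$.apply(byte[].class);
  static final ClassTag<byte[][]> PAYLOAD_TAG =
      ClassTag$.MODULE$.apply(byte[][].class);
  static final ClassTag<kafka.serializer.DefaultDecoder> DECODER_TAG =
      ClassTag$.MODULE$.apply(kafka.serializer.DefaultDecoder.class);

  // A scala.Function1 written in Java: extend AbstractFunction1 rather
  // than implementing Function1 directly, so the trait's concrete methods
  // (andThen, compose, ...) are inherited.
  public static class ByteArrayMessageHandler
      extends AbstractFunction1<MessageAndMetadata<byte[], byte[]>, byte[][]>
      implements Serializable {
    @Override
    public byte[][] apply(MessageAndMetadata<byte[], byte[]> mmd) {
      // Pack the raw key and message into the byte[][] payload.
      return new byte[][] { mmd.key(), mmd.message() };
    }
  }
}

These tags and a new ByteArrayMessageHandler() can then be passed to the
CustomDirectKafkaInputDstream constructor in place of the raw Class objects
shown in the quoted creation code above.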