Is there a reason not to just use scala? It's not a lot of code... and it'll be even less code in scala ;)
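To make that concrete, here is a rough sketch of what the Scala version could look like. The class name BackoffDirectKafkaInputDStream and the shouldSkip hook are made up for illustration, and since DirectKafkaInputDStream and KafkaRDD appear to be package-private in Spark 1.3, the file may need to sit under the org.apache.spark.streaming.kafka package:

package org.apache.spark.streaming.kafka  // may be required: these classes look package-private in 1.3

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.streaming.{StreamingContext, Time}
import scala.reflect.ClassTag

// Hypothetical subclass name and shouldSkip hook, for illustration only.
class BackoffDirectKafkaInputDStream[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[K]: ClassTag,
    T <: Decoder[V]: ClassTag,
    R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R,
    shouldSkip: () => Boolean)  // e.g. "processed == failed" from your counters
  extends DirectKafkaInputDStream[K, V, U, T, R](ssc, kafkaParams, fromOffsets, messageHandler) {

  // Option is covariant in Scala, so Option[KafkaRDD[...]] also satisfies DStream's Option[RDD[R]].
  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    if (shouldSkip()) {
      logInfo("backing off, not consuming anything for this batch")
      None  // no RDD for this interval; currentOffsets is not advanced
    } else {
      super.compute(validTime)  // normal DirectKafkaInputDStream behaviour
    }
  }
}

Returning None means no job is generated for that interval and currentOffsets is left where it was, so the skipped data is simply picked up by the next batch that does run.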
On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora <shushantaror...@gmail.com> wrote:

> To correct myself - KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but
> Option<KafkaRDD[K, V, U, T, R]> is not a subclass of Option<RDD[R]>.
>
> In Scala, C[T'] is a subclass of C[T] (when T' is a subclass of T and C is
> covariant) as per https://twitter.github.io/scala_school/type-basics.html,
> but this is not allowed in Java.
>
> So is there any workaround to achieve this in Java for overriding
> DirectKafkaInputDStream?
>
> On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>
>> But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R] here: Java does not
>> support this kind of generic inheritance, so a derived class cannot return a
>> differently parameterized generic subtype from an overridden method.
>>
>> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Option is covariant and KafkaRDD is a subclass of RDD.
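To see the variance difference in isolation, a toy Scala example (nothing Spark-specific) that shows exactly why the Scala override is legal while the literal Java translation is rejected:

class Fruit
class Apple extends Fruit

object VarianceDemo {
  val apples: Option[Apple] = Some(new Apple)
  val fruit: Option[Fruit] = apples  // compiles: Option[+A] is covariant and Apple <: Fruit
  // The Java analogue is rejected: Option<Apple> is not assignable to Option<Fruit>,
  // just as Option<KafkaRDD<...>> is not assignable to Option<RDD<byte[][]>> there.
}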
>>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>
>>>> Is it that in Scala a derived class is allowed to have a different return type?
>>>>
>>>> And the streaming jar is originally written in Scala, so DirectKafkaInputDStream
>>>> is allowed to return Option[KafkaRDD[K, V, U, T, R]] from its compute method?
>>>>
>>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>
>>>>> Looking at the source code of
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>>>>>
>>>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>>>>>   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>>   val rdd = KafkaRDD[K, V, U, T, R](
>>>>>     context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
>>>>>
>>>>>   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>>   Some(rdd)
>>>>> }
>>>>>
>>>>> But in DStream it is def compute(validTime: Time): Option[RDD[T]].
>>>>>
>>>>> So what should be the return type of a custom DStream that extends
>>>>> DirectKafkaInputDStream? I want the behaviour to be the same as
>>>>> DirectKafkaInputDStream in normal scenarios and to return None in a
>>>>> specific scenario.
>>>>>
>>>>> And why did the same error not come for DirectKafkaInputDStream itself,
>>>>> which extends InputDStream? Its return type Option[KafkaRDD[K, V, U, T, R]]
>>>>> is not a subclass of Option[RDD[T]] in Java terms, so it should have failed too?
>>>>>
>>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>
>>>>>> The superclass method in DStream is defined as returning an Option[RDD[T]].
>>>>>>
>>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Getting a compilation error while overriding the compute method of
>>>>>>> DirectKafkaInputDStream:
>>>>>>>
>>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>>>>> return type
>>>>>>>
>>>>>>> [ERROR] found    : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>>>
>>>>>>> [ERROR] required : scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>>
>>>>>>> The class:
>>>>>>>
>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
>>>>>>>       Time validTime) {
>>>>>>>     int processed = processedCounter.value();
>>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>>     if (processed == failed) {
>>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>>       return Option.empty();
>>>>>>>     } else {
>>>>>>>       System.out.println("starting the stream ");
>>>>>>>       return super.compute(validTime);
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> What should be the return type of the compute method? The superclass returns
>>>>>>> Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>,
>>>>>>> but the compiler expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from
>>>>>>> the derived class. Is there something wrong with the code?
>>>>>>>
>>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>
>>>>>>>> Look at the definitions of the java-specific KafkaUtils.createDirectStream
>>>>>>>> methods (the ones that take a JavaStreamingContext).
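Those Java-friendly overloads typically build the required evidence from plain Class objects; a small sketch of doing it by hand (the Java-side spellings in the comments are assumptions worth checking against your Scala version):

import scala.reflect.ClassTag

object ClassTagDemo {
  // A ClassTag can be constructed from an ordinary Class object.
  val bytesTag: ClassTag[Array[Byte]] = ClassTag(classOf[Array[Byte]])
}

// From Java the same factory should be reachable as, e.g.:
//   scala.reflect.ClassTag<byte[]> tag = scala.reflect.ClassTag$.MODULE$.apply(byte[].class);
// And a scala.Function1 can be written from Java by extending scala.runtime.AbstractFunction1
// and implementing apply(), since Spark's Java-side Function interface is not a scala.Function1.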
>>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> How do I create a ClassTag in Java? Also, the constructor of
>>>>>>>>> DirectKafkaInputDStream takes a Function1, not a Function, but
>>>>>>>>> KafkaUtils.createDirectStream allows a Function.
>>>>>>>>>
>>>>>>>>> I have the below as the overridden DirectKafkaInputDStream:
>>>>>>>>>
>>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>>>>
>>>>>>>>>   public CustomDirectKafkaInputDstream(
>>>>>>>>>       StreamingContext ssc_,
>>>>>>>>>       Map<String, String> kafkaParams,
>>>>>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>>>       ClassTag<DefaultDecoder> evidence$3, ClassTag<DefaultDecoder> evidence$4,
>>>>>>>>>       ClassTag<byte[][]> evidence$5) {
>>>>>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2,
>>>>>>>>>         evidence$3, evidence$4, evidence$5);
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   @Override
>>>>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
>>>>>>>>>       Time validTime) {
>>>>>>>>>     int processed = processedCounter.value();
>>>>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>>>>     if (processed == failed) {
>>>>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>>>>       return Option.empty();
>>>>>>>>>     } else {
>>>>>>>>>       System.out.println("starting the stream ");
>>>>>>>>>       return super.compute(validTime);
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> To create this stream I am using:
>>>>>>>>>
>>>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>>>>>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>>>>>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>>>>>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>>>>>>>
>>>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>>>>>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>>> ..});
>>>>>>>>>
>>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>>>>>>>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>>>>>>>>     byte[].class, byte[].class,
>>>>>>>>>     kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
>>>>>>>>>     byte[][].class);
>>>>>>>>>
>>>>>>>>> How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream?
>>>>>>>>> And how do I use Function instead of Function1?
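For comparison, if the subclass is the Scala sketch from the top of this mail, the ClassTags are supplied implicitly and the handler is just a function literal; a hypothetical wiring (the counters, offsets, and BackoffDirectKafkaInputDStream itself are all assumptions from that sketch):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.Accumulator
import org.apache.spark.streaming.StreamingContext

object StreamBuilder {
  // BackoffDirectKafkaInputDStream is the hypothetical subclass sketched above.
  def buildStream(
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      processedCounter: Accumulator[Int],
      failedProcessingsCounter: Accumulator[Int]) =
    new BackoffDirectKafkaInputDStream[
        Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Array[Array[Byte]]](
      ssc,
      kafkaParams,
      fromOffsets,
      (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => Array(mmd.key, mmd.message),
      () => processedCounter.value == failedProcessingsCounter.value)  // back off on 100% failure
}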
>>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> I'm not aware of an existing api per se, but you could create your own
>>>>>>>>>> subclass of the DStream that returns None for compute() under certain
>>>>>>>>>> conditions.
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Cody
>>>>>>>>>>>
>>>>>>>>>>> Can you help here if streaming 1.3 has any api for not consuming any
>>>>>>>>>>> message in the next few runs?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>>>>
>>>>>>>>>>> I can't make my streaming application's batch interval change at run time.
>>>>>>>>>>> It is always fixed: it always creates jobs at the specified batch interval
>>>>>>>>>>> and enqueues them if the earlier batch is not finished.
>>>>>>>>>>>
>>>>>>>>>>> My requirement is to process the events and post them to an external
>>>>>>>>>>> server, and if the external server is down I want to increase the batch
>>>>>>>>>>> time - that is not possible, but can I make it not consume any messages
>>>>>>>>>>> in, say, the next 5 successive runs?
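For the "do not consume anything in the next N runs" requirement specifically, the skip condition could be as simple as a driver-side countdown (hypothetical sketch; compute() runs on the driver, so a plain object is enough here and the accumulators are only needed to feed information back from the executors):

import java.util.concurrent.atomic.AtomicInteger

// Arm the back-off when the external server is down; the stream's shouldSkip hook
// (see the subclass sketch above) then skips the next n batches.
class SkipNextBatches(n: Int) {
  private val remaining = new AtomicInteger(0)

  def backOff(): Unit = remaining.set(n)  // e.g. call when posting to the external server fails

  def shouldSkip(): Boolean = {
    val left = remaining.get()
    if (left > 0) {
      remaining.decrementAndGet()
      true
    } else {
      false
    }
  }
}

Passing something like new SkipNextBatches(5).shouldSkip _ as the shouldSkip hook would give the "skip the next 5 runs" behaviour from the original question.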