Will try Scala. The only reason for not using Scala so far is that I have never worked with it.
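A minimal, untested sketch of the Scala subclass being discussed below. It assumes the processed/failed accumulators from the Java code later in the thread, and it sits in the org.apache.spark.streaming.kafka package because DirectKafkaInputDStream and KafkaRDD are package-private in the 1.3 source:

package org.apache.spark.streaming.kafka

import scala.reflect.ClassTag

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.Accumulator
import org.apache.spark.streaming.{StreamingContext, Time}

// Same behaviour as DirectKafkaInputDStream, except it backs off (returns no RDD,
// so nothing is consumed from Kafka) while every processed record has failed.
class BackoffDirectKafkaInputDStream[
    K: ClassTag, V: ClassTag,
    U <: Decoder[K]: ClassTag, T <: Decoder[V]: ClassTag,
    R: ClassTag](
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R,
    processedCounter: Accumulator[Int],          // driver-side accumulators from the thread below
    failedProcessingsCounter: Accumulator[Int])
  extends DirectKafkaInputDStream[K, V, U, T, R](ssc, kafkaParams, fromOffsets, messageHandler) {

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val processed = processedCounter.value
    val failed = failedProcessingsCounter.value
    if (processed == failed) {
      logInfo("backing off since it is 100% failure")
      None                                       // legal here: Option is covariant in Scala
    } else {
      super.compute(validTime)
    }
  }
}

The narrower return type (Option[KafkaRDD[...]] instead of Option[RDD[R]]) compiles only because of the covariance discussed further down the thread; that is the part Java cannot express.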
On Wed, Aug 19, 2015 at 7:34 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Is there a reason not to just use scala? It's not a lot of code... and
> it'll be even less code in scala ;)
>
> On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>
>> To correct myself - KafkaRDD[K, V, U, T, R] is a subclass of RDD[R], but
>> Option<KafkaRDD[K, V, U, T, R]> is not a subclass of Option<RDD[R]>.
>>
>> In Scala, C[T'] is a subclass of C[T] as per
>> https://twitter.github.io/scala_school/type-basics.html
>> but this is not allowed in Java.
>>
>> So is there any workaround to achieve this in Java for overriding
>> DirectKafkaInputDStream?
>>
>> On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> But KafkaRDD[K, V, U, T, R] is not a subclass of RDD[R]: Java does not
>>> support covariant generics, so a derived class cannot return a
>>> differently parameterised subclass from an overridden method.
>>>
>>> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> Option is covariant and KafkaRDD is a subclass of RDD
>>>>
>>>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>
>>>>> Is it that in Scala a derived class is allowed to have any return type?
>>>>>
>>>>> And since the streaming jar is originally written in Scala, is that why
>>>>> DirectKafkaInputDStream is allowed to return Option[KafkaRDD[K, V, U, T, R]]
>>>>> from its compute method?
>>>>>
>>>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>
>>>>>> Looking at the source code of
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream:
>>>>>>
>>>>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
>>>>>>   val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>>>   val rdd = KafkaRDD[K, V, U, T, R](
>>>>>>     context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
>>>>>>
>>>>>>   currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>>>   Some(rdd)
>>>>>> }
>>>>>>
>>>>>> But in DStream it is def compute(validTime: Time): Option[RDD[T]].
>>>>>>
>>>>>> So what should be the return type of a custom DStream that extends
>>>>>> DirectKafkaInputDStream? I want the behaviour to be the same as
>>>>>> DirectKafkaInputDStream in normal scenarios and to return None in one
>>>>>> specific scenario.
>>>>>>
>>>>>> And why did the same error not occur when DirectKafkaInputDStream itself
>>>>>> extends InputDStream? Its return type Option[KafkaRDD[K, V, U, T, R]] is
>>>>>> not a subclass of Option[RDD[T]] (by Java rules), so it should have failed?
>>>>>>
>>>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>
>>>>>>> The superclass method in DStream is defined as returning an
>>>>>>> Option[RDD[T]]
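The variance point in this sub-thread is quick to see in a couple of Scala lines (Animal and Dog are throwaway stand-ins):

class Animal
class Dog extends Animal

val dog: Option[Dog]       = Some(new Dog)
val animal: Option[Animal] = dog   // compiles: Option is declared Option[+A], i.e. covariant

By the same rule Option[KafkaRDD[K, V, U, T, R]] is a subtype of Option[RDD[R]], so DirectKafkaInputDStream's narrower compute signature is a legal override of DStream.compute in Scala. Java generics are invariant, which is exactly what the javac error below is complaining about.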
>>>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Getting a compilation error while overriding the compute method of
>>>>>>>> DirectKafkaInputDStream:
>>>>>>>>
>>>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>>>> compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
>>>>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
>>>>>>>> return type
>>>>>>>>
>>>>>>>> [ERROR] found    : scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>>>> [ERROR] required : scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>>>
>>>>>>>> Class:
>>>>>>>>
>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>>>
>>>>>>>>   @Override
>>>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
>>>>>>>>       Time validTime) {
>>>>>>>>     int processed = processedCounter.value();
>>>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>>>     if (processed == failed) {
>>>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>>>       return Option.empty();
>>>>>>>>     } else {
>>>>>>>>       System.out.println("starting the stream ");
>>>>>>>>       return super.compute(validTime);
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> What should the return type of the compute method be? The superclass returns
>>>>>>>> Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>,
>>>>>>>> but scala.Option<org.apache.spark.rdd.RDD<byte[][]>> is expected from the
>>>>>>>> derived class. Is there something wrong with the code?
>>>>>>>>
>>>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>
>>>>>>>>> Look at the definitions of the java-specific
>>>>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>>>>> JavaStreamingContext)
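Those Java-specific overloads are where the bridging asked about below happens: a ClassTag is built from a plain Class object, and the Java-friendly Function is wrapped into the scala.Function1 the Scala constructor expects. A simplified sketch of that pattern (not the verbatim Spark source):

import scala.reflect.ClassTag

import kafka.message.MessageAndMetadata
import org.apache.spark.api.java.function.{Function => JFunction}

// Build a ClassTag from a Class object, the way the JavaStreamingContext overloads do.
def classTagOf[K](keyClass: Class[K]): ClassTag[K] =
  ClassTag(keyClass)

// Adapt a Java-friendly Function into the scala.Function1 message handler.
def asFunction1[K, V, R](jf: JFunction[MessageAndMetadata[K, V], R]): MessageAndMetadata[K, V] => R =
  (mmd: MessageAndMetadata[K, V]) => jf.call(mmd)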
>>>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> How to create a ClassTag in Java? Also, the constructor of
>>>>>>>>>> DirectKafkaInputDStream takes a Function1, not a Function, but
>>>>>>>>>> KafkaUtils.createDirectStream allows a Function.
>>>>>>>>>>
>>>>>>>>>> I have the below as the overridden DirectKafkaInputDStream:
>>>>>>>>>>
>>>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>>>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>>>>>         kafka.serializer.DefaultDecoder, byte[][]> {
>>>>>>>>>>
>>>>>>>>>>   public CustomDirectKafkaInputDstream(
>>>>>>>>>>       StreamingContext ssc_,
>>>>>>>>>>       Map<String, String> kafkaParams,
>>>>>>>>>>       Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>>>>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>>>>>>>>>>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>>>>       ClassTag<DefaultDecoder> evidence$3, ClassTag<DefaultDecoder> evidence$4,
>>>>>>>>>>       ClassTag<byte[][]> evidence$5) {
>>>>>>>>>>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1, evidence$2,
>>>>>>>>>>         evidence$3, evidence$4, evidence$5);
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   @Override
>>>>>>>>>>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>> compute(
>>>>>>>>>>       Time validTime) {
>>>>>>>>>>     int processed = processedCounter.value();
>>>>>>>>>>     int failed = failedProcessingsCounter.value();
>>>>>>>>>>     if (processed == failed) {
>>>>>>>>>>       System.out.println("backing off since its 100 % failure");
>>>>>>>>>>       return Option.empty();
>>>>>>>>>>     } else {
>>>>>>>>>>       System.out.println("starting the stream ");
>>>>>>>>>>       return super.compute(validTime);
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> To create this stream I am using:
>>>>>>>>>>
>>>>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>>>>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>>>>>>>>>>         .toMap(Predef.<Tuple2<String, String>>conforms());
>>>>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>>>>>>>>>>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>>>>>>>>>>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>>>>>>>>>>
>>>>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>>>>>>>>>>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>>>>>>>>>       ..});
>>>>>>>>>>
>>>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>>>>>>>>>>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>>>>>>>>>>     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>>>>>>>>>>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>>>>>>>>>>
>>>>>>>>>> How to pass a ClassTag to the constructor of CustomDirectKafkaInputDstream?
>>>>>>>>>> And how to use a Function instead of a Function1?
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm not aware of an existing api per se, but you could create
>>>>>>>>>>> your own subclass of the DStream that returns None for compute() under
>>>>>>>>>>> certain conditions.
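If the subclass route is taken in Scala (as sketched at the top of this thread), wiring it up also sidesteps both questions above: the compiler supplies the ClassTags through the context bounds, and an ordinary lambda already is a Function1. An untested sketch; the broker list, topic, starting offsets and batch interval are made-up placeholders:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.DefaultDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("backoff-direct-stream")
val ssc  = new StreamingContext(conf, Seconds(30))

val processedCounter         = ssc.sparkContext.accumulator(0)
val failedProcessingsCounter = ssc.sparkContext.accumulator(0)

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
val fromOffsets = Map(TopicAndPartition("mytopic", 0) -> 0L)

// No explicit ClassTags and no Function1 wrapping: the context bounds and the lambda do it.
val stream = new BackoffDirectKafkaInputDStream[
    Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder, Array[Array[Byte]]](
  ssc, kafkaParams, fromOffsets,
  (mmd: MessageAndMetadata[Array[Byte], Array[Byte]]) => Array(mmd.key(), mmd.message()),
  processedCounter, failedProcessingsCounter)

stream.foreachRDD { rdd =>
  // post the records to the external server, updating the two accumulators
}

ssc.start()
ssc.awaitTermination()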
>>>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Cody
>>>>>>>>>>>>
>>>>>>>>>>>> Can you help here: does streaming 1.3 have any api for not
>>>>>>>>>>>> consuming any message in the next few runs?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>>>> Subject: spark streaming 1.3 doubts (force it to not consume anything)
>>>>>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>>>>>
>>>>>>>>>>>> I can't make my streaming application's batch interval change at
>>>>>>>>>>>> run time. It is always fixed, and it always creates jobs at the specified
>>>>>>>>>>>> batch interval and enqueues them if the earlier batch is not finished.
>>>>>>>>>>>>
>>>>>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>>>>>> external server, and if the external server is down I want to increase the
>>>>>>>>>>>> batch time - that is not possible, but can I make it not consume any
>>>>>>>>>>>> messages in, say, the next 5 successive runs?
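For the original requirement above (skip the next 5 batch intervals outright rather than keying on the failure counters), the same subclass approach works; the only change to the class sketched at the top of the thread is the state and condition in compute(). A fragment meant to be dropped into that class (skipBatches is a hypothetical driver-side field; that is safe because compute() runs on the driver):

  @volatile private var skipBatches = 0

  // Call from the error handling, e.g. backOffFor(5) when the external server is down.
  def backOffFor(n: Int): Unit = { skipBatches = n }

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    if (skipBatches > 0) {
      skipBatches -= 1
      None              // no RDD for this interval, so no messages are consumed
    } else {
      super.compute(validTime)
    }
  }

The batch interval itself stays fixed, as noted above, but an interval for which compute() returns None simply generates no work for this stream.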