Is there a reason not to just use scala?  It's not a lot of code... and
it'll be even less code in scala ;)

On Wed, Aug 19, 2015 at 4:14 AM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> To correct myself - KafkaRDD[K, V, U, T, R] is subclass of RDD[R] but
> Option<KafkaRDD[K, V, U, T, R] >  is not subclass of Option<RDD[R]>;
>
> In scala C[T’] is a subclass of C[T] as per
> https://twitter.github.io/scala_school/type-basics.html
> but this is not allowed in java.
>
> So is there any workaround to achieve this in java for overriding 
> DirectKafkaInputDStream
> ?
>
>
> On Wed, Aug 19, 2015 at 12:45 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> But KafkaRDD[K, V, U, T, R] is not subclass of RDD[R] as per java
>> generic inheritance is not supported so derived class cannot return
>>  different genric typed subclass from overriden method.
>>
>> On Wed, Aug 19, 2015 at 12:02 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Option is covariant and KafkaRDD is a subclass of RDD
>>>
>>> On Tue, Aug 18, 2015 at 1:12 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Is it that in scala its allowed for derived class to have any return
>>>> type ?
>>>>
>>>>  And streaming jar is originally created in scala so its allowed for
>>>> DirectKafkaInputDStream  to return Option[KafkaRDD[K, V, U, T, R]]
>>>> compute method ?
>>>>
>>>> On Tue, Aug 18, 2015 at 8:36 PM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> looking at source code of
>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream
>>>>>
>>>>> override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]]
>>>>> = {
>>>>>     val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
>>>>>     val rdd = KafkaRDD[K, V, U, T, R](
>>>>>       context.sparkContext, kafkaParams, currentOffsets, untilOffsets,
>>>>> messageHandler)
>>>>>
>>>>>     currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
>>>>>     Some(rdd)
>>>>>   }
>>>>>
>>>>>
>>>>> But in DStream its def compute (validTime: Time): Option[RDD[T]]  ,
>>>>>
>>>>> So what should  be the return type of custom DStream extends
>>>>> DirectKafkaInputDStream .
>>>>> Since I want the behaviour to be same as of DirectKafkaInputDStream
>>>>>  in normal scenarios and return none in specific scenario.
>>>>>
>>>>> And why the same error did not come while extending
>>>>> DirectKafkaInputDStream from InputDStream ? Since new return type 
>>>>> Option[KafkaRDD[K,
>>>>> V, U, T, R]] is not subclass of Option[RDD[T] so it should have been
>>>>> failed?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Aug 18, 2015 at 7:28 PM, Cody Koeninger <c...@koeninger.org>
>>>>> wrote:
>>>>>
>>>>>> The superclass method in DStream is defined as returning an
>>>>>> Option[RDD[T]]
>>>>>>
>>>>>> On Tue, Aug 18, 2015 at 8:07 AM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Getting compilation error while overriding compute method of
>>>>>>> DirectKafkaInputDStream.
>>>>>>>
>>>>>>>
>>>>>>> [ERROR] CustomDirectKafkaInputDstream.java:[51,83]
>>>>>>> compute(org.apache.spark.streaming.Time) in 
>>>>>>> CustomDirectKafkaInputDstream
>>>>>>> cannot override compute(org.apache.spark.streaming.Time) in
>>>>>>> org.apache.spark.streaming.dstream.DStream; attempting to use 
>>>>>>> incompatible
>>>>>>> return type
>>>>>>>
>>>>>>> [ERROR] found   :
>>>>>>> scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>
>>>>>>>
>>>>>>> [ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>
>>>>>>>
>>>>>>>
>>>>>>> class :
>>>>>>>
>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>
>>>>>>> @Override
>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>> Time validTime) {
>>>>>>>
>>>>>>> int processed=processedCounter.value();
>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>> if((processed==failed)){
>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>> return Option.empty();
>>>>>>> }else{
>>>>>>> System.out.println("starting the stream ");
>>>>>>>
>>>>>>> return super.compute(validTime);
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> What should be the return type of compute method ? super class is
>>>>>>> returning Option<KafkaRDD<byte[], byte[], DefaultDecoder, 
>>>>>>> DefaultDecoder,
>>>>>>> byte[][]>>  but its expecting
>>>>>>>  scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from derived  class . 
>>>>>>> Is
>>>>>>> there something wring with code?
>>>>>>>
>>>>>>> On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Look at the definitions of the java-specific
>>>>>>>> KafkaUtils.createDirectStream methods (the ones that take a
>>>>>>>> JavaStreamingContext)
>>>>>>>>
>>>>>>>> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <
>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> How to create classtag in java ?Also Constructor
>>>>>>>>> of DirectKafkaInputDStream takes Function1 not Function but
>>>>>>>>> kafkautils.createDirectStream allows function.
>>>>>>>>>
>>>>>>>>> I have below as overriden DirectKafkaInputDStream.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> public class CustomDirectKafkaInputDstream extends
>>>>>>>>> DirectKafkaInputDStream<byte[], byte[], 
>>>>>>>>> kafka.serializer.DefaultDecoder,
>>>>>>>>> kafka.serializer.DefaultDecoder, byte[][]>{
>>>>>>>>>
>>>>>>>>> public CustomDirectKafkaInputDstream(
>>>>>>>>> StreamingContext ssc_,
>>>>>>>>> Map<String, String> kafkaParams,
>>>>>>>>> Map<TopicAndPartition, Object> fromOffsets,
>>>>>>>>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>>>> messageHandler,
>>>>>>>>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>>>>>>>>> ClassTag<DefaultDecoder> evidence$3,
>>>>>>>>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]>
>>>>>>>>> evidence$5) {
>>>>>>>>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>>>>>>>>> evidence$2,
>>>>>>>>> evidence$3, evidence$4, evidence$5);
>>>>>>>>> }
>>>>>>>>> @Override
>>>>>>>>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder,
>>>>>>>>> DefaultDecoder, byte[][]>> compute(
>>>>>>>>> Time validTime) {
>>>>>>>>> int processe=processedCounter.value();
>>>>>>>>> int failed = failedProcessingsCounter.value();
>>>>>>>>> if((processed==failed)){
>>>>>>>>> System.out.println("backing off since its 100 % failure");
>>>>>>>>> return Option.empty();
>>>>>>>>> }else{
>>>>>>>>> System.out.println("starting the stream ");
>>>>>>>>>
>>>>>>>>> return super.compute(validTime);
>>>>>>>>> }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> To create this stream
>>>>>>>>> I am using
>>>>>>>>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>>>>>>>>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>>>>>>>>> String>>conforms());
>>>>>>>>> scala.collection.immutable.Map<TopicAndPartition, Long>
>>>>>>>>> scalaktopicOffsetMap=
>>>>>>>>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>>>>>>>>> Long>>conforms());
>>>>>>>>>
>>>>>>>>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]>
>>>>>>>>> handler = new Function<MessageAndMetadata<byte[], byte[]>, 
>>>>>>>>> byte[][]>() {
>>>>>>>>>         ..});
>>>>>>>>> JavaDStream<byte[][]> directKafkaStream = new
>>>>>>>>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams 
>>>>>>>>> ,scalaktopicOffsetMap,
>>>>>>>>> handler,byte[].class,byte[].class, 
>>>>>>>>> kafka.serializer.DefaultDecoder.class,
>>>>>>>>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> How to pass classTag to constructor in
>>>>>>>>> CustomDirectKafkaInputDstream ? And how to use Function instead of
>>>>>>>>> Function1 ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <
>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> I'm not aware of an existing api per se, but you could create
>>>>>>>>>> your own subclass of the DStream that returns None for compute() 
>>>>>>>>>> under
>>>>>>>>>> certain conditions.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Cody
>>>>>>>>>>>
>>>>>>>>>>> Can you help here if streaming 1.3 has any api for not consuming
>>>>>>>>>>> any message in next few runs?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>> ---------- Forwarded message ----------
>>>>>>>>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>>>>>>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>>>>>>>>> Subject: spark streaming 1.3 doubts(force it to not consume
>>>>>>>>>>> anything)
>>>>>>>>>>> To: user <user@spark.apache.org>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I Can't make my stream application batch interval to change at
>>>>>>>>>>> run time . Its always fixed and it always creates jobs at specified 
>>>>>>>>>>> batch
>>>>>>>>>>> inetval and enqueue them if earleir batch is not finished.
>>>>>>>>>>>
>>>>>>>>>>> My requirement is to process the events and post them to some
>>>>>>>>>>> external server and if external server is down I want to increase 
>>>>>>>>>>> the batch
>>>>>>>>>>> time - that is not possible but can I make it not to consume any 
>>>>>>>>>>> messages
>>>>>>>>>>> in say next 5 successive runs ?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to