I'm getting a compilation error while overriding the compute method of
DirectKafkaInputDStream:


[ERROR] CustomDirectKafkaInputDstream.java:[51,83]
compute(org.apache.spark.streaming.Time) in CustomDirectKafkaInputDstream
cannot override compute(org.apache.spark.streaming.Time) in
org.apache.spark.streaming.dstream.DStream; attempting to use incompatible
return type

[ERROR] found   :
scala.Option<org.apache.spark.streaming.kafka.KafkaRDD<byte[],byte[],kafka.serializer.DefaultDecoder,kafka.serializer.DefaultDecoder,byte[][]>>

[ERROR] required: scala.Option<org.apache.spark.rdd.RDD<byte[][]>>


The class:

public class CustomDirectKafkaInputDstream extends
        DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
                kafka.serializer.DefaultDecoder, byte[][]> {

    @Override
    public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
            byte[][]>> compute(Time validTime) {

        int processed = processedCounter.value();
        int failed = failedProcessingsCounter.value();
        if (processed == failed) {
            System.out.println("backing off since its 100 % failure");
            return Option.empty();
        } else {
            System.out.println("starting the stream ");
            return super.compute(validTime);
        }
    }
}


What should the return type of the compute method be? The superclass returns
Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder, byte[][]>>,
but the compiler expects scala.Option<org.apache.spark.rdd.RDD<byte[][]>> from
the derived class. Is there something wrong with the code?
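
For context on the error above: Java generics are invariant, so javac does not treat Option<KafkaRDD<...>> as a subtype of Option<RDD<...>>, even though KafkaRDD extends RDD and Scala's Option is covariant. The sketch below reproduces the same shape with plain JDK types. Every name in it (VarianceSketch, BaseRdd, KafkaLikeRdd, BaseStream, KafkaLikeStream, BackoffStream) is a hypothetical stand-in and not part of the Spark API. It only illustrates the typing rule and the back-off idea, and is not a verified fix for the Spark class.

```java
import java.util.Optional;

public class VarianceSketch {

    // Hypothetical stand-ins for RDD and KafkaRDD.
    public static class BaseRdd {
        public String name() { return "base"; }
    }

    public static class KafkaLikeRdd extends BaseRdd {
        @Override
        public String name() { return "kafka"; }
    }

    // Stand-in for DStream: compute returns an optional of the base type.
    public static class BaseStream {
        public Optional<BaseRdd> compute(long validTime) {
            return Optional.<BaseRdd>of(new BaseRdd());
        }
    }

    // Stand-in for DirectKafkaInputDStream.
    public static class KafkaLikeStream extends BaseStream {
        // Declaring Optional<KafkaLikeRdd> as the return type here is rejected
        // by javac as an incompatible return type, because
        // Optional<KafkaLikeRdd> is not a subtype of Optional<BaseRdd>.
        @Override
        public Optional<BaseRdd> compute(long validTime) {
            return Optional.<BaseRdd>of(new KafkaLikeRdd());
        }
    }

    // Stand-in for the custom stream: skip the batch when every attempt failed.
    public static class BackoffStream extends KafkaLikeStream {
        private final int processed;
        private final int failed;

        public BackoffStream(int processed, int failed) {
            this.processed = processed;
            this.failed = failed;
        }

        @Override
        public Optional<BaseRdd> compute(long validTime) {
            if (processed == failed) {
                return Optional.empty(); // back off: produce nothing this batch
            }
            return super.compute(validTime);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BackoffStream(5, 5).compute(0L).isPresent());
        System.out.println(new BackoffStream(5, 2).compute(0L).get().name());
    }
}
```

Applied to the Spark class, this suggests declaring compute with the type named in the "required" line of the error. That is an assumption: the value returned by super.compute may still need a cast from Java, and writing the subclass in Scala avoids the mismatch altogether.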

On Mon, Aug 17, 2015 at 7:08 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Look at the definitions of the java-specific KafkaUtils.createDirectStream
> methods (the ones that take a JavaStreamingContext)
>
> On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com
> > wrote:
>
>> How do I create a ClassTag in Java? Also, the constructor
>> of DirectKafkaInputDStream takes a Function1, not a Function, but
>> KafkaUtils.createDirectStream allows a Function.
>>
>> I have the overridden DirectKafkaInputDStream below.
>>
>>
>> public class CustomDirectKafkaInputDstream extends
>> DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>> kafka.serializer.DefaultDecoder, byte[][]>{
>>
>> public CustomDirectKafkaInputDstream(
>> StreamingContext ssc_,
>> Map<String, String> kafkaParams,
>> Map<TopicAndPartition, Object> fromOffsets,
>> Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>> ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>> ClassTag<DefaultDecoder> evidence$3,
>> ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>> super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>> evidence$2,
>> evidence$3, evidence$4, evidence$5);
>> }
>> @Override
>> public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>> byte[][]>> compute(
>> Time validTime) {
>> int processed=processedCounter.value();
>> int failed = failedProcessingsCounter.value();
>> if((processed==failed)){
>> System.out.println("backing off since its 100 % failure");
>> return Option.empty();
>> }else{
>> System.out.println("starting the stream ");
>>
>> return super.compute(validTime);
>> }
>> }
>>
>>
>>
>> To create this stream
>> I am using
>> scala.collection.immutable.Map<String, String> scalakafkaParams =
>> JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala().toMap(Predef.<Tuple2<String,
>> String>>conforms());
>> scala.collection.immutable.Map<TopicAndPartition, Long>
>> scalaktopicOffsetMap=
>> JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala().toMap(Predef.<Tuple2<TopicAndPartition,
>> Long>>conforms());
>>
>> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>> new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>>         ..});
>> JavaDStream<byte[][]> directKafkaStream = new
>> CustomDirectKafkaInputDstream(jssc,scalakafkaParams ,scalaktopicOffsetMap,
>> handler,byte[].class,byte[].class, kafka.serializer.DefaultDecoder.class,
>> kafka.serializer.DefaultDecoder.class,byte[][].class);
>>
>>
>>
>> How to pass classTag to constructor in CustomDirectKafkaInputDstream ?
>> And how to use Function instead of Function1 ?
>>
>>
>>
>> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> I'm not aware of an existing api per se, but you could create your own
>>> subclass of the DStream that returns None for compute() under certain
>>> conditions.
>>>
>>>
>>>
>>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Hi Cody
>>>>
>>>> Can you help here if streaming 1.3 has any api for not consuming any
>>>> message in next few runs?
>>>>
>>>> Thanks
>>>>
>>>> ---------- Forwarded message ----------
>>>> From: Shushant Arora <shushantaror...@gmail.com>
>>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>>> To: user <user@spark.apache.org>
>>>>
>>>>
>>>> I can't change my streaming application's batch interval at run time.
>>>> It is always fixed, and it always creates jobs at the specified batch
>>>> interval and enqueues them if an earlier batch is not finished.
>>>>
>>>> My requirement is to process the events and post them to some external
>>>> server and if external server is down I want to increase the batch time -
>>>> that is not possible but can I make it not to consume any messages in say
>>>> next 5 successive runs ?
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
