Look at the definitions of the Java-specific KafkaUtils.createDirectStream methods (the ones that take a JavaStreamingContext).
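Those overloads take plain Class objects, build the ClassTags themselves via ClassTag.apply, and wrap the Java Function before delegating to the Scala API, so you can do the same thing when calling the Scala constructor directly. A minimal sketch, assuming Scala 2.10 on the classpath; DirectStreamGlue, KeyMessageHandler, and the tag() helper are names I made up for illustration, not Spark API:

    import java.io.Serializable;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.DefaultDecoder;
    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;
    import scala.runtime.AbstractFunction1;

    public class DirectStreamGlue {

      // A scala.Function1 usable from Java: extend AbstractFunction1 rather
      // than implementing Function1 directly, and make it Serializable since
      // Spark ships the message handler to the executors.
      public static class KeyMessageHandler
          extends AbstractFunction1<MessageAndMetadata<byte[], byte[]>, byte[][]>
          implements Serializable {
        @Override
        public byte[][] apply(MessageAndMetadata<byte[], byte[]> mmd) {
          return new byte[][] { mmd.key(), mmd.message() };
        }
      }

      // Build a ClassTag from a Class, the same way the Java-facing
      // createDirectStream overloads do it internally.
      public static <T> ClassTag<T> tag(Class<T> clazz) {
        return ClassTag$.MODULE$.<T>apply(clazz);
      }

      public static void main(String[] args) {
        ClassTag<byte[]> bytesTag = tag(byte[].class);
        ClassTag<DefaultDecoder> decoderTag = tag(DefaultDecoder.class);
        ClassTag<byte[][]> recordTag = tag(byte[][].class);
        System.out.println(bytesTag + " / " + decoderTag + " / " + recordTag);
      }
    }

With these in place, the raw Class objects in the constructor call quoted below can be replaced by tag(...) calls, and the anonymous Spark Function by a KeyMessageHandler instance.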
On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com> wrote:

> How to create a ClassTag in java? Also, the constructor of
> DirectKafkaInputDStream takes Function1, not Function, but
> KafkaUtils.createDirectStream allows Function.
>
> I have the below as an overridden DirectKafkaInputDStream:
>
> public class CustomDirectKafkaInputDstream extends
>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>     kafka.serializer.DefaultDecoder, byte[][]> {
>
>   public CustomDirectKafkaInputDstream(
>       StreamingContext ssc_,
>       Map<String, String> kafkaParams,
>       Map<TopicAndPartition, Object> fromOffsets,
>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>       ClassTag<DefaultDecoder> evidence$3,
>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>         evidence$2, evidence$3, evidence$4, evidence$5);
>   }
>
>   @Override
>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>       byte[][]>> compute(Time validTime) {
>     int processed = processedCounter.value();
>     int failed = failedProcessingsCounter.value();
>     if (processed == failed) {
>       System.out.println("backing off since it's 100% failure");
>       return Option.empty();
>     } else {
>       System.out.println("starting the stream");
>       return super.compute(validTime);
>     }
>   }
> }
>
> To create this stream I am using:
>
> scala.collection.immutable.Map<String, String> scalakafkaParams =
>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>         .toMap(Predef.<Tuple2<String, String>>conforms());
> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>
> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>       ..
>     };
>
> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>
> How do I pass the ClassTags to the CustomDirectKafkaInputDstream
> constructor? And how do I use Function instead of Function1?
>
> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> I'm not aware of an existing api per se, but you could create your own
>> subclass of the DStream that returns None for compute() under certain
>> conditions.
>>
>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <shushantaror...@gmail.com> wrote:
>>
>>> Hi Cody
>>>
>>> Can you help here if streaming 1.3 has any api for not consuming any
>>> message in the next few runs?
>>>
>>> Thanks
>>>
>>> ---------- Forwarded message ----------
>>> From: Shushant Arora <shushantaror...@gmail.com>
>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>> Subject: spark streaming 1.3 doubts (force it to not consume anything)
>>> To: user <user@spark.apache.org>
>>>
>>> I can't make my stream application's batch interval change at run time.
>>> It is always fixed, and it always creates jobs at the specified batch
>>> interval and enqueues them if the earlier batch is not finished.
>>>
>>> My requirement is to process the events and post them to an external
>>> server, and if the external server is down I want to increase the batch
>>> time. That is not possible, but can I make it not consume any messages
>>> in, say, the next 5 successive runs?
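For the "skip the next 5 runs" part, the compute() override quoted above can keep a countdown instead of only comparing the counters each interval. A rough sketch of just the changed fields and method, meant to drop into the CustomDirectKafkaInputDstream class from the thread; BACKOFF_BATCHES and batchesToSkip are names I made up, and 5 is an illustrative value:

    // Inside CustomDirectKafkaInputDstream: after a batch where every record
    // failed, return None for the next BACKOFF_BATCHES calls, so the stream
    // consumes nothing in those intervals.
    private static final int BACKOFF_BATCHES = 5;
    private int batchesToSkip = 0;

    @Override
    public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
        byte[][]>> compute(Time validTime) {
      if (batchesToSkip > 0) {
        batchesToSkip--;
        System.out.println("backing off, " + batchesToSkip + " skips left");
        return Option.empty();
      }
      int processed = processedCounter.value();
      int failed = failedProcessingsCounter.value();
      if (processed == failed) {          // 100% failure in the last batch
        batchesToSkip = BACKOFF_BATCHES;  // start a new back-off window
        return Option.empty();
      }
      return super.compute(validTime);
    }

Since compute() runs on the driver, plain instance fields are safe here. The batch interval itself stays fixed; the stream just produces empty batches while the external server recovers.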