Look at the definitions of the Java-specific KafkaUtils.createDirectStream
methods (the ones that take a JavaStreamingContext).
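
Roughly, those overloads take plain Class objects plus a java-side Function
and build the ClassTags and the scala Function1 for you, so you rarely need
to construct either by hand. If you do, the same pattern works in your own
code. A rough, untested sketch (the class names here are just illustrative):

import java.io.Serializable;

import kafka.message.MessageAndMetadata;
import kafka.serializer.DefaultDecoder;

import org.apache.spark.api.java.function.Function;

import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.AbstractFunction1;

public class KafkaInterop {

  // ClassTags can be built from plain Class objects via the companion object:
  public static final ClassTag<byte[]> BYTES_TAG =
      ClassTag$.MODULE$.apply(byte[].class);
  public static final ClassTag<byte[][]> ARRAY_TAG =
      ClassTag$.MODULE$.apply(byte[][].class);
  public static final ClassTag<DefaultDecoder> DECODER_TAG =
      ClassTag$.MODULE$.apply(DefaultDecoder.class);

  // A Function1 can be had by extending scala.runtime.AbstractFunction1. It
  // must also be Serializable, since Spark ships the handler to executors.
  public static class HandlerWrapper
      extends AbstractFunction1<MessageAndMetadata<byte[], byte[]>, byte[][]>
      implements Serializable {

    private final Function<MessageAndMetadata<byte[], byte[]>, byte[][]> f;

    public HandlerWrapper(Function<MessageAndMetadata<byte[], byte[]>, byte[][]> f) {
      this.f = f;
    }

    @Override
    public byte[][] apply(MessageAndMetadata<byte[], byte[]> mmd) {
      try {
        return f.call(mmd);  // Function.call declares a checked Exception
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
  }
}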

On Mon, Aug 17, 2015 at 5:13 AM, Shushant Arora <shushantaror...@gmail.com>
wrote:

> How do I create a ClassTag in Java? Also, the constructor
> of DirectKafkaInputDStream takes a Function1, not a Function, but
> KafkaUtils.createDirectStream accepts a Function.
>
> I have the following as an overridden DirectKafkaInputDStream:
>
>
> public class CustomDirectKafkaInputDstream extends
>     DirectKafkaInputDStream<byte[], byte[], kafka.serializer.DefaultDecoder,
>         kafka.serializer.DefaultDecoder, byte[][]> {
>
>   public CustomDirectKafkaInputDstream(
>       StreamingContext ssc_,
>       Map<String, String> kafkaParams,
>       Map<TopicAndPartition, Object> fromOffsets,
>       Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> messageHandler,
>       ClassTag<byte[]> evidence$1, ClassTag<byte[]> evidence$2,
>       ClassTag<DefaultDecoder> evidence$3,
>       ClassTag<DefaultDecoder> evidence$4, ClassTag<byte[][]> evidence$5) {
>     super(ssc_, kafkaParams, fromOffsets, messageHandler, evidence$1,
>         evidence$2, evidence$3, evidence$4, evidence$5);
>   }
>
>   @Override
>   public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
>       byte[][]>> compute(Time validTime) {
>     int processed = processedCounter.value();
>     int failed = failedProcessingsCounter.value();
>     if (processed == failed) {
>       System.out.println("backing off since it's 100% failure");
>       return Option.empty();
>     } else {
>       System.out.println("starting the stream");
>       return super.compute(validTime);
>     }
>   }
> }
>
>
>
> To create this stream I am using:
>
> scala.collection.immutable.Map<String, String> scalakafkaParams =
>     JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala()
>         .toMap(Predef.<Tuple2<String, String>>conforms());
> scala.collection.immutable.Map<TopicAndPartition, Long> scalaktopicOffsetMap =
>     JavaConverters.mapAsScalaMapConverter(topicOffsetMap).asScala()
>         .toMap(Predef.<Tuple2<TopicAndPartition, Long>>conforms());
>
> scala.Function1<MessageAndMetadata<byte[], byte[]>, byte[][]> handler =
>     new Function<MessageAndMetadata<byte[], byte[]>, byte[][]>() {
>       ...
>     };
>
> JavaDStream<byte[][]> directKafkaStream = new CustomDirectKafkaInputDstream(
>     jssc, scalakafkaParams, scalaktopicOffsetMap, handler,
>     byte[].class, byte[].class, kafka.serializer.DefaultDecoder.class,
>     kafka.serializer.DefaultDecoder.class, byte[][].class);
>
>
>
> How do I pass a ClassTag to the constructor of CustomDirectKafkaInputDstream?
> And how do I use a Function instead of a Function1?
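
With the tags and the wrapper from the sketch above, the construction would
look something like this (again untested; note the constructor wants the
underlying StreamingContext, and wrapping the result back into a JavaDStream
needs the element ClassTag):

CustomDirectKafkaInputDstream stream = new CustomDirectKafkaInputDstream(
    jssc.ssc(),  // the scala StreamingContext behind the JavaStreamingContext
    scalakafkaParams,
    scalaktopicOffsetMap,  // may need an unchecked cast to match the
                           // Map<TopicAndPartition, Object> parameter
    new KafkaInterop.HandlerWrapper(javaHandler),
    KafkaInterop.BYTES_TAG, KafkaInterop.BYTES_TAG,
    KafkaInterop.DECODER_TAG, KafkaInterop.DECODER_TAG,
    KafkaInterop.ARRAY_TAG);
JavaDStream<byte[][]> directKafkaStream =
    new JavaDStream<byte[][]>(stream, KafkaInterop.ARRAY_TAG);

where javaHandler is your java-side Function from above.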
>
>
>
> On Thu, Aug 13, 2015 at 12:16 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> I'm not aware of an existing API per se, but you could create your own
>> subclass of the DStream that returns None for compute() under certain
>> conditions.
>>
>>
>>
>> On Wed, Aug 12, 2015 at 1:03 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Hi Cody
>>>
>>> Can you help here: does streaming 1.3 have any API for not consuming any
>>> messages in the next few runs?
>>>
>>> Thanks
>>>
>>> ---------- Forwarded message ----------
>>> From: Shushant Arora <shushantaror...@gmail.com>
>>> Date: Wed, Aug 12, 2015 at 11:23 PM
>>> Subject: spark streaming 1.3 doubts(force it to not consume anything)
>>> To: user <user@spark.apache.org>
>>>
>>>
>>> I can't make my streaming application's batch interval change at run time.
>>> It's always fixed: it always creates jobs at the specified batch interval
>>> and enqueues them if the earlier batch is not finished.
>>>
>>> My requirement is to process the events and post them to an external
>>> server, and if the external server is down I want to increase the batch
>>> time. That is not possible, but can I make it not consume any messages in,
>>> say, the next 5 successive runs?
>>>
>>>
>>>
>>>
>>
>
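
For the original back-off requirement, the same compute() override can carry
a skip counter instead of (or on top of) the 100%-failure check. A rough,
untested sketch, inside CustomDirectKafkaInputDstream (the health check is
hypothetical):

private int batchesToSkip = 0;

@Override
public Option<KafkaRDD<byte[], byte[], DefaultDecoder, DefaultDecoder,
    byte[][]>> compute(Time validTime) {
  // compute() is invoked on the driver once per batch interval, so a plain
  // field is enough to remember how many batches are left to skip.
  if (batchesToSkip > 0) {
    batchesToSkip--;
    return Option.empty();  // consume nothing for this batch
  }
  if (externalServerIsDown()) {  // hypothetical health check
    batchesToSkip = 5;           // back off for the next 5 batches
    return Option.empty();
  }
  return super.compute(validTime);
}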
