Relevant documentation -
https://spark.apache.org/docs/latest/streaming-kafka-integration.html,
towards the end.

directKafkaStream.foreachRDD { rdd =>
     val offsetRanges = rdd.asInstanceOf[HasOffsetRanges]
     // offsetRanges.length = # of Kafka partitions being consumed
     ...
 }


On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You have access to the offset ranges for a given rdd in the stream by
> typecasting to HasOffsetRanges.  You can then store the offsets wherever
> you need to.
>
> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song <chen.song...@gmail.com> wrote:
>
>> A follow up question.
>>
>> When using createDirectStream approach, the offsets are checkpointed to
>> HDFS and it is understandable by Spark Streaming job. Is there a way to
>> expose the offsets via a REST api to end users. Or alternatively, is there
>> a way to have offsets committed to Kafka Offset Manager so users can query
>> from a consumer programmatically?
>>
>> Essentially, all I need to do is monitor the progress of data consumption
>> of the Kafka topic.
>>
>>
>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> You can't use different versions of spark in your application vs your
>>> cluster.
>>>
>>> For the direct stream, it's not 60 partitions per executor, it's 300
>>> partitions, and executors work on them as they are scheduled.  Yes, if you
>>> have no messages you will get an empty partition.  It's up to you whether
>>> it's worthwhile to call coalesce or not.
>>>
>>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Is this 3 is no of parallel consumer threads per receiver , means in
>>>> total we have 2*3=6 consumer in same consumer group consuming from all 300
>>>> partitions.
>>>> 3 is just parallelism on same receiver and recommendation is to use 1
>>>> per receiver since consuming from kafka is not cpu bound rather NIC(network
>>>> bound)  increasing consumer thread on one receiver won't make it parallel
>>>> in ideal sense ?
>>>>
>>>> In non receiver based consumer spark 1.3 If I use 5 execuots and kafka
>>>> topic has 300 partions , does kafkaRDD created on 5 executors will have 60
>>>> partitions per executor (total 300 one to one mapping) and if some of kafka
>>>> partitions are empty say offset of last checkpoint to current is same for
>>>> partitons P123, still it will create empty partition in kafkaRDD ? So we
>>>> should call coalesce on kafkaRDD ?
>>>>
>>>>
>>>> And is there any incompatibity issue when I include
>>>> spark-streaming_2.10 (version 1.3) and spark-core_2.10(version 1.3) in my
>>>> application but my cluster has spark version 1.2 ?
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> 1. Here you are basically creating 2 receivers and asking each of them
>>>>> to consume 3 kafka partitions each.
>>>>>
>>>>> - In 1.2 we have high level consumers so how can we restrict no of
>>>>> kafka partitions to consume from? Say I have 300 kafka partitions in kafka
>>>>> topic and as in above I gave 2 receivers and 3 kafka partitions . Then is
>>>>> it mean I will read from 6 out of 300 partitions only and for rest 294
>>>>> partitions data is lost?
>>>>>
>>>>>
>>>>> 2.One more doubt in spark streaming how is it decided which part of
>>>>> main function of driver will run at each batch interval ? Since whole code
>>>>> is written in one function(main function in driver) so how it determined
>>>>> kafka streams receivers  not to be registered in each batch only 
>>>>> processing
>>>>> to be done .
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha <guha.a...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> Let me take ashot at your questions. (I am sure people like Cody and
>>>>>> TD will correct if I am wrong)
>>>>>>
>>>>>> 0. This is exact copy from the similar question in mail thread from
>>>>>> Akhil D:
>>>>>> Since you set local[4] you will have 4 threads for your computation,
>>>>>> and since you are having 2 receivers, you are left with 2 threads to
>>>>>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
>>>>>> means you are having 2 tasks in that stage (with id 0).
>>>>>>
>>>>>> 1. Here you are basically creating 2 receivers and asking each of
>>>>>> them to consume 3 kafka partitions each.
>>>>>> 2. How does that matter? It depends on how many receivers you have
>>>>>> created to consume that data and if you have repartitioned it. Remember,
>>>>>> spark is lazy and executors are relted to the context
>>>>>> 3. I think in java, factory method is fixed. You just pass around the
>>>>>> contextFactory object. (I love python :) see the signature isso much
>>>>>> cleaner :) )
>>>>>> 4. Yes, if you use spark checkpointing. You can use yourcustom check
>>>>>> pointing too.
>>>>>>
>>>>>> Best
>>>>>> Ayan
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Few doubts :
>>>>>>>
>>>>>>> In 1.2 streaming when I use union of streams , my streaming
>>>>>>> application getting hanged sometimes and nothing gets printed on driver.
>>>>>>>
>>>>>>>
>>>>>>> [Stage 2:>
>>>>>>>
>>>>>>>                                         (0 + 2) / 2]
>>>>>>>  Whats is 0+2/2 here signifies.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> 1.Does no of streams in topicsMap.put("testSparkPartitioned", 3);
>>>>>>> be same as numstreams=2 ? in unioned stream ?
>>>>>>>
>>>>>>> 2. I launched app on yarnRM with num-executors as 5 . It created 2
>>>>>>> receivers and 5 execuots . As in stream receivers nodes get fixed at 
>>>>>>> start
>>>>>>> of app throughout its lifetime . Does executors gets allicated at start 
>>>>>>> of
>>>>>>> each job on 1s batch interval? If yes, how does its fast to allocate
>>>>>>> resources. I mean if i increase num-executors to 50 , it will negotiate 
>>>>>>> 50
>>>>>>> executors from yarnRM at start of each job so does it takes more time in
>>>>>>> allocating executors than batch interval(here 1s , say if 500ms).? Can i
>>>>>>> fixed processing executors also throughout the app?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> SparkConf conf = new
>>>>>>> SparkConf().setAppName("SampleSparkStreamingApp");
>>>>>>> JavaStreamingContext jssc = new
>>>>>>> JavaStreamingContext(conf,Durations.milliseconds(1000));
>>>>>>>
>>>>>>> Map<String,String> kafkaParams = new HashMap<String, String>();
>>>>>>> kafkaParams.put("zookeeper.connect","ipadd:2181");
>>>>>>> kafkaParams.put("group.id", "testgroup");
>>>>>>> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>>>>>>>  Map<String,Integer> topicsMap = new HashMap<String,Integer>();
>>>>>>> topicsMap.put("testSparkPartitioned", 3);
>>>>>>> int numStreams = 2;
>>>>>>> List<JavaPairDStream<byte[],byte[]>> kafkaStreams = new
>>>>>>> ArrayList<JavaPairDStream<byte[], byte[]>>();
>>>>>>>   for(int i=0;i<numStreams;i++){
>>>>>>>  kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>>>>>>> byte[].class,kafka.serializer.DefaultDecoder.class ,
>>>>>>> kafka.serializer.DefaultDecoder.class,
>>>>>>> kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>>>>>> }
>>>>>>>  JavaPairDStream<byte[], byte[]> directKafkaStream =
>>>>>>> jssc.union(kafkaStreams.get(0),kafkaStreams.subList(1,
>>>>>>> kafkaStreams.size()));
>>>>>>>  JavaDStream<String> lines = directKafkaStream.map(new
>>>>>>> Function<Tuple2<byte[],byte[]>, String>() {
>>>>>>>
>>>>>>> public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>>>>>>> ...processing
>>>>>>> ..return msg;
>>>>>>> }
>>>>>>> });
>>>>>>> lines.print();
>>>>>>> jssc.start();
>>>>>>> jssc.awaitTermination();
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -------------------------------------------------------------------------------------------------------------------------------------------------------
>>>>>>> 3.For avoiding dataloss when we use checkpointing, and factory
>>>>>>> method to create sparkConytext, is method name fixed
>>>>>>> or we can use any name and how to set in app the method name to be
>>>>>>> used ?
>>>>>>>
>>>>>>> 4.In 1.3 non receiver based streaming, kafka offset is not stored in
>>>>>>> zookeeper, is it because of zookeeper is not efficient for high writes 
>>>>>>> and
>>>>>>> read is not strictly consistent? So
>>>>>>>
>>>>>>>  we use simple Kafka API that does not use Zookeeper and offsets
>>>>>>> tracked only by Spark Streaming within its checkpoints. This
>>>>>>> eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, 
>>>>>>> and
>>>>>>> so each record is received by Spark Streaming effectively exactly once
>>>>>>> despite failures.
>>>>>>>
>>>>>>> So we have to call context.checkpoint(hdfsdir)? Or is it implicit
>>>>>>> checkoint location ? Means does hdfs be used for small data(just 
>>>>>>> offset?)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> There is another option to try for Receiver Based Low Level Kafka
>>>>>>>> Consumer which is part of Spark-Packages (
>>>>>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) .
>>>>>>>> This can be used with WAL as well for end to end zero data loss.
>>>>>>>>
>>>>>>>> This is also Reliable Receiver and Commit offset to ZK.  Given the
>>>>>>>> number of Kafka Partitions you have ( > 100) , using High Level Kafka 
>>>>>>>> API
>>>>>>>> for Receiver based approach may leads to issues related Consumer
>>>>>>>> Re-balancing  which is a major issue of Kafka High Level API.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Dibyendu
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> In the receiver based approach, If the receiver crashes for any
>>>>>>>>> reason (receiver crashed or executor crashed) the receiver should get
>>>>>>>>> restarted on another executor and should start reading data from the 
>>>>>>>>> offset
>>>>>>>>> present in the zookeeper. There is some chance of data loss which can
>>>>>>>>> alleviated using Write Ahead Logs (see streaming programming guide 
>>>>>>>>> for more
>>>>>>>>> details, or see my talk [Slides PDF
>>>>>>>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>
>>>>>>>>> , Video
>>>>>>>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>
>>>>>>>>> ] from last Spark Summit 2015). But that approach can give
>>>>>>>>> duplicate records. The direct approach gives exactly-once guarantees, 
>>>>>>>>> so
>>>>>>>>> you should try it out.
>>>>>>>>>
>>>>>>>>> TD
>>>>>>>>>
>>>>>>>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <
>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> Read the spark streaming guide ad the kafka integration guide for
>>>>>>>>>> a better understanding of how the receiver based stream works.
>>>>>>>>>>
>>>>>>>>>> Capacity planning is specific to your environment and what the
>>>>>>>>>> job is actually doing, youll need to determine it empirically.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Friday, June 26, 2015, Shushant Arora <
>>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In 1.2 how to handle offset management after stream application
>>>>>>>>>>> starts in each job . I should commit offset after job completion 
>>>>>>>>>>> manually?
>>>>>>>>>>>
>>>>>>>>>>> And what is recommended no of consumer threads. Say I have 300
>>>>>>>>>>> partitions in kafka cluster . Load is ~ 1 million events per 
>>>>>>>>>>> second.Each
>>>>>>>>>>> event is of ~500bytes. Having 5 receivers with 60 partitions each 
>>>>>>>>>>> receiver
>>>>>>>>>>> is sufficient for spark streaming to consume ?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <
>>>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The receiver-based kafka createStream in spark 1.2 uses
>>>>>>>>>>>> zookeeper to store offsets.  If you want finer-grained control over
>>>>>>>>>>>> offsets, you can update the values in zookeeper yourself before 
>>>>>>>>>>>> starting
>>>>>>>>>>>> the job.
>>>>>>>>>>>>
>>>>>>>>>>>> createDirectStream in spark 1.3 is still marked as
>>>>>>>>>>>> experimental, and subject to change.  That being said, it works 
>>>>>>>>>>>> better for
>>>>>>>>>>>> me in production than the receiver based api.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>>>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am using spark streaming 1.2.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If processing executors get crashed will receiver rest the
>>>>>>>>>>>>> offset back to last processed offset?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If receiver itself got crashed is there a way to reset the
>>>>>>>>>>>>> offset without restarting streaming application other than 
>>>>>>>>>>>>> smallest or
>>>>>>>>>>>>> largest.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is spark streaming 1.3  which uses low level consumer api,
>>>>>>>>>>>>> stabe? And which is recommended for handling data  loss 1.2 or 
>>>>>>>>>>>>> 1.3 .
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>>
>> --
>> Chen Song
>>
>>
>

Reply via email to