Relevant documentation - https://spark.apache.org/docs/latest/streaming-kafka-integration.html, towards the end.

// HasOffsetRanges lives in org.apache.spark.streaming.kafka (Spark 1.3+)
directKafkaStream.foreachRDD { rdd =>
  // The cast alone only gives access to the trait; read its offsetRanges member
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // offsetRanges.length = # of Kafka partitions being consumed
  ...
}

On Tue, Jul 14, 2015 at 3:17 PM, Cody Koeninger <c...@koeninger.org> wrote:

> You have access to the offset ranges for a given rdd in the stream by
> typecasting to HasOffsetRanges. You can then store the offsets wherever
> you need to.
>
> On Tue, Jul 14, 2015 at 5:00 PM, Chen Song <chen.song...@gmail.com> wrote:
>
>> A follow up question.
>>
>> When using the createDirectStream approach, the offsets are checkpointed to
>> HDFS in a form only the Spark Streaming job understands. Is there a way to
>> expose the offsets via a REST api to end users? Or alternatively, is there
>> a way to have offsets committed to the Kafka Offset Manager so users can
>> query them from a consumer programmatically?
>>
>> Essentially, all I need to do is monitor the progress of data consumption
>> of the Kafka topic.
>>
>> On Tue, Jun 30, 2015 at 9:39 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> You can't use different versions of spark in your application vs your
>>> cluster.
>>>
>>> For the direct stream, it's not 60 partitions per executor, it's 300
>>> partitions, and executors work on them as they are scheduled. Yes, if you
>>> have no messages you will get an empty partition. It's up to you whether
>>> it's worthwhile to call coalesce or not.
>>>
>>> On Tue, Jun 30, 2015 at 2:45 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
>>>> Is this 3 the number of parallel consumer threads per receiver, meaning
>>>> in total we have 2*3=6 consumers in the same consumer group consuming
>>>> from all 300 partitions?
>>>> Is 3 just parallelism on the same receiver, and is the recommendation to
>>>> use 1 per receiver, since consuming from kafka is not cpu bound but
>>>> rather NIC (network) bound, so increasing consumer threads on one
>>>> receiver won't make it parallel in an ideal sense?
>>>>
>>>> In the non receiver based consumer in spark 1.3, if I use 5 executors and
>>>> the kafka topic has 300 partitions, will the kafkaRDD created on 5
>>>> executors have 60 partitions per executor (total 300, one to one mapping)?
>>>> And if some kafka partitions are empty, say the offset from the last
>>>> checkpoint to current is the same for partition P123, will it still
>>>> create an empty partition in the kafkaRDD? So should we call coalesce on
>>>> the kafkaRDD?
>>>>
>>>> And is there any incompatibility issue when I include
>>>> spark-streaming_2.10 (version 1.3) and spark-core_2.10 (version 1.3) in my
>>>> application but my cluster has spark version 1.2?
>>>>
>>>> On Mon, Jun 29, 2015 at 7:56 PM, Shushant Arora <
>>>> shushantaror...@gmail.com> wrote:
>>>>
>>>>> 1. Here you are basically creating 2 receivers and asking each of them
>>>>> to consume 3 kafka partitions each.
>>>>>
>>>>> - In 1.2 we have high level consumers, so how can we restrict the number
>>>>> of kafka partitions to consume from? Say I have 300 kafka partitions in
>>>>> the kafka topic and, as above, I gave 2 receivers and 3 kafka
>>>>> partitions. Does it mean I will read from only 6 out of 300 partitions
>>>>> and the data of the remaining 294 partitions is lost?
>>>>>
>>>>> 2. One more doubt: in spark streaming, how is it decided which part of
>>>>> the driver's main function will run at each batch interval? Since the
>>>>> whole code is written in one function (the main function in the driver),
>>>>> how is it determined that the kafka stream receivers are not registered
>>>>> again in each batch and only the processing is done?
>>>>>
>>>>> On Mon, Jun 29, 2015 at 7:35 PM, ayan guha <guha.a...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> Let me take a shot at your questions. (I am sure people like Cody and
>>>>>> TD will correct me if I am wrong)
>>>>>>
>>>>>> 0. This is an exact copy from a similar question in a mail thread from
>>>>>> Akhil D:
>>>>>> Since you set local[4] you will have 4 threads for your computation,
>>>>>> and since you are having 2 receivers, you are left with 2 threads to
>>>>>> process ((0 + 2) <-- This 2 is your 2 threads.) And the other /2
>>>>>> means you are having 2 tasks in that stage (with id 0).
>>>>>>
>>>>>> 1. Here you are basically creating 2 receivers and asking each of
>>>>>> them to consume 3 kafka partitions each.
>>>>>> 2. How does that matter? It depends on how many receivers you have
>>>>>> created to consume that data and if you have repartitioned it. Remember,
>>>>>> spark is lazy and executors are related to the context.
>>>>>> 3. I think in java, the factory method is fixed. You just pass around
>>>>>> the contextFactory object. (I love python :) see the signature is so
>>>>>> much cleaner :) )
>>>>>> 4. Yes, if you use spark checkpointing. You can use your custom
>>>>>> checkpointing too.
>>>>>>
>>>>>> Best
>>>>>> Ayan
>>>>>>
>>>>>> On Mon, Jun 29, 2015 at 4:02 AM, Shushant Arora <
>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>
>>>>>>> Few doubts :
>>>>>>>
>>>>>>> In 1.2 streaming, when I use a union of streams, my streaming
>>>>>>> application hangs sometimes and nothing gets printed on the driver.
>>>>>>>
>>>>>>> [Stage 2:>                                        (0 + 2) / 2]
>>>>>>>
>>>>>>> What does 0+2/2 signify here?
>>>>>>>
>>>>>>> 1. Does the number of streams in topicsMap.put("testSparkPartitioned", 3);
>>>>>>> have to be the same as numStreams=2 in the unioned stream?
>>>>>>>
>>>>>>> 2. I launched the app on yarnRM with num-executors as 5. It created 2
>>>>>>> receivers and 5 executors. In streaming, receiver nodes get fixed at
>>>>>>> the start of the app for its lifetime. Do executors get allocated at
>>>>>>> the start of each job, on the 1s batch interval? If yes, how is it
>>>>>>> fast enough to allocate resources? I mean, if I increase num-executors
>>>>>>> to 50, it will negotiate 50 executors from yarnRM at the start of each
>>>>>>> job, so does allocating executors take more time than the batch
>>>>>>> interval (here 1s, or say 500ms)? Can I fix the processing executors
>>>>>>> throughout the app as well?
>>>>>>>
>>>>>>> SparkConf conf = new
>>>>>>> SparkConf().setAppName("SampleSparkStreamingApp");
>>>>>>> JavaStreamingContext jssc = new
>>>>>>> JavaStreamingContext(conf,Durations.milliseconds(1000));
>>>>>>>
>>>>>>> Map<String,String> kafkaParams = new HashMap<String, String>();
>>>>>>> kafkaParams.put("zookeeper.connect","ipadd:2181");
>>>>>>> kafkaParams.put("group.id", "testgroup");
>>>>>>> kafkaParams.put("zookeeper.session.timeout.ms", "10000");
>>>>>>> Map<String,Integer> topicsMap = new HashMap<String,Integer>();
>>>>>>> topicsMap.put("testSparkPartitioned", 3);
>>>>>>> int numStreams = 2;
>>>>>>> List<JavaPairDStream<byte[],byte[]>> kafkaStreams = new
>>>>>>> ArrayList<JavaPairDStream<byte[], byte[]>>();
>>>>>>> for(int i=0;i<numStreams;i++){
>>>>>>>   kafkaStreams.add(KafkaUtils.createStream(jssc, byte[].class,
>>>>>>>   byte[].class,kafka.serializer.DefaultDecoder.class ,
>>>>>>>   kafka.serializer.DefaultDecoder.class,
>>>>>>>   kafkaParams, topicsMap, StorageLevel.MEMORY_ONLY()));
>>>>>>> }
>>>>>>> JavaPairDStream<byte[], byte[]> directKafkaStream =
>>>>>>> jssc.union(kafkaStreams.get(0),kafkaStreams.subList(1,
>>>>>>> kafkaStreams.size()));
>>>>>>> JavaDStream<String> lines = directKafkaStream.map(new
>>>>>>> Function<Tuple2<byte[],byte[]>, String>() {
>>>>>>>
>>>>>>>   public String call(Tuple2<byte[], byte[]> arg0) throws Exception {
>>>>>>>     ...processing
>>>>>>>     ..return msg;
>>>>>>>   }
>>>>>>> });
>>>>>>> lines.print();
>>>>>>> jssc.start();
>>>>>>> jssc.awaitTermination();
>>>>>>>
>>>>>>> ----------------------------------------------------------------------
>>>>>>> 3. For avoiding data loss when we use checkpointing and a factory
>>>>>>> method to create the sparkContext, is the method name fixed, or can we
>>>>>>> use any name, and how do we set in the app the method name to be used?
>>>>>>>
>>>>>>> 4. In 1.3 non receiver based streaming, the kafka offset is not stored
>>>>>>> in zookeeper. Is that because zookeeper is not efficient for high
>>>>>>> writes and its reads are not strictly consistent? So
>>>>>>>
>>>>>>> we use simple Kafka API that does not use Zookeeper and offsets
>>>>>>> tracked only by Spark Streaming within its checkpoints. This
>>>>>>> eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka,
>>>>>>> and so each record is received by Spark Streaming effectively exactly
>>>>>>> once despite failures.
>>>>>>>
>>>>>>> So do we have to call context.checkpoint(hdfsdir)? Or is there an
>>>>>>> implicit checkpoint location? Meaning, is hdfs used for such small data
>>>>>>> (just offsets)?
>>>>>>>
>>>>>>> On Sat, Jun 27, 2015 at 7:37 PM, Dibyendu Bhattacharya <
>>>>>>> dibyendu.bhattach...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> There is another option to try: the Receiver Based Low Level Kafka
>>>>>>>> Consumer which is part of Spark-Packages (
>>>>>>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer) .
>>>>>>>> This can be used with WAL as well for end to end zero data loss.
>>>>>>>>
>>>>>>>> This is also a Reliable Receiver and commits offsets to ZK. Given the
>>>>>>>> number of Kafka partitions you have (> 100), using the High Level
>>>>>>>> Kafka API for the Receiver based approach may lead to issues related
>>>>>>>> to Consumer Re-balancing, which is a major issue of the Kafka High
>>>>>>>> Level API.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Dibyendu
>>>>>>>>
>>>>>>>> On Sat, Jun 27, 2015 at 3:04 PM, Tathagata Das <t...@databricks.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> In the receiver based approach, if the receiver crashes for any
>>>>>>>>> reason (receiver crashed or executor crashed) the receiver should get
>>>>>>>>> restarted on another executor and should start reading data from the
>>>>>>>>> offset present in zookeeper. There is some chance of data loss,
>>>>>>>>> which can be alleviated using Write Ahead Logs (see the streaming
>>>>>>>>> programming guide for more details, or see my talk [Slides PDF
>>>>>>>>> <http://www.slideshare.net/SparkSummit/recipes-for-running-spark-streaming-apploications-in-production-tathagata-daspptx>
>>>>>>>>> , Video
>>>>>>>>> <https://www.youtube.com/watch?v=d5UJonrruHk&list=PL-x35fyliRwgfhffEpywn4q23ykotgQJ6&index=4>
>>>>>>>>> ] from last Spark Summit 2015). But that approach can give
>>>>>>>>> duplicate records. The direct approach gives exactly-once guarantees,
>>>>>>>>> so you should try it out.
>>>>>>>>>
>>>>>>>>> TD
>>>>>>>>>
>>>>>>>>> On Fri, Jun 26, 2015 at 5:46 PM, Cody Koeninger <
>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> Read the spark streaming guide and the kafka integration guide for
>>>>>>>>>> a better understanding of how the receiver based stream works.
>>>>>>>>>>
>>>>>>>>>> Capacity planning is specific to your environment and what the
>>>>>>>>>> job is actually doing, you'll need to determine it empirically.
>>>>>>>>>>
>>>>>>>>>> On Friday, June 26, 2015, Shushant Arora <
>>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> In 1.2, how do I handle offset management in each job after the
>>>>>>>>>>> stream application starts? Should I commit the offset manually
>>>>>>>>>>> after job completion?
>>>>>>>>>>>
>>>>>>>>>>> And what is the recommended number of consumer threads? Say I have
>>>>>>>>>>> 300 partitions in the kafka cluster. Load is ~1 million events per
>>>>>>>>>>> second. Each event is ~500 bytes. Are 5 receivers with 60
>>>>>>>>>>> partitions per receiver sufficient for spark streaming to consume?
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 26, 2015 at 8:40 PM, Cody Koeninger <
>>>>>>>>>>> c...@koeninger.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The receiver-based kafka createStream in spark 1.2 uses
>>>>>>>>>>>> zookeeper to store offsets. If you want finer-grained control over
>>>>>>>>>>>> offsets, you can update the values in zookeeper yourself before
>>>>>>>>>>>> starting the job.
>>>>>>>>>>>>
>>>>>>>>>>>> createDirectStream in spark 1.3 is still marked as
>>>>>>>>>>>> experimental, and subject to change. That being said, it works
>>>>>>>>>>>> better for me in production than the receiver based api.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 26, 2015 at 6:43 AM, Shushant Arora <
>>>>>>>>>>>> shushantaror...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I am using spark streaming 1.2.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If the processing executors crash, will the receiver reset the
>>>>>>>>>>>>> offset back to the last processed offset?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If the receiver itself crashes, is there a way to reset the
>>>>>>>>>>>>> offset without restarting the streaming application, other than
>>>>>>>>>>>>> smallest or largest?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is spark streaming 1.3, which uses the low level consumer api,
>>>>>>>>>>>>> stable? And which is recommended for handling data loss, 1.2 or
>>>>>>>>>>>>> 1.3?
>>>>>>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Best Regards,
>>>>>> Ayan Guha
>>>>>>
>>
>> --
>> Chen Song
>>
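
P.S. On the monitoring question in this thread (exposing consumed offsets to end users): below is a minimal sketch of one way to do the bookkeeping, assuming you copy each batch's offset ranges into your own driver-side store inside foreachRDD. The Range class is a hypothetical stand-in written for this example, not Spark's OffsetRange, and OffsetProgressSketch, recordBatch, totalMessages and snapshot are illustrative names, not Spark or Kafka APIs. The HTTP layer is left out; a REST handler would simply serve whatever snapshot() returns.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: tracks the latest consumed offset per Kafka partition.
// None of these names come from Spark or Kafka.
class OffsetProgressSketch {

    // Hypothetical stand-in for one partition's slice of a batch,
    // covering offsets [fromOffset, untilOffset). NOT Spark's OffsetRange.
    static final class Range {
        final String topic;
        final int partition;
        final long fromOffset;
        final long untilOffset;

        Range(String topic, int partition, long fromOffset, long untilOffset) {
            this.topic = topic;
            this.partition = partition;
            this.fromOffset = fromOffset;
            this.untilOffset = untilOffset;
        }

        // Number of messages in this slice; 0 means an empty partition.
        long count() {
            return untilOffset - fromOffset;
        }
    }

    // Key is "topic-partition"; value is the highest untilOffset seen so far.
    private final Map<String, Long> latest = new TreeMap<>();

    // Call once per batch with that batch's ranges.
    synchronized void recordBatch(List<Range> ranges) {
        for (Range r : ranges) {
            latest.merge(r.topic + "-" + r.partition, r.untilOffset, Math::max);
        }
    }

    // Total messages across all partitions of one batch.
    static long totalMessages(List<Range> ranges) {
        long total = 0;
        for (Range r : ranges) {
            total += r.count();
        }
        return total;
    }

    // Copy of the current progress, safe to hand to another thread.
    synchronized Map<String, Long> snapshot() {
        return new TreeMap<>(latest);
    }

    public static void main(String[] args) {
        OffsetProgressSketch progress = new OffsetProgressSketch();
        List<Range> batch = Arrays.asList(
                new Range("testSparkPartitioned", 0, 100L, 150L),
                new Range("testSparkPartitioned", 1, 200L, 200L)); // empty partition
        progress.recordBatch(batch);
        System.out.println(totalMessages(batch) + " messages, progress " + progress.snapshot());
    }
}
```

The second range has equal from and until offsets, which is the "empty partition" case discussed earlier in the thread: it contributes zero messages but still records where that partition stands. Keeping the store separate from Spark's checkpoint is a design choice for this sketch only; whether you persist it to ZooKeeper, a database, or just memory depends on your setup.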