Sebastian,

*Update:* This is not possible, and will probably remain this way for the
foreseeable future. https://issues.apache.org/jira/browse/SPARK-3863
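For reference, a rough sketch of the Map-based workaround Sebastian describes
below: broadcast a plain Map once and do the lookup inside the RDD
transformations instead of joining a DataFrame. It reuses sqlContext, sc, ssc
and schema from the code further down; "Age" is the join column from that
code, while the "Group" value column is just a placeholder for whatever the
metadata file actually contains, so treat this as an outline rather than
tested code:

  // Read the metadata once on the driver, collect it into a Map keyed by
  // the join column, and broadcast the Map (not the DataFrame).
  val metaMap: Map[Int, String] = sqlContext.read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .load("file:///shared/data/test-data.txt")
    .map(r => r.getAs[Int]("Age") -> r.getAs[String]("Group"))
    .collectAsMap()
    .toMap

  val metaBc = sc.broadcast(metaMap)

  val lines = ssc.socketTextStream("DevNode", 9999)

  lines.foreachRDD((rdd, timestamp) => {
    // The lookup runs per record on the executors; the Map is shipped once
    // and is not re-read or re-broadcast on every batch.
    val enriched = rdd.map(_.split(","))
      .map(l => (l(0).toInt, l(1)))
      .flatMap { case (age, name) =>
        // Inner-join semantics: drop records whose key has no metadata entry.
        metaBc.value.get(age).map(group => s"$age,$name,$group")
      }
    enriched.saveAsTextFile("file:///shared/data/output/streaming/" + timestamp)
  })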
Srikanth

On Fri, Feb 19, 2016 at 10:20 AM, Sebastian Piu <sebastian....@gmail.com> wrote:

> I don't have the code with me now, and I ended up moving everything to RDDs
> and using map operations to do some lookups, i.e. instead of broadcasting a
> Dataframe I ended up broadcasting a Map.
>
>
> On Fri, Feb 19, 2016 at 11:39 AM Srikanth <srikanth...@gmail.com> wrote:
>
>> It didn't fail. It wasn't broadcasting. I just ran the test again and
>> here are the logs. Every batch is reading the metadata file.
>>
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>> 16/02/19 06:27:02 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>>
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:27+28
>> 16/02/19 06:27:40 INFO HadoopRDD: Input split:
>> file:/shared/data/test-data.txt:0+27
>>
>> If I remember correctly, foreachRDD is executed in the driver's context.
>> Not sure how we'll be able to achieve the broadcast in this approach
>> (unless we use the SQL broadcast hint again).
>>
>> When you say "it worked before", was it with an older version of Spark?
>> I'm trying this on 1.6.
>> If you still have the streaming job running, can you verify in the Spark
>> UI that a broadcast join is being used, and whether the files are read and
>> broadcast on each batch?
>>
>> Thanks for the help!
>>
>>
>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu <sebastian....@gmail.com>
>> wrote:
>>
>>> I don't see anything obviously wrong with your second approach; I've done
>>> it like that before and it worked. When you say that it didn't work, what
>>> do you mean? Did it fail? Did it not broadcast?
>>>
>>> On Thu, Feb 18, 2016 at 11:43 PM Srikanth <srikanth...@gmail.com> wrote:
>>>
>>>> Code with the SQL broadcast hint. This worked and I was able to see
>>>> that a broadcast join was performed.
>>>>
>>>> val testDF = sqlContext.read.format("com.databricks.spark.csv")
>>>>   .schema(schema).load("file:///shared/data/test-data.txt")
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>   val recordDF = rdd.map(_.split(","))
>>>>     .map(l => Record(l(0).toInt, l(1))).toDF()
>>>>   val resultDF = recordDF.join(broadcast(testDF), "Age")
>>>>
>>>>   resultDF.write.format("com.databricks.spark.csv")
>>>>     .save("file:///shared/data/output/streaming/" + timestamp)
>>>> })
>>>>
>>>> But for every batch this file was read and the broadcast was performed,
>>>> so I guess the entire DAG is being evaluated.
>>>>
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:24:02 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:27+28
>>>> 16/02/18 12:25:00 INFO HadoopRDD: Input split:
>>>> file:/shared/data/test-data.txt:0+27
>>>>
>>>> Then I changed the code to broadcast the dataframe. This didn't work
>>>> either. Not sure if this is what you meant by broadcasting a dataframe.
>>>>
>>>> val testDF = sc.broadcast(
>>>>   sqlContext.read.format("com.databricks.spark.csv")
>>>>     .schema(schema).load("file:///shared/data/test-data.txt")
>>>> )
>>>>
>>>> val lines = ssc.socketTextStream("DevNode", 9999)
>>>>
>>>> lines.foreachRDD((rdd, timestamp) => {
>>>>   val recordDF = rdd.map(_.split(","))
>>>>     .map(l => Record(l(0).toInt, l(1))).toDF()
>>>>   val resultDF = recordDF.join(testDF.value, "Age")
>>>>
>>>>   resultDF.write.format("com.databricks.spark.csv")
>>>>     .save("file:///shared/data/output/streaming/" + timestamp)
>>>> })
>>>>
>>>>
>>>> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu <sebastian....@gmail.com>
>>>> wrote:
>>>>
>>>>> Can you paste the code where you use sc.broadcast?
>>>>>
>>>>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth <srikanth...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Sebastian,
>>>>>>
>>>>>> I was able to broadcast using the SQL broadcast hint. The question is
>>>>>> how to prevent this broadcast from happening for each RDD.
>>>>>> Is there a way it can be broadcast once and used locally for each RDD?
>>>>>> Right now the metadata file is read and the DF is broadcast on every
>>>>>> batch.
>>>>>> I tried sc.broadcast and that did not provide this behavior.
>>>>>>
>>>>>> Srikanth
>>>>>>
>>>>>>
>>>>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu <sebastian....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> You should be able to broadcast that data frame using sc.broadcast
>>>>>>> and join against it.
>>>>>>>
>>>>>>> On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>>
>>>>>>>> I have a streaming use case where I plan to keep a dataset broadcast
>>>>>>>> and cached on each executor.
>>>>>>>> Every micro batch in streaming will create a DF out of the RDD and
>>>>>>>> join it with this dataset.
>>>>>>>> The code below will perform the broadcast operation for each RDD.
>>>>>>>> Is there a way to broadcast it just once?
>>>>>>>>
>>>>>>>> Alternate approaches are also welcome.
>>>>>>>>
>>>>>>>> val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>>>>>>>>
>>>>>>>> val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>>>>>>>>   .join(DF1, "id")
>>>>>>>> metaDF.cache
>>>>>>>>
>>>>>>>>
>>>>>>>> val lines = streamingcontext.textFileStream(path)
>>>>>>>>
>>>>>>>> lines.foreachRDD( rdd => {
>>>>>>>>   val recordDF = rdd.flatMap(r => Record(r)).toDF()
>>>>>>>>   val joinedDF = recordDF.join(broadcast(metaDF), "id")
>>>>>>>>
>>>>>>>>   joinedDF.rdd.foreachPartition ( partition => {
>>>>>>>>     partition.foreach( row => {
>>>>>>>>       ...
>>>>>>>>       ...
>>>>>>>>     })
>>>>>>>>   })
>>>>>>>> })
>>>>>>>>
>>>>>>>> streamingcontext.start
>>>>>>>>
>>>>>>>> On a similar note, if metaDF is too big to broadcast, can I partition
>>>>>>>> it (df.repartition($"col")) and also partition each streaming RDD?
>>>>>>>> This way I can avoid shuffling metaDF each time.
>>>>>>>>
>>>>>>>> Let me know your thoughts.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>
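
P.S. On the question at the very bottom of the thread (what to do when metaDF
is too big to broadcast): at the RDD level, both sides can be pre-partitioned
with the same partitioner so that only the small per-batch RDD gets shuffled,
while the cached metadata keeps its partitioning across batches. A rough,
untested sketch, reusing metaDF and lines from the code above; it assumes an
integer "id" join key and that Record(r) parses one line into a record with
an id field, and the partition count is made up:

  import org.apache.spark.HashPartitioner

  val partitioner = new HashPartitioner(32)

  // Key the metadata by the join column, partition it once and cache it.
  // The cached RDD retains its partitioner across batches.
  val metaByKey = metaDF.rdd
    .map(r => r.getAs[Int]("id") -> r)
    .partitionBy(partitioner)
    .cache()

  lines.foreachRDD(rdd => {
    // Partition the (small) batch with the same partitioner; the join then
    // has a narrow dependency on the cached metadata, so only the batch
    // side is shuffled.
    val batchByKey = rdd.map(r => Record(r))
      .map(rec => rec.id -> rec)
      .partitionBy(partitioner)

    val joined = batchByKey.join(metaByKey)
    joined.foreachPartition(_.foreach { case (id, (rec, metaRow)) =>
      // ... process each joined record ...
    })
  })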