You should be able to broadcast that data frame using sc.broadcast and join
against it.
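
Roughly like this, as a sketch (it assumes metaDF is small enough to
collect to the driver, and that "id" is a string; adjust the key type to
your schema):

    import org.apache.spark.sql.Row

    // Collect the small metadata set once on the driver and broadcast it.
    // The broadcast is shipped to each executor once, not once per batch.
    val metaMap: Map[String, Row] =
      metaDF.collect().map(row => row.getAs[String]("id") -> row).toMap
    val metaBc = sc.broadcast(metaMap)

    lines.foreachRDD { rdd =>
      val recordDF = rdd.flatMap(r => Record(r)).toDF()
      // Map-side join: look up each record's id in the broadcast map
      // instead of joining two DataFrames every micro-batch.
      recordDF.rdd.foreachPartition { partition =>
        val meta = metaBc.value
        partition.foreach { row =>
          meta.get(row.getAs[String]("id")) match {
            case Some(m) => // ... process (row, m)
            case None    => // ... id missing from metadata
          }
        }
      }
    }

The broadcast value is created once and cached on each executor after the
first fetch, so nothing gets re-shipped for every RDD.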

On Wed, 17 Feb 2016, 21:13 Srikanth <srikanth...@gmail.com> wrote:

> Hello,
>
> I have a streaming use case where I plan to keep a dataset broadcast and
> cached on each executor.
> Every micro-batch in the stream will create a DataFrame from the RDD and
> join it against the cached dataset.
> The code below performs the broadcast operation for every RDD. Is there a
> way to broadcast it just once?
>
> Alternate approaches are also welcome.
>
>     val DF1 = sqlContext.read.format("json").schema(schema1).load(file1)
>
>     val metaDF = sqlContext.read.format("json").schema(schema1).load(file2)
>                               .join(DF1, "id")
>     metaDF.cache
>
>
>   val lines = streamingcontext.textFileStream(path)
>
>   lines.foreachRDD( rdd => {
>       val recordDF = rdd.flatMap(r => Record(r)).toDF()
>       val joinedDF = recordDF.join(broadcast(metaDF), "id")
>
>       joinedDF.rdd.foreachPartition ( partition => {
>         partition.foreach( row => {
>              ...
>              ...
>         })
>       })
>   })
>
>  streamingcontext.start
>
> On a similar note, if metaDF is too big to broadcast, can I partition it
> (df.repartition($"col")) and also partition each streaming RDD on the
> same column? That way I can avoid shuffling metaDF for every batch.
>
> Let me know your thoughts.
>
> Thanks
>
>
