2016-12-28 20:17 GMT+01:00 Chawla,Sumit <sumitkcha...@gmail.com>:

> Would this work for you?
>
> def processRDD(rdd):
>     analyzer = ShortTextAnalyzer(root_dir)
>     rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))
>
> ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(lambda rdd: processRDD(rdd))
>

I think this will still create the analyzer on the driver, capture it in the
lambda's closure, and send it to every executor that holds partitions of the
RDD.
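A quick way to see why, without a cluster: PySpark serializes the function it ships to the executors (it uses cloudpickle for this), and any variable in the function's closure is serialized along with it. The plain-Python check below is a sketch that only inspects `__closure__`; `ShortTextAnalyzer` here is a hypothetical stand-in for your class and the `root_dir` value is made up.

```python
class ShortTextAnalyzer:
    """Hypothetical stand-in for the analyzer class from this thread."""

    def __init__(self, root_dir):
        self.root_dir = root_dir

    def analyze_short_text_event(self, text):
        return len(text)


root_dir = "/tmp/analyzer"  # hypothetical path, module-level like in a driver script


def make_record_func():
    # Mirrors the quoted processRDD: the analyzer is built where processRDD runs
    # (the driver), and the lambda refers to it, so it becomes a closure cell.
    analyzer = ShortTextAnalyzer(root_dir)
    return analyzer, lambda record: analyzer.analyze_short_text_event(record[1])


def make_partition_func():
    # Mirrors the foreachPartition version: the analyzer is a local variable of
    # partition_func and is only created when the function is executed.
    def partition_func(records):
        analyzer = ShortTextAnalyzer(root_dir)
        for record in records:
            analyzer.analyze_short_text_event(record[1])
    return partition_func


analyzer, record_func = make_record_func()
captured = [cell.cell_contents for cell in record_func.__closure__]
print(captured[0] is analyzer)            # True: shipping record_func ships analyzer
print(make_partition_func().__closure__)  # None: nothing is captured from the caller
```

Because `partition_func` only references globals and its own locals, it has no closure cells at all, so there is no analyzer instance to serialize.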

Maybe you can work around this with `RDD.foreachPartition()`:

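def processRDD(rdd):
    def partition_func(records):
        # Runs on the executor: build one analyzer for this partition only.
        analyzer = ShortTextAnalyzer(root_dir)
        for record in records:
            analyzer.analyze_short_text_event(record[1])
    # partition_func is called once per partition with an iterator over its records.
    rdd.foreachPartition(partition_func)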

This creates one analyzer per partition of each RDD, and it is constructed on
the executor itself, so it never has to be shipped from the driver.
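To sanity-check the "one analyzer per partition and RDD" count, here is a small single-process simulation. `simulate_foreach_partition` and `CountingAnalyzer` are hypothetical helpers for illustration only, not Spark APIs; in a real job `foreachPartition` invokes the function on the executors, one call per partition.

```python
from typing import Callable, Iterable, List, Tuple

Record = Tuple[str, str]


class CountingAnalyzer:
    """Hypothetical analyzer that counts how many times it is constructed."""

    instances = 0

    def __init__(self, root_dir: str) -> None:
        CountingAnalyzer.instances += 1
        self.root_dir = root_dir
        self.seen: List[str] = []

    def analyze_short_text_event(self, text: str) -> None:
        self.seen.append(text)


def simulate_foreach_partition(
    partitions: List[List[Record]], func: Callable[[Iterable[Record]], None]
) -> None:
    # Stand-in for RDD.foreachPartition: call func once per partition,
    # passing an iterator over that partition's records.
    for partition in partitions:
        func(iter(partition))


def partition_func(records: Iterable[Record]) -> None:
    analyzer = CountingAnalyzer("/tmp/analyzer")  # hypothetical root_dir
    for record in records:
        analyzer.analyze_short_text_event(record[1])


# Two micro-batches (one RDD each) with 3 and 2 partitions respectively.
# The empty partition still gets its own analyzer.
batches = [
    [[("k1", "a"), ("k2", "b")], [("k3", "c")], []],
    [[("k4", "d")], [("k5", "e"), ("k6", "f")]],
]
for rdd_partitions in batches:
    simulate_foreach_partition(rdd_partitions, partition_func)

print(CountingAnalyzer.instances)  # 5: one per partition, per batch
```

So the number of analyzers grows with partitions times batches; if construction is expensive, fewer partitions per batch means fewer instances.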

Best

Eike
