Thanks, Paul,

I don’t understand how subclassing FlatMapFunction helps. Could you show some
sample code?

We need one instance per executor, not per partition, so mapPartitions()
doesn’t help.
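
For reference, a common way to get one instance per executor (rather than per task or per partition) is a lazily initialized field in a Scala `object`. This is a different technique from the FlatMapFunction subclass suggested below. Each executor is a single JVM, the object is initialized at most once per class loader, and a closure that calls the object refers to it by name instead of capturing and serializing the parser. This is a minimal sketch under those assumptions: `LogParser` here is a hypothetical stand-in with a toy `parseLine`, and `LogParserHolder`, `isCached` and the demo names are illustrative, not from this thread. The thread pool only imitates concurrent tasks inside one executor.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// HYPOTHETICAL stand-in for the thread's LogParser: not serializable, assumed
// expensive to build, and holding an LRU cache meant to be shared by all tasks.
class LogParser(cacheSize: Int) {
  // A LinkedHashMap in access order that drops its eldest entry once it grows
  // past cacheSize behaves as a simple LRU cache.
  private val cache =
    new JLinkedHashMap[String, Seq[(String, String)]](16, 0.75f, true) {
      override protected def removeEldestEntry(
          eldest: JMap.Entry[String, Seq[(String, String)]]): Boolean =
        size() > cacheSize
    }

  // Toy parsing rule (an assumption): "key, value" becomes one (key, value) pair.
  private def parseUncached(line: String): Seq[(String, String)] =
    line.split(",", 2) match {
      case Array(k, v) => Seq((k.trim, v.trim))
      case _           => Seq.empty // skip malformed lines
    }

  // Concurrent tasks in one executor share this instance, so the cache
  // (which reorders itself even on reads) is guarded by a lock.
  def parseLine(line: String): Seq[(String, String)] = cache.synchronized {
    val hit = cache.get(line)
    if (hit != null) hit
    else {
      val parsed = parseUncached(line)
      cache.put(line, parsed)
      parsed
    }
  }

  // containsKey does not change the access order.
  def isCached(line: String): Boolean = cache.synchronized(cache.containsKey(line))
}

// One instance per JVM (per class loader). A closure that calls
// LogParserHolder.parser refers to the object by name, so the parser itself
// is never serialized; it is built on the executor on first use.
object LogParserHolder {
  val constructions = new AtomicInteger(0) // for demonstration only
  // lazy val initialization is thread-safe, so concurrent tasks build it once.
  lazy val parser: LogParser = {
    constructions.incrementAndGet()
    new LogParser(cacheSize = 10000)
  }
}

// Imitates several tasks running concurrently inside a single executor.
object PerExecutorSingletonDemo {
  def main(args: Array[String]): Unit = {
    val pool = Executors.newFixedThreadPool(4)
    val seen = new ConcurrentLinkedQueue[LogParser]()
    (1 to 8).foreach { task =>
      pool.submit(new Runnable {
        def run(): Unit = {
          val p = LogParserHolder.parser
          seen.add(p)
          p.parseLine(s"key$task, value")
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    val distinctParsers = seen.toArray().toSet.size
    // prints: tasks=8 distinctParsers=1 constructions=1
    println(s"tasks=${seen.size()} distinctParsers=$distinctParsers " +
      s"constructions=${LogParserHolder.constructions.get()}")
  }
}
```

In the Spark job this would be used as `sc.textFile(inputPath).flatMap(line => LogParserHolder.parser.parseLine(line))`, assuming the object is compiled into the application jar rather than defined in spark-shell. Two caveats: this gives one parser per executor JVM, so a node running several executors still holds several caches; and because all task threads share one cache, access has to be synchronized (or use a concurrent cache), which can become a contention point.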

2015-01-21 16:07 GMT+08:00 Paul Wais <paulw...@gmail.com>:

> To force one instance per executor, you could explicitly subclass
> FlatMapFunction and have it lazy-create your parser in the subclass
> constructor.  You might also want to try RDD#mapPartitions() (instead of
> RDD#flatMap()) if you want one instance per partition.  This approach worked
> well for me when I had a flat map function that used non-serializable
> native code / objects.
>
> FWIW RDD#flatMap() does not appear to have changed 1.1 -> 1.2 (tho master
> has a slight refactor).  Agree it's worth checking the number of partitions
> in your 1.1 vs 1.2 test.
>
>
>
> On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO <raofeng...@gmail.com>
> wrote:
>
>> The LogParser instance is not serializable, and thus cannot be a
>> broadcast variable.
>>
>> What’s worse, it contains an LRU cache, which is essential to
>> performance and which we would like to share among all the tasks on the
>> same node.
>>
>> If that is the case, what’s the recommended way to share a variable among
>> all the tasks within the same executor?
>>
>> 2015-01-21 15:04 GMT+08:00 Davies Liu <dav...@databricks.com>:
>>
>>> Maybe some change related to closure serialization caused LogParser to
>>> no longer be a singleton, so it is now initialized for every task.
>>>
>>> Could you change it to a Broadcast?
>>>
>>> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO <raofeng...@gmail.com>
>>> wrote:
>>> > We are currently migrating from Spark 1.1 to Spark 1.2, but found the
>>> > program 3x slower, with nothing else changed.
>>> > Note: our program on Spark 1.1 successfully processed a whole year of
>>> > data and was quite stable.
>>> >
>>> > The main script is as follows:
>>> >
>>> > sc.textFile(inputPath)
>>> > .flatMap(line => LogParser.parseLine(line))
>>> > .groupByKey(new HashPartitioner(numPartitions))
>>> > .mapPartitionsWithIndex(...)
>>> > .foreach(_ => {})
>>> >
>>> > where LogParser is a singleton which may take some time to initialize and
>>> > is shared across the executor.
>>> >
>>> > The flatMap stage is 3x slower.
>>> >
>>> > We tried changing spark.shuffle.manager back to hash, and
>>> > spark.shuffle.blockTransferService back to nio, but it didn’t help.
>>> >
>>> > Can somebody explain possible causes, or what we should test or change
>>> > to find out?
>>>
>>
>>
>
