Thanks, Paul. I don’t understand how subclassing FlatMapFunction helps; could you show some sample code?
We need one instance per executor, not per partition, so mapPartitions() doesn’t help.

2015-01-21 16:07 GMT+08:00 Paul Wais <paulw...@gmail.com>:

> To force one instance per executor, you could explicitly subclass
> FlatMapFunction and have it lazy-create your parser in the subclass
> constructor. You might also want to try RDD#mapPartitions() (instead of
> RDD#flatMap()) if you want one instance per partition. This approach worked
> well for me when I had a flat map function that used non-serializable
> native code / objects.
>
> FWIW, RDD#flatMap() does not appear to have changed from 1.1 to 1.2 (though
> master has a slight refactor). Agreed it's worth checking the number of
> partitions in your 1.1 vs 1.2 test.
>
> On Tue, Jan 20, 2015 at 11:13 PM, Fengyun RAO <raofeng...@gmail.com> wrote:
>
>> The LogParser instance is not serializable, and thus cannot be a
>> broadcast variable.
>>
>> What’s worse, it contains an LRU cache, which is essential to performance
>> and which we would like to share among all the tasks on the same node.
>>
>> If that is the case, what’s the recommended way to share a variable among
>> all the tasks within the same executor?
>>
>> 2015-01-21 15:04 GMT+08:00 Davies Liu <dav...@databricks.com>:
>>
>>> Maybe some change related to closure serialization means LogParser is
>>> no longer a singleton, so it gets initialized for every task.
>>>
>>> Could you change it to a broadcast variable?
>>>
>>> On Tue, Jan 20, 2015 at 10:39 PM, Fengyun RAO <raofeng...@gmail.com> wrote:
>>>
>>>> We are currently migrating from Spark 1.1 to Spark 1.2, but found the
>>>> program 3x slower, with nothing else changed.
>>>> Note: our program on Spark 1.1 has successfully processed a whole year
>>>> of data, quite stably.
>>>>
>>>> The main script is as below:
>>>>
>>>> sc.textFile(inputPath)
>>>>   .flatMap(line => LogParser.parseLine(line))
>>>>   .groupByKey(new HashPartitioner(numPartitions))
>>>>   .mapPartitionsWithIndex(...)
>>>>   .foreach(_ => {})
>>>>
>>>> where LogParser is a singleton which may take some time to initialize
>>>> and is shared across the executor.
>>>>
>>>> The flatMap stage is 3x slower.
>>>>
>>>> We tried changing spark.shuffle.manager back to hash and
>>>> spark.shuffle.blockTransferService back to nio, but it didn’t help.
>>>>
>>>> Could somebody explain possible causes, or suggest what we should test
>>>> or change to find out?
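
For reference, a minimal sketch of the per-partition approach Paul describes, not actual code from the thread. sc, inputPath, LogParser, and parseLine come from the script above; constructing the parser with new LogParser() is an assumption, since the thread only shows LogParser used as a singleton.

    sc.textFile(inputPath).mapPartitions { lines =>
      // Runs once per partition, on the executor, so the non-serializable
      // parser (new LogParser() is assumed here) is created there and never
      // shipped inside the task closure.
      val parser = new LogParser()
      lines.flatMap(parser.parseLine)
    }

This gives one parser per partition (and hence per task), which is why it does not meet the one-instance-per-executor requirement stated above.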
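And a minimal sketch, under the same assumptions, of one common way to share an instance among all tasks in the same executor: a Scala object is initialized at most once per JVM, and each executor is one JVM, so every task in that executor sees the same parser and its LRU cache. ExecutorParser is a hypothetical wrapper name, and again new LogParser() assumes the parsing logic can be instantiated directly, which the thread does not show.

    object ExecutorParser {
      // Created lazily on first use on each executor JVM; the object itself
      // is not serialized with the task closure, only referenced by name,
      // so the non-serializable parser never has to be shipped or broadcast.
      lazy val parser = new LogParser()
    }

    sc.textFile(inputPath)
      .flatMap(line => ExecutorParser.parser.parseLine(line))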