object LogParserWrapper {
  private val logParser = {
    val settings = new ...
    val builders = new ...
    new LogParser(builders, settings)
  }
  def getParser = logParser
}
object MySparkJob {
  def main(args: Array[String]) {
    val sc = new SparkContext()
    val lines = sc.textFile(args(0))
    val parsed = lines.map(line => LogParserWrapper.getParser.parse(line))
    ...
  }
}

Q1: Is this the right way to share a LogParser instance among all tasks on the same worker, if LogParser is not serializable?

Q2: LogParser is read-only, but can it hold a cache field such as a ConcurrentHashMap where all tasks on the same worker get() and put() items?

2014-08-04 19:29 GMT+08:00 Sean Owen <so...@cloudera.com>:

> The issue is that it's not clear what "parser" is. It's not shown in
> your code. The snippet you show does not appear to contain a parser
> object.
>
> On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO <raofeng...@gmail.com> wrote:
> > Thanks, Sean!
> >
> > It works, but as the link in 2 - Why Is My Spark Job so Slow and Only
> > Using a Single Thread? says "parser instance is now a singleton created
> > in the scope of our driver program", which I thought was in the scope of
> > the executor. Am I wrong, or why?
> >
> > I didn't want the equivalent of "setup()", since I want to share the
> > "parser" among tasks on the same worker node. It takes tens of seconds
> > to initialize a "parser". What's more, I want to know if the "parser"
> > could have a field such as a ConcurrentHashMap from which all tasks on
> > the node may get() or put() items.
> >
> > 2014-08-04 16:35 GMT+08:00 Sean Owen <so...@cloudera.com>:
> >
> >> The parser does not need to be serializable. In the line:
> >>
> >> lines.map(line => JSONParser.parse(line))
> >>
> >> ... the parser is called, but there is no parser object with state
> >> that can be serialized. Are you sure it does not work?
> >>
> >> The error message alluded to originally refers to an object not shown
> >> in the code, so I'm not 100% sure this was the original issue.
> >>
> >> If you want, the equivalent of "setup()" is really "writing some code
> >> at the start of a call to mapPartitions()".
> >>
> >> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO <raofeng...@gmail.com> wrote:
> >> > Thanks, Ron.
> >> >
> >> > The problem is that the "parser" is written in another package which
> >> > is not serializable.
> >> >
> >> > In MapReduce, I could create the "parser" in the map setup() method.
> >> >
> >> > Now in Spark, I want to create it for each worker, and share it among
> >> > all the tasks on the same worker node.
> >> >
> >> > I know different workers run on different machines, but it doesn't
> >> > have to communicate between workers.
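For illustration, below is a minimal, self-contained sketch of the pattern being discussed: a singleton object initialized once per executor JVM, a ConcurrentHashMap cache shared by all tasks running in that JVM, and the mapPartitions-based equivalent of setup(). The LogParser class, its "settings" constructor argument, the cache key/value types, the parse() body, and the count() at the end are all placeholders assumed for the sketch, not the actual parser from the thread.

import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.SparkContext

// Placeholder standing in for the real, non-serializable parser.
class LogParser(settings: String) {
  // All tasks in the same executor JVM share this one instance,
  // so they also share this thread-safe cache.
  private val cache = new ConcurrentHashMap[String, String]()

  def parse(line: String): String = {
    val hit = cache.get(line)
    if (hit != null) hit
    else {
      val result = line.trim // stands in for the expensive parsing work
      cache.put(line, result)
      result
    }
  }
}

// Initialized lazily, once per JVM (i.e. once per executor), the first
// time a task on that executor touches it. Only the call site is captured
// by the closure, never the parser itself, so nothing is serialized.
object LogParserWrapper {
  lazy val parser = new LogParser("settings")
  def getParser: LogParser = parser
}

object MySparkJob {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext()
    val lines = sc.textFile(args(0))

    // Per-record style: every record goes through the shared singleton.
    val parsed = lines.map(line => LogParserWrapper.getParser.parse(line))

    // mapPartitions style: the code before iter.map is the "setup()"
    // equivalent and runs once per partition.
    val parsed2 = lines.mapPartitions { iter =>
      val parser = LogParserWrapper.getParser
      iter.map(parser.parse)
    }

    println(parsed.count() + parsed2.count())
    sc.stop()
  }
}

The key point of the sketch is that the closure never captures the parser, only the call that fetches it; each executor constructs its own instance the first time LogParserWrapper is touched, so serializability never comes into play, and the ConcurrentHashMap is safe because concurrent tasks on the same executor are just threads in that one JVM.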