object LogParserWrapper {
    private val logParser = {
        val settings = new ...
        val builders = new ....
        new LogParser(builders, settings)
    }
    def getParser = logParser
}

object MySparkJob {
   def main(args: Array[String]) {
        val sc = new SparkContext()
        val lines = sc.textFile(arg(0))

        val parsed = lines.map(line =>
LogParserWrapper.getParser.parse(line))
        ...
}

Q1: Is this the right way to share LogParser instance among all tasks on
the same worker, if LogParser is not serializable?

Q2: LogParser is read-only, but can LogParser hold a cache field such as a
ConcurrentHashMap where all tasks on the same worker try to get() and put()
items?


2014-08-04 19:29 GMT+08:00 Sean Owen <so...@cloudera.com>:

> The issue is that it's not clear what "parser" is. It's not shown in
> your code. The snippet you show does not appear to contain a parser
> object.
>
> On Mon, Aug 4, 2014 at 10:01 AM, Fengyun RAO <raofeng...@gmail.com> wrote:
> > Thanks, Sean!
> >
> > It works, but as the link in 2 - Why Is My Spark Job so Slow and Only
> Using
> > a Single Thread? says " parser instance is now a singleton created in the
> > scope of our driver program" which I thought was in the scope of
> executor.
> > Am I wrong, or why?
> >
> > I didn't want the equivalent of "setup()" method, since I want to share
> the
> > "parser" among tasks in the same worker node. It takes tens of seconds to
> > initialize a "parser". What's more, I want to know if the "parser" could
> > have a field such as ConcurrentHashMap which all tasks in the node may
> get()
> > of put() items.
> >
> >
> >
> >
> > 2014-08-04 16:35 GMT+08:00 Sean Owen <so...@cloudera.com>:
> >
> >> The parser does not need to be serializable. In the line:
> >>
> >> lines.map(line => JSONParser.parse(line))
> >>
> >> ... the parser is called but there is no parser object that with state
> >> that can be serialized. Are you sure it does not work?
> >>
> >> The error message alluded to originally refers to an object not shown
> >> in the code, so I'm not 100% sure this was the original issue.
> >>
> >> If you want, the equivalent of "setup()" is really "writing some code
> >> at the start of a call to mapPartitions()"
> >>
> >> On Mon, Aug 4, 2014 at 8:40 AM, Fengyun RAO <raofeng...@gmail.com>
> wrote:
> >> > Thanks, Ron.
> >> >
> >> > The problem is that the "parser" is written in another package which
> is
> >> > not
> >> > serializable.
> >> >
> >> > In mapreduce, I could create the "parser" in the map setup() method.
> >> >
> >> > Now in spark, I want to create it for each worker, and share it among
> >> > all
> >> > the tasks on the same work node.
> >> >
> >> > I know different workers run on different machine, but it doesn't have
> >> > to
> >> > communicate between workers.
> >
> >
>

Reply via email to