Thanks, Sean.
Please find attached my code. Let me know your suggestions/ideas.
Regards,
*Sarath*
On Wed, Sep 10, 2014 at 8:05 PM, Sean Owen <[email protected]> wrote:
> You mention that you are creating a UserGroupInformation inside your
> function, but something is still serializing it. You should show your
> code since it may not be doing what you think.
>
> If you instantiate an object, it happens every time your function is
> called. map() is called once per data element; mapPartitions() once
> per partition. It depends.
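
For illustration, a minimal sketch (the Job/helper names are hypothetical, not from this thread) of how an object can still end up serialized even though it looks local: any reference to a field of the enclosing class pulls the whole enclosing instance into the closure.

    import java.text.DecimalFormat
    import org.apache.spark.rdd.RDD

    class Job {                                  // Job is not Serializable
      val helper = new DecimalFormat("#.##")     // built once, on the driver

      def broken(rdd: RDD[Double]): RDD[String] =
        // `helper` is really `this.helper`, so the whole Job instance is
        // captured by the closure and must be serialized ->
        // "Task not serializable: java.io.NotSerializableException: Job"
        rdd.map(d => helper.format(d))

      def working(rdd: RDD[Double]): RDD[String] =
        rdd.map { d =>
          // constructed inside the function, i.e. on the worker, once per element
          new DecimalFormat("#.##").format(d)
        }
    }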
>
> On Wed, Sep 10, 2014 at 3:25 PM, Sarath Chandra
> <[email protected]> wrote:
> > Hi Sean,
> >
> > The solution of instantiating the non-serializable class inside the map
> > is working fine, but I hit a roadblock: the solution does not work for
> > singleton classes like UserGroupInformation.
> >
> > In my logic, as part of processing an HDFS file, I need to refer to some
> > reference files which are also stored in HDFS. So inside the map method
> > I'm trying to instantiate UserGroupInformation to get an instance of
> > FileSystem. Then, using this FileSystem instance, I read those reference
> > files and use that data in my processing logic.
> >
> > This is throwing "task not serializable" exceptions for the
> > 'UserGroupInformation' and 'FileSystem' classes. I also tried using
> > 'SparkHadoopUtil' instead of 'UserGroupInformation', but it didn't
> > resolve the issue.
> >
> > Could you please provide some pointers in this regard?
> >
> > Also, I have a query - when we instantiate a class inside the map method,
> > does it create a new instance for every record of the RDD being processed?
> >
> > Thanks & Regards,
> > Sarath
> >
> > On Sat, Sep 6, 2014 at 4:32 PM, Sean Owen <[email protected]> wrote:
> >>
> >> I disagree that the generally right change is to try to make the
> >> classes serializable. Usually, classes that are not serializable are
> >> not supposed to be serialized. You're using them in a way that's
> >> causing them to be serialized, and that's probably not desired.
> >>
> >> For example, this is wrong:
> >>
> >> val foo: SomeUnserializableManagerClass = ...
> >> rdd.map(d => foo.bar(d))
> >>
> >> This is right:
> >>
> >> rdd.map { d =>
> >>   val foo: SomeUnserializableManagerClass = ...
> >>   foo.bar(d)
> >> }
> >>
> >> In the first instance, you create the object on the driver and try to
> >> serialize and copy it to workers. In the second, you're creating
> >> SomeUnserializableManagerClass in the function and therefore on the
> >> worker.
> >>
> >> mapPartitions is better if this creation is expensive.
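
A mapPartitions variant of the same sketch (same hypothetical SomeUnserializableManagerClass placeholder as above), so the expensive object is built once per partition on the worker and reused for every element in that partition:

    rdd.mapPartitions { iter =>
      val foo: SomeUnserializableManagerClass = ...   // once per partition, on the worker
      iter.map(d => foo.bar(d))                       // reused for each element
    }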
> >>
> >> On Fri, Sep 5, 2014 at 3:06 PM, Sarath Chandra
> >> <[email protected]> wrote:
> >> > Hi,
> >> >
> >> > I'm trying to migrate a map-reduce program to work with Spark. I
> >> > migrated the program from Java to Scala. The map-reduce program
> >> > basically loads an HDFS file and, for each line in the file, applies
> >> > several transformation functions available in various external
> >> > libraries.
> >> >
> >> > When I execute this over Spark, it throws "Task not serializable"
> >> > exceptions for each and every class being used from these external
> >> > libraries. I added serialization to a few classes which are within my
> >> > scope, but there are several other classes which are out of my scope,
> >> > like org.apache.hadoop.io.Text.
> >> >
> >> > How can I overcome these exceptions?
> >> >
> >> > ~Sarath.
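
For the specific case of org.apache.hadoop.io.Text, a common approach is to convert the Hadoop Writables to plain types immediately after loading, so nothing downstream needs to serialize them. A hedged sketch, assuming a SparkContext sc, an input path inputPath, and a Hadoop Configuration conf (all three names are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.spark.rdd.RDD

    val lines: RDD[(Long, String)] =
      sc.newAPIHadoopFile(inputPath, classOf[TextInputFormat],
                          classOf[LongWritable], classOf[Text], conf)
        .map(r => (r._1.get(), r._2.toString))   // Long and String are serializable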
> >
> >
>
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD

// Imports for the external rule-engine classes (RecordValidator, RulesLoader,
// TransFilEngine, ConfigurationHadoop, Level) are omitted here.

class MapperNew extends Serializable {
  var hadoopConf: Configuration = _
  var recordValidator: RecordValidator = _
  var rulesLoader: RulesLoader = _
  var recordTransformer: TransFilEngine = _
  var ach: ConfigurationHadoop = _
  var file: RDD[(Long, String)] = _

  // Configuration is passed by the caller
  def run(c: Configuration): Unit = {
    hadoopConf = c
    val sparkConf = new SparkConf()
      .setMaster(hadoopConf.get("sparkMaster"))
      .setAppName("NewMapper")
      .setSparkHome(hadoopConf.get("sparkHome"))
      .setJars(Seq("rbcommon.jar", "rbengine.jar"))
    val sparkContext = new SparkContext(sparkConf)

    val util = SparkHadoopUtil.get
    util.runAsSparkUser(() => {
      file = sparkContext.newAPIHadoopFile(hadoopConf.get("inputPath"),
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text], hadoopConf).map(r => (r._1.get(), r._2.toString()))
    })

    // Works fine up to this line
    println("File line count = " + file.count)

    // Without the full URI, Spark tries to read from the local file system
    val rulesFilePath = "hdfs://slave:54310/user/hduser/" +
      hadoopConf.get("rulesFilePath") + "/" +
      hadoopConf.get("rulesFile")

    var processed = file.map(line => {
      // Doesn't work - throws 'task not serializable' for UserGroupInformation
      val ugi = UserGroupInformation.createRemoteUser(hadoopConf.get("remoteUser"))
      // Doesn't work - throws 'task not serializable' for FileSystem
      val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
        def run(): FileSystem = {
          FileSystem.get(hadoopConf)
        }
      })
      // RulesLoader and TransFilEngine are classes from external libraries
      rulesLoader = new RulesLoader(fs.open(new Path(rulesFilePath)))
      recordTransformer = TransFilEngine.getInstance(rulesLoader, Level.FINER)
      // Doesn't work this way either - throws 'task not serializable'
      // util.runAsSparkUser(() => {
      //   val fs = FileSystem.get(hadoopConf)
      //   rulesLoader = new RulesLoader(fs.open(new Path(rulesFilePath)))
      //   recordTransformer = TransFilEngine.getInstance(rulesLoader, Level.FINER)
      // })
      process(line)
    })
    println("Processed count = " + processed.count)
  }

  def process(line: (Long, String)): String = {
    // More processing to do using other third-party libraries
    line._2
  }
}
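
For the attached code, a hedged sketch of one way to restructure run() so that the processing closure no longer references fields of MapperNew or the non-serializable Configuration: copy the needed settings into local String vals on the driver, use mapPartitions, and build the UserGroupInformation, FileSystem and rules engine once per partition on the executor. This assumes the imports shown above; the runAsSparkUser wrapper and the actual record processing are omitted for brevity, and it is a sketch, not a drop-in replacement.

    def run(c: Configuration): Unit = {
      val sparkConf = new SparkConf()
        .setMaster(c.get("sparkMaster"))
        .setAppName("NewMapper")
        .setSparkHome(c.get("sparkHome"))
        .setJars(Seq("rbcommon.jar", "rbengine.jar"))
      val sc = new SparkContext(sparkConf)

      // Only these Strings are captured by the closures below, and they are serializable.
      val remoteUser = c.get("remoteUser")
      val rulesFilePath = "hdfs://slave:54310/user/hduser/" +
        c.get("rulesFilePath") + "/" + c.get("rulesFile")

      val file = sc.newAPIHadoopFile(c.get("inputPath"), classOf[TextInputFormat],
          classOf[LongWritable], classOf[Text], c)
        .map(r => (r._1.get(), r._2.toString))

      val processed = file.mapPartitions { lines =>
        // Everything here is created on the executor, once per partition,
        // so nothing non-serializable travels from the driver.
        val ugi = UserGroupInformation.createRemoteUser(remoteUser)
        val fs = ugi.doAs(new PrivilegedExceptionAction[FileSystem] {
          def run(): FileSystem =
            FileSystem.get(java.net.URI.create(rulesFilePath), new Configuration())
        })
        val loader = new RulesLoader(fs.open(new Path(rulesFilePath)))
        val transformer = TransFilEngine.getInstance(loader, Level.FINER)
        lines.map { case (_, text) => text }   // apply the transformer/validator here
      }
      println("Processed count = " + processed.count)
    }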
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]