You could transform the JSON into a case class instead of serializing it back to a String. The resulting RDD[MyCaseClass] is then directly usable as a SchemaRDD via the implicit conversion provided by 'import sqlContext.createSchemaRDD', and the rest of your pipeline remains the same.
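For example, here is a minimal sketch of that approach, assuming Spark 1.1 and json4s. LogRecord and its fields are hypothetical placeholders for your real schema; for brevity I keep only string-valued fields and replace your patt-based name filter with an explicit set of required fields:

    import scala.util.Try
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    // Hypothetical schema: adjust the fields to what your logs actually contain.
    case class LogRecord(timestamp: String, level: String, message: String)

    val records = txt.flatMap { line =>
      for {
        // drop unparseable lines and non-object values instead of failing
        JObject(child) <- Try(parse(line)).toOption
        // lowercase the names, keeping string-valued fields only
        fields = (for (JField(name, JString(v)) <- child)
                    yield name.toLowerCase -> v).toMap
        // drop records that are missing a required field
        ts  <- fields.get("timestamp")
        lvl <- fields.get("level")
        msg <- fields.get("message")
      } yield LogRecord(ts, lvl, msg)
    }

    import sqx.createSchemaRDD // implicit RDD[LogRecord] => SchemaRDD
    records.saveAsParquetFile(outpath)

Because malformed input is simply flatMapped away, this also gives you the "drop invalid records rather than fail the whole job" behaviour you asked about, and it avoids the second parsing pass entirely.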
-kr, Gerard

On Nov 4, 2014 5:05 AM, "Daniel Mahler" <dmah...@gmail.com> wrote:

> I am trying to convert terabytes of JSON log files into Parquet files,
> but I need to clean the data a little first. I end up doing the following:
>
>     val txt = sc.textFile(inpath).coalesce(800)
>
>     val json = (for {
>       line <- txt
>       JObject(child) = parse(line)
>       child2 = (for {
>         JField(name, value) <- child
>         _ <- patt(name) // filter fields with invalid names
>       } yield JField(name.toLowerCase, value))
>     } yield compact(render(JObject(child2))))
>
>     sqx.jsonRDD(json, 5e-2).saveAsParquetFile(outpath)
>
> The glaring inefficiency is that after parsing and cleaning the data I
> re-serialize it by calling compact(render(JObject(child2))), only to pass
> the text to jsonRDD to be parsed again. However, I see no way to turn an
> RDD of json4s objects directly into a SchemaRDD without turning it back
> into text first.
>
> Is there any way to do this?
>
> I am also open to other suggestions for speeding up the above code;
> it is very slow in its current form.
>
> I would also like to make jsonFile drop invalid JSON records rather than
> failing the entire job. Is that possible?
>
> thanks
> Daniel