You could transform the JSON into a case class instead of serializing it back
to a String. The resulting RDD[MyCaseClass] is then directly usable as a
SchemaRDD via the implicit conversion provided by 'import
sqlContext.createSchemaRDD'. The rest of your pipeline can then stay the same.
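
Roughly like this (untested sketch, reusing your txt/sqx/outpath; LogRecord
and its two fields are just placeholders for whatever your cleaned schema
actually looks like):

    import org.json4s._
    import org.json4s.jackson.JsonMethods._
    import scala.util.Try

    // Hypothetical record type; list the fields you actually keep.
    case class LogRecord(timestamp: String, message: String)

    import sqx.createSchemaRDD  // implicit RDD[case class] -> SchemaRDD

    val records = txt.flatMap { line =>
      implicit val formats = DefaultFormats
      Try {
        val obj = parse(line)
        LogRecord(
          (obj \ "timestamp").extract[String],
          (obj \ "message").extract[String])
      }.toOption  // malformed or incomplete lines become None and are dropped
    }

    records.saveAsParquetFile(outpath)

As a side effect, wrapping the parse in Try also covers your last question:
lines that fail to parse are silently dropped instead of failing the job.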

-kr, Gerard
On Nov 4, 2014 5:05 AM, "Daniel Mahler" <dmah...@gmail.com> wrote:

> I am trying to convert terabytes of JSON log files into Parquet files,
> but I need to clean them a little first.
> I end up doing the following
>
> import org.json4s._
> import org.json4s.jackson.JsonMethods._
>
> val txt = sc.textFile(inpath).coalesce(800)
>
> val json = for {
>   line <- txt
>   JObject(child) = parse(line)  // throws MatchError on non-object lines
>   child2 = for {
>     JField(name, value) <- child
>     _ <- patt(name)  // keep only fields whose names match patt
>   } yield JField(name.toLowerCase, value)
> } yield compact(render(JObject(child2)))
>
> sqx.jsonRDD(json, 5e-2).saveAsParquetFile(outpath)
>
> The glaring inefficiency is that after parsing and cleaning the data I
> reserialize it by calling compact(render(JObject(child2))), only to pass
> the text to jsonRDD to be parsed again. However, I see no way to turn an
> RDD of json4s objects directly into a SchemaRDD without first turning it
> back into text.
>
> Is there any way to do this?
>
> I am also open to other suggestions for speeding up the above code;
> it is very slow in its current form.
>
> I would also like to make jsonFile drop invalid JSON records rather than
> failing the entire job. Is that possible?
>
> thanks
> Daniel
>
>
