hey Eran, I run into this all the time with JSON.

the problem is likely that your JSON is "too pretty", extending beyond a 
single line, which trips up the JSON reader - Spark's JSON source expects 
each line of the file to be one complete, self-contained JSON record.
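
you can see that line-at-a-time expectation right in spark-shell - a minimal 
sketch with made-up records, just for illustration:

// works: one self-contained JSON object per line
val oneLinePerRecord = sc.parallelize(Seq(
  """{"name":"alice","age":30}""",
  """{"name":"bob","age":25}"""))
sqlContext.read.json(oneLinePerRecord).count()   // 2

// fails: the same shape pretty-printed across lines - each line is parsed
// on its own, so you get corrupt-record rows or parse errors like the one
// in your stack trace
val prettyPrinted = sc.parallelize(Seq(
  "{",
  """  "name": "alice",""",
  """  "age": 30""",
  "}"))
sqlContext.read.json(prettyPrinted).count()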

my solution is usually to de-pretty the JSON - either manually or through an 
ETL step - by stripping the newlines so each record sits on a single line 
before pointing my DataFrame/JSON reader at the file.
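
if you're already in Spark, the ETL step can be something like this minimal 
sketch - the paths are made up, and it assumes each input file holds a single 
pretty-printed record and is small enough to read whole (Spark ships Jackson, 
so ObjectMapper is already on the classpath):

import com.fasterxml.jackson.databind.ObjectMapper

// wholeTextFiles keeps each file intact as a (path, contents) pair, so a
// multi-line record isn't split line-by-line on read
val flattened = sc.wholeTextFiles("/path/to/pretty/sample/")
  .map { case (_, contents) =>
    // one mapper per record keeps the closure serializable; fine for a sketch
    val mapper = new ObjectMapper()
    // re-serialize without indentation: one record per line
    mapper.writeValueAsString(mapper.readTree(contents))
  }

// either save as text and re-read, or hand the RDD[String] straight to the reader
val df = sqlContext.read.json(flattened)
df.count()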

this tool is handy for one-off scenarios: http://jsonviewer.stack.hu

for streaming use cases, you'll want a light de-pretty ETL step, either 
within the Spark Streaming job after ingestion - or upstream using something 
like a Flume interceptor, a NiFi processor (I love NiFi), or a Kafka 
transformation, assuming those exist by now.
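
inside the streaming job, that step can be as small as a mapPartitions over 
the raw payloads - a sketch assuming each message arrives as one complete 
(pretty-printed) JSON record, where `messages` is a hypothetical 
DStream[String]:

import com.fasterxml.jackson.databind.ObjectMapper

// compact each record onto a single line; one mapper per partition, since
// ObjectMapper isn't serializable and is cheap to build once per batch
val compacted = messages.mapPartitions { records =>
  val mapper = new ObjectMapper()
  records.map(raw => mapper.writeValueAsString(mapper.readTree(raw)))
}

// each micro-batch is now line-delimited and safe for the JSON reader
compacted.foreachRDD(rdd => sqlContext.read.json(rdd).count())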

a similar problem exists for XML, btw. there are lots of wonky workarounds 
for this that use mapPartitions and all kinds of craziness. the best option, 
in my opinion, is to just ETL/flatten the data to make the DataFrame reader 
happy.

> On Dec 19, 2015, at 4:55 PM, Eran Witkon <eranwit...@gmail.com> wrote:
> 
> Hi,
> I tried the following code in spark-shell on spark1.5.2:
> 
> val df = 
> sqlContext.read.json("/home/eranw/Workspace/JSON/sample/sample2.json")
> df.count()
> 
> 15/12/19 23:49:40 ERROR Executor: Managed memory leak detected; size = 
> 67108864 bytes, TID = 3
> 15/12/19 23:49:40 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3)
> java.lang.RuntimeException: Failed to parse a value for data type 
> StructType() (current token: VALUE_STRING).
>       at scala.sys.package$.error(package.scala:27)
>       at 
> org.apache.spark.sql.execution.datasources.json.JacksonParser$.convertField(JacksonParser.scala:172)
>       at 
> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:251)
>       at 
> org.apache.spark.sql.execution.datasources.json.JacksonParser$$anonfun$parseJson$1$$anonfun$apply$1.apply(JacksonParser.scala:246)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:365)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.start(TungstenAggregationIterator.scala:622)
>       at 
> org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1.org$apache$spark$sql$execution$aggregate$TungstenAggregate$$anonfun$$executePartition$1(TungstenAggregate.scala:110)
> 
> Am I doing something wrong?
> Eran
