*Component*: Spark
*Level*: Advanced
*Scenario*: How-to
-
*Problem Description*
I have a nested JSON string value in some field of a Spark DataFrame, and I
would like to use from_json() to parse the JSON object. In particular, if one
of the key types does not match our defined struct
Dear Spark user community,
I have received some insight regarding filtering separate dataframes in my
Spark Structured Streaming job. However, I wish to write each of the
dataframes from the Stack Overflow question below to a separate location
using a parquet writer. My initial impression
https://stackoverflow.com/questions/53938967/writing-corrupt-data-from-kafka-json-datasource-in-spark-structured-streaming
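One way to sketch that setup: each filtered view of the parsed stream gets its own streaming query, with its own output path and checkpoint location. The Kafka servers, topic name, schema, and all paths below are placeholder assumptions, not values from the thread:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object SplitParquetSinks {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("split-parquet-sinks").getOrCreate()
    import spark.implicits._

    val schema = new StructType().add("number", IntegerType)

    // Kafka source; bootstrap servers and topic are placeholders.
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "events")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .withColumn("parsed", from_json($"value", schema))

    // A null parsed field marks a record that did not match the schema
    // (this also catches records whose field is genuinely null).
    val matched   = events.filter($"parsed.number".isNotNull).select("parsed.*")
    val unmatched = events.filter($"parsed.number".isNull)
      .select($"value".as("_corrupt_record"))

    // One streaming query per output: separate paths and checkpoints.
    matched.writeStream.format("parquet")
      .option("path", "/data/matched")
      .option("checkpointLocation", "/checkpoints/matched")
      .start()
    unmatched.writeStream.format("parquet")
      .option("path", "/data/unmatched")
      .option("checkpointLocation", "/checkpoints/unmatched")
      .start()

    spark.streams.awaitAnyTermination()
  }
}
```

Note that two queries read the Kafka topic twice, once each; on Spark 2.4+ a single query using `foreachBatch` to write both locations per micro-batch is an alternative.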
On Wed, Dec 26, 2018 at 2:42 PM Colin Williams wrote:
From my initial impression it looks like I'd need to create my own
`from_json` using `jsonToStructs` as a reference, but try to handle
`case _: BadRecordException => null` or similar, so as to write the
non-matching string to a corrupt-records column.
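Short of reimplementing `jsonToStructs`, the failure can also be detected after the fact: when a string does not parse, `from_json` produces a null result (a null struct, or a struct of all-null fields, depending on the Spark version), and that can drive the routing. A batch sketch of the idea, with all column and object names as illustrative assumptions:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

object CorruptRecordRouting {
  // Split raw JSON strings in column "value" into rows that parse against
  // `schema` and rows that do not.
  def route(raw: DataFrame, schema: StructType): (DataFrame, DataFrame) = {
    val parsed = raw.withColumn("parsed", from_json(col("value"), schema))
    // from_json yields a null struct (or a struct of all-null fields,
    // depending on version) for unparsable input; check both. Note this
    // also flags records whose fields are all genuinely null.
    val allFieldsNull = schema.fieldNames
      .map(f => col(s"parsed.$f").isNull)
      .reduce(_ && _)
    val bad = col("parsed").isNull || allFieldsNull
    val matched = parsed.filter(!bad).select("parsed.*")
    val corrupt = parsed.filter(bad).select(col("value").as("_corrupt_record"))
    (matched, corrupt)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("corrupt-routing").getOrCreate()
    import spark.implicits._

    val schema = new StructType().add("number", IntegerType)
    val raw = Seq("""{"number": 1}""", """{"number": }""").toDF("value")

    val (matched, corrupt) = route(raw, schema)
    matched.show()   // the row that parsed
    corrupt.show()   // the original malformed string
    spark.stop()
  }
}
```

The same `route` split works on a streaming DataFrame, since it only uses stateless column operations.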
On Wed, Dec 26, 2018 at 1:55 PM Colin Williams wrote:
Hi,
I'm trying to figure out how I can write records that don't match a
JSON read schema via Spark Structured Streaming to an output sink /
parquet location. Previously I did this in batch jobs via the
corrupt-record column feature. But in this Spark Structured Streaming
job I'm reading from Kafka a string a
Maxim, thanks for your reply.
I've left a comment in the following JIRA issue:
https://issues.apache.org/jira/browse/SPARK-23194?focusedCommentId=16582025&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16582025
Hello Denis,
The from_json function supports only the fail-fast (FAILFAST) mode; see:
https://github.com/apache/spark/blob/e2ab7deae76d3b6f41b9ad4d0ece14ea28db40ce/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala#L568
Your settings "mode" -> "P
Hello community,
I cannot manage to run the from_json method with the
"columnNameOfCorruptRecord" option.
```
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val data = Seq(
  "{'number': 1}",
  "{'number': }"
)
val schema = new StructType()
  .add("number", IntegerType)
  .add("_corrupt_record", StringType)

// Attempt to route malformed records via the corrupt-record column:
val df = data.toDF("value").select(
  from_json($"value", schema,
    Map("columnNameOfCorruptRecord" -> "_corrupt_record")).as("parsed"))
```