Hi,
how does RDD's mapPartitions make a difference regarding (1) and (2)
compared to Dataset's mapPartitions / map functions?
Enrico
On 12.07.22 at 22:13, Muthu Jayakumar wrote:
Hello Enrico,
Thanks for the reply. I found that I would have to use the `mapPartitions`
API of RDD to perform this safely, as I have to
1. Read each file from GCS using the HDFS FileSystem API.
2. Parse each JSON record in a safe manner.
For (1) to work, I had to broadcast the Hadoop Configuration from the
SparkContext. I did try to use the GCS Java API to read the content, but ran
into many JAR conflicts, as the HDFS wrapper and the GCS library use
different dependencies.
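For the record, a minimal sketch of that approach could look as follows.
Names like SerializableConf and withContent are illustrative, `spark` is the
SparkSession, and ds / the case classes are the ones from Enrico's example
quoted below; since Hadoop's Configuration is not Serializable, a small
wrapper is one way to broadcast it:

import java.io.{ObjectInputStream, ObjectOutputStream}

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hadoop's Configuration is not Serializable, so wrap it for broadcasting.
// (Spark has an internal SerializableConfiguration, but it is private[spark].)
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)   // Configuration implements Writable
  }
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

val confBc = spark.sparkContext.broadcast(
  new SerializableConf(spark.sparkContext.hadoopConfiguration))

// (1) read each file via the Hadoop FileSystem API, (2) fail per record, not per job
val withContent = ds.rdd.mapPartitions { rows =>
  val conf = confBc.value.value
  rows.map { row =>
    val content = Try {
      val path = new Path(row.file_path)
      val in = path.getFileSystem(conf).open(path)
      try scala.io.Source.fromInputStream(in, "UTF-8").mkString
      finally in.close()
    }.getOrElse("")   // or keep the Failure around to inspect bad records
    RowWithJsonContent(row.entity_id, content, row.other_useful_id)
  }
}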
Hope these findings help others as well.
Thanks,
Muthu
On Mon, 11 Jul 2022 at 14:11, Enrico Minack <i...@enrico.minack.dev>
wrote:
All you need to do is implement a method readJson that reads a
single file given its path. Then, you map the values of column
file_path to the respective JSON content as a string. This can be
done via a UDF or simply Dataset.map:
case class RowWithJsonUri(entity_id: String, file_path: String, other_useful_id: String)
case class RowWithJsonContent(entity_id: String, json_content: String, other_useful_id: String)

val ds = Seq(
  RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg", "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json", "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
  RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf", "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json", "id-2-01g4he5cbh52che104rwy603sr"),
  RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5", "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json", "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
  RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc", "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json", "id-2-01g4he5cbx1kwhgvdme1s560dw")
).toDS()
ds.show(false)
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|entity_id |file_path |other_useful_id |
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
def readJson(uri: String): String = { s"content of $uri" }
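(readJson here is just a stub that fakes the content; a real implementation
might look like the following sketch, assuming the GCS connector wires up the
gs:// scheme and the default Hadoop configuration suffices on the executors:)

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch only: open the file via the Hadoop FileSystem API and read it
// as one UTF-8 string; error handling is omitted for brevity.
def readJson(uri: String): String = {
  val path = new Path(uri)
  val in = path.getFileSystem(new Configuration()).open(path)
  try scala.io.Source.fromInputStream(in, "UTF-8").mkString
  finally in.close()
}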
ds.map { row => RowWithJsonContent(row.entity_id, readJson(row.file_path), row.other_useful_id) }.show(false)
+-----------------------------+------------------------------------------------------------------------------+-------------------------------+
|entity_id                    |json_content                                                                  |other_useful_id                |
+-----------------------------+------------------------------------------------------------------------------+-------------------------------+
|id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
+-----------------------------+------------------------------------------------------------------------------+-------------------------------+
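For completeness, the UDF route mentioned above could be sketched like this
(readJsonUdf is an illustrative name):

import org.apache.spark.sql.functions.{col, udf}

// Same mapping via a UDF instead of Dataset.map
val readJsonUdf = udf((uri: String) => readJson(uri))
ds.withColumn("json_content", readJsonUdf(col("file_path"))).drop("file_path").show(false)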
Cheers,
Enrico
On 10.07.22 at 09:11, Muthu Jayakumar wrote:
Hello there,
I have a dataframe with the following...
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|entity_id                    |file_path                                                          |other_useful_id                |
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
|id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
|id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
|id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
|id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
+-----------------------------+-------------------------------------------------------------------+-------------------------------+
I would like to read the file referenced by `file_path` in each row and
write the result to another dataframe containing `entity_id`,
`other_useful_id`, `json_content`, `file_path`.
Assume that I already have the required HDFS URL libraries on my
classpath.
Please advise,
Muthu