Hi,

How does RDD's mapPartitions make a difference regarding 1. and 2. compared to Dataset's mapPartitions / map function?
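
For illustration only (an untested sketch, reusing RowWithJsonContent and ds from the example below), the same two steps look expressible directly on the Dataset:

ds.mapPartitions { rows =>
  // one Hadoop Configuration / FileSystem lookup per partition;
  // the default executor configuration is used here, you may need the broadcast configuration you mention instead
  val conf = new org.apache.hadoop.conf.Configuration()
  rows.map { row =>
    val path = new org.apache.hadoop.fs.Path(row.file_path)
    val in = path.getFileSystem(conf).open(path)
    val json = try scala.io.Source.fromInputStream(in, "UTF-8").mkString finally in.close()
    RowWithJsonContent(row.entity_id, json, row.other_useful_id)
  }
}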

Enrico


On 12.07.22 at 22:13, Muthu Jayakumar wrote:
Hello Enrico,

Thanks for the reply. I found that I would have to use the `mapPartitions` API of RDD to perform this safely, as I have to
1. Read each file from GCS using the HDFS FileSystem API.
2. Parse each JSON record in a safe manner.

For (1) to work, I had to broadcast the Hadoop Configuration from the sparkContext. I did try to use the GCS Java API to read the content, but ran into many JAR conflicts, as the HDFS wrapper and that library use different dependencies.
Hope these findings help others as well.
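
Roughly, the shape of what I ended up with (simplified; the wrapper and names below are illustrative rather than my exact code, and `ds`/`spark` are the Dataset and SparkSession from your example):

import org.apache.hadoop.fs.Path
import org.apache.spark.SerializableWritable
import scala.io.Source
import scala.util.Try

// Hadoop's Configuration is not serializable, so wrap it before broadcasting.
val confBc = spark.sparkContext.broadcast(
  new SerializableWritable(spark.sparkContext.hadoopConfiguration))

val contents = ds.rdd.mapPartitions { rows =>
  rows.map { row =>
    // (1) read the file from GCS through the HDFS FileSystem API
    val json = Try {
      val path = new Path(row.file_path)
      val fs = path.getFileSystem(confBc.value.value)
      val in = fs.open(path)
      try Source.fromInputStream(in, "UTF-8").mkString
      finally in.close()
    }
    // (2) parse/validate the JSON in a safe manner, keeping the failure
    //     instead of failing the whole task
    (row.entity_id, json, row.other_useful_id)
  }
}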

Thanks,
Muthu


On Mon, 11 Jul 2022 at 14:11, Enrico Minack <i...@enrico.minack.dev> wrote:

    All you need to do is implement a method readJson that reads a
    single file given its path. Then you map the values of the column
    file_path to the respective JSON content as a string. This can be
    done via a UDF or simply with Dataset.map:

    case class RowWithJsonUri(entity_id: String, file_path: String, other_useful_id: String)
    case class RowWithJsonContent(entity_id: String, json_content: String, other_useful_id: String)

    val ds = Seq(
      RowWithJsonUri("id-01f7pqqbxddb3b1an6ntyqx6mg",
        "gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json",
        "id-2-01g4he5cb4xqn6s1999k6y1vbd"),
      RowWithJsonUri("id-01f7pqgbwms4ajmdtdedtwa3mf",
        "gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json",
        "id-2-01g4he5cbh52che104rwy603sr"),
      RowWithJsonUri("id-01f7pqqbxejt3ef4ap9qcs78m5",
        "gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json",
        "id-2-01g4he5cbqmdv7dnx46sebs0gt"),
      RowWithJsonUri("id-01f7pqqbynh895ptpjjfxvk6dc",
        "gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json",
        "id-2-01g4he5cbx1kwhgvdme1s560dw")
    ).toDS()

    ds.show(false)
    +-----------------------------+-------------------------------------------------------------------+-------------------------------+
    |entity_id                    |file_path                                                          |other_useful_id                |
    +-----------------------------+-------------------------------------------------------------------+-------------------------------+
    |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
    |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
    |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
    |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
    +-----------------------------+-------------------------------------------------------------------+-------------------------------+


    def readJson(uri: String): String = { s"content of $uri" }

    ds.map { row => RowWithJsonContent(row.entity_id, readJson(row.file_path), row.other_useful_id) }
      .show(false)
    
    +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
    |entity_id                    |json_content                                                                  |other_useful_id                |
    +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
    |id-01f7pqqbxddb3b1an6ntyqx6mg|content of gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
    |id-01f7pqgbwms4ajmdtdedtwa3mf|content of gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
    |id-01f7pqqbxejt3ef4ap9qcs78m5|content of gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
    |id-01f7pqqbynh895ptpjjfxvk6dc|content of gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
    +-----------------------------+------------------------------------------------------------------------------+-------------------------------+
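
    Should it help beyond the stub, a readJson that actually fetches the file could look roughly like this (a sketch only, no error handling; it assumes the executors' Hadoop configuration knows about gs://):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import scala.io.Source

    def readJson(uri: String): String = {
      // runs on the executors: uses the default Hadoop configuration found there;
      // you may need to broadcast the driver's hadoopConfiguration instead
      val path = new Path(uri)
      val fs = path.getFileSystem(new Configuration())
      val in = fs.open(path)
      try Source.fromInputStream(in, "UTF-8").mkString
      finally in.close()
    }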

    Cheers,
    Enrico




    On 10.07.22 at 09:11, Muthu Jayakumar wrote:
    Hello there,

    I have a dataframe with the following...

    
    +-----------------------------+-------------------------------------------------------------------+-------------------------------+
    |entity_id                    |file_path                                                          |other_useful_id                |
    +-----------------------------+-------------------------------------------------------------------+-------------------------------+
    |id-01f7pqqbxddb3b1an6ntyqx6mg|gs://bucket1/path/to/id-01g4he5cb4xqn6s1999k6y1vbd/file_result.json|id-2-01g4he5cb4xqn6s1999k6y1vbd|
    |id-01f7pqgbwms4ajmdtdedtwa3mf|gs://bucket1/path/to/id-01g4he5cbh52che104rwy603sr/file_result.json|id-2-01g4he5cbh52che104rwy603sr|
    |id-01f7pqqbxejt3ef4ap9qcs78m5|gs://bucket1/path/to/id-01g4he5cbqmdv7dnx46sebs0gt/file_result.json|id-2-01g4he5cbqmdv7dnx46sebs0gt|
    |id-01f7pqqbynh895ptpjjfxvk6dc|gs://bucket1/path/to/id-01g4he5cbx1kwhgvdme1s560dw/file_result.json|id-2-01g4he5cbx1kwhgvdme1s560dw|
    +-----------------------------+-------------------------------------------------------------------+-------------------------------+

    I would like to read the file at each row's `file_path` and write the
    result to another dataframe containing `entity_id`,
    `other_useful_id`, `json_content`, `file_path`.
    Assume that I already have the required HDFS url libraries in my
    classpath.

    Please advise,
    Muthu


