Hi Amit,

I was thinking along the lines of this (Python):


from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructField, StructType


@udf(returnType=StringType())
def reader_udf(filename: str) -> str:
    # Runs on the executors: reads the file whose path arrives in the row.
    with open(filename, "r") as f:
        return f.read()


def run_locally():
    # utils.build_spark_session is our own helper that yields a SparkSession.
    with utils.build_spark_session("Local", local=True) as spark:
        df = spark.readStream.csv(
            r'testdata',
            schema=StructType([StructField('filename', StringType(), True)]))
        df = df.withColumn('content', reader_udf(col('filename')))
        q = (df.select('content')
             .writeStream.queryName('test').format('console').start())
        q.awaitTermination()

Now each row contains the contents of one file. Provided the files are not 
large, you can foreach() over the df/rdd and do whatever you want with each 
row, such as json.loads() etc.
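For illustration, a minimal sketch of that idea, assuming each row's 'content' 
column holds one JSON document (handle_row is a name I made up, not part of 
the code above):

import json

def handle_row(row):
    # Runs on the executors: parse one file's contents into a dict.
    record = json.loads(row['content'])
    print(record)  # replace with your per-record processing

q = df.select('content').writeStream.foreach(handle_row).start()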
If you know the schema of the JSONs, you can later explode() them into a flat 
DF, along the lines of 
https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala
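For example, a rough sketch with a made-up schema (replace it with your real 
one); from_json() parses the string column and explode() flattens the array:

from pyspark.sql.functions import col, explode, from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Hypothetical schema, for illustration only.
json_schema = StructType([
    StructField('id', StringType(), True),
    StructField('items', ArrayType(StringType()), True),
])

parsed = df.withColumn('json', from_json(col('content'), json_schema))
flat = parsed.select(col('json.id'),
                     explode(col('json.items')).alias('item'))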

Note that, unless I am missing something, you cannot access the Spark session 
from foreach(), as that code does not run on the driver.
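If you do need the session, foreachBatch() is the usual workaround, since its 
function runs on the driver; a rough sketch, assuming spark is the session 
from run_locally above:

def process_batch(batch_df, batch_id):
    # Executes on the driver, so the spark session is accessible here.
    batch_df.show(truncate=False)  # placeholder for real processing

q = df.writeStream.foreachBatch(process_batch).start()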
Please say if this makes sense, or whether I missed anything.

Boris

From: Amit Joshi <mailtojoshia...@gmail.com>
Sent: Monday, 18 January 2021 17:10
To: Boris Litvak <boris.lit...@skf.com>
Cc: spark-user <user@spark.apache.org>
Subject: Re: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi Boris,

I need to process the data present at the path.
That is the reason I am trying to create the dataframe.

Can you please provide an example of your solution?

Regards
Amit

On Mon, Jan 18, 2021 at 7:15 PM Boris Litvak <boris.lit...@skf.com> wrote:
Hi Amit,

Why don't you just map()/mapXXX() over the kafkaDf with a mapping function 
that reads the paths?
Also, do you really have to read the JSON into an additional dataframe?

Thanks, Boris

From: Amit Joshi <mailtojoshia...@gmail.com>
Sent: Monday, 18 January 2021 15:04
To: spark-user <user@spark.apache.org>
Subject: [Spark Structured Streaming] Processing the data path coming from 
kafka.

Hi,

I have a use case where the file paths of JSON records stored in S3 arrive as 
messages in Kafka. I have to process the data using Spark Structured 
Streaming.

The design I have in mind is as follows:
1. Read the messages containing the data paths from Kafka with Spark 
Structured Streaming.
2. Collect the message records on the driver. (Messages are small in size.)
3. Create the dataframe from the data location.


kafkaDf.select($"value".cast(StringType))
  .writeStream.foreachBatch((batchDf: DataFrame, batchId: Long) => {

    // rough code

    // collect to driver
    val records = batchDf.collect()

    // create dataframe and process
    records.foreach((rec: Row) => {
      println(s"records:###################### ${rec.toString}")
      val path = rec.getAs[String]("data_path")

      val dfToProcess = spark.read.json(path)

      ....
    })

  })

I would like to know your views on whether this approach is fine, 
specifically whether there is some problem with creating the dataframe after 
calling collect().

If there is a better approach, please let me know.



Regards

Amit Joshi
