Hi,
Let us revisit the approach. As some fellow members correctly highlighted,
this is a use case for Spark Structured Streaming, and there are a few key
concepts worth mentioning:
- foreach: A method for applying custom write logic to each individual
row in a streaming DataFrame or Dataset.
- foreachBatch: A method for applying custom write logic to an entire
micro-batch of data, providing more flexibility for complex operations.
- sendToSink (my chosen name here for the custom logic): A user-defined
function that encapsulates the logic for writing a micro-batch to a sink
(in my case a Google BigQuery data warehouse).
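Just to illustrate the difference between the two, a minimal sketch on my
part (assuming streaming_df is an existing streaming DataFrame; the print
lambda is only a placeholder for your own row-level logic):

# foreach: the supplied function is invoked once per row
streaming_df.writeStream \
    .foreach(lambda row: print(row)) \
    .start()

# foreachBatch: the supplied function receives the whole micro-batch
# DataFrame plus its batch id, so it can use normal batch writers
streaming_df.writeStream \
    .foreachBatch(sendToSink) \
    .start()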
Let us create pseudo code (in Python) for the sendToSink function used in
foreachBatch(sendToSink):
from pyspark.sql.functions import col

def sendToSink(df, batchId):
    # write_to_sink() and log_error() are user-defined helpers, e.g. wrapping
    # the Spark BigQuery connector and your logging framework respectively.
    if len(df.take(1)) > 0:  # Check for an empty micro-batch
        try:
            # Extract the distinct table names from the "@table" column
            table_names = [row["@table"] for row in
                           df.select("@table").distinct().collect()]
            # Iterate through each table name
            for table_name in table_names:
                # Filter the rows belonging to the current table
                table_df = df.filter(col("@table") == table_name)
                # Handle nested structures for specific tables
                if table_name in ["product_zones", "product_devices"]:
                    # Extract the nested data (e.g. the "zones" or "device" columns)
                    nested_df = table_df.select("zones", "device")
                    # Write the nested DataFrame to its corresponding table
                    write_to_sink(nested_df, table_name)
                # Write the main DataFrame to its table
                write_to_sink(table_df, table_name)
        except Exception as e:
            # Log errors gracefully
            log_error(f"Error processing batch {batchId}: {e}")
    else:
        print("DataFrame is empty")  # Handle an empty micro-batch
HTH
Mich Talebzadeh,
Dad | Technologist | Solutions Architect | Engineer
London
United Kingdom
view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
https://en.everybodywiki.com/Mich_Talebzadeh
*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.
On Wed, 10 Jan 2024 at 18:51, PRASHANT L <prashant...@gmail.com> wrote:
> Hi
> I have a use case where I need to process JSON payloads coming from Kafka
> using structured streaming, but the JSON can have different formats and the
> schema is not fixed.
> Each JSON will have a @type tag, so based on that tag the JSON has to be
> parsed and loaded into a table with the tag name, and if a JSON has nested
> sub tags, those tags should go to different tables.
> So I need to process each JSON record individually and determine the
> destination tables. What would be the best approach?
>
>
>> {
>>   "os": "andriod",
>>   "type": "mobile",
>>   "device": {
>>     "warrenty": "3 years",
>>     "replace": "yes"
>>   },
>>   "zones": [
>>     {
>>       "city": "Bangalore",
>>       "state": "KA",
>>       "pin": "577401"
>>     },
>>     {
>>       "city": "Mumbai",
>>       "state": "MH",
>>       "pin": "576003"
>>     }
>>   ],
>>   "@table": "product"
>> }
>
>
> So for the above JSON, there are 3 tables created:
> 1. Product (@type): this is the parent table
> 2. product_zones and product_devices: child tables
>