Hi Mich -
Here is the output of the ldf.printSchema() & ldf.show() commands.
ldf.printSchema()
root
|-- applianceName: string (nullable = true)
|-- timeslot: long (nullable = true)
|-- customer: string (nullable = true)
|-- window: struct (nullable = false)
| |-- start: timestamp (nullable = true)
| |-- end: timestamp (nullable = true)
|-- sentOctets: long (nullable = true)
|-- recvdOctets: long (nullable = true)
ldf.show():

+-------------+--------+--------+------------------------------------------+----------+-----------+
|applianceName|timeslot|customer|window                                    |sentOctets|recvdOctets|
+-------------+--------+--------+------------------------------------------+----------+-----------+
|abc1         |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|21459264  |32211859   |
|pqrq         |2797513 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|17775527  |31331093   |
|xyz          |2797514 |cust1   |{2023-03-11 04:15:00, 2023-03-11 04:30:00}|12808015  |24191707   |
+-------------+--------+--------+------------------------------------------+----------+-----------+
Also, any comments on the outputMode? I've set it to 'update', since I'm
using aggregation.
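
For context, my understanding of the two modes on an aggregated stream, as a
minimal standalone sketch (the rate source and console sink here are just
stand-ins, not my actual job):

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("outputModeSketch").getOrCreate()

# stand-in streaming source with an event-time column 'ts'
events = spark.readStream.format("rate").load() \
    .withColumnRenamed("timestamp", "ts")

agg = events.withWatermark("ts", "15 minutes") \
    .groupBy(window(col("ts"), "15 minutes")) \
    .count()

# "update" re-emits a window's row each time its aggregate changes;
# "append" emits each window once, only after the watermark passes the
# end of the window and the result is final.
query = agg.writeStream.outputMode("update").format("console").start()
```
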
thanks!
On Fri, Mar 10, 2023 at 10:55 AM Mich Talebzadeh <[email protected]>
wrote:
>
> Just looking at the code in here:
>
> ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>                   window(col("ts"), "15 minutes")) \
>     .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>     .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>     .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>     .fillna(0)
>
> What does ldf.printSchema() return?
>
>
> HTH
>
>
>
> On Fri, 10 Mar 2023 at 07:16, karan alang <[email protected]> wrote:
>
>>
>> Hello All -
>>
>> I have a Structured Streaming job with a 10-minute trigger, and I'm using
>> a watermark to account for late-arriving data. However, the watermark is
>> not working: instead of a single record with the total aggregated value,
>> I see 2 records.
>>
>> Here is the code:
>>
>> ```
>>
>> 1) Structured Streaming - reading from Kafka every 10 mins
>>
>> df_stream = self.spark.readStream.format('kafka') \
>>     .option("kafka.security.protocol", "SSL") \
>>     .option("kafka.ssl.truststore.location", self.ssl_truststore_location) \
>>     .option("kafka.ssl.truststore.password", self.ssl_truststore_password) \
>>     .option("kafka.ssl.keystore.location", self.ssl_keystore_location_bandwidth_intermediate) \
>>     .option("kafka.ssl.keystore.password", self.ssl_keystore_password_bandwidth_intermediate) \
>>     .option("kafka.bootstrap.servers", self.kafkaBrokers) \
>>     .option("subscribe", topic) \
>>     .option("startingOffsets", "latest") \
>>     .option("failOnDataLoss", "false") \
>>     .option("kafka.metadata.max.age.ms", "1000") \
>>     .option("kafka.ssl.keystore.type", "PKCS12") \
>>     .option("kafka.ssl.truststore.type", "PKCS12") \
>>     .load()
>>
>> 2) Calling foreachBatch(self.process)
>> # note - outputMode is set to "update" (tried setting outputMode = "append" as well)
>>
>> # 03/09 ::: outputMode - update instead of append
>> query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp", "topic") \
>>     .writeStream \
>>     .outputMode("update") \
>>     .trigger(processingTime='10 minutes') \
>>     .option("truncate", "false") \
>>     .option("checkpointLocation", self.checkpoint) \
>>     .foreachBatch(self.process) \
>>     .start()
>>
>>
>> 3) self.process - where I do the bulk of the processing; it calls the
>> function 'aggIntfLogs'.
>>
>> In aggIntfLogs I'm using a watermark of 15 mins and doing a groupBy to
>> calculate the sum of sentOctets & recvdOctets:
>>
>>
>> def aggIntfLogs(ldf):
>>     if ldf and ldf.count() > 0:
>>         ldf = ldf.select('applianceName', 'timeslot', 'sentOctets',
>>                          'recvdOctets', 'ts', 'customer') \
>>             .withColumn('sentOctets', ldf["sentOctets"].cast(LongType())) \
>>             .withColumn('recvdOctets', ldf["recvdOctets"].cast(LongType())) \
>>             .withWatermark("ts", "15 minutes")
>>
>>         ldf = ldf.groupBy("applianceName", "timeslot", "customer",
>>                           window(col("ts"), "15 minutes")) \
>>             .agg({'sentOctets': "sum", 'recvdOctets': "sum"}) \
>>             .withColumnRenamed('sum(sentOctets)', 'sentOctets') \
>>             .withColumnRenamed('sum(recvdOctets)', 'recvdOctets') \
>>             .fillna(0)
>>         return ldf
>>     return ldf
>>
>>
>> The Dataframe 'ldf' returned from aggIntfLogs is then written to a Kafka
>> topic (a sketch of that sink step is shown right after this code block).
>>
>> ```
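>>
>> The write-to-Kafka step looks roughly like the sketch below. This is an
>> illustration, not my exact code: the output topic name and bootstrap
>> servers shown are placeholders.
>>
>> ```
>> from pyspark.sql.functions import to_json, struct
>>
>> # inside self.process(), after aggIntfLogs(ldf) returns:
>> # pack all columns into a single JSON 'value' column, as the Kafka
>> # writer expects, then write the micro-batch with the batch writer
>> ldf.select(to_json(struct([ldf[c] for c in ldf.columns])).alias("value")) \
>>     .write.format("kafka") \
>>     .option("kafka.bootstrap.servers", "broker:9092") \
>>     .option("topic", "agg-output-topic") \
>>     .save()
>> ```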
>>
>> I was expecting the watermark to account for late-arriving data, i.e.
>> that sentOctets & recvdOctets would be calculated over the consolidated
>> data (including the late data, since it arrives within 15 mins). Instead,
>> for some keys (applianceName/timeslot/customer) the aggregates are
>> calculated separately per arrival, and I see 2 records rather than a
>> single record accounting for the late data within the watermark.
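>>
>> For comparison, here is the canonical watermarked-aggregation pattern
>> from the Structured Streaming guide, applied directly on the streaming
>> DataFrame rather than inside foreachBatch (a minimal sketch; 'parsed' is
>> a placeholder for the decoded Kafka stream with event-time column 'ts'):
>>
>> ```
>> from pyspark.sql.functions import col, window
>>
>> # watermark + windowed aggregation on the *streaming* DataFrame, so
>> # Spark keeps state across micro-batches and merges late rows into the
>> # same window instead of emitting a second record
>> windowed = parsed \
>>     .withWatermark("ts", "15 minutes") \
>>     .groupBy("applianceName", "timeslot", "customer",
>>              window(col("ts"), "15 minutes")) \
>>     .agg({'sentOctets': "sum", 'recvdOctets': "sum"})
>> ```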
>>
>> What needs to be done to fix this & make this work as desired?
>>
>> tia!
>>
>>
>> Here is the Stackoverflow link as well -
>>
>>
>> https://stackoverflow.com/questions/75693171/spark-structuredstreaming-watermark-not-working-as-expected
>>
>>
>>
>>