Hi,

In answer to your question, I ran some tests using broadly your approach.
You asked:

"but it does not work well because it does not give a temperature average
as you can see in the attached pic.
Why is the average not calculated on temperature?
How can I view data in each window of 5 minutes and related average?"

This is similar to the code you are running:

            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", "temperature") \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "false") \
                .option("startingOffsets", "latest") \
                .load()

            streamingDataFrame.printSchema()

            result = streamingDataFrame. \
                     withWatermark("timestamp", "5 minutes"). \
                     groupBy(window(streamingDataFrame.timestamp, "5 minutes", "5 minutes")). \
                     avg(). \
                     writeStream. \
                     outputMode('complete'). \
                     option("numRows", 100). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("temperature"). \
                     start()

OK. To simulate your data, which I believe comprises two keys (timestamp,
temperature), I am sending a single message to Kafka every minute, with a
temperature between 20 and 30 degrees. An example:

{"timestamp":"2021-05-15T22:16:31", "temperature":29}

So let us print the schema:

streamingDataFrame.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

There is no temperature column there because you have not yet extracted one
from the JSON payload (see later). So this is what you get if you run the
code above. Note the batch cycle is 1 minute in my case:

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+--------------+-----------+------------------+
|window                                    |avg(partition)|avg(offset)|avg(timestampType)|
+------------------------------------------+--------------+-----------+------------------+
|{2021-05-15 22:05:00, 2021-05-15 22:10:00}|7.0           |7071.0     |0.0               |
|{2021-05-15 22:00:00, 2021-05-15 22:05:00}|0.0           |7117.0     |0.0               |
+------------------------------------------+--------------+-----------+------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+--------------+-----------+------------------+
|window                                    |avg(partition)|avg(offset)|avg(timestampType)|
+------------------------------------------+--------------+-----------+------------------+
|{2021-05-15 22:05:00, 2021-05-15 22:10:00}|5.5           |7147.5     |0.0               |
|{2021-05-15 22:00:00, 2021-05-15 22:05:00}|0.0           |7117.0     |0.0               |
+------------------------------------------+--------------+-----------+------------------+
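
Notice the averages are computed over Kafka's numeric metadata columns
(partition, offset, timestampType), and the window is keyed on Kafka's own
ingest timestamp, not the one inside your message. If you want to convince
yourself the payload is still unparsed binary JSON, a quick check (a sketch
reusing the same streamingDataFrame; col comes from pyspark.sql.functions)
is to cast the value column to a string and print it:

            rawValues = streamingDataFrame. \
                select(col("value").cast("string").alias("raw_json"))

            rawValues. \
                writeStream. \
                outputMode('append'). \
                format('console'). \
                option("truncate", "false"). \
                start()

You should see the JSON lines exactly as sent.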

So this, I think, is what you need to do with your schema. First define the
schema of the JSON payload (the imports below are what these snippets need):

        from pyspark.sql.functions import col, from_json, window
        from pyspark.sql.types import IntegerType, StructType, TimestampType

        schema = StructType().add("timestamp", TimestampType()) \
                             .add("temperature", IntegerType())


            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", "temperature") \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            # get the individual columns out of the struct
            resultM = streamingDataFrame.select(
                      col("parsed_value.timestamp").alias("timestamp"),
                      col("parsed_value.temperature").alias("temperature"))

            result = resultM. \
                     withWatermark("timestamp", "5 minutes"). \
                     groupBy(window(resultM.timestamp, "5 minutes", "5 minutes")). \
                     avg(). \
                     writeStream. \
                     outputMode('complete'). \
                     option("numRows", 100). \
                     option("truncate", "false"). \
                     format('console'). \
                     option('checkpointLocation', checkpoint_path). \
                     queryName("temperature"). \
                     start()
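
Incidentally, the resultM projection above can also be written in one step
by flattening the whole struct; it is purely a matter of taste:

            resultM = streamingDataFrame.select("parsed_value.*")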


And running the code above, you will get:



-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----------------+
|window                                    |avg(temperature)|
+------------------------------------------+----------------+
|{2021-05-15 22:30:00, 2021-05-15 22:35:00}|28.0            |
+------------------------------------------+----------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----------------+
|window                                    |avg(temperature)|
+------------------------------------------+----------------+
|{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
+------------------------------------------+----------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+----------------+
|window                                    |avg(temperature)|
+------------------------------------------+----------------+
|{2021-05-15 22:35:00, 2021-05-15 22:40:00}|22.0            |
|{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
+------------------------------------------+----------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+----------------+
|window                                    |avg(temperature)|
+------------------------------------------+----------------+
|{2021-05-15 22:35:00, 2021-05-15 22:40:00}|26.0            |
|{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
+------------------------------------------+----------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+------------------+
|window                                    |avg(temperature)  |
+------------------------------------------+------------------+
|{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.3              |
|{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5              |
+------------------------------------------+------------------+

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+----------------+
|window                                    |avg(temperature)|
+------------------------------------------+----------------+
|{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.75           |
|{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
+------------------------------------------+----------------+

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+----------------+
|window                                    |avg(temperature)|
+------------------------------------------+----------------+
|{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.0            |
|{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
+------------------------------------------+----------------+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+----------------+
|window                                    |avg(temperature)|
+------------------------------------------+----------------+
|{2021-05-15 22:35:00, 2021-05-15 22:40:00}|27.0            |
|{2021-05-15 22:40:00, 2021-05-15 22:45:00}|26.0            |
|{2021-05-15 22:30:00, 2021-05-15 22:35:00}|25.5            |
+------------------------------------------+----------------+

This should be all you need, I believe.
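
One caveat: in complete output mode Spark must retain every window in its
state store, so the 5-minute watermark never actually discards anything and
old windows keep being re-emitted (you can see that above). If you only want
the windows that changed in each batch, and you want the watermark to be
able to purge state for windows that are past it, switch the sink to update
mode. A sketch, with everything else unchanged, and with avg("temperature")
to make the aggregated column explicit:

            result = resultM. \
                     withWatermark("timestamp", "5 minutes"). \
                     groupBy(window(resultM.timestamp, "5 minutes", "5 minutes")). \
                     avg("temperature"). \
                     writeStream. \
                     outputMode('update'). \
                     format('console'). \
                     option("truncate", "false"). \
                     option('checkpointLocation', checkpoint_path). \
                     start()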

HTH






On Tue, 11 May 2021 at 14:42, Giuseppe Ricci <peppepega...@gmail.com> wrote:

> Hi,
>
> As suggested by Jayesh I follow his solution.
> I need to have the average temperature at fixed minute marks: 5, 10, 15,
> etc., so it seems a tumbling window is the optimal solution (a).
> Real sensors may send data with some delay; this can be a few seconds (b).
> So this is my new code (I used a window of 5 minutes):
>
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StringType
>
> # Spark session & context
> spark = (SparkSession
>          .builder
>          .master('local')
>          .appName('TemperatureStreamApp')
>          # Add kafka package
>          .config("spark.jars.packages",
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
>          .getOrCreate())
>
> sc = spark.sparkContext
>
> # Create stream dataframe setting kafka server, topic and offset option
> df = (spark
>   .readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "localhost:9092") # kafka server
>   .option("subscribe", "temperature") # topic
>   .option("startingOffsets", "earliest") # start from beginning
>   .load())
>
> windowedAvg = df\
>     .withWatermark("timestamp", "5 minutes") \
>     .groupBy(
>     window(df.timestamp, "5 minutes", "5 minutes")).avg()
>
> query = windowedAvg\
>         .writeStream\
>         .outputMode('complete')\
>         .format('console')\
>         .option('truncate', 'false')\
>         .start()
>
> query.awaitTermination()
>
>
> but it does not work well because it does not give a temperature average
> as you can see in the attached pic.
> Why is the average not calculated on temperature?
> How can I view data in each window of 5 minutes and related average?
> Thanks for your help.
>
>
> PhD. Giuseppe Ricci
>
>
> On Mon, 10 May 2021 at 18:14, Lalwani, Jayesh <
> jlalw...@amazon.com> wrote:
>
>> You don’t need to “launch batches” every 5 minutes. You can launch
>> batches every 2 seconds, and aggregate on window for 5 minutes. Spark will
>> read data from topic every 2 seconds, and keep the data in memory for 5
>> minutes.
>>
>>
>>
>> You need to make a few decisions:
>>
>>    1. Do you want a tumbling window or a rolling window? A tumbling
>>    window of 5 minutes will produce an aggregate every 5 minutes,
>>    aggregating the data from the 5 minutes before. A rolling window of
>>    5 minutes/1 minute will produce an aggregate every 1 minute. For
>>    example, let's say you have data every 2 seconds. A tumbling window
>>    will produce a result on minute 5, 10, 15, 20... Minute 5's result
>>    will have data from minutes 1-5, minute 10's from minutes 6-10, and
>>    so on. A rolling window will produce data on minute 5, 6, 7, 8...
>>    Minute 5 will have the aggregate from 1-5, minute 6 the aggregate
>>    from 2-6, and so on. This defines your window. In your code you have
>>
>>
>> window(df_temp.timestamp, "2 minutes", "1 minutes")
>>
>> This is a rolling window. Here the second parameter (2 minutes) is the
>> window interval, and the third parameter (1 minute) is the slide
>> interval. In the above example, it will produce an aggregate every
>> 1 minute for 2 minutes' worth of data.
>>
>> If you define
>>
>> window(df_temp.timestamp, "2 minutes", "2 minutes")
>>
>> This is a tumbling window. It will produce an aggregate every 2 minutes,
>> with 2 minutes' worth of data.
>>
>>
>>
>>
>>
>>    2. Can you have late data? How late can data arrive? Usually
>>    streaming systems send data out of order. Like, it could happen that
>>    you get data for t=11:00:00 AM, and then get data for t=10:59:59 AM.
>>    This means that the data is late by 1 second. What's the worst-case
>>    condition for late data? You need to define the watermark for late
>>    data. In your code, you have defined a watermark of 2 minutes. For
>>    aggregations, the watermark also defines which windows Spark will
>>    keep in memory. If you define a watermark of 2 minutes, and you have
>>    a rolling window with a slide interval of 1 minute, Spark will keep
>>    2 windows in memory. The watermark interval affects how much memory
>>    will be used by Spark.
>>
>>
>>
>> It might help if you try to follow the example in this guide very
>> carefully
>> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
>> That is a pretty good example, but you need to follow it event by event
>> very carefully to get all the nuances.
>>
>>
>>
>> From: Giuseppe Ricci <peppepega...@gmail.com>
>> Date: Monday, May 10, 2021 at 11:19 AM
>> To: "user@spark.apache.org" <user@spark.apache.org>
>> Subject: [EXTERNAL] Calculate average from Spark stream
>>
>>
>>
>>
>>
>>
>> Hi, I'm new to Apache Spark.
>>
>> I'm trying to read data from an Apache Kafka topic (I have a simulated
>> temperature sensor producer which sends data every 2 seconds), and every
>> 5 minutes I need to calculate the average temperature. Reading the
>> documentation, I understand I need to use windows, but I'm not able to
>> finalize my code. Can someone help me?
>> How can I launch batches every 5 minutes? My code works one time and
>> finishes. Why can't I find any helpful information in the console about
>> correct execution? See attached picture.
>>
>> This is my code:
>>
>> https://pastebin.com/4S31jEeP
>>
>>
>>
>> Thanks for your precious help.
>>
>>
>>
>>
>>
>>
>>
>> PhD. Giuseppe Ricci
>>
>>
>>
>