Hi everyone,

I’m trying to calculate an average with a sliding window. Here’s the code I’m 
using. First of all I receive a series of events from a Kafka topic. I declared 
a watermark on the ‘data_fine’ attribute.

final TableDescriptor filteredPhasesDurationsTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
       .schema(Schema.newBuilder()
             .column("id_fascicolo", DataTypes.BIGINT().notNull())
             .column("nrg", DataTypes.STRING())
             .column("giudice", DataTypes.STRING())
             .column("oggetto", DataTypes.STRING())
             .column("codice_oggetto", DataTypes.STRING())
             .column("ufficio", DataTypes.STRING())
             .column("sezione", DataTypes.STRING())
             .column("fase", DataTypes.STRING().notNull())
             .column("durata", DataTypes.BIGINT())
             .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
             .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
             .watermark("data_fine", "data_fine - INTERVAL '5' SECOND")
             .primaryKey("id_fascicolo", "fase")
             .build())
       .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.filtered-phases-durations"))
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.KEY_FORMAT, "json")
       .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
       .build();
tEnv.createTable("FilteredPhasesDurations_Kafka", filteredPhasesDurationsTableDescriptor);
Table filteredPhasesDurationsTable = tEnv.from("FilteredPhasesDurations_Kafka");

After receiving the events I compute the average using a sliding window like 
this:

Table averageJudgeByPhase = filteredPhasesDurationsTable
       .window(Slide.over(lit(15).days())
             .every(lit(1).days())
             .on($("data_fine"))
             .as("w"))
       .groupBy(
             $("giudice"),
             $("fase"),
             $("w")
       )
       .select(
             $("giudice"),
             $("fase"),
             $("durata").avg().as("mediaMobileGiudicePerFase"),
             $("w").end().as("data_fine")
       );

averageJudgeByPhase.execute().print();

But I get no results.
These are some test events in the Kafka topic. The data_fine values span more
than 15 days, so why is the average not computed?

+----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
| op | id_fascicolo | nrg |   giudice |    oggetto | codice_oggetto |   ufficio | sezione |              fase | durata |             data_inizio |               data_fine |
+----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
| +I |            9 | 301 | giudice01 | Usucapione |         131002 | Benevento |      01 | FASE_INTRODUTTIVA |     14 | 2023-04-16 02:00:00.000 | 2023-04-30 02:00:00.000 |
| +I |           10 | 302 | giudice01 | Usucapione |         131002 | Benevento |      01 | FASE_INTRODUTTIVA |     15 | 2023-04-30 02:00:00.000 | 2023-05-15 02:00:00.000 |
| +I |            8 | 300 | giudice01 | Usucapione |         131002 | Benevento |      01 | FASE_INTRODUTTIVA |     14 | 2023-04-01 02:00:00.000 | 2023-04-15 02:00:00.000 |
+----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
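
To make the expectation concrete, the window arithmetic for the three test
events can be sketched in plain Java, independent of Flink. This is only a
rough model under stated assumptions, not Flink's implementation: windows are
assumed to be aligned to the epoch, a window is assumed to fire once the
watermark reaches its end minus 1 ms, the watermark is assumed to be the
maximum data_fine seen minus the 5-second delay (a single input partition),
and the printed timestamps are treated as UTC. The class and method names are
made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: models a 15-day window sliding by 1 day.
class SlidingWindowSketch {
    static final long SIZE_MS = Duration.ofDays(15).toMillis();
    static final long SLIDE_MS = Duration.ofDays(1).toMillis();
    static final long DELAY_MS = Duration.ofSeconds(5).toMillis();

    // End timestamps (exclusive) of all windows that contain the event time,
    // assuming window starts are multiples of the slide since the epoch.
    static List<Long> windowEnds(long eventTimeMs) {
        List<Long> ends = new ArrayList<>();
        long lastStart = eventTimeMs - Math.floorMod(eventTimeMs, SLIDE_MS);
        for (long start = lastStart; start > eventTimeMs - SIZE_MS; start -= SLIDE_MS) {
            ends.add(start + SIZE_MS);
        }
        return ends;
    }

    // Assumed watermark: highest event time seen so far minus the declared delay.
    static long watermark(long... eventTimesMs) {
        long max = Long.MIN_VALUE;
        for (long t : eventTimesMs) {
            max = Math.max(max, t);
        }
        return max - DELAY_MS;
    }

    // Assumed firing rule: the watermark has reached the last timestamp in the window.
    static boolean fires(long windowEndMs, long watermarkMs) {
        return watermarkMs >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        // data_fine values of the three test events, read as UTC (assumption).
        long[] events = {
            Instant.parse("2023-04-30T02:00:00Z").toEpochMilli(),
            Instant.parse("2023-05-15T02:00:00Z").toEpochMilli(),
            Instant.parse("2023-04-15T02:00:00Z").toEpochMilli()
        };
        long wm = watermark(events);
        for (long e : events) {
            long closed = windowEnds(e).stream().filter(end -> fires(end, wm)).count();
            System.out.println(Instant.ofEpochMilli(e) + ": " + closed + " of "
                    + windowEnds(e).size() + " windows closed by the watermark");
        }
    }
}
```

Under these assumptions every window containing the 2023-04-15 event ends on
or before 2023-04-30, well behind the watermark derived from the 2023-05-15
event, which is why at least some averages would be expected in the output.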


Thanks,
Eugenio
