Hi Eugenio,

I think this is due to window completion: with the default trigger, a window emits its result only once the watermark derived from your data_fine field has passed the end of that window. For a 15-day window that starts at your first event, that means event time has to advance 15 days beyond that event before the window is complete. Please also check the default trigger behavior described here: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/windows/#default-triggers-of-windowassigners
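To make the firing rule concrete, here is a minimal plain-Java sketch with no Flink dependency. It lists the 15-day windows (sliding by 1 day) that contain a given data_fine value and checks whether a given watermark would already have fired them. The class name WindowFiringSketch, its helper methods, the epoch-aligned window starts and the reading of the timestamps as UTC are all assumptions made for illustration. The firing condition reflects my understanding of the default event-time trigger; it is not Flink's actual code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class WindowFiringSketch {
    // Window size, slide and watermark delay taken from the job in the question.
    static final long SIZE = Duration.ofDays(15).toMillis();
    static final long SLIDE = Duration.ofDays(1).toMillis();
    static final long DELAY = Duration.ofSeconds(5).toMillis();

    /** Start timestamps of every window [start, start + SIZE) that contains ts. */
    static List<Long> windowStarts(long ts) {
        List<Long> starts = new ArrayList<>();
        // Assumption: window starts are aligned to multiples of SLIDE since the epoch.
        long lastStart = ts - Math.floorMod(ts, SLIDE);
        for (long start = lastStart; start > ts - SIZE; start -= SLIDE) {
            starts.add(start);
        }
        return starts;
    }

    /** Watermark as declared in the schema: highest data_fine seen minus 5 seconds. */
    static long watermark(long maxSeenTs) {
        return maxSeenTs - DELAY;
    }

    /** A window fires once the watermark reaches its last included millisecond. */
    static boolean fires(long windowStart, long watermark) {
        return watermark >= windowStart + SIZE - 1;
    }

    /** Number of windows containing eventTs that have fired for the given watermark. */
    static int firedCount(long eventTs, long watermark) {
        int fired = 0;
        for (long start : windowStarts(eventTs)) {
            if (fires(start, watermark)) {
                fired++;
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long event = Instant.parse("2023-04-15T02:00:00Z").toEpochMilli();
        long later = Instant.parse("2023-04-30T02:00:00Z").toEpochMilli();
        System.out.println("windows containing the event: " + windowStarts(event).size());
        System.out.println("fired when only this event was seen: "
                + firedCount(event, watermark(event)));
        System.out.println("fired after a data_fine 15 days later was seen: "
                + firedCount(event, watermark(later)));
    }
}
```

The point of the sketch is that the event's own timestamp never pushes the watermark past the end of any window that contains it. Results only appear after later data_fine values move the watermark forward.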
Best regards,
Alexey

On Sat, Jul 1, 2023 at 4:37 PM Eugenio Marotti <ing.eugenio.maro...@gmail.com> wrote:

> Hi everyone,
>
> I’m trying to calculate an average with a sliding window. Here’s the code
> I’m using. First of all I receive a series of events from a Kafka topic. I
> declared a watermark on the ‘data_fine’ attribute.
>
> final TableDescriptor filteredPhasesDurationsTableDescriptor =
>     TableDescriptor.forConnector("upsert-kafka")
>         .schema(Schema.newBuilder()
>             .column("id_fascicolo", DataTypes.BIGINT().notNull())
>             .column("nrg", DataTypes.STRING())
>             .column("giudice", DataTypes.STRING())
>             .column("oggetto", DataTypes.STRING())
>             .column("codice_oggetto", DataTypes.STRING())
>             .column("ufficio", DataTypes.STRING())
>             .column("sezione", DataTypes.STRING())
>             .column("fase", DataTypes.STRING().notNull())
>             .column("durata", DataTypes.BIGINT())
>             .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>             .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>             .watermark("data_fine", "data_fine - INTERVAL '5' SECOND")
>             .primaryKey("id_fascicolo", "fase")
>             .build())
>         .option(KafkaConnectorOptions.TOPIC, List.of(
>             "sicid.processor.filtered-phases-durations"))
>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>         .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>         .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>         .build();
> tEnv.createTable("FilteredPhasesDurations_Kafka",
>     filteredPhasesDurationsTableDescriptor);
> Table filteredPhasesDurationsTable = tEnv.from(
>     "FilteredPhasesDurations_Kafka");
>
> After receiving the events I compute the average using a sliding window
> like this:
>
> Table averageJudgeByPhase = filteredPhasesDurationsTable
>     .window(Slide.over(lit(15).days())
>         .every(lit(1).days())
>         .on($("data_fine"))
>         .as("w"))
>     .groupBy(
>         $("giudice"),
>         $("fase"),
>         $("w")
>     )
>     .select(
>         $("giudice"),
>         $("fase"),
>         $("durata").avg().as("mediaMobileGiudicePerFase"),
>         $("w").end().as("data_fine")
>     );
>
> averageJudgeByPhase.execute().print();
>
> But I get no results.
> These are some test events in the Kafka topic. The data_fine attribute has
> a span of more than 15 days so why the average is not computed?
>
> +----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
> | op | id_fascicolo | nrg | giudice   | oggetto    | codice_oggetto | ufficio   | sezione | fase              | durata | data_inizio             | data_fine               |
> +----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
> | +I | 9            | 301 | giudice01 | Usucapione | 131002         | Benevento | 01      | FASE_INTRODUTTIVA | 14     | 2023-04-16 02:00:00.000 | 2023-04-30 02:00:00.000 |
> | +I | 10           | 302 | giudice01 | Usucapione | 131002         | Benevento | 01      | FASE_INTRODUTTIVA | 15     | 2023-04-30 02:00:00.000 | 2023-05-15 02:00:00.000 |
> | +I | 8            | 300 | giudice01 | Usucapione | 131002         | Benevento | 01      | FASE_INTRODUTTIVA | 14     | 2023-04-01 02:00:00.000 | 2023-04-15 02:00:00.000 |
>
> Thanks,
> Eugenio