Hi Eugenio,

I think this is due to window completion: an event-time window emits its
result only once the watermark on your event-time field (data_fine) has
advanced past the end of that window, so a 15-day window can stay open
until later events move the watermark forward.
Please also check the default trigger behavior described here:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/windows/#default-triggers-of-windowassigners
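
To make the firing rule concrete, here is a minimal plain-Java sketch (no
Flink dependency) of how I understand the default event-time trigger on that
page. The class and method names are hypothetical and are not Flink APIs. It
assumes windows are aligned to UTC midnight, treats the printed data_fine
values as UTC, and simplifies the rule to "a window can fire once the
watermark has reached its end". The 5-second delay is the one from your
watermark definition.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; none of these names are Flink APIs.
class SlidingWindowSketch {

    /** Half-open event-time window [start, end). */
    static final class Window {
        final Instant start;
        final Instant end;

        Window(Instant start, Instant end) {
            this.start = start;
            this.end = end;
        }
    }

    /**
     * All sliding windows that contain eventTime, latest start first.
     * Assumption: window starts are aligned to the epoch (UTC midnight for a 1-day slide).
     */
    static List<Window> windowsFor(Instant eventTime, Duration size, Duration slide) {
        long ts = eventTime.toEpochMilli();
        long sizeMs = size.toMillis();
        long slideMs = slide.toMillis();
        // Start of the latest window that still contains the event.
        long lastStart = ts - Math.floorMod(ts, slideMs);
        List<Window> windows = new ArrayList<>();
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new Window(Instant.ofEpochMilli(start),
                    Instant.ofEpochMilli(start + sizeMs)));
        }
        return windows;
    }

    /** Watermark after seeing the given event times: largest event time minus the delay. */
    static Instant watermark(List<Instant> seen, Duration delay) {
        Instant max = seen.get(0);
        for (Instant t : seen) {
            if (t.isAfter(max)) {
                max = t;
            }
        }
        return max.minus(delay);
    }

    /** Simplified firing rule: a window is complete once the watermark has reached its end. */
    static boolean canFire(Window w, Instant watermark) {
        return !watermark.isBefore(w.end);
    }

    public static void main(String[] args) {
        Duration size = Duration.ofDays(15);
        Duration slide = Duration.ofDays(1);
        Duration delay = Duration.ofSeconds(5);

        // Two data_fine values from the test events, read as UTC (assumption).
        Instant first = Instant.parse("2023-04-30T02:00:00Z");
        Instant second = Instant.parse("2023-05-15T02:00:00Z");

        Instant wmAfterFirst = watermark(List.of(first), delay);
        Instant wmAfterSecond = watermark(List.of(first, second), delay);

        for (Window w : windowsFor(first, size, slide)) {
            System.out.println(w.start + " .. " + w.end
                    + " | fires after first event: " + canFire(w, wmAfterFirst)
                    + " | fires after second event: " + canFire(w, wmAfterSecond));
        }
    }
}
```

In this sketch, none of the windows containing the 2023-04-30 event can fire
while it is the only event seen, because the watermark (01:59:55 on that day)
is still before the earliest window end (2023-05-01 00:00). They become
eligible only after a later event pushes the watermark past their ends.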

Best regards,
Alexey

On Sat, Jul 1, 2023 at 4:37 PM Eugenio Marotti <
ing.eugenio.maro...@gmail.com> wrote:

> Hi everyone,
>
> I’m trying to calculate an average with a sliding window. Here’s the code
> I’m using. First of all I receive a series of events from a Kafka topic. I
> declared a watermark on the ‘data_fine’ attribute.
>
> final TableDescriptor filteredPhasesDurationsTableDescriptor =
>     TableDescriptor.forConnector("upsert-kafka")
>         .schema(Schema.newBuilder()
>             .column("id_fascicolo", DataTypes.BIGINT().notNull())
>             .column("nrg", DataTypes.STRING())
>             .column("giudice", DataTypes.STRING())
>             .column("oggetto", DataTypes.STRING())
>             .column("codice_oggetto", DataTypes.STRING())
>             .column("ufficio", DataTypes.STRING())
>             .column("sezione", DataTypes.STRING())
>             .column("fase", DataTypes.STRING().notNull())
>             .column("durata", DataTypes.BIGINT())
>             .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>             .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>             .watermark("data_fine", "data_fine - INTERVAL '5' SECOND")
>             .primaryKey("id_fascicolo", "fase")
>             .build())
>         .option(KafkaConnectorOptions.TOPIC,
>             List.of("sicid.processor.filtered-phases-durations"))
>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>         .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>         .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>         .build();
> tEnv.createTable("FilteredPhasesDurations_Kafka",
>     filteredPhasesDurationsTableDescriptor);
> Table filteredPhasesDurationsTable =
>     tEnv.from("FilteredPhasesDurations_Kafka");
>
> After receiving the events I compute the average using a sliding window
> like this:
>
> Table averageJudgeByPhase = filteredPhasesDurationsTable
>     .window(Slide.over(lit(15).days())
>         .every(lit(1).days())
>         .on($("data_fine"))
>         .as("w"))
>     .groupBy(
>         $("giudice"),
>         $("fase"),
>         $("w")
>     )
>     .select(
>         $("giudice"),
>         $("fase"),
>         $("durata").avg().as("mediaMobileGiudicePerFase"),
>         $("w").end().as("data_fine")
>     );
>
> averageJudgeByPhase.execute().print();
>
> But I get no results.
> These are some test events in the Kafka topic. The data_fine attribute
> spans more than 15 days, so why is the average not computed?
>
>
> +----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
> | op | id_fascicolo | nrg |   giudice |    oggetto | codice_oggetto |   ufficio | sezione |              fase | durata |             data_inizio |               data_fine |
> +----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
> | +I |            9 | 301 | giudice01 | Usucapione |         131002 | Benevento |      01 | FASE_INTRODUTTIVA |     14 | 2023-04-16 02:00:00.000 | 2023-04-30 02:00:00.000 |
> | +I |           10 | 302 | giudice01 | Usucapione |         131002 | Benevento |      01 | FASE_INTRODUTTIVA |     15 | 2023-04-30 02:00:00.000 | 2023-05-15 02:00:00.000 |
> | +I |            8 | 300 | giudice01 | Usucapione |         131002 | Benevento |      01 | FASE_INTRODUTTIVA |     14 | 2023-04-01 02:00:00.000 | 2023-04-15 02:00:00.000 |
>
>
> Thanks,
> Eugenio
>
