Hi everyone, I’m trying to calculate an average with a sliding window. Here’s the code I’m using. First of all I receive a series of events from a Kafka topic. I declared a watermark on the ‘data_fine’ attribute.
final TableDescriptor filteredPhasesDurationsTableDescriptor = TableDescriptor.forConnector("upsert-kafka") .schema(Schema.newBuilder() .column("id_fascicolo", DataTypes.BIGINT().notNull()) .column("nrg", DataTypes.STRING()) .column("giudice", DataTypes.STRING()) .column("oggetto", DataTypes.STRING()) .column("codice_oggetto", DataTypes.STRING()) .column("ufficio", DataTypes.STRING()) .column("sezione", DataTypes.STRING()) .column("fase", DataTypes.STRING().notNull()) .column("durata", DataTypes.BIGINT()) .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3)) .column("data_fine", DataTypes.TIMESTAMP_LTZ(3)) .watermark("data_fine", "data_fine - INTERVAL '5' SECOND") .primaryKey("id_fascicolo", "fase") .build()) .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.filtered-phases-durations")) .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST) .option(KafkaConnectorOptions.KEY_FORMAT, "json") .option(KafkaConnectorOptions.VALUE_FORMAT, "json") .build(); tEnv.createTable("FilteredPhasesDurations_Kafka", filteredPhasesDurationsTableDescriptor); Table filteredPhasesDurationsTable = tEnv.from("FilteredPhasesDurations_Kafka”); After receiving the events I compute the average using a sliding window like this: Table averageJudgeByPhase = filteredPhasesDurationsTable .window(Slide.over(lit(15).days()) .every(lit(1).days()) .on($("data_fine")) .as("w")) .groupBy( $("giudice"), $("fase"), $("w") ) .select( $("giudice"), $("fase"), $("durata").avg().as("mediaMobileGiudicePerFase"), $("w").end().as("data_fine") ); averageJudgeByPhase.execute().print(); But I get no results. These are some test events in the Kafka topic. The data_fine attribute has a span of more than 15 days so why the average is not computed? +----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+ | op | id_fascicolo | nrg | giudice | oggetto | codice_oggetto | ufficio | sezione | fase | durata | data_inizio | data_fine | +----+----------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+----------------------+-------------------------+-------------------------+ | +I | 9 | 301 | giudice01 | Usucapione | 131002 | Benevento | 01 | FASE_INTRODUTTIVA | 14 | 2023-04-16 02:00:00.000 | 2023-04-30 02:00:00.000 | | +I | 10 | 302 | giudice01 | Usucapione | 131002 | Benevento | 01 | FASE_INTRODUTTIVA | 15 | 2023-04-30 02:00:00.000 | 2023-05-15 02:00:00.000 | | +I | 8 | 300 | giudice01 | Usucapione | 131002 | Benevento | 01 | FASE_INTRODUTTIVA | 14 | 2023-04-01 02:00:00.000 | 2023-04-15 02:00:00.000 | Thanks, Eugenio