Hi everyone,
I'm trying to compute a moving average over a sliding window. Here's the code I'm
using. First, I read a series of events from a Kafka topic through the upsert-kafka
connector, and I declared a watermark on the 'data_fine' attribute:
import java.util.List;

import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;

// Source table: upsert-kafka keyed by (id_fascicolo, fase). Event time is data_fine,
// with a watermark that tolerates 5 seconds of out-of-orderness.
final TableDescriptor filteredPhasesDurationsTableDescriptor =
    TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
            .column("id_fascicolo", DataTypes.BIGINT().notNull())
            .column("nrg", DataTypes.STRING())
            .column("giudice", DataTypes.STRING())
            .column("oggetto", DataTypes.STRING())
            .column("codice_oggetto", DataTypes.STRING())
            .column("ufficio", DataTypes.STRING())
            .column("sezione", DataTypes.STRING())
            .column("fase", DataTypes.STRING().notNull())
            .column("durata", DataTypes.BIGINT())
            .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
            .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
            .watermark("data_fine", "data_fine - INTERVAL '5' SECOND")
            .primaryKey("id_fascicolo", "fase")
            .build())
        .option(KafkaConnectorOptions.TOPIC,
            List.of("sicid.processor.filtered-phases-durations"))
        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
        .build();

tEnv.createTable("FilteredPhasesDurations_Kafka",
    filteredPhasesDurationsTableDescriptor);
Table filteredPhasesDurationsTable = tEnv.from("FilteredPhasesDurations_Kafka");
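To reason about the watermark declared above, here is a simplified model in plain Java, not Flink's actual code (the class WatermarkSketch and the two-partition setup are hypothetical). Each input (a Kafka partition or a parallel source instance) tracks the highest data_fine minus 5 seconds, and downstream operators only advance to the minimum over all inputs. If one input never receives data, the combined watermark stays at Long.MIN_VALUE and no window can fire.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;

/**
 * Hypothetical model (not Flink code) of the watermark
 * "data_fine - INTERVAL '5' SECOND" when a source reads several inputs.
 */
public class WatermarkSketch {
    static final long OUT_OF_ORDERNESS = Duration.ofSeconds(5).toMillis();

    // Highest watermark per input; Long.MIN_VALUE means "no data seen yet".
    private final long[] perInput;

    WatermarkSketch(int inputs) {
        perInput = new long[inputs];
        Arrays.fill(perInput, Long.MIN_VALUE);
    }

    /** Records an event read from the given input; watermarks never go backwards. */
    void onEvent(int input, long dataFineMillis) {
        long candidate = dataFineMillis - OUT_OF_ORDERNESS;
        perInput[input] = Math.max(perInput[input], candidate);
    }

    /** Downstream operators only see the minimum over all inputs. */
    long combinedWatermark() {
        return Arrays.stream(perInput).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // data_fine of the three test events, read as UTC (assumption).
        long[] dataFine = {
            Instant.parse("2023-04-30T02:00:00Z").toEpochMilli(),
            Instant.parse("2023-05-15T02:00:00Z").toEpochMilli(),
            Instant.parse("2023-04-15T02:00:00Z").toEpochMilli(),
        };

        // All events land in input 0; input 1 (hypothetical) never receives data.
        WatermarkSketch wm = new WatermarkSketch(2);
        for (long ts : dataFine) {
            wm.onEvent(0, ts);
        }
        System.out.println("input 0 alone: " + Instant.ofEpochMilli(wm.perInput[0]));
        System.out.println("combined: " + wm.combinedWatermark()); // Long.MIN_VALUE
    }
}
```

Running main prints 2023-05-15T01:59:55Z for input 0 but Long.MIN_VALUE for the combined watermark, which would keep every window open. If the topic has partitions (or the job has source subtasks) that receive no data, Flink's table.exec.source.idle-timeout option is intended to let the watermark advance without them.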
After receiving the events, I compute the average over a 15-day window that slides
by 1 day, grouped by judge (giudice) and phase (fase):
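import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.Slide;

// 15-day windows sliding by 1 day over the data_fine event time,
// averaging durata per judge (giudice) and phase (fase).
Table averageJudgeByPhase = filteredPhasesDurationsTable
    .window(Slide.over(lit(15).days())
        .every(lit(1).days())
        .on($("data_fine"))
        .as("w"))
    .groupBy($("giudice"), $("fase"), $("w"))
    .select(
        $("giudice"),
        $("fase"),
        $("durata").avg().as("mediaMobileGiudicePerFase"),
        $("w").end().as("data_fine"));

// Streaming query: rows are printed as windows fire.
averageJudgeByPhase.execute().print();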
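A minimal sketch of how a 15-day window sliding by 1 day assigns a single event, in plain Java rather than Flink (SlidingWindowSketch, assign and firesAt are hypothetical names). It mirrors the assignment loop of Flink's SlidingEventTimeWindows with offset 0, assuming the printed timestamps are UTC and windows align to midnight; Flink aligns TIMESTAMP_LTZ windows using the session time zone, so real boundaries may shift. A window [start, end) fires once the watermark reaches end - 1 ms.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    static final long SIZE = Duration.ofDays(15).toMillis();
    static final long SLIDE = Duration.ofDays(1).toMillis();

    /** A window [start, end) in epoch milliseconds. */
    record Window(long start, long end) {
        /** The window fires once the watermark reaches end - 1. */
        boolean firesAt(long watermark) {
            return watermark >= end - 1;
        }
    }

    /** Start of the slide-aligned period containing ts (offset 0, epoch-aligned). */
    static long alignedStart(long ts, long slide) {
        long remainder = ts % slide;
        return remainder < 0 ? ts - (remainder + slide) : ts - remainder;
    }

    /** All sliding windows of length size that contain ts, latest start first. */
    static List<Window> assign(long ts, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        for (long start = alignedStart(ts, slide); start > ts - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    public static void main(String[] args) {
        // data_fine of event id_fascicolo=9, read as UTC (assumption).
        long ts = Instant.parse("2023-04-30T02:00:00Z").toEpochMilli();
        // Watermark after event id_fascicolo=10: 2023-05-15 02:00 minus 5 seconds.
        long watermark = Instant.parse("2023-05-15T01:59:55Z").toEpochMilli();
        for (Window w : assign(ts, SIZE, SLIDE)) {
            System.out.println(Instant.ofEpochMilli(w.start()) + " .. "
                + Instant.ofEpochMilli(w.end()) + " fires=" + w.firesAt(watermark));
        }
    }
}
```

For event 9 this lists 15 windows, from [2023-04-30, 2023-05-15) down to [2023-04-16, 2023-05-01), so each row contributes to 15 overlapping averages.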
But I get no results.
These are some test events from the Kafka topic. The data_fine values span more
than 15 days, so why isn't the average computed?
+----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
| op | id_fascicolo | nrg | giudice   | oggetto    | codice_oggetto | ufficio   | sezione | fase              | durata | data_inizio             | data_fine               |
+----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
| +I |            9 | 301 | giudice01 | Usucapione | 131002         | Benevento | 01      | FASE_INTRODUTTIVA |     14 | 2023-04-16 02:00:00.000 | 2023-04-30 02:00:00.000 |
| +I |           10 | 302 | giudice01 | Usucapione | 131002         | Benevento | 01      | FASE_INTRODUTTIVA |     15 | 2023-04-30 02:00:00.000 | 2023-05-15 02:00:00.000 |
| +I |            8 | 300 | giudice01 | Usucapione | 131002         | Benevento | 01      | FASE_INTRODUTTIVA |     14 | 2023-04-01 02:00:00.000 | 2023-04-15 02:00:00.000 |
+----+--------------+-----+-----------+------------+----------------+-----------+---------+-------------------+--------+-------------------------+-------------------------+
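To check what the query could emit for these three rows, here is a simplified replay in plain Java (ExpectedWindowsSketch and its methods are hypothetical, not Flink code). Assumptions: timestamps read as UTC with midnight-aligned windows, a single source input so the watermark reaches max(data_fine) - 5 s, and no row dropped as late.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical replay of the test rows through a 15-day/1-day sliding window.
 * All rows share giudice01 / FASE_INTRODUTTIVA, so one group is enough.
 */
public class ExpectedWindowsSketch {
    static final long DAY = Duration.ofDays(1).toMillis();
    static final long SIZE = 15 * DAY;
    static final long OUT_OF_ORDERNESS = Duration.ofSeconds(5).toMillis();

    record Event(long dataFine, long durata) {}

    /** Window end -> durata values of the events that fall in that window. */
    static Map<Long, List<Long>> group(List<Event> events) {
        Map<Long, List<Long>> byEnd = new TreeMap<>();
        for (Event e : events) {
            long lastStart = e.dataFine() - Math.floorMod(e.dataFine(), DAY);
            for (long start = lastStart; start > e.dataFine() - SIZE; start -= DAY) {
                byEnd.computeIfAbsent(start + SIZE, k -> new ArrayList<>()).add(e.durata());
            }
        }
        return byEnd;
    }

    /** Averages of the windows whose end - 1 ms is at or below the watermark. */
    static Map<Long, Double> fired(List<Event> events, long watermark) {
        Map<Long, Double> result = new TreeMap<>();
        group(events).forEach((end, values) -> {
            if (watermark >= end - 1) {
                result.put(end, values.stream().mapToLong(Long::longValue).average().orElseThrow());
            }
        });
        return result;
    }

    static long ts(String iso) {
        return Instant.parse(iso).toEpochMilli();
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event(ts("2023-04-30T02:00:00Z"), 14),  // id_fascicolo 9
            new Event(ts("2023-05-15T02:00:00Z"), 15),  // id_fascicolo 10
            new Event(ts("2023-04-15T02:00:00Z"), 14)); // id_fascicolo 8

        long watermark = events.stream().mapToLong(Event::dataFine).max().orElseThrow()
            - OUT_OF_ORDERNESS;
        Map<Long, Double> out = fired(events, watermark);
        out.forEach((end, avg) -> System.out.println(Instant.ofEpochMilli(end) + " avg=" + avg));
        System.out.println(out.size() + " windows fired");
    }
}
```

Under those assumptions 30 windows would fire, each averaging 14.0, and every window holds exactly one row: 2023-04-15 02:00 and 2023-04-30 02:00 are exactly 15 days apart, so no window [start, start + 15 days) contains both. Event 8 is also listed after event 10; if a watermark derived from event 10 had already reached the window operator, event 8's windows (all ending by 2023-04-30) would likely be treated as late and dropped.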
Thanks,
Eugenio