Hi,

I'm trying to run a window aggregation on two joined tables built from two Kafka
topics (in upsert fashion), but I get no output. Here's the code I'm using.
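For reference, the snippets below assume roughly the following imports; I'm not
sure the KafkaConnectorOptions package is the same across flink-connector-kafka
versions, so treat that one as an assumption. tEnv is a TableEnvironment created
earlier (not shown).

import java.util.List;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.Tumble;
// package in my setup; may differ depending on the connector version
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;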

This is the first table, read from Kafka, with an event-time watermark on the
‘data_inizio’ attribute:

final TableDescriptor phasesDurationsTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
       .schema(Schema.newBuilder()
             .column("id_fascicolo", DataTypes.BIGINT().notNull())
             .column("nrg", DataTypes.STRING())
             .column("giudice", DataTypes.STRING())
             .column("oggetto", DataTypes.STRING())
             .column("codice_oggetto", DataTypes.STRING())
             .column("ufficio", DataTypes.STRING())
             .column("sezione", DataTypes.STRING())
             .column("fase_completata", DataTypes.BOOLEAN())
             .column("fase", DataTypes.STRING().notNull())
             .column("durata", DataTypes.BIGINT())
             .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
             .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
             .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
             .primaryKey("id_fascicolo", "fase")
             .build())
       .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.phases-durations"))
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.KEY_FORMAT, "json")
       .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
       .build();
tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
Here’s the second table:
final TableDescriptor averageJudgeByPhaseReportTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
       .schema(Schema.newBuilder()
             .column("giudice", DataTypes.STRING().notNull())
             .column("fase", DataTypes.STRING().notNull())
             .column("media_mobile", DataTypes.BIGINT())
             .primaryKey("giudice", "fase")
             .build())
       .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.average-judge-by-phase-report"))
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.KEY_FORMAT, "json")
       .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
       .option(KafkaConnectorOptions.PROPS_GROUP_ID, "average-judge-by-phase-report")
       .build();
tEnv.createTable("AverageJudgeByPhaseReport_Kafka", 
averageJudgeByPhaseReportTableDescriptor);
Table averageJudgeByPhaseReportTable = tEnv.from("AverageJudgeByPhaseReport_Kafka");

// rename the key columns so they don't clash with phasesDurationsTable's columns in the join
Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
       .select(
             $("giudice").as("giudice_media"),
             $("fase").as("fase_media"),
             $("media_mobile")
       );

And here’s the code I’m experimenting with:
phasesDurationsTable
       .join(renamedAverageJudgeByPhaseReportTable)
       .where($("giudice").isEqual($("giudice_media")))
       .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
       .groupBy(
             $("giudice"),
             $("w")
       )
       .select(
             $("giudice")
       )
       .execute().print();
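
(In the full version I'd select more than just ‘giudice’; something along the
lines of the sketch below, where the window_start / durata_media aliases and the
avg aggregate are only illustrative.)

phasesDurationsTable
       .join(renamedAverageJudgeByPhaseReportTable)
       .where($("giudice").isEqual($("giudice_media")))
       .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
       .groupBy($("giudice"), $("w"))
       .select(
             $("giudice"),
             // window start is a group-window property
             $("w").start().as("window_start"),
             // average phase duration per judge and window
             $("durata").avg().as("durata_media")
       )
       .execute().print();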

Am I doing something wrong?
