Hi, I’m trying to run a window aggregation on two joined tables, each read from its own Kafka topic with the upsert-kafka connector, but I get no output. Here’s the code I’m using.
This is the first table, read from Kafka with an event-time watermark on the `data_inizio` attribute:

```java
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import java.util.List;
import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.Tumble;

final TableDescriptor phasesDurationsTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
                .column("id_fascicolo", DataTypes.BIGINT().notNull())
                .column("nrg", DataTypes.STRING())
                .column("giudice", DataTypes.STRING())
                .column("oggetto", DataTypes.STRING())
                .column("codice_oggetto", DataTypes.STRING())
                .column("ufficio", DataTypes.STRING())
                .column("sezione", DataTypes.STRING())
                .column("fase_completata", DataTypes.BOOLEAN())
                .column("fase", DataTypes.STRING().notNull())
                .column("durata", DataTypes.BIGINT())
                .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
                .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
                // event-time attribute: watermark trails data_inizio by 1 second
                .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
                .primaryKey("id_fascicolo", "fase")
                .build())
        .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.phases-durations"))
        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
        .build();

tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
```

Here’s the second table:

```java
final TableDescriptor averageJudgeByPhaseReportTableDescriptor = TableDescriptor.forConnector("upsert-kafka")
        .schema(Schema.newBuilder()
                .column("giudice", DataTypes.STRING().notNull())
                .column("fase", DataTypes.STRING().notNull())
                .column("media_mobile", DataTypes.BIGINT())
                .primaryKey("giudice", "fase")
                .build())
        .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.average-judge-by-phase-report"))
        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
        .option(KafkaConnectorOptions.PROPS_GROUP_ID, "average-judge-by-phase-report")
        .build();

tEnv.createTable("AverageJudgeByPhaseReport_Kafka", averageJudgeByPhaseReportTableDescriptor);
Table averageJudgeByPhaseReportTable = tEnv.from("AverageJudgeByPhaseReport_Kafka");

// Rename the key columns so they don't clash with the first table's columns in the join
Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
        .select(
                $("giudice").as("giudice_media"),
                $("fase").as("fase_media"),
                $("media_mobile"));
```

And here’s the query I’m experimenting with:

```java
phasesDurationsTable
        .join(renamedAverageJudgeByPhaseReportTable)
        .where($("giudice").isEqual($("giudice_media")))
        .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
        .groupBy($("giudice"), $("w"))
        .select($("giudice"))
        .execute()
        .print();
```

Am I doing something wrong?
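For reference when reading the query: an event-time tumbling window emits its result only after the watermark passes the end of the window, so `print()` shows nothing until then. The plain-Java sketch below (not Flink API code) approximates that arithmetic for the 30-day window on `data_inizio`. It assumes epoch-aligned windows with zero offset, a UTC session time zone, and the `data_inizio - INTERVAL '1' SECOND` watermark declared on the first table; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: approximates epoch-aligned tumbling-window assignment
// (offset 0, UTC session time zone assumed) and the watermark strategy
// "data_inizio - INTERVAL '1' SECOND" from the PhasesDurations_Kafka table.
class TumblingWindowCheck {

    static final long WINDOW_SIZE_MS = Duration.ofDays(30).toMillis();
    static final long WATERMARK_DELAY_MS = Duration.ofSeconds(1).toMillis();

    // Start of the tumbling window containing the event timestamp.
    static long windowStart(long eventTsMs, long sizeMs) {
        return eventTsMs - Math.floorMod(eventTsMs, sizeMs);
    }

    // Exclusive end of that window.
    static long windowEnd(long eventTsMs, long sizeMs) {
        return windowStart(eventTsMs, sizeMs) + sizeMs;
    }

    // Watermark after seeing records up to maxSeenTsMs (bounded 1-second delay).
    static long watermark(long maxSeenTsMs) {
        return maxSeenTsMs - WATERMARK_DELAY_MS;
    }

    // The window fires once the watermark reaches its last millisecond (end - 1).
    static boolean windowFires(long eventTsMs, long maxSeenTsMs, long sizeMs) {
        return watermark(maxSeenTsMs) >= windowEnd(eventTsMs, sizeMs) - 1;
    }

    public static void main(String[] args) {
        long event = Instant.parse("2023-01-10T00:00:00Z").toEpochMilli();
        long start = windowStart(event, WINDOW_SIZE_MS);
        long end = windowEnd(event, WINDOW_SIZE_MS);
        System.out.println("window: " + Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(end));

        // A later record that is still inside the same window does not close it
        long later = Instant.parse("2023-01-20T00:00:00Z").toEpochMilli();
        System.out.println("fires after a 2023-01-20 record? " + windowFires(event, later, WINDOW_SIZE_MS));

        // A record at least 999 ms past the window end advances the watermark far enough
        System.out.println("fires after a record at end + 1 s? "
                + windowFires(event, end + WATERMARK_DELAY_MS, WINDOW_SIZE_MS));
    }
}
```

Under these assumptions, a window closes only when some record arrives with `data_inizio` at least 999 ms past the window end. A topic whose timestamps all fall inside one 30-day bucket would therefore produce no printed rows, although the code shown does not confirm that this is what happens here.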