Hi Matthias,

No the second table doesn’t have an event time and a watermark specified. In 
order for the window to work do I need a watermark also on the second table?


> Il giorno 21 set 2023, alle ore 13:45, Schwalbe Matthias 
> <matthias.schwa...@viseca.ch> ha scritto:
> Ciao Eugenio,
> I might be mistaken, but did you specify the event time for the second table 
> like you did for the first table (watermark(….))?
> I am no so acquainted with table api (doing more straight data stream api 
> work), but I assume this join and windowing should be by event time.
> What do you think?
> Cari saluti
> Thias
> From: Eugenio Marotti <ing.eugenio.maro...@gmail.com> 
> Sent: Thursday, September 21, 2023 8:56 AM
> To: user@flink.apache.org
> Subject: Window aggregation on two joined table 
> Hi,
> I’m trying to execute a window aggregation on two joined table from two Kafka 
> topics (upsert fashion), but I get no output. Here’s the code I’m using:
> This is the first table from Kafka with an event time watermark on 
> ‘data_fine’ attribute:
> final TableDescriptor phasesDurationsTableDescriptor = 
> TableDescriptor.forConnector("upsert-kafka")
>        .schema(Schema.newBuilder()
>              .column("id_fascicolo", DataTypes.BIGINT().notNull())
>              .column("nrg", DataTypes.STRING())
>              .column("giudice", DataTypes.STRING())
>              .column("oggetto", DataTypes.STRING())
>              .column("codice_oggetto", DataTypes.STRING())
>              .column("ufficio", DataTypes.STRING())
>              .column("sezione", DataTypes.STRING())
>              .column("fase_completata", DataTypes.BOOLEAN())
>              .column("fase", DataTypes.STRING().notNull())
>              .column("durata", DataTypes.BIGINT())
>              .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>              .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>              .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
>              .primaryKey("id_fascicolo", "fase")
>              .build())
>        .option(KafkaConnectorOptions.TOPIC, 
> List.of("sicid.processor.phases-durations"))
>        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>        .build();
> tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
> Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka”);
> Here’s the second table:
> final TableDescriptor averageJudgeByPhaseReportTableDescriptor = 
> TableDescriptor.forConnector("upsert-kafka")
>        .schema(Schema.newBuilder()
>              .column("giudice", DataTypes.STRING().notNull())
>              .column("fase", DataTypes.STRING().notNull())
>              .column("media_mobile", DataTypes.BIGINT())
>              .primaryKey("giudice", "fase")
>              .build())
>        .option(KafkaConnectorOptions.TOPIC, 
> List.of("sicid.processor.average-judge-by-phase-report"))
>        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>        .option(KafkaConnectorOptions.PROPS_GROUP_ID, 
> "average-judge-by-phase-report")
>        .build();
> tEnv.createTable("AverageJudgeByPhaseReport_Kafka", 
> averageJudgeByPhaseReportTableDescriptor);
> Table averageJudgeByPhaseReportTable = 
> tEnv.from("AverageJudgeByPhaseReport_Kafka");
> Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
>        .select(
>              $("giudice").as("giudice_media"),
>              $("fase").as("fase_media"),
>              $("media_mobile")
>        );
> And here’s the code I’m experimenting with:
> phasesDurationsTable
>        .join(renamedAverageJudgeByPhaseReportTable)
>        .where($("giudice").isEqual($("giudice_media")))
>        .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
>        .groupBy(
>              $("giudice"),
>              $("w")
>        )
>        .select(
>              $("giudice")
>        )
>        .execute().print();
> Am I doing something wrong?
