Hi Matthias,

No, the second table doesn't have an event time or a watermark specified. Do I also need a watermark on the second table for the window to work?
Thanks,
Eugenio

> On 21 Sep 2023, at 13:45, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> Hi Eugenio,
>
> I might be mistaken, but did you specify the event time for the second table like you did for the first table (watermark(….))?
> I am not so familiar with the Table API (I do more plain DataStream API work), but I assume this join and windowing should be based on event time.
>
> What do you think?
>
> Kind regards,
>
> Thias
>
>
> From: Eugenio Marotti <ing.eugenio.maro...@gmail.com>
> Sent: Thursday, September 21, 2023 8:56 AM
> To: user@flink.apache.org
> Subject: Window aggregation on two joined table
>
> Hi,
>
> I'm trying to execute a window aggregation on two joined tables from two Kafka topics (upsert fashion), but I get no output. Here's the code I'm using:
>
> This is the first table from Kafka, with an event-time watermark on the 'data_fine' attribute:
>
> final TableDescriptor phasesDurationsTableDescriptor =
>     TableDescriptor.forConnector("upsert-kafka")
>         .schema(Schema.newBuilder()
>             .column("id_fascicolo", DataTypes.BIGINT().notNull())
>             .column("nrg", DataTypes.STRING())
>             .column("giudice", DataTypes.STRING())
>             .column("oggetto", DataTypes.STRING())
>             .column("codice_oggetto", DataTypes.STRING())
>             .column("ufficio", DataTypes.STRING())
>             .column("sezione", DataTypes.STRING())
>             .column("fase_completata", DataTypes.BOOLEAN())
>             .column("fase", DataTypes.STRING().notNull())
>             .column("durata", DataTypes.BIGINT())
>             .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>             .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>             // event-time attribute: the watermark trails the largest data_inizio seen by 1 second
>             .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
>             .primaryKey("id_fascicolo", "fase")
>             .build())
>         .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.phases-durations"))
>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>         .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>         .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>         .build();
>
> tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
> Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
>
> Here's the second table:
>
> final TableDescriptor averageJudgeByPhaseReportTableDescriptor =
>     TableDescriptor.forConnector("upsert-kafka")
>         .schema(Schema.newBuilder()
>             .column("giudice", DataTypes.STRING().notNull())
>             .column("fase", DataTypes.STRING().notNull())
>             .column("media_mobile", DataTypes.BIGINT())
>             // note: no time column and no watermark declared on this table
>             .primaryKey("giudice", "fase")
>             .build())
>         .option(KafkaConnectorOptions.TOPIC, List.of("sicid.processor.average-judge-by-phase-report"))
>         .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>         .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>         .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>         .option(KafkaConnectorOptions.PROPS_GROUP_ID, "average-judge-by-phase-report")
>         .build();
> tEnv.createTable("AverageJudgeByPhaseReport_Kafka", averageJudgeByPhaseReportTableDescriptor);
> Table averageJudgeByPhaseReportTable = tEnv.from("AverageJudgeByPhaseReport_Kafka");
>
> // rename the key columns so they don't clash with the first table's columns in the join
> Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
>     .select(
>         $("giudice").as("giudice_media"),
>         $("fase").as("fase_media"),
>         $("media_mobile")
>     );
>
> And here's the code I'm experimenting with:
>
> // join on the judge, then group into 30-day tumbling event-time windows on data_inizio
> phasesDurationsTable
>     .join(renamedAverageJudgeByPhaseReportTable)
>     .where($("giudice").isEqual($("giudice_media")))
>     .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
>     .groupBy($("giudice"), $("w"))
>     .select($("giudice"))
>     .execute().print();
>
> Am I doing something wrong?
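On the watermark question: Flink's documentation on watermarks in parallel streams states that an operator consuming more than one input stream sets its current event time to the minimum of its inputs' event times. If that applies to this join, the second table, which declares no event-time attribute or watermark, would hold the join's event time at its initial value, and the 30-day tumbling window on 'data_inizio' would never fire, which would be consistent with getting no output. The following plain-Java sketch (no Flink dependency) models only that arithmetic. The class and method names are hypothetical, windows are assumed to be aligned to the UTC epoch with zero offset, and the first input's watermark mimics the "data_inizio - INTERVAL '1' SECOND" strategy. It does not model how the Table API planner treats time attributes across a regular join or changelog (upsert) sources.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Dependency-free model (not Flink code; all names are hypothetical) of:
 * 1. a two-input operator whose event-time clock is the minimum of the
 *    watermarks received on its inputs, and
 * 2. a tumbling window [start, end) that can only fire once that clock
 *    reaches end - 1 ms.
 */
public class TwoInputWatermarkSketch {

    /** Stand-in for "this input has never emitted a watermark". */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Event-time clock of a two-input operator: the smaller input watermark. */
    static long combinedWatermark(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    /** Start of the tumbling window containing the timestamp (UTC epoch-aligned, zero offset). */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps too
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** True once the watermark has reached the last millisecond of [start, start + size). */
    static boolean windowFires(long windowStartMillis, long sizeMillis, long watermark) {
        return watermark >= windowStartMillis + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofDays(30).toMillis();
        long event = Instant.parse("2023-09-01T10:00:00Z").toEpochMilli();
        long start = windowStart(event, size);
        // prints "Window: 2023-08-21T00:00:00Z to 2023-09-20T00:00:00Z"
        System.out.println("Window: " + Instant.ofEpochMilli(start)
                + " to " + Instant.ofEpochMilli(start + size));

        // First input: watermark = latest data_inizio seen minus 1 second.
        long latestLeftEvent = Instant.parse("2023-10-15T08:00:00Z").toEpochMilli();
        long leftWatermark = latestLeftEvent - Duration.ofSeconds(1).toMillis();

        // Second input without any watermark: the combined clock stays at Long.MIN_VALUE.
        long clockWithoutRight = combinedWatermark(leftWatermark, NO_WATERMARK);
        // prints "... second input: false"
        System.out.println("Fires without a watermark on the second input: "
                + windowFires(start, size, clockWithoutRight));

        // Second input with a watermark that has also passed the window end.
        long rightWatermark = Instant.parse("2023-10-01T00:00:00Z").toEpochMilli();
        long clockWithRight = combinedWatermark(leftWatermark, rightWatermark);
        // prints "... both inputs: true"
        System.out.println("Fires with watermarks on both inputs: "
                + windowFires(start, size, clockWithRight));
    }
}
```

The sketch also suggests that, even with watermarks on both tables, a 30-day window is only emitted after both inputs have progressed past its end, so output could take a long time to appear unless the topics already contain data beyond that point.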