Hi Matthias,

No, the second table doesn’t have an event time or a watermark specified. For 
the window to work, do I also need a watermark on the second table?
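
As a rough illustration of why the watermark matters here, this is a minimal plain-Java sketch (no Flink dependency) of the event-time rules as I understand them from the DataStream runtime: a bounded-out-of-orderness watermark trails the highest timestamp seen, an operator with two inputs takes the minimum of its input watermarks, and a tumbling window fires once the watermark reaches the window's last millisecond. The class name, method names and the sample timestamp are hypothetical, and modelling "no watermark" as Long.MIN_VALUE and windows as epoch-aligned without an offset are assumptions. Whether the Table API planner keeps the time attribute through this kind of join is not covered by the sketch.

```java
import java.time.Duration;
import java.time.Instant;

final class WatermarkSketch {

    // Assumption: an input that never emits a watermark is modelled as Long.MIN_VALUE.
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Start of the epoch-aligned tumbling window (no offset) containing timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Bounded-out-of-orderness watermark: highest timestamp seen minus the allowed delay. */
    static long watermarkFor(long maxTimestampMs, long delayMs) {
        return maxTimestampMs - delayMs;
    }

    /** An operator with two inputs only advances as far as its slower input. */
    static long combinedWatermark(long leftMs, long rightMs) {
        return Math.min(leftMs, rightMs);
    }

    /** A window [start, start + size) fires once the watermark reaches its last millisecond. */
    static boolean windowFires(long startMs, long sizeMs, long watermarkMs) {
        return watermarkMs >= startMs + sizeMs - 1;
    }

    public static void main(String[] args) {
        long size = Duration.ofDays(30).toMillis();    // Tumble.over(lit(30).days())
        long delay = Duration.ofSeconds(1).toMillis(); // data_inizio - INTERVAL '1' SECOND

        // Hypothetical data_inizio value of a single row in the first table.
        long eventTime = Instant.parse("2023-09-21T08:56:00Z").toEpochMilli();
        long start = windowStart(eventTime, size);
        long left = watermarkFor(eventTime, delay);

        // Case 1: the second input never emits a watermark, so the minimum never advances.
        long stuck = combinedWatermark(left, NO_WATERMARK);
        System.out.println("no watermark on second input: " + windowFires(start, size, stuck));

        // Case 2: only rows inside the current 30-day window have been seen so far.
        System.out.println("only the first row seen: " + windowFires(start, size, left));

        // Case 3: both inputs have advanced past the end of the window.
        long later = watermarkFor(start + size + delay, delay);
        System.out.println("both inputs past window end: "
                + windowFires(start, size, combinedWatermark(later, later)));
    }
}
```

If these rules do apply to the Table API job, there would be two separate reasons for seeing no output: a missing watermark on one side of the join, and a 30-day window that cannot close until rows with a later data_inizio arrive.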

Thanks
Eugenio

> On 21 Sep 2023, at 13:45, Schwalbe Matthias 
> <matthias.schwa...@viseca.ch> wrote:
> 
> Hi Eugenio,
>  
> I might be mistaken, but did you specify the event time for the second table 
> like you did for the first table (watermark(….))?
> I am not so acquainted with the Table API (I mostly do straight DataStream API 
> work), but I assume this join and windowing should be by event time.
>  
> What do you think?
>  
> Kind regards
>  
> Thias
>  
>  
> From: Eugenio Marotti <ing.eugenio.maro...@gmail.com> 
> Sent: Thursday, September 21, 2023 8:56 AM
> To: user@flink.apache.org
> Subject: Window aggregation on two joined table 
>  
> Hi,
>  
> I’m trying to execute a window aggregation on two joined tables from two Kafka 
> topics (in upsert fashion), but I get no output. Here’s the code I’m using:
>  
> This is the first table from Kafka, with an event-time watermark on the 
> ‘data_inizio’ attribute:
>  
> final TableDescriptor phasesDurationsTableDescriptor = 
> TableDescriptor.forConnector("upsert-kafka")
>        .schema(Schema.newBuilder()
>              .column("id_fascicolo", DataTypes.BIGINT().notNull())
>              .column("nrg", DataTypes.STRING())
>              .column("giudice", DataTypes.STRING())
>              .column("oggetto", DataTypes.STRING())
>              .column("codice_oggetto", DataTypes.STRING())
>              .column("ufficio", DataTypes.STRING())
>              .column("sezione", DataTypes.STRING())
>              .column("fase_completata", DataTypes.BOOLEAN())
>              .column("fase", DataTypes.STRING().notNull())
>              .column("durata", DataTypes.BIGINT())
>              .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
>              .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
>              .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
>              .primaryKey("id_fascicolo", "fase")
>              .build())
>        .option(KafkaConnectorOptions.TOPIC, 
> List.of("sicid.processor.phases-durations"))
>        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>        .build();
> tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
> Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka");
>  
> Here’s the second table:
> final TableDescriptor averageJudgeByPhaseReportTableDescriptor = 
> TableDescriptor.forConnector("upsert-kafka")
>        .schema(Schema.newBuilder()
>              .column("giudice", DataTypes.STRING().notNull())
>              .column("fase", DataTypes.STRING().notNull())
>              .column("media_mobile", DataTypes.BIGINT())
>              .primaryKey("giudice", "fase")
>              .build())
>        .option(KafkaConnectorOptions.TOPIC, 
> List.of("sicid.processor.average-judge-by-phase-report"))
>        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
>        .option(KafkaConnectorOptions.KEY_FORMAT, "json")
>        .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
>        .option(KafkaConnectorOptions.PROPS_GROUP_ID, 
> "average-judge-by-phase-report")
>        .build();
> tEnv.createTable("AverageJudgeByPhaseReport_Kafka", 
> averageJudgeByPhaseReportTableDescriptor);
> Table averageJudgeByPhaseReportTable = 
> tEnv.from("AverageJudgeByPhaseReport_Kafka");
> 
> Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable
>        .select(
>              $("giudice").as("giudice_media"),
>              $("fase").as("fase_media"),
>              $("media_mobile")
>        );
>  
> And here’s the code I’m experimenting with:
> phasesDurationsTable
>        .join(renamedAverageJudgeByPhaseReportTable)
>        .where($("giudice").isEqual($("giudice_media")))
>        .window(Tumble.over(lit(30).days()).on($("data_inizio")).as("w"))
>        .groupBy(
>              $("giudice"),
>              $("w")
>        )
>        .select(
>              $("giudice")
>        )
>        .execute().print();
>  
> Am I doing something wrong?
