… well yes and no:

  *   If the second table is a small table used for enrichment, you can also 
mark it as broadcast table, but I don’t know how to do that on table API
  *   If the second table has significant data and significant update, the you 
need to configure watermarking/event time semantics on the second table as well
  *   The logic is this:
     *   Your join operator only generates output windows once the event time 
passes by the end of the time window
     *   The event time/watermark time of you join operator is the minimum 
watermark time of all inputs
     *   Because your second table does not emit watermark, it’s watermark time 
 remains at Long.MinValue, hence also the operator time stays there
  *   Another way to make progress is, in case your second table does not 
update watermarks/data often enough, to mark the source with an idle watermark 
generator in which case it is rendered as ‘timeless’ and does not prevent time 
progress in your join operator
     *   Again, not sure how to configure this

Ancora cari saluti


From: Eugenio Marotti <ing.eugenio.maro...@gmail.com>
Sent: Thursday, September 21, 2023 2:35 PM
To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
Cc: user@flink.apache.org
Subject: Re: Window aggregation on two joined table

Hi Matthias,

No the second table doesn’t have an event time and a watermark specified. In 
order for the window to work do I need a watermark also on the second table?


Il giorno 21 set 2023, alle ore 13:45, Schwalbe Matthias 
<matthias.schwa...@viseca.ch<mailto:matthias.schwa...@viseca.ch>> ha scritto:

Ciao Eugenio,

I might be mistaken, but did you specify the event time for the second table 
like you did for the first table (watermark(….))?
I am no so acquainted with table api (doing more straight data stream api 
work), but I assume this join and windowing should be by event time.

What do you think?

Cari saluti


From: Eugenio Marotti 
Sent: Thursday, September 21, 2023 8:56 AM
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Window aggregation on two joined table


I’m trying to execute a window aggregation on two joined table from two Kafka 
topics (upsert fashion), but I get no output. Here’s the code I’m using:

This is the first table from Kafka with an event time watermark on ‘data_fine’ 

final TableDescriptor phasesDurationsTableDescriptor = 
             .column("id_fascicolo", DataTypes.BIGINT().notNull())
             .column("nrg", DataTypes.STRING())
             .column("giudice", DataTypes.STRING())
             .column("oggetto", DataTypes.STRING())
             .column("codice_oggetto", DataTypes.STRING())
             .column("ufficio", DataTypes.STRING())
             .column("sezione", DataTypes.STRING())
             .column("fase_completata", DataTypes.BOOLEAN())
             .column("fase", DataTypes.STRING().notNull())
             .column("durata", DataTypes.BIGINT())
             .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
             .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
             .watermark("data_inizio", "data_inizio - INTERVAL '1' SECOND")
             .primaryKey("id_fascicolo", "fase")
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.KEY_FORMAT, "json")
       .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
tEnv.createTable("PhasesDurations_Kafka", phasesDurationsTableDescriptor);
Table phasesDurationsTable = tEnv.from("PhasesDurations_Kafka”);

Here’s the second table:

final TableDescriptor averageJudgeByPhaseReportTableDescriptor = 
             .column("giudice", DataTypes.STRING().notNull())
             .column("fase", DataTypes.STRING().notNull())
             .column("media_mobile", DataTypes.BIGINT())
             .primaryKey("giudice", "fase")
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.KEY_FORMAT, "json")
       .option(KafkaConnectorOptions.VALUE_FORMAT, "json")
Table averageJudgeByPhaseReportTable = 

Table renamedAverageJudgeByPhaseReportTable = averageJudgeByPhaseReportTable

And here’s the code I’m experimenting with:


Am I doing something wrong?
