Hi everyone, I'm using Flink to process some streaming data. First of all, I have two tables receiving events from Kafka. These tables are joined, and the resulting table is converted to a DataStream, where it is processed by a custom KeyedProcessFunction. The output is then converted to a table and sent to OpenSearch. Here’s the code I’m using:
// Kafka source for legal files: reads the Debezium-JSON changelog published on
// "sicid.public.fascicoli", starting from the latest offset.
final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("nrg", DataTypes.STRING())
        .column("ufficio", DataTypes.STRING())
        .column("sezione", DataTypes.STRING())
        .column("giudice", DataTypes.STRING())
        .column("oggetto", DataTypes.STRING())
        .column("codice_oggetto", DataTypes.STRING())
        .build())
    .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
    .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
    .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
    .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
    .build();
tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");

// Kafka source for events: same connector settings, topic "sicid.public.eventi".
// "data_evento" is a computed column derived from the epoch-millisecond "data" field.
final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
    .schema(Schema.newBuilder()
        .column("id", DataTypes.BIGINT())
        .column("data", DataTypes.BIGINT())
        .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
        .column("evento", DataTypes.STRING())
        .column("id_fascicolo", DataTypes.BIGINT())
        .build())
    .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
    .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
    .option(KafkaConnectorOptions.SCAN_STARTUP_MODE, KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
    .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
    .build();
tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
// Both source tables expose a column named "id", and a Table API join requires
// distinct field names. Renaming the event id to "id_evento" removes the clash
// and provides the column that the select below refers to.
Table eventsTable = tEnv.from("EventsTable_Kafka")
    .renameColumns($("id").as("id_evento"));

// Join every event to its legal file ("id" is the legal file id after the rename above).
Table legalFileEventsTable = legalFilesTable.join(eventsTable)
    .where($("id").isEqual($("id_fascicolo")))
    .select(
        $("id").as("id_fascicolo"),
        $("id_evento"),
        $("giudice"),
        $("nrg"),
        $("codice_oggetto"),
        $("oggetto"),
        $("ufficio"),
        $("sezione"),
        $("data_evento"),
        $("evento")
    );

// Convert the joined table to a changelog stream, key it by legal file id and run the
// custom process function. The explicit RowTypeInfo declares the names and types of
// the rows the function emits.
// NOTE(review): the field order must match what PhaseDurationCounterProcessFunction
// actually emits; that class is not shown here, so confirm.
DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
    .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
    .process(new PhaseDurationCounterProcessFunction())
    .returns(new RowTypeInfo(
        new TypeInformation[] {
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.BOOLEAN_TYPE_INFO,
            BasicTypeInfo.LONG_TYPE_INFO,
            BasicTypeInfo.INSTANT_TYPE_INFO,
            BasicTypeInfo.INSTANT_TYPE_INFO
        },
        new String[] {
            "id_fascicolo",
            "nrg",
            "giudice",
            "oggetto",
            "codice_oggetto",
            "ufficio",
            "sezione",
            "fase",
            "fase_completata",
            "durata",
            "data_inizio",
            "data_fine"
        }
    ));

// Schema shared by the table built from the stream and by the OpenSearch sink.
// The primary key (id_fascicolo, fase) is what the upsert changelog is keyed on.
final Schema phasesDurationsSchema = Schema.newBuilder()
    .column("id_fascicolo", DataTypes.BIGINT().notNull())
    .column("nrg", DataTypes.STRING())
    .column("giudice", DataTypes.STRING())
    .column("oggetto", DataTypes.STRING())
    .column("codice_oggetto", DataTypes.STRING())
    .column("ufficio", DataTypes.STRING())
    .column("sezione", DataTypes.STRING())
    .column("fase", DataTypes.STRING().notNull())
    .column("fase_completata", DataTypes.BOOLEAN())
    .column("durata", DataTypes.BIGINT())
    .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
    .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
    .primaryKey("id_fascicolo", "fase")
    .build();
// Read the process-function output back into the Table API as an upsert changelog.
Table phasesDurationsTable = tEnv.fromChangelogStream(phasesDurationsDataStream, phasesDurationsSchema, ChangelogMode.upsert());

// OpenSearch sink table writing to the "legal_files_phase_duration" index.
// NOTE(review): hard-coded admin/admin credentials and ALLOW_INSECURE=true look like
// local-test settings; confirm they are not used against a real cluster.
final TableDescriptor phasesDurationsOS = TableDescriptor.forConnector("opensearch")
    .schema(phasesDurationsSchema)
    .option(OpensearchConnectorOptions.HOSTS_OPTION, List.of(OPENSEARCH_HOST))
    .option(OpensearchConnectorOptions.INDEX_OPTION, "legal_files_phase_duration")
    .option(OpensearchConnectorOptions.USERNAME_OPTION, "admin")
    .option(OpensearchConnectorOptions.PASSWORD_OPTION, "admin")
    .option(OpensearchConnectorOptions.ALLOW_INSECURE, true)
    .build();
tEnv.createTable("PhasesDurationsOS", phasesDurationsOS);

After that I filter the phasesDurationTable like this:

// Keep only completed phases and drop the "fase_completata" flag from the projection.
Table filteredPhasesDurationsTable = phasesDurationsTable
    .where($("fase_completata").isTrue())
    .select(
        $("id_fascicolo"),
        $("nrg"),
        $("giudice"),
        $("oggetto"),
        $("codice_oggetto"),
        $("ufficio"),
        $("sezione"),
        $("fase"),
        $("durata"),
        $("data_inizio"),
        $("data_fine")
    );

With the filteredPhasesDurationsTable I need to calculate some averages with a sliding window. So I need to define a watermark on the data_fine parameter. Is there a way to do this?