Hi everyone,

I'm using Flink for processing some streaming data. First of all I have two 
tables receiving events from Kafka. These tables are joined and the resulting 
table is converted to a DataStream where it is processed by a custom 
KeyedProcessFunction. The output is then converted to a table and sent to 
Opensearch. Here’s the code I’m using:


// Kafka-backed source table for the "sicid.public.fascicoli" topic. Records are decoded
// with the debezium-json format and consumption starts from the latest offsets.
final TableDescriptor legalFilesTableDescriptor = TableDescriptor.forConnector("kafka")
       .schema(Schema.newBuilder()
             .column("id", DataTypes.BIGINT())
             .column("nrg", DataTypes.STRING())
             .column("ufficio", DataTypes.STRING())
             .column("sezione", DataTypes.STRING())
             .column("giudice", DataTypes.STRING())
             .column("oggetto", DataTypes.STRING())
             .column("codice_oggetto", DataTypes.STRING())
             .build())
       .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
             KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
       .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
       .build();

// Register the descriptor under a catalog name, then obtain a Table API handle on it.
tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");


// Kafka-backed source table for the "sicid.public.eventi" topic. Records are decoded
// with the debezium-json format and consumption starts from the latest offsets.
final TableDescriptor eventsTableDescriptor = TableDescriptor.forConnector("kafka")
       .schema(Schema.newBuilder()
             .column("id", DataTypes.BIGINT())
             .column("data", DataTypes.BIGINT())
             // Computed column: `data` converted with TO_TIMESTAMP_LTZ at precision 3.
             .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
             .column("evento", DataTypes.STRING())
             .column("id_fascicolo", DataTypes.BIGINT())
             .build())
       .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
       .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
       .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
             KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
       .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
       .build();
tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);

// Table handle read by the join on `eventsTable`.
// NOTE(review): this definition was missing from the snippet. Renaming `id` to
// `id_evento` is an assumption, made so that the join sees only one `id` column and
// finds the `id_evento` column it selects — confirm against the real job.
Table eventsTable = tEnv.from("EventsTable_Kafka")
       .renameColumns($("id").as("id_evento"));


// Join legal files with events on legalFilesTable.id = eventsTable.id_fascicolo and
// keep the ten columns listed below; the legal-file `id` is exposed as `id_fascicolo`.
Table legalFileEventsTable = legalFilesTable
       .join(eventsTable, $("id").isEqual($("id_fascicolo")))
       .select(
             $("id").as("id_fascicolo"),
             $("id_evento"),
             $("giudice"),
             $("nrg"),
             $("codice_oggetto"),
             $("oggetto"),
             $("ufficio"),
             $("sezione"),
             $("data_evento"),
             $("evento"));


// Declared layout of the rows produced by the process step: names and types are
// positional, so entry i of one array describes entry i of the other.
final String[] phaseFieldNames = {
       "id_fascicolo", "nrg", "giudice", "oggetto", "codice_oggetto", "ufficio",
       "sezione", "fase", "fase_completata", "durata", "data_inizio", "data_fine"
};
final TypeInformation<?>[] phaseFieldTypes = {
       BasicTypeInfo.LONG_TYPE_INFO,     // id_fascicolo
       BasicTypeInfo.STRING_TYPE_INFO,   // nrg
       BasicTypeInfo.STRING_TYPE_INFO,   // giudice
       BasicTypeInfo.STRING_TYPE_INFO,   // oggetto
       BasicTypeInfo.STRING_TYPE_INFO,   // codice_oggetto
       BasicTypeInfo.STRING_TYPE_INFO,   // ufficio
       BasicTypeInfo.STRING_TYPE_INFO,   // sezione
       BasicTypeInfo.STRING_TYPE_INFO,   // fase
       BasicTypeInfo.BOOLEAN_TYPE_INFO,  // fase_completata
       BasicTypeInfo.LONG_TYPE_INFO,     // durata
       BasicTypeInfo.INSTANT_TYPE_INFO,  // data_inizio
       BasicTypeInfo.INSTANT_TYPE_INFO   // data_fine
};

// Convert the joined table to a changelog stream, key it by legal-file id, run the
// custom process function, and attach the row type declared above to its output.
DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
       .keyBy(row -> row.<Long>getFieldAs("id_fascicolo"))
       .process(new PhaseDurationCounterProcessFunction())
       .returns(new RowTypeInfo(phaseFieldTypes, phaseFieldNames));

final Schema phasesDurationsSchema = Schema.newBuilder()
       .column("id_fascicolo", DataTypes.BIGINT().notNull())
       .column("nrg", DataTypes.STRING())
       .column("giudice", DataTypes.STRING())
       .column("oggetto", DataTypes.STRING())
       .column("codice_oggetto", DataTypes.STRING())
       .column("ufficio", DataTypes.STRING())
       .column("sezione", DataTypes.STRING())
       .column("fase", DataTypes.STRING().notNull())
       .column("fase_completata", DataTypes.BOOLEAN())
       .column("durata", DataTypes.BIGINT())
       .column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
       .column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
       .primaryKey("id_fascicolo", "fase")
       .build();

Table phasesDurationsTable = 
tEnv.fromChangelogStream(phasesDurationsDataStream, phasesDurationsSchema,
       ChangelogMode.upsert());

// Opensearch table "PhasesDurationsOS": same schema as the phase-duration table,
// backed by the "legal_files_phase_duration" index.
final TableDescriptor.Builder phasesDurationsOSBuilder =
       TableDescriptor.forConnector("opensearch").schema(phasesDurationsSchema);

// Where to write.
phasesDurationsOSBuilder
       .option(OpensearchConnectorOptions.HOSTS_OPTION, List.of(OPENSEARCH_HOST))
       .option(OpensearchConnectorOptions.INDEX_OPTION, "legal_files_phase_duration");

// How to authenticate. NOTE(review): credentials are hard-coded and the
// ALLOW_INSECURE option is switched on.
phasesDurationsOSBuilder
       .option(OpensearchConnectorOptions.USERNAME_OPTION, "admin")
       .option(OpensearchConnectorOptions.PASSWORD_OPTION, "admin")
       .option(OpensearchConnectorOptions.ALLOW_INSECURE, true);

final TableDescriptor phasesDurationsOS = phasesDurationsOSBuilder.build();
tEnv.createTable("PhasesDurationsOS", phasesDurationsOS);


After that I filter the phasesDurationsTable like this:

// Keep only rows whose `fase_completata` flag is true, then project every column
// except that flag.
Table filteredPhasesDurationsTable = phasesDurationsTable
       .filter($("fase_completata").isTrue())
       .select(
             $("id_fascicolo"),
             $("nrg"),
             $("giudice"),
             $("oggetto"),
             $("codice_oggetto"),
             $("ufficio"),
             $("sezione"),
             $("fase"),
             $("durata"),
             $("data_inizio"),
             $("data_fine"));

With the filteredPhasesDurationsTable I need to calculate some averages over a 
sliding window, so I need to define a watermark on the data_fine column. Is 
there a way to do this?

Reply via email to