Hi everyone,
I'm using Flink for processing some streaming data. First of all I have two
tables receiving events from Kafka. These tables are joined and the resulting
table is converted to a DataStream where it is processed by a custom
KeyedProcessFunction. The output is then converted to a table and sent to
Opensearch. Here’s the code I’m using:
// Kafka-backed table over the topic "sicid.public.fascicoli". Records are
// decoded with the "debezium-json" format and consumption starts from the
// latest offsets.
final TableDescriptor legalFilesTableDescriptor =
    TableDescriptor.forConnector("kafka")
        .schema(Schema.newBuilder()
            .column("id", DataTypes.BIGINT())
            .column("nrg", DataTypes.STRING())
            .column("ufficio", DataTypes.STRING())
            .column("sezione", DataTypes.STRING())
            .column("giudice", DataTypes.STRING())
            .column("oggetto", DataTypes.STRING())
            .column("codice_oggetto", DataTypes.STRING())
            .build())
        .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.fascicoli"))
        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
        .option(KafkaConnectorOptions.SCAN_STARTUP_MODE,
            KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
        .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
        .build();

// Register the descriptor in the catalog, then obtain a Table handle on it.
tEnv.createTable("LegalFilesTable_Kafka", legalFilesTableDescriptor);
// The string literal must be closed with a plain ASCII double quote; a
// typographic closing quote leaves the literal unterminated.
Table legalFilesTable = tEnv.from("LegalFilesTable_Kafka");
// Columns of the events topic. "data_evento" is a computed column defined by
// the SQL expression TO_TIMESTAMP_LTZ(data, 3) over the BIGINT column "data".
final Schema eventsSchema = Schema.newBuilder()
    .column("id", DataTypes.BIGINT())
    .column("data", DataTypes.BIGINT())
    .columnByExpression("data_evento", "TO_TIMESTAMP_LTZ(data, 3)")
    .column("evento", DataTypes.STRING())
    .column("id_fascicolo", DataTypes.BIGINT())
    .build();

// Kafka-backed table over the topic "sicid.public.eventi", decoded with the
// "debezium-json" format and read from the latest offsets.
final TableDescriptor eventsTableDescriptor =
    TableDescriptor.forConnector("kafka")
        .schema(eventsSchema)
        .option(KafkaConnectorOptions.TOPIC, List.of("sicid.public.eventi"))
        .option(KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS, KAFKA_HOST)
        .option(
            KafkaConnectorOptions.SCAN_STARTUP_MODE,
            KafkaConnectorOptions.ScanStartupMode.LATEST_OFFSET)
        .option(KafkaConnectorOptions.VALUE_FORMAT, "debezium-json")
        .build();

tEnv.createTable("EventsTable_Kafka", eventsTableDescriptor);
// Table handle for the events registered as "EventsTable_Kafka".
// Both Kafka tables declare a column named "id", and a Table API join requires
// its two inputs to have distinct field names. Renaming the events' "id" to
// "id_evento" removes the ambiguity and supplies the "id_evento" column that
// the projection below selects.
Table eventsTable = tEnv.from("EventsTable_Kafka")
    .renameColumns($("id").as("id_evento"));

// Inner join of legal files with their events: the legal file's "id" must
// equal the event's "id_fascicolo". In the result, the legal file's "id" is
// exposed under the name "id_fascicolo".
Table legalFileEventsTable = legalFilesTable.join(eventsTable)
    .where($("id").isEqual($("id_fascicolo")))
    .select(
        $("id").as("id_fascicolo"),
        $("id_evento"),
        $("giudice"),
        $("nrg"),
        $("codice_oggetto"),
        $("oggetto"),
        $("ufficio"),
        $("sezione"),
        $("data_evento"),
        $("evento")
    );
// Field types declared for the rows produced by the process function,
// positionally aligned with phaseRowNames below.
final TypeInformation<?>[] phaseRowTypes = {
    BasicTypeInfo.LONG_TYPE_INFO,     // id_fascicolo
    BasicTypeInfo.STRING_TYPE_INFO,   // nrg
    BasicTypeInfo.STRING_TYPE_INFO,   // giudice
    BasicTypeInfo.STRING_TYPE_INFO,   // oggetto
    BasicTypeInfo.STRING_TYPE_INFO,   // codice_oggetto
    BasicTypeInfo.STRING_TYPE_INFO,   // ufficio
    BasicTypeInfo.STRING_TYPE_INFO,   // sezione
    BasicTypeInfo.STRING_TYPE_INFO,   // fase
    BasicTypeInfo.BOOLEAN_TYPE_INFO,  // fase_completata
    BasicTypeInfo.LONG_TYPE_INFO,     // durata
    BasicTypeInfo.INSTANT_TYPE_INFO,  // data_inizio
    BasicTypeInfo.INSTANT_TYPE_INFO   // data_fine
};
final String[] phaseRowNames = {
    "id_fascicolo", "nrg", "giudice", "oggetto", "codice_oggetto", "ufficio",
    "sezione", "fase", "fase_completata", "durata", "data_inizio", "data_fine"
};

// Convert the joined table to a changelog stream, key it by "id_fascicolo",
// and run the custom keyed process function. returns(...) declares the output
// row type explicitly.
DataStream<Row> phasesDurationsDataStream =
    tEnv.toChangelogStream(legalFileEventsTable)
        .keyBy(row -> row.<Long>getFieldAs("id_fascicolo"))
        .process(new PhaseDurationCounterProcessFunction())
        .returns(new RowTypeInfo(phaseRowTypes, phaseRowNames));
final Schema phasesDurationsSchema = Schema.newBuilder()
.column("id_fascicolo", DataTypes.BIGINT().notNull())
.column("nrg", DataTypes.STRING())
.column("giudice", DataTypes.STRING())
.column("oggetto", DataTypes.STRING())
.column("codice_oggetto", DataTypes.STRING())
.column("ufficio", DataTypes.STRING())
.column("sezione", DataTypes.STRING())
.column("fase", DataTypes.STRING().notNull())
.column("fase_completata", DataTypes.BOOLEAN())
.column("durata", DataTypes.BIGINT())
.column("data_inizio", DataTypes.TIMESTAMP_LTZ(3))
.column("data_fine", DataTypes.TIMESTAMP_LTZ(3))
.primaryKey("id_fascicolo", "fase")
.build();
Table phasesDurationsTable =
tEnv.fromChangelogStream(phasesDurationsDataStream, phasesDurationsSchema,
ChangelogMode.upsert());
// OpenSearch-backed table for the index "legal_files_phase_duration", using
// the same schema as the phase-duration table.
// NOTE(review): the credentials are hard-coded and ALLOW_INSECURE is enabled;
// confirm this is intended only for a local/test cluster.
final TableDescriptor.Builder opensearchTableBuilder =
    TableDescriptor.forConnector("opensearch");
opensearchTableBuilder
    .schema(phasesDurationsSchema)
    .option(OpensearchConnectorOptions.HOSTS_OPTION, List.of(OPENSEARCH_HOST))
    .option(OpensearchConnectorOptions.INDEX_OPTION, "legal_files_phase_duration")
    .option(OpensearchConnectorOptions.USERNAME_OPTION, "admin")
    .option(OpensearchConnectorOptions.PASSWORD_OPTION, "admin")
    .option(OpensearchConnectorOptions.ALLOW_INSECURE, true);
final TableDescriptor phasesDurationsOS = opensearchTableBuilder.build();

// Register the table under the name "PhasesDurationsOS".
tEnv.createTable("PhasesDurationsOS", phasesDurationsOS);
After that, I filter phasesDurationsTable like this:
// Keep only the rows whose "fase_completata" flag is true, then project every
// column except the flag itself.
Table filteredPhasesDurationsTable =
    phasesDurationsTable
        .filter($("fase_completata").isTrue())
        .select(
            $("id_fascicolo"), $("nrg"), $("giudice"), $("oggetto"),
            $("codice_oggetto"), $("ufficio"), $("sezione"), $("fase"),
            $("durata"), $("data_inizio"), $("data_fine"));
Using filteredPhasesDurationsTable, I need to calculate some averages over a
sliding window, so I need to define a watermark on the data_fine column. Is
there a way to do this?