I'm implementing a data analysis pipeline in Flink and I have a problem converting a DataStream to a Table. I have this table defined from a join between two Kafka sources:
```java
Table legalFileEventsTable = legalFilesTable.join(eventsTable)
    .where($("id").isEqual($("id_fascicolo")))
    .select(
        $("id").as("id_fascicolo"),
        $("id_evento"),
        $("giudice"),
        $("nrg"),
        $("codice_oggetto"),
        $("ufficio"),
        $("sezione"),
        $("data_evento"),
        $("evento"),
        $("data_registrazione_evento")
    );
```

Then I convert the joined table to a DataStream to apply some computation on the data. Here's the code I'm using:

```java
DataStream<Row> phasesDurationsDataStream = tEnv.toChangelogStream(legalFileEventsTable)
    .keyBy(r -> r.<Long>getFieldAs("id_fascicolo"))
    .process(new PhaseDurationCounterProcessFunction());
phasesDurationsDataStream.print();
```

The PhaseDurationCounterProcessFunction emits a Row like this:

```java
Row outputRow = Row.withNames(RowKind.INSERT);
outputRow.setField("id_fascicolo", currentState.getId_fascicolo());
outputRow.setField("nrg", currentState.getNrg());
outputRow.setField("giudice", currentState.getGiudice());
outputRow.setField("codice_oggetto", currentState.getCodice_oggetto());
outputRow.setField("ufficio", currentState.getUfficio());
outputRow.setField("sezione", currentState.getSezione());
outputRow.setField("fase", currentState.getPhase());
outputRow.setField("fase_completata", false);
outputRow.setField("durata", currentState.getDurationCounter());
out.collect(outputRow);
```

After collecting the results from the process function, I convert the DataStream back to a Table and execute the pipeline:

```java
Table phasesDurationsTable = tEnv.fromChangelogStream(
    phasesDurationsDataStream,
    Schema.newBuilder()
        .column("id_fascicolo", DataTypes.BIGINT())
        .column("nrg", DataTypes.STRING())
        .column("giudice", DataTypes.STRING())
        .column("codice_oggetto", DataTypes.STRING())
        .column("ufficio", DataTypes.STRING())
        .column("sezione", DataTypes.STRING())
        .column("fase", DataTypes.STRING())
        .column("fase_completata", DataTypes.BOOLEAN())
        .column("durata", DataTypes.BIGINT())
        .primaryKey("id_fascicolo", "fase")
        .build(),
    ChangelogMode.upsert()
);

env.execute();
```

But during startup I receive this exception:

```
Unable to find a field named 'id_fascicolo' in the physical data type derived
from the given type information for schema declaration. Make sure that the type
information is not a generic raw type. Currently available fields are: [f0]
```

It seems the row information (field names and types) isn't available yet, which is why the exception is thrown. I also tried invoking env.execute() before the DataStream-to-Table conversion; in that case the job starts, but I get no output when I print phasesDurationsTable. Any suggestions on how to make this work?

Eugenio
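P.S. One direction I've been considering, based on my reading of the DataStream API docs (untested sketch, so I may be off here): since the exception mentions a generic raw type with only field [f0], maybe the process function's output type needs to be declared explicitly with Types.ROW_NAMED, so the row's field names and types survive the conversion. Something like:

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

public class PhaseRowTypeInfo {

    // Explicit named row type matching the fields the process function emits,
    // so downstream schema derivation sees named fields instead of a raw f0.
    public static TypeInformation<Row> outputTypeInfo() {
        return Types.ROW_NAMED(
            new String[] {
                "id_fascicolo", "nrg", "giudice", "codice_oggetto",
                "ufficio", "sezione", "fase", "fase_completata", "durata"
            },
            Types.LONG,    // id_fascicolo
            Types.STRING,  // nrg
            Types.STRING,  // giudice
            Types.STRING,  // codice_oggetto
            Types.STRING,  // ufficio
            Types.STRING,  // sezione
            Types.STRING,  // fase
            Types.BOOLEAN, // fase_completata
            Types.LONG     // durata
        );
    }
}
```

The idea would be to attach it right after the process call, i.e. `.process(new PhaseDurationCounterProcessFunction()).returns(PhaseRowTypeInfo.outputTypeInfo())`, before handing the stream to fromChangelogStream. Would that be the right approach?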