I was able to fix with the bellow modifications. Turns out flink can encode genericRecords to avro when the schema is provided (returns chained). Also the kryo stuff is not anymore needed then.
@@ -22,14 +22,12 @@ env.fromSource(source, WatermarkStrategy.noWatermarks(), "Generator Source"); - // this for being able to map the stream - Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection"); - env.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class); - stream.map(record -> { GenericRecord newRecord = new GenericData.Record(abSchema); newRecord.put("a", 1L); newRecord.put("b", "bar"); return newRecord; - }).sinkTo(sink); + }) + .returns(new GenericRecordAvroTypeInfo(abSchema)) + .sinkTo(sink); env.execute();