I was able to fix with the bellow modifications. Turns out flink can
encode genericRecords to avro when the schema is provided (returns
chained). Also the kryo stuff is not anymore needed then.


@@ -22,14 +22,12 @@
         env.fromSource(source,
             WatermarkStrategy.noWatermarks(),
             "Generator Source");
-    // this for being able to map the stream
-    Class<?> unmodColl =
Class.forName("java.util.Collections$UnmodifiableCollection");
-    env.getConfig().addDefaultKryoSerializer(unmodColl,
UnmodifiableCollectionsSerializer.class);
-
      stream.map(record -> {
       GenericRecord newRecord = new GenericData.Record(abSchema);
       newRecord.put("a", 1L);
       newRecord.put("b", "bar");
       return newRecord;
-    }).sinkTo(sink);
+    })
+         .returns(new GenericRecordAvroTypeInfo(abSchema))
+         .sinkTo(sink);
     env.execute();

Reply via email to