Hi,

The same error occurs after changing the code, unfortunately.
BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T serializer` where `T extends Serializer<?> & Serializable`, so I pass a custom GenericJavaSerializer<T>, but I guess this doesn't matter much.

On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Can you use "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class)" ?
>
> Best,
> Stephan
>
>
> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:
>
>> Here is the code snippet:
>>
>> windowedStream.fold(TreeMultimap.<Long, String>create(),
>>     new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>         @Override
>>         public TreeMultimap<Long, String> fold(
>>                 TreeMultimap<Long, String> topKSoFar,
>>                 Tuple2<String, Long> itemCount) throws Exception {
>>             String item = itemCount.f0;
>>             Long count = itemCount.f1;
>>             topKSoFar.put(count, item);
>>             if (topKSoFar.keySet().size() > topK) {
>>                 topKSoFar.removeAll(topKSoFar.keySet().first());
>>             }
>>             return topKSoFar;
>>         }
>>     });
>>
>> The problem is that when the fold function gets called, the initial
>> value has been lost, so it runs into a NullPointerException. This is due
>> to failed type extraction and serialization, as shown in the log message:
>>
>> "INFO TypeExtractor:1685 - No fields detected for class
>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>> Will be handled as GenericType."
>>
>> I have tried the following two ways to fix it, but neither of them worked:
>>
>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
>> new TreeMultimapSerializer())`. The write/read methods are almost
>> line-by-line translations of TreeMultimap's own implementation.
>>
>> 2. TreeMultimap implements the Serializable interface, so Kryo can fall
>> back to standard Java serialization. Since Kryo's JavaSerializer
>> itself is not serializable, I wrote an adapter to make it fit the
>> "addDefaultKryoSerializer" API.
>>
>> Could you please give me some working examples of custom Kryo
>> serialization in Flink?
>>
>>
>> Best regards,
>> Yukun
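
For anyone reading along, here is a minimal JDK-only sketch of the top-K fold logic from the quoted snippet. It is not a fix for the Kryo registration problem. It assumes a TreeMap<Long, TreeSet<String>> as a stand-in for TreeMultimap, and the names TopKSketch, fold and roundTrip are hypothetical. The roundTrip helper only checks that such an accumulator survives standard Java serialization (ObjectOutputStream / ObjectInputStream), which is the mechanism Kryo's JavaSerializer delegates to.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper class; not part of Flink, Kryo or Guava.
final class TopKSketch {

    // Mirrors the quoted fold(): file the item under its count, then drop the
    // smallest count once more than topK distinct counts are present.
    static TreeMap<Long, TreeSet<String>> fold(
            TreeMap<Long, TreeSet<String>> topKSoFar, String item, long count, int topK) {
        topKSoFar.computeIfAbsent(count, c -> new TreeSet<>()).add(item);
        if (topKSoFar.size() > topK) {
            topKSoFar.remove(topKSoFar.firstKey());
        }
        return topKSoFar;
    }

    // Writes the value with standard Java serialization and reads it back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        TreeMap<Long, TreeSet<String>> acc = new TreeMap<>();
        fold(acc, "a", 3L, 2);
        fold(acc, "b", 5L, 2);
        fold(acc, "c", 1L, 2); // evicted right away: 1 is the smallest of three counts
        fold(acc, "d", 5L, 2);
        System.out.println(roundTrip(acc)); // prints {3=[a], 5=[b, d]}
    }
}
```

Whether Flink's type extraction treats such a JDK-only accumulator any differently from TreeMultimap is not something this sketch shows; it only isolates the fold logic and the Java-serialization round trip from the Kryo registration question.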