Hi! Can you use "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class)"?

Best,
Stephan

On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:

> Here is the code snippet:
>
> windowedStream.fold(TreeMultimap.<Long, String>create(),
>     new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>         @Override
>         public TreeMultimap<Long, String> fold(
>                 TreeMultimap<Long, String> topKSoFar,
>                 Tuple2<String, Long> itemCount) throws Exception {
>             String item = itemCount.f0;
>             Long count = itemCount.f1;
>             topKSoFar.put(count, item);
>             if (topKSoFar.keySet().size() > topK) {
>                 topKSoFar.removeAll(topKSoFar.keySet().first());
>             }
>             return topKSoFar;
>         }
>     });
>
> The problem is that when the fold function is called, the initial value
> has been lost, so it encounters a NullPointerException. This is due to
> failed type extraction and serialization, as shown in the log message:
>
> "INFO TypeExtractor:1685 - No fields detected for class
> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
> Will be handled as GenericType."
>
> I have tried the following two ways to fix it, but neither of them worked:
>
> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>    Serializer<T>, and calling
>    `env.addDefaultKryoSerializer(TreeMultimap.class, new TreeMultimapSerializer())`.
>    The write/read methods are almost line-by-line translations of
>    TreeMultimap's own implementation.
>
> 2. TreeMultimap implements the Serializable interface, so Kryo can fall
>    back to standard Java serialization. Since Kryo's JavaSerializer
>    itself is not serializable, I wrote an adapter to make it fit the
>    "addDefaultKryoSerializer" API.
>
> Could you please give me some working examples of custom Kryo
> serialization in Flink?
>
> Best regards,
> Yukun
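
For reference, here is a minimal sketch of the behaviour the thread is trying to get: a top-K accumulator that survives a round trip through standard Java serialization, which is the mechanism Kryo's JavaSerializer delegates to. It is not the Flink setup itself. To keep it runnable without Guava or Flink, it assumes a TreeMap<Long, TreeSet<String>> as a stand-in for TreeMultimap<Long, String>; the names TopKSketch, fold and roundTrip are hypothetical and do not come from the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in for the fold over a TreeMultimap<Long, String>.
final class TopKSketch {

    // One fold step: record (count, item), then drop the smallest count
    // once more than topK distinct counts are held.
    static TreeMap<Long, TreeSet<String>> fold(
            TreeMap<Long, TreeSet<String>> topKSoFar,
            String item, long count, int topK) {
        topKSoFar.computeIfAbsent(count, c -> new TreeSet<>()).add(item);
        if (topKSoFar.size() > topK) {
            topKSoFar.pollFirstEntry();
        }
        return topKSoFar;
    }

    // Copies a value through standard Java serialization, roughly what a
    // Java-serialization-based serializer does with the initial value.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }
}
```

The point of the sketch is that an empty accumulator copied this way comes back as an empty, non-null collection, so the first fold call has something to work on. Whether the same holds inside a Flink job depends on the serializer registration actually taking effect for TreeMultimap.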