Hi!

Can you use
"env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class)"?
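
Before wiring this into Flink, it may help to check one thing in isolation. Kryo's JavaSerializer hands the object to standard Java serialization, so it can only help if the fold accumulator survives an ObjectOutputStream round trip. Below is a minimal, JDK-only sketch of that check. The names TopKRoundTrip, fold, and roundTrip are made up for illustration, and TreeMap<Long, TreeSet<String>> is only a stand-in for Guava's TreeMultimap, on the assumption that TreeMultimap round-trips the same way because it implements Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.TreeMap;
import java.util.TreeSet;

public class TopKRoundTrip {

    // Hypothetical stand-in for the fold step in the quoted snippet:
    // add the item under its count, then drop the smallest count once
    // more than topK distinct counts are held.
    public static TreeMap<Long, TreeSet<String>> fold(
            TreeMap<Long, TreeSet<String>> topKSoFar, String item, long count, int topK) {
        topKSoFar.computeIfAbsent(count, k -> new TreeSet<>()).add(item);
        if (topKSoFar.size() > topK) {
            topKSoFar.remove(topKSoFar.firstKey());
        }
        return topKSoFar;
    }

    // Serialize and deserialize a value with plain Java serialization,
    // the mechanism Kryo's JavaSerializer relies on.
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }
}
```

If the copy returned by roundTrip equals the original accumulator, the Java-serialization path itself is probably fine, and the remaining question is whether the serializer is actually registered for the type in the job's ExecutionConfig.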

Best,
Stephan


On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:

> Here is the code snippet:
>
> windowedStream.fold(TreeMultimap.<Long, String>create(),
>         new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>     @Override
>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>                                            Tuple2<String, Long> itemCount) throws Exception {
>         String item = itemCount.f0;
>         Long count = itemCount.f1;
>         topKSoFar.put(count, item);
>         if (topKSoFar.keySet().size() > topK) {
>             topKSoFar.removeAll(topKSoFar.keySet().first());
>         }
>         return topKSoFar;
>     }
> });
>
>
> The problem is that when the fold function is called, the initial value
> has been lost, so it throws a NullPointerException. This is due to failed
> type extraction and serialization, as shown in the log message:
> "INFO  TypeExtractor:1685 - No fields detected for class
> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
> Will be handled as GenericType."
>
> I have tried the following two ways to fix it but neither of them worked:
>
> 1. Writing a class TreeMultimapSerializer which extends Kryo's
> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
> new TreeMultimapSerializer())`. The write/read methods are almost
> line-by-line translations of TreeMultimap's own implementation.
>
> 2. TreeMultimap implements the Serializable interface, so Kryo can fall
> back to standard Java serialization. Since Kryo's JavaSerializer
> itself is not serializable, I wrote an adapter to make it fit the
> "addDefaultKryoSerializer" API.
>
> Could you please give me some working examples of custom Kryo
> serialization in Flink?
>
>
> Best regards,
> Yukun
>
>
