Hi,

The same error occurs after changing the code, unfortunately.

BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a
`T serializer`, where T extends Serializer<?> & Serializable, so I pass a
custom GenericJavaSerializer<T>, but I guess this doesn't matter much.
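
One way to narrow this down outside Flink is to check that the value
survives a plain Java serialization round trip, which is the mechanism
Kryo's JavaSerializer delegates to. Below is a minimal sketch, not a fix.
It assumes java.util.TreeMap<Long, TreeSet<String>> as a stand-in for
TreeMultimap<Long, String> so that it runs without Guava on the classpath;
the class name JavaSerializationRoundTrip and the helper roundTrip are
hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;
import java.util.TreeSet;

public class JavaSerializationRoundTrip {

    /** Writes a value with standard Java serialization and reads it back. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in (assumption) for TreeMultimap<Long, String>:
        // count -> sorted set of items with that count.
        TreeMap<Long, TreeSet<String>> topK = new TreeMap<>();
        topK.computeIfAbsent(3L, k -> new TreeSet<>()).add("a");
        topK.computeIfAbsent(7L, k -> new TreeSet<>()).add("b");

        TreeMap<Long, TreeSet<String>> copy = roundTrip(topK);

        // The copy is a distinct object with equal contents.
        System.out.println(copy.equals(topK)); // prints true
    }
}
```

If the same round trip also succeeds with a real TreeMultimap, the type
itself is probably fine and the problem is more likely in how the
serializer gets registered.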


On 19 September 2016 at 18:02, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Can you use "env.getConfig().registerTypeWithKryoSerializer
> (TreeMultimap.class, JavaSerializer.class)" ?
>
> Best,
> Stephan
>
>
> On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo <gyk....@gmail.com> wrote:
>
>> Here is the code snippet:
>>
>> windowedStream.fold(TreeMultimap.<Long, String>create(),
>>         new FoldFunction<Tuple2<String, Long>, TreeMultimap<Long, String>>() {
>>     @Override
>>     public TreeMultimap<Long, String> fold(TreeMultimap<Long, String> topKSoFar,
>>                                            Tuple2<String, Long> itemCount) throws Exception {
>>         String item = itemCount.f0;
>>         Long count = itemCount.f1;
>>         topKSoFar.put(count, item);
>>         if (topKSoFar.keySet().size() > topK) {
>>             topKSoFar.removeAll(topKSoFar.keySet().first());
>>         }
>>         return topKSoFar;
>>     }
>> });
>>
>>
>> The problem is that when the fold function gets called, the initial value
>> has been lost, so it hits a NullPointerException. This is due to failed
>> type extraction and serialization, as shown in the log message:
>> "INFO  TypeExtractor:1685 - No fields detected for class
>> com.google.common.collect.TreeMultimap. Cannot be used as a PojoType.
>> Will be handled as GenericType."
>>
>> I have tried the following two ways to fix it but neither of them worked:
>>
>> 1. Writing a class TreeMultimapSerializer which extends Kryo's
>> Serializer<T>, and calling `env.addDefaultKryoSerializer(TreeMultimap.class,
>> new TreeMultimapSerializer())`. The write/read methods are almost
>> line-by-line translations from TreeMultimap's own implementation.
>>
>> 2. TreeMultimap implements the Serializable interface, so Kryo can fall
>> back to standard Java serialization. Since Kryo's JavaSerializer
>> itself is not serializable, I wrote an adapter to make it fit the
>> "addDefaultKryoSerializer" API.
>>
>> Could you please give me some working examples for custom Kryo
>> serialization in Flink?
>>
>>
>> Best regards,
>> Yukun
>>
>>
>
