Re: Serialization problem for Guava's TreeMultimap

2016-09-20 Thread Yukun Guo
Thank you for quickly fixing it! On 20 September 2016 at 17:17, Fabian Hueske wrote: > Hi Yukun, > > I debugged this issue and found that this is a bug in the serialization of > the StateDescriptor. > I have created FLINK-4640 [1] to resolve the issue. > > Thanks for reporting the issue. > > Be

Re: Serialization problem for Guava's TreeMultimap

2016-09-20 Thread Fabian Hueske
Hi Yukun, I debugged this issue and found that this is a bug in the serialization of the StateDescriptor. I have created FLINK-4640 [1] to resolve the issue. Thanks for reporting the issue. Best, Fabian [1] https://issues.apache.org/jira/browse/FLINK-4640 2016-09-20 10:35 GMT+02:00 Yukun Guo :

Re: Serialization problem for Guava's TreeMultimap

2016-09-20 Thread Yukun Guo
Some detail: if running the FoldFunction on a KeyedStream, everything works fine. So it must relate to the way WindowedStream handles type extraction. In case any Flink experts would like to reproduce it, I have created a repo on Github: github.com/gyk/flink-multimap On 20 September 2016 at 10:33

Re: Serialization problem for Guava's TreeMultimap

2016-09-19 Thread Yukun Guo
Hi, The same error occurs after changing the code, unfortunately. BTW, registerTypeWithKryoSerializer requires the 2nd argument to be a `T serializer` where T extends Serializer & Serializable, so I pass a custom GenericJavaSerializer, but I guess this doesn't matter much. On 19 September 2016

Re: Serialization problem for Guava's TreeMultimap

2016-09-19 Thread Stephan Ewen
Hi! Can you use "env.getConfig().registerTypeWithKryoSerializer(TreeMultimap.class, JavaSerializer.class)" ? Best, Stephan On Sun, Sep 18, 2016 at 12:53 PM, Yukun Guo wrote: > Here is the code snippet: > > windowedStream.fold(TreeMultimap.create(), new > FoldFunction, TreeMultimap>() { >

Serialization problem for Guava's TreeMultimap

2016-09-18 Thread Yukun Guo
Here is the code snippet: windowedStream.fold(TreeMultimap.create(), new FoldFunction, TreeMultimap>() { @Override public TreeMultimap fold(TreeMultimap topKSoFar, Tuple2 itemCount) throws Exception { String item = itemCount.f0; Lo