Hi, you can do it using the register* methods on StreamExecutionEnvironment. So, for example:

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// register the key and value types of the map with the serialization stack
env.registerType(InputType.class);
env.registerType(MicroModel.class);

If you want to have custom Kryo serializers for those types, you can also do:

env.registerTypeWithKryoSerializer(InputType.class, MyInputTypeSerializer.class);

I hope this gets you on the right track. :D

Cheers,
Aljoscha

> On 11 Nov 2015, at 21:14, Martin Neumann <mneum...@sics.se> wrote:
>
> Thanks for the help.
>
> TypeExtractor.getForObject(modelMapInit) did the job. It's possible that it's
> an IDE problem that .getClass() did not work. IntelliJ is a bit fiddly with
> those things.
>
>> 1) Making null the default value and initializing manually is probably more
>> efficient, because otherwise the empty map would have to be cloned each
>> time the default value is returned, which adds avoidable overhead.
>
> What do you mean by initializing manually? Can I do that directly in the open
> function, or are we talking about checking for null in the FlatMap and
> initializing there? In general the program is supposed to run constantly
> once deployed, so I can get away with a slightly slower setup.
>
>> 2) The HashMap type will most likely go through Kryo, so for efficiency,
>> make sure you register the types "InputType" and "MicroModel" on the
>> execution environment.
>> Here you need to do that manually, because they are type erased and
>> Flink cannot auto-register them.
>
> Can you point me to an example on how to do this?
>
> cheers Martin
>
>
> On Wed, Nov 11, 2015 at 4:52 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> It should suffice to do something like
>>
>> "getRuntimeContext().getKeyValueState("microModelMap", new
>> HashMap<InputType,MicroModel>().getClass(), null);"
>>
>> Two more comments:
>>
>> 1) Making null the default value and initializing manually is probably more
>> efficient, because otherwise the empty map would have to be cloned each
>> time the default value is returned, which adds avoidable overhead.
>>
>> 2) The HashMap type will most likely go through Kryo, so for efficiency,
>> make sure you register the types "InputType" and "MicroModel" on the
>> execution environment.
>> Here you need to do that manually, because they are type erased and
>> Flink cannot auto-register them.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Wed, Nov 11, 2015 at 4:32 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Yes, what you wrote should work. You can alternatively use
>>> TypeExtractor.getForObject(modelMapInit) to extract the type information.
>>>
>>> I also like to implement my own custom type info for HashMaps and the other
>>> types and use that.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> Martin Neumann <mneum...@sics.se> wrote (on Wed, 11 Nov 2015, 16:30):
>>>
>>>> Hej,
>>>>
>>>> What is the correct way of initializing a stateful operator that is
>>>> using a HashMap? modelMapInit.getClass() does not work, and neither does
>>>> HashMap.class. Do I have to implement my own TypeInformation class, or
>>>> is there a simpler way?
>>>>
>>>> cheers Martin
>>>>
>>>> private OperatorState<HashMap<InputType,MicroModel>> microModelMap;
>>>>
>>>> @Override
>>>> public void open(Configuration parameters) throws Exception {
>>>>     HashMap<InputType,MicroModel> modelMapInit = new HashMap<>();
>>>>     this.microModelMap =
>>>>         getRuntimeContext().getKeyValueState("microModelMap",
>>>>             modelMapInit.getClass(), modelMapInit);
>>>> }
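
On the point quoted above that the map's element types "are type erased and
Flink cannot auto-register them", a small plain-Java sketch may help show what
is lost at runtime. It uses no Flink classes. ErasureDemo and the empty
InputType and MicroModel classes are hypothetical stand-ins for the types in
this thread, not the real ones.

```java
import java.util.HashMap;

public class ErasureDemo {
    // Hypothetical placeholders for the key and value types discussed in the thread.
    static class InputType {}
    static class MicroModel {}

    // Returns the runtime class of a map that was declared with type arguments.
    static Class<?> runtimeClassOfTypedMap() {
        return new HashMap<InputType, MicroModel>().getClass();
    }

    public static void main(String[] args) {
        Class<?> typed = runtimeClassOfTypedMap();

        // The runtime class is the raw HashMap class; the type arguments are gone.
        System.out.println(typed == HashMap.class); // prints "true"

        // Reflection only exposes the declared type variable, not InputType.
        System.out.println(typed.getTypeParameters()[0].getName()); // prints "K"
    }
}
```

Since the runtime class carries no trace of InputType or MicroModel, those
types presumably have to be named explicitly, for example with
env.registerType(...) as in the reply at the top of this mail.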