Hej, I pass an instance of M in the constructor of the class, can I use that instead? Maybe give the class a function that returns the right TypeInformation? I'm trying figure out how TypeInformation works to better understand the Issue is there any documentation about this? At the moment I don't really understand what TypeInformation does and how it works.
cheers Martin On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > I think it doesn't work because the concrete type of M is not available to > create a TypeInformation for M. What you can do is manually pass a > TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use > that when creating the state descriptor. > > Cheers, > Aljoscha > > On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mneum...@sics.se> wrote: > >> Hey, >> >> I have a FlatMap that uses some generics (appended at the end of the >> mail). >> I have some trouble with the type inference running into >> InvalidTypesException on the first line in the open function. >> >> How can I fix it? >> >> Cheers Martin >> >> >> >> >> public class AnomalyFlatMap<M extends Model,V extends ModelValue, T> extends >> RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly,T>> { >> private transient ValueState<M> microModel; >> private final double threshold; >> private boolean updateIfAnomaly; >> private M initModel; >> >> public AnomalyFlatMap(double threshold, M model, boolean >> updateIfAnomaly) { >> this.threshold = threshold; >> this.updateIfAnomaly = updateIfAnomaly; >> this.initModel = model; >> >> } >> >> @Override >> public void open(Configuration parameters) throws Exception { >> ValueStateDescriptor<M> descriptor = >> new ValueStateDescriptor<>( >> "RollingMicroModel", >> TypeInformation.of(new TypeHint<M>() { >> }),initModel >> ); >> microModel = getRuntimeContext().getState(descriptor); >> } >> >> @Override >> public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> >> collector) throws Exception { >> M model = microModel.value(); >> Anomaly res = model.calculateAnomaly(sample.f0); >> >> if ( res.getScore() <= threshold || updateIfAnomaly){ >> model.addWindow(sample.f0); >> microModel.update(model); >> } >> collector.collect(new Tuple2<>(res,sample.f1)); >> } >> } >> >> >>