Hi,

you're right, there is very little in the documentation about TypeInformation. There is only the description in the JavaDoc:
TypeInformation <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/api/common/typeinfo/TypeInformation.html>

Essentially, we always use a TypeSerializer<T> when we have to serialize values (for sending them across the network, storing them in state, etc.). A TypeSerializer<T> is created from a TypeInformation<T>, and a TypeInformation can be obtained in several ways:
- the TypeExtractor tries to analyze user functions to determine a TypeInformation for the input and output type
- the TypeExtractor can try to analyze a given Class<T> to determine a TypeInformation
- the Scala API uses macros and implicit parameters to create TypeInformation
- a TypeHint can be created to retrieve a TypeInformation
- a TypeInformation can be constructed manually

tl;dr: In your case you can try TypeInformation.of(initModel.getClass()). If that doesn't work, you can pass in a function that gives you a TypeInformation for your model type M.

Cheers,
Aljoscha

On Thu, 21 Apr 2016 at 14:16 Martin Neumann <mneum...@sics.se> wrote:

> Hej,
>
> I pass an instance of M in the constructor of the class, can I use that
> instead? Maybe give the class a function that returns the right
> TypeInformation? I'm trying to figure out how TypeInformation works to
> better understand the issue. Is there any documentation about this? At the
> moment I don't really understand what TypeInformation does and how it works.
>
> cheers Martin
>
> On Thu, Apr 21, 2016 at 2:08 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi,
>> I think it doesn't work because the concrete type of M is not available
>> to create a TypeInformation for M. What you can do is manually pass a
>> TypeInformation<M> or a TypeSerializer<M> to the AnomalyFlatMap and use
>> that when creating the state descriptor.
>>
>> Cheers,
>> Aljoscha
>>
>> On Thu, 21 Apr 2016 at 13:45 Martin Neumann <mneum...@sics.se> wrote:
>>
>>> Hey,
>>>
>>> I have a FlatMap that uses some generics (appended at the end of the
>>> mail).
>>> I have some trouble with the type inference running into
>>> InvalidTypesException on the first line in the open function.
>>>
>>> How can I fix it?
>>>
>>> Cheers Martin
>>>
>>>
>>> public class AnomalyFlatMap<M extends Model, V extends ModelValue, T>
>>>         extends RichFlatMapFunction<Tuple2<V, T>, Tuple2<Anomaly, T>> {
>>>     private transient ValueState<M> microModel;
>>>     private final double threshold;
>>>     private boolean updateIfAnomaly;
>>>     private M initModel;
>>>
>>>     public AnomalyFlatMap(double threshold, M model, boolean updateIfAnomaly) {
>>>         this.threshold = threshold;
>>>         this.updateIfAnomaly = updateIfAnomaly;
>>>         this.initModel = model;
>>>     }
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>         ValueStateDescriptor<M> descriptor =
>>>                 new ValueStateDescriptor<>(
>>>                         "RollingMicroModel",
>>>                         TypeInformation.of(new TypeHint<M>() {
>>>                         }), initModel
>>>                 );
>>>         microModel = getRuntimeContext().getState(descriptor);
>>>     }
>>>
>>>     @Override
>>>     public void flatMap(Tuple2<V, T> sample, Collector<Tuple2<Anomaly, T>> collector)
>>>             throws Exception {
>>>         M model = microModel.value();
>>>         Anomaly res = model.calculateAnomaly(sample.f0);
>>>
>>>         if (res.getScore() <= threshold || updateIfAnomaly) {
>>>             model.addWindow(sample.f0);
>>>             microModel.update(model);
>>>         }
>>>         collector.collect(new Tuple2<>(res, sample.f1));
>>>     }
>>> }
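
P.S. To illustrate why "the concrete type of M is not available" inside a generic class, here is a minimal plain-Java sketch. It does not use Flink at all: Hint<T> is a hypothetical stand-in for a type-capturing class, and ErasureSketch and its method names are made up for this example. It only assumes the usual Java reflection behaviour, where an anonymous subclass records its type argument as written in the source. Created inside generic code, that argument is just the type variable M. Created with a concrete type, or read from an actual instance via getClass(), a real class is available.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureSketch {

    // Hypothetical stand-in for a type-capturing class; this is NOT Flink's TypeHint.
    abstract static class Hint<T> {
        // Reads the type argument recorded in the anonymous subclass's signature.
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    // Inside generic code the hint only sees the type variable M, not a concrete class.
    public static <M> Type hintInsideGenericCode() {
        return new Hint<M>() { }.captured();
    }

    // With a concrete type argument the hint records the real class.
    public static Type hintWithConcreteType() {
        return new Hint<String>() { }.captured();
    }

    // An instance always knows its concrete runtime class.
    public static <M> Class<?> classFromInstance(M instance) {
        return instance.getClass();
    }

    public static void main(String[] args) {
        System.out.println(hintInsideGenericCode() instanceof TypeVariable); // prints "true"
        System.out.println(hintWithConcreteType());       // prints "class java.lang.String"
        System.out.println(classFromInstance("model"));   // prints "class java.lang.String"
    }
}
```

This is why the suggestions above either start from the model instance or have the caller, who knows the concrete model type, hand the type information to the function.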