Please use new ValueStateDescriptor<>("mystate", TypeInformation.of(new TypeHint<Tuple2<Integer, ObjectNode>>() {}));
That should work...

On Mon, May 8, 2017 at 1:11 PM, Chesnay Schepler <ches...@apache.org> wrote:

> If you want to use generics you have to either provide a TypeInformation
> instead of a class or create a class that extends Tuple2<Integer,
> ObjectNode> and use it as the class argument.
>
> On 07.05.2017 15:14, yunfan123 wrote:
>
>> My process function looks like this:
>>
>> private static class MergeFunction extends
>>         RichProcessFunction<Tuple2<Integer, ObjectNode>, Tuple2<Integer, ObjectNode>> {
>>
>>     private ValueState<Tuple2<Integer, ObjectNode>> state;
>>
>>     @Override
>>     @SuppressWarnings("unchecked")
>>     public void open(Configuration parameters) throws Exception {
>>         state = getRuntimeContext().getState(new ValueStateDescriptor<>("mystate",
>>                 (Class<Tuple2<Integer, ObjectNode>>) (Object) Tuple2.class));
>>     }
>> }
>>
>> When I run the code:
>>
>> 05/07/2017 21:17:47 Process -> (Sink: Unnamed, Sink: Unnamed)(1/1) switched to FAILED
>> java.lang.RuntimeException: Cannot create full type information based on the given class. If the type has generics, please
>>     at org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:124)
>>     at org.apache.flink.api.common.state.ValueStateDescriptor.<init>(ValueStateDescriptor.java:101)
>>     at com.bytedance.flinkjob.activationSource.AppActivationSource$MergeFunction.open(AppActivationSource.java:134)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
>>     at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:375)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:251)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Tuple needs to be parameterized by using generics.
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:673)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:607)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:561)
>>     at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:557)
>>     at org.apache.flink.api.common.state.StateDescriptor.<init>(StateDescriptor.java:122)
>>     ... 9 more
>>
>> Can I use generics with ValueState?
>>
>> --
>> View this message in context:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Can-ValueState-use-generics-tp13038.html
>> Sent from the Apache Flink User Mailing List archive at Nabble.com.
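
For anyone wondering why the TypeHint form (or a named subclass, as Chesnay suggests) succeeds where the cast of Tuple2.class fails: a Class object carries no type arguments after erasure, whereas a subclass, anonymous or named, records its generic supertype in its class file. Below is a minimal plain-Java sketch of that mechanism. It does not use Flink at all: Pair, Hint and IntStringPair are hypothetical stand-ins I made up for Tuple2, TypeHint and a user-defined Tuple2 subclass, and String stands in for ObjectNode. It only illustrates the idea and is not a copy of how Flink's TypeExtractor is implemented.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;

public class TypeTokenSketch {

    // Hypothetical stand-in for Flink's Tuple2; NOT the real class.
    static class Pair<A, B> { }

    // Hypothetical stand-in for Flink's TypeHint; NOT the real class.
    // Creating an anonymous subclass makes the type argument visible to reflection.
    abstract static class Hint<T> {
        private final Type captured;

        protected Hint() {
            Type superType = getClass().getGenericSuperclass();
            if (!(superType instanceof ParameterizedType)) {
                throw new IllegalStateException("Hint must be created with a type argument");
            }
            this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
        }

        Type captured() {
            return captured;
        }
    }

    // Hypothetical named subclass, mirroring the "extend Tuple2" suggestion.
    static class IntStringPair extends Pair<Integer, String> { }

    // The cast from the question: compiles, but the result is still just Pair.class.
    @SuppressWarnings("unchecked")
    static Class<Pair<Integer, String>> viaCast() {
        return (Class<Pair<Integer, String>>) (Object) Pair.class;
    }

    // The hint route: the anonymous subclass keeps Pair<Integer, String>.
    static Type viaHint() {
        return new Hint<Pair<Integer, String>>() { }.captured();
    }

    // The subclass route: the generic superclass keeps Pair<Integer, String>.
    static Type viaSubclass() {
        return IntStringPair.class.getGenericSuperclass();
    }

    // Returns the actual type arguments, or an empty array for a raw Class.
    static Type[] typeArgs(Type t) {
        return t instanceof ParameterizedType
                ? ((ParameterizedType) t).getActualTypeArguments()
                : new Type[0];
    }

    public static void main(String[] args) {
        System.out.println("cast:     " + Arrays.toString(typeArgs(viaCast())));
        System.out.println("hint:     " + Arrays.toString(typeArgs(viaHint())));
        System.out.println("subclass: " + Arrays.toString(typeArgs(viaSubclass())));
    }
}
```

Running main should show no type arguments for the cast and both Integer and String for the other two routes, which matches the "Tuple needs to be parameterized by using generics" complaint in the trace above.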