Hi Ahmad,

I think Alexander is right. You've declared the state descriptor as transient, so it is not serialized with the function and is null on the worker node by the time open() tries to access the state. Either remove the transient modifier or instantiate the descriptor in the open method. The common pattern is to keep the state itself in a transient field, not the descriptor.
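
To illustrate why the descriptor ends up null: the function object is shipped to the workers via Java serialization, which skips transient fields and does not re-run their initializers on the receiving side. Below is a minimal plain-Java sketch of that effect. It involves no Flink at all; the TransientDemo and Holder classes and their field names are made up for illustration, and a String stands in for the MapStateDescriptor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    // Hypothetical stand-in for a user function that is serialized on the
    // client and deserialized on the worker.
    static class Holder implements Serializable {
        private static final long serialVersionUID = 1L;

        // Skipped by Java serialization; the initializer is not executed
        // again when the object is deserialized.
        transient String transientDescriptor = "mapState";

        // Serialized together with the object.
        String regularDescriptor = "mapState";
    }

    // Serializes the object into a byte array and reads it back, which is
    // roughly what happens when a function travels from client to worker.
    static Holder roundTrip(Holder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        Holder copy = roundTrip(new Holder());
        System.out.println("transient field: " + copy.transientDescriptor); // null
        System.out.println("regular field:   " + copy.regularDescriptor);   // mapState
    }
}
```

The transient field comes back as null even though it had an initializer, which matches the "state properties must not be null" check failing inside open().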

Best,
Dawid

On 22/10/2018 15:15, Alexander Smirnov wrote:
> I think that's because you declared it as transient field.
>
> Move the declaration inside of "open" function to resolve that
>
> On Mon, Oct 22, 2018 at 3:48 PM Ahmad Hassan <ahmad.has...@gmail.com> wrote:
>
>     2018-10-22 13:46:31,944 INFO org.apache.flink.runtime.taskmanager.Task -
>     Window(SlidingProcessingTimeWindows(180000, 180000), TimeTrigger,
>     MetricWindowFunction) -> Map -> Sink: Unnamed (1/1)
>     (5677190a0d292df3ad8f3521519cd980) switched from RUNNING to FAILED.
>
>     java.lang.NullPointerException: The state properties must not be null
>         at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
>         at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:174)
>         at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:168)
>         at com.sap.hybris.conversion.flink.processors.chain.MetricWindowFunction.open(MetricWindowFunction.java:62)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.api.java.operators.translation.WrappingFunction.open(WrappingFunction.java:45)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.open(WindowOperator.java:219)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>         at java.lang.Thread.run(Thread.java:745)
>
>     On Sat, 20 Oct 2018 at 11:29, vino yang <yanghua1...@gmail.com> wrote:
>
>         Hi Ahmad,
>
>         Can you try to dump thread info from the Task Manager's JVM instance?
>
>         Thanks, vino.
>
>         Ahmad Hassan <ahmad.has...@gmail.com> wrote on Sat, 20 Oct 2018 at 4:24 PM:
>
>             Flink 1.6.0. Valuestate initialises successfully but mapstate hangs
>
>             Regards
>
>             On 20 Oct 2018, at 02:55, vino yang <yanghua1...@gmail.com> wrote:
>
>>             Hi Ahmad,
>>
>>             Which version of Flink do you use?
>>
>>             Thanks, vino.
>>
>>             Ahmad Hassan <ahmad.has...@gmail.com> wrote on Fri, 19 Oct 2018 at 11:32 PM:
>>
>>                 Hi,
>>
>>                 Initializing mapstate hangs in window function. However if I use
>>                 valuestate then it is initialized successfully. I am using rocksdb
>>                 to store the state.
>>
>>                 public class MyWindowFunction extends
>>                         RichWindowFunction<Event, Payload, Tuple, TimeWindow> {
>>                     private transient MapStateDescriptor<String, String> productsDescriptor =
>>                             new MapStateDescriptor<>("mapState", String.class, String.class);
>>
>>                     @Override
>>                     public void apply(Tuple key, TimeWindow window, final Iterable<Event> input,
>>                             final Collector<Payload> out) {
>>                         // do something
>>                     }
>>
>>                     @Override
>>                     public void open(Configuration parameters) throws Exception {
>>                         System.out.println("## open init window state ");
>>                         MapState<String, String> state =
>>                                 this.getRuntimeContext().getMapState(productsDescriptor); // <<< program hangs here
>>                         System.out.println("## open window state " + state);
>>                     }
>>                 }
>>
>>                 Thanks for the help.