Hi Edward, could you try adding the static keyword to ExecQueue and RingBufferExec? As it is, they are non-static inner classes, so every instance holds an implicit reference to the enclosing MyKeyedProcessFunction, which can have unforeseen consequences.
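Below is a minimal plain-Java sketch of the difference, with no Flink dependency. StaticNestedDemo and InnerExecQueue are hypothetical stand-ins for MyKeyedProcessFunction and the original nested class. The sketch declares ExecQueue and RingBufferExec as static nested classes and uses reflection to check which classes expose a genuine zero-argument constructor. A non-static inner class's constructor always takes the enclosing instance as a hidden first parameter, so "public ExecQueue(){}" does not compile to a no-arg constructor. This is my assumption and I have not verified it against the 1.9.2 sources: that is likely why Flink's type extraction does not treat the inner class as a POJO and falls back to a generic (Kryo) type.

```java
import java.lang.reflect.Modifier;

public class StaticNestedDemo {

    // Hypothetical non-static variant, shaped like the original ExecQueue:
    // each instance is tied to an enclosing StaticNestedDemo instance.
    public class InnerExecQueue {
        public Integer size;
        public InnerExecQueue() {}
    }

    // Suggested fix: static nested classes carry no enclosing instance.
    public static class ExecQueue {
        public RingBufferExec queue;
        public ExecQueue() {}
        public ExecQueue(int initSize) {
            queue = new RingBufferExec(initSize);
        }

        public static class RingBufferExec {
            public Integer size;
            public Integer count;
            public RingBufferExec() {}
            public RingBufferExec(int sizeIn) {
                size = sizeIn;
                count = 0;
            }
        }
    }

    // True if the class declares a genuine zero-argument constructor.
    // For a non-static inner class, the source-level "no-arg" constructor
    // actually takes the enclosing instance as a hidden parameter.
    public static boolean hasNoArgConstructor(Class<?> c) {
        try {
            c.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Class<?>[] classes = {
            InnerExecQueue.class, ExecQueue.class, ExecQueue.RingBufferExec.class
        };
        for (Class<?> c : classes) {
            System.out.println(c.getSimpleName()
                    + " static=" + Modifier.isStatic(c.getModifiers())
                    + " noArgCtor=" + hasNoArgConstructor(c));
        }
    }
}
```

Running main prints "InnerExecQueue static=false noArgCtor=false", followed by "static=true noArgCtor=true" for ExecQueue and RingBufferExec. On the Flink side, you can check whether a state type silently goes through Kryo by calling env.getConfig().disableGenericTypes(). With that setting, Flink throws an UnsupportedOperationException instead of falling back to Kryo.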
On Sun, Oct 11, 2020 at 5:38 AM Colletta, Edward <edward.colle...@fmr.com> wrote:

> Tried to attach a tar file but it got blocked. Resending with the files
> attached individually.
>
> OK, I have a minimal reproducible example. Attaching a tar file of the job
> that crashed.
>
> The crash has nothing to do with the number of state variables. But it
> does seem to be caused by using a type for the state variable that is a
> class nested in the KeyedProcessFunction.
>
> Reduced to a single state variable. The type of the state variable was a
> class (ExecQueue) defined in the class implementing KeyedProcessFunction.
> Moving the ExecQueue definition to its own file fixed the problem.
>
> The attached example always crashes the TaskManager within 30 seconds to 5
> minutes.
>
> MyKeyedProcessFunction.java, also cut and pasted here:
>
> package crash;
>
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
> import org.apache.flink.util.Collector;
>
> public class MyKeyedProcessFunction extends KeyedProcessFunction<String, Exec, Exec> {
>     private static final Logger LOG = LoggerFactory.getLogger(MyKeyedProcessFunction.class);
>     public TypeInformation<ExecQueue> leftTypeInfo;
>     public transient ValueState<ExecQueue> leftState;
>
>     public int initQueueSize;
>     public long emitFrequencyMs;
>
>     public MyKeyedProcessFunction() {
>         initQueueSize = 10;
>         emitFrequencyMs = 1;
>     }
>
>     @Override
>     public void open(Configuration conf) {
>         leftTypeInfo = TypeInformation.of(new TypeHint<ExecQueue>(){});
>         leftState = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("left", leftTypeInfo, null));
>     }
>
>     @Override
>     public void processElement(Exec leftIn, Context ctx, Collector<Exec> out) {
>         try {
>             ExecQueue eq = leftState.value();
>             if (eq == null) {
>                 eq = new ExecQueue(10);
>                 ctx.timerService().registerProcessingTimeTimer(
>                         ctx.timerService().currentProcessingTime() + emitFrequencyMs);
>             }
>             leftState.update(eq);
>         }
>         catch (Exception e) {
>             LOG.error("Exception in processElement1. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
>             for (java.lang.StackTraceElement s : e.getStackTrace())
>                 LOG.error(s.toString());
>         }
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Exec> out) {
>         try {
>             ExecQueue eq = leftState.value();
>             ctx.timerService().registerProcessingTimeTimer(
>                     ctx.timerService().currentProcessingTime() + emitFrequencyMs);
>         }
>         catch (Exception e) {
>             LOG.error("Exception in onTimer. Key: " + ctx.getCurrentKey() + ". " + e + ". trace = ");
>             for (java.lang.StackTraceElement s : e.getStackTrace())
>                 LOG.error(s.toString());
>         }
>     }
>
>     public class ExecQueue {
>         public RingBufferExec queue;
>         public ExecQueue() {}
>         public ExecQueue(int initSize) {
>             queue = new RingBufferExec(initSize);
>         }
>
>         public class RingBufferExec {
>             public Integer size;
>             public Integer count;
>             public RingBufferExec() {}
>             public RingBufferExec(int sizeIn) {
>                 size = sizeIn;
>                 count = 0;
>             }
>         }
>     }
> }
>
> From: Dawid Wysakowicz <dwysakow...@apache.org>
> Sent: Thursday, October 8, 2020 6:26 AM
> To: Colletta, Edward <edward.colle...@fmr.com>; user@flink.apache.org
> Subject: Re: state access causing segmentation fault
>
> Hi,
>
> It should be absolutely fine to use multiple state objects. I am not aware
> of any limits to that. A minimal, reproducible example would definitely be
> helpful. For those kinds of exceptions, I'd look into the serializers you
> use. Other than that, I cannot think of an obvious reason for that kind of
> exception.
>
> Best,
>
> Dawid
>
> On 08/10/2020 12:12, Colletta, Edward wrote:
>
> Using Flink 1.9.2, Java, FsStateBackend. Running a session cluster on EC2
> instances.
>
> I have a KeyedProcessFunction that is causing a segmentation fault,
> crashing the Flink task manager. This seems to be caused by using 3 state
> variables in the operator. The crash happens consistently after some load
> is processed.
>
> This is the second time I have encountered this. The first time I had 3
> ValueState variables; this time I had 2 ValueState variables and a MapState
> variable. Both times the error was alleviated by removing one of the state
> variables.
>
> This time I replaced the 2 ValueState variables with a Tuple2 of the types
> of the individual variables. I can try to put together a minimal example,
> but I was wondering if anyone has encountered this problem.
>
> Are there any documented limits on the number of state variables one
> operator can use?
>
> For background, the reason I use multiple state variables is that the
> operator is processing 2 types of inputs, Left and Right. When Left is
> received, it is put into a PriorityQueue. When the Right type is received,
> I put it into a ring buffer.
>
> I replaced the PriorityQueue with a queue of IDs and a MapState to hold the
> elements. So Left is stored in a queue ValueState variable and a MapState
> variable, and Right is stored in the ring buffer ValueState variable.

--

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng