Hi, I am not sure that FLINK-8836 <https://issues.apache.org/jira/browse/FLINK-8836> is related to the failure in the stack trace.
You say you are using Flink in production. Does that mean it always worked and has only recently started to fail?

From the stack trace, it looks like the arity of some Tuple type changed in some operator state. The number of tuple fields could have increased after a job restart. In that case, Flink expects the tuples stored in the checkpoint to have more fields than they actually do, and fails. Such a change would require an explicit state migration. Could that be the case?

When did the failure start to happen, and why was the operator state restored? Was it a job restart?

Best,
Andrey