Adding dev@ to get some traction. Any help would be greatly appreciated. Thanks.
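For context, here is roughly how the memory setup described below maps to flink-conf.yaml, assuming the Flink 1.10+ TaskManager memory model (this is my best guess at the relevant keys, so please point out if different knobs are the ones that matter here):

taskmanager.memory.task.heap.size: 2g
taskmanager.memory.managed.size: 2g      # raised to 6g in the later attempt
state.backend: rocksdb
state.backend.incremental: true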
On Thu, Jul 23, 2020 at 11:48 AM Sivaprasanna <sivaprasanna...@gmail.com> wrote:

> +user...@flink.apache.org <user...@flink.apache.org>
>
> A follow-up question. I tried taking a savepoint, but the job failed
> immediately. It happens every time I take a savepoint. The job is running
> on a YARN cluster, so it fails with "container running out of memory".
> The state size averages around 1.2 GB but also peaks at ~4.5 GB sometimes
> (please refer to the screenshot below). The job is running with 2 GB task
> manager heap & 2 GB task manager managed memory. I increased the managed
> memory to 6 GB, assuming the failure has something to do with RocksDB, but
> it failed even with 6 GB managed memory. I guess I am missing some
> configuration. Can you folks please help me with this?
>
> [image: Screenshot 2020-07-23 at 10.34.29 AM.png]
>
> On Wed, Jul 22, 2020 at 7:32 PM Sivaprasanna <sivaprasanna...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We are trying out state schema migration for one of our stateful
>> pipelines. We use a few Avro-type states. Changes made to the job:
>> 1. Updated the schema for one of the states (added a new 'boolean' field
>> with a default value).
>> 2. Modified the code by removing a couple of ValueStates.
>>
>> To push these changes, I stopped the live job and resubmitted the new
>> jar with the latest *checkpoint* path. However, the job failed with the
>> following error:
>>
>> java.lang.RuntimeException: Error while getting state
>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>>     ...
>>     ...
>> Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>
>> I was going through the state schema evolution doc. The document
>> mentions that we need to take a *savepoint* and restart the job with the
>> savepoint path. We are using the RocksDB backend with incremental
>> checkpointing enabled. Can we not use the latest available checkpoint
>> when we are dealing with state schema changes?
>>
>> The complete stack trace is attached to this mail.
>>
>> -
>> Sivaprasanna
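P.S. To make the schema change concrete, here is a minimal sketch of how the evolved state is declared. SessionState and Event are placeholder names standing in for the Avro-generated state class and the input event type from our job, and the actual processing logic is elided. The Avro change itself is just one added field with a default, along the lines of {"name": "flagged", "type": "boolean", "default": false}, which by Avro's compatibility rules should keep records written with the old schema readable.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// SessionState = Avro-generated class whose schema gained the new boolean field;
// Event = input type. Both are placeholders, not the real job classes.
public class EnrichmentFunction extends KeyedProcessFunction<String, Event, SessionState> {

    private transient ValueState<SessionState> sessionState;

    @Override
    public void open(Configuration parameters) {
        // State keyed on the Avro type; Flink derives the serializer from the class.
        ValueStateDescriptor<SessionState> descriptor =
                new ValueStateDescriptor<>("session-state", SessionState.class);
        sessionState = getRuntimeContext().getState(descriptor);

        // The couple of ValueStates that were removed in the new jar used to be
        // registered here as well; they are simply no longer declared.
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<SessionState> out) throws Exception {
        SessionState current = sessionState.value();
        // ... actual update logic elided ...
        sessionState.update(current);
        out.collect(current);
    }
}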