Hello,

I'm looking for a way to modify state inside an operator in Flink. I'm following the "Modifying Savepoints" section of the State Processor API guide: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#modifying-savepoints

However, when I execute it locally on macOS, it fails with an assertion failure, apparently while writing the new savepoint:
Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238.

Here are the project dependencies:

<dependencies>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
    <!-- Avro -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <!-- Google Cloud -->
    <dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-2.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-nio</artifactId>
        <version>0.121.0</version>
    </dependency>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-state-processor-api_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop2</artifactId>
        <version>2.8.3-1.8.3</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
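For reference, this is the load-modify-write pattern from the linked guide that we are following. The sketch below is our paraphrase of the 1.9 docs; the path and operator uid are placeholders, and "transformation" stands for a BootstrapTransformation built via OperatorTransformation.bootstrapWith(...):

// Paraphrase of the "Modifying Savepoints" example from the linked guide
// (Flink 1.9). Path and uid are placeholders, not our real values.
public static void modifySavepoint(BootstrapTransformation<?> transformation) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    String path = "gs://bucket/savepoints/savepoint-xxx";   // placeholder
    ExistingSavepoint savepoint = Savepoint.load(env, path, new RocksDBStateBackend(path));
    savepoint
            .removeOperator("operator-uid")                 // drop the operator's old state
            .withOperator("operator-uid", transformation)   // register the rewritten state
            .write(path + "-modified");                     // write out a new savepoint
    env.execute("modify savepoint");
}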
And here is the code we are using to set the bookingPeriod field to "2020/01" in every state entry:

public class Migrator implements Serializable {

    private static final String SAVEPOINT = "gs://XXX-XXX-flink-state/savepoints/savepoint-292a66-d3fe1a6595c3";

    public static void main(String[] args) throws Exception {
        new Migrator().execute();
    }

    public void execute() throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        ExistingSavepoint savepoint = Savepoint.load(environment, SAVEPOINT, new RocksDBStateBackend(SAVEPOINT));
        BookingKeyStateMigrator.withOperationId("aggregateBookingLine")
                .migrate(savepoint)
                .write(SAVEPOINT.concat("-migrated"));
        environment.execute("State Migrate Job");
    }
}

public class BookingKeyStateMigrator implements Serializable {

    private final String operatorUid;
    private final ValueStateDescriptor<Aggregation> stateDescriptor;

    public static BookingKeyStateMigrator withOperationId(String operatorUid) {
        return new BookingKeyStateMigrator(operatorUid);
    }

    private BookingKeyStateMigrator(String operatorUid) {
        this.operatorUid = operatorUid;
        this.stateDescriptor = new ValueStateDescriptor<>("currentAggregation", Aggregation.class);
    }

    public ExistingSavepoint migrate(ExistingSavepoint savepoint) throws IOException {
        // Read the existing keyed state, re-key each entry with the new
        // booking period, and replace the operator's state in the savepoint.
        DataSet<KeyedState<BookingKey, Aggregation>> keyedStateDataSet =
                savepoint.readKeyedState(operatorUid, new BookingKeyStateReader(stateDescriptor));
        return savepoint.removeOperator(operatorUid)
                .withOperator(operatorUid, OperatorTransformation.bootstrapWith(keyedStateDataSet)
                        .keyBy(keyedState -> BookingKeyTransformation.toBookingKeyWithNewBookingPeriod(keyedState.getKey()))
                        .transform(new BookingKeyStateBootstrap(stateDescriptor)));
    }
}

public class BookingKeyStateReader extends KeyedStateReaderFunction<BookingKey, KeyedState<BookingKey, Aggregation>> {

    private final ValueStateDescriptor<Aggregation> stateDescriptor;
    private transient ValueState<Aggregation> state;

    public BookingKeyStateReader(ValueStateDescriptor<Aggregation> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }

    @Override
    public void open(Configuration configuration) {
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void readKey(BookingKey key, Context context, Collector<KeyedState<BookingKey, Aggregation>> out) throws Exception {
        // Emit each key together with its current ValueState contents.
        KeyedState<BookingKey, Aggregation> data = new KeyedState<>(key, state.value());
        out.collect(data);
    }
}

public class BookingKeyStateBootstrap extends KeyedStateBootstrapFunction<BookingKey, KeyedState<BookingKey, Aggregation>> {

    private final ValueStateDescriptor<Aggregation> stateDescriptor;
    private transient ValueState<Aggregation> state;

    public BookingKeyStateBootstrap(ValueStateDescriptor<Aggregation> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }

    @Override
    public void open(Configuration configuration) {
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(KeyedState<BookingKey, Aggregation> keyedState, Context context) throws Exception {
        // Write the carried-over value under the (re-keyed) current key.
        state.update(keyedState.getValue());
    }
}

public class BookingKeyTransformation {

    private BookingKeyTransformation() {
    }

    static BookingKey toBookingKeyWithNewBookingPeriod(BookingKey bookingKey) {
        return BookingKey.newBuilder(bookingKey)
                .setBookingPeriod("2020/01")
                .build();
    }
}
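For completeness, KeyedState is our own small holder type. A minimal sketch, reconstructed here from how it is used above (the real class may differ in detail, e.g. it could be Lombok-generated):

// Sketch of the KeyedState holder: one savepoint key plus its state value.
public class KeyedState<K, V> implements Serializable {

    private final K key;    // the operator key read from the savepoint
    private final V value;  // the ValueState contents for that key

    public KeyedState(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K getKey() {
        return key;
    }

    public V getValue() {
        return value;
    }
}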
Here is the debug output we get when running the code (trimmed; the repetitive Hadoop metrics and Netty initialization DEBUG lines are omitted):

[... Hadoop UgiMetrics registration and Groups initialization DEBUG lines omitted ...]
2020/05/06 19:02:46 [DEBUG] [main] org.apache.hadoop.util.NativeCodeLoader.<clinit>:55 - Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
2020/05/06 19:02:46 [ WARN] [main] org.apache.hadoop.util.NativeCodeLoader.<clinit>:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[... Netty PlatformDependent and PooledByteBufAllocator initialization DEBUG lines omitted ...]
2020/05/06 19:02:52 [DEBUG] [DataSource (at unionOperatorStates(WritableSavepoint.java:109) (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)] org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.<clinit>:61 - -Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true
2020/05/06 19:02:52 [DEBUG] [DataSource (at unionOperatorStates(WritableSavepoint.java:109) (org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)] org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory.newResourceLeakDetector:202 - Loaded default ResourceLeakDetector: org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@3ac57bf3
Assertion failed: (last_ref), function ~ColumnFamilySet, file db/column_family.cc, line 1238.

The last log lines come from the DataSource task created by WritableSavepoint.unionOperatorStates, so the crash appears to happen in RocksDB's native code while the new savepoint is being written.

Could you please help me understand why this assertion failure is happening?

Thanks,
Luis Amaral