Hello,

I'm looking for a way to modify the state inside an operator in Flink. I'm
following the State Processor API guide:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#modifying-savepoints

However, when I execute the job locally on macOS it fails with an assertion
from RocksDB's native code, apparently while writing the new savepoint:

Assertion failed: (last_ref), function ~ColumnFamilySet, file 
db/column_family.cc, line 1238.

Here are the project dependencies:
<dependencies>
    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>${lombok.version}</version>
    </dependency>
    <!-- Avro -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>${avro.version}</version>
    </dependency>
    <!-- Google Cloud -->
    <dependency>
        <groupId>com.google.cloud.bigdataoss</groupId>
        <artifactId>gcs-connector</artifactId>
        <version>hadoop2-2.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>google-cloud-nio</artifactId>
        <version>0.121.0</version>
    </dependency>
    <!-- Flink -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-state-processor-api_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-shaded-hadoop2</artifactId>
        <version>2.8.3-1.8.3</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>
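
For reference, the corresponding version properties. The patch versions shown
here are placeholders for illustration only; flink.version is on the 1.9 line,
matching the guide above:

<properties>
    <!-- Placeholder patch versions, for illustration only -->
    <flink.version>1.9.2</flink.version>
    <avro.version>1.9.2</avro.version>
    <lombok.version>1.18.12</lombok.version>
</properties>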

And here is the code we use to update the bookingPeriod field to "2020/01" in
every keyed state:
public class Migrator implements Serializable {

    private static final String SAVEPOINT =
            "gs://XXX-XXX-flink-state/savepoints/savepoint-292a66-d3fe1a6595c3";

    public static void main(String[] args) throws Exception {
        new Migrator().execute();
    }

    public void execute() throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
        // Load the existing savepoint with the same RocksDB backend the job uses.
        ExistingSavepoint savepoint =
                Savepoint.load(environment, SAVEPOINT, new RocksDBStateBackend(SAVEPOINT));
        // Rewrite the keyed state of the "aggregateBookingLine" operator and
        // write the result next to the original savepoint.
        BookingKeyStateMigrator.withOperationId("aggregateBookingLine")
                               .migrate(savepoint)
                               .write(SAVEPOINT.concat("-migrated"));

        environment.execute("State Migrate Job");
    }

}

public class BookingKeyStateMigrator implements Serializable {

    private final String operatorUid;
    private final ValueStateDescriptor<Aggregation> stateDescriptor;

    public static BookingKeyStateMigrator withOperationId(String operatorUid) {
        return new BookingKeyStateMigrator(operatorUid);
    }

    private BookingKeyStateMigrator(String operatorUid) {
        this.operatorUid = operatorUid;
        this.stateDescriptor =
                new ValueStateDescriptor<>("currentAggregation", Aggregation.class);
    }

    public ExistingSavepoint migrate(ExistingSavepoint savepoint) throws IOException {
        // Read the operator's existing keyed state...
        DataSet<KeyedState<BookingKey, Aggregation>> keyedStateDataSet =
                savepoint.readKeyedState(operatorUid, new BookingKeyStateReader(stateDescriptor));

        // ...then replace the operator, re-keying each entry with the new booking period.
        return savepoint
                .removeOperator(operatorUid)
                .withOperator(operatorUid,
                              OperatorTransformation.bootstrapWith(keyedStateDataSet)
                                  .keyBy(keyedState -> BookingKeyTransformation
                                      .toBookingKeyWithNewBookingPeriod(keyedState.getKey()))
                                  .transform(new BookingKeyStateBootstrap(stateDescriptor)));
    }

}

public class BookingKeyStateReader
        extends KeyedStateReaderFunction<BookingKey, KeyedState<BookingKey, Aggregation>> {

    private final ValueStateDescriptor<Aggregation> stateDescriptor;
    private transient ValueState<Aggregation> state;

    public BookingKeyStateReader(ValueStateDescriptor<Aggregation> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }

    @Override
    public void open(Configuration configuration) {
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void readKey(BookingKey key, Context context,
                        Collector<KeyedState<BookingKey, Aggregation>> out) throws Exception {
        // Emit each key together with its current aggregation value.
        KeyedState<BookingKey, Aggregation> data = new KeyedState<>(key, state.value());
        out.collect(data);
    }

}

public class BookingKeyStateBootstrap
        extends KeyedStateBootstrapFunction<BookingKey, KeyedState<BookingKey, Aggregation>> {

    private final ValueStateDescriptor<Aggregation> stateDescriptor;
    private transient ValueState<Aggregation> state;

    public BookingKeyStateBootstrap(ValueStateDescriptor<Aggregation> stateDescriptor) {
        this.stateDescriptor = stateDescriptor;
    }

    @Override
    public void open(Configuration configuration) {
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(KeyedState<BookingKey, Aggregation> keyedState,
                               Context context) throws Exception {
        // Write the old value back under the (already re-keyed) new key.
        state.update(keyedState.getValue());
    }

}

public class BookingKeyTransformation {

    private BookingKeyTransformation() {
    }

    static BookingKey toBookingKeyWithNewBookingPeriod(BookingKey bookingKey) {
        return BookingKey.newBuilder(bookingKey)
                         .setBookingPeriod("2020/01")
                         .build();
    }

}
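
For completeness, KeyedState is nothing Flink-specific — just a small
serializable holder pairing a key with its state value. A minimal sketch of
what the snippets above assume:

public class KeyedState<K, V> implements Serializable {

    private final K key;
    private final V value;

    public KeyedState(K key, V value) {
        this.key = key;
        this.value = value;
    }

    public K getKey() {
        return key;
    }

    public V getValue() {
        return value;
    }

}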

Here are the debug messages we get when running the code:
2020/05/06 19:02 45 [DEBUG] [main] 
org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginSuccess with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, 
sampleName=Ops, always=false, type=DEFAULT, value=[Rate of successful kerberos 
logins and latency (milliseconds)], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] 
org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.loginFailure with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, 
sampleName=Ops, always=false, type=DEFAULT, value=[Rate of failed kerberos 
logins and latency (milliseconds)], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] 
org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field 
org.apache.hadoop.metrics2.lib.MutableRate 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.getGroups with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, 
sampleName=Ops, always=false, type=DEFAULT, value=[GetGroups], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] 
org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field 
private org.apache.hadoop.metrics2.lib.MutableGaugeLong 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.renewalFailuresTotal 
with annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, 
sampleName=Ops, always=false, type=DEFAULT, value=[Renewal failures since 
startup], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] 
org.apache.hadoop.metrics2.lib.MutableMetricsFactory.newForField:42 - field 
private org.apache.hadoop.metrics2.lib.MutableGaugeInt 
org.apache.hadoop.security.UserGroupInformation$UgiMetrics.renewalFailures with 
annotation @org.apache.hadoop.metrics2.annotation.Metric(about=, 
sampleName=Ops, always=false, type=DEFAULT, value=[Renewal failures since last 
successful login], valueName=Time)
2020/05/06 19:02 45 [DEBUG] [main] 
org.apache.hadoop.metrics2.impl.MetricsSystemImpl.register:231 - UgiMetrics, 
User and group related metrics
2020/05/06 19:02 46 [DEBUG] [main] 
org.apache.hadoop.security.Groups.getUserToGroupsMappingService:448 -  Creating 
new Groups object
2020/05/06 19:02 46 [DEBUG] [main] 
org.apache.hadoop.util.NativeCodeLoader.<clinit>:46 - Trying to load the 
custom-built native-hadoop library...
2020/05/06 19:02 46 [DEBUG] [main] 
org.apache.hadoop.util.NativeCodeLoader.<clinit>:55 - Failed to load 
native-hadoop with error: java.lang.UnsatisfiedLinkError: no hadoop in 
java.library.path
2020/05/06 19:02 46 [ WARN] [main] 
org.apache.hadoop.util.NativeCodeLoader.<clinit>:62 - Unable to load 
native-hadoop library for your platform... using builtin-java classes where 
applicable
2020/05/06 19:02 46 [DEBUG] [main] 
org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback.<init>:45 - 
Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping
2020/05/06 19:02 46 [DEBUG] [main] org.apache.hadoop.security.Groups.<init>:152 
- Group mapping 
impl=org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback; 
cacheTimeout=300000; warningDeltaMs=5000
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.logging.InternalLoggerFactory.newDefaultFactory:47
 - Using Log4J as the default logging framework
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.InternalThreadLocalMap.<clinit>:54
 - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.InternalThreadLocalMap.<clinit>:57
 - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.isOsx0:966
 - Platform: MacOS
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.explicitNoUnsafeCause0:395
 - -Dio.netty.noUnsafe: false
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.javaVersion0:871
 - Java version: 8
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:120
 - sun.misc.Unsafe.theUnsafe: available
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:144
 - sun.misc.Unsafe.copyMemory: available
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:182
 - java.nio.Buffer.address: available
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:243
 - direct buffer constructor: available
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:313
 - java.nio.Bits.unaligned: available, true
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:378
 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable prior 
to Java9
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent0.<clinit>:385
 - java.nio.DirectByteBuffer.<init>(long, int): available
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.unsafeUnavailabilityCause0:992
 - sun.misc.Unsafe: available
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.tmpdir0:1086
 - -Dio.netty.tmpdir: /var/folders/xc/9tp7jyt532371thglfsyh3x1nj7_ry/T 
(java.io.tmpdir)
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.bitMode0:1165
 - -Dio.netty.bitMode: 64 (sun.arch.data.model)
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.<clinit>:157
 - -Dio.netty.maxDirectMemory: 3817865216 bytes
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.<clinit>:164
 - -Dio.netty.uninitializedArrayAllocationThreshold: -1
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.CleanerJava6.<clinit>:92 
- java.nio.ByteBuffer.cleaner(): available
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent.<clinit>:184
 - -Dio.netty.noPreferDirect: false
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.channel.MultithreadEventLoopGroup.<clinit>:44
 - -Dio.netty.eventLoopThreads: 24
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.<clinit>:104 - 
-Dio.netty.noKeySetOptimization: false
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.<clinit>:105 - 
-Dio.netty.selectorAutoRebuildThreshold: 512
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.internal.PlatformDependent$Mpsc.<clinit>:860
 - org.jctools-core.MpscChunkedArrayQueue: available
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelId.<clinit>:79 - 
-Dio.netty.processId: 76289 (auto-detected)
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.NetUtil.<clinit>:139 - 
-Djava.net.preferIPv4Stack: false
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.NetUtil.<clinit>:140 - 
-Djava.net.preferIPv6Addresses: false
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.NetUtil.<clinit>:224 - Loopback 
interface: lo0 (lo0, 0:0:0:0:0:0:0:1%lo0)
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.NetUtil$1.run:289 - Failed to get 
SOMAXCONN from sysctl and file /proc/sys/net/core/somaxconn. Default: 128
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelId.<clinit>:101 - 
-Dio.netty.machineId: 3c:22:fb:ff:fe:18:a2:68 (auto-detected)
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector.<clinit>:129 
- -Dorg.apache.flink.shaded.netty4.io.netty.leakDetection.level: simple
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector.<clinit>:130 
- -Dorg.apache.flink.shaded.netty4.io.netty.leakDetection.targetRecords: 4
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:120
 - -Dio.netty.allocator.numHeapArenas: 24
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:121
 - -Dio.netty.allocator.numDirectArenas: 24
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:123
 - -Dio.netty.allocator.pageSize: 8192
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:128
 - -Dio.netty.allocator.maxOrder: 11
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:132
 - -Dio.netty.allocator.chunkSize: 16777216
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:133
 - -Dio.netty.allocator.tinyCacheSize: 512
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:134
 - -Dio.netty.allocator.smallCacheSize: 256
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:135
 - -Dio.netty.allocator.normalCacheSize: 64
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:136
 - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:137
 - -Dio.netty.allocator.cacheTrimInterval: 8192
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.PooledByteBufAllocator.<clinit>:138
 - -Dio.netty.allocator.useCacheForAllThreads: true
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil.<clinit>:83 - 
-Dio.netty.allocator.type: pooled
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil.<clinit>:92 - 
-Dio.netty.threadLocalDirectBufferSize: 0
2020/05/06 19:02 51 [DEBUG] [main] 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufUtil.<clinit>:95 - 
-Dio.netty.maxThreadLocalCharBufferSize: 16384
2020/05/06 19:02 52 [DEBUG] [DataSource (at 
unionOperatorStates(WritableSavepoint.java:109) 
(org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)] 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.<clinit>:61 - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true
2020/05/06 19:02 52 [DEBUG] [DataSource (at 
unionOperatorStates(WritableSavepoint.java:109) 
(org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)] 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.<clinit>:62 - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true
2020/05/06 19:02 52 [DEBUG] [DataSource (at 
unionOperatorStates(WritableSavepoint.java:109) 
(org.apache.flink.api.java.io.CollectionInputFormat)) (1/1)] 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory$DefaultResourceLeakDetectorFactory.newResourceLeakDetector:202
 - Loaded default ResourceLeakDetector: 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@3ac57bf3
Assertion failed: (last_ref), function ~ColumnFamilySet, file 
db/column_family.cc, line 1238.


Could you please help me understand why this assertion failure is happening?

Thanks,
Luis Amaral
