Hi team,

I encountered the exception below after adding a new field to a POJO used in
list state and resuming the job from a checkpoint:



> # A fatal error has been detected by the Java Runtime Environment:
> #
> #  SIGSEGV (0xb) at pc=0x00007f6a2ad5d95f, pid=1, tid=0x00007f69bac9f700
> #
> # JRE version: OpenJDK Runtime Environment (8.0_302-b08) (build 1.8.0_302-b08)
> # Java VM: OpenJDK 64-Bit Server VM (25.302-b08 mixed mode linux-amd64 compressed oops)
> # Problematic frame:
> # V  [libjvm.so+0x57595f]  Exceptions::_throw(Thread*, char const*, int, Handle, char const*)+0x1ef
> #
> # Core dump written. Default location: /opt/flink/core or core.1
> #
> # An error report file with more information is saved as:
> # /opt/flink/hs_err_pid1.log
> #
> # If you would like to submit a bug report, please visit:
> #   http://bugreport.java.com/bugreport/crash.jsp
> #
> [error occurred during error reporting , id 0xb]
>
> Exception in thread "Thread-23" org.apache.flink.util.FlinkRuntimeException: Failed to deserialize list element for TTL compaction filter
>     at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:214)
> Caused by: java.lang.IllegalArgumentException: Can not set int field xxxxx.hr to null value
>     at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:167)
>     at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:171)
>     at sun.reflect.UnsafeIntegerFieldAccessorImpl.set(UnsafeIntegerFieldAccessorImpl.java:80)
>     at java.lang.reflect.Field.set(Field.java:764)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:412)
>     at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
>     at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:243)
>     at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:193)
>
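The root-cause line is plain JDK reflection behavior: a primitive field can never be set to null, which is what the POJO deserializer ends up attempting when the stored bytes carry no value for the newly added field. A minimal standalone sketch (the class and field names are hypothetical, standing in for the redacted `xxxxx` POJO):

```java
import java.lang.reflect.Field;

public class NullPrimitiveDemo {
    // Hypothetical POJO with a newly added primitive field, like the "hr" field above
    static class Event {
        int hr;
    }

    static String tryNullSet() {
        try {
            Field f = Event.class.getDeclaredField("hr");
            f.setAccessible(true);
            // Old-format bytes yield no value for the new field, so the
            // deserializer effectively does this:
            f.set(new Event(), null);
            return "no exception";
        } catch (IllegalArgumentException e) {
            // e.g. "Can not set int field NullPrimitiveDemo$Event.hr to null value"
            return e.getMessage();
        } catch (ReflectiveOperationException e) {
            return "unexpected: " + e;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryNullSet());
    }
}
```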

I verified that Flink recognized the state change:

> Performing state migration for state ListStateDescriptor{name=novimp,
> defaultValue=null,
> serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6fe249e5}
> because the state serializer's schema, i.e. serialization format, has
> changed.
>

and that it successfully migrated the state to the new POJO class (I added my
own debug logging to the Flink source code):

before migration object size: *480*; after migration object size: *481*


However, when the exception occurred, the object read from state had the byte
length of the original schema, not of the updated one. It appears that the
state migration performed during the recovery phase didn't take effect, or was
never persisted back to RocksDB. Can you please give me some pointers for
debugging this further?
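The size mismatch can be reproduced in isolation with a toy length-based format. This is not Flink's actual PojoSerializer wire format, just an illustration of what reading old-size bytes against a new schema looks like (field names are made up):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class SchemaMismatchDemo {
    // Old schema: a single long; the new schema appends an int (hypothetical fields)
    static byte[] writeOldFormat(long ts) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeLong(ts);
        }
        return bos.toByteArray();
    }

    static String readNewFormat(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        long ts = in.readLong();
        try {
            int hr = in.readInt(); // new field: absent from old-format bytes
            return "ts=" + ts + " hr=" + hr;
        } catch (EOFException e) {
            return "old-format record is too short for the new schema";
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readNewFormat(writeOldFormat(42L)));
        // prints: old-format record is too short for the new schema
    }
}
```

This is the same category of failure as the crash above: the compaction filter reads un-migrated (old-size) bytes with the new serializer.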

> object size *480*
>


Flink version: 1.13.2
RocksDB state backend with incremental, aligned checkpoints
List state with TTL (24h) enabled
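For reference, the state above would be declared along these lines. This is only a configuration sketch against the Flink 1.13 API (`Event` stands in for the actual POJO class; the state name "novimp" is taken from the migration log above):

```java
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// With the RocksDB backend, TTL cleanup in the compaction filter is what
// routes deserialization through RocksDbTtlCompactFiltersManager$ListElementFilter.
StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.hours(24))
    .cleanupInRocksdbCompactFilter(1000)  // re-check TTL timestamps during compaction
    .build();

ListStateDescriptor<Event> descriptor =
    new ListStateDescriptor<>("novimp", Event.class);
descriptor.enableTimeToLive(ttlConfig);
```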

-- 
Regards,
Tao
