I would like again to thank you as we managed to fix this strange issue we had by moving all the state initializations into the open method of ProcessFunction!
On Thu, Jan 18, 2024 at 11:53 PM Konstantinos Karavitis < kkaravi...@gmail.com> wrote: > Thank you very much Zakelly for taking the time to answer to my question. > I appreciate it a lot. > Unfortunately, I cannot share the source code as it is confidential and > owned by the company that I co-operate with. > But, yes you are right that inside the code, I can see that the state > initialization happens inside the AbstractProcessFunction#processElement > method. > > Thank you very much, > Kostas > > On Thu, Jan 18, 2024 at 1:17 PM Zakelly Lan <zakelly....@gmail.com> wrote: > >> Hi, >> >> Could you please share the code of state initialization (getting state >> from a state descriptor)? It seems you are creating a state in >> #processElement? >> >> >> Best, >> Zakelly >> >> On Thu, Jan 18, 2024 at 2:25 PM Zakelly Lan <zakelly....@gmail.com> >> wrote: >> >>> Hi, >>> >>> Could you please share the code of state initialization (getting state >>> from a state descriptor)? It seems you are creating a state in >>> #processElement? >>> >>> >>> Best, >>> Zakelly >>> >>> On Thu, Jan 18, 2024 at 3:47 AM Konstantinos Karavitis < >>> kkaravi...@gmail.com> wrote: >>> >>>> Have you ever met the following error when a flink application restarts >>>> and tries to restore the state from RocksDB? >>>> >>>> >>>> *Caused by: java.lang.UnsupportedOperationException: A serializer has >>>> already been registered for the state; re-registration is not allowed. >>>> at >>>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)* >>>> >>>> May that be a potential bug of a race condition where the namespace >>>> serializer is being registered by more than one place concurrently? >>>> >>>> Here's also the full stack trace >>>> >>>> at >>>> org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125) >>>> at >>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217) >>>> at >>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183) >>>> at >>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266) >>>> at >>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >>>> at >>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >>>> at >>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) >>>> at >>>> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85) >>>> at >>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) >>>> at >>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) >>>> at >>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) >>>> at >>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) >>>> at >>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) >>>> at >>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) >>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) >>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) >>>> at java.base/java.lang.Thread.run(Unknown Source) >>>> >>>> *Caused by: java.lang.UnsupportedOperationException: A serializer has >>>> already been registered for the state; re-registration is not allowed. >>>> at >>>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)* >>>> Caused by: java.lang.UnsupportedOperationException: A serializer has >>>> already been registered for the state; re-registration is not allowed. >>>> *at >>>> org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)* >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:734) >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667) >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883) >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870) >>>> at >>>> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47) >>>> at >>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73) >>>> at >>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362) >>>> at >>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413) >>>> at >>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) >>>> at >>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) >>>> ... 36 common frames omitted >>>> >>>> >>>> Many thanks in advance! >>>> >>>> >>>>