Glad to hear this!
Best, Zakelly On Fri, Jan 19, 2024 at 9:22 AM Konstantinos Karavitis <kkaravi...@gmail.com> wrote: > I would like again to thank you as we managed to fix this strange issue we > had by moving all the state initializations into the open method of > ProcessFunction! > > On Thu, Jan 18, 2024 at 11:53 PM Konstantinos Karavitis < > kkaravi...@gmail.com> wrote: > >> Thank you very much Zakelly for taking the time to answer to my question. >> I appreciate it a lot. >> Unfortunately, I cannot share the source code as it is confidential and >> owned by the company that I co-operate with. >> But, yes you are right that inside the code, I can see that the state >> initialization happens inside the AbstractProcessFunction#processElement >> method. >> >> Thank you very much, >> Kostas >> >> On Thu, Jan 18, 2024 at 1:17 PM Zakelly Lan <zakelly....@gmail.com> >> wrote: >> >>> Hi, >>> >>> Could you please share the code of state initialization (getting state >>> from a state descriptor)? It seems you are creating a state in >>> #processElement? >>> >>> >>> Best, >>> Zakelly >>> >>> On Thu, Jan 18, 2024 at 2:25 PM Zakelly Lan <zakelly....@gmail.com> >>> wrote: >>> >>>> Hi, >>>> >>>> Could you please share the code of state initialization (getting state >>>> from a state descriptor)? It seems you are creating a state in >>>> #processElement? >>>> >>>> >>>> Best, >>>> Zakelly >>>> >>>> On Thu, Jan 18, 2024 at 3:47 AM Konstantinos Karavitis < >>>> kkaravi...@gmail.com> wrote: >>>> >>>>> Have you ever met the following error when a flink application >>>>> restarts and tries to restore the state from RocksDB? >>>>> >>>>> >>>>> *Caused by: java.lang.UnsupportedOperationException: A serializer has >>>>> already been registered for the state; re-registration is not allowed. >>>>> at >>>>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)* >>>>> >>>>> May that be a potential bug of a race condition where the namespace >>>>> serializer is being registered by more than one place concurrently? >>>>> >>>>> Here's also the full stack trace >>>>> >>>>> at >>>>> org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) >>>>> at >>>>> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831) >>>>> at >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780) >>>>> at >>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) >>>>> at >>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914) >>>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) >>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) >>>>> at java.base/java.lang.Thread.run(Unknown Source) >>>>> >>>>> *Caused by: java.lang.UnsupportedOperationException: A serializer has >>>>> already been registered for the state; re-registration is not allowed. >>>>> at >>>>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)* >>>>> Caused by: java.lang.UnsupportedOperationException: A serializer has >>>>> already been registered for the state; re-registration is not allowed. >>>>> *at >>>>> org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)* >>>>> at >>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:734) >>>>> at >>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667) >>>>> at >>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883) >>>>> at >>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870) >>>>> at >>>>> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47) >>>>> at >>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73) >>>>> at >>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362) >>>>> at >>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413) >>>>> at >>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115) >>>>> at >>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60) >>>>> ... 36 common frames omitted >>>>> >>>>> >>>>> Many thanks in advance! >>>>> >>>>> >>>>>