Glad to hear this!
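
For anyone who finds this thread later, here is a minimal sketch of the pattern that fixed it: obtain the state handle once in open and only reuse it in processElement. To keep it runnable without a Flink dependency, ToyStateStore, ToyValueState and CountingFunction are hypothetical plain-Java stand-ins, not Flink classes, and the toy ignores keys. In a real job the handle would typically come from getRuntimeContext().getState(...) with a ValueStateDescriptor, called inside open.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-ins for illustration only; these are NOT Flink classes.
class OpenInitSketch {

    /** Hypothetical stand-in for a state handle such as a ValueState of Long. */
    static final class ToyValueState {
        private long value;

        long value() { return value; }

        void update(long v) { value = v; }
    }

    /** Hypothetical stand-in for the runtime's state store; counts handle lookups. */
    static final class ToyStateStore {
        private final Map<String, ToyValueState> states = new HashMap<>();
        int lookups = 0;

        ToyValueState getState(String name) {
            lookups++;
            return states.computeIfAbsent(name, n -> new ToyValueState());
        }
    }

    /** Mirrors the shape of a process function: open() once, processElement() per record. */
    static final class CountingFunction {
        private ToyValueState count; // obtained once in open(), reused afterwards

        void open(ToyStateStore store) {
            // All state initialization lives here, not in processElement().
            count = store.getState("count");
        }

        long processElement(String element) {
            // Only reads and updates the existing handle; no lookup per record.
            count.update(count.value() + 1);
            return count.value();
        }
    }

    public static void main(String[] args) {
        ToyStateStore store = new ToyStateStore();
        CountingFunction fn = new CountingFunction();
        fn.open(store);
        for (String e : new String[] {"a", "b", "c"}) {
            fn.processElement(e);
        }
        System.out.println("lookups=" + store.lookups);
    }
}
```

The point of the sketch is only the lifecycle split: the lookup count stays at one no matter how many records are processed.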

Best,
Zakelly

On Fri, Jan 19, 2024 at 9:22 AM Konstantinos Karavitis <kkaravi...@gmail.com>
wrote:

> I would like to thank you again: we managed to fix this strange issue by
> moving all the state initialization into the open method of the
> ProcessFunction!
>
> On Thu, Jan 18, 2024 at 11:53 PM Konstantinos Karavitis <
> kkaravi...@gmail.com> wrote:
>
>> Thank you very much, Zakelly, for taking the time to answer my question.
>> I appreciate it a lot.
>> Unfortunately, I cannot share the source code, as it is confidential and
>> owned by the company that I work with.
>> But yes, you are right: in the code, I can see that the state
>> initialization happens inside the AbstractProcessFunction#processElement
>> method.
>>
>> Thank you very much,
>> Kostas
>>
>> On Thu, Jan 18, 2024 at 1:17 PM Zakelly Lan <zakelly....@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> Could you please share the code of state initialization (getting state
>>> from a state descriptor)? It seems you are creating a state in
>>> #processElement?
>>>
>>>
>>> Best,
>>> Zakelly
>>>
>>>> On Thu, Jan 18, 2024 at 3:47 AM Konstantinos Karavitis <
>>>> kkaravi...@gmail.com> wrote:
>>>>
>>>>> Have you ever encountered the following error when a Flink application
>>>>> restarts and tries to restore its state from RocksDB?
>>>>>
>>>>>
>>>>> *Caused by: java.lang.UnsupportedOperationException: A serializer has
>>>>> already been registered for the state; re-registration is not allowed.
>>>>> at
>>>>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)*
>>>>>
>>>>> Could this be a bug caused by a race condition in which the namespace
>>>>> serializer is registered from more than one place concurrently?
>>>>>
>>>>> Here is the full stack trace:
>>>>>
>>>>> at
>>>>> org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>>>>>     at
>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>>>>>     at
>>>>> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>>>>>     at
>>>>> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>>>>>     at java.base/java.lang.Thread.run(Unknown Source)
>>>>>
>>>>> Caused by: java.lang.UnsupportedOperationException: A serializer has
>>>>> already been registered for the state; re-registration is not allowed.
>>>>>     at
>>>>> org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)
>>>>>     at
>>>>> org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)
>>>>>     at
>>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:734)
>>>>>     at
>>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
>>>>>     at
>>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
>>>>>     at
>>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
>>>>>     at
>>>>> org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
>>>>>     at
>>>>> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
>>>>>     at
>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
>>>>>     at
>>>>> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
>>>>>     at
>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>>>>     at
>>>>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>>>>     ... 36 common frames omitted
>>>>>
>>>>>
>>>>> Many thanks in advance!
>>>>>
>>>>>
>>>>>
