Hi Vishal,

FLINK-28274 <https://issues.apache.org/jira/browse/FLINK-28274> is still in
progress. In the meantime, I think a better solution is to migrate your job to
the new FileSource [1], because this issue only occurs with the legacy file
source (which will be deprecated soon).
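
For illustration, here is a minimal sketch of what a job on the new
FileSource could look like on 1.15. It assumes line-oriented text input and
the flink-connector-files dependency on the classpath; the class name, the
input path, the 10-second discovery interval and the source name are all
placeholders I made up, not values from your job.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceMigrationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical directory; replace with the path the job actually monitors.
        Path inputDir = new Path("file:///tmp/input");

        // New-style source: reads text files line by line and keeps
        // scanning the directory for new files at a fixed interval.
        FileSource<String> source =
                FileSource.forRecordStreamFormat(new TextLineInputFormat(), inputDir)
                        .monitorContinuously(Duration.ofSeconds(10)) // assumed interval
                        .build();

        // fromSource replaces the legacy readFile/readTextFile entry points.
        DataStream<String> lines =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

        lines.print();
        env.execute("file-source-migration-sketch");
    }
}
```

I would not expect the state of the legacy source in an existing savepoint
to carry over to the new source, so please test the restore path before
relying on it.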

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/filesystem

Best,
Lijie

On Mon, Jul 4, 2022 at 10:27 PM Vishal Surana <vis...@moengage.com> wrote:

> Wow! This is bad! I am using reactive mode and this is indeed the issue.
> This should have been patched urgently, as jobs on the upgraded Flink version
> are in a very precarious position. With all the other upgrades (RocksDB,
> etc.) that went into 1.15.0, there's no easy rollback.
>
> On Fri, Jul 1, 2022 at 8:14 AM Lijie Wang <wangdachui9...@gmail.com>
> wrote:
>
>> Hi,
>> Are you using the reactive mode? There is a known issue like that:
>> https://issues.apache.org/jira/browse/FLINK-28274
>>
>> Best,
>> Lijie
>>
>> On Fri, Jul 1, 2022 at 9:49 AM yuxia <luoyu...@alumni.sjtu.edu.cn> wrote:
>>
>>> I'm not sure why it happened. But from the Flink source code, it seems to
>>> be trying to restore from an invalid state. The state apparently contains
>>> more than one value, but Flink expects it to contain zero or one value.
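
Inline note: the check yuxia describes can be sketched in plain Java as
below. This is a simplified illustration, not Flink's actual code; the
class and method names (RestoreCheckSketch, restore) are hypothetical, and
only the error message is taken from the stack trace in this thread. How
more than one entry ends up in the restored state is not shown here.

```java
import java.util.List;

class RestoreCheckSketch {

    /** Throws IllegalArgumentException with the given message if the condition is false. */
    static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    /**
     * Returns the single restored value, or null if nothing was restored.
     * A restored list holding more than one entry is rejected as invalid.
     */
    static Long restore(List<Long> retrievedStates) {
        checkArgument(
                retrievedStates.size() <= 1,
                "ContinuousFileMonitoringFunction retrieved invalid state.");
        return retrievedStates.isEmpty() ? null : retrievedStates.get(0);
    }

    public static void main(String[] args) {
        System.out.println(restore(List.of()));     // nothing restored
        System.out.println(restore(List.of(42L)));  // a single entry is accepted
        try {
            restore(List.of(42L, 43L));             // two entries are rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The point is only that the restore path tolerates an empty or single-entry
list and fails fast on anything larger, which matches the exception below.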
>>>
>>> Best regards,
>>> Yuxia
>>>
>>> ------------------------------
>>> *From: *"Vishal Surana" <vis...@moengage.com>
>>> *To: *"User" <user@flink.apache.org>
>>> *Sent: *Friday, July 1, 2022, 5:28:07 AM
>>> *Subject: *ContinuousFileMonitoringFunction retrieved invalid state.
>>>
>>> My job is unable to restore its state from a savepoint because of the
>>> following exception. It seems to be a rare exception, as I haven't found
>>> any forum discussing it. Please advise.
>>>
>>> java.lang.IllegalArgumentException: ContinuousFileMonitoringFunction retrieved invalid state.
>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.initializeState(ContinuousFileMonitoringFunction.java:167) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[job-0.14-SNAPSHOT.jar:0.15-SNAPSHOT]
>>>     at java.lang.Thread.run(Thread.java:829) ~[?:?]
>>>
>>> --
>>> Regards,
>>> Vishal
>>>
>>>
>
> --
> Regards,
> Vishal
>
