Hi Hangxiang and Martijn,

Thanks for the tips! That state processor API looks interesting, I'll have
to dig into that more. The point about the query plan makes sense, but it
also means that using SQL in my job end-to-end is a little dangerous if I
need to guarantee that I'll never have to dump state.

So it sounds like if I had retained checkpoints enabled and used DataStream
for the source/sink with SQL in the middle, then I could've added some
filtering using the DataStream API before the SQL (so the query plan
shouldn't change) and deployed the new job by restoring either the retained
checkpoint or savepoint without losing anything?

Thanks,

Tim


On Thu, Dec 22, 2022 at 7:45 AM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Tim,
>
> > Our job happens to be stateless, so we're okay this time, but if we had
> used state (like joining two streams or something) we would end up losing
> data to fix this bug. Is the only solution to just use the DataStream API?
>
> In case you have a change in your SQL statement, then yes you would
> lose your state. With the DataStream API you can work around that problem,
> because you define explicitly define your execution plan. With SQL, the
> planner generates and optimizes your statement into an optimized execution
> plan. If you change that, your plan can change and then any state backup
> can become incompatible. It depends on the change in SQL of course.
>
> > But my main concern is that when an error occurs, if we haven't
> savepointed recently, we're not going to be able to change anything about
> the job without losing state. Is the answer just... always savepoint
> frequently so you never have this issue?
>
> If you don't snapshot your state and you need to change anything (or if
> your Flink cluster restarts or fails for whatever reason and you need to
> restart), then your state is gone. Do check out the production readiness
> checklist for some things that you should consider when going to production
> with a Flink application. See
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/production_ready/#choose-the-right-checkpoint-interval
>
> Best regards,
>
> Martijn
>
> On Thu, Dec 22, 2022 at 8:50 AM Hangxiang Yu <master...@gmail.com> wrote:
>
>> Hi Tim.
>> > Is the only solution to just use the DataStream API?
>> Just as Martijn mentioned, if the execution plan has been changed, it's
>> difficult to reuse the original state to restore.
>> Only if you are dropping some operators, then you could use --
>> allowNonRestoredState to restore withouting dropping all states.
>> Otherwise, maybe State Processor API[2] could help to modify the
>> snapshots to match the newest execution plan. But it may cost a lot of
>> effort.
>>
>> > But my main concern is that when an error occurs, if we haven't
>> savepointed recently, we're not going to be able to change anything about
>> the job without losing state.
>> I think "retained checkpoints" could help you. Flink will always keep the
>> recent checkpoint even if the job fails or stops when you configure it.
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#allowing-non-restored-state
>> [2]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>> [3]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints/#retained-checkpoints
>>
>> On Thu, Dec 22, 2022 at 11:01 AM Timothy Bess <tdbga...@gmail.com> wrote:
>>
>>> Hi Martijn,
>>>
>>> Sorry I didn't see your response! Basically we had a bad event that was
>>> blowing up our python UDF, so we wanted to change the SQL to add a where
>>> clause that filters out the event to mitigate the issue. Our job happens to
>>> be stateless, so we're okay this time, but if we had used state (like
>>> joining two streams or something) we would end up losing data to fix this
>>> bug. Is the only solution to just use the DataStream API?
>>>
>>> But my main concern is that when an error occurs, if we haven't
>>> savepointed recently, we're not going to be able to change anything about
>>> the job without losing state. Is the answer just... always savepoint
>>> frequently so you never have this issue? Just a little concerned about
>>> small oversights causing us to have to dump state in our jobs later.
>>>
>>> Thanks,
>>>
>>> Tim
>>>
>>> On Fri, Dec 16, 2022 at 3:13 AM Martijn Visser <martijnvis...@apache.org>
>>> wrote:
>>>
>>>> Hi Tim,
>>>>
>>>> If I understand correctly, you need to deploy a new SQL statement in
>>>> order to fix your issue? If so, the problem is that a new SQL statement
>>>> might lead to a different execution plan which can't be restored. See
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#state-management
>>>> for more details on this topic.
>>>>
>>>> Best regards,
>>>>
>>>> Martijn
>>>>
>>>> On Fri, Dec 16, 2022 at 12:34 AM Timothy Bess <tdbga...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi there,
>>>>>
>>>>> We have a pyflink/SQL job that has a bug that we fixed and are trying
>>>>> to deploy. Here's the issue though. The job successfully restores from the
>>>>> checkpoint, but has no recent savepoints. We can't seem to get it to 
>>>>> accept
>>>>> our new SQL unless we savepoint/restore, but we can't trigger a savepoint
>>>>> since our bug is crashing the job.
>>>>>
>>>>> How do people generally get around issues like this without losing all
>>>>> Flink state? It seems weird that I'd have to lose my Flink state
>>>>> considering that I can successfully restore the checkpoint. I must be
>>>>> missing something here.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Tim
>>>>>
>>>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>

Reply via email to