OK, my first intuition would be some kind of misuse of the state API. My other
guess: did any checkpoint complete prior to triggering the
re-scaling event?
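The checkpoint question matters because of what recovery restores. Below is a minimal, framework-free simulation of a per-key sequence counter like the one in function (c). The class and method names are hypothetical and this is not Flink API. It assumes only that after a rescale or failure the counter state is restored from the last completed checkpoint, or starts empty if no checkpoint had completed. Source replay and everything else Flink does on recovery is ignored.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation (not Flink API) of a keyed sequence counter whose
// state is restored from the last completed checkpoint after rescale/failure.
public class CheckpointRestoreSketch {

    // Stands in for keyed ValueState<Long>: one counter per key.
    private Map<Long, Long> state = new HashMap<>();

    // Assign the next sequence number for a key; the first number is 1.
    long next(long key) {
        long seq = state.getOrDefault(key, 0L) + 1;
        state.put(key, seq);
        return seq;
    }

    // A copy of the current state, standing in for a completed checkpoint.
    Map<Long, Long> snapshot() {
        return new HashMap<>(state);
    }

    // Recovery: null means no checkpoint had completed, so state starts empty.
    void restore(Map<Long, Long> lastCompletedCheckpoint) {
        state = lastCompletedCheckpoint == null
                ? new HashMap<>()
                : new HashMap<>(lastCompletedCheckpoint);
    }

    public static void main(String[] args) {
        CheckpointRestoreSketch op = new CheckpointRestoreSketch();
        long key = 151978L;
        for (int i = 0; i < 3; i++) {
            op.next(key);                               // assigns 1, 2, 3
        }
        Map<Long, Long> checkpoint = op.snapshot();     // checkpoint completes here
        op.next(key);                                   // assigns 4
        op.next(key);                                   // assigns 5

        op.restore(checkpoint);                         // recovery with a checkpoint
        System.out.println("after restore from checkpoint: " + op.next(key));   // 4 again

        op.restore(null);                               // recovery without any checkpoint
        System.out.println("after restore without checkpoint: " + op.next(key)); // 1 again
    }
}
```

Without a completed checkpoint the counter restarts at 1, which would line up with numbering restarting together with a new task UUID. Even with a checkpoint, numbers handed out after it (4 and 5 here) are assigned again when the replayed records are processed, so an output check on a sink that is not exactly-once can also see some reuse.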

I'll also try to verify the scenario you've described, but these would be
the things that I'd check first.

D.

On Fri, Jan 7, 2022 at 10:46 AM Martin <mar...@sonicdev.de> wrote:

> Hello David,
>
> Right now I can't share the complete code, but in a few days I will try to
> simplify it and reduce it to a minimal version that still triggers the issue.
>
> First I will check whether an explicit keyBy instead of
> reinterpretAsKeyedStream fixes the issue.
> If yes, that would suggest, to me, a bug in the interaction between
> reinterpretAsKeyedStream and elastic scaling.
> If no, it's probably an issue caused by my code rather than by Flink.
>
> BR
> Martin
>
> David Morávek wrote on 07.01.2022 10:22 (GMT +01:00):
>
> Would you be able to share the code of your test by any chance?
>
> Best,
> D.
>
> On Fri, Jan 7, 2022 at 10:06 AM Martin <mar...@sonicdev.de> wrote:
>
>> Hello David,
>>
>> I have a test setup where the input is always the same.
>> After processing, I check the whole output to verify that each sequence
>> number is used only once.
>>
>> Another output field is a random UUID generated on startup of a task (in
>> the open() method of the keyed process function (c)).
>> In the output I saw that the sequence number started at 1 again at the
>> same time the scaling happened, and the change of the UUID matched that
>> as well.
>>
>> Some output:
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":1
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5
>>
>> The percentage is then the number of output records that reuse an already
>> assigned sequence number (per key1), relative to all output records.
>>
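The duplicate-percentage check described above can be sketched as follows. This is a minimal illustration, not Martin's actual test code: it assumes the output has already been parsed into (key1, sequence number) pairs, and the `Out` holder class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DuplicateSequenceCheck {

    // Hypothetical holder for the two output fields the check needs.
    static final class Out {
        final long key1;
        final long seq;

        Out(long key1, long seq) {
            this.key1 = key1;
            this.seq = seq;
        }
    }

    // Percentage of records whose sequence number was already used
    // earlier in the output for the same key1.
    static double duplicatePercentage(List<Out> records) {
        if (records.isEmpty()) {
            return 0.0;
        }
        Map<Long, Set<Long>> seenPerKey = new HashMap<>();
        int duplicates = 0;
        for (Out r : records) {
            // Set.add returns false when this key1 already used this number.
            if (!seenPerKey.computeIfAbsent(r.key1, k -> new HashSet<>()).add(r.seq)) {
                duplicates++;
            }
        }
        return 100.0 * duplicates / records.size();
    }

    public static void main(String[] args) {
        // The nine output lines quoted in the thread (key1 = 151978).
        List<Out> sample = List.of(
                new Out(151978, 2), new Out(151978, 3), new Out(151978, 4),
                new Out(151978, 5), new Out(151978, 1), new Out(151978, 3),
                new Out(151978, 2), new Out(151978, 4), new Out(151978, 5));
        System.out.println(duplicatePercentage(sample));
    }
}
```

Applied only to the nine quoted lines, 4 of the 9 records reuse a number (3, 2, 4 and 5 under the new task UUID). The excerpt starts at 2, so the real figure depends on the full output.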
>>
>> Right now I am changing the Flink job so that, instead of
>> reinterpretAsKeyedStream, it has an explicit keyBy again before function (c).
>> I will check whether this fixes the issue in my job and test setup.
>>
>> BR
>> Martin
>>
>>
>>
>> David Morávek wrote on 07.01.2022 09:37 (GMT +01:00):
>>
>> Hi Martin,
>>
>> _reinterpretAsKeyedStream_ should definitely work with reactive mode;
>> if it doesn't, it's a bug that needs to be fixed.
>>
>>> For test use cases (3) and (4) the state of the keyed process function
>>> (c) seems to be available for only around 50% of the events processed
>>> after scale-in/failure.
>>>
>> Can you please provide details on how you verified this?
>>
>> Best,
>> D.
>>
>> On Fri, Jan 7, 2022 at 8:10 AM Martin <mar...@sonicdev.de> wrote:
>>
>>> Hi,
>>>
>>> Correction: "I run that job via Native Kubernetes deployment and use elastic
>>> scaling in reactive mode."
>>> -> Of course I run it via a standalone Kubernetes deployment, to make
>>> elastic scaling possible.
>>>
>>> BR
>>> Martin
>>>
>>>
>>>
>>> mar...@sonicdev.de wrote on 06.01.2022 21:38 (GMT +01:00):
>>>
>>> Hi,
>>>
>>> I have a job where I apply a keyBy'd process function (a), then downstream
>>> a normal process function (b), and then I use
>>> reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
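Some context on what reinterpretAsKeyedStream relies on: Flink's documentation requires that the stream already be partitioned exactly as keyBy would partition it. Keyed state is organized in key groups. A key's key group depends only on the key and the max parallelism, while the subtask that owns a key group depends on the current parallelism, so rescaling moves key groups between subtasks. The standalone sketch below re-implements that mapping as I understand Flink's KeyGroupRangeAssignment to work. The class and method names are hypothetical, the hash is modeled on Flink's MathUtils.murmurHash (treat exact values as illustrative), and maxParallelism = 128 is an assumed value.

```java
// Hypothetical standalone re-implementation (not Flink's own class) of how a
// key maps to a key group, and a key group to a subtask index.
public class KeyGroupSketch {

    // Murmur3-style mixing of a single int, modeled on Flink's MathUtils.murmurHash.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // Final avalanche (fmix32).
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // Map to a non-negative value.
        if (code >= 0) {
            return code;
        }
        return code != Integer.MIN_VALUE ? -code : 0;
    }

    // Fixed for a given key as long as maxParallelism does not change.
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Depends on the current parallelism, so it can change on rescale.
    static int subtaskFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;   // assumed value
        Long key = 151978L;
        int kg = keyGroupFor(key, maxParallelism);
        System.out.println("key group " + kg
                + " -> subtask " + subtaskFor(kg, maxParallelism, 4) + " at p=4, subtask "
                + subtaskFor(kg, maxParallelism, 3) + " at p=3");
    }
}
```

Each subtask owns a contiguous range of key groups. After a scale-in from 4 to 3, the owner of a key group may change, and the records arriving at (c) have to follow the same reassignment for keyed state lookups to find the restored state. A keyBy directly before (c) guarantees that. With reinterpretAsKeyedStream it relies on (a), (b) and (c) staying forward-connected at equal parallelism, with (b) not changing the key.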
>>>
>>> The last keyed process function uses keyed state for an increasing
>>> sequence number.
>>>
>>> I run that job via Native Kubernetes deployment and use elastic scaling
>>> in reactive mode.
>>>
>>> I use Flink 1.14.2.
>>>
>>> I test that job on four use cases: (1) static parallelism, (2) scale-out,
>>> (3) scale-in, (4) task manager failure*.
>>>
>>> * triggered via kill -SIGTERM on the Flink JVM inside the container
>>>
>>> For test use cases (1) and (2) everything is fine.
>>>
>>> For test use cases (3) and (4) the state of the keyed process function
>>> (c) seems to be available for only around 50% of the events processed
>>> after scale-in/failure.
>>>
>>> Is the reinterpretAsKeyedStream feature generally usable with elastic
>>> scaling in reactive mode in Flink 1.14.2?
>>>
>>> If yes, do you already have any ideas what the root cause could be?
>>>
>>>
>>>
>>> BR
>>> Martin