Hello David,

Right now I can't share the complete code, but in the next few days I will try to reduce it to a minimal example that still triggers the issue.

First I will check whether an explicit keyBy instead of reinterpretAsKeyedStream fixes the issue.
If yes, that would suggest to me that it is a bug in reinterpretAsKeyedStream combined with elastic scaling.
If no, the issue is probably caused by my code rather than by Flink.


David Morávek wrote on 07.01.2022 10:22 (GMT +01:00):

Would you be able to share the code of your test, by any chance?

On Fri, Jan 7, 2022 at 10:06 AM Martin <mar...@sonicdev.de> wrote:

Hello David,

I have a test setup where the input is always the same.
After processing, I check the whole output to verify that each sequence number is used only once.

Another output field is a random UUID generated on startup of a task (in the open method of the keyed process function (c)).
In the output I saw that the sequence number started at 1 again at the same time the scaling happened, and the UUID changed at that point as well.

Some output:

The percentage is the number of output records that use an already assigned sequence number (per key1) relative to all output records.
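
A minimal sketch of how such a check could look, assuming each output record can be reduced to its key1 and its sequence number. The class and field names (DuplicateSequenceCheck, OutputRecord) are hypothetical and chosen only for illustration; this is not the code of the actual test setup.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; names are illustrative and not taken from the real job.
class DuplicateSequenceCheck {

    // One output record, reduced to the two fields the check needs.
    static final class OutputRecord {
        final String key1;
        final long sequenceNumber;

        OutputRecord(String key1, long sequenceNumber) {
            this.key1 = key1;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Returns the percentage of records whose sequence number was
    // already seen earlier for the same key1, relative to all records.
    static double duplicatePercentage(List<OutputRecord> records) {
        if (records.isEmpty()) {
            return 0.0;
        }
        Map<String, Set<Long>> seenPerKey = new HashMap<>();
        long duplicates = 0;
        for (OutputRecord r : records) {
            Set<Long> seen = seenPerKey.computeIfAbsent(r.key1, k -> new HashSet<>());
            // Set.add returns false if the sequence number was already present.
            if (!seen.add(r.sequenceNumber)) {
                duplicates++;
            }
        }
        return 100.0 * duplicates / records.size();
    }
}
```

With correctly restored keyed state the result should be 0; any restart of the counter at 1 for a key shows up as a non-zero percentage.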

Right now I am changing the Flink job so that, instead of reinterpretAsKeyedStream, it has an explicit keyBy before function (c) again.
I will check whether this fixes the issue in my job and test setup.



David Morávek wrote on 07.01.2022 09:37 (GMT +01:00):

Hi Martin,
_reinterpretAsKeyedStream_ should definitely work with the reactive mode; if it doesn't, it's a bug that needs to be fixed.

For test use cases (3) and (4) the state of the keyed process function (c) seems to be available for only around 50% of the events processed after scale-in/fail.

Can you please provide details on how you verified this?

On Fri, Jan 7, 2022 at 8:10 AM Martin <mar...@sonicdev.de> wrote:


typo: "I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode."
-> Of course I run it via a standalone Kubernetes deployment, to make elastic scaling possible.



mar...@sonicdev.de wrote on 06.01.2022 21:38 (GMT +01:00):


I have a job where I run a keyed process function (a) after a keyBy, then downstream a normal process function (b), and then I use reinterpretAsKeyedStream to have yet another keyed process function (c).

The last keyed process function uses keyed state for an increasing sequence number.

I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode.

I use Flink 1.14.2.

I test that job on four use cases: (1) static parallelism, (2) scale-out, (3) scale-in, (4) task manager failure*.

* via kill -SIGTERM inside the container for the Flink JVM

For test use cases (1) and (2) everything is fine.

For test use cases (3) and (4) the state of the keyed process function (c) seems to be available for only around 50% of the events processed after scale-in/fail.

Is the reinterpretAsKeyedStream feature in general usable with Elastic Scaling in Reactive Mode in Flink 1.14.2?

If yes, do you already have any idea what the root cause could be?



