Hello David,

I have a test setup where the input is always the same.
After processing, I check all of the output to verify that each sequence number is used only once.

Another output field is a random UUID generated when a task starts (in the open() method of keyed process function (c)).
In the output I saw that the sequence number restarted at 1 exactly when the scaling happened, and the change of the UUID matched that as well.

Some output:

"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4
"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":1
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4
"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5
 
The percentage is then the number of output records that reuse an already assigned sequence number (per key1), divided by the total number of output records.
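The check described above can be sketched roughly like this. It is a minimal sketch, not my actual checking code: it assumes every output line contains the `key1` and `recordSequenceNumber` fields as in the sample, counts the first use of a sequence number per key1 as valid and every later use as a duplicate, and `duplicate_percentage` is just a hypothetical helper name.

```python
import re
from collections import defaultdict

# Assumed record format, taken from the sample output: each line contains
# "key1":<int> and "recordSequenceNumber":<int>. Regexes are used instead of
# json.loads because the sample lines are JSON fragments without braces.
KEY_RE = re.compile(r'"key1":(\d+)')
SEQ_RE = re.compile(r'"recordSequenceNumber":(\d+)')


def duplicate_percentage(lines):
    """Percentage of records whose sequence number was already used for the same key1."""
    seen = defaultdict(set)  # key1 -> sequence numbers seen so far
    total = duplicates = 0
    for line in lines:
        key_match, seq_match = KEY_RE.search(line), SEQ_RE.search(line)
        if key_match is None or seq_match is None:
            continue  # not an output record, skip it
        key, seq = int(key_match.group(1)), int(seq_match.group(1))
        total += 1
        if seq in seen[key]:
            duplicates += 1  # sequence number reused for this key1
        else:
            seen[key].add(seq)  # first use of this sequence number
    return 100.0 * duplicates / total if total else 0.0


sample = [
    '"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2',
    '"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3',
    '"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4',
    '"key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5',
    '"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":1',
    '"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3',
    '"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2',
    '"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4',
    '"key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5',
]
print(f"{duplicate_percentage(sample):.1f}%")  # prints 44.4%
```

For the nine sample lines, sequence numbers 2 to 5 are each used a second time after the restart, so 4 of 9 records count as duplicates.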
 

Right now I am changing the Flink job so that, instead of using reinterpretAsKeyedStream, it has an explicit keyBy before function (c) again.
I will check whether this fixes the issue in my job and test setup.

BR
Martin

 

David Morávek wrote on 07.01.2022 09:37 (GMT +01:00):

Hi Martin,
 
_reinterpretAsKeyedStream_ should definitely work with reactive mode; if it doesn't, it's a bug that needs to be fixed.

> For test use cases (3) and (4), the state of keyed process function (c) seems to be available for only around 50% of the events processed after scale-in/failure.

Can you please provide details on how you verified this?
 
Best,
D.

On Fri, Jan 7, 2022 at 8:10 AM Martin <mar...@sonicdev.de> wrote:

Hi,

typo: "I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode."
-> Of course, I run it via a standalone Kubernetes deployment, to make elastic scaling possible.

BR
Martin

 

mar...@sonicdev.de wrote on 06.01.2022 21:38 (GMT +01:00):

Hi,

I have a job where I apply a keyBy'd process function (a), then downstream a normal process function (b), and then I use reinterpretAsKeyedStream to get yet another keyBy'd process function (c).

The last keyed process function uses keyed state for an increasing sequence number.

I run that job via Native Kubernetes deployment and use elastic scaling in reactive mode.

I use Flink 1.14.2.

I test that job with four use cases: (1) static parallelism, (2) scale-out, (3) scale-in, (4) task manager failure*.

* via kill -SIGTERM on the Flink JVM inside the container

For test use cases (1) and (2), everything is fine.

For test use cases (3) and (4), the state of keyed process function (c) seems to be available for only around 50% of the events processed after scale-in/failure.

Is the reinterpretAsKeyedStream feature generally usable with elastic scaling in reactive mode in Flink 1.14.2?

If yes, do you already have any ideas what the root cause could be?

 

BR
Martin

 
