OK, my first intuition would be some kind of misuse of the state API. Another guess: had any checkpoint completed before the re-scaling event was triggered?
I'll also try to verify the scenario you've described, but these would be the things that I'd check first.

D.

On Fri, Jan 7, 2022 at 10:46 AM Martin <mar...@sonicdev.de> wrote:

> Hello David,
>
> right now I can't share the complete code. But in a few days I will try to
> simplify it and reduce the code so that it still triggers the issue.
>
> First I will check whether an explicit keyBy instead of
> reinterpretAsKeyedStream fixes the issue.
> If yes, that would suggest to me that it's a bug with
> reinterpretAsKeyedStream and elastic scaling.
> If no, it's probably another issue caused by my code rather than by Flink.
>
> BR
> Martin
>
> David Morávek wrote on 07.01.2022 10:22 (GMT +01:00):
>
> Would you be able to share the code of your test by any chance?
>
> Best,
> D.
>
> On Fri, Jan 7, 2022 at 10:06 AM Martin <mar...@sonicdev.de> wrote:
>
>> Hello David,
>>
>> I have a test setup where the input is always the same.
>> After processing, I check across all the output whether each sequence
>> number is used just once.
>>
>> Another output field is a random UUID generated on startup of a task (in
>> the open method of the keyed process function (c)).
>> In the output I saw that the sequence number started at 1 again at the
>> same time the scaling happened, and the change of the UUID also matched
>> that.
>>
>> Some output:
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":2
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":3
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":4
>>
>> "key1":151978,"taskid":"a406fb52-5f0c-11ec-bea3-0255ac100029","recordSequenceNumber":5
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":1
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":3
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":2
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":4
>>
>> "key1":151978,"taskid":"68731ce4-5f11-11ec-a5cd-0255ac100047","recordSequenceNumber":5
>>
>> The percentage is then the number of output records that reuse an already
>> used sequence number (for each key1) compared to all output records.
>>
>>
>> Right now I am changing the Flink job so that, instead of
>> reinterpretAsKeyedStream, it has an explicit keyBy before function (c) again.
>> I will check whether this fixes the issue in my job and test setup.
>>
>> BR
>> Martin
>>
>>
>>
>> David Morávek wrote on 07.01.2022 09:37 (GMT +01:00):
>>
>> Hi Martin,
>>
>> _reinterpretAsKeyedStream_ should definitely work with the reactive mode;
>> if it doesn't, it's a bug that needs to be fixed.
>>
>>> For test use cases (3) and (4) the state of the keyed process function
>>> (c) seems only available for around 50% of the events processed after
>>> scale-in/fail.
>>>
>> Can you please provide details on how you verified this?
>>
>> Best,
>> D.
>>
>> On Fri, Jan 7, 2022 at 8:10 AM Martin <mar...@sonicdev.de> wrote:
>>
>>> Hi,
>>>
>>> typo: "I run that job via Native Kubernetes deployment and use elastic
>>> scaling in reactive mode."
>>> -> I run it of course via standalone Kubernetes deployment, to make
>>> elastic scaling possible.
>>>
>>> BR
>>> Martin
>>>
>>>
>>>
>>> mar...@sonicdev.de wrote on 06.01.2022 21:38 (GMT +01:00):
>>>
>>> Hi,
>>>
>>> I have a job where I do a keyBy'd process function (a), then
>>> downstream a normal process function (b), and then I use
>>> reinterpretAsKeyedStream to have yet another keyBy'd process function (c).
>>>
>>> The last keyed process function uses keyed state for an increasing
>>> sequence number.
>>>
>>> I run that job via Native Kubernetes deployment and use elastic scaling
>>> in reactive mode.
>>>
>>> I use Flink 1.14.2.
>>>
>>> I test that job on four use cases: (1) static parallelism, (2) scale
>>> out, (3) scale-in, (4) task manager fail*.
>>>
>>> * via kill -SIGTERM inside the container for the Flink JVM
>>>
>>> For test use cases (1) and (2) everything is fine.
>>>
>>> For test use cases (3) and (4) the state of the keyed process function
>>> (c) seems only available for around 50% of the events processed after
>>> scale-in/fail.
>>>
>>> Is the reinterpretAsKeyedStream feature in general usable with Elastic
>>> Scaling in Reactive Mode in Flink 1.14.2?
>>>
>>> If yes, are there already any ideas what the root cause could be?
>>>
>>>
>>>
>>> BR
>>> Martin
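
For reference, the duplicate check described in the quoted 10:06 message (the share of output records that reuse a sequence number already seen for the same key1) could be sketched roughly like this in plain Java. This is only an illustration under assumptions: the class DuplicateSequenceCheck, the OutputRecord type and the method duplicateRatio are hypothetical names, not part of Flink and not Martin's actual test code. The sample values are the sequence numbers from the output excerpt in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of Flink or of the job in the thread.
class DuplicateSequenceCheck {

    // One output record, reduced to the two fields the check needs.
    static final class OutputRecord {
        final long key1;
        final long recordSequenceNumber;

        OutputRecord(long key1, long recordSequenceNumber) {
            this.key1 = key1;
            this.recordSequenceNumber = recordSequenceNumber;
        }
    }

    // Returns the fraction of records whose sequence number was already
    // seen earlier for the same key1 (0.0 for an empty list).
    static double duplicateRatio(List<OutputRecord> records) {
        if (records.isEmpty()) {
            return 0.0;
        }
        Map<Long, Set<Long>> seenPerKey = new HashMap<>();
        long duplicates = 0;
        for (OutputRecord r : records) {
            Set<Long> seen = seenPerKey.computeIfAbsent(r.key1, k -> new HashSet<>());
            // Set.add returns false when the value is already present.
            if (!seen.add(r.recordSequenceNumber)) {
                duplicates++;
            }
        }
        return (double) duplicates / records.size();
    }

    public static void main(String[] args) {
        // Sequence numbers for key1 = 151978 in the order shown in the thread:
        // 2, 3, 4, 5 from the first task ID, then 1, 3, 2, 4, 5 from the second.
        long[] sequenceNumbers = {2, 3, 4, 5, 1, 3, 2, 4, 5};
        List<OutputRecord> records = new ArrayList<>();
        for (long s : sequenceNumbers) {
            records.add(new OutputRecord(151978L, s));
        }
        System.out.println("duplicate ratio: " + duplicateRatio(records));
    }
}
```

On the nine sample records from the thread, four reuse an already seen sequence number (3, 2, 4 and 5 after the task ID changes), so the ratio for this excerpt is 4/9.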