Hi again, I realized it's easy to create a reproducible example, see this specific commit:
https://github.com/asardaes/test-flink-state-processor/commit/95e65f88fd1e38bcba63ebca68e3128789c0d2f2

When I run that application, I see the following output:

Savepoint created
KEY=GenericServiceCompositeKey(serviceId=X, countryCode=BAR)
Why is this null?

So a key is missing, and the key that was written has a null state.

Regards,
Alexis.

On Wed, 31 July 2024 at 15:45, Alexis Sarda-Espinosa <sarda.espin...@gmail.com> wrote:

> Hi Matthias,
>
> This indeed compiles, and I am able to generate a savepoint; it's
> just that all the window states in that savepoint appear to be null. The
> second argument of withOperator(...) is specified via
> OperatorTransformation...aggregate(), so the final transformation is built
> by WindowedStateTransformation#aggregate().
>
> I don't have any special logic with timers or even multiple events per
> key. In fact, my "stateToMigrate" already contains a single state instance
> for each key of interest, so my AggregateFunctionForMigration simply
> returns "value" in its add() method, with no other logic there.
>
> Regards,
> Alexis.
>
> On Wed, 31 July 2024 at 09:11, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
>> Hi Alexis,
>>
>> Just a couple of points to double-check:
>>
>> - Does your code compile? (The second argument of withOperator(..)
>>   should derive from StateBootstrapTransformation<T> instead of
>>   SingleOutputStreamOperator<T>.)
>> - In the documentation of the savepoint API you'll find examples for
>>   each type of state.
>> - Your preparation needs to generate events that, within your
>>   StateBootstrapTransformation implementation, get set into state
>>   primitives much the same as you would do with a normal streaming
>>   operator.
>> - Please note that a savepoint API job always runs in batch mode,
>>   hence:
>>   - Keyed events are processed in key order first and then in time
>>     order.
>>   - Triggers will only be fired after all events of a respective key
>>     are processed.
>>   - Semantics are therefore slightly different from those of streaming
>>     timers.
>>
>> Hope that helps 😊
>>
>> Thias
>>
>> From: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
>> Sent: Monday, July 29, 2024 9:18 PM
>> To: user <user@flink.apache.org>
>> Subject: Using state processor for a custom windowed aggregate function
>>
>> Hello,
>>
>> I am trying to create state for an aggregate function that is used with a
>> GlobalWindow. This basically looks like:
>>
>> savepointWriter.withOperator(
>>     OperatorIdentifier.forUid(UID),
>>     OperatorTransformation.bootstrapWith(stateToMigrate)
>>         .keyBy(...)
>>         .window(GlobalWindows.create())
>>         .aggregate(new AggregateFunctionForMigration())
>> )
>>
>> The job runs correctly and writes a savepoint, but if I then read the
>> savepoint I just created and load the state for that UID, the "elements"
>> iterable in the WindowReaderFunction's readWindow() method has a non-zero
>> size, but every element is null.
>>
>> I've tried specifying a custom trigger between window() and aggregate(),
>> always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
>>
>> Am I missing something?
>>
>> Regards,
>> Alexis.
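
For reference, the pass-through aggregate described in the thread (add() simply returns the incoming value) could look roughly like the sketch below. It is only a guess at the shape, not the actual class from the linked repository. Assumptions: the AggregateFunction interface here is a local stand-in that mirrors the four methods of Flink's org.apache.flink.api.common.functions.AggregateFunction, so the snippet compiles without Flink on the classpath; the generic type parameter, the null initial accumulator, the merge() behaviour and the PassThroughAggregateDemo class are hypothetical.

```java
// Local stand-in for Flink's AggregateFunction<IN, ACC, OUT> (assumption:
// same four methods, declared here only so the sketch is self-contained).
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical pass-through aggregate: the accumulator is the state itself.
class AggregateFunctionForMigration<T> implements AggregateFunction<T, T, T> {
    @Override
    public T createAccumulator() {
        // Assumption: no meaningful "empty" state exists, so start from null.
        return null;
    }

    @Override
    public T add(T value, T accumulator) {
        // As described in the thread: simply return the incoming value.
        return value;
    }

    @Override
    public T getResult(T accumulator) {
        return accumulator;
    }

    @Override
    public T merge(T a, T b) {
        // Assumption: prefer the second accumulator when it is present.
        return b != null ? b : a;
    }
}

// Hypothetical driver that exercises the function outside of Flink.
class PassThroughAggregateDemo {
    public static void main(String[] args) {
        AggregateFunctionForMigration<String> fn = new AggregateFunctionForMigration<>();
        String acc = fn.createAccumulator();
        acc = fn.add("state-for-key", acc);
        System.out.println(fn.getResult(acc)); // prints "state-for-key"
    }
}
```

Run in isolation like this, the function hands back the single bootstrapped element unchanged, which is the behaviour the thread describes for the aggregate itself; it says nothing about how the savepoint writer or reader treats the window state.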