Hi Matthias, This indeed compiles, I am able to actually generate a savepoint, it's just that all the window states in that savepoint appear to be null. The second argument of withOperator(...) is specified via OperatorTransformation...aggregate(), so the final transformation is built by WindowedStateTransformation#aggregate().
I don't have any special logic with timers or even multiple events per key, in fact, my "stateToMigrate" already contains a single state instance for each key of interest, so my AggregateFunctionForMigration simply returns "value" in its add() method, no other logic there. Regards, Alexis. Am Mi., 31. Juli 2024 um 09:11 Uhr schrieb Schwalbe Matthias < matthias.schwa...@viseca.ch>: > Hi Alexis, > > > > Just a couple of points to double-check: > > - Does your code compile? (the second argument of withOperator(..) > should derive StateBootstrapTransformation<T> instead of > SingleOutputStreamOperator<T>) > - From the documentation of savepoint API you’ll find examples for > each type of state > - Your preparation needs to generate events that within your > StateBootstrapTransformation impementation get set into state primitives > much the same as you would do with a normal streaming operator > - Please note that a savepoint api job always runs in batch-mode, hence > - Keyed events are processed in key order first and the in time > order > - Triggers will only be fired after processing all events of a > respective key are processed > - Semantics are therefore slightly different as for streaming timers > > > > Hope that helps 😊 > > > > Thias > > > > > > > > *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com> > *Sent:* Monday, July 29, 2024 9:18 PM > *To:* user <user@flink.apache.org> > *Subject:* Using state processor for a custom windowed aggregate function > > > > Hello, > > > > I am trying to create state for an aggregate function that is used with a > GlobalWindow. This basically looks like: > > > > savepointWriter.withOperator( > OperatorIdentifier.forUid(UID), > OperatorTransformation.bootstrapWith(stateToMigrate) > .keyBy(...) > .window(GlobalWindows.create()) > .aggregate(new AggregateFunctionForMigration()) > ) > > The job runs correctly and writes a savepoint, but if I then read the > savepoint I just created and load the state for that UID, the "elements" > iterable in the WindowReaderFunction's readWindow() method has a non-zero > size, but every element is null. > > > > I've tried specifying a custom trigger between window() and aggregate(), > always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference. > > > > Am I missing something? > > > > Regards, > > Alexis. > Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und > beinhaltet unter Umständen vertrauliche Mitteilungen. Da die > Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, > übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und > Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir > Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie > eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung > dieser Informationen ist streng verboten. > > This message is intended only for the named recipient and may contain > confidential or privileged information. As the confidentiality of email > communication cannot be guaranteed, we do not accept any responsibility for > the confidentiality and the intactness of this message. If you have > received it in error, please advise the sender by return e-mail and delete > this message and any attachments. Any unauthorised use or dissemination of > this information is strictly prohibited. >