Hi Matthias,

Thank you for looking into this. That change makes the example work, but my real job still has issues. There is one key difference that might be causing the problem, but it's not easy to replicate in the example I made.

Essentially, I'm trying to change the partition key of an operator. In my real job, the bootstrap stream comes from a SavepointReader that reads the state of an existing UID, so that I can key it differently and write it under a different UID. So far so good. However, I'm also trying to modify my state POJO at the same time (the equivalent in my example would be adding a field to the GenericService POJO), and I'm guessing this is causing some inconsistency. The reason I think so is that, assuming I debugged the right classes, the keyGroupIndex used when I write the savepoint is not the same as the one used when I read it back afterwards to validate. I can actually see the state I bootstrapped in one of the state backend's tables, but since the indices don't match, I get an iterator with a null entry.

Regards,
Alexis.

On Fri, 2 Aug 2024 at 13:55, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Alexis,
>
> I've worked it out:
>
> The input of your com.test.Application.StateReader#readWindow(..., Iterable<GenericService> elements, ...) is of the projection type com.test.Application.AggregateFunctionForMigration: AggregateFunction<..., OUT = GenericService>.
> That is, you need to implement com.test.Application.AggregateFunctionForMigration#getResult, e.g. as
>
>     @Override
>     public GenericService getResult(GenericService accumulator) {
>         return accumulator;
>     }
>
> If you take a closer look at your call to org.apache.flink.state.api.WindowSavepointReader#aggregate(...), you'll see that this is indeed the case:
>
>     /**
>      * Reads window state generated using an {@link AggregateFunction}.
>      *
>      * @param uid The uid of the operator.
>      * @param aggregateFunction The aggregate function used to create the window.
>      * @param readerFunction The window reader function.
>      * @param keyType The key type of the window.
>      * @param accType The type information of the accumulator function.
>      * @param outputType The output type of the reader function.
>      * @param <K> The type of the key.
>      * @param <T> The type of the values that are aggregated.
>      * @param <ACC> The type of the accumulator (intermediate aggregate state).
>      * @param <R> The type of the aggregated result.
>      * @param <OUT> The output type of the reader function.
>      * @return A {@code DataStream} of objects read from keyed state.
>      * @throws IOException If savepoint does not contain the specified uid.
>      */
>     public <K, T, ACC, R, OUT> DataStream<OUT> aggregate(
>             String uid,
>             AggregateFunction<T, ACC, R> aggregateFunction,
>             WindowReaderFunction<R, OUT, K, W> readerFunction,
>             TypeInformation<K> keyType,
>             TypeInformation<ACC> accType,
>             TypeInformation<OUT> outputType)
>             throws IOException {
>
> Cheers
>
> Thias
>
> PS: Will you come to the Flink Forward conference in October in Berlin (to socialize)?
>
> From: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> Sent: Wednesday, July 31, 2024 3:46 PM
> To: Schwalbe Matthias <matthias.schwa...@viseca.ch>
> Cc: user <user@flink.apache.org>
> Subject: Re: Using state processor for a custom windowed aggregate function
>
> Hi Matthias,
>
> This indeed compiles, and I am able to generate a savepoint; it's just that all the window states in that savepoint appear to be null. The second argument of withOperator(...) is specified via OperatorTransformation...aggregate(), so the final transformation is built by WindowedStateTransformation#aggregate().
>
> I don't have any special logic with timers or even multiple events per key. In fact, my "stateToMigrate" already contains a single state instance for each key of interest, so my AggregateFunctionForMigration simply returns "value" in its add() method; there is no other logic there.
>
> Regards,
> Alexis.
>
> On Wed, 31 Jul 2024 at 09:11, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
> > Hi Alexis,
> >
> > Just a couple of points to double-check:
> >
> > - Does your code compile? (The second argument of withOperator(..) should be a StateBootstrapTransformation<T>, not a SingleOutputStreamOperator<T>.)
> > - In the State Processor API documentation you'll find examples for each type of state.
> > - Your preparation needs to generate events that, within your StateBootstrapTransformation implementation, get set into state primitives much the same as you would do in a normal streaming operator.
> > - Please note that a State Processor API job always runs in batch mode, hence:
> >   - Keyed events are processed in key order first and then in time order.
> >   - Triggers will only be fired after all events of the respective key have been processed.
> >   - Semantics are therefore slightly different than for streaming timers.
> >
> > Hope that helps 😊
> >
> > Thias
> >
> > From: Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> > Sent: Monday, July 29, 2024 9:18 PM
> > To: user <user@flink.apache.org>
> > Subject: Using state processor for a custom windowed aggregate function
> >
> > Hello,
> >
> > I am trying to create state for an aggregate function that is used with a GlobalWindow. This basically looks like:
> >
> >     savepointWriter.withOperator(
> >             OperatorIdentifier.forUid(UID),
> >             OperatorTransformation.bootstrapWith(stateToMigrate)
> >                     .keyBy(...)
> >                     .window(GlobalWindows.create())
> >                     .aggregate(new AggregateFunctionForMigration()))
> >
> > The job runs correctly and writes a savepoint, but if I then read the savepoint I just created and load the state for that UID, the "elements" iterable in the WindowReaderFunction's readWindow() method has a non-zero size, but every element is null.
> >
> > I've tried specifying a custom trigger between window() and aggregate(), always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
> >
> > Am I missing something?
> >
> > Regards,
> > Alexis.
> >
> > This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
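Regarding the keyGroupIndex mismatch Alexis describes: as far as I understand Flink's internals, the key group of a record is a pure function of the key's hashCode() and the operator's max parallelism (KeyGroupRangeAssignment applies MathUtils.murmurHash to the hash and takes the result modulo maxParallelism). The following is a minimal plain-Java sketch that re-implements that mapping locally so it runs without Flink; it is an assumption to verify against the Flink sources of your version, and the class name KeyGroupSketch and the example keys are hypothetical. Under this model, the fields of the state value (e.g. a field added to GenericService) never enter the computation, so a write-side/read-side mismatch would point to a different key value, a different key hashCode, or a different max parallelism.

```java
// Plain-Java sketch (no Flink dependency) of how Flink, to my understanding,
// maps a key to a key group and a key group to a subtask. The hash functions
// are local re-implementations; verify them against your Flink version.
public class KeyGroupSketch {

    // Local copy of the murmur-style finalizer Flink applies to key.hashCode().
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        // Map to a non-negative value.
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // Key group = murmurHash(key.hashCode()) % maxParallelism; only the key matters.
    static int keyGroupFor(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // Index of the parallel subtask that owns a given key group.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        String key = "service-42"; // hypothetical key
        System.out.println("write side: " + keyGroupFor(key, 128));
        System.out.println("read side:  " + keyGroupFor(key, 128));
        // A different key object for the same record (e.g. after re-keying)
        // will generally land in a different key group.
        Integer otherKey = 42; // hypothetical alternative key
        System.out.println("other key:  " + keyGroupFor(otherKey, 128));
        // A different max parallelism also changes the mapping.
        System.out.println("maxParallelism 256: " + keyGroupFor(key, 256));
    }
}
```

Running main() prints the same key group twice for the same key and max parallelism; the last two lines will usually differ from it, which is the kind of divergence that would make a reader look in the wrong key group.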
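To illustrate Thias's explanation of the null elements without a Flink dependency, here is a toy model: the elements handed to readWindow(...) are whatever getResult(accumulator) returns, so a getResult that returns null yields a non-empty iterable full of nulls even though the accumulators were stored. MigrationAggregate is a hypothetical local interface that mirrors the four methods of Flink's AggregateFunction (createAccumulator, add, getResult, merge), GenericService is a stand-in POJO, and elementsSeenByReader is a simplified, assumed model of the write-then-read path rather than Flink code.

```java
import java.util.ArrayList;
import java.util.List;

public class GetResultSketch {

    // Hypothetical local interface mirroring the four methods of Flink's
    // AggregateFunction<IN, ACC, OUT>; it is not the real Flink type.
    interface MigrationAggregate<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Stand-in for the GenericService state POJO from the thread.
    static class GenericService {
        public String id;
        public GenericService() {}
        public GenericService(String id) { this.id = id; }
    }

    // Like the migration function in the thread: add() keeps the single incoming
    // state instance, and getResult() returns the accumulator (Thias's fix).
    static class AggregateFunctionForMigration
            implements MigrationAggregate<GenericService, GenericService, GenericService> {
        @Override
        public GenericService createAccumulator() { return null; }
        @Override
        public GenericService add(GenericService value, GenericService accumulator) { return value; }
        @Override
        public GenericService getResult(GenericService accumulator) { return accumulator; }
        @Override
        public GenericService merge(GenericService a, GenericService b) { return b != null ? b : a; }
    }

    // An unimplemented getResult() that returns null: the accumulator is still
    // produced, but the reader only ever sees the null result.
    static class StubbedGetResult extends AggregateFunctionForMigration {
        @Override
        public GenericService getResult(GenericService accumulator) { return null; }
    }

    // Simplified, assumed model: each bootstrapped value becomes an accumulator
    // via add(), and the reader receives getResult(accumulator).
    static List<GenericService> elementsSeenByReader(
            MigrationAggregate<GenericService, GenericService, GenericService> fn,
            List<GenericService> bootstrapped) {
        List<GenericService> elements = new ArrayList<>();
        for (GenericService value : bootstrapped) {
            GenericService accumulator = fn.add(value, fn.createAccumulator());
            elements.add(fn.getResult(accumulator));
        }
        return elements;
    }

    public static void main(String[] args) {
        List<GenericService> states = List.of(new GenericService("a"), new GenericService("b"));
        System.out.println(elementsSeenByReader(new StubbedGetResult(), states)); // prints [null, null]
        System.out.println(elementsSeenByReader(new AggregateFunctionForMigration(), states).get(0).id); // prints a
    }
}
```

The non-zero size with null entries matches the symptom in the original question: the number of elements reflects the stored accumulators, while their content reflects getResult.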
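Thias's note on batch-mode semantics can be pictured with a toy model in plain Java: input is grouped by key, each key's events are handled in timestamp order, and that key's trigger fires only once all of its events have been processed. This is not Flink's implementation; the order in which Flink visits keys should not be relied upon, so the TreeMap here is only an assumption that makes the toy output deterministic, and Event and process(...) are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model (no Flink) of batch-mode keyed processing: group by key, process
// each key's events in timestamp order, fire that key's trigger at the end.
public class BatchOrderSketch {

    // Hypothetical event type: a key and an event timestamp.
    static final class Event {
        final String key;
        final long timestamp;

        Event(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Returns a log of what the toy "operator" does, in order.
    static List<String> process(List<Event> input) {
        // TreeMap only makes the key order deterministic for this example.
        Map<String, List<Event>> byKey = new TreeMap<>();
        for (Event e : input) {
            byKey.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e);
        }
        List<String> log = new ArrayList<>();
        for (Map.Entry<String, List<Event>> entry : byKey.entrySet()) {
            List<Event> events = entry.getValue();
            events.sort(Comparator.comparingLong((Event ev) -> ev.timestamp));
            for (Event e : events) {
                log.add("process " + e.key + "@" + e.timestamp);
            }
            // Only after the whole key is consumed does its trigger fire.
            log.add("fire " + entry.getKey());
        }
        return log;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("b", 2), new Event("a", 5), new Event("b", 1), new Event("a", 3));
        // prints [process a@3, process a@5, fire a, process b@1, process b@2, fire b]
        System.out.println(process(input));
    }
}
```

In this model a trigger never fires between two events of the same key, which is one way the semantics differ from streaming timers.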