Hi again,

I finally figured out why the keyGroupIndex appeared to differ: the field
I'm adding to my POJO is an enum, and the hash code of an enum is not
stable across JVM instances (Enum.hashCode() is the identity hash inherited
from Object). I think this was the only issue.
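
For anyone hitting the same problem, here is a minimal sketch of one way to keep a key's hash code repeatable across JVM runs, assuming the enum ends up in the hashCode() of the object used as key. ServiceKey and Kind are hypothetical names, not the classes from my job. The idea is to hash the enum constant's name, since String.hashCode() is specified by the JDK and does not depend on the JVM instance.

```java
// Hypothetical enum and key class, for illustration only.
enum Kind { INTERNAL, EXTERNAL }

final class ServiceKey {
    private final String id;
    private final Kind kind;

    ServiceKey(String id, Kind kind) {
        this.id = id;
        this.kind = kind;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ServiceKey)) return false;
        ServiceKey other = (ServiceKey) o;
        return id.equals(other.id) && kind == other.kind;
    }

    @Override
    public int hashCode() {
        // Hash the constant's name instead of calling kind.hashCode():
        // String.hashCode() yields the same value in every JVM run.
        return 31 * id.hashCode() + kind.name().hashCode();
    }
}
```

With a hash like this, the value that feeds the key group assignment should be the same in the job that writes the savepoint and in the job that reads it.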

Regards,
Alexis.

On Mon, Aug 5, 2024 at 09:55, Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Mornin’ Alexis,
>
>
>
> The thing that puzzled me (and I guess you as well) was the switch from
> the accumulator type to the projection type R in the savepoint window
> operator.
>
> I do these things quite often, but I always want to understand
>
>    - what shape of data ends up in windowed state, and how
>    - the shape before and the shape after
>    - Breaking into the trigger functions and the aggregation functions,
>    and then going a couple of call stacks up to the window operator
>    implementation, really helps
>    - In your case, when you bootstrap from a previous window operator state
>       - The state will consist of nested previous key x namespace (window)
>       x accumulator types
>       - When building up a new migrated window state you’ll need to find
>       a way to
>          - first synthesize (map/flatmap) the new state data element (key
>          x window x accumulator) from the old state such that
>          - you can aggregate it into the new state
>          - (cardinalities could change)
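
The re-shaping step described in the list above could be sketched in plain Java, without any Flink dependency, roughly as follows. OldEntry, NewAcc, and the choice of "region" as the new key are hypothetical; the only point is that several old (key x window x accumulator) entries may fold into one new accumulator, so cardinalities can change.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical old state entry: one accumulator per (key, window).
final class OldEntry {
    final String oldKey;
    final String region; // assumed field that becomes the new key
    final long count;    // assumed accumulator content

    OldEntry(String oldKey, String region, long count) {
        this.oldKey = oldKey;
        this.region = region;
        this.count = count;
    }
}

// Hypothetical new accumulator, keyed by region.
final class NewAcc {
    long total;
    final List<String> sources = new ArrayList<>();
}

final class StateReshaper {
    // Step 1: synthesize the new element from each old entry (here: pick the new key).
    // Step 2: aggregate that element into the new accumulator for its key.
    static Map<String, NewAcc> reshape(List<OldEntry> oldState) {
        Map<String, NewAcc> newState = new LinkedHashMap<>();
        for (OldEntry e : oldState) {
            NewAcc acc = newState.computeIfAbsent(e.region, k -> new NewAcc());
            acc.total += e.count;
            acc.sources.add(e.oldKey);
        }
        return newState;
    }
}
```

In a state processor job the same two steps would typically be a map/flatMap on the stream read from the old savepoint, followed by the keyed window aggregate that writes the new state.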
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Friday, August 2, 2024 7:47 PM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Using state processor for a custom windowed aggregate
> function
>
>
>
> Hi Matthias,
>
>
>
> Thank you for looking into this. That change makes the example work, but
> my real job still has issues. There is a key difference that might be
> causing the problem, but that's not so easy to replicate in the example I
> made.
>
>
>
> Essentially, I'm trying to modify the partition key of an operator, so in
> my real job my bootstrap stream comes from a SavepointReader getting data
> from an existing UID so that I can key it differently and write it with a
> different UID. So far so good, however, I'm also trying to modify my state
> POJO at the same time - the equivalent in my example would be to add a
> field to the GenericService POJO. I'm guessing this is causing some
> inconsistency.
>
>
>
> The reason I think this is that, assuming I debugged the right classes, I
> can see that the keyGroupIndex that is used when I write the savepoint is
> not the same as when I read from it afterward to validate; I can actually
> see the state I bootstrapped in one of the state backend's tables, but
> since the indices don't correspond, I get an iterator with a null entry.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> On Fri, Aug 2, 2024 at 13:55, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi Alexis,
>
>
>
>
>
> I've worked it out:
>
>
>
> The input of your com.test.Application.StateReader#readWindow(...,
> Iterable<GenericService> elements, ...) is of the projection (result)
> type of com.test.Application.AggregateFunctionForMigration:
> AggregateFunction<..., OUT = GenericService>.
>
> I.e. you need to implement
> com.test.Application.AggregateFunctionForMigration#getResult, e.g. as
>
>
>
>         @Override
>         public GenericService getResult(GenericService accumulator) {
>             return accumulator;
>         }
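
Put together, a pass-through aggregate for migration might look like the sketch below. To keep it runnable without Flink on the classpath, AggregateFunction here is a local stand-in that mirrors the four methods of org.apache.flink.api.common.functions.AggregateFunction; in a real job you would implement the Flink interface instead. The GenericService field, the empty accumulator, and the merge() behaviour are assumptions for illustration.

```java
// Local stand-in mirroring the method shape of Flink's
// org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>.
interface AggregateFunction<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Hypothetical state POJO; the real GenericService has other fields.
class GenericService {
    public String name;

    public GenericService() {}

    public GenericService(String name) {
        this.name = name;
    }
}

// Each key carries exactly one pre-built state instance, so the function
// only passes it through: IN, ACC and OUT are all GenericService.
class AggregateFunctionForMigration
        implements AggregateFunction<GenericService, GenericService, GenericService> {

    @Override
    public GenericService createAccumulator() {
        return new GenericService(); // assumption: an empty placeholder
    }

    @Override
    public GenericService add(GenericService value, GenericService accumulator) {
        return value; // the incoming element already is the full state
    }

    @Override
    public GenericService getResult(GenericService accumulator) {
        // The window reader hands this result type (R) to readWindow(),
        // so the accumulator has to be projected here rather than dropped.
        return accumulator;
    }

    @Override
    public GenericService merge(GenericService a, GenericService b) {
        return b; // assumption: no real merging is needed for this migration
    }
}
```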
>
>
>
> If you take a closer look at your call to
> org.apache.flink.state.api.WindowSavepointReader#aggregate(...) you'll see
> that this is indeed the case:
>
>
>
>     /**
>      * Reads window state generated using an {@link AggregateFunction}.
>      *
>      * @param uid The uid of the operator.
>      * @param aggregateFunction The aggregate function used to create the window.
>      * @param readerFunction The window reader function.
>      * @param keyType The key type of the window.
>      * @param accType The type information of the accumulator function.
>      * @param outputType The output type of the reader function.
>      * @param <K> The type of the key.
>      * @param <T> The type of the values that are aggregated.
>      * @param <ACC> The type of the accumulator (intermediate aggregate state).
>      * @param <R> The type of the aggregated result.
>      * @param <OUT> The output type of the reader function.
>      * @return A {@code DataStream} of objects read from keyed state.
>      * @throws IOException If savepoint does not contain the specified uid.
>      */
>     public <K, T, ACC, R, OUT> DataStream<OUT> aggregate(
>             String uid,
>             AggregateFunction<T, ACC, R> aggregateFunction,
>             WindowReaderFunction<R, OUT, K, W> readerFunction,
>             TypeInformation<K> keyType,
>             TypeInformation<ACC> accType,
>             TypeInformation<OUT> outputType)
>             throws IOException {
>
>
>
>
>
> Cheers
>
>
>
> Thias
>
>
>
> PS: will you come to the FlinkForward conference in October in Berlin (to
> socialize)?
>
>
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Wednesday, July 31, 2024 3:46 PM
> *To:* Schwalbe Matthias <matthias.schwa...@viseca.ch>
> *Cc:* user <user@flink.apache.org>
> *Subject:* Re: Using state processor for a custom windowed aggregate
> function
>
>
>
> Hi Matthias,
>
>
>
> This indeed compiles, and I am able to generate a savepoint; it's just
> that all the window states in that savepoint appear to be null. The
> second argument of withOperator(...) is specified via
> OperatorTransformation...aggregate(), so the final transformation is built
> by WindowedStateTransformation#aggregate().
>
>
>
> I don't have any special logic with timers or even multiple events per
> key, in fact, my "stateToMigrate" already contains a single state instance
> for each key of interest, so my AggregateFunctionForMigration simply
> returns "value" in its add() method, no other logic there.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> On Wed, Jul 31, 2024 at 09:11, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
> Hi Alexis,
>
>
>
> Just a couple of points to double-check:
>
>    - Does your code compile? (the second argument of withOperator(..)
>    should derive from StateBootstrapTransformation<T> instead of
>    SingleOutputStreamOperator<T>)
>    - In the documentation of the savepoint API you’ll find examples for
>    each type of state
>    - Your preparation needs to generate events that, within your
>    StateBootstrapTransformation implementation, get set into state
>    primitives much the same as you would do with a normal streaming operator
>    - Please note that a savepoint API job always runs in batch mode, hence
>       - Keyed events are processed in key order first and then in time order
>       - Triggers will only be fired after all events of a respective key
>       have been processed
>       - Semantics are therefore slightly different from those of streaming
>       timers
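
As a rough mental model of the ordering described above (not Flink's actual implementation), a batch run can be pictured as sorting events by key and then by timestamp, and evaluating triggers once per key after its last event. Event and the log strings are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical keyed event with an event-time timestamp.
final class Event {
    final String key;
    final long timestamp;

    Event(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

final class BatchOrderModel {
    // Returns a log of what happens in which order under this simplified model.
    static List<String> run(List<Event> input) {
        List<Event> sorted = new ArrayList<>(input);
        // Key order first, then time order within each key.
        sorted.sort(Comparator.comparing((Event e) -> e.key)
                .thenComparingLong(e -> e.timestamp));

        List<String> log = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            Event e = sorted.get(i);
            log.add("process " + e.key + "@" + e.timestamp);
            boolean lastOfKey = i == sorted.size() - 1
                    || !sorted.get(i + 1).key.equals(e.key);
            if (lastOfKey) {
                // Triggers are evaluated only once all events of the key are done.
                log.add("fire triggers for " + e.key);
            }
        }
        return log;
    }
}
```

In a streaming job the same events could interleave across keys and timers could fire in between, which is where the semantics differ.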
>
>
>
> Hope that helps 😊
>
>
>
> Thias
>
>
>
>
>
>
>
> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
> *Sent:* Monday, July 29, 2024 9:18 PM
> *To:* user <user@flink.apache.org>
> *Subject:* Using state processor for a custom windowed aggregate function
>
>
>
> Hello,
>
>
>
> I am trying to create state for an aggregate function that is used with a
> GlobalWindow. This basically looks like:
>
>
>
> savepointWriter.withOperator(
>     OperatorIdentifier.forUid(UID),
>     OperatorTransformation.bootstrapWith(stateToMigrate)
>         .keyBy(...)
>         .window(GlobalWindows.create())
>         .aggregate(new AggregateFunctionForMigration())
> )
>
> The job runs correctly and writes a savepoint, but if I then read the
> savepoint I just created and load the state for that UID, the "elements"
> iterable in the WindowReaderFunction's readWindow() method has a non-zero
> size, but every element is null.
>
>
>
> I've tried specifying a custom trigger between window() and aggregate(),
> always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
>
>
>
> Am I missing something?
>
>
>
> Regards,
>
> Alexis.
>
> Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und
> beinhaltet unter Umständen vertrauliche Mitteilungen. Da die
> Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann,
> übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und
> Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir
> Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie
> eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung
> dieser Informationen ist streng verboten.
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
