Hi again,

I realized it's easy to create a reproducible example; see this specific
commit:

https://github.com/asardaes/test-flink-state-processor/commit/95e65f88fd1e38bcba63ebca68e3128789c0d2f2

When I run that application, I see the following output:

Savepoint created
KEY=GenericServiceCompositeKey(serviceId=X, countryCode=BAR)
Why is this null?

So a key is missing, and the key that was written has a null state.
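
For readers without the repository at hand, here is a minimal sketch of the
kind of pass-through aggregate described in the quoted message below, where
add() simply returns "value". It is not the code from the commit. The
AggregateFunction interface is a local stand-in that only mirrors the four
methods of Flink's interface, and MigratedState, createAccumulator()'s
starting value and merge()'s behaviour are assumptions made for illustration.

```java
public class PassThroughAggregateSketch {

    // Local stand-in with the same four methods as Flink's
    // org.apache.flink.api.common.functions.AggregateFunction<IN, ACC, OUT>,
    // declared here only so the sketch runs without Flink on the classpath.
    interface AggregateFunction<IN, ACC, OUT> {
        ACC createAccumulator();
        ACC add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical state type; the real one is not shown in this thread.
    static final class MigratedState {
        final long count;

        MigratedState(long count) {
            this.count = count;
        }
    }

    static final class AggregateFunctionForMigration
            implements AggregateFunction<MigratedState, MigratedState, MigratedState> {

        @Override
        public MigratedState createAccumulator() {
            // Assumption: start from an empty state object.
            return new MigratedState(0L);
        }

        @Override
        public MigratedState add(MigratedState value, MigratedState accumulator) {
            // Pass-through: the incoming element already is the final state.
            return value;
        }

        @Override
        public MigratedState getResult(MigratedState accumulator) {
            return accumulator;
        }

        @Override
        public MigratedState merge(MigratedState a, MigratedState b) {
            // Assumption: arbitrary choice, not exercised in this sketch.
            return b;
        }
    }

    public static void main(String[] args) {
        AggregateFunctionForMigration fn = new AggregateFunctionForMigration();
        MigratedState migrated = new MigratedState(42L);

        // One element per key: the accumulator becomes that element.
        MigratedState acc = fn.add(migrated, fn.createAccumulator());
        System.out.println(fn.getResult(acc) == migrated); // prints: true
    }
}
```

Outside of Flink this function hands back exactly the instance it was given,
so the sketch only pins down the intended semantics; it does not reproduce
the null state seen in the savepoint.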

Regards,
Alexis.

On Wed, 31 July 2024 at 15:45, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Matthias,
>
> This does compile, and I am able to generate a savepoint; it's just that
> all the window states in that savepoint appear to be null. The second
> argument of withOperator(...) is specified via
> OperatorTransformation...aggregate(), so the final transformation is built
> by WindowedStateTransformation#aggregate().
>
> I don't have any special logic with timers or even multiple events per
> key. In fact, my "stateToMigrate" already contains a single state instance
> for each key of interest, so my AggregateFunctionForMigration simply
> returns "value" in its add() method, with no other logic there.
>
> Regards,
> Alexis.
>
> On Wed, 31 July 2024 at 09:11, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Hi Alexis,
>>
>>
>>
>> Just a couple of points to double-check:
>>
>>    - Does your code compile? (The second argument of withOperator(..)
>>    should derive from StateBootstrapTransformation<T> instead of
>>    SingleOutputStreamOperator<T>.)
>>    - The documentation of the savepoint API has examples for each type
>>    of state.
>>    - Your preparation needs to generate events that, within your
>>    StateBootstrapTransformation implementation, get set into state
>>    primitives, much the same as you would do with a normal streaming
>>    operator.
>>    - Please note that a savepoint API job always runs in batch mode,
>>    hence:
>>       - Keyed events are processed in key order first and then in time
>>       order.
>>       - Triggers are only fired after all events of the respective key
>>       have been processed.
>>       - Semantics are therefore slightly different from those of
>>       streaming timers.
>>
>>
>>
>> Hope that helps 😊
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>>
>>
>> *From:* Alexis Sarda-Espinosa <sarda.espin...@gmail.com>
>> *Sent:* Monday, July 29, 2024 9:18 PM
>> *To:* user <user@flink.apache.org>
>> *Subject:* Using state processor for a custom windowed aggregate function
>>
>>
>>
>> Hello,
>>
>>
>>
>> I am trying to create state for an aggregate function that is used with a
>> GlobalWindow. This basically looks like:
>>
>>
>>
>> savepointWriter.withOperator(
>>     OperatorIdentifier.forUid(UID),
>>     OperatorTransformation.bootstrapWith(stateToMigrate)
>>         .keyBy(...)
>>         .window(GlobalWindows.create())
>>         .aggregate(new AggregateFunctionForMigration())
>> )
>>
>> The job runs correctly and writes a savepoint. However, if I then read
>> the savepoint I just created and load the state for that UID, the
>> "elements" iterable in the WindowReaderFunction's readWindow() method has
>> a non-zero size, but every element is null.
>>
>>
>>
>> I've tried specifying a custom trigger between window() and aggregate(),
>> always returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
>>
>>
>>
>> Am I missing something?
>>
>>
>>
>> Regards,
>>
>> Alexis.
>>
>
