Re: Using state processor for a custom windowed aggregate function

2024-08-05 Thread Alexis Sarda-Espinosa
> From: Alexis Sarda-Espinosa
> Sent: Friday, August 2, 2024 7:47 PM
> To: Schwalbe Matthias
> Cc: user
> Subject: Re: Using state processor for a custom windowed aggregate function
>
> Hi Matthias,

RE: Using state processor for a custom windowed aggregate function

2024-08-05 Thread Schwalbe Matthias
* you can aggregate it into the new state
* (cardinalities could change)
Thias
From: Alexis Sarda-Espinosa
Sent: Friday, August 2, 2024 7:47 PM
To: Schwalbe Matthias
Cc: user
Subject: Re: Using state processor for a custom windowed aggregate function
Hi Matthias,
Thank you for looking

Re: Using state processor for a custom windowed aggregate function

2024-08-02 Thread Alexis Sarda-Espinosa
n,
> TypeInformation keyType,
> TypeInformation accType,
> TypeInformation outputType)
> throws IOException {
>
> Cheers
> Thias
>
> PS: will you come to the FlinkForward conferen

RE: Using state processor for a custom windowed aggregate function

2024-08-02 Thread Schwalbe Matthias
ation outputType)
throws IOException {
Cheers
Thias
PS: will you come to the FlinkForward conference in October in Berlin (to socialize)?
From: Alexis Sarda-Espinosa
Sent: Wednesday, July 31, 2024 3:46 PM
To: Schwalbe Matthias
Cc: user
Subject: Re: Using state processor for a custom wi

Re: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Alexis Sarda-Espinosa
Hi again,
I realized it's easy to create a reproducible example; see this specific commit:
https://github.com/asardaes/test-flink-state-processor/commit/95e65f88fd1e38bcba63ebca68e3128789c0d2f2
When I run that application, I see the following output:
Savepoint created
KEY=GenericServiceComposit

Re: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Alexis Sarda-Espinosa
Hi Matthias,
This does compile, and I am able to generate a savepoint; the problem is that all the window states in that savepoint appear to be null. The second argument of withOperator(...) is specified via OperatorTransformation...aggregate(), so the final transformation is built by WindowedS

RE: Using state processor for a custom windowed aggregate function

2024-07-31 Thread Schwalbe Matthias
Hi Alexis,
Just a couple of points to double-check:
* Does your code compile? (The second argument of withOperator(..) should derive from StateBootstrapTransformation instead of SingleOutputStreamOperator.)
* In the documentation of the savepoint API you’ll find examples for each type of state