Hi Alexis,
I've worked it out:
The elements passed to your com.test.Application.StateReader#readWindow(...,
Iterable<GenericService> elements, ...) have the
output type of com.test.Application.AggregateFunctionForMigration:
AggregateFunction<..., OUT = GenericService>.
I.e. you need to implement
com.test.Application.AggregateFunctionForMigration#getResult e.g. as
    @Override
    public GenericService getResult(GenericService accumulator) {
        return accumulator;
    }
If you take a closer look at your call to
org.apache.flink.state.api.WindowSavepointReader#aggregate(...) you'll see that
this is indeed the case:
    /**
     * Reads window state generated using an {@link AggregateFunction}.
     *
     * @param uid The uid of the operator.
     * @param aggregateFunction The aggregate function used to create the window.
     * @param readerFunction The window reader function.
     * @param keyType The key type of the window.
     * @param accType The type information of the accumulator function.
     * @param outputType The output type of the reader function.
     * @param <K> The type of the key.
     * @param <T> The type of the values that are aggregated.
     * @param <ACC> The type of the accumulator (intermediate aggregate state).
     * @param <R> The type of the aggregated result.
     * @param <OUT> The output type of the reader function.
     * @return A {@code DataStream} of objects read from keyed state.
     * @throws IOException If savepoint does not contain the specified uid.
     */
    public <K, T, ACC, R, OUT> DataStream<OUT> aggregate(
            String uid,
            AggregateFunction<T, ACC, R> aggregateFunction,
            WindowReaderFunction<R, OUT, K, W> readerFunction,
            TypeInformation<K> keyType,
            TypeInformation<ACC> accType,
            TypeInformation<OUT> outputType)
            throws IOException {
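To illustrate the point about getResult, here is a minimal plain-Java sketch without Flink dependencies. SimpleAggregate is a hypothetical stand-in that only mirrors the four method names of Flink's AggregateFunction, writeThenRead is an assumed simplification of the write/read round trip (the reader hands getResult(accumulator) to readWindow as type R), and String stands in for GenericService. It is not the actual Flink implementation.

```java
// Hypothetical stand-in that mirrors the method names of Flink's
// AggregateFunction<IN, ACC, OUT>; it is NOT the Flink interface itself.
interface SimpleAggregate<IN, ACC, OUT> {
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    ACC merge(ACC a, ACC b);
}

// Migration-style aggregate: the accumulator is simply the last value added.
class PassThroughAggregate implements SimpleAggregate<String, String, String> {
    @Override public String createAccumulator() { return null; }
    @Override public String add(String value, String accumulator) { return value; }
    @Override public String getResult(String accumulator) { return accumulator; }
    @Override public String merge(String a, String b) { return b != null ? b : a; }
}

// Same aggregate, but getResult was left as a stub that returns null.
class BrokenAggregate extends PassThroughAggregate {
    @Override public String getResult(String accumulator) { return null; }
}

class WindowReadSketch {
    // Assumed simplification: the bootstrap job stores the accumulator,
    // and the reader exposes getResult(accumulator) as a window element.
    static <IN, ACC, OUT> OUT writeThenRead(SimpleAggregate<IN, ACC, OUT> fn, IN value) {
        ACC stored = fn.add(value, fn.createAccumulator());
        return fn.getResult(stored);
    }

    public static void main(String[] args) {
        System.out.println(writeThenRead(new PassThroughAggregate(), "svc-1"));
        System.out.println(writeThenRead(new BrokenAggregate(), "svc-1"));
    }
}
```

Under these assumptions the stored state is identical in both cases; only the stubbed getResult makes the element seen by the reader null, which would match the symptom of a non-empty iterable containing only nulls.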
Cheers
Thias
PS: Will you be coming to the Flink Forward conference in October in Berlin (to
socialize)?
From: Alexis Sarda-Espinosa <[email protected]>
Sent: Wednesday, July 31, 2024 3:46 PM
To: Schwalbe Matthias <[email protected]>
Cc: user <[email protected]>
Subject: Re: Using state processor for a custom windowed aggregate function
Hi Matthias,
This indeed compiles, and I am able to actually generate a savepoint; it's just
that all the window states in that savepoint appear to be null. The second
argument of withOperator(...) is specified via
OperatorTransformation...aggregate(), so the final transformation is built by
WindowedStateTransformation#aggregate().
I don't have any special logic with timers or even multiple events per key. In
fact, my "stateToMigrate" already contains a single state instance for each key
of interest, so my AggregateFunctionForMigration simply returns "value" in its
add() method, with no other logic there.
Regards,
Alexis.
On Wed, Jul 31, 2024 at 09:11, Schwalbe Matthias
<[email protected]> wrote:
Hi Alexis,
Just a couple of points to double-check:
* Does your code compile? (The second argument of withOperator(..) should be a
  StateBootstrapTransformation<T>, not a SingleOutputStreamOperator<T>.)
* The documentation of the savepoint API has examples for each type of state.
* Your preparation needs to generate events that, within your
  StateBootstrapTransformation implementation, get set into state primitives much
  the same way as you would do in a normal streaming operator.
* Please note that a savepoint API job always runs in batch mode, hence:
  * Keyed events are processed in key order first and then in time order.
  * Triggers are only fired after all events of the respective key have been
    processed.
  * Semantics are therefore slightly different from those of streaming timers.
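The batch-mode ordering described in the points above can be pictured with a small plain-Java sketch. It is not Flink code: KeyedEvent and replay are hypothetical names, and sorting keys by natural String order is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical event type: a key plus an event timestamp.
final class KeyedEvent {
    final String key;
    final long timestamp;

    KeyedEvent(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

class BatchOrderSketch {
    // Replays events grouped by key, then by timestamp within each key,
    // and logs a "fire" step only after the last event of each key.
    static List<String> replay(List<KeyedEvent> events) {
        List<KeyedEvent> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparing((KeyedEvent e) -> e.key)
                .thenComparingLong(e -> e.timestamp));

        List<String> log = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            KeyedEvent e = sorted.get(i);
            log.add("process " + e.key + "@" + e.timestamp);
            boolean lastOfKey = i == sorted.size() - 1
                    || !sorted.get(i + 1).key.equals(e.key);
            if (lastOfKey) {
                log.add("fire timers for " + e.key);
            }
        }
        return log;
    }
}
```

In a streaming job, by contrast, events of different keys interleave and timers can fire between events of the same key, which is why logic that depends on trigger timing may behave differently when bootstrapping state.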
Hope that helps 😊
Thias
From: Alexis Sarda-Espinosa <[email protected]>
Sent: Monday, July 29, 2024 9:18 PM
To: user <[email protected]>
Subject: Using state processor for a custom windowed aggregate function
Hello,
I am trying to create state for an aggregate function that is used with a
GlobalWindow. This basically looks like:
    savepointWriter.withOperator(
        OperatorIdentifier.forUid(UID),
        OperatorTransformation.bootstrapWith(stateToMigrate)
            .keyBy(...)
            .window(GlobalWindows.create())
            .aggregate(new AggregateFunctionForMigration())
    )
The job runs correctly and writes a savepoint, but if I then read the savepoint
I just created and load the state for that UID, the "elements" iterable in the
WindowReaderFunction's readWindow() method has a non-zero size, but every
element is null.
I've tried specifying a custom trigger between window() and aggregate(), always
returning FIRE or FIRE_AND_PURGE, but it didn't make any difference.
Am I missing something?
Regards,
Alexis.
This message is intended only for the named recipient and may contain
confidential or privileged information. As the confidentiality of email
communication cannot be guaranteed, we do not accept any responsibility for the
confidentiality and the intactness of this message. If you have received it in
error, please advise the sender by return e-mail and delete this message and
any attachments. Any unauthorised use or dissemination of this information is
strictly prohibited.