Hello Aljoscha,
Thank you for your reply.
But I believe, from reading the docs, that any user function has to be a
Rich Function if it wants to have state.
However, a Rich Function is not accepted on a Window.
For instance, looking at the Flink source for version 1.1.3, which is the one
I'm currently using, in the class WindowedStream.java we find the following
snippet.
"
public <R> SingleOutputStreamOperator<R> apply(R initialValue,
        FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function,
        TypeInformation<R> resultType) {
    if (foldFunction instanceof RichFunction) {
        throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
    }
    if (windowAssigner instanceof MergingWindowAssigner) {
        throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
    }
    ...
"
Now I can see that the window operator creates a FoldDescriptor and, as you
said, it uses the APIs you described.
However, I can't see how everything fits together.
For instance, the Count class described here can only extend FoldFunction
and not RichFoldFunction, so how does Flink keep track of the accumulator?
From my tests it seems that it does not.
Every time the program/stream/job is restarted, the accumulator starts from
the initial value.
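To make the question concrete, here is a minimal Java sketch of how I picture it. It is not Flink's actual code: the fold function stays a plain stateless function, while the operator itself owns one accumulator per window, so the operator is what would have to be snapshotted and restored. FoldingOperator, snapshot and restore are hypothetical names made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class FoldingWindowSketch {

    /** Hypothetical stand-in for a window operator; NOT Flink's WindowOperator. */
    static final class FoldingOperator<T, R> {
        private final R initialValue;
        // Stateless fold logic, comparable to a plain (non-rich) FoldFunction.
        private final BiFunction<R, T, R> fold;
        // The operator, not the user function, holds the accumulator per window start.
        private Map<Long, R> accumulators = new HashMap<>();

        FoldingOperator(R initialValue, BiFunction<R, T, R> fold) {
            this.initialValue = initialValue;
            this.fold = fold;
        }

        /** Folds one element into the accumulator of the given window. */
        void process(long windowStart, T element) {
            R current = accumulators.getOrDefault(windowStart, initialValue);
            accumulators.put(windowStart, fold.apply(current, element));
        }

        /** Current accumulator of a window, or the initial value if none exists yet. */
        R get(long windowStart) {
            return accumulators.getOrDefault(windowStart, initialValue);
        }

        /** Copies the operator-held state, as a checkpoint would in this model. */
        Map<Long, R> snapshot() {
            return new HashMap<>(accumulators);
        }

        /** Replaces the operator-held state with a previously taken snapshot. */
        void restore(Map<Long, R> snapshot) {
            accumulators = new HashMap<>(snapshot);
        }
    }

    public static void main(String[] args) {
        BiFunction<Long, String, Long> count = (acc, event) -> acc + 1;

        FoldingOperator<String, Long> op = new FoldingOperator<>(0L, count);
        for (int i = 0; i < 10; i++) {
            op.process(0L, "event-" + i);
        }
        Map<Long, Long> checkpoint = op.snapshot();

        // Restart that restores the snapshot.
        FoldingOperator<String, Long> restored = new FoldingOperator<>(0L, count);
        restored.restore(checkpoint);
        System.out.println(restored.get(0L)); // prints 10

        // Restart that starts with no state at all.
        FoldingOperator<String, Long> fresh = new FoldingOperator<>(0L, count);
        System.out.println(fresh.get(0L)); // prints 0
    }
}
```

In this model the count resumes at 10 only when the snapshot is restored. A restart without it begins again from the initial value 0, which matches what I see in my tests.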
Kind Regards,
Daniel Santos
On 11/04/2016 11:01 AM, Aljoscha Krettek wrote:
Hi Daniel,
Flink will checkpoint the state of all operations (in your case to
HDFS). Flink has several APIs for dealing with state in user
functions:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html
The window operator also internally uses these APIs.
Let me know if you need anything more.
Cheers,
Aljoscha
On Thu, 3 Nov 2016 at 19:43 Daniel Santos <dsan...@cryptolab.net> wrote:
Hello,
I have a question that has been bugging me.
Let's say we have a Kafka Source.
Checkpointing is enabled, with a period of 5 seconds.
We have an FSBackend (Hadoop).
Now imagine we have a tumbling window of 10 minutes.
For simplicity, let's say that we are counting all elements arriving
within those 10 minutes. Something like this:
class Count extends FoldFunction[Event, Long] {
  override def fold(accumulator: Long, value: Event): Long = {
    accumulator + 1
  }
}
So we have:
source
  .window(<Tumbling>)
  .apply(0L, new Count(), WindowFunction())
In the first 2 minutes 10 events arrive. Then we stop the stream/task/job,
or it fails, and it is restarted. What will be the state of the fold
function?
Will it be 10, resuming from there? Or will it be 0?
This is rather important to know, because imagine we have a window of
1 day and the job fails midday. How will it resume?
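To spell out what I would hope happens, here is a minimal Java sketch. It is purely my assumption and not taken from Flink: each checkpoint stores the source position together with the accumulator, and on restart both are restored and the events after the checkpoint are read again. Checkpoint, CountingJob and processNext are hypothetical names, and the list stands in for a replayable source such as Kafka.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointReplaySketch {

    /** What one checkpoint captures in this model: source position plus accumulator. */
    static final class Checkpoint {
        final int offset;
        final long count;

        Checkpoint(int offset, long count) {
            this.offset = offset;
            this.count = count;
        }
    }

    /** Hypothetical job that reads a replayable log and counts the events it has seen. */
    static final class CountingJob {
        private final List<String> log;
        private int offset;
        private long count;

        /** Starts (or restarts) the job from the given checkpoint. */
        CountingJob(List<String> log, Checkpoint from) {
            this.log = log;
            this.offset = from.offset;
            this.count = from.count;
        }

        /** Consumes up to n further events from the log. */
        void processNext(int n) {
            for (int i = 0; i < n && offset < log.size(); i++) {
                offset++;
                count++;
            }
        }

        /** Captures source position and accumulator together. */
        Checkpoint checkpoint() {
            return new Checkpoint(offset, count);
        }

        long count() {
            return count;
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            log.add("event-" + i);
        }

        CountingJob job = new CountingJob(log, new Checkpoint(0, 0L));
        job.processNext(7);
        Checkpoint last = job.checkpoint(); // offset 7, count 7
        job.processNext(3);                 // count reaches 10, then the job fails

        // Recovery: restore offset and count, then replay what came after the checkpoint.
        CountingJob recovered = new CountingJob(log, last);
        recovered.processNext(3);
        System.out.println(recovered.count()); // prints 10
    }
}
```

Under these assumptions the three events processed after the last checkpoint are counted once more during replay, so the recovered count is again 10 rather than 0 or 7.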
Best Regards