Hi,
Thank you very much.
I see. OK, that makes sense.
I believe there is a catch with parallelism, though.
Say one takes a savepoint and then changes the parallelism.
I believe the job won't start from the last savepoint. Is that correct?
On versions ( > 1.2 ), will it start afresh?
Best Regards,
Daniel Santos
On 11/04/2016 05:54 PM, Aljoscha Krettek wrote:
Hi,
the state of the window is kept by the WindowOperator (which uses the
state descriptor you mentioned to access the state). The FoldFunction
does not itself keep the state but is only used to update the state
inside the WindowOperator, if you will.
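To make the division of labour concrete, here is a minimal toy model in plain Java. It is only a sketch and not Flink's actual code: the names Fold and ToyWindowOperator are hypothetical stand-ins, and the real WindowOperator keeps its accumulators in a state backend rather than in a HashMap. The point is only that the fold function has no fields of its own, while the operator owns one accumulator per window.

```java
import java.util.HashMap;
import java.util.Map;

class FoldStateSketch {

    // Hypothetical stand-in for a fold function: pure logic, no state of its own.
    interface Fold<T, R> {
        R fold(R accumulator, T value);
    }

    // Toy operator (not Flink's WindowOperator): it owns one accumulator per window.
    static class ToyWindowOperator<T, R> {
        private final R initialValue;
        private final Fold<T, R> foldFunction;
        private final Map<Long, R> accumulators = new HashMap<>();

        ToyWindowOperator(R initialValue, Fold<T, R> foldFunction) {
            this.initialValue = initialValue;
            this.foldFunction = foldFunction;
        }

        // Read the window's accumulator, let the function update it, store it back.
        void processElement(long windowStart, T value) {
            R current = accumulators.getOrDefault(windowStart, initialValue);
            accumulators.put(windowStart, foldFunction.fold(current, value));
        }

        R currentValue(long windowStart) {
            return accumulators.getOrDefault(windowStart, initialValue);
        }
    }

    public static void main(String[] args) {
        // The counting logic is a stateless lambda; the count lives in the operator.
        ToyWindowOperator<String, Long> operator =
                new ToyWindowOperator<String, Long>(0L, (acc, event) -> acc + 1);
        for (int i = 0; i < 10; i++) {
            operator.processElement(0L, "event-" + i);
        }
        System.out.println(operator.currentValue(0L)); // prints 10
    }
}
```

Because the accumulator lives in the operator, the function does not need to be a rich function for the count to be part of the operator's state.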
When you say restart, are you talking about stopping the job manually
and then restarting it? In that case I expect the state to be reset.
Flink performs checkpoints of the state so that it can recover in
case of failures. These checkpoints, however, don't survive stopping a
job. If you want to persist the state across stopping/restarting, you
should look into savepoints:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/savepoints.html
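The difference between resuming from a persisted snapshot and starting without one can be illustrated with a small toy model in plain Java. This is a sketch under loud assumptions: CountingOperator, snapshot() and restore() are hypothetical names, and copying a map stands in for writing state to durable storage. It does not reproduce how Flink takes checkpoints or savepoints.

```java
import java.util.HashMap;
import java.util.Map;

class SnapshotRestoreSketch {

    // Toy counting operator (hypothetical): one counter per window start timestamp.
    static class CountingOperator {
        private final Map<Long, Long> counts = new HashMap<>();

        void processElement(long windowStart) {
            counts.merge(windowStart, 1L, Long::sum);
        }

        long count(long windowStart) {
            return counts.getOrDefault(windowStart, 0L);
        }

        // Copy the state out; a stand-in for persisting it somewhere durable.
        Map<Long, Long> snapshot() {
            return new HashMap<>(counts);
        }

        // Build an operator whose state starts from a previously taken snapshot.
        static CountingOperator restore(Map<Long, Long> snapshot) {
            CountingOperator operator = new CountingOperator();
            operator.counts.putAll(snapshot);
            return operator;
        }
    }

    public static void main(String[] args) {
        CountingOperator running = new CountingOperator();
        for (int i = 0; i < 10; i++) {
            running.processElement(0L);
        }
        Map<Long, Long> snapshot = running.snapshot();

        // Restart with the snapshot: counting resumes from 10.
        CountingOperator restored = CountingOperator.restore(snapshot);
        restored.processElement(0L);
        System.out.println(restored.count(0L)); // prints 11

        // Restart without any snapshot: the count begins at the initial value.
        CountingOperator fresh = new CountingOperator();
        System.out.println(fresh.count(0L)); // prints 0
    }
}
```

In this picture, recovery after a failure corresponds to the restored case, and a manual stop and restart without a savepoint corresponds to the fresh case.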
Cheers,
Aljoscha
On Fri, 4 Nov 2016 at 16:40 Daniel Santos <dsan...@cryptolab.net> wrote:
Hello Aljoscha,
Thank you for your reply.
But I believe, from reading the docs, that any user function has
to be a Rich Function if it is to have state.
However, a Rich Function is not accepted on a Window.
For instance, looking at the Flink source for version 1.1.3, which is the
one I'm currently using, in the class WindowedStream.java we find
the following snippet.
"
public <R> SingleOutputStreamOperator<R> apply(R initialValue,
        FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function,
        TypeInformation<R> resultType) {
    if (foldFunction instanceof RichFunction) {
        throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
    }
    if (windowAssigner instanceof MergingWindowAssigner) {
        throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
    }
    ...
"
Now I can see that the window operator creates a FoldDescriptor and,
as you said, uses the APIs you described.
However, I can't see how everything fits together.
For instance, the Count class described here can only extend
a FoldFunction and not a RichFoldFunction, so how does Flink keep
track of the accumulator?
From my tests it seems that it does not.
Every time the program/stream/job is restarted, the accumulator starts
from the initial value.
Kind Regards,
Daniel Santos
On 11/04/2016 11:01 AM, Aljoscha Krettek wrote:
Hi Daniel,
Flink will checkpoint the state of all operations (in your case
to HDFS). Flink has several APIs for dealing with state in user
functions:
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html
The window operator also internally uses these APIs.
Let me know if you need anything more.
Cheers,
Aljoscha
On Thu, 3 Nov 2016 at 19:43 Daniel Santos <dsan...@cryptolab.net> wrote:
Hello,
I have a question that has been bugging me.
Let's say we have a Kafka source.
Checkpointing is enabled, with a period of 5 seconds.
We have an FSBackend ( Hadoop ).
Now imagine we have a tumbling window of 10 minutes.
For simplicity, say that we are counting all elements
arriving in those 10 minutes. Something like this:
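import org.apache.flink.api.common.functions.FoldFunction

// Counts elements: ignores the event itself and adds 1 to the accumulator.
// Event is the application's own element type.
class Count extends FoldFunction[Event, Long] {
  override def fold(accumulator: Long, value: Event): Long = {
    accumulator + 1
  }
}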
So we have
source
  .window(<Tumbling>)
  .apply(0L, new Count(), WindowFunction())
In the first 2 minutes 10 events arrive. Then we stop the
stream/task/job, or it fails, and it is restarted. What
will be the state of the fold function?
Will it be 10, so that it resumes from there? Or will it be 0?
It is rather important to know, because imagine we have a
window of 1 day and the job fails midday. How will it resume?
Best Regards