Hello Aljoscha,

Thank you for your reply.

But from reading the docs, I believe that a user function has to be a RichFunction if it wants to keep state.
However, a RichFunction is not accepted on a window.

For instance, looking at the Flink 1.1.3 source (the version I'm currently using), in the class WindowedStream.java we find the following snippet.

"
public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType) {
    if (foldFunction instanceof RichFunction) {
        throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
    }
    if (windowAssigner instanceof MergingWindowAssigner) {
        throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
    }
    ...
"

Now I can see that the window operator creates a FoldingStateDescriptor, so, as you said, it uses the state APIs you described.
However, I can't see how everything fits together.
For instance, the Count class from my original message (quoted below) can only extend FoldFunction, not RichFoldFunction. How does Flink keep track of its accumulator?
From my tests it seems that it does not.

Every time the program/stream/job is restarted, the accumulator starts from the initial value.
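
To make explicit what I expected versus what I observe, here is a minimal plain-Java sketch. It has no Flink dependency: the class FoldCheckpointSketch, the method foldFrom and the "snapshot" variable are hypothetical names that only model the idea of an accumulator being restored, and they say nothing about how Flink actually implements it.

```java
import java.util.function.BiFunction;

public class FoldCheckpointSketch {

    // Folds `count` synthetic events into an accumulator that starts at `start`.
    public static long foldFrom(long start, int count, BiFunction<Long, String, Long> fold) {
        long acc = start;
        for (int i = 0; i < count; i++) {
            acc = fold.apply(acc, "event-" + i);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Same logic as the Count fold function: ignore the event, add one.
        BiFunction<Long, String, Long> count = (acc, event) -> acc + 1;
        long initialValue = 0L;

        // 10 events arrive before the job stops or fails.
        long beforeFailure = foldFrom(initialValue, 10, count);

        // Hypothetical stand-in for the accumulator saved by a checkpoint.
        long snapshot = beforeFailure;

        // 5 more events arrive after the restart.
        long restored = foldFrom(snapshot, 5, count);   // what I expected
        long fresh = foldFrom(initialValue, 5, count);  // what my tests seem to show

        System.out.println("restored from snapshot: " + restored);
        System.out.println("restarted from initial value: " + fresh);
    }
}
```

In this model the first result continues counting from the saved value, while the second starts over from the initial value, which matches what I see after a restart.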

Kind Regards,
Daniel Santos


On 11/04/2016 11:01 AM, Aljoscha Krettek wrote:
Hi Daniel,
Flink will checkpoint the state of all operations (in your case to HDFS). Flink has several APIs for dealing with state in user functions: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/state.html The window operator also internally uses these APIs.

Let me know if you need anything more.

Cheers,
Aljoscha

On Thu, 3 Nov 2016 at 19:43 Daniel Santos <dsan...@cryptolab.net> wrote:

    Hello,

    I have a question that has been bugging me.
    Let's say we have a Kafka source.
    Checkpointing is enabled, with an interval of 5 seconds.
    We use the FsStateBackend (on Hadoop).

    Now imagine we have a tumbling window of 10 minutes.

    For simplicity, let's say that we are counting all elements
    arriving within those 10 minutes. Something like this:

    class Count extends FoldFunction[Event, Long] {

        override def fold(accumulator: Long, value: Event): Long =  {
          accumulator + 1
        }

    }

    So we have

    source.
          window(<Tumbling>).
          apply(0L, new Count(), <WindowFunction>)

    In the first 2 minutes, 10 events arrive. Then we stop the
    stream/task/job, or it fails, and it is restarted. What will the
    state of the fold function be?
    Will it be 10, resuming from there? Or will it be 0?

    This is quite important to know. Imagine we have a window of
    1 day and the job fails midday. How will it resume?

    Best Regards

