Re: Design documents for consolidated DataStream API

2015-07-14 Thread Gyula Fóra
I think Marton has some good points here.

1) Is KeyedDataStream a better name if this is only a renaming?

2) The discretize semantics are indeed unclear. Are we operating on a single
dataset or a sequence of datasets? If the latter, why not call it something
else (dstream)? How are joins and other binary operators defined for
different discretizations, etc.?
On Mon, Jul 13, 2015 at 7:37 PM Márton Balassi  wrote:

> Generally I agree with the new design. Two concerns:
>
> 1) Does KeyedDataStream replace GroupedDataStream, or is the latter a
> special case of the former?
>
> The KeyedDataStream as described in the design document is a bit unclear
> to me. It lists the following usages:
>   a) It is the first step in building a window stream, on top of which the
> grouped/windowed aggregation and reduce-style functions can be applied
>   b) It allows using the "by-key" state of functions. Here, every record
> has access to a state that is scoped by its key. Key-scoped state can be
> automatically redistributed and repartitioned.
>
> The code snippet describes a use case where the computation and the access
> to state work the way the GroupedDataStream currently should. I suppose
> this is the example for case b). Would case a) also window elements by key?
> If yes, then this is practically a renaming and enhancement of the
> GroupedDataStream functionality with keyed state. Then the
> StreamExecutionEnvironment.createKeyedStream(Partitioner, KeySelector)
> construction does not make much sense, as the user only operates within
> the scope of the KeySelector and not the Partitioner anyway.
>
> I personally think KeyedDataStream as a name does not necessarily suggest
> that the records are grouped by key; it only suggests partitioning by key -
> at least to me. :)
>
> 2) The API for discretization is not convenient IMHO
>
> The discretization part declares that the output of DataStream.discretize()
> is a sequence of DataSets. I love this approach, but then in the code
> snippet the return value of this function is simply a DataSet and is used
> as such. The take-home message of that code is the following: this is
> actually the way you would like to program on this sequence of DataSets;
> most probably you would like to do the same with each of them. If that is
> the case, we should provide a nice utility for that. I think Spark
> Streaming's DStream.foreachRDD() is fairly useful for this purpose.
>
> On Mon, Jul 13, 2015 at 6:30 PM, Gyula Fóra  wrote:
>
> > +1
> > On Mon, Jul 13, 2015 at 6:23 PM Stephan Ewen  wrote:
> >
> > > If naming is the only concern, then we should go ahead, because we can
> > > change names easily (before the release).
> > >
> > > In fact, I don't think it leaves a bad impression. Global windows are
> > > non-parallel windows. There are also parallel windows. Pick what you
> > > need and what works.
> > >
> > >
> > > On Mon, Jul 13, 2015 at 6:13 PM, Gyula Fóra  wrote:
> > >
> > > > I think we agree on everything; it's more of a naming issue :)
> > > >
> > > > I thought it might be misleading that global time windows are
> > > > "non-parallel" windows. We don't want to give a bad impression.
> > > > (Also, we don't want people to think that every global window is
> > > > parallel, but that's not a problem here.)
> > > >
> > > > Gyula
> > > > On Mon, Jul 13, 2015 at 5:22 PM Stephan Ewen  wrote:
> > > >
> > > > > Okay, what is missing about the windowing in your opinion?
> > > > >
> > > > > The core points of the document are:
> > > > >
> > > > >   - The parallel windows are per group only.
> > > > >
> > > > >   - The implementation of the parallel windows holds window data
> > > > > in the group buffers.
> > > > >
> > > > >   - The global windows are non-parallel. They may have parallel
> > > > > pre-aggregation, if they are time windows.
> > > > >
> > > > >   - Time may be operator time (timer thread) or watermark time.
> > > > > Watermark time can refer to ingress or event time.
> > > > >
> > > > >   - Windows that do not pre-aggregate may require elements in
> > > > > order. Not part of the first prototype.
> > > > >
> > > > > Do we agree on those points?
> > > > >
> > > > >
> > > > > On Mon, Jul 13, 2015 at 4:50 PM, Gyula Fóra  wrote:
> > > > >
> > > > > > In general I like it, although the main difference between the
> > > > > > current design and the new one is the windowing, and that is
> > > > > > still not very clear.
> > > > > >
> > > > > > Where do we have the full-stream time windows, for instance
> > > > > > (which are parallel but not keyed)?
> > > > > > On Mon, Jul 13, 2015 at 4:28 PM Aljoscha Krettek <
> > > > > > aljos...@apache.org> wrote:
> > > > > >
> > > > > > > +1 I like it as well.
> > > > > > >
> > > > > > > On Mon, 13 Jul 2015 at 16:17 Kostas Tzoumas <
> > > > > > > ktzou...@apache.org> wrote:
> > > > > > >
> > > > > > > > +1 from my side
> > > > > > > >
> > > > > > > > On Mon, Jul 13, 2015 at 4:15 PM, Stephan Ewen

Re: Design documents for consolidated DataStream API

2015-07-14 Thread Stephan Ewen
Concerning your comments:

1) In the new design, there is no grouping without windowing. The
KeyedDataStream subsumes both the grouping and the keying for partitioned
state:

keyBy() + window() makes a parallel grouped window.
keyBy() alone allows access to partitioned state.

My thought was that this is simpler, because it does not need both
groupBy() and keyBy(), but one construct to handle both cases.
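
As a rough illustration, the two usages side by side (a sketch only:
KeyedDataStream, window()/reduceWindow(), and the OperatorState handle are
names from this design discussion, not a settled API):

// Sketch: keyBy() + window() as a parallel grouped window.
KeyedDataStream<Event> keyed = events.keyBy(e -> e.userId);

keyed.window(Time.of(5, TimeUnit.SECONDS))
     .reduceWindow((a, b) -> a.merge(b));

// Sketch: keyBy() alone, with a function using key-scoped ("by-key") state.
keyed.map(new RichMapFunction<Event, Long>() {
    private OperatorState<Long> count; // scoped to the current record's key

    @Override
    public Long map(Event e) throws Exception {
        long next = count.value() + 1;
        count.update(next);
        return next;
    }
});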

2) The discretization is a rough thought and nothing for the short term.
It definitely needs more thought. I put it there as a sketch of how to
evolve this.

The idea is of course to not have a single data set, but a series of
data sets. In each discrete time slice, the data set can be treated like a
regular data set.

Let's kick off a separate design for the discretization. Joins are good
to talk about (data sets can be joined with data sets), and I am sure there
are more questions coming up.
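
To make the per-slice model concrete, a sketch that also includes the
forEach-style utility Márton suggested (DiscretizedStream and
forEachDataSet() are hypothetical names, not a committed API):

// Sketch: discretize() yields a sequence of time slices, not one DataSet.
DiscretizedStream<Click> slices = clicks.discretize(Time.of(1, TimeUnit.MINUTES));

// A forEach-style utility, in the spirit of Spark's DStream.foreachRDD(),
// applies the same batch-style program to every slice.
slices.forEachDataSet(slice -> {
    // Within a single time slice, 'slice' behaves like a regular DataSet.
    slice.groupBy("userId").sum("clicks").print();
});

// Binary operators would still need defined semantics across
// discretizations, e.g. joining corresponding slices of two streams.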


Does that make sense?






Re: Design documents for consolidated DataStream API

2015-07-14 Thread Gyula Fóra
If we only want to have either keyBy or groupBy, why not keep groupBy? That
would be more consistent with the batch API.

Re: Design documents for consolidated DataStream API

2015-07-14 Thread Stephan Ewen
keyBy() does not do any grouping. Grouping in streams is not defined
without windows.


Re: Design documents for consolidated DataStream API

2015-07-14 Thread Stephan Ewen
It is now a bit different than the batch API, because streaming semantics
are a bit different ;-)

One good thing is that we can improve things that were sub-optimal in the
batch API.


Re: Design documents for consolidated DataStream API

2015-07-14 Thread Aljoscha Krettek
I agree, the groupBy in the batch API is misleading, since
ds.groupBy().reduce() does not really build any groups; it is really a
ds.keyBy().reduceByKey(). In the streaming API we can still fix this, IMHO.
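
To spell out the contrast (reduceByKey() is hypothetical and does not exist
in the current API; MyReducer stands for any ReduceFunction):

// Batch API today: despite the name, no groups are ever materialized.
ds.groupBy(0).reduce(new MyReducer());

// What it semantically is, per the argument above (hypothetical spelling):
ds.keyBy(0).reduceByKey(new MyReducer());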


Re: Design documents for consolidated DataStream API

2015-07-14 Thread Gyula Fóra
I see your point; reduceByKey is much clearer.

The question is whether we want to introduce this inconsistency across the
two APIs or stick with what we have.

Re: Design documents for consolidated DataStream API

2015-07-14 Thread Kostas Tzoumas
I think the thought was to explicitly not use the same terminology as the
batch API, so as not to confuse people.

But this is a minor naming issue IMO.


Re: Design documents for consolidated DataStream API

2015-07-14 Thread Stephan Ewen
There is no inconsistency between the batch and streaming APIs. They have
different semantics - the batch API is implicitly always windowed.

There is a naming difference between the two APIs.

There is a strong inconsistency within the Streaming API right now.
Grouping and aggregating without windows is plain dangerous in streaming.
It either blows up or is undefined in its behavior.




[jira] [Created] (FLINK-2354) Recover running jobs on JobManager failure

2015-07-14 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2354:
--

 Summary: Recover running jobs on JobManager failure
 Key: FLINK-2354
 URL: https://issues.apache.org/jira/browse/FLINK-2354
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager
Affects Versions: master
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


tl;dr Persist JobGraphs in state backend and coordinate reference to state 
handle via ZooKeeper.

Problem: When running multiple JobManagers in high availability mode, the 
leading job manager loses all running jobs when it fails. After a new leading 
job manager is elected, it is not possible to recover any previously running 
jobs.

Solution: The leading job manager, which receives the job graph, writes 1) the 
job graph to a state backend, and 2) a reference to the respective state handle 
to ZooKeeper. In general, job graphs can become large (multiple MBs, because 
they include closures etc.). ZooKeeper is not designed for data of this size. 
The level of indirection via the reference to the state backend keeps the data 
in ZooKeeper small.

Proposed ZooKeeper layout:

/flink (default)
  +- currentJobs
       +- job id i
            +- state handle reference of job graph i

The 'currentJobs' node needs to be persistent to allow recovery of jobs across 
job managers, and it needs to satisfy the following invariant: there is a 
reference to a job graph with id i IFF the respective job graph needs to be 
recovered by a newly elected job manager leader.

With this in place, jobs will be recovered from their initial state (as if 
resubmitted). The next step is to backup the runtime state handles of 
checkpoints in a similar manner.

---

This work will be based on [~trohrm...@apache.org]'s implementation of 
FLINK-2291. The leader election service notifies the job manager about 
granted/revoked leadership. This notification happens via Akka and thus 
serially *per* job manager, but results in eventually consistent state between 
job managers. For some windows of time it is possible that a new leader has 
been granted leadership before the old one has had its leadership revoked.

[~trohrm...@apache.org], can you confirm that leadership does not guarantee 
mutually exclusive access to the shared 'currentJobs' state?

For example, the following can happen:

- JM 1 is leader, JM 2 is standby
- JOB i is running (and hence /flink/currentJobs/i exists)
- ZK notifies leader election service (LES) of JM 1 and JM 2
- LES 2 immediately notifies JM 2 about granted leadership, but LES 1's 
notification revoking leadership takes longer
- JOB i finishes (TMs don't notice leadership change yet) and JM 1 receives 
final JobStatusChange
- JM 2 resubmits the job /flink/currentJobs/i
- JM 1 removes /flink/currentJobs/i, because it is now finished
=> inconsistent state (wrt the specified invariant above)

If it is indeed a problem, we can circumvent this with a Curator recipe for 
[shared locks|http://curator.apache.org/curator-recipes/shared-lock.html] to 
coordinate the access to currentJobs. The lock needs to be acquired on 
leadership.
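
If so, a sketch of guarding 'currentJobs' with the shared-lock recipe
(Curator's InterProcessMutex; the connection string, lock path, jobId, and
payload below are illustrative only):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CurrentJobsGuard {

    public static void registerJob(String zkQuorum, String jobId, byte[] stateHandleRef)
            throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                zkQuorum, new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Mutually exclusive access to /flink/currentJobs across job managers.
        InterProcessMutex lock = new InterProcessMutex(client, "/flink/locks/currentJobs");
        lock.acquire();
        try {
            // Only the lock holder may add or remove job graph references.
            client.create().creatingParentsIfNeeded()
                  .forPath("/flink/currentJobs/" + jobId, stateHandleRef);
        } finally {
            lock.release();
            client.close();
        }
    }
}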

---

Minimum required tests:
- Unit tests for job graph serialization and writing to state backend and 
ZooKeeper with expected nodes
- Unit tests for job submission to job manager in leader/non-leader state
- Unit tests for leadership granting/revoking and job submission/restarting 
interleavings
- Process failure integration tests with single and multiple running jobs





[jira] [Created] (FLINK-2355) Job hanging in collector, waiting for request buffer

2015-07-14 Thread William Saar (JIRA)
William Saar created FLINK-2355:
---

 Summary: Job hanging in collector, waiting for request buffer
 Key: FLINK-2355
 URL: https://issues.apache.org/jira/browse/FLINK-2355
 Project: Flink
  Issue Type: Bug
Affects Versions: master
Reporter: William Saar


Running locally on a machine with 8 threads.

Daemon Thread [Flat Map -> (Filter, Filter -> Flat Map -> Filter -> (Stream Sink, Stream Sink)) (6/8)] (Suspended)
    owns: SpanningRecordSerializer (id=533)
    waited by: Daemon Thread [Thread-173] (Suspended)
    waiting for: ArrayDeque (id=534)
    Object.wait(long) line: not available [native method]
    LocalBufferPool.requestBuffer(boolean) line: 163
    LocalBufferPool.requestBufferBlocking() line: 133
    StreamRecordWriter(RecordWriter).emit(T) line: 92
    StreamRecordWriter.emit(T) line: 58
    StreamOutput.collect(OUT) line: 62
    CollectorWrapper.collect(OUT) line: 40
    StreamFilter.processElement(IN) line: 34
    OutputHandler$CopyingOperatorCollector.collect(T) line: 278
    CollectorWrapper.collect(OUT) line: 40
    IteratedDataModelOp.lambda$0(Collector, InternalMessage) line: 102
    437981089.accept(Object) line: not available
    ArrayList.forEach(Consumer) line: not available
    IteratedDataModelOp.processInput(I, Collector>) line: 99
    IteratedDataModelOp.flatMap(MessageWrapper, Collector>) line: 70
    IteratedDataModelOp.flatMap(Object, Collector) line: 1
    StreamFlatMap.processElement(IN) line: 35
    OneInputStreamTask.invoke() line: 103
    Task.run() line: 567
    Thread.run() line: not available


Daemon Thread [Thread-173] (Suspended)
    waiting for: SpanningRecordSerializer (id=533)
    owned by: Daemon Thread [Flat Map -> (Filter, Filter -> Flat Map -> Filter -> (Stream Sink, Stream Sink)) (6/8)] (Suspended)
    waiting for: ArrayDeque (id=534)
    StreamRecordWriter(RecordWriter).flush() line: 149
    StreamRecordWriter$OutputFlusher.run() line: 90






[jira] [Created] (FLINK-2356) Resource leak in checkpoint coordinator

2015-07-14 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2356:
--

 Summary: Resource leak in checkpoint coordinator
 Key: FLINK-2356
 URL: https://issues.apache.org/jira/browse/FLINK-2356
 Project: Flink
  Issue Type: Bug
  Components: JobManager, Streaming
Affects Versions: 0.9, master
Reporter: Ufuk Celebi
 Fix For: 0.10, 0.9.1


The shutdown method of the checkpoint coordinator is not called when a Flink 
cluster is shut down via SIGINT. The issue is that the checkpoint coordinator 
shutdown/cleanup is only called after the job enters a final state. This does 
not happen for a regular cluster shutdown (via kill). Because we don't have 
proper stopping of streaming jobs, this means that every program using 
checkpointing suffers from this.

I've tested this only locally for now with a custom WordCount checkpointing the 
current count. When stopping the process, the files still exist. Since this is 
the same mechanism as in a distributed setup with HDFS, this should mean that 
files in HDFS will be lingering around.

The problem is that the postStop method of the JM actor is not called when 
shutting down. The task manager components, which need to do resource cleanup, 
register custom shutdown hooks and don't rely on a shutdown call from the task 
manager.

For 0.9.1 we need to make sure that the state is simply cleaned up with a 
shutdown hook (as in the blob manager). For 0.10 with HA we need to be more 
careful and not clean it up when other job manager instances need access. See 
FLINK-2354 for details.
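
A minimal sketch of the shutdown-hook approach suggested above for 0.9.1
(the directory-walking cleanup is illustrative; the real cleanup has to go
through the configured state backend):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class CheckpointCleanupHook {

    public static void install(final Path checkpointDir) {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                try (Stream<Path> paths = Files.walk(checkpointDir)) {
                    // Delete children before their parent directories.
                    paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                        try {
                            Files.delete(p);
                        } catch (IOException ignored) {
                            // Best effort; the JVM is shutting down anyway.
                        }
                    });
                } catch (IOException ignored) {
                    // Directory may already be gone.
                }
            }
        }));
    }
}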





[jira] [Created] (FLINK-2357) New JobManager Runtime Web Frontend

2015-07-14 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2357:
---

 Summary: New JobManager Runtime Web Frontend
 Key: FLINK-2357
 URL: https://issues.apache.org/jira/browse/FLINK-2357
 Project: Flink
  Issue Type: New Feature
  Components: Webfrontend
Affects Versions: 0.10
Reporter: Stephan Ewen


We need to rework the JobManager web frontend.

The current web frontend is limited and has a lot of design issues:
  - It does not display any progress while operators are running. This is 
especially problematic for streaming jobs.
  - It has no graph representation of the data flows.
  - It does not allow looking into execution attempts.
  - It has no hook to deal with the upcoming live accumulators.
  - The architecture is not very modular/extensible.

I propose to add a new JobManager web frontend:
  - Based on Netty HTTP (very lightweight)
  - Using rest-style URLs for jobs and vertices
  - integrating the D3 graph renderer of the previews with the runtime monitor
  - with details on execution attempts
  - first class visualization of records processed and bytes processed
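
As a feel for how lightweight the Netty route is, a minimal sketch of an
HTTP server answering rest-style URLs with a stub JSON body (Netty 4.0 API;
the port and URL handling are illustrative, not the proposed monitor):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

public class MiniMonitorServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new HttpServerCodec());
                     ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                         @Override
                         protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req) {
                             // Rest-style URL, e.g. /jobs/<jobid>/vertices: echo it as stub JSON.
                             String json = "{ \"uri\": \"" + req.getUri() + "\" }";
                             FullHttpResponse resp = new DefaultFullHttpResponse(
                                     HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                                     Unpooled.copiedBuffer(json, CharsetUtil.UTF_8));
                             resp.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json");
                             resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH, resp.content().readableBytes());
                             ctx.writeAndFlush(resp).addListener(ChannelFutureListener.CLOSE);
                         }
                     });
                 }
             });
            b.bind(8081).sync().channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}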





[jira] [Created] (FLINK-2358) Add Netty-HTTP based server and server handlers

2015-07-14 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2358:
---

 Summary: Add Netty-HTTP based server and server handlers
 Key: FLINK-2358
 URL: https://issues.apache.org/jira/browse/FLINK-2358
 Project: Flink
  Issue Type: Sub-task
  Components: Webfrontend
Affects Versions: 0.10
Reporter: Stephan Ewen
 Fix For: 0.10








[jira] [Created] (FLINK-2359) Add factory methods to the Java TupleX types

2015-07-14 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2359:
--

 Summary: Add factory methods to the Java TupleX types
 Key: FLINK-2359
 URL: https://issues.apache.org/jira/browse/FLINK-2359
 Project: Flink
  Issue Type: Improvement
  Components: Java API
Affects Versions: 0.10
Reporter: Gabor Gevay
Assignee: Gabor Gevay
Priority: Minor


The compiler doesn't infer generic type arguments from constructor arguments, 
which means that we have to call Tuple constructors like this:

Tuple2<Integer, String> t = new Tuple2<Integer, String>(5, "foo");

I propose adding a factory method, which would provide the following 
alternative:

Tuple2<Integer, String> t = Tuple2.create(5, "foo");

(Note that C++ and C# Tuples also have similar factory methods for the same 
reason.)
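
A sketch of what the factory method could look like on the generated TupleX
classes (reduced here to a self-contained Tuple2; only the create() method
is the new part):

public class Tuple2<T0, T1> {

    public T0 f0;
    public T1 f1;

    public Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    // Proposed factory: unlike the constructor, the compiler infers
    // T0 and T1 from the arguments, so no type arguments are needed.
    public static <T0, T1> Tuple2<T0, T1> create(T0 f0, T1 f1) {
        return new Tuple2<T0, T1>(f0, f1);
    }
}

With this, Tuple2.create(5, "foo") infers Tuple2<Integer, String> from the
arguments, mirroring the C++/C# helpers mentioned above.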





[jira] [Created] (FLINK-2360) EOFException

2015-07-14 Thread Andra Lungu (JIRA)
Andra Lungu created FLINK-2360:
--

 Summary: EOFException
 Key: FLINK-2360
 URL: https://issues.apache.org/jira/browse/FLINK-2360
 Project: Flink
  Issue Type: Bug
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Andra Lungu
Priority: Critical


The following code:
https://github.com/andralungu/gelly-partitioning/blob/master/src/main/java/example/NodeSplittingConnectedComponents.java

What the code does, on a very high level:
1) Discovers the skewed nodes in a graph and splits them into subnodes, 
recursively, in levels, until we achieve a more uniform degree distribution.
2) Creates a delta iteration that takes the split data set as a solution set. 
On this, it runs the Connected Components algorithm. At the end of each 
superstep, the partial results computed by the subvertices are gathered back 
into the initial vertex, updating the overall value in the split vertices.
3) Once the iteration has converged, the graph is brought back to its initial 
state.

Ran on the twitter follower graph: 
http://twitter.mpi-sws.org/data-icwsm2010.html

With a similar configuration to the one in FLINK-2293. 

Fails with: 
Caused by: java.io.EOFException
    at org.apache.flink.runtime.operators.hash.InMemoryPartition$WriteView.nextSegment(InMemoryPartition.java:333)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.writeByte(AbstractPagedOutputView.java:223)
    at org.apache.flink.runtime.memorymanager.AbstractPagedOutputView.write(AbstractPagedOutputView.java:173)
    at org.apache.flink.types.StringValue.writeString(StringValue.java:796)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:63)
    at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
    at org.apache.flink.runtime.operators.hash.InMemoryPartition.appendRecord(InMemoryPartition.java:219)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.insertOrReplaceRecord(CompactingHashTable.java:536)
    at org.apache.flink.runtime.operators.hash.CompactingHashTable.buildTableWithUniqueKey(CompactingHashTable.java:347)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.readInitialSolutionSet(IterationHeadPactTask.java:209)
    at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:270)
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:722)

Job Manager log:
https://gist.github.com/andralungu/9fc100603ba8d4b8d686





[jira] [Created] (FLINK-2361) flatMap + distinct gives erroneous results for big data sets

2015-07-14 Thread Andra Lungu (JIRA)
Andra Lungu created FLINK-2361:
--

 Summary: flatMap + distinct gives erroneous results for big data sets
 Key: FLINK-2361
 URL: https://issues.apache.org/jira/browse/FLINK-2361
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Affects Versions: 0.10
Reporter: Andra Lungu


When running the simple Connected Components algorithm (currently in Gelly) on 
the twitter follower graph, with 1, 100 or 1 iterations, I get the 
following error:

Caused by: java.lang.Exception: Target vertex '657282846' does not exist!.
at 
org.apache.flink.graph.spargel.VertexCentricIteration$VertexUpdateUdfSimpleVV.coGroup(VertexCentricIteration.java:300)
at 
org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver.run(CoGroupWithSolutionSetSecondDriver.java:220)
at 
org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
at 
org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
at 
org.apache.flink.runtime.iterative.task.IterationTailPactTask.run(IterationTailPactTask.java:107)
at 
org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:722)

Now this is very bizarre, as the DataSet of vertices is produced from the 
DataSet of edges... which means there cannot be an edge with an invalid 
target id... The method calls flatMap to isolate the src and trg ids and 
distinct to ensure their uniqueness.
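The pattern described, as a minimal sketch (assuming edges as Tuple2<Long, Long>
of source and target ids; the actual Gelly code may differ):

{code}
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class VertexIdExtraction {
    // Emit both endpoint ids of every edge, then de-duplicate them, so the
    // resulting vertex set must cover every target id by construction.
    public static DataSet<Long> vertexIdsFromEdges(DataSet<Tuple2<Long, Long>> edges) {
        return edges
                .flatMap(new FlatMapFunction<Tuple2<Long, Long>, Long>() {
                    @Override
                    public void flatMap(Tuple2<Long, Long> edge, Collector<Long> out) {
                        out.collect(edge.f0); // source id
                        out.collect(edge.f1); // target id
                    }
                })
                .distinct();
    }
}
{code}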

The algorithm works fine for smaller data sets... 







[jira] [Created] (FLINK-2362) distinct is missing in DataSet API documentation

2015-07-14 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-2362:


 Summary: distinct is missing in DataSet API documentation
 Key: FLINK-2362
 URL: https://issues.apache.org/jira/browse/FLINK-2362
 Project: Flink
  Issue Type: Bug
  Components: Documentation, Java API, Scala API
Affects Versions: 0.9, 0.10
Reporter: Fabian Hueske
 Fix For: 0.10, 0.9.1


The DataSet transformation {{distinct}} is not described or listed in the 
documentation. It is not contained in the DataSet API programming guide 
(https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/programming_guide.html)
and not in the DataSet Transformations documentation 
(https://ci.apache.org/projects/flink/flink-docs-release-0.9/apis/dataset_transformations.html).
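For the documentation, a short usage example along these lines might help
(a sketch, not taken from existing docs):

{code}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class DistinctExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Integer, String>> input = env.fromElements(
                new Tuple2<Integer, String>(1, "a"),
                new Tuple2<Integer, String>(1, "a"),
                new Tuple2<Integer, String>(2, "b"));

        // distinct on all fields: removes exact duplicate tuples
        input.distinct().print();

        // distinct on a field subset: keeps one tuple per value of field 0
        input.distinct(0).print();
    }
}
{code}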





Re: Student looking to contribute to Stratosphere

2015-07-14 Thread Rohit Shinde
Hi,

Sorry for the brief hiatus. I was preparing for my GRE exam, but I am back.
I am starting to build Flink, and one doubt I had is: is a single-node
Hadoop cluster configuration enough? I assume Hadoop is needed, since it
is mentioned on the build page.

On Sat, Jun 27, 2015 at 8:02 PM, Chiwan Park  wrote:

> Hi, You can choose any unassigned issue about the Flink Machine Learning
> Library (flink-ml) in JIRA. [1]
> There are some starter issues in flink-ml, such as FLINK-1737 [2],
> FLINK-1748 [3], FLINK-1994 [4].
>
> First, it would be better to read some articles about contributing to
> Flink. [5][6]
> And if you decide on an issue to contribute to, please assign it to
> yourself. If you don't have permission to assign, just comment on the
> issue. Then other people will give you permission and assign the issue
> to you.
>
> Regards,
> Chiwan Park
>
> [1] https://issues.apache.org/jira/
> [2] https://issues.apache.org/jira/browse/FLINK-1737
> [3] https://issues.apache.org/jira/browse/FLINK-1748
> [4] https://issues.apache.org/jira/browse/FLINK-1994
> [5] http://flink.apache.org/how-to-contribute.html
> [6] http://flink.apache.org/coding-guidelines.html
>
> > On Jun 27, 2015, at 11:20 PM, Rohit Shinde 
> wrote:
> >
> > Hello everyone,
> >
> > I came across Stratosphere while looking for GSOC organisations working
> in
> > Machine Learning. I got to know that it had become Apache Flink.
> >
> > I am interested in this project:
> >
> https://github.com/stratosphere/stratosphere/wiki/Google-Summer-of-Code-2014#implement-one-or-multiple-machine-learning-algorithms-for-stratosphere
> >
> > Background: I am proficient in C++, Java, Python and Scheme. I have
> taken
> > undergrad courses in machine learning and data mining. How can I
> contribute
> > to the above project?
> >
> > Thank you,
> > Rohit Shinde.
>
>
>
>
>
>


Re: Student looking to contribute to Stratosphere

2015-07-14 Thread Márton Balassi
Hi,

Hadoop is not a necessity for running Flink, but rather an option. Try the
steps of the setup guide. [1]
If you really need HDFS, though, I would suggest having Hadoop on all your
machines running Flink to get the best IO performance.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-0.9/quickstart/setup_quickstart.html
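For example, once HDFS is set up, a Flink job can read from it directly. A
minimal sketch, assuming a reachable namenode at hdfs://namenode:9000 (the
address and path are illustrative):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class HdfsReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Flink can read from HDFS when the Hadoop file system is available
        DataSet<String> lines = env.readTextFile("hdfs://namenode:9000/path/to/input");
        lines.print();
    }
}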

On Jul 15, 2015 5:27 AM, "Rohit Shinde"  wrote:

> Hi,
>
> Sorry for the brief hiatus. I was preparing for my GRE exam, but I am back.
> I am starting to build Flink, and one doubt I had is: is a single-node
> Hadoop cluster configuration enough? I assume Hadoop is needed, since it
> is mentioned on the build page.
>
> On Sat, Jun 27, 2015 at 8:02 PM, Chiwan Park 
> wrote:
>
> > Hi, You can choose any unassigned issue about the Flink Machine Learning
> > Library (flink-ml) in JIRA. [1]
> > There are some starter issues in flink-ml, such as FLINK-1737 [2],
> > FLINK-1748 [3], FLINK-1994 [4].
> >
> > First, it would be better to read some articles about contributing to
> > Flink. [5][6]
> > And if you decide on an issue to contribute to, please assign it to
> > yourself. If you don't have permission to assign, just comment on the
> > issue. Then other people will give you permission and assign the issue
> > to you.
> >
> > Regards,
> > Chiwan Park
> >
> > [1] https://issues.apache.org/jira/
> > [2] https://issues.apache.org/jira/browse/FLINK-1737
> > [3] https://issues.apache.org/jira/browse/FLINK-1748
> > [4] https://issues.apache.org/jira/browse/FLINK-1994
> > [5] http://flink.apache.org/how-to-contribute.html
> > [6] http://flink.apache.org/coding-guidelines.html
> >
> > > On Jun 27, 2015, at 11:20 PM, Rohit Shinde <
> rohit.shinde12...@gmail.com>
> > wrote:
> > >
> > > Hello everyone,
> > >
> > > I came across Stratosphere while looking for GSOC organisations working
> > in
> > > Machine Learning. I got to know that it had become Apache Flink.
> > >
> > > I am interested in this project:
> > >
> >
> https://github.com/stratosphere/stratosphere/wiki/Google-Summer-of-Code-2014#implement-one-or-multiple-machine-learning-algorithms-for-stratosphere
> > >
> > > Background: I am proficient in C++, Java, Python and Scheme. I have
> > taken
> > > undergrad courses in machine learning and data mining. How can I
> > contribute
> > > to the above project?
> > >
> > > Thank you,
> > > Rohit Shinde.
> >
> >
> >
> >
> >
> >
>


Re: Design documents for consolidated DataStream API

2015-07-14 Thread Márton Balassi
Ok, thanks for the clarification. Let us try to document it in a way that
reflects those thoughts then. Discretization will not happen upfront;
we can wait with that.

On Tue, Jul 14, 2015 at 12:49 PM, Stephan Ewen  wrote:

> There is no inconsistency between the Batch and Streaming APIs. They have
> different semantics - the batch API is implicitly always windowed.
>
> There is a naming difference between the two APIs.
>
> There is a strong inconsistency within the Streaming API right now.
> Grouping and aggregating without windows is plain dangerous in streaming.
> It either blows up or is undefined in its behavior.
>
>
>
> On Tue, Jul 14, 2015 at 12:40 PM, Gyula Fóra  wrote:
>
> > I see your point, reduceByKey is much clearer.
> >
> > The question is whether we want to introduce this inconsistency across
> > the two APIs or stick with what we have.
> > On Tue, Jul 14, 2015 at 10:57 AM Aljoscha Krettek 
> > wrote:
> >
> > > I agree, the groupBy in the batch API is misleading, since a
> > > ds.groupBy().reduce() does not really build any groups; it is really a
> > > ds.keyBy().reduceByKey(). In the streaming API we can still fix this,
> > > IMHO.
> > >
> > > On Tue, 14 Jul 2015 at 10:56 Stephan Ewen  wrote:
> > >
> > > > It is now a bit different than the batch API, because streaming
> > > > semantics are a bit different ;-)
> > > >
> > > > One good thing is that we can make things better that were
> > > > sub-optimal in the Batch API.
> > > >
> > > > On Tue, Jul 14, 2015 at 10:55 AM, Stephan Ewen 
> > wrote:
> > > >
> > > > > keyBy() does not do any grouping. Grouping in streams is not
> > > > > defined without windows.
> > > > >
> > > > > On Tue, Jul 14, 2015 at 10:48 AM, Gyula Fóra  >
> > > > wrote:
> > > > >
> > > > >> If we only want to have either keyBy or groupBy, why not keep
> > > > >> groupBy? That would be more consistent with the batch API.
> > > > >> On Tue, Jul 14, 2015 at 10:35 AM Stephan Ewen 
> > > wrote:
> > > > >>
> > > > >> > Concerning your comments:
> > > > >> >
> > > > >> > 1) In the new design, there is no grouping without windowing.
> > > > >> > The KeyedDataStream subsumes the grouping and key-ing for
> > > > >> > partitioned state.
> > > > >> >
> > > > >> > The keyBy() + window() makes a parallel grouped window;
> > > > >> > keyBy() alone allows access to partitioned state.
> > > > >> >
> > > > >> > My thought was that this is simpler, because it needs not both
> > > > >> > groupBy() and keyBy(), but one construct to handle both cases.
> > > > >> >
> > > > >> > 2) The discretization is a rough thought and is nothing for the
> > > > >> > short term. It totally needs more thought. I put it there to have
> > > > >> > it as a sketch for how to evolve this.
> > > > >> >
> > > > >> > The idea is of course to not have a single data set, but a
> > > > >> > series of data sets. In each discrete time slice, the data set
> > > > >> > can be treated like a regular data set.
> > > > >> >
> > > > >> > Let's kick off a separate design for the discretization. Joins
> > > > >> > are good to talk about (data sets can be joined with data sets),
> > > > >> > and I am sure there are more questions coming up.
> > > > >> >
> > > > >> >
> > > > >> > Does that make sense?
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > On Tue, Jul 14, 2015 at 10:05 AM, Gyula Fóra <
> > gyula.f...@gmail.com>
> > > > >> wrote:
> > > > >> >
> > > > >> > > I think Marton has some good points here.
> > > > >> > >
> > > > >> > > 1) Is KeyedDataStream a better name if this is only a
> renaming?
> > > > >> > >
> > > > >> > > 2) the discretize semantics is unclear indeed. Are we
> operating
> > > on a
> > > > >> > single
> > > > >> > > or sequence of datasets? If the latter why not call it
> something
> > > > else
> > > > >> > > (dstream). How are joins and other binary operators defined
> for
> > > > >> different
> > > > >> > > discretizations etc.
> > > > >> > > On Mon, Jul 13, 2015 at 7:37 PM Márton Balassi <
> > > mbala...@apache.org
> > > > >
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Generally I agree with the new design. Two concerns:
> > > > >> > > >
> > > > >> > > > 1) Does KeyedDataStream replace GroupedDataStream or is it
> the
> > > > >> latter a
> > > > >> > > > special case of the former?
> > > > >> > > >
> > > > >> > > > The KeyedDataStream as described in the design document is a
> > bit
> > > > >> > unclear
> > > > >> > > > for me. It lists the following usages:
> > > > >> > > >   a) It is the first step in building a window stream, on
> top
> > of
> > > > >> which
> > > > >> > > the
> > > > >> > > > grouped/windowed aggregation and reduce-style function can
> be
> > > > >> applied
> > > > >> > > >   b) It allows to use the "by-key" state of functions. Here,
> > > every
> > > > >> > record
> > > > >> > > > ha