[jira] [Created] (FLINK-12868) Yarn cluster can not be deployed if plugins dir does not exist

2019-06-17 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-12868:
--

 Summary: Yarn cluster can not be deployed if plugins dir does not 
exist
 Key: FLINK-12868
 URL: https://issues.apache.org/jira/browse/FLINK-12868
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.9.0
Reporter: Piotr Nowojski
Assignee: Piotr Nowojski
 Fix For: 1.9.0



{noformat}
---
 The program finished with the following exception:

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:385)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:616)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$3(FlinkYarnSessionCli.java:844)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:844)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The environment variable 'FLINK_PLUGINS_DIR' is set to '/opt/flink/plugins' but the directory doesn't exist.
    at ...
{noformat}
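
A possible direction (just a sketch of the intended behaviour, not the actual patch; class and method names below are made up) would be to treat a missing plugins directory as "no plugins to ship" instead of failing the deployment:

{code:java}
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Sketch only: illustrates tolerating a missing FLINK_PLUGINS_DIR.
public class PluginsDirHandling {

    public static void main(String[] args) {
        System.out.println("Files to ship: " + collectShipFiles(System.getenv("FLINK_PLUGINS_DIR")));
    }

    static List<File> collectShipFiles(String pluginsDirPath) {
        List<File> shipFiles = new ArrayList<>();
        if (pluginsDirPath != null) {
            File pluginsDir = new File(pluginsDirPath);
            if (pluginsDir.isDirectory()) {
                // Plugins directory exists: ship it with the YARN application.
                shipFiles.add(pluginsDir);
            } else {
                // Missing directory: warn and continue instead of throwing YarnDeploymentException.
                System.err.println("Plugins directory " + pluginsDir
                        + " does not exist; deploying without plugins.");
            }
        }
        return shipFiles;
    }
}
{code}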




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: About Deprecating split/select for DataStream API

2019-06-17 Thread Dawid Wysakowicz
Hi all,

Thank you for starting the discussion. To start with, I have to say I am
not entirely against leaving them. On the other hand, I totally disagree
that the semantics are clearly defined. Actually, the design is
fundamentally flawed.

 1. We use String as a selector for elements. This is not the cleanest
design, but I agree it is not the worst.
 2. Users cannot define different types for different splits.
 3. (The actual reason why I think it's actually better to drop the
split/select and introduce a better mechanism.) The behavior of a
split is to actually add an output selector. We can have just a
single selector on a single operator, but the API allows (I would
even say encourages) creating chains of split/select, which leads
to undefined behavior. Take this for example: ds.split().select("a",
"b").select("c", "d"). Which tags should be forwarded? ("a", "b",
"c", "d") (union) or () (intersection)? In my opinion the most
obvious answer in this case would be the intersection. Let's modify
it slightly, though, and I would assume a different behavior (the union):

           splitted = ds.split();

           splitted.select("a", "b").map()

       splitted.select("c", "d").map()

Taking the 3rd point into consideration, I would be in favor of
removing the current mechanism. I think the side outputs serve the
purpose much better, with much cleaner semantics. I get the argument that
users are now forced to use a ProcessFunction if they want to use the side
outputs. If this is the main problem, how about enabling them e.g. for
flatMap as well?
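
For reference, this is roughly what such a split looks like with side outputs
today (only a sketch; the element type, tag name and even/odd logic are made up
for illustration):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Minimal sketch: "split" an Integer stream into evens (main output) and odds (side output).
public class SideOutputSplitExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> input = env.fromElements(1, 2, 3, 4, 5);

        final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

        SingleOutputStreamOperator<Integer> evens = input
                .process(new ProcessFunction<Integer, Integer>() {
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                        if (value % 2 == 0) {
                            out.collect(value);          // main output: even numbers
                        } else {
                            ctx.output(oddTag, value);   // side output: odd numbers
                        }
                    }
                });

        DataStream<Integer> odds = evens.getSideOutput(oddTag);

        evens.print();
        odds.print();
        env.execute("side-output split sketch");
    }
}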

Best,

Dawid

On 17/06/2019 08:51, Jark Wu wrote:
> +1 to keep the split/select API. I think if there are some problems with
> the API, it's better to fix them instead of deprecating them.
> And select/split are straightforward and convenient APIs. It's worth to
> have them.
>
> Regards,
> Jark
>
> On Mon, 17 Jun 2019 at 14:46, vino yang  wrote:
>
>> Hi,
>>
>> I also think it is valuable and reasonable to keep the split/select APIs.
>> They are very convenient and widely used in our platform. I think they are
>> also used in other users' jobs.
>> If the community has doubts about this, IMHO, it would be better to start a
>> user survey.
>>
>> Best,
>> Vino
>>
>> SHI Xiaogang  于2019年6月17日周一 上午11:55写道:
>>
>>> Hi Xingcan,
>>>
>>> Thanks for bringing it up for discusson.
>>>
>>> I agree with you that we should not deprecate the split/select methods.
>>> Their semantics are very clear and they are widely adopted by Flink
>> users.
>>> We should fix these problems instead of simply deprecating the methods.
>>>
>>> Regards,
>>> Xiaogang
>>>
>>> Xingcan Cui  于2019年6月15日周六 下午4:13写道:
>>>
 Hi all,

 Recently, I noticed that the split/select methods in DataStream API
>> have
 been marked as deprecated since 1.7.2 and 1.8.0 (the related JIRA issue
 FLINK-11084 ).

 Although the two methods can be replaced by the more powerful side
>> output
 feature[1], I still doubt whether we should really remove them in the
 future.

 1. From semantics, the split/select is the reverse operation to the
>> union
 transformation. Without them, the DataStream API seems to be missing a
 piece.

 2. From accessibility, the side output only works for process
>> functions,
 which means it forces the user to dive into a lower API.

 According to FLINK-11084 <
 https://issues.apache.org/jira/browse/FLINK-11084>, there exist some
 problems with the current implementation of the two methods. Maybe we
 should fix the problems and re-active them again. Or if they really
>> need
>>> to
 be deprecated, we should at least mark the corresponding documentation
>>> for
 that : )

 What do you think?

 Best,
 Xingcan

 [1]

>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
 <

>> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html


signature.asc
Description: OpenPGP digital signature


Re: About Deprecating split/select for DataStream API

2019-06-17 Thread SHI Xiaogang
Hi Dawid,

As the select method is only allowed on SplitStreams, it's impossible to
construct the example ds.split().select("a", "b").select("c", "d").

Do you mean ds.split().select("a", "b").split().select("c", "d")?
If so, then the tagging in the first split operation should not affect the
second one. Then
splitted.select("a", "b") => empty
splitted.select("c", "d") => ds

I cannot quite catch your point here. It would be appreciated if you could
provide a more concrete explanation.

Regards,
Xiaogang Shi




Dawid Wysakowicz  于2019年6月17日周一 下午3:10写道:

> Hi all,
>
> Thank you for starting the discussion. To start with I have to say I am
> not entirely against leaving them. On the other hand I totally disagree
> that the semantics are clearly defined. Actually the design is
> fundamentally flawed.
>
>1. We use String as a selector for elements. This is not the cleanest
>design, but I agree it is not the worst.
>2. Users cannot define different types for different splits.
>3. (The actual reason why I think it's actually better to drop the
>split/select and introduce a better mechanism) The behavior of a split is
>to actually add an output selector. We can have just a single selector on a
>single operator, but the API allows (I would even say encourages) to create
>chains of split/select, which leads to undefined behavior. Take this for
>example: ds.split().select("a", "b").select("c", "d"). Which tags should be
>forwarded? ("a", "b", "c", "d") (union) or () (intersection). In my opinion
>the most obvious answer in this case would be the intersection. Let's
>modify it slightly though and I would assume a different behavior (the
>union)
>
>splitted = ds.split();
>
>splitted.select("a", "b").map()
>
>splitted.select("c", "d").map()
>
> Taking the 3rd argument into consideration I would be in favor of removing
> the current mechanism. I think the side outputs serve the purpose much
> better with much cleaner semantics. I get the argument that users are now
> forced to use processFunction if they want to use the side outputs. If this
> is the main problem how about enabling them e.g. for flatMap as well?
>
> Best,
>
> Dawid
> On 17/06/2019 08:51, Jark Wu wrote:
>
> +1 to keep the split/select API. I think if there are some problems with
> the API, it's better to fix them instead of deprecating them.
> And select/split are straightforward and convenient APIs. It's worth to
> have them.
>
> Regards,
> Jark
>
> On Mon, 17 Jun 2019 at 14:46, vino yang  
>  wrote:
>
>
> Hi,
>
> I also think it is valuable and reasonable to keep the split/select APIs.
> They are very convenient and widely used in our platform. I think they are
> also used in other users' jobs.
> If the community has doubts about this, IMHO, it would be better to start a
> user survey.
>
> Best,
> Vino
>
> SHI Xiaogang   于2019年6月17日周一 
> 上午11:55写道:
>
>
> Hi Xingcan,
>
> Thanks for bringing it up for discusson.
>
> I agree with you that we should not deprecate the split/select methods.
> Their semantics are very clear and they are widely adopted by Flink
>
> users.
>
> We should fix these problems instead of simply deprecating the methods.
>
> Regards,
> Xiaogang
>
> Xingcan Cui   于2019年6月15日周六 下午4:13写道:
>
>
> Hi all,
>
> Recently, I noticed that the split/select methods in DataStream API
>
> have
>
> been marked as deprecated since 1.7.2 and 1.8.0 (the related JIRA issue
> FLINK-11084  
> ).
>
> Although the two methods can be replaced by the more powerful side
>
> output
>
> feature[1], I still doubt whether we should really remove them in the
> future.
>
> 1. From semantics, the split/select is the reverse operation to the
>
> union
>
> transformation. Without them, the DataStream API seems to be missing a
> piece.
>
> 2. From accessibility, the side output only works for process
>
> functions,
>
> which means it forces the user to dive into a lower API.
>
> According to FLINK-11084 <
> https://issues.apache.org/jira/browse/FLINK-11084> 
> , there exist some
> problems with the current implementation of the two methods. Maybe we
> should fix the problems and re-active them again. Or if they really
>
> need
>
> to
>
> be deprecated, we should at least mark the corresponding documentation
>
> for
>
> that : )
>
> What do you think?
>
> Best,
> Xingcan
>
> [1]
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
>
> <
>
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/side_output.html
>
>


[jira] [Created] (FLINK-12869) Add yarn acls capability to flink containers

2019-06-17 Thread Nicolas Fraison (JIRA)
Nicolas Fraison created FLINK-12869:
---

 Summary: Add yarn acls capability to flink containers
 Key: FLINK-12869
 URL: https://issues.apache.org/jira/browse/FLINK-12869
 Project: Flink
  Issue Type: New Feature
  Components: Deployment / YARN
Reporter: Nicolas Fraison


Yarn provides an application ACLs mechanism to grant specific rights to users 
other than the one running the job (e.g., viewing logs through the 
ResourceManager/job history server, or killing the application).
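
For context, this is roughly what setting such ACLs looks like with the plain YARN API (a sketch only; the user/group values are made up and this is not the proposed Flink change):

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.util.Records;

// Sketch: application ACLs on a YARN container launch context.
public class YarnAclsExample {

    public static void main(String[] args) {
        ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);

        Map<ApplicationAccessType, String> acls = new HashMap<>();
        // Format is "user1,user2 group1,group2"; values below are illustrative.
        acls.put(ApplicationAccessType.VIEW_APP, "ops_user ops_group");   // may view logs
        acls.put(ApplicationAccessType.MODIFY_APP, "ops_user ops_group"); // may kill the application

        launchContext.setApplicationACLs(acls);
        System.out.println("ACLs set: " + launchContext.getApplicationACLs());
    }
}
{code}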



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Hequn Cheng
Hi Vino,

Thanks for the proposal, I think it is a very good feature!

One thing I want to make sure of is the semantics of `localKeyBy`. From
the document, the `localKeyBy` API returns an instance of `KeyedStream`
which can also perform sum(), so in this case, what are the semantics of
`localKeyBy()`? For example, will the following snippets produce the same result,
and what are the differences between them?

1. input.keyBy(0).sum(1)
2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)

Would also be great if we can add this into the document. Thank you very
much.

Best, Hequn


On Fri, Jun 14, 2019 at 11:34 AM vino yang  wrote:

> Hi Aljoscha,
>
> I have looked at the "*Process*" section of FLIP wiki page.[1] This mail
> thread indicates that it has proceeded to the third step.
>
> When I looked at the fourth step(vote step), I didn't find the
> prerequisites for starting the voting process.
>
> Considering that the discussion of this feature has been done in the old
> thread. [2] So can you tell me when should I start voting? Can I start now?
>
> Best,
> Vino
>
> [1]:
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> [2]:
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
>
> leesf  于2019年6月13日周四 上午9:19写道:
>
> > +1 for the FLIP, thank vino for your efforts.
> >
> > Best,
> > Leesf
> >
> > vino yang  于2019年6月12日周三 下午5:46写道:
> >
> > > Hi folks,
> > >
> > > I would like to start the FLIP discussion thread about supporting local
> > > aggregation in Flink.
> > >
> > > In short, this feature can effectively alleviate data skew. This is the
> > > FLIP:
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > >
> > >
> > > *Motivation* (copied from FLIP)
> > >
> > > Currently, keyed streams are widely used to perform aggregating
> > operations
> > > (e.g., reduce, sum and window) on the elements that have the same key.
> > When
> > > executed at runtime, the elements with the same key will be sent to and
> > > aggregated by the same task.
> > >
> > > The performance of these aggregating operations is very sensitive to
> the
> > > distribution of keys. In the cases where the distribution of keys
> > follows a
> > > powerful law, the performance will be significantly downgraded. More
> > > unluckily, increasing the degree of parallelism does not help when a
> task
> > > is overloaded by a single key.
> > >
> > > Local aggregation is a widely-adopted method to reduce the performance
> > > degraded by data skew. We can decompose the aggregating operations into
> > two
> > > phases. In the first phase, we aggregate the elements of the same key
> at
> > > the sender side to obtain partial results. Then at the second phase,
> > these
> > > partial results are sent to receivers according to their keys and are
> > > combined to obtain the final result. Since the number of partial
> results
> > > received by each receiver is limited by the number of senders, the
> > > imbalance among receivers can be reduced. Besides, by reducing the
> amount
> > > of transferred data the performance can be further improved.
> > >
> > > *More details*:
> > >
> > > Design documentation:
> > >
> > >
> >
> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > >
> > > Old discussion thread:
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > >
> > > JIRA: FLINK-12786 
> > >
> > > We are looking forwards to your feedback!
> > >
> > > Best,
> > > Vino
> > >
> >
>


Re: Sort streams in windows

2019-06-17 Thread Jan Lukavský

Hi Eugene,

I'd say that what you want essentially is not "sort in windows", because 
(as you mention) you want to emit elements from windows as soon as the 
watermark passes some timestamp. Maybe a better approach would be to 
implement this using stateful processing, where you keep a buffer of 
(unsorted) inputs and set up a timer for the minimal time of the elements in the 
buffer (plus allowed lateness), and then sort the elements with timestamp <= 
the timer (very often single elements). I'm actually working on this for 
Apache Beam (design doc [1]), but it is still a work in progress.
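
For what it's worth, a rough sketch of that buffering idea in Flink terms
(purely illustrative, not the Beam design; it assumes elements are their own
Long event timestamps, keyed by some String key, and it ignores allowed lateness):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Buffer events in ListState and emit them sorted once the watermark has passed their timestamp.
public class SortingFunction extends KeyedProcessFunction<String, Long, Long> {

    private transient ListState<Long> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        buffer.add(value);
        // Fire when the watermark passes this element's timestamp.
        ctx.timerService().registerEventTimeTimer(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        List<Long> ready = new ArrayList<>();
        List<Long> remaining = new ArrayList<>();
        Iterable<Long> buffered = buffer.get();
        if (buffered != null) {
            for (Long v : buffered) {
                if (v <= timestamp) {
                    ready.add(v);
                } else {
                    remaining.add(v);
                }
            }
        }
        // The linear scan above is exactly where a "sorted map state" would help.
        ready.sort(Long::compareTo);
        for (Long v : ready) {
            out.collect(v);
        }
        buffer.update(remaining);
    }
}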


Another drawback is that something like "sorted map state" will probably 
be needed in order to efficiently query the state for minimal timestamp. 
A less efficient implementation might work with ListState as well.


Jan

[1] 
https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing


On 6/14/19 3:58 PM, Евгений Юшин wrote:

Hi folks

I want to sort a stream based on an event-time field derived from the events. To do
this I can use one of the existing windows like TimeWindow to collect
events in a window of a particular size, or SlidingWindow to run the sort logic
more often (and sort within the slide).
Ideally, I want to sort events as soon as they pass the watermark (with an
out-of-order timestamp extractor). None of the current windows allow me to do
this, so I am thinking of implementing a custom merging window similar to
SlidingWindow. Each element will be assigned to Window(event_ts,
event_ts+1), and then all windows with 'start < watermark' will be merged.
To implement this I need the time service available in
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L232
Unfortunately, only 'getCurrentProcessingTime' is there for now.

I can pass a function to extract the timestamp to my new window extractor, but in
this case the logic for calculating the min watermark for
parallel/unioned/co-joined streams won't simply work.

@devs, would you mind if I extend WindowAssignerContext with
getCurrentWatermark or a reference to the whole time service?

Would be really glad to hear your concerns.

Regards,
Eugene



[jira] [Created] (FLINK-12870) Improve documentation of keys schema evolution

2019-06-17 Thread Alexander Fedulov (JIRA)
Alexander Fedulov created FLINK-12870:
-

 Summary: Improve documentation of keys schema evolution
 Key: FLINK-12870
 URL: https://issues.apache.org/jira/browse/FLINK-12870
 Project: Flink
  Issue Type: Improvement
  Components: Documentation, Runtime / REST
Reporter: Alexander Fedulov
Assignee: Chesnay Schepler


In several places of the REST API we use custom JSON (de)serializers for 
writing data. This is very problematic with regard to the documentation, as 
there is no way to actually generate it when these serializers are used.

I doubt we can fix this issue entirely at the moment, but I have already found areas 
that we can improve.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [ANNOUNCE] Weekly Community Update 2019/24

2019-06-17 Thread Konstantin Knauf
Hi Zili,

thank you for adding these threads :) I would have otherwise picked them up
next week, just couldn't put everything into one email.

Cheers,

Konstantin

On Sun, Jun 16, 2019 at 11:07 PM Zili Chen  wrote:

> Hi Konstantin and all,
>
> Thank Konstantin very much for reviving this tradition! It reminds
> me of the joyful time I can easily catch up interesting ongoing threads.
> Thanks for Till's work, too.
>
> Besides exciting updates and news above, I'd like to pick up
> some other threads you guys may be interested in.
>
> * xiaogang has recently started a discussion[1] on allowing
> at-most-once delivery in case of failures, which adapts Flink
> to more scenarios.
>
> * vino has raised a discussion[2] on supporting local aggregation
> in Flink, which was received a lot of positive feedbacks and now
> there is a ongoing FLIP-44 thread[3].
>
> * Jeff Zhang has raised a discussion[4] and drafted a design doc[5]
> on Flink client API enhancement, which aims at overcoming limitation
> when integrating Flink with projects such as Zepplin or Beam.
>
> Best,
> tison.
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Allow-at-most-once-delivery-in-case-of-failures-td29464.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-44-Support-Local-Aggregation-in-Flink-td29513.html
> [4]
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> [5]
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/
>
>
> Konstantin Knauf  于2019年6月17日周一 上午12:10写道:
>
>> Dear community,
>>
>> last year Till did a great job on summarizing recent developments in the
>> Flink community in a "Weekly community update" thread. I found this very
>> helpful and would like to revive this tradition with a focus on topics &
>> threads which are particularly relevant to the wider community of Flink
>> users.
>>
>> As we haven't had such an update for some time (since December 2018), I
>> find it impossible to cover everything that's currently going on in this
>> email. I'll try to include most ongoing discussions and FLIPs over the
>> course of the next weeks to catch up. Afterwards I am going to go back to
>> only focus on news since the last update.
>>
>> You are welcome to share any additional news and updates with the
>> community in this thread.
>>
>> Flink Development
>> ===
>>
>> * [releases] The community is currently working on a Flink 1.8.1 release
>> [1]. The first release candidate should be ready soon (one critical bug to
>> fix as of writing, FLINK-12863).
>> * [releases] Kurt and Gordon stepped up as release managers for Flink 1.9
>> and started a thread [2] to sync on the status of various development
>> threads targeted for Flink 1.9. Check it out to see if the feature you are
>> waiting for is likely to make it or not.
>> * [savepoints] Gordon, Kostas and Congxian have recently started a
>> discussion [3] on unifying the savepoint format across StateBackends, which
>> will enable users to switch between StateBackends when recovering from a
>> Savepoint. The related discussion on introducing Stop-With-Checkpoint [4]
>> initiated by Yu Li is closely related and worth a read to understand the
>> long term vision.
>> * [savepoints] Seth and Gordon have started a discussion to add a State
>> Processing API ("Savepoint Connector"), which will allow reading &
>> modifying existing Savepoints as well as creating new Savepoints from
>> scratch with the DataSet API. The feature is targeted for Flink 1.9.0 as a
>> new *library*.
>> * [python-support] Back in April we had a discussion on the mailing list
>> about adding Python Support to the Table API [6]. This support will likely
>> be available in Flink 1.9 (without UDFs and later with UDF support as
>> well). Therefore, Stephan has started a discussion [7] to deprecate the
>> current Python API in Flink 1.9. This has gotten a lot of positive feedback
>> and the only open question as of writing is whether to only deprecate it or
>> to remove it directly.
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Flink-1-8-1-td29154.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Features-for-Apache-Flink-1-9-0-td28701.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-41-Unify-Keyed-State-Snapshot-Binary-Format-for-Savepoints-td29197.html
>> [4] https://issues.apache.org/jira/browse/FLINK-12619
>> [5]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-FLIP-43-Savepoint-Connector-td29232.html
>> [6]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-38-Support-python-language-in-flink-TableAPI-td28061.html#a28096
>> [7]
>> htt

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread vino yang
Hi Hequn,

Thanks for your reply.

The purpose of the localKeyBy API is to provide a tool that lets users do
pre-aggregation locally. The behavior of the pre-aggregation is
similar to the keyBy API.

So the three cases are different, I will describe them one by one:

1. input.keyBy(0).sum(1)

*In this case, the result is event-driven: each event produces one sum
aggregation result, which is the latest one since the source start.*

2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)

*In this case, the semantics may be problematic: the local sum aggregation
produces, for every event, the latest partial result counted from the source
start. *
*These latest partial results for the same key are hashed to one node to
do the global sum aggregation.*
*In the global aggregation, the receiver gets multiple partial results (each
calculated from the source start), and summing them up gives the wrong
result.*

3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)

*In this case, it would just get a partial aggregation result for the 5
records in the count window. The partial aggregation results from the same
key will be aggregated globally.*

So the first case and the third case produce the *same* result; the
difference is in the output style and the latency.
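
To make this concrete, here is a tiny worked example with made-up values:

Say key "a" sees the values 1, 1, 1, 1, 1 (five events).

Case 2: the local sum emits the running sums 1, 2, 3, 4, 5 (one per event), and
the global sum then computes 1 + 2 + 3 + 4 + 5 = 15, although the expected
total is 5.

Case 3: with countWindow(5), the local side forwards only one partial sum (5)
for those records, so the global sum adds up disjoint partial sums and stays
correct.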

Generally speaking, the localKeyBy API is just an optimization API. We do
not limit the user's usage, but the user has to understand its semantics
and use it correctly.

Best,
Vino

Hequn Cheng  于2019年6月17日周一 下午4:18写道:

> Hi Vino,
>
> Thanks for the proposal, I think it is a very good feature!
>
> One thing I want to make sure is the semantics for the `localKeyBy`. From
> the document, the `localKeyBy` API returns an instance of `KeyedStream`
> which can also perform sum(), so in this case, what's the semantics for
> `localKeyBy()`. For example, will the following code share the same result?
> and what're the differences between them?
>
> 1. input.keyBy(0).sum(1)
> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>
> Would also be great if we can add this into the document. Thank you very
> much.
>
> Best, Hequn
>
>
> On Fri, Jun 14, 2019 at 11:34 AM vino yang  wrote:
>
> > Hi Aljoscha,
> >
> > I have looked at the "*Process*" section of FLIP wiki page.[1] This mail
> > thread indicates that it has proceeded to the third step.
> >
> > When I looked at the fourth step(vote step), I didn't find the
> > prerequisites for starting the voting process.
> >
> > Considering that the discussion of this feature has been done in the old
> > thread. [2] So can you tell me when should I start voting? Can I start
> now?
> >
> > Best,
> > Vino
> >
> > [1]:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > [2]:
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> >
> > leesf  于2019年6月13日周四 上午9:19写道:
> >
> > > +1 for the FLIP, thank vino for your efforts.
> > >
> > > Best,
> > > Leesf
> > >
> > > vino yang  于2019年6月12日周三 下午5:46写道:
> > >
> > > > Hi folks,
> > > >
> > > > I would like to start the FLIP discussion thread about supporting
> local
> > > > aggregation in Flink.
> > > >
> > > > In short, this feature can effectively alleviate data skew. This is
> the
> > > > FLIP:
> > > >
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > > >
> > > >
> > > > *Motivation* (copied from FLIP)
> > > >
> > > > Currently, keyed streams are widely used to perform aggregating
> > > operations
> > > > (e.g., reduce, sum and window) on the elements that have the same
> key.
> > > When
> > > > executed at runtime, the elements with the same key will be sent to
> and
> > > > aggregated by the same task.
> > > >
> > > > The performance of these aggregating operations is very sensitive to
> > the
> > > > distribution of keys. In the cases where the distribution of keys
> > > follows a
> > > > powerful law, the performance will be significantly downgraded. More
> > > > unluckily, increasing the degree of parallelism does not help when a
> > task
> > > > is overloaded by a single key.
> > > >
> > > > Local aggregation is a widely-adopted method to reduce the
> performance
> > > > degraded by data skew. We can decompose the aggregating operations
> into
> > > two
> > > > phases. In the first phase, we aggregate the elements of the same key
> > at
> > > > the sender side to obtain partial results. Then at the second phase,
> > > these
> > > > partial results are sent to receivers according to their keys and are
> > > > combined to obtain the final result. Since the number of partial
> > results
> > > > received by each receiver is limited by the number of senders, the
> > > > imbalance among receivers can be reduced. Besides, by reducing the
> > amount
> > > > of transferred data the performanc

Re: [DISCUSS] FLIP-39: Flink ML pipeline and ML libs

2019-06-17 Thread Gen Luo
Hi all,

In the review of the PR for FLINK-12473, there were a few comments regarding
pipeline exportation. We would like to start a follow-up discussion to
address some of those comments.

Currently, the FLIP-39 proposal gives users a way to persist a pipeline in
JSON format, but it does not specify how users can export a pipeline for
serving purposes. We summarized some thoughts on this in the following doc.

https://docs.google.com/document/d/1B84b-1CvOXtwWQ6_tQyiaHwnSeiRqh-V96Or8uHqCp8/edit?usp=sharing

After we reach consensus on the pipeline exportation, we will add a
corresponding section in FLIP-39.


Shaoxuan Wang  于2019年6月5日周三 上午8:47写道:

> Stavros,
> They have the similar logic concept, but the implementation details are
> quite different. It is hard to migrate the interface with different
> implementations. The built-in algorithms are useful legacy that we will
> consider migrate to the new API (but still with different implementations).
> BTW, the new API has already been merged via FLINK-12473.
>
> Thanks,
> Shaoxuan
>
>
>
> On Mon, Jun 3, 2019 at 6:08 PM Stavros Kontopoulos <
> st.kontopou...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Some portion of the code could be migrated to the new Table API no?
> > I am saying that because the new API design is based on scikit-learn and
> > the old one was also inspired by it.
> >
> > Best,
> > Stavros
> > On Wed, May 22, 2019 at 1:24 PM Shaoxuan Wang 
> wrote:
> >
> > > Another consensus (from the offline discussion) is that we will
> > > delete/deprecate flink-libraries/flink-ml. I have started a survey and
> > > discussion [1] in dev/user-ml to collect the feedback. Depending on the
> > > replies, we will decide if we shall delete it in Flink1.9 or
> > > deprecate&delete in the next release after 1.9.
> > >
> > > [1]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/SURVEY-Usage-of-flink-ml-and-DISCUSS-Delete-flink-ml-td29057.html
> > >
> > > Regards,
> > > Shaoxuan
> > >
> > >
> > > On Tue, May 21, 2019 at 9:22 PM Gen Luo  wrote:
> > >
> > > > Yes, this is our conclusion. I'd like to add only one point that
> > > > registering user defined aggregator is also needed which is currently
> > > > provided by 'bridge' and finally will be merged into Table API. It's
> > same
> > > > with collect().
> > > >
> > > > I will add a TableEnvironment argument in Estimator.fit() and
> > > > Transformer.transform() to get rid of the dependency on
> > > > flink-table-planner. This will be committed soon.
> > > >
> > > > Aljoscha Krettek  于2019年5月21日周二 下午7:31写道:
> > > >
> > > > > We discussed this in private and came to the conclusion that we
> > should
> > > > > (for now) have the dependency on flink-table-api-xxx-bridge because
> > we
> > > > need
> > > > > access to the collect() method, which is not yet available in the
> > Table
> > > > > API. Once that is available the code can be refactored but for now
> we
> > > > want
> > > > > to unblock work on this new module.
> > > > >
> > > > > We also agreed that we don’t need a direct dependency on
> > > > > flink-table-planner.
> > > > >
> > > > > I hope I summarised our discussion correctly.
> > > > >
> > > > > > On 17. May 2019, at 12:20, Gen Luo  wrote:
> > > > > >
> > > > > > Thanks for your reply.
> > > > > >
> > > > > > For the first question, it's not strictly necessary. But I perfer
> > not
> > > > to
> > > > > > have a TableEnvironment argument in Estimator.fit() or
> > > > > > Transformer.transform(), which is not part of machine learning
> > > concept,
> > > > > and
> > > > > > may make our API not as clean and pretty as other systems do. I
> > would
> > > > > like
> > > > > > another way other than introducing flink-table-planner to do
> this.
> > If
> > > > > it's
> > > > > > impossible or severely opposed, I may make the concession to add
> > the
> > > > > > argument.
> > > > > >
> > > > > > Other than that, "flink-table-api-xxx-bridge"s are still needed.
> A
> > > vary
> > > > > > common case is that an algorithm needs to guarantee that it's
> > running
> > > > > under
> > > > > > a BatchTableEnvironment, which makes it possible to collect
> result
> > > each
> > > > > > iteration. A typical algorithm like this is ALS. By flink1.8,
> this
> > > can
> > > > be
> > > > > > only achieved by converting Table to DataSet than call
> > > > DataSet.collect(),
> > > > > > which is available in flink-table-api-xxx-bridge. Besides,
> > > registering
> > > > > > UDAGG is also depending on it.
> > > > > >
> > > > > > In conclusion, '"planner" can be removed from dependencies but
> > > > > introducing
> > > > > > "bridge"s are inevitable. Whether and how to acquire
> > TableEnvironment
> > > > > from
> > > > > > a Table can be discussed.
> > > > >
> > > > >
> > > >
> > >
> >
>


Re: Unit tests consuming a lot of disk

2019-06-17 Thread Piotr Nowojski
Good to know that you have solved your problem :)

Piotrek

> On 15 Jun 2019, at 00:48, Timothy Farkas  wrote:
> 
> Resolved the issue. I was running a very old version of macos, after
> upgrading to Mojave the issue disappeared. Disk usage stopped spiking and I
> stopped running out of disk space. I'm guessing there was a bug with how
> macos used to handle swap space or memory mapped files that was later
> fixed. Shame on me for being delinquent with my upgrades :) .
> 
> Thanks for the help Ken.
> Tim
> 
> On Fri, Jun 14, 2019 at 12:50 PM Timothy Farkas <
> timothytiborfar...@gmail.com> wrote:
> 
>> Hi Ken,
>> 
>> I don't believe so, my main disk should be used to store /tmp.
>> 
>> Timothys-MacBook-Pro:~ tfarkas$ diskutil list
>> 
>> /dev/disk0 (internal):
>> 
>>   #:   TYPE NAMESIZE
>> IDENTIFIER
>> 
>>   0:  GUID_partition_scheme 251.0 GB   disk0
>> 
>>   1:EFI EFI 314.6 MB
>> disk0s1
>> 
>>   2:  Apple_CoreStorage Macintosh HD250.0 GB
>> disk0s2
>> 
>>   3: Apple_Boot Recovery HD 650.0 MB
>> disk0s3
>> 
>> 
>> /dev/disk1 (internal, virtual):
>> 
>>   #:   TYPE NAMESIZE
>> IDENTIFIER
>> 
>>   0:  Apple_HFS Macintosh HD   +249.7 GB   disk1
>> 
>> Logical Volume on disk0s2
>> 
>> DA9C82BE-D97D-4D65-8166-9F742F9AC884
>> 
>> Unencrypted
>> 
>> 
>> Timothys-MacBook-Pro:~ tfarkas$ mount
>> 
>> /dev/disk1 on / (hfs, local, journaled)
>> 
>> devfs on /dev (devfs, local, nobrowse)
>> 
>> map -hosts on /net (autofs, nosuid, automounted, nobrowse)
>> 
>> map auto_home on /home (autofs, automounted, nobrowse)
>> 
>> 
>> 
>> Thanks,
>> 
>> Tim
>> 
>> On Fri, Jun 14, 2019 at 12:33 PM Ken Krugler 
>> wrote:
>> 
>>> Hi Tim,
>>> 
>>> I wouldn’t expect these tests to consume 30GB of space.
>>> 
>>> Any chance your temp dir is using a mount point with much less free space?
>>> 
>>> — Ken
>>> 
>>> 
>>> On Jun 14, 2019, at 12:28 PM, Timothy Farkas <
>>> timothytiborfar...@gmail.com> wrote:
>>> 
>>> Hi All,
>>> 
>>> I get *Caused by: java.io.IOException: No space left on device* errors
>>> from
>>> some tests when running the flink unit tests on my mac. I have 30 GB free
>>> space on my machine and I am building the latest code from the master
>>> branch. The following tests in flink-runtime are failing with this error
>>> 
>>> [INFO] Results:
>>> 
>>> [INFO]
>>> 
>>> [ERROR] Errors:
>>> 
>>> [ERROR]
>>>  
>>> SlotCountExceedingParallelismTest.testNoSlotSharingAndBlockingResultBoth:91->submitJobGraphAndWait:97
>>> » JobExecution
>>> 
>>> [ERROR]
>>>  
>>> SlotCountExceedingParallelismTest.testNoSlotSharingAndBlockingResultReceiver:84->submitJobGraphAndWait:97
>>> » JobExecution
>>> 
>>> [ERROR]
>>>  
>>> SlotCountExceedingParallelismTest.testNoSlotSharingAndBlockingResultSender:77->submitJobGraphAndWait:97
>>> » JobExecution
>>> 
>>> [ERROR]
>>>  ScheduleOrUpdateConsumersTest.testMixedPipelinedAndBlockingResults:128
>>> » JobExecution
>>> 
>>> I tried reducing the test parallelism with  -Dflink.forkCount=2 , however
>>> that did not help. I'm confident that the tests are the issue since I can
>>> see disk usage increase in real-time as I run the tests. After the tests
>>> complete, the disk usage decreases.
>>> 
>>> Is this a known issue? Or would this be something worth investigating as
>>> an
>>> improvement?
>>> 
>>> Thanks,
>>> Tim
>>> 
>>> 
>>> --
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> 
>>> 



RE: [DISCUSS] FLIP-41: Unify Keyed State Snapshot Binary Format for Savepoints

2019-06-17 Thread Visser, M.J.H. (Martijn)
On a related subject, it would be interesting to have the capability to encrypt 
savepoints. That would allow processing and storing of sensitive data in Flink. 

-Original Message-
From: Tzu-Li (Gordon) Tai  
Sent: maandag 17 juni 2019 04:15
To: dev 
Subject: Re: [DISCUSS] FLIP-41: Unify Keyed State Snapshot Binary Format for 
Savepoints

Thanks for the inputs Yu and Aljoscha!

I agree to rename this FLIP. Will call it "Unified binary format for Keyed 
State".

I will proceed to open a VOTE thread to formally adopt the FLIP now.

On Fri, Jun 14, 2019 at 10:03 PM Aljoscha Krettek 
wrote:

> Please also see my comment on
> https://issues.apache.org/jira/browse/FLINK-12619?focusedCommentId=168
> 64098&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-t
> abpanel#comment-16864098
> <
> https://issues.apache.org/jira/browse/FLINK-12619?focusedCommentId=168
> 64098&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tab
> panel#comment-16864098
> >
>
> For this FLIP-41 it means we go forward with the design basically as 
> is but should call it “Unified Format” or something like it.
>
> If no-one else comments, we should proceed to a [VOTE] thread to 
> formally adopt the FLIP.
>
> Aljoscha
>
> > On 14. Jun 2019, at 15:40, Yu Li  wrote:
> >
> > Hi Aljoscha and all,
> >
> > My 2 cents here:
> >
> > 1. Conceptually it worth a second thought about introducing an 
> > optimized snapshot format for now (i.e. use checkpoint format in 
> > savepoint), just like it's not recommended to use snapshot for 
> > backup in database
> (although
> > practically it could be implemented).
> >
> > 2. Stop-with-checkpoint mechanism is like stopping database instance
> with a
> > data flush, thus (IMHO) a different story from the 
> > checkpoint/savepoint
> (db
> > snapshot/backup) diversity.
> >
> > 3. In the long run we may improve the checkpoint to allow a short 
> > enough interval thus it may become some format of transactional log, 
> > then we
> could
> > enable checkpoint-based savepoint (like transactional log based 
> > backup),
> so
> > I agree to still call the new format in FLIP-41 a "Unified Format"
> although
> > in the short term it only unifies savepoint.
> >
> > I've also wrote a document [1] to include more details and please 
> > refer
> to
> > it if interested. Thanks!
> >
> > [1] 
> > https://docs.google.com/document/d/1uE4R3wNal6e67FkDe0UvcnsIMMDpr35j
> >
> > Best Regards,
> > Yu
> >
> >
> > On Thu, 6 Jun 2019 at 19:42, Aljoscha Krettek 
> wrote:
> >
> >> Btw, I think this FLIP is a very good effort, we just need to 
> >> reframe
> the
> >> effort a tiny bit. +1
> >>
> >>> On 6. Jun 2019, at 13:41, Aljoscha Krettek 
> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I had a brief discussion with Stephan that helped me sort my 
> >>> thoughts
> on
> >> the broader topics of checkpoints, savepoints, binary formats, 
> >> user-triggered checkpoints, and periodic savepoints. I’ll try to
> summarise
> >> my stance on this and also comment with the same message on the 
> >> other relevant Jira Issues and threads.
> >>>
> >>> For reference, the relevant FLIP and Jira issues are these:
> >>>
> >>> -
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41%3A+Unify+Key
> ed+State+Snapshot+Binary+Format+for+Savepoints
> :
> >> <
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41:+Unify+Keyed
> +State+Snapshot+Binary+Format+for+Savepoints
> :>
> >> Unified Savepoint Format
> >>> - https://issues.apache.org/jira/browse/FLINK-12619: Add support 
> >>> for
> >> stop-with-checkpoint
> >>> - https://issues.apache.org/jira/browse/FLINK-6755: User-triggered
> >> checkpoints
> >>> - https://issues.apache.org/jira/browse/FLINK-4620: Automatically
> >> creating savepoints
> >>> - https://issues.apache.org/jira/browse/FLINK-4511: Schedule 
> >>> periodic
> >> savepoints
> >>>
> >>> There are roughly two different dimensions in the topic of
> >> savepoints/checkpoints (I’ll use snapshot as the generic term for both):
> >>> 1) who controls the snapshot
> >>> 2) what’s the (binary) format of the snapshot
> >>>
> >>> For 1), we currently have checkpoints and savepoints. Checkpoints 
> >>> are
> >> created by the system for fault tolerance. They are managed by the
> system
> >> and the system is free to discard them when it sees fit. Savepoints 
> >> are
> in
> >> the control of the user. A user can choose to create a save point, 
> >> they
> can
> >> delete them, they can restore from them at will. The system will 
> >> not
> clean
> >> up savepoints. We should try and keep this separation and not 
> >> muddle the two concepts.
> >>>
> >>> For 2), we currently have various different formats between the
> >> different state backends and also for the same backend. I.e. 
> >> RocksDB
> can do
> >> full or incremental snapshots, local snapshots, and probably more.
> >>>
> >>> FLIP-41 aims at introducing a unified “savepoint" format that is
> >> interchangeable between the different state backends. In 

Re: [DISCUSS] Flink client api enhancement for downstream project

2019-06-17 Thread Flavio Pompermaier
Is there any possibility to have something like Apache Livy [1] also for
Flink in the future?

[1] https://livy.apache.org/

On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang  wrote:

> >>>  Any API we expose should not have dependencies on the runtime
> (flink-runtime) package or other implementation details. To me, this means
> that the current ClusterClient cannot be exposed to users because it   uses
> quite some classes from the optimiser and runtime packages.
>
> We should change ClusterClient from class to interface.
> ExecutionEnvironment only use the interface ClusterClient which should be
> in flink-clients while the concrete implementation class could be in
> flink-runtime.
>
> >>> What happens when a failure/restart in the client happens? There need
> to be a way of re-establishing the connection to the job, set up the
> listeners again, etc.
>
> Good point.  First we need to define what does failure/restart in the
> client mean. IIUC, that usually mean network failure which will happen in
> class RestClient. If my understanding is correct, restart/retry mechanism
> should be done in RestClient.
>
>
>
>
>
> Aljoscha Krettek  于2019年6月11日周二 下午11:10写道:
>
> > Some points to consider:
> >
> > * Any API we expose should not have dependencies on the runtime
> > (flink-runtime) package or other implementation details. To me, this
> means
> > that the current ClusterClient cannot be exposed to users because it
>  uses
> > quite some classes from the optimiser and runtime packages.
> >
> > * What happens when a failure/restart in the client happens? There need
> to
> > be a way of re-establishing the connection to the job, set up the
> listeners
> > again, etc.
> >
> > Aljoscha
> >
> > > On 29. May 2019, at 10:17, Jeff Zhang  wrote:
> > >
> > > Sorry folks, the design doc is late as you expected. Here's the design
> > doc
> > > I drafted, welcome any comments and feedback.
> > >
> > >
> >
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > >
> > >
> > >
> > > Stephan Ewen  于2019年2月14日周四 下午8:43写道:
> > >
> > >> Nice that this discussion is happening.
> > >>
> > >> In the FLIP, we could also revisit the entire role of the environments
> > >> again.
> > >>
> > >> Initially, the idea was:
> > >>  - the environments take care of the specific setup for standalone (no
> > >> setup needed), yarn, mesos, etc.
> > >>  - the session ones have control over the session. The environment
> holds
> > >> the session client.
> > >>  - running a job gives a "control" object for that job. That behavior
> is
> > >> the same in all environments.
> > >>
> > >> The actual implementation diverged quite a bit from that. Happy to
> see a
> > >> discussion about straitening this out a bit more.
> > >>
> > >>
> > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang  wrote:
> > >>
> > >>> Hi folks,
> > >>>
> > >>> Sorry for late response, It seems we reach consensus on this, I will
> > >> create
> > >>> FLIP for this with more detailed design
> > >>>
> > >>>
> > >>> Thomas Weise  于2018年12月21日周五 上午11:43写道:
> > >>>
> >  Great to see this discussion seeded! The problems you face with the
> >  Zeppelin integration are also affecting other downstream projects,
> > like
> >  Beam.
> > 
> >  We just enabled the savepoint restore option in
> > RemoteStreamEnvironment
> > >>> [1]
> >  and that was more difficult than it should be. The main issue is
> that
> >  environment and cluster client aren't decoupled. Ideally it should
> be
> >  possible to just get the matching cluster client from the
> environment
> > >> and
> >  then control the job through it (environment as factory for cluster
> >  client). But note that the environment classes are part of the
> public
> > >>> API,
> >  and it is not straightforward to make larger changes without
> breaking
> >  backward compatibility.
> > 
> >  ClusterClient currently exposes internal classes like JobGraph and
> >  StreamGraph. But it should be possible to wrap this with a new
> public
> > >> API
> >  that brings the required job control capabilities for downstream
> > >>> projects.
> >  Perhaps it is helpful to look at some of the interfaces in Beam
> while
> >  thinking about this: [2] for the portable job API and [3] for the
> old
> >  asynchronous job control from the Beam Java SDK.
> > 
> >  The backward compatibility discussion [4] is also relevant here. A
> new
> > >>> API
> >  should shield downstream projects from internals and allow them to
> >  interoperate with multiple future Flink versions in the same release
> > >> line
> >  without forced upgrades.
> > 
> >  Thanks,
> >  Thomas
> > 
> >  [1] https://github.com/apache/flink/pull/7249
> >  [2]
> > 
> > 
> > >>>
> > >>
> >
> https://github.com/apache/beam/blob/master/model/job-management/src/main/proto/beam_job_api.proto
> >  [3]
> > 
> > 
> > >>>
> > >>
> >
> 

Re: About Deprecating split/select for DataStream API

2019-06-17 Thread Dawid Wysakowicz
Yes, you are correct. The problem I described applies to split, not
select, as I wrote in the first email. Sorry for that.

Let's have a look at a corrected example:

    val splitted1 = ds.split(if (1) then "a")

    val splitted2 = ds.split(if (!=1) then "a")

In those cases splitted1.select("a") will output all elements, and the
same goes for splitted2, because the OutputSelector(s) are applied to the
previous operator. The behavior I would assume is that splitted1 outputs
only "1"s, whereas splitted2 outputs everything but "1"s.

On the other hand in a call

    val splitted1 = ds.split(if ("1" or "2") then
"a").select("a").split(if ("3") then "b").select("b")

I would assume an intersection of those two splits, so no results. What
actually happens is that it will output "1"s, "2"s & "3"s. Actually,
exceptions should be thrown in those cases instead of producing confusing
results, but the very fact that we would need to check for prohibited
configurations at runtime just shows that this API is broken.

Those weird behaviors are, in my opinion, results of the flawed API, as it
actually assigns an output selector to the previous operator. In other
words, it modifies the previous operator. I think it would be much cleaner if
this happened inside an operator rather than separately. This is what
side outputs do, as you define them inside the ProcessFunction rather
than afterwards. Therefore I am very much in favor of using them for
those cases. Once again, if the problem is that they are available only
in the ProcessFunction, I would prefer enabling them e.g. in flatMap
rather than keeping split/select.

   

On 17/06/2019 09:40, SHI Xiaogang wrote:
> Hi Dawid,
>
> As the select method is only allowed on SplitStreams, it's impossible to
> construct the example ds.split().select("a", "b").select("c", "d").
>
> Are you meaning ds.split().select("a", "b").split().select("c", "d")?
> If so, then the tagging in the first split operation should not affect the
> second one. Then
> splitted.select("a", "b") => empty
> splitted.select("c", "d") => ds
>
> I cannot quite catch your point here. It's appreciated if you can provide a
> more concrete explanation?
>
> Regards,
> Xiaogang Shi
>
>
>
>
> Dawid Wysakowicz  于2019年6月17日周一 下午3:10写道:
>
>> Hi all,
>>
>> Thank you for starting the discussion. To start with I have to say I am
>> not entirely against leaving them. On the other hand I totally disagree
>> that the semantics are clearly defined. Actually the design is
>> fundamentally flawed.
>>
>>1. We use String as a selector for elements. This is not the cleanest
>>design, but I agree it is not the worst.
>>2. Users cannot define different types for different splits.
>>3. (The actual reason why I think it's actually better to drop the
>>split/select and introduce a better mechanism) The behavior of a split is
>>to actually add an output selector. We can have just a single selector on 
>> a
>>single operator, but the API allows (I would even say encourages) to 
>> create
>>chains of split/select, which leads to undefined behavior. Take this for
>>example: ds.split().select("a", "b").select("c", "d"). Which tags should 
>> be
>>forwarded? ("a", "b", "c", "d") (union) or () (intersection). In my 
>> opinion
>>the most obvious answer in this case would be the intersection. Let's
>>modify it slightly though and I would assume a different behavior (the
>>union)
>>
>>splitted = ds.split();
>>
>>splitted.select("a", "b").map()
>>
>>splitted.select("c", "d").map()
>>
>> Taking the 3rd argument into consideration I would be in favor of removing
>> the current mechanism. I think the side outputs serve the purpose much
>> better with much cleaner semantics. I get the argument that users are now
>> forced to use processFunction if they want to use the side outputs. If this
>> is the main problem how about enabling them e.g. for flatMap as well?
>>
>> Best,
>>
>> Dawid
>> On 17/06/2019 08:51, Jark Wu wrote:
>>
>> +1 to keep the split/select API. I think if there are some problems with
>> the API, it's better to fix them instead of deprecating them.
>> And select/split are straightforward and convenient APIs. It's worth to
>> have them.
>>
>> Regards,
>> Jark
>>
>> On Mon, 17 Jun 2019 at 14:46, vino yang  
>>  wrote:
>>
>>
>> Hi,
>>
>> I also think it is valuable and reasonable to keep the split/select APIs.
>> They are very convenient and widely used in our platform. I think they are
>> also used in other users' jobs.
>> If the community has doubts about this, IMHO, it would be better to start a
>> user survey.
>>
>> Best,
>> Vino
>>
>> SHI Xiaogang   于2019年6月17日周一 
>> 上午11:55写道:
>>
>>
>> Hi Xingcan,
>>
>> Thanks for bringing it up for discusson.
>>
>> I agree with you that we should not deprecate the split/select methods.
>> Their semantics are very clear and they are widely adopted by Flink
>>
>> users.
>>
>> We should fix these problems inste

Re: [DISCUSS] Flink client api enhancement for downstream project

2019-06-17 Thread SHI Xiaogang
Hi Jeff and Flavio,

Thanks Jeff a lot for proposing the design document.

We are also working on refactoring ClusterClient to allow flexible and
efficient job management in our real-time platform.
We would like to draft a document to share our ideas with you.

I think it's a good idea to have something like Apache Livy for Flink, and
the efforts discussed here take a great step toward it.

Regards,
Xiaogang

Flavio Pompermaier  于2019年6月17日周一 下午7:13写道:

> Is there any possibility to have something like Apache Livy [1] also for
> Flink in the future?
>
> [1] https://livy.apache.org/
>
> On Tue, Jun 11, 2019 at 5:23 PM Jeff Zhang  wrote:
>
> > >>>  Any API we expose should not have dependencies on the runtime
> > (flink-runtime) package or other implementation details. To me, this
> means
> > that the current ClusterClient cannot be exposed to users because it
>  uses
> > quite some classes from the optimiser and runtime packages.
> >
> > We should change ClusterClient from class to interface.
> > ExecutionEnvironment only use the interface ClusterClient which should be
> > in flink-clients while the concrete implementation class could be in
> > flink-runtime.
> >
> > >>> What happens when a failure/restart in the client happens? There need
> > to be a way of re-establishing the connection to the job, set up the
> > listeners again, etc.
> >
> > Good point.  First we need to define what does failure/restart in the
> > client mean. IIUC, that usually mean network failure which will happen in
> > class RestClient. If my understanding is correct, restart/retry mechanism
> > should be done in RestClient.
> >
> >
> >
> >
> >
> > Aljoscha Krettek  于2019年6月11日周二 下午11:10写道:
> >
> > > Some points to consider:
> > >
> > > * Any API we expose should not have dependencies on the runtime
> > > (flink-runtime) package or other implementation details. To me, this
> > means
> > > that the current ClusterClient cannot be exposed to users because it
> >  uses
> > > quite some classes from the optimiser and runtime packages.
> > >
> > > * What happens when a failure/restart in the client happens? There need
> > to
> > > be a way of re-establishing the connection to the job, set up the
> > listeners
> > > again, etc.
> > >
> > > Aljoscha
> > >
> > > > On 29. May 2019, at 10:17, Jeff Zhang  wrote:
> > > >
> > > > Sorry folks, the design doc is late as you expected. Here's the
> design
> > > doc
> > > > I drafted, welcome any comments and feedback.
> > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing
> > > >
> > > >
> > > >
> > > > Stephan Ewen  于2019年2月14日周四 下午8:43写道:
> > > >
> > > >> Nice that this discussion is happening.
> > > >>
> > > >> In the FLIP, we could also revisit the entire role of the
> environments
> > > >> again.
> > > >>
> > > >> Initially, the idea was:
> > > >>  - the environments take care of the specific setup for standalone
> (no
> > > >> setup needed), yarn, mesos, etc.
> > > >>  - the session ones have control over the session. The environment
> > holds
> > > >> the session client.
> > > >>  - running a job gives a "control" object for that job. That
> behavior
> > is
> > > >> the same in all environments.
> > > >>
> > > >> The actual implementation diverged quite a bit from that. Happy to
> > see a
> > > >> discussion about straitening this out a bit more.
> > > >>
> > > >>
> > > >> On Tue, Feb 12, 2019 at 4:58 AM Jeff Zhang 
> wrote:
> > > >>
> > > >>> Hi folks,
> > > >>>
> > > >>> Sorry for late response, It seems we reach consensus on this, I
> will
> > > >> create
> > > >>> FLIP for this with more detailed design
> > > >>>
> > > >>>
> > > >>> Thomas Weise  于2018年12月21日周五 上午11:43写道:
> > > >>>
> > >  Great to see this discussion seeded! The problems you face with
> the
> > >  Zeppelin integration are also affecting other downstream projects,
> > > like
> > >  Beam.
> > > 
> > >  We just enabled the savepoint restore option in
> > > RemoteStreamEnvironment
> > > >>> [1]
> > >  and that was more difficult than it should be. The main issue is
> > that
> > >  environment and cluster client aren't decoupled. Ideally it should
> > be
> > >  possible to just get the matching cluster client from the
> > environment
> > > >> and
> > >  then control the job through it (environment as factory for
> cluster
> > >  client). But note that the environment classes are part of the
> > public
> > > >>> API,
> > >  and it is not straightforward to make larger changes without
> > breaking
> > >  backward compatibility.
> > > 
> > >  ClusterClient currently exposes internal classes like JobGraph and
> > >  StreamGraph. But it should be possible to wrap this with a new
> > public
> > > >> API
> > >  that brings the required job control capabilities for downstream
> > > >>> projects.
> > >  Perhaps it is helpful to look at some of the interfaces in Beam
> > while
> > >  thin

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Jark Wu
Hi Vino,

Thanks for the proposal.

Regarding "input.keyBy(0).sum(1)" vs
"input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done
any benchmarks?
I'm curious how much performance improvement we can get by
using a count window as the local operator.

Best,
Jark



On Mon, 17 Jun 2019 at 17:48, vino yang  wrote:

> Hi Hequn,
>
> Thanks for your reply.
>
> The purpose of localKeyBy API is to provide a tool which can let users do
> pre-aggregation in the local. The behavior of the pre-aggregation is
> similar to keyBy API.
>
> So the three cases are different, I will describe them one by one:
>
> 1. input.keyBy(0).sum(1)
>
> *In this case, the result is event-driven, each event can produce one sum
> aggregation result and it is the latest one from the source start.*
>
> 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
>
> *In this case, the semantic may have a problem, it would do the local sum
> aggregation and will produce the latest partial result from the source
> start for every event. *
> *These latest partial results from the same key are hashed to one node to
> do the global sum aggregation.*
> *In the global aggregation, when it received multiple partial results (they
> are all calculated from the source start) and sum them will get the wrong
> result.*
>
> 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
>
> *In this case, it would just get a partial aggregation result for the 5
> records in the count window. The partial aggregation results from the same
> key will be aggregated globally.*
>
> So the first case and the third case can get the *same* result, the
> difference is the output-style and the latency.
>
> Generally speaking, the local key API is just an optimization API. We do
> not limit the user's usage, but the user has to understand its semantics
> and use it correctly.
>
> Best,
> Vino
>
> Hequn Cheng  于2019年6月17日周一 下午4:18写道:
>
> > Hi Vino,
> >
> > Thanks for the proposal, I think it is a very good feature!
> >
> > One thing I want to make sure is the semantics for the `localKeyBy`. From
> > the document, the `localKeyBy` API returns an instance of `KeyedStream`
> > which can also perform sum(), so in this case, what's the semantics for
> > `localKeyBy()`. For example, will the following code share the same
> result?
> > and what're the differences between them?
> >
> > 1. input.keyBy(0).sum(1)
> > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> >
> > Would also be great if we can add this into the document. Thank you very
> > much.
> >
> > Best, Hequn
> >
> >
> > On Fri, Jun 14, 2019 at 11:34 AM vino yang 
> wrote:
> >
> > > Hi Aljoscha,
> > >
> > > I have looked at the "*Process*" section of FLIP wiki page.[1] This
> mail
> > > thread indicates that it has proceeded to the third step.
> > >
> > > When I looked at the fourth step(vote step), I didn't find the
> > > prerequisites for starting the voting process.
> > >
> > > Considering that the discussion of this feature has been done in the
> old
> > > thread. [2] So can you tell me when should I start voting? Can I start
> > now?
> > >
> > > Best,
> > > Vino
> > >
> > > [1]:
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > [2]:
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > >
> > > leesf  于2019年6月13日周四 上午9:19写道:
> > >
> > > > +1 for the FLIP, thank vino for your efforts.
> > > >
> > > > Best,
> > > > Leesf
> > > >
> > > > vino yang  于2019年6月12日周三 下午5:46写道:
> > > >
> > > > > Hi folks,
> > > > >
> > > > > I would like to start the FLIP discussion thread about supporting
> > local
> > > > > aggregation in Flink.
> > > > >
> > > > > In short, this feature can effectively alleviate data skew. This is
> > the
> > > > > FLIP:
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-44%3A+Support+Local+Aggregation+in+Flink
> > > > >
> > > > >
> > > > > *Motivation* (copied from FLIP)
> > > > >
> > > > > Currently, keyed streams are widely used to perform aggregating
> > > > operations
> > > > > (e.g., reduce, sum and window) on the elements that have the same
> > key.
> > > > When
> > > > > executed at runtime, the elements with the same key will be sent to
> > and
> > > > > aggregated by the same task.
> > > > >
> > > > > The performance of these aggregating operations is very sensitive
> to
> > > the
> > > > > distribution of keys. In the cases where the distribution of keys
> > > > follows a
> > > > > powerful law, the performance will be significantly downgraded.
> More
> > > > > unluckily, increasing the degree of parallelism does not help when
> a
> > > task
> > > > > is overloaded by a single key.
> > > > >
> > > > > Local aggregation is a widely-adopted method to reduce the
> > performance

[jira] [Created] (FLINK-12871) Wrong SSL setup examples in docs

2019-06-17 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-12871:
---

 Summary: Wrong SSL setup examples in docs
 Key: FLINK-12871
 URL: https://issues.apache.org/jira/browse/FLINK-12871
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.8.0
Reporter: Nico Kruber
Assignee: Nico Kruber


The SSL setup examples [1] were updated to rely on the PKCS12 format (instead of 
the old JKS keystore), but PKCS12 does not support separate passwords for the 
keystore and the key itself.
Also, some of the examples still rely on the old JKS keystore and do not use 
PKCS12 yet.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/security-ssl.html#example-ssl-setup-standalone-and-kubernetes



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-12872) WindowOperator may fail with UnsupportedOperationException when merging windows

2019-06-17 Thread Piotr Nowojski (JIRA)
Piotr Nowojski created FLINK-12872:
--

 Summary: WindowOperator may fail with 
UnsupportedOperationException when merging windows
 Key: FLINK-12872
 URL: https://issues.apache.org/jira/browse/FLINK-12872
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.8.0, 1.7.2, 1.6.4
Reporter: Piotr Nowojski


[Reported 
|http://mail-archives.apache.org/mod_mbox/flink-user/201906.mbox/%3CCALDWsfhbP6D9+pnTzYuGaP0V4nReKJ4s9VsG_Xe1hZJq4O=z...@mail.gmail.com%3E]
 by a user.

{noformat}
I have a job that uses processing time session window with inactivity gap of 
60ms where I intermittently run into the following exception. I'm trying to 
figure out what happened here. Haven't been able to reproduce this scenario. 
Any thoughts?

java.lang.UnsupportedOperationException: The end timestamp of a processing-time 
window cannot become earlier than the current processing time by merging. 
Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, 
end=1560493731778}
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
at 
org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
{noformat}

This is probably happening because {{System.currentTimeMillis()}} is not a 
monotonic function and {{WindowOperator}} accesses it at least twice: once when 
it creates a window and a second time when performing the above-mentioned check 
(which failed). However, I would guess there are more places like this, not 
only in {{WindowOperator}}.

The fix could be either to make sure that processing time is monotonic, to 
access it only once per operator per record, or to drop processing time in 
favour of ingestion time.
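
For illustration, a minimal sketch of the first option (a per-operator monotonic 
clock) could look like the following. {{MonotonicProcessingTimeService}} is a 
hypothetical name used for this example, not an existing Flink class:

{noformat}
/**
 * Hypothetical sketch of the "make processing time monotonic" option: wall-clock
 * time is clamped so it never goes backwards within one operator instance.
 */
public class MonotonicProcessingTimeService {

    private long lastReturnedTime = Long.MIN_VALUE;

    /** Returns the current time, never smaller than a previously returned value. */
    public long getCurrentProcessingTime() {
        long now = System.currentTimeMillis();
        if (now < lastReturnedTime) {
            // the wall clock jumped backwards (e.g. NTP adjustment); keep the old value
            now = lastReturnedTime;
        }
        lastReturnedTime = now;
        return now;
    }
}
{noformat}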



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] FLIP-41: Unified binary format for keyed state

2019-06-17 Thread Congxian Qiu
+1 from my side.
Best,
Congxian


Tzu-Li (Gordon) Tai  wrote on Mon, Jun 17, 2019 at 10:20 AM:

> Hi Flink devs,
>
> I want to officially start a voting thread to formally adopt FLIP-41 [1].
>
> There are two relevant discussions threads for this feature [2] [3].
>
> The voting time will end on June 19th 17:00 CET.
>
> Cheers,
> Gordon
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41:+Unify+Keyed+State+Snapshot+Binary+Format+for+Savepoints
>
> [2]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-41-Unify-Keyed-State-Snapshot-Binary-Format-for-Savepoints-td29197.html
>
> [3]  https://issues.apache.org/jira/browse/FLINK-12619
>


Re: About Deprecating split/select for DataStream API

2019-06-17 Thread SHI Xiaogang
Hi Dawid,

Thanks a lot for your example.

I think most users will expect splitted1 to be empty in the example.

The unexpected results produced are, in my opinion, due to our problematic
implementation, not to confusing semantics.
We can fix the problem if we add a SELECT operator to filter out unexpected
records (of course, we can find some optimizations to improve
efficiency).

After all, I prefer to fix the problems so that the results are as expected.
What do you think?

Regards,
Xiaogang

Dawid Wysakowicz  wrote on Mon, Jun 17, 2019 at 7:21 PM:

> Yes you are correct. The problem I described applies to the split not
> select as I wrote in the first email. Sorry for that.
>
> I will try to prepare a correct example. Let's have a look at this example:
>
> val splitted1 = ds.split(if (1) then "a")
>
> val splitted2 = ds.split(if (!=1) then "a")
>
> In those cases splitted1.select("a") -> will output all elements, the
> same for splitted2, because the OutputSelector(s) are applied to
> previous operator. The behavior I would assume is that splitted1 outputs
> only "1"s, whereas splitted2 all but "1"s
>
> On the other hand in a call
>
> val splitted1 = ds.split(if ("1" or "2") then
> "a").select("a").split(if ("3") then "b").select("b")
>
> I would assume an intersection of those two splits, so no results. What
> actually happens is that it will be "1", "2" & "3"s. Actually, right
> exceptions should be thrown in those cases not to produce confusing
> results, but this just shows that this API is broken, if we need to
> check for some prohibited configurations during runtime.
>
> Those weird behaviors are in my opinion results of the flawed API, as it
> actually assigns an output selector to the previous operator. In other
> words it modifies previous operator. I think it would be much cleaner if
> this happened inside an operator rather than separately. This is what
> SideOutputs do, as you define them inside the ProcessFunction, rather
> than afterwards. Therefore I am very much in favor of using them for
> those cases. Once again if the problem is that they are available only
> in the ProcessFunction I would prefer enabling them e.g. in FlatMap,
> rather than keeping the split/select.
>
>
>
> On 17/06/2019 09:40, SHI Xiaogang wrote:
> > Hi Dawid,
> >
> > As the select method is only allowed on SplitStreams, it's impossible to
> > construct the example ds.split().select("a", "b").select("c", "d").
> >
> > Are you meaning ds.split().select("a", "b").split().select("c", "d")?
> > If so, then the tagging in the first split operation should not affect
> the
> > second one. Then
> > splitted.select("a", "b") => empty
> > splitted.select("c", "d") => ds
> >
> > I cannot quite catch your point here. It's appreciated if you can
> provide a
> > more concrete explanation?
> >
> > Regards,
> > Xiaogang Shi
> >
> >
> >
> >
> > Dawid Wysakowicz  于2019年6月17日周一 下午3:10写道:
> >
> >> Hi all,
> >>
> >> Thank you for starting the discussion. To start with I have to say I am
> >> not entirely against leaving them. On the other hand I totally disagree
> >> that the semantics are clearly defined. Actually the design is
> >> fundamentally flawed.
> >>
> >>1. We use String as a selector for elements. This is not the cleanest
> >>design, but I agree it is not the worst.
> >>2. Users cannot define different types for different splits.
> >>3. (The actual reason why I think it's actually better to drop the
> >>split/select and introduce a better mechanism) The behavior of a
> split is
> >>to actually add an output selector. We can have just a single
> selector on a
> >>single operator, but the API allows (I would even say encourages) to
> create
> >>chains of split/select, which leads to undefined behavior. Take this
> for
> >>example: ds.split().select("a", "b").select("c", "d"). Which tags
> should be
> >>forwarded? ("a", "b", "c", "d") (union) or () (intersection). In my
> opinion
> >>the most obvious answer in this case would be the intersection. Let's
> >>modify it slightly though and I would assume a different behavior
> (the
> >>union)
> >>
> >>splitted = ds.split();
> >>
> >>splitted.select("a", "b").map()
> >>
> >>splitted.select("c", "d").map()
> >>
> >> Taking the 3rd argument into consideration I would be in favor of
> removing
> >> the current mechanism. I think the side outputs serve the purpose much
> >> better with much cleaner semantics. I get the argument that users are
> now
> >> forced to use processFunction if they want to use the side outputs. If
> this
> >> is the main problem how about enabling them e.g. for flatMap as well?
> >>
> >> Best,
> >>
> >> Dawid
> >> On 17/06/2019 08:51, Jark Wu wrote:
> >>
> >> +1 to keep the split/select API. I think if there are some problems with
> >> the API, it's better to fix them instead of deprecating them.
> >> And select/split are straightforward and convenient APIs. It's 

[jira] [Created] (FLINK-12873) Create a separate maven module for Shuffle API

2019-06-17 Thread Andrey Zagrebin (JIRA)
Andrey Zagrebin created FLINK-12873:
---

 Summary: Create a separate maven module for Shuffle API
 Key: FLINK-12873
 URL: https://issues.apache.org/jira/browse/FLINK-12873
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Configuration, Runtime / Network
Reporter: Andrey Zagrebin


At the moment, the shuffle service API is part of the flink-runtime maven module. 
Implementers of other shuffle services would have to depend on the fat 
flink-runtime dependency. We should consider factoring out the shuffle API 
interfaces into a separate maven module which depends only on flink-core. Later 
we can consider the same for the custom high-availability services.

The final structure could be e.g. (up to discussion):
 * flink-runtime (already includes default shuffle and high availability 
implementations)
 * flink-runtime-extensions
 ** flink-runtime-extensions-core
 ** flink-shuffle-extensions-api
 ** flink-high-availability-extensions-api



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: Sort streams in windows

2019-06-17 Thread Евгений Юшин
Hi Jan,

Thanks for the quick reply.

Implementing a stateful transformation requires re-writing logic which is
already provided by Flink itself. Let's consider the example from my original
message:
there can be out-of-order data -> data should be propagated to the next
operator only when the watermark crosses the out-of-order boundaries -> all records
with 'ts < watermark' should be pre-processed (e.g. sorted).

With a stateful function: all records have to be stored in state, and for each new
record the whole state has to be traversed to understand whether out-of-order
events can be propagated further. For unioned streams there has to be logic
to take the minimum ts of each stream for comparison, but the information about
which record belongs to which stream is already lost. The state has to be
persisted, and this adds some footprint during checkpoints.
Flink windows handle all these duties under the hood.

So I think the Flink window interface (a merging window for this particular
case) is a perfect fit for this kind of pre-processing that has to happen
first.
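
For reference, a minimal sketch of the buffer-plus-timer approach discussed above
could look like the following (MyEvent, its getTimestamp() accessor, and the
String key type are assumptions for the example). It illustrates the state and
timer bookkeeping a user would otherwise have to write by hand:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Sketch: buffer events per key and emit them in timestamp order once the watermark passes them. */
public class SortingFunction extends KeyedProcessFunction<String, MyEvent, MyEvent> {

    private transient ListState<MyEvent> buffer;

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffer", MyEvent.class));
    }

    @Override
    public void processElement(MyEvent value, Context ctx, Collector<MyEvent> out) throws Exception {
        buffer.add(value);
        // fire as soon as the watermark reaches this element's timestamp
        ctx.timerService().registerEventTimeTimer(value.getTimestamp());
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<MyEvent> out) throws Exception {
        List<MyEvent> pending = new ArrayList<>();
        for (MyEvent e : buffer.get()) {
            pending.add(e);
        }
        pending.sort(Comparator.comparingLong(MyEvent::getTimestamp));

        long watermark = ctx.timerService().currentWatermark();
        List<MyEvent> stillBuffered = new ArrayList<>();
        for (MyEvent e : pending) {
            if (e.getTimestamp() <= watermark) {
                out.collect(e);        // safe to emit, nothing earlier can arrive anymore
            } else {
                stillBuffered.add(e);  // keep for a later timer
            }
        }
        buffer.update(stillBuffered);
    }
}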



Mon, Jun 17, 2019 at 11:35, Jan Lukavský :

> Hi Eugene,
>
> I'd say that what you want essentially is not "sort in windows", because
> (as you mention), you want to emit elements from windows as soon as
> watermark passes some timestamp. Maybe a better approach would be to
> implement this using stateful processing, where you keep a buffer of
> (unsorted) inputs and setup a timer for minimal time of elements in the
> buffer (plus allowed lateness), and the sort elements with timestamp <=
> the timer (very ofter single elements). I'm actually working on this for
> Apache Beam (design doc [1]), but this is still a work-in-progress.
>
> Another drawback is that something like "sorted map state" will probably
> be needed in order to efficiently query the state for minimal timestamp.
> A less efficient implementation might work with ListState as well.
>
> Jan
>
> [1]
>
> https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/edit?usp=sharing
>
> On 6/14/19 3:58 PM, Евгений Юшин wrote:
> > Hi folks
> >
> > I want to sort stream based on event time field derived from events. To
> do
> > this I can use one of the existing windows like TimeWindow to collect
> > events in a window of a particular size, or SlidingWindow to run sort
> logic
> > more often (and sort within slide).
> > Ideally, I want to sort events as fast as they pass watermark (with
> > out-of-order ts extractor). None of the current windows allow me to do
> > this. And I think to implement custom merging window similar to
> > SlidingWindow. Each element will be assigned to Window(event_ts,
> > event_ts+1), and then all windows with 'start < watermark' will be
> merged.
> > To implement this I need time service available in
> >
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L232
> > Unfortunately, 'getCurrentProcessingTime'is only there for now.
> >
> > I can pass function to extract timestamp to my new window extractor, but
> in
> > this case logic for calculation min watermark for
> > parallel/unioned/co-joined streams won't simply work.
> >
> > @devs would you mind if I extend WindowAssignerContext with
> >   getCurrentWatermark or the whole time service reference?
> >
> > Would be really glad to hear ypur concerns.
> >
> > Regards,
> > Eugene
> >
>


Re: [VOTE] FLIP-41: Unified binary format for keyed state

2019-06-17 Thread Aljoscha Krettek
+1

With the restriction that it should be “canonical format”/“unified format” (or 
something like it) and not “savepoint format”, i.e. not 
KeyedBackendSavepointStrategyBase as in the doc, for example.

Aljoscha

> On 17. Jun 2019, at 14:05, Congxian Qiu  wrote:
> 
> +1 from my side.
> Best,
> Congxian
> 
> 
> Tzu-Li (Gordon) Tai  于2019年6月17日周一 上午10:20写道:
> 
>> Hi Flink devs,
>> 
>> I want to officially start a voting thread to formally adopt FLIP-41 [1].
>> 
>> There are two relevant discussions threads for this feature [2] [3].
>> 
>> The voting time will end on June 19th 17:00 CET.
>> 
>> Cheers,
>> Gordon
>> 
>> [1]
>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-41:+Unify+Keyed+State+Snapshot+Binary+Format+for+Savepoints
>> 
>> [2]
>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-41-Unify-Keyed-State-Snapshot-Binary-Format-for-Savepoints-td29197.html
>> 
>> [3]  https://issues.apache.org/jira/browse/FLINK-12619
>> 



[jira] [Created] (FLINK-12874) Improve the semantics of zero length character strings

2019-06-17 Thread Timo Walther (JIRA)
Timo Walther created FLINK-12874:


 Summary: Improve the semantics of zero length character strings
 Key: FLINK-12874
 URL: https://issues.apache.org/jira/browse/FLINK-12874
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Timo Walther
Assignee: Timo Walther


Zero-length character strings need special treatment because the SQL standard 
forbids declaring such character string types. For type inference 
(e.g., determining the return type of {{TRIM('')}}) it should be possible to 
return zero-length VARCHAR types.

In any case, such types should not have a serializable string representation. 
Oracle behaves similarly: {{SELECT DUMP(TRIM('')) AS tt 
FROM dual;}} returns {{NULL}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


RE: About Deprecating split/select for DataStream API

2019-06-17 Thread xingcanc
Hi all,

Thanks for sharing your thoughts on this topic.

First, we must admit that the current implementation of split/select is 
flawed. I roughly went through the source code; the problem may be that for 
consecutive select/split(s), the former one is overridden by the latter one 
during the StreamGraph generation phase. That's why we forbade this consecutive 
logic in FLINK-11084.

Now the question is whether we should guide users to migrate to the new side 
output feature or thoroughly rework the broken API with the correct semantics 
(instead of just trying to forbid all the "invalid" usages). 

Personally, I prefer the latter solution because

1. split/select may already be widely used in ways that do not touch the broken part.
2. Though restricted compared with side outputs, the semantics of split/select 
itself are acceptable, since union does not support different data types either.
3. We need a complete and easy-to-use transformation set for the DataStream API. 
Enabling side outputs for flatMap may not be an ultimate solution.

To summarize, maybe we should not hastily deprecate the split/select public API. 
If we come to a consensus on that, how about rewriting it based on side outputs 
(like the implementation of join on top of coGroup)? A rough sketch follows below.
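
For instance, a rewrite on top of side outputs could route elements inside a single 
operator instead of attaching output selectors to the previous one. The sketch below 
only illustrates that direction (the tags, the Integer element type, and the routing 
predicate are made up for the example):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Anonymous subclasses are needed so the element type is captured.
final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};
final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd") {};

SingleOutputStreamOperator<Integer> routed = input
        .process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);   // would back select("even")
                } else {
                    ctx.output(oddTag, value);    // would back select("odd")
                }
            }
        });

DataStream<Integer> even = routed.getSideOutput(evenTag);
DataStream<Integer> odd = routed.getSideOutput(oddTag);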

Any feedback is welcome : )

Best,
Xingcan

-Original Message-
From: SHI Xiaogang  
Sent: Monday, June 17, 2019 8:08 AM
To: Dawid Wysakowicz 
Cc: dev@flink.apache.org
Subject: Re: About Deprecating split/select for DataStream API

Hi Dawid,

Thanks a lot for your example.

I think most users will expect splitted1 to be empty in the example.

The unexpected results produced, in my opinion, is due to our problematic 
implementation, instead of the confusing semantics.
We can fix the problem if we add a SELECT operator to filter out unexpected 
records (Of course, we can find some optimization to improve the efficiency.).

After all, i prefer to fix the problems to make the results as expected.
What do you think?

Regards,
Xiaogang

Dawid Wysakowicz  wrote on Mon, Jun 17, 2019 at 7:21 PM:

> Yes you are correct. The problem I described applies to the split not 
> select as I wrote in the first email. Sorry for that.
>
> I will try to prepare a correct example. Let's have a look at this example:
>
> val splitted1 = ds.split(if (1) then "a")
>
> val splitted2 = ds.split(if (!=1) then "a")
>
> In those cases splitted1.select("a") -> will output all elements, the 
> same for splitted2, because the OutputSelector(s) are applied to 
> previous operator. The behavior I would assume is that splitted1 
> outputs only "1"s, whereas splitted2 all but "1"s
>
> On the other hand in a call
>
> val splitted1 = ds.split(if ("1" or "2") then 
> "a").select("a").split(if ("3") then "b").select("b")
>
> I would assume an intersection of those two splits, so no results. 
> What actually happens is that it will be "1", "2" & "3"s. Actually, 
> right exceptions should be thrown in those cases not to produce 
> confusing results, but this just shows that this API is broken, if we 
> need to check for some prohibited configurations during runtime.
>
> Those weird behaviors are in my opinion results of the flawed API, as 
> it actually assigns an output selector to the previous operator. In 
> other words it modifies previous operator. I think it would be much 
> cleaner if this happened inside an operator rather than separately. 
> This is what SideOutputs do, as you define them inside the 
> ProcessFunction, rather than afterwards. Therefore I am very much in 
> favor of using them for those cases. Once again if the problem is that 
> they are available only in the ProcessFunction I would prefer enabling 
> them e.g. in FlatMap, rather than keeping the split/select.
>
>
>
> On 17/06/2019 09:40, SHI Xiaogang wrote:
> > Hi Dawid,
> >
> > As the select method is only allowed on SplitStreams, it's 
> > impossible to construct the example ds.split().select("a", "b").select("c", 
> > "d").
> >
> > Are you meaning ds.split().select("a", "b").split().select("c", "d")?
> > If so, then the tagging in the first split operation should not 
> > affect
> the
> > second one. Then
> > splitted.select("a", "b") => empty
> > splitted.select("c", "d") => ds
> >
> > I cannot quite catch your point here. It's appreciated if you can
> provide a
> > more concrete explanation?
> >
> > Regards,
> > Xiaogang Shi
> >
> >
> >
> >
> > Dawid Wysakowicz  于2019年6月17日周一 下午3:10写道:
> >
> >> Hi all,
> >>
> >> Thank you for starting the discussion. To start with I have to say 
> >> I am not entirely against leaving them. On the other hand I totally 
> >> disagree that the semantics are clearly defined. Actually the 
> >> design is fundamentally flawed.
> >>
> >>1. We use String as a selector for elements. This is not the cleanest
> >>design, but I agree it is not the worst.
> >>2. Users cannot define different types for different splits.
> >>3. (The actual reason why I think it's actually better to drop the
> >>

Re: [ANNOUNCEMENT] March 2019 Bay Area Apache Flink Meetup

2019-06-17 Thread Xuefu Zhang
Hi all,

The scheduled meetup is only about a week away. Please note that RSVP at
meetup.com is required.  In order for us to get the actual headcount to
prepare for the event, please sign up as soon as possible if you plan to
join. Thank you very much for your cooperation.

Regards,
Xuefu

On Thu, Feb 14, 2019 at 4:32 PM Xuefu Zhang  wrote:

> Hi all,
>
> I'm very excited to announce that the community is planning the next
> meetup in Bay Area on March 25, 2019. The event is just announced on
> Meetup.com [1].
>
> To make the event successful, your participation and help will be needed.
> Currently, we are looking for an organization that can host the event.
> Please let me know if you have any leads.
>
> Secondly, we encourage Flink users and developers to take this as an
> opportunity to share experience or development. Thus, please let me know if
> you like to give a short talk.
>
> I look forward to meeting you all in the Meetup.
>
> Regards,
> Xuefu
>
> [1] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/258975465
>


[jira] [Created] (FLINK-12875) support char, varchar, timestamp, date, decimal in input arg conversion for Hive functions

2019-06-17 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12875:


 Summary: support char, varchar, timestamp, date, decimal in input 
arg conversion for Hive functions
 Key: FLINK-12875
 URL: https://issues.apache.org/jira/browse/FLINK-12875
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Kurt Young
Hi Vino,

Thanks for the proposal, I like the general idea and IMO it's a very useful
feature.
But after reading through the document, I feel that we may be over-designing the
required
operator for proper local aggregation. The main reason is that we want to have a
clear definition and behavior for the "local keyed state", which in my
opinion is not
necessary for local aggregation, at least for a start.

Another issue I noticed is that the localKeyBy operator cannot change the element
type, which will
also restrict a lot of use cases that can benefit from local
aggregation, like "average".

We implemented similar logic in SQL, and the only thing that needs to be done is to
introduce
a stateless, lightweight operator which is *chained* before `keyBy()`. The
operator flushes all buffered
elements during `StreamOperator::prepareSnapshotPreBarrier()` and thus stays
stateless.
By the way, in an earlier version we also tried a similar approach by
introducing a stateful
local aggregation operator, but it did not perform as well as the later one,
and it also affected the barrier
alignment time. The later one is fairly simple and more efficient.
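
For illustration only, a rough sketch of such an operator could look like the one
below; the Tuple2<String, Long> element shape and the flush threshold are
assumptions made for the example, not details of the actual SQL implementation:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the stateless local-aggregation operator described above: partial
 * sums are kept in a plain in-memory map and flushed on
 * prepareSnapshotPreBarrier(), so there is no state to snapshot.
 */
public class LocalSumOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

    private static final int MAX_BUFFERED_KEYS = 10_000; // assumed flush threshold

    private transient Map<String, Long> partialSums;

    @Override
    public void open() throws Exception {
        super.open();
        partialSums = new HashMap<>();
    }

    @Override
    public void processElement(StreamRecord<Tuple2<String, Long>> element) {
        Tuple2<String, Long> record = element.getValue();
        partialSums.merge(record.f0, record.f1, Long::sum);
        if (partialSums.size() >= MAX_BUFFERED_KEYS) {
            flush();
        }
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) {
        // emit everything before the barrier so nothing needs to be checkpointed
        flush();
    }

    private void flush() {
        for (Map.Entry<String, Long> e : partialSums.entrySet()) {
            output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
        }
        partialSums.clear();
    }
}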

I would highly suggest considering a stateless approach as the first step.

Best,
Kurt


On Mon, Jun 17, 2019 at 7:32 PM Jark Wu  wrote:

> Hi Vino,
>
> Thanks for the proposal.
>
> Regarding to the "input.keyBy(0).sum(1)" vs
> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done
> some benchmark?
> Because I'm curious about how much performance improvement can we get by
> using count window as the local operator.
>
> Best,
> Jark
>
>
>
> On Mon, 17 Jun 2019 at 17:48, vino yang  wrote:
>
> > Hi Hequn,
> >
> > Thanks for your reply.
> >
> > The purpose of localKeyBy API is to provide a tool which can let users do
> > pre-aggregation in the local. The behavior of the pre-aggregation is
> > similar to keyBy API.
> >
> > So the three cases are different, I will describe them one by one:
> >
> > 1. input.keyBy(0).sum(1)
> >
> > *In this case, the result is event-driven, each event can produce one sum
> > aggregation result and it is the latest one from the source start.*
> >
> > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> >
> > *In this case, the semantic may have a problem, it would do the local sum
> > aggregation and will produce the latest partial result from the source
> > start for every event. *
> > *These latest partial results from the same key are hashed to one node to
> > do the global sum aggregation.*
> > *In the global aggregation, when it received multiple partial results
> (they
> > are all calculated from the source start) and sum them will get the wrong
> > result.*
> >
> > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> >
> > *In this case, it would just get a partial aggregation result for the 5
> > records in the count window. The partial aggregation results from the
> same
> > key will be aggregated globally.*
> >
> > So the first case and the third case can get the *same* result, the
> > difference is the output-style and the latency.
> >
> > Generally speaking, the local key API is just an optimization API. We do
> > not limit the user's usage, but the user has to understand its semantics
> > and use it correctly.
> >
> > Best,
> > Vino
> >
> > Hequn Cheng  于2019年6月17日周一 下午4:18写道:
> >
> > > Hi Vino,
> > >
> > > Thanks for the proposal, I think it is a very good feature!
> > >
> > > One thing I want to make sure is the semantics for the `localKeyBy`.
> From
> > > the document, the `localKeyBy` API returns an instance of `KeyedStream`
> > > which can also perform sum(), so in this case, what's the semantics for
> > > `localKeyBy()`. For example, will the following code share the same
> > result?
> > > and what're the differences between them?
> > >
> > > 1. input.keyBy(0).sum(1)
> > > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > >
> > > Would also be great if we can add this into the document. Thank you
> very
> > > much.
> > >
> > > Best, Hequn
> > >
> > >
> > > On Fri, Jun 14, 2019 at 11:34 AM vino yang 
> > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > I have looked at the "*Process*" section of FLIP wiki page.[1] This
> > mail
> > > > thread indicates that it has proceeded to the third step.
> > > >
> > > > When I looked at the fourth step(vote step), I didn't find the
> > > > prerequisites for starting the voting process.
> > > >
> > > > Considering that the discussion of this feature has been done in the
> > old
> > > > thread. [2] So can you tell me when should I start voting? Can I
> start
> > > now?
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > [1]:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > > [2]:
> > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a

Re: About Deprecating split/select for DataStream API

2019-06-17 Thread Dian Fu
Hi all,

Thanks a lot for the discussion. I'm also in favor of rewriting/redesigning the 
split/select API instead of removing it. There is already a consensus that the 
side output API can achieve all the functionality of the split/select API. 
The question is whether we should also support some easy-to-use APIs on top of 
it. IMO, we should do that as long as the APIs have clear semantics and a wide 
usage scenario. I think the split/select API is such an API.

Regards,
Dian

> On Jun 18, 2019, at 12:30 AM, xingc...@gmail.com wrote:
> 
> Hi all,
> 
> Thanks for sharing your thoughts on this topic.
> 
> First, we must admit that the current implementation for split/select is 
> flawed. I roughly went through the source codes, the problem may be that for 
> consecutive select/split(s), the former one will be overridden by the later 
> one during StreamGraph generation phase. That's why we forbid this 
> consecutive logic in FLINK-11084.
> 
> Now the question is whether we should guide users to migrate to the new side 
> output feature or thoroughly rework the broken API with the correct semantics 
> (instead of just trying to forbid all the "invalid" usages). 
> 
> Personally, I prefer the later solution because
> 
> 1. The split/select may have been widely used without touching the broken 
> part.
> 2. Though restricted compared with side output, the semantics for 
> split/select itself is acceptable since union does not support different data 
> types either.
> 3. We need a complete and easy-to-use transformation set for DataStream API. 
> Enabling side output for flatMap may not be an ultimate solution.
> 
> To summarize, maybe we should not easily deprecate the split/select public 
> API. If we come to a consensus on that, how about rewriting it based on side 
> output? (like the implementation for join on coGroup)
> 
> Any feedback is welcome : )
> 
> Best,
> Xingcan
> 
> -Original Message-
> From: SHI Xiaogang  
> Sent: Monday, June 17, 2019 8:08 AM
> To: Dawid Wysakowicz 
> Cc: dev@flink.apache.org
> Subject: Re: About Deprecating split/select for DataStream API
> 
> Hi Dawid,
> 
> Thanks a lot for your example.
> 
> I think most users will expect splitted1 to be empty in the example.
> 
> The unexpected results produced, in my opinion, is due to our problematic 
> implementation, instead of the confusing semantics.
> We can fix the problem if we add a SELECT operator to filter out unexpected 
> records (Of course, we can find some optimization to improve the efficiency.).
> 
> After all, i prefer to fix the problems to make the results as expected.
> What do you think?
> 
> Regards,
> Xiaogang
> 
> Dawid Wysakowicz  于2019年6月17日周一 下午7:21写道:
> 
>> Yes you are correct. The problem I described applies to the split not 
>> select as I wrote in the first email. Sorry for that.
>> 
>> I will try to prepare a correct example. Let's have a look at this example:
>> 
>>val splitted1 = ds.split(if (1) then "a")
>> 
>>val splitted2 = ds.split(if (!=1) then "a")
>> 
>> In those cases splitted1.select("a") -> will output all elements, the 
>> same for splitted2, because the OutputSelector(s) are applied to 
>> previous operator. The behavior I would assume is that splitted1 
>> outputs only "1"s, whereas splitted2 all but "1"s
>> 
>> On the other hand in a call
>> 
>>val splitted1 = ds.split(if ("1" or "2") then 
>> "a").select("a").split(if ("3") then "b").select("b")
>> 
>> I would assume an intersection of those two splits, so no results. 
>> What actually happens is that it will be "1", "2" & "3"s. Actually, 
>> right exceptions should be thrown in those cases not to produce 
>> confusing results, but this just shows that this API is broken, if we 
>> need to check for some prohibited configurations during runtime.
>> 
>> Those weird behaviors are in my opinion results of the flawed API, as 
>> it actually assigns an output selector to the previous operator. In 
>> other words it modifies previous operator. I think it would be much 
>> cleaner if this happened inside an operator rather than separately. 
>> This is what SideOutputs do, as you define them inside the 
>> ProcessFunction, rather than afterwards. Therefore I am very much in 
>> favor of using them for those cases. Once again if the problem is that 
>> they are available only in the ProcessFunction I would prefer enabling 
>> them e.g. in FlatMap, rather than keeping the split/select.
>> 
>> 
>> 
>> On 17/06/2019 09:40, SHI Xiaogang wrote:
>>> Hi Dawid,
>>> 
>>> As the select method is only allowed on SplitStreams, it's 
>>> impossible to construct the example ds.split().select("a", "b").select("c", 
>>> "d").
>>> 
>>> Are you meaning ds.split().select("a", "b").split().select("c", "d")?
>>> If so, then the tagging in the first split operation should not 
>>> affect
>> the
>>> second one. Then
>>>splitted.select("a", "b") => empty
>>>splitted.select("c", "d") => ds
>>> 
>>> I cannot quite catch your point here. It's appreciated if yo

[jira] [Created] (FLINK-12876) Adapt region failover NG for legacy scheduler

2019-06-17 Thread Zhu Zhu (JIRA)
Zhu Zhu created FLINK-12876:
---

 Summary: Adapt region failover NG for legacy scheduler
 Key: FLINK-12876
 URL: https://issues.apache.org/jira/browse/FLINK-12876
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.9.0
Reporter: Zhu Zhu
Assignee: Zhu Zhu
 Fix For: 1.9.0


We need an adapter to adapt flip1.RestartPipelinedRegionStrategy for the legacy 
scheduler, so that the legacy scheduler can support fine-grained recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread vino yang
Hi Jark,

We have done a comparative test. The improvement is significant.

From our observation, the optimization effect mainly depends on two factors:


   - the degree of the skew: this factor depends on the user's business;
   - the size of the window: localKeyBy supports all the window types
   provided by Flink. Obviously, the larger the window, the
   more pronounced the effect.

In production, we cannot control the first factor. The second factor
is the result of a trade-off, because the size of the window affects the latency
of the pre-aggregation. That is to say:


   - the larger the window, the more pronounced the effect;
   - the larger the window, the higher the latency of the result

Best,
Vino

Jark Wu  wrote on Mon, Jun 17, 2019 at 7:32 PM:

> Hi Vino,
>
> Thanks for the proposal.
>
> Regarding to the "input.keyBy(0).sum(1)" vs
> "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you done
> some benchmark?
> Because I'm curious about how much performance improvement can we get by
> using count window as the local operator.
>
> Best,
> Jark
>
>
>
> On Mon, 17 Jun 2019 at 17:48, vino yang  wrote:
>
> > Hi Hequn,
> >
> > Thanks for your reply.
> >
> > The purpose of localKeyBy API is to provide a tool which can let users do
> > pre-aggregation in the local. The behavior of the pre-aggregation is
> > similar to keyBy API.
> >
> > So the three cases are different, I will describe them one by one:
> >
> > 1. input.keyBy(0).sum(1)
> >
> > *In this case, the result is event-driven, each event can produce one sum
> > aggregation result and it is the latest one from the source start.*
> >
> > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> >
> > *In this case, the semantic may have a problem, it would do the local sum
> > aggregation and will produce the latest partial result from the source
> > start for every event. *
> > *These latest partial results from the same key are hashed to one node to
> > do the global sum aggregation.*
> > *In the global aggregation, when it received multiple partial results
> (they
> > are all calculated from the source start) and sum them will get the wrong
> > result.*
> >
> > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> >
> > *In this case, it would just get a partial aggregation result for the 5
> > records in the count window. The partial aggregation results from the
> same
> > key will be aggregated globally.*
> >
> > So the first case and the third case can get the *same* result, the
> > difference is the output-style and the latency.
> >
> > Generally speaking, the local key API is just an optimization API. We do
> > not limit the user's usage, but the user has to understand its semantics
> > and use it correctly.
> >
> > Best,
> > Vino
> >
> > Hequn Cheng  于2019年6月17日周一 下午4:18写道:
> >
> > > Hi Vino,
> > >
> > > Thanks for the proposal, I think it is a very good feature!
> > >
> > > One thing I want to make sure is the semantics for the `localKeyBy`.
> From
> > > the document, the `localKeyBy` API returns an instance of `KeyedStream`
> > > which can also perform sum(), so in this case, what's the semantics for
> > > `localKeyBy()`. For example, will the following code share the same
> > result?
> > > and what're the differences between them?
> > >
> > > 1. input.keyBy(0).sum(1)
> > > 2. input.localKeyBy(0).sum(1).keyBy(0).sum(1)
> > > 3. input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)
> > >
> > > Would also be great if we can add this into the document. Thank you
> very
> > > much.
> > >
> > > Best, Hequn
> > >
> > >
> > > On Fri, Jun 14, 2019 at 11:34 AM vino yang 
> > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > I have looked at the "*Process*" section of FLIP wiki page.[1] This
> > mail
> > > > thread indicates that it has proceeded to the third step.
> > > >
> > > > When I looked at the fourth step(vote step), I didn't find the
> > > > prerequisites for starting the voting process.
> > > >
> > > > Considering that the discussion of this feature has been done in the
> > old
> > > > thread. [2] So can you tell me when should I start voting? Can I
> start
> > > now?
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > [1]:
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-FLIPround-up
> > > > [2]:
> > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
> > > >
> > > > leesf  于2019年6月13日周四 上午9:19写道:
> > > >
> > > > > +1 for the FLIP, thank vino for your efforts.
> > > > >
> > > > > Best,
> > > > > Leesf
> > > > >
> > > > > vino yang  于2019年6月12日周三 下午5:46写道:
> > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > I would like to start the FLIP discussion thread about supporting
> > > local
> > > > > > aggregation in Flink.
> > > > > >
> > > > > > In short, this feature can effectively alleviate data skew. This
> is
> > > the
> > > > > > F

[jira] [Created] (FLINK-12877) Unify catalog database implementations and remove CatalogDatabase interfaces

2019-06-17 Thread Bowen Li (JIRA)
Bowen Li created FLINK-12877:


 Summary: Unify catalog database implementations and remove 
CatalogDatabase interfaces
 Key: FLINK-12877
 URL: https://issues.apache.org/jira/browse/FLINK-12877
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Hive, Table SQL / API
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.9.0


per discussion in https://issues.apache.org/jira/browse/FLINK-12841



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread vino yang
Hi Kurt,

Thanks for your comments.

It seems we both implemented a local aggregation feature to alleviate data skew.
However, IMHO, the API level at which the optimization pays off is different.

*Your optimization benefits Flink SQL and is not user-facing. (If I
understand it incorrectly, please correct this.)*
*Our implementation exposes it as an optimization tool API for DataStream;
it is just like a local version of the keyBy API.*

Based on this, I want to say that supporting it as a DataStream API provides
these advantages:


   - The localKeyBy API has clear semantics and it's flexible, not only for
   processing data skew but also for implementing some use cases, for
   example, multiple-level aggregation done locally:
   input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a" is a
   sub-category, while "b" is a category, and we do not need to shuffle data
   in the network.
   - The users of the DataStream API will benefit from this. Actually, we have
   a lot of scenarios that need the DataStream API. Currently, the DataStream API is
   the cornerstone of the physical plan of Flink SQL. With a localKeyBy API,
   the optimization of SQL may at least reuse this optimized API; this is a
   further topic.
   - Based on the window operator, our state would benefit from Flink state
   and checkpointing, so we do not need to worry about OOM or job failures.

Now, about your questions:

1. About our design not being able to change the data type, and about the
implementation of average:

Just like my reply to Hequn, localKeyBy is an API provided to the users
who use the DataStream API to build their jobs.
Users should know its semantics and the difference from the keyBy API, so if
they want an average aggregation, they should carry the local sum
and the local count.
I admit that it would be more convenient to use keyBy directly. But we need to
pay a small price for the benefits we get, and I think this price is
reasonable, considering that the DataStream API itself is a low-level API
(at least for now).
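
For illustration, an average under this proposal could be expressed roughly as
below, carrying the local sum and count explicitly. localKeyBy is the API proposed
in this FLIP, and the Tuple-shaped input is an assumption for the example:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;

// input: DataStream<Tuple2<String, Long>> of (key, value) pairs (assumed)
DataStream<Tuple3<String, Long, Long>> partials = input
        .map(v -> Tuple3.of(v.f0, v.f1, 1L))                       // (key, sum, count)
        .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.LONG))
        .localKeyBy(0)                                             // proposed API
        .countWindow(100)
        .reduce((a, b) -> Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2));

DataStream<Tuple2<String, Double>> averages = partials
        .keyBy(0)
        .reduce((a, b) -> Tuple3.of(a.f0, a.f1 + b.f1, a.f2 + b.f2))
        .map(t -> Tuple2.of(t.f0, (double) t.f1 / t.f2))
        .returns(Types.TUPLE(Types.STRING, Types.DOUBLE));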

2. About the stateless operator and
`StreamOperator::prepareSnapshotPreBarrier()`:

Actually, I have discussed this opinion with @dianfu in the old mail
thread. I will copy my opinion from there:

   - for your design, you still need somewhere to let users configure
   the trigger threshold (maybe memory availability?); this design cannot
   guarantee deterministic semantics (which will bring trouble for testing and
   debugging).
   - if the implementation depends on the timing of checkpoints, it would
   affect the checkpoint's progress, and the buffered data may cause OOM
   issues. In addition, if the operator is stateless, it cannot provide fault
   tolerance.

Best,
Vino

Kurt Young  wrote on Tue, Jun 18, 2019 at 9:22 AM:

> Hi Vino,
>
> Thanks for the proposal, I like the general idea and IMO it's very useful
> feature.
> But after reading through the document, I feel that we may over design the
> required
> operator for proper local aggregation. The main reason is we want to have a
> clear definition and behavior about the "local keyed state" which in my
> opinion is not
> necessary for local aggregation, at least for start.
>
> Another issue I noticed is the local key by operator cannot change element
> type, it will
> also restrict a lot of use cases which can be benefit from local
> aggregation, like "average".
>
> We also did similar logic in SQL and the only thing need to be done is
> introduce
> a stateless lightweight operator which is *chained* before `keyby()`. The
> operator will flush all buffered
> elements during `StreamOperator::prepareSnapshotPreBarrier()` and make
> himself stateless.
> By the way, in the earlier version we also did the similar approach by
> introducing a stateful
> local aggregation operator but it's not performed as well as the later one,
> and also effect the barrie
> alignment time. The later one is fairly simple and more efficient.
>
> I would highly suggest you to consider to have a stateless approach at the
> first step.
>
> Best,
> Kurt
>
>
> On Mon, Jun 17, 2019 at 7:32 PM Jark Wu  wrote:
>
> > Hi Vino,
> >
> > Thanks for the proposal.
> >
> > Regarding to the "input.keyBy(0).sum(1)" vs
> > "input.localKeyBy(0).countWindow(5).sum(1).keyBy(0).sum(1)", have you
> done
> > some benchmark?
> > Because I'm curious about how much performance improvement can we get by
> > using count window as the local operator.
> >
> > Best,
> > Jark
> >
> >
> >
> > On Mon, 17 Jun 2019 at 17:48, vino yang  wrote:
> >
> > > Hi Hequn,
> > >
> > > Thanks for your reply.
> > >
> > > The purpose of localKeyBy API is to provide a tool which can let users
> do
> > > pre-aggregation in the local. The behavior of the pre-aggregation is
> > > similar to keyBy API.
> > >
> > > So the three cases are different, I will describe them one by one:
> > >
> > > 1. input.keyBy(0).sum(1)
> > >
> > > *In this case, the result is event-driven, each event can 

[jira] [Created] (FLINK-12878) Add travis profile for flink-table-planner-blink/flink-table-runtime-blink

2019-06-17 Thread godfrey he (JIRA)
godfrey he created FLINK-12878:
--

 Summary: Add travis profile for 
flink-table-planner-blink/flink-table-runtime-blink
 Key: FLINK-12878
 URL: https://issues.apache.org/jira/browse/FLINK-12878
 Project: Flink
  Issue Type: Improvement
  Components: Travis
Reporter: godfrey he
Assignee: godfrey he


The flink-table-planner-blink/flink-table-runtime-blink builds take almost 
30 minutes, and that may cause the libraries profile to frequently hit timeouts; we 
can resolve this by moving flink-table-planner-blink and 
flink-table-runtime-blink into a separate profile.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Kurt Young
Hi Vino,

Now I feel that we may have different understandings about what kind of
problems or improvements you want to
resolve. Currently, most of the feedback is focused on *how to do a
proper local aggregation to improve performance
and maybe solve the data skew issue*. And my gut feeling is that this is
exactly what users want in the first place,
especially those +1s. (Sorry for trying to summarize here, please correct me if
I'm wrong.)

But I still think the design has somewhat diverged from the goal. If we want
an efficient and powerful way to
do local aggregation, supporting an intermediate result type is essential IMO.
Both the runtime's `AggregateFunction` and
SQL's `UserDefinedAggregateFunction` properly support an intermediate
result type and can perform a `merge` operation
on it.
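
As a reference for what an intermediate result type means here, a minimal average
built on the runtime's AggregateFunction could look like the following (illustrative
only): the accumulator (sum, count) differs from both the input and the output type,
and partial accumulators can be merged:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;

/** Average over Long values: input Long, accumulator (sum, count), output Double. */
public class AverageAggregate implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return Tuple2.of(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
        return Tuple2.of(acc.f0 + value, acc.f1 + 1);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> acc) {
        return acc.f1 == 0 ? 0.0 : (double) acc.f0 / acc.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        // merging partial accumulators is what makes local pre-aggregation possible
        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
    }
}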

Now, we have a lightweight alternative which performs well and fits
nicely with the local aggregation requirements.
Most importantly, it's much less complex because it's stateless. And it
can also support a similar multiple-level aggregation
scenario.

I am still not convinced why we shouldn't consider it as a first step.

Best,
Kurt


On Tue, Jun 18, 2019 at 11:35 AM vino yang  wrote:

> Hi Kurt,
>
> Thanks for your comments.
>
> It seems we both implemented local aggregation feature to optimize the
> issue of data skew.
> However, IMHO, the API level of optimizing revenue is different.
>
> *Your optimization benefits from Flink SQL and it's not user's faces.(If I
> understand it incorrectly, please correct this.)*
> *Our implementation employs it as an optimization tool API for DataStream,
> it just like a local version of the keyBy API.*
>
> Based on this, I want to say support it as a DataStream API can provide
> these advantages:
>
>
>- The localKeyBy API has a clear semantic and it's flexible not only for
>processing data skew but also for implementing some user cases, for
>example, if we want to calculate the multiple-level aggregation, we can
> do
>multiple-level aggregation in the local aggregation:
>input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a" is a
>sub-category, while "b" is a category, here we do not need to shuffle
> data
>in the network.
>- The users of DataStream API will benefit from this. Actually, we have
>a lot of scenes need to use DataStream API. Currently, DataStream API is
>the cornerstone of the physical plan of Flink SQL. With a localKeyBy
> API,
>the optimization of SQL at least may use this optimized API, this is a
>further topic.
>- Based on the window operator, our state would benefit from Flink State
>and checkpoint, we do not need to worry about OOM and job failed.
>
> Now, about your questions:
>
> 1. About our design cannot change the data type and about the
> implementation of average:
>
> Just like my reply to Hequn, the localKeyBy is an API provides to the users
> who use DataStream API to build their jobs.
> Users should know its semantics and the difference with keyBy API, so if
> they want to the average aggregation, they should carry local sum result
> and local count result.
> I admit that it will be convenient to use keyBy directly. But we need to
> pay a little price when we get some benefits. I think this price is
> reasonable. Considering that the DataStream API itself is a low-level API
> (at least for now).
>
> 2. About stateless operator and
> `StreamOperator::prepareSnapshotPreBarrier()`:
>
> Actually, I have discussed this opinion with @dianfu in the old mail
> thread. I will copy my opinion from there:
>
>- for your design, you still need somewhere to give the users configure
>the trigger threshold (maybe memory availability?), this design cannot
>guarantee a deterministic semantics (it will bring trouble for testing
> and
>debugging).
>- if the implementation depends on the timing of checkpoint, it would
>affect the checkpoint's progress, and the buffered data may cause OOM
>issue. In addition, if the operator is stateless, it can not provide
> fault
>tolerance.
>
> Best,
> Vino
>
> Kurt Young  于2019年6月18日周二 上午9:22写道:
>
> > Hi Vino,
> >
> > Thanks for the proposal, I like the general idea and IMO it's very useful
> > feature.
> > But after reading through the document, I feel that we may over design
> the
> > required
> > operator for proper local aggregation. The main reason is we want to
> have a
> > clear definition and behavior about the "local keyed state" which in my
> > opinion is not
> > necessary for local aggregation, at least for start.
> >
> > Another issue I noticed is the local key by operator cannot change
> element
> > type, it will
> > also restrict a lot of use cases which can be benefit from local
> > aggregation, like "average".
> >
> > We also did similar logic in SQL and the only thing need to be done is
> > introduce
> > a stateless lightweight operator which is *chained* before `keyby()`. The
> > operator will flush all buffered
> > elements 

[jira] [Created] (FLINK-12879) Improve the performance of AbstractBinaryWriter

2019-06-17 Thread Liya Fan (JIRA)
Liya Fan created FLINK-12879:


 Summary: Improve the performance of AbstractBinaryWriter
 Key: FLINK-12879
 URL: https://issues.apache.org/jira/browse/FLINK-12879
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / Runtime
Reporter: Liya Fan
Assignee: Liya Fan


Improve the performance of AbstractBinaryWriter by:
1. removing unnecessary memory copies
2. improving the performance of rounding the buffer size.
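
As an illustration of the second point: assuming the rounding in question is the
usual alignment of a size up to an 8-byte word boundary (an assumption, not a
statement about the actual code), the division-based rounding can be replaced by a
bit mask:

{noformat}
/** Illustrative only: two equivalent ways to round a non-negative size up to a multiple of 8. */
public final class SizeRounding {

    /** Straightforward version: divide, round up, multiply back. */
    static int roundUpToWordSlow(int numBytes) {
        return ((numBytes + 7) / 8) * 8;
    }

    /** Bit-mask version: equivalent because 8 is a power of two. */
    static int roundUpToWordFast(int numBytes) {
        return (numBytes + 7) & ~7;
    }
}
{noformat}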



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread vino yang
Hi Kurt,

Thanks for your reply.

Actually, I am not against you raising your design.

From your earlier description, I can only infer that your high-level
implementation is about SQL and that the optimization is internal to the API. Is it
automatic? How is the configuration option for triggering the
pre-aggregation exposed?

Maybe after I get more information it will sound more reasonable.

IMO, first of all, it would be better to make your user interface concrete;
it's the basis of the discussion.

For example, can you give an example code snippet showing how to help
users handle data skew in jobs built with the DataStream
API?

If you give more details, we can discuss further. There is an obvious difference
between a design that introduces a concrete interface and one that does not.

The implementation follows from that difference. For example, we introduce a
concrete API in DataStream named localKeyBy; for the pre-aggregation we need
to define the trigger mechanism of the local aggregation, so we find that reusing
the window API and operator is a good choice. This is the reasoning chain from
design to implementation.

What do you think?

Best,
Vino


Kurt Young  wrote on Tue, Jun 18, 2019 at 11:58 AM:

> Hi Vino,
>
> Now I feel that we may have different understandings about what kind of
> problems or improvements you want to
> resolve. Currently, most of the feedback is focusing on *how to do a
> proper local aggregation to improve performance
> and maybe solve the data skew issue*. And my gut feeling is this is
> exactly what users want in the first place,
> especially those +1s. (Sorry to try to summarize here, please correct me if
> I'm wrong).
>
> But I still think the design has somehow diverged from the goal. If we want
> to have an efficient and powerful way to
> do local aggregation, supporting an intermediate result type is essential IMO.
> Both the runtime's `AggregateFunction` and
> SQL's `UserDefinedAggregateFunction` have proper support for an intermediate
> result type and can do a `merge` operation
> on them.
>
> Now, we have a lightweight alternative which performs well and is a
> nice fit for the local aggregation requirements.
> Most importantly, it's much less complex because it's stateless. And it
> can also achieve a similar multiple-aggregation
> scenario.
>
> I am still not convinced why we shouldn't consider it as a first step.
>
> Best,
> Kurt
>
>
> On Tue, Jun 18, 2019 at 11:35 AM vino yang  wrote:
>
> > Hi Kurt,
> >
> > Thanks for your comments.
> >
> > It seems we both implemented a local aggregation feature to optimize the
> > issue of data skew.
> > However, IMHO, the API level at which the optimization benefit applies is
> > different.
> >
> > *Your optimization benefits from Flink SQL and is not user-facing. (If I
> > understand it incorrectly, please correct this.)*
> > *Our implementation employs it as an optimization tool API for DataStream;
> > it is just like a local version of the keyBy API.*
> >
> > Based on this, I want to say that supporting it as a DataStream API can
> > provide these advantages:
> >
> >
> >- The localKeyBy API has clear semantics and it's flexible not only for
> >processing data skew but also for implementing some use cases; for
> >example, if we want to calculate a multiple-level aggregation, we can do
> >multiple-level aggregation in the local aggregation:
> >input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a" is a
> >sub-category, while "b" is a category; here we do not need to shuffle data
> >over the network.
> >- The users of the DataStream API will benefit from this. Actually, we
> >have a lot of scenarios that need to use the DataStream API. Currently,
> >the DataStream API is the cornerstone of the physical plan of Flink SQL.
> >With a localKeyBy API, the optimization of SQL may at least use this
> >optimized API; that is a further topic.
> >- Based on the window operator, our state would benefit from Flink state
> >and checkpointing, so we do not need to worry about OOM and job failures.
> >
> > Now, about your questions:
> >
> > 1. About our design not being able to change the data type, and about the
> > implementation of average:
> >
> > Just like in my reply to Hequn, localKeyBy is an API provided to the users
> > who use the DataStream API to build their jobs.
> > Users should know its semantics and the difference from the keyBy API, so if
> > they want the average aggregation, they should carry the local sum result
> > and the local count result.
> > I admit that it would be more convenient to use keyBy directly. But we need
> > to pay a small price to get some benefits, and I think this price is
> > reasonable, considering that the DataStream API itself is a low-level API
> > (at least for now).
> >
> > 2. About stateless operator and
> > `StreamOperator::prepareSnapshotPreBarrier()`:
> >
> > Actually, I have discussed this opinion with @dianfu in the old mail
> > thread. I will copy my opinion from there:
> >
> >- for your design, you still need somewhere to give the user

Re: About Deprecating split/select for DataStream API

2019-06-17 Thread Dawid Wysakowicz
Hi all,

I think we are getting closer to a consensus. I think most of us already
agree that the current behavior is broken. The remaining difference I
see is that I think those problems are caused by the design of the
split/select method. The current contract of the split method is that it
is actually applied to the previous operation, rather than it creates a
new operator. (I don't think this is an implementation detail. This is
the contract of the API. It was described in docs, conference talks,
workshops etc.).

I agree that we could reimplement split with side outputs in a way
that it would add an additional operator in a chain and emit results via
side outputs. This would change the core concepts of the split method
though, making the "fixed" split method a completely new one with a new
behavior. Therefore I am in favor of dropping the old method and
introducing a new one. This would be explicit information for the
users that this is something different, so that they can make a
conscious choice. Moreover, if we reimplement the split method in a way
that it introduces an additional operator and emits results via side
outputs, this is basically equivalent to enabling side outputs in a
flatMap method. You can think of the split method as a flatMap method. I
see no benefit in having an additional split method that would have the
limited functionality of such a flatMap with side outputs.
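
Just for reference, a minimal sketch of what the side-output replacement looks
like with today's API (ProcessFunction plus OutputTag). Here `input` is an
assumed DataStream<Integer>, and the even/odd routing is purely illustrative:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // The anonymous subclass keeps the type information of the side output.
    final OutputTag<Integer> evenTag = new OutputTag<Integer>("even") {};

    SingleOutputStreamOperator<Integer> mainStream = input
        .process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value % 2 == 0) {
                    ctx.output(evenTag, value);   // routed to the side output, like select("even")
                } else {
                    out.collect(value);           // main output plays the role of select("odd")
                }
            }
        });

    DataStream<Integer> evens = mainStream.getSideOutput(evenTag);
    DataStream<Integer> odds  = mainStream;       // the main output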

So to sum up my stance:

1. I am against changing the behavior of current split/select methods.

2. I would be ok with introducing similar, but new, methods with a new
behavior (but would prefer not to do that, as we could achieve the same or
even more with existing APIs)

3. I would be in favor of enabling side outputs for flatMap method (if
it is possible)

Best,

Dawid

On 18/06/2019 03:45, Dian Fu wrote:
> Hi all,
>
> Thanks a lot for the discussion. I'm also in favor of rewriting/redesigning
> the split/select API instead of removing it. It has been a consensus that
> the side output API can achieve all the functionalities of the split/select
> API. The problem is whether we should also support some easy-to-use APIs on
> top of it. IMO, we should do that as long as the APIs have clear semantics and
> wide usage scenarios. I think the split/select API is such a kind of API.
>
> Regards,
> Dian
>
>> On Jun 18, 2019, at 12:30 AM, xingc...@gmail.com wrote:
>>
>> Hi all,
>>
>> Thanks for sharing your thoughts on this topic.
>>
>> First, we must admit that the current implementation for split/select is
>> flawed. I roughly went through the source code; the problem may be that for
>> consecutive select/split(s), the former one will be overridden by the latter
>> one during the StreamGraph generation phase. That's why we forbid this
>> consecutive logic in FLINK-11084.
>>
>> Now the question is whether we should guide users to migrate to the new side 
>> output feature or thoroughly rework the broken API with the correct 
>> semantics (instead of just trying to forbid all the "invalid" usages). 
>>
>> Personally, I prefer the latter solution because
>>
>> 1. The split/select may have been widely used without touching the broken 
>> part.
>> 2. Though restricted compared with side outputs, the semantics of
>> split/select itself are acceptable since union does not support different
>> data types either.
>> 3. We need a complete and easy-to-use transformation set for DataStream API. 
>> Enabling side output for flatMap may not be an ultimate solution.
>>
>> To summarize, maybe we should not easily deprecate the split/select public
>> API. If we come to a consensus on that, how about rewriting it based on side
>> outputs? (like the implementation of join on top of coGroup)
>>
>> Any feedback is welcome : )
>>
>> Best,
>> Xingcan
>>
>> -Original Message-
>> From: SHI Xiaogang  
>> Sent: Monday, June 17, 2019 8:08 AM
>> To: Dawid Wysakowicz 
>> Cc: dev@flink.apache.org
>> Subject: Re: About Deprecating split/select for DataStream API
>>
>> Hi Dawid,
>>
>> Thanks a lot for your example.
>>
>> I think most users will expect splitted1 to be empty in the example.
>>
>> The unexpected results produced are, in my opinion, due to our problematic
>> implementation, rather than to confusing semantics.
>> We can fix the problem if we add a SELECT operator to filter out unexpected
>> records (of course, we can find some optimizations to improve the
>> efficiency).
>>
>> After all, I prefer to fix the problems so that the results are as expected.
>> What do you think?
>>
>> Regards,
>> Xiaogang
>>
>> Dawid Wysakowicz wrote on Mon, Jun 17, 2019 at 7:21 PM:
>>
>>> Yes, you are correct. The problem I described applies to split, not
>>> select as I wrote in the first email. Sorry for that.
>>>
>>> I will try to prepare a correct example. Let's have a look at this example:
>>>
>>>    val splitted1 = ds.split(x => if (x == 1) Seq("a") else Seq.empty)
>>>
>>>    val splitted2 = ds.split(x => if (x != 1) Seq("a") else Seq.empty)
>>>
>>> In those cases splitted1.select("a") -> will output all elements, the 
>>> same for spl

Re: [DISCUSS] FLIP-44: Support Local Aggregation in Flink

2019-06-17 Thread Kurt Young
Yeah, sorry for not expressing myself clearly. I will try to provide more
details to make sure we are on the same page.

For the DataStream API, it shouldn't be optimized automatically. You have to
explicitly call an API to do local aggregation
and to specify the trigger policy of the local aggregation. Take average for
example; the user program may look like this (just a draft):

assuming the input type is DataStream<Tuple2<String, Long>>

ds.localAggregate(
       0,                      // the local key, which is the String from the Tuple2
       PartitionAvg(1),        // the partial aggregation function, produces
                               // Tuple2<Long, Long>, indicating sum and count
       CountTrigger.of(1000L)  // trigger policy; note this should be best effort,
                               // and can also be composited with time-based or
                               // memory-size-based triggers
   )                           // the return type is the local aggregate:
                               // Tuple2<String, Tuple2<Long, Long>>
   .keyBy(0)                   // further keyBy with the required key
   .aggregate(1)               // this merges all the partial results and gets the
                               // final average

(This is only a draft, just trying to explain what it looks like.)
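
As a side note on the `PartitionAvg(1)` part: Flink's existing
AggregateFunction interface already models exactly this shape -- an
intermediate (sum, count) accumulator plus a merge step. A minimal average
function, independent of this proposal and shown only to illustrate the
intermediate result type, could look like:

    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Average over Long values with a (sum, count) accumulator; the accumulator is
    // the intermediate result type that a local/partial aggregation ships and merges.
    public class AverageAggregate
            implements AggregateFunction<Long, Tuple2<Long, Long>, Double> {

        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return Tuple2.of(0L, 0L);
        }

        @Override
        public Tuple2<Long, Long> add(Long value, Tuple2<Long, Long> acc) {
            return Tuple2.of(acc.f0 + value, acc.f1 + 1);
        }

        @Override
        public Double getResult(Tuple2<Long, Long> acc) {
            return acc.f1 == 0 ? 0.0 : ((double) acc.f0) / acc.f1;
        }

        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
            // Merging two partial (sum, count) pairs is all the final aggregation needs.
            return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
        }
    }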

The local aggregate operator can be stateless; we can keep an in-memory buffer
or another efficient data structure to improve the aggregation performance.
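
To illustrate the stateless idea (a rough sketch of my own, not part of the
FLIP and not an existing Flink class; timestamps and error handling are
omitted), such an operator could flush its in-memory partial sums both when
the buffer grows past a threshold and in prepareSnapshotPreBarrier(), i.e.
right before each checkpoint barrier, so it never needs operator state:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    // Hypothetical buffering pre-aggregation operator for (key, value) partial sums.
    public class LocalSumOperator
            extends AbstractStreamOperator<Tuple2<String, Long>>
            implements OneInputStreamOperator<Tuple2<String, Long>, Tuple2<String, Long>> {

        private static final int MAX_BUFFERED_KEYS = 1000;
        private transient Map<String, Long> buffer;

        @Override
        public void open() throws Exception {
            super.open();
            buffer = new HashMap<>();
        }

        @Override
        public void processElement(StreamRecord<Tuple2<String, Long>> element) throws Exception {
            Tuple2<String, Long> in = element.getValue();
            buffer.merge(in.f0, in.f1, Long::sum);      // accumulate a partial sum per key
            if (buffer.size() >= MAX_BUFFERED_KEYS) {
                flush();                                 // best-effort size-based trigger
            }
        }

        @Override
        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            flush();                                     // emit everything before the barrier, so no state is needed
        }

        private void flush() {
            for (Map.Entry<String, Long> e : buffer.entrySet()) {
                output.collect(new StreamRecord<>(Tuple2.of(e.getKey(), e.getValue())));
            }
            buffer.clear();
        }
    }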

Let me know if you have any other questions.

Best,
Kurt


On Tue, Jun 18, 2019 at 1:29 PM vino yang  wrote:

> Hi Kurt,
>
> Thanks for your reply.
>
> Actually, I am not against you raising your design.
>
> From your earlier description, I can only imagine that your high-level
> implementation is about SQL and the optimization is internal to the API. Is it
> automatic? How would users configure the trigger for the
> pre-aggregation?
>
> Maybe after I get more information, it will sound more reasonable.
>
> IMO, first of all, it would be better to make your user interface concrete;
> it's the basis of the discussion.
>
> For example, can you give an example code snippet that shows how to help
> users handle data skew in jobs built with the DataStream
> API?
>
> If you give more details, we can discuss further. I think if one design
> introduces a concrete interface and another does not, the implementations
> differ in obvious ways.
>
> For example, we introduce a concrete API in DataStream named localKeyBy; for
> the pre-aggregation we need to define the trigger mechanism of the local
> aggregation, so we found that reusing the window API and operator is a good
> choice. This is the reasoning chain from design to implementation.
>
> What do you think?
>
> Best,
> Vino
>
>
> Kurt Young wrote on Tue, Jun 18, 2019 at 11:58 AM:
>
> > Hi Vino,
> >
> > Now I feel that we may have different understandings about what kind of
> > problems or improvements you want to
> > resolve. Currently, most of the feedback is focusing on *how to do a
> > proper local aggregation to improve performance
> > and maybe solve the data skew issue*. And my gut feeling is this is
> > exactly what users want in the first place,
> > especially those +1s. (Sorry to try to summarize here, please correct me
> > if I'm wrong).
> >
> > But I still think the design has somehow diverged from the goal. If we
> > want to have an efficient and powerful way to
> > do local aggregation, supporting an intermediate result type is essential
> > IMO. Both the runtime's `AggregateFunction` and
> > SQL's `UserDefinedAggregateFunction` have proper support for an
> > intermediate result type and can do a `merge` operation
> > on them.
> >
> > Now, we have a lightweight alternative which performs well and is a
> > nice fit for the local aggregation requirements.
> > Most importantly, it's much less complex because it's stateless. And it
> > can also achieve a similar multiple-aggregation
> > scenario.
> >
> > I am still not convinced why we shouldn't consider it as a first step.
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Jun 18, 2019 at 11:35 AM vino yang 
> wrote:
> >
> > > Hi Kurt,
> > >
> > > Thanks for your comments.
> > >
> > > It seems we both implemented a local aggregation feature to optimize the
> > > issue of data skew.
> > > However, IMHO, the API level at which the optimization benefit applies is
> > > different.
> > >
> > > *Your optimization benefits from Flink SQL and is not user-facing. (If I
> > > understand it incorrectly, please correct this.)*
> > > *Our implementation employs it as an optimization tool API for DataStream;
> > > it is just like a local version of the keyBy API.*
> > >
> > > Based on this, I want to say that supporting it as a DataStream API can
> > > provide these advantages:
> > >
> > >
> > >- The localKeyBy API has clear semantics and it's flexible not only for
> > >processing data skew but also for implementing some use cases; for
> > >example, if we want to calculate a multiple-level aggregation, we can do
> > >multiple-level aggregation in the local aggregation:
> > >input.localKeyBy("a").sum(1).localKeyBy("b").window(); // here "a

Something wrong with travis?

2019-06-17 Thread Kurt Young
Hi dev,

I noticed that all the Travis tests triggered by pull requests fail
with the same error:

"Cached flink dir /home/travis/flink_cache/x/flink does not exist.
Exiting build."

Anyone have a clue on what happened and how to fix this?

Best,
Kurt