Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Kurt Young
Thanks Bowen for driving this.

+1 for the general idea. It makes the function resolved behavior more
clear and deterministic. Besides, the user can use all hive built-in
functions, which is a great feature.

I only have one comment, but maybe it may touch your design so I think
it would make sense to reply this mail instead of comment on google doc.
Regarding to the classfication of functions, you currently have 4 types
of functions, which are:
1. temporary functions
2. Flink built-in functions
3. Hive built-in functions (or generalized as external built-in functions)
4. catalog functions

What I want to propose is we can merge #3 and #4, make them both under
"catalog" concept, by extending catalog function to make it have ability to
have built-in catalog functions. Some benefits I can see from this approach:
1. We don't have to introduce new concept like external built-in functions.
Actually
I don't see a full story about how to treat a built-in functions, and it
seems a little
bit disrupt with catalog. As a result, you have to make some restriction
like "hive
built-in functions can only be used when current catalog is hive catalog".

2. It makes us easier to adopt another system's built-in functions to
Flink, such as
MySQL. If we don't treat uniformly with  "external built-in functions" and
"external
catalog function", things like user set current catalog to hive but want to
use MySQL's
built-in function will happen.

One more thing, follow this approach, it's clear for your question about
how to support
external built-in functions, which is "add a  getBuiltInFunction to current
Catalog API".

What do you think?

Best,
Kurt


On Fri, Aug 30, 2019 at 7:14 AM Bowen Li  wrote:

> Thanks everyone for the feedback.
>
> I have updated the document accordingly. Here're the summary of changes:
>
> - clarify the concept of temporary functions, to facilitate deciding
> function resolution order
> - provide two options to support Hive built-in functions, with the 2nd one
> being preferred
> - add detailed prototype code for FunctionCatalog#lookupFunction(name)
> - move the section of ”rename existing FunctionCatalog APIs in favor of
> temporary functions“ out of the scope of the FLIP
> - add another reasonable limitation for function resolution, to not
> consider resolving overloaded functions - those with the same name but
> different params. (It's still valid to have a single function with
> overloaded eval() methods)
>
> Please take another look.
>
> Thanks,
> Bowen
>
> On Tue, Aug 27, 2019 at 11:49 AM Bowen Li  wrote:
>
> > Hi folks,
> >
> > I'd like to kick off a discussion on reworking Flink's FunctionCatalog.
> > It's critically helpful to improve function usability in SQL.
> >
> >
> >
> https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing
> >
> > In short, it:
> > - adds support for precise function reference with fully/partially
> > qualified name
> > - redefines function resolution order for ambiguous function reference
> > - adds support for Hive's rich built-in functions (support for Hive user
> > defined functions was already added in 1.9.0)
> > - clarifies the concept of temporary functions
> >
> > Would love to hear your thoughts.
> >
> > Bowen
> >
>


Please add me as contributor

2019-09-03 Thread Jan Lukavský

Hi,

I'd like to be able to assign JIRAs to myself, can I be added as 
contributor, please? My JIRA ID is 'janl'.


Thanks,

 Jan



Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread JingsongLee
Thanks Bowen:

+1 for this. And +1 to Kurt's suggestion. My other points are:

1.Hive built-in functions is an intermediate solution. So we should
 not introduce interfaces to influence the framework. To make
 Flink itself more powerful, we should implement the functions
 we need to add.

2.Non-flink built-in functions are easy for users to change their 
behavior. If we support some flink built-in functions in the
 future but act differently from non-flink built-in, this will lead to
 changes in user behavior.

3.Fallback to Non-flink built-in functions is a bad choice to
 performance. Without flink internal codegen and data format,
 and bring data format conversion, the performance is not so
 good.

We need to support more complete hive jobs now, we need to
 have this fallback strategy. But it's not worth adding this
 concept at the catalog interface level, and it's not worth
 encouraging other catalogs to do so.

Another question is, does this fallback include all
 hive built-in functions? As far as I know, some hive functions
 have some hacky. If possible, can we start with a white list?
Once we implement some functions to flink built-in, we can
also update the whitelist.

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2019年9月3日(星期二) 15:41
To:dev 
Subject:Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

Thanks Bowen for driving this.

+1 for the general idea. It makes the function resolved behavior more
clear and deterministic. Besides, the user can use all hive built-in
functions, which is a great feature.

I only have one comment, but maybe it may touch your design so I think
it would make sense to reply this mail instead of comment on google doc.
Regarding to the classfication of functions, you currently have 4 types
of functions, which are:
1. temporary functions
2. Flink built-in functions
3. Hive built-in functions (or generalized as external built-in functions)
4. catalog functions

What I want to propose is we can merge #3 and #4, make them both under
"catalog" concept, by extending catalog function to make it have ability to
have built-in catalog functions. Some benefits I can see from this approach:
1. We don't have to introduce new concept like external built-in functions.
Actually
I don't see a full story about how to treat a built-in functions, and it
seems a little
bit disrupt with catalog. As a result, you have to make some restriction
like "hive
built-in functions can only be used when current catalog is hive catalog".

2. It makes us easier to adopt another system's built-in functions to
Flink, such as
MySQL. If we don't treat uniformly with  "external built-in functions" and
"external
catalog function", things like user set current catalog to hive but want to
use MySQL's
built-in function will happen.

One more thing, follow this approach, it's clear for your question about
how to support
external built-in functions, which is "add a  getBuiltInFunction to current
Catalog API".

What do you think?

Best,
Kurt


On Fri, Aug 30, 2019 at 7:14 AM Bowen Li  wrote:

> Thanks everyone for the feedback.
>
> I have updated the document accordingly. Here're the summary of changes:
>
> - clarify the concept of temporary functions, to facilitate deciding
> function resolution order
> - provide two options to support Hive built-in functions, with the 2nd one
> being preferred
> - add detailed prototype code for FunctionCatalog#lookupFunction(name)
> - move the section of ”rename existing FunctionCatalog APIs in favor of
> temporary functions“ out of the scope of the FLIP
> - add another reasonable limitation for function resolution, to not
> consider resolving overloaded functions - those with the same name but
> different params. (It's still valid to have a single function with
> overloaded eval() methods)
>
> Please take another look.
>
> Thanks,
> Bowen
>
> On Tue, Aug 27, 2019 at 11:49 AM Bowen Li  wrote:
>
> > Hi folks,
> >
> > I'd like to kick off a discussion on reworking Flink's FunctionCatalog.
> > It's critically helpful to improve function usability in SQL.
> >
> >
> >
> https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing
> >
> > In short, it:
> > - adds support for precise function reference with fully/partially
> > qualified name
> > - redefines function resolution order for ambiguous function reference
> > - adds support for Hive's rich built-in functions (support for Hive user
> > defined functions was already added in 1.9.0)
> > - clarifies the concept of temporary functions
> >
> > Would love to hear your thoughts.
> >
> > Bowen
> >
>



Re: Please add me as contributor

2019-09-03 Thread Dawid Wysakowicz
Hi Jan,

Recently the community changed the contribution process a bit and there
are no longer contributor privileges. The jira issues are supposed to be
assigned by committers that are willing to help you with getting the
contribution in. Please look at the contribution guidelines[1]. Do you
have some particular jira ticket in mind that you are interested in
working on?

Best,

Dawid


[1] https://flink.apache.org/contributing/contribute-code.html

On 03/09/2019 10:18, Jan Lukavský wrote:
> Hi,
>
> I'd like to be able to assign JIRAs to myself, can I be added as
> contributor, please? My JIRA ID is 'janl'.
>
> Thanks,
>
>  Jan
>



signature.asc
Description: OpenPGP digital signature


Re: Please add me as contributor

2019-09-03 Thread Jan Lukavský

Hi Dawid,

thanks for the explanation. I got warning from PR [1] associated with 
JIRA [2].


Jan

[1] https://github.com/apache/flink/pull/9579

[2] https://issues.apache.org/jira/browse/FLINK-13925

On 9/3/19 10:43 AM, Dawid Wysakowicz wrote:

Hi Jan,

Recently the community changed the contribution process a bit and there
are no longer contributor privileges. The jira issues are supposed to be
assigned by committers that are willing to help you with getting the
contribution in. Please look at the contribution guidelines[1]. Do you
have some particular jira ticket in mind that you are interested in
working on?

Best,

Dawid


[1] https://flink.apache.org/contributing/contribute-code.html

On 03/09/2019 10:18, Jan Lukavský wrote:

Hi,

I'd like to be able to assign JIRAs to myself, can I be added as
contributor, please? My JIRA ID is 'janl'.

Thanks,

  Jan



Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-03 Thread Timo Walther

Hi Becket,

it is definitely API but I would rather consider configuration as 
second-level API. Usually, a connector or plugin should have some nice 
builder pattern with helpful JavaDocs instead of just exposing the pure 
ConfigOptions.


I think there is some general misunderstanding what a configurable 
object should be:


1) When serializing a configurable object into a string representation, 
all attributes are put under a single top-level key. Your example of a 
plugin class should already spread multiple keys and should not use a 
configurable object.


2) A configurable object should not represent an entire configuration 
like ExecutionConfig. Because why should you put all 50 attributes under 
a single key?


3) "avoid letting the Configurations to host arbitrary objects beyond 
the primitive types": We need a way of expressing ListString, Boolean>> or List>. There is no clean 
way of expressing such data structures by only using primitive types.


Regards,
Timo



On 03.09.19 09:08, Dawid Wysakowicz wrote:


Hi Becket,

Regarding your example of the Configurable, this is not what we 
envisioned. What we think of a Configurable is a way to store simple 
POJOs that might be parameters for your some part of your system. The 
example you described should be rather expressed in a following way:


// Define the Plugin class ConfigOption.
ConfigOption> option =
 ConfigOptions.key("pluginConfigKey")
  .classType(org.apache.flink.SomePluginInterface.class);

class Host extends Configurable {

private String address;
private Int port;

private static final ConfigOption addressOption
private static final ConfigOption portOption;
  


//let's assume the original solution for now
private void fromConfiguration(ConfigurationReader config) {

this.address = config.get(addressOption);
this.port = config.get(portOption);
}

}

ConfigOption hostOption =
 ConfigOptions.key("configKey1ForMyPluginClass")
  .classType(org.apache.flink.SomePluginInterface.class);


  // Instantiate the user configured plugin
Class pluginClass = configuration.get(pluginConfigKey);
Host host = configuration.get(hostOption);
pluginClass.newInstance(host);

  // Programmatically, users will do the following to set the plugin.
configurations.set(option, MyPluginClass.class);
configurations.set(hostOption, new Host()); // set the
configurations required by my plugin class.

// Non-programmatically, users can do the following to set the plugin in a
config file, e.g. yaml
pluginConfigKey: PATH_TO_MyPluginClass
configKey1ForMyPluginClass: host: localhost, port: 1234 // see that all options 
of Configurable are stored in a single key

"I'd imagine that in most cases, after a concrete Configurable (say 
ExecutionConfig) instance is created from the Configuration instance, 
we will just pass around the ExecutionConfig instead of the 
Configuration object."


That's not the case. It's the Configuration object that's sent around 
in most of the cases. Itwould be a major rework of the Configuration 
handling to actually change that.


Best,

Dawid


On 03/09/2019 05:59, Becket Qin wrote:

Hi Timo,

I think I might have misunderstood the scope or motivation of the FLIP a
little bit. Please let me clarify a little bit.

Regarding "great if we don't put this burden on users", we should

consider who is actually using this API. It is not first-level API but
mostly API for Flink contributors. Most of the users will use API
classes ike ExecutionConfig or TableConfig or other builders for
performing configuration. They will never use the ConfigOptions classes
directly. So enforcing a duplicate method does not sound like a burden
to me.

I thought the configuration will be used by all the plugins and components
such as connectors. If that is the case, the Configuration sounds a public
API just like UDF. So one may implement a plugin and set that plugin class
in the configuration. Flink will do something like following:

// Define the Plugin ConfigOption. SomePluginInterface extends Configurable.
ConfigOption option =
 ConfigOptions.key("pluginConfigKey")

  .ConfigurableType(org.apache.flink.SomePluginInterface.class);

// Instantiate the user configured plugin
SomePluginInterface plugin =
configuration.getConfigurableInstance(pluginConfigKey);

// Programmatically, users will do the following to set the plugin.
// Set the plugin class, alternatively, we can always require a
ConfigurableFactory class as value.
configurations.setConfigurable("pluginConfigKey", MyPluginClass.class);
configurations.setInt("configKey1ForMyPluginClass", 123); // set the
configurations required by my plugin class.

// Non-programmatically, users can do the following to set the plugin in a
config file, e.g. yaml
pluginConfigKey: PATH_TO_MyPluginClass // Or PATH_TO_MyPluginClassFactory
configKey1ForMyPluginClass: 123

Internally, the configuration may discover the MyPluginClassFacto

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-03 Thread Till Rohrmann
Thanks everyone for the input again. I'll then conclude this survey thread
and start a discuss thread to set the default restart delay to 1s.

@Arvid, I agree that a better documentation how to tune Flink with sane
settings for certain scenarios is super helpful. However, as you've said it
is somewhat hijacking the discussion and I would exclude it from my
proposed changes. The best thing to do would be to start a separate
discussion/effort for it.

Concerning the restart strategy configuration options, they are currently
only documented here [1]. I'm about to change it with this PR [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html
[2] https://github.com/apache/flink/pull/9562

Cheers,
Till

On Tue, Sep 3, 2019 at 8:21 AM Arvid Heise  wrote:

> Hi all,
>
> just wanted to share my experience with configurations with you. For
> non-expert users configurations of Flink can be very daunting. The list of
> common properties is already helping a lot [1], but it's not clear how they
> depend on each other and settings common for specific use cases are not
> listed.
>
> If we can give somewhat clear recommendations for the start for the most
> common use cases (batch small/large cluster, streaming high throughput/low
> latency), I think users would be able start much more quickly with a
> somewhat well-configured system and fine-tune the settings later. For
> example, Kafka Streams has a section on how to set the parameters for
> maximum resilience [2].
>
> I'd propose to leave the current configuration page as a reference page,
> but also have a recommended configuration settings page that's directly
> linked in the first section, such that new users are not overwhelmed.
>
> Sorry if this response is hijacking the discussion.
> Btw, is restart-strategy configuration missing in the main configuration
> page? Is this a conscious decision?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#common-options
> [2]
> https://docs.confluent.io/current/streams/developer-guide/config-streams.html#recommended-configuration-parameters-for-resiliency
>
> On Tue, Sep 3, 2019 at 5:10 AM Zhu Zhu  wrote:
>
>> 1s looks good to me.
>> And I think the conclusion that when a user should override the delay is
>> worth to be documented.
>>
>> Thanks,
>> Zhu Zhu
>>
>> Steven Wu  于2019年9月3日周二 上午4:42写道:
>>
>>> 1s sounds a good tradeoff to me.
>>>
>>> On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann 
>>> wrote:
>>>
 Thanks a lot for all your feedback. I see there is a slight tendency
 towards having a non zero default delay so far.

 However, Yu has brought up some valid points. Maybe I can shed some
 light on a).

 Before FLINK-9158 we set the default delay to 10s because Flink did not
 support queued scheduling which meant that if one slot was missing/still
 being occupied, then Flink would fail right away with
 a NoResourceAvailableException. In order to prevent this we added the
 delay. This also covered the case when the job was failing because of an
 overloaded external system.

 When we finished FLIP-6, we thought that we could improve the user
 experience by decreasing the default delay to 0s because all Flink related
 problems (slot still occupied, slot missing because of reconnecting TM)
 could be handled by the default slot request time out which allowed the
 slots to become ready after the scheduling was kicked off. However, we did
 not properly take the case of overloaded external systems into account.

 For b) I agree that any default value should be properly documented.
 This was clearly an oversight when FLINK-9158 has been merged. Moreover, I
 believe that there won't be the solve it all default value. There are
 always cases where one needs to adapt it to ones needs. But this is ok. The
 goal should be to find the default value which works for most cases.

 So maybe the middle ground between 10s and 0s could be a solution.
 Setting the default restart delay to 1s should prevent restart storms
 caused by overloaded external systems and still be fast enough to not slow
 down recoveries noticeably in most cases. If one needs a super fast
 recovery, then one should set the delay value to 0s. If one requires a
 longer delay because of a particular infrastructure, then one needs to
 change the value too. What do you think?

 Cheers,
 Till

 On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:

> -1 on increasing the default delay to none zero, with below reasons:
>
> a) I could see some concerns about setting the delay to zero in the
> very original JIRA (FLINK-2993
> ) but later on in
> FLINK-9158  we
> still decided to make the change, so I'm wondering whether the decision
> also 

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Timo Walther

Hi Bowen,

thanks for your proposal. Here are some thoughts:

1) We should not have the restriction "hive built-in functions can only 
be used when current catalog is hive catalog". Switching a catalog 
should only have implications on the cat.db.object resolution but not 
functions. It would be quite convinient for users to use Hive built-ins 
even if they use a Confluent schema registry or just the in-memory catalog.


2) I would propose to have separate concepts for catalog and built-in 
functions. In particular it would be nice to modularize built-in 
functions. Some built-in functions are very crucial (like AS, CAST, 
MINUS), others are more optional but stable (MD5, CONCAT_WS), and maybe 
we add more experimental functions in the future or function for some 
special application area (Geo functions, ML functions). A data platform 
team might not want to make every built-in function available. Or a 
function module like ML functions is in a different Maven module.


3) Following the suggestion above, we can have a separate discovery 
mechanism for built-in functions. Instead of just going through a static 
list like in BuiltInFunctionDefinitions, a platform team should be able 
to select function modules like 
catalogManager.setFunctionModules(CoreFunctions, GeoFunctions, 
HiveFunctions) or via service discovery;


3) Dawid and I discussed the resulution order again. I agree with Kurt 
that we should unify built-in function (external or internal) under a 
common layer. However, the resolution order should be:

  1. built-in functions
  2. temporary functions
  3. regular catalog resolution logic
Otherwise a temporary function could cause clashes with Flink's built-in 
functions. If you take a look at other vendors, like SQL Server they 
also do not allow to overwrite built-in functions.


Regards,
Timo


On 03.09.19 10:35, JingsongLee wrote:

Thanks Bowen:

+1 for this. And +1 to Kurt's suggestion. My other points are:

1.Hive built-in functions is an intermediate solution. So we should
  not introduce interfaces to influence the framework. To make
  Flink itself more powerful, we should implement the functions
  we need to add.

2.Non-flink built-in functions are easy for users to change their
behavior. If we support some flink built-in functions in the
  future but act differently from non-flink built-in, this will lead to
  changes in user behavior.

3.Fallback to Non-flink built-in functions is a bad choice to
  performance. Without flink internal codegen and data format,
  and bring data format conversion, the performance is not so
  good.

We need to support more complete hive jobs now, we need to
  have this fallback strategy. But it's not worth adding this
  concept at the catalog interface level, and it's not worth
  encouraging other catalogs to do so.

Another question is, does this fallback include all
  hive built-in functions? As far as I know, some hive functions
  have some hacky. If possible, can we start with a white list?
Once we implement some functions to flink built-in, we can
also update the whitelist.

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2019年9月3日(星期二) 15:41
To:dev 
Subject:Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

Thanks Bowen for driving this.

+1 for the general idea. It makes the function resolved behavior more
clear and deterministic. Besides, the user can use all hive built-in
functions, which is a great feature.

I only have one comment, but maybe it may touch your design so I think
it would make sense to reply this mail instead of comment on google doc.
Regarding to the classfication of functions, you currently have 4 types
of functions, which are:
1. temporary functions
2. Flink built-in functions
3. Hive built-in functions (or generalized as external built-in functions)
4. catalog functions

What I want to propose is we can merge #3 and #4, make them both under
"catalog" concept, by extending catalog function to make it have ability to
have built-in catalog functions. Some benefits I can see from this approach:
1. We don't have to introduce new concept like external built-in functions.
Actually
I don't see a full story about how to treat a built-in functions, and it
seems a little
bit disrupt with catalog. As a result, you have to make some restriction
like "hive
built-in functions can only be used when current catalog is hive catalog".

2. It makes us easier to adopt another system's built-in functions to
Flink, such as
MySQL. If we don't treat uniformly with  "external built-in functions" and
"external
catalog function", things like user set current catalog to hive but want to
use MySQL's
built-in function will happen.

One more thing, follow this approach, it's clear for your question about
how to support
external built-in functions, which is "add a  getBuiltInFunction to current
Catalog API".

What do you think?

Best,
Kurt


On Fri, Aug 30, 2019 at 7:14 AM Bowen Li  wrote:


Than

Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-03 Thread jincheng sun
+1 FLINK-13940  is a
blocker, due to loss data is very important bug, And great thanks for
helping fix it  Kostas!

Best, Jincheng

Kostas Kloudas  于2019年9月2日周一 下午7:20写道:

> Hi all,
>
> I think this should be also considered a blocker
> https://issues.apache.org/jira/browse/FLINK-13940.
> It is not a regression but it can result to data loss.
>
> I think I can have a quick fix by tomorrow.
>
> Cheers,
> Kostas
>
> On Mon, Sep 2, 2019 at 12:01 PM jincheng sun 
> wrote:
> >
> > Thanks for all of your feedback!
> >
> > Hi Jark, Glad to see that you are doing what RM should doing.
> >
> > Only one tips here is before the RC1 all the blocker should be fixed, but
> > othrers is nice to have. So you can decide when to prepare RC1 after the
> > blokcer is resolved.
> >
> > Feel free to tell me if you have any questions.
> >
> > Best,Jincheng
> >
> > Aljoscha Krettek  于2019年9月2日周一 下午5:03写道:
> >
> > > I cut a PR for FLINK-13586: https://github.com/apache/flink/pull/9595
> <
> > > https://github.com/apache/flink/pull/9595>
> > >
> > > > On 2. Sep 2019, at 05:03, Yu Li  wrote:
> > > >
> > > > +1 for a 1.8.2 release, thanks for bringing this up Jincheng!
> > > >
> > > > Best Regards,
> > > > Yu
> > > >
> > > >
> > > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise  wrote:
> > > >
> > > >> +1 for the 1.8.2 release
> > > >>
> > > >> I marked https://issues.apache.org/jira/browse/FLINK-13586 for this
> > > >> release. It would be good to compensate for the backward
> incompatible
> > > >> change to ClosureCleaner that was introduced in 1.8.1, which affects
> > > >> downstream dependencies.
> > > >>
> > > >> Thanks,
> > > >> Thomas
> > > >>
> > > >>
> > > >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun <
> sunjincheng...@gmail.com>
> > > >> wrote:
> > > >>
> > > >>> Hi Jark,
> > > >>>
> > > >>> Glad to hear that you want to be the Release Manager of flink
> 1.8.2.
> > > >>> I believe that you will be a great RM, and I am very willing to
> help
> > > you
> > > >>> with the final release in the final stages. :)
> > > >>>
> > > >>> The release of Apache Flink involves a number of tasks. For
> details,
> > > you
> > > >>> can consult the documentation [1]. If you have any questions,
> please
> > > let
> > > >> me
> > > >>> know and let us work together.
> > > >>>
> > > >>>
> > > >>>
> > > >>
> > >
> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1
> > > >>>
> > > >>> Cheers,
> > > >>> Jincheng
> > > >>>
> > > >>> Till Rohrmann  于2019年8月31日周六 上午12:59写道:
> > > >>>
> > >  +1 for a 1.8.2 bug fix release. Thanks for kicking this
> discussion off
> > >  Jincheng.
> > > 
> > >  Cheers,
> > >  Till
> > > 
> > >  On Fri, Aug 30, 2019 at 6:45 PM Jark Wu  wrote:
> > > 
> > > > Thanks Jincheng for bringing this up.
> > > >
> > > > +1 to the 1.8.2 release, because it already contains a couple of
> > >  important
> > > > fixes and it has been a long time since 1.8.1 came out.
> > > > I'm willing to help the community as much as possible. I'm
> wondering
> > > >>> if I
> > > > can be the release manager of 1.8.2 or work with you together
> > > >>> @Jincheng?
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Fri, 30 Aug 2019 at 18:58, Hequn Cheng 
> > > >>> wrote:
> > > >
> > > >> Hi Jincheng,
> > > >>
> > > >> +1 for a 1.8.2 release.
> > > >> Thanks a lot for raising the discussion. It would be nice to
> have
> > > >>> these
> > > >> critical fixes.
> > > >>
> > > >> Best, Hequn
> > > >>
> > > >>
> > > >> On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels <
> m...@apache.org
> > > >>>
> > > > wrote:
> > > >>
> > > >>> Hi Jincheng,
> > > >>>
> > > >>> +1 I would be for a 1.8.2 release such that we can fix the
> > > >> problems
> > > > with
> > > >>> the nested closure cleaner which currently block 1.8.1 users
> with
> > >  Beam:
> > > >>> https://issues.apache.org/jira/browse/FLINK-13367
> > > >>>
> > > >>> Thanks,
> > > >>> Max
> > > >>>
> > > >>> On 30.08.19 11:25, jincheng sun wrote:
> > >  Hi Flink devs,
> > > 
> > >  It has been nearly 2 months since the 1.8.1 released. So, what
> > > >> do
> > >  you
> > > >>> think
> > >  about releasing Flink 1.8.2 soon?
> > > 
> > >  We already have some blocker and critical fixes in the
> > > >>> release-1.8
> > > >>> branch:
> > > 
> > >  [Blocker]
> > >  - FLINK-13159 java.lang.ClassNotFoundException when restore
> job
> > >  - FLINK-10368 'Kerberized YARN on Docker test' unstable
> > >  - FLINK-12578 Use secure URLs for Maven repositories
> > > 
> > >  [Critical]
> > >  - FLINK-12736 ResourceManager may release TM with allocated
> > > >> slots
> > >  - FLINK-128

[jira] [Created] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.

2019-09-03 Thread Stefano (Jira)
Stefano created FLINK-13944:
---

 Summary: Table.toAppendStream: InvalidProgramException: Table 
program cannot be compiled.
 Key: FLINK-13944
 URL: https://issues.apache.org/jira/browse/FLINK-13944
 Project: Flink
  Issue Type: Bug
  Components: API / Scala, Table SQL / API
Affects Versions: 1.9.0, 1.8.1
 Environment: {{$ java -version}}
{{ openjdk version "1.8.0_222"}}
{{ OpenJDK Runtime Environment (build 
1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}}
{{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}}

{{--}}

{{$ scala -version}}
{{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}}

{{--}}

{{build.}}{{sbt}}

[...]

ThisBuild / scalaVersion := "2.11.12"

val flinkVersion = "1.9.0"

val flinkDependencies = Seq(
 "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
 "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
 "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")

[...]

 
Reporter: Stefano
 Attachments: app.zip

{{Using: Scala streaming API and the StreamTableEnvironment.}}

Given the classes:

{{object EntityType extends Enumeration {}}
{{ type EntityType = Value}}
{{ val ACTIVITY = Value}}
{{}}}

{{sealed trait Entity extends Serializable}}

{{case class Activity(card_id: Long, date_time: Timestamp, second: Long, 
station_id: Long, station_name: String, activity_code: Long, amount: Long) 
extends Entity}}

 

What I try to do is{{ to convert a table after selection to an appendStream.}}

 

{{/** activity table **/}}
{{val activityDataStream = partialComputation1}}
{{ .filter(_._1 == EntityType.ACTIVITY)}}
{{ .map(x => x._3.asInstanceOf[Activity])}}
{{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 
'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}}


{{val selectedTable = tableEnv.scan("activity").select("card_id, second")}}
{{selectedTable.printSchema()}}
{{// root}}
{{// |-- card_id: BIGINT}}
{{// |-- second: BIGINT}}

{{// ATTEMPT 1}}
{{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}}
{{// output.print}}

{{// ATTEMPT 2}}
{{// val output = tableEnv.toAppendStream[(java.lang.Long, 
java.lang.Long)](selectedTable)}}
{{// output.print}}

{{// ATTEMPT 3}}
{{// val output = tableEnv.toAppendStream[Row](selectedTable)}}
{{// output.print}}

{{// ATTEMPT 4}}
{{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = 
tableEnv.toAppendStream[Test](selectedTable)}}
{{output.print}}

 

The result for each of the attempts is always the same:

 

{{--- The program finished with the 
following exception:}}
 {{org.apache.flink.client.program.ProgramInvocationException: Job failed. 
(JobID: 334fe364c516008ca34b76e27c5c6f79) at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
 at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at

... 23 more

Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program 
cannot be compiled. This is a bug. Please file an issue.* at 
org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36)
 at 
org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50)
 at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 at 
org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at 
java.lang.Thread.run(Thread.java:748)}}

 

My project in which I face the error is attached.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[DISCUSS] FLIP-62: Set default restart delay for FixedDelay- and FailureRateRestartStrategy to 1s

2019-09-03 Thread Till Rohrmann
Hi everyone,

I'd like to discuss changing the default restart delay for FixedDelay- and
FailureRateRestartStrategy to "1 s" [1].

According to a user survey about the default value of the restart delay
[2], it turned out that the current default value of "0 s" is not optimal.
In practice Flink users tend to set it to a non-zero value (e.g. "10 s") in
order to prevent restart storms originating from overloaded external
systems.

I would like to set the default restart delay of the
FixedDelayRestartStrategy ("restart-strategy.fixed-delay.delay") and of the
FailureRateRestartStrategy ("restart-strategy.failure-rate.delay") to "1
s". "1 s" should prevent restart storms originating from causes outside of
Flink (e.g. overloaded external systems) and still be fast enough to not
having a noticeable effect on most Flink deployments.

However, this change will affect all users who currently rely on the
current default restart delay value ("0 s"). The plan is to add a release
note to make these users aware of this change when upgrading Flink.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-62%3A+Set+default+restart+delay+for+FixedDelay-+and+FailureRateRestartStrategy+to+1s
[2]
https://lists.apache.org/thread.html/107b15de6b8ac849610d99c4754715d2a8a2f32ddfe9f8da02af2ccc@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-03 Thread Till Rohrmann
The FLIP-62 discuss thread can be found here [1].

[1]
https://lists.apache.org/thread.html/9602b342602a0181fcb618581f3b12e692ed2fad98c59fd6c1caeabd@%3Cdev.flink.apache.org%3E

Cheers,
Till

On Tue, Sep 3, 2019 at 11:13 AM Till Rohrmann  wrote:

> Thanks everyone for the input again. I'll then conclude this survey thread
> and start a discuss thread to set the default restart delay to 1s.
>
> @Arvid, I agree that a better documentation how to tune Flink with sane
> settings for certain scenarios is super helpful. However, as you've said it
> is somewhat hijacking the discussion and I would exclude it from my
> proposed changes. The best thing to do would be to start a separate
> discussion/effort for it.
>
> Concerning the restart strategy configuration options, they are currently
> only documented here [1]. I'm about to change it with this PR [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html
> [2] https://github.com/apache/flink/pull/9562
>
> Cheers,
> Till
>
> On Tue, Sep 3, 2019 at 8:21 AM Arvid Heise 
> wrote:
>
>> Hi all,
>>
>> just wanted to share my experience with configurations with you. For
>> non-expert users configurations of Flink can be very daunting. The list of
>> common properties is already helping a lot [1], but it's not clear how they
>> depend on each other and settings common for specific use cases are not
>> listed.
>>
>> If we can give somewhat clear recommendations for the start for the most
>> common use cases (batch small/large cluster, streaming high throughput/low
>> latency), I think users would be able start much more quickly with a
>> somewhat well-configured system and fine-tune the settings later. For
>> example, Kafka Streams has a section on how to set the parameters for
>> maximum resilience [2].
>>
>> I'd propose to leave the current configuration page as a reference page,
>> but also have a recommended configuration settings page that's directly
>> linked in the first section, such that new users are not overwhelmed.
>>
>> Sorry if this response is hijacking the discussion.
>> Btw, is restart-strategy configuration missing in the main configuration
>> page? Is this a conscious decision?
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#common-options
>> [2]
>> https://docs.confluent.io/current/streams/developer-guide/config-streams.html#recommended-configuration-parameters-for-resiliency
>>
>> On Tue, Sep 3, 2019 at 5:10 AM Zhu Zhu  wrote:
>>
>>> 1s looks good to me.
>>> And I think the conclusion that when a user should override the delay is
>>> worth to be documented.
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> Steven Wu  于2019年9月3日周二 上午4:42写道:
>>>
 1s sounds a good tradeoff to me.

 On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann 
 wrote:

> Thanks a lot for all your feedback. I see there is a slight tendency
> towards having a non zero default delay so far.
>
> However, Yu has brought up some valid points. Maybe I can shed some
> light on a).
>
> Before FLINK-9158 we set the default delay to 10s because Flink did
> not support queued scheduling which meant that if one slot was
> missing/still being occupied, then Flink would fail right away with
> a NoResourceAvailableException. In order to prevent this we added the
> delay. This also covered the case when the job was failing because of an
> overloaded external system.
>
> When we finished FLIP-6, we thought that we could improve the user
> experience by decreasing the default delay to 0s because all Flink related
> problems (slot still occupied, slot missing because of reconnecting TM)
> could be handled by the default slot request time out which allowed the
> slots to become ready after the scheduling was kicked off. However, we did
> not properly take the case of overloaded external systems into account.
>
> For b) I agree that any default value should be properly documented.
> This was clearly an oversight when FLINK-9158 has been merged. Moreover, I
> believe that there won't be the solve it all default value. There are
> always cases where one needs to adapt it to ones needs. But this is ok. 
> The
> goal should be to find the default value which works for most cases.
>
> So maybe the middle ground between 10s and 0s could be a solution.
> Setting the default restart delay to 1s should prevent restart storms
> caused by overloaded external systems and still be fast enough to not slow
> down recoveries noticeably in most cases. If one needs a super fast
> recovery, then one should set the delay value to 0s. If one requires a
> longer delay because of a particular infrastructure, then one needs to
> change the value too. What do you think?
>
> Cheers,
> Till
>
> On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:
>
>> -1 on increasing the default delay to none ze

Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-03 Thread Timo Walther

Hi Jincheng,

thanks for your response.

2. Serializability of functions: Using some arbitrary serialization 
format for shipping a function to worker sounds fine to me. But once we 
store functions a the catalog we need to think about backwards 
compatibility and evolution of interfaces etc. I'm not sure if 
CloudPickle is the right long-term storage format for this. If we don't 
think about this in advance, we are basically violating our code quality 
guide [1] of never use Java Serialization but in the Python-way. We are 
using the RPC serialization for persistence.


3. TableEnvironment: Can you add some example to the FLIP? Because API 
code like the following is not covered there:


self.t_env.register_function("add_one", udf(lambda i: i + 1, 
DataTypes.BIGINT(),

    DataTypes.BIGINT()))
self.t_env.register_function("subtract_one", udf(SubtractOne(), 
DataTypes.BIGINT(),

DataTypes.BIGINT()))
self.t_env.register_function("add", add)

4. FunctionDefinition: Your response still doesn't answer my question 
entirely. Why do we need FunctionDefinition.getLanguage() if this is a 
"user-defined function" concept and not a "function" concept. In any 
case, all users should not be able to set this method. So it must be 
final in UserDefinedFunction similar to getKind().


5. Function characteristics: If UserDefinedFunction is defined in 
Python, why is it not used in your example in FLIP-58. You could you 
extend the example to show how to specify these attributes in the FLIP?


Regards,
Timo

[1] https://flink.apache.org/contributing/code-style-and-quality-java.html

On 02.09.19 15:35, jincheng sun wrote:

Hi Timo,

Great thanks for your feedback. I would like to share my thoughts with you
inline. :)

Best,
Jincheng

Timo Walther  于2019年9月2日周一 下午5:04写道:


Hi all,

the FLIP looks awesome. However, I would like to discuss the changes to
the user-facing parts again. Some feedback:

1. DataViews: With the current non-annotation design for DataViews, we
cannot perform eager state declaration, right? At which point during
execution do we know which state is required by the function? We need to
instantiate the function first, right?


We will analysis the Python AggregateFunction and extract the DataViews

used in the Python AggregateFunction. This can be done
by instantiate a Python AggregateFunction, creating an accumulator by
calling method create_accumulator and then analysis the created
accumulator. This is actually similar to the way that Java
AggregateFunction processing codegen logic. The extracted DataViews can
then be used to construct the StateDescriptors in the operator, i.e., we
should have hold the state spec and the state descriptor id in Java
operator and Python worker can access the state by specifying the
corresponding state descriptor id.




2. Serializability of functions: How do we ensure serializability of
functions for catalog persistence? In the Scala/Java API, we would like
to register classes instead of instances soon. This is the only way to
store a function properly in a catalog or we need some
serialization/deserialization logic in the function interfaces to
convert an instance to string properties.


The Python function will be serialized with CloudPickle anyway in the

Python API as we need to transfer it to the Python worker which can then
deserialize it for execution. The serialized Python function can be stored
into catalog.




3. TableEnvironment: What is the signature of `register_function(self,
name, function)`? Does it accept both a class and function? Like `class
Sum` and `def split()`? Could you add some examples for registering both
kinds of functions?


It has been already supported which you mentioned. You can find an

example in the POC code:
https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26




4. FunctionDefinition: Function definition is not a user-defined
function definition. It is the highest interface for both user-defined
and built-in functions. I'm not sure if getLanguage() should be part of
this interface or one-level down which would be `UserDefinedFunction`.
Built-in functions will never be implemented in a different language. In
any case, I would vote for removing the UNKNOWN language, because it
does not solve anything. Why should a user declare a function that the
runtime can not handle? I also find the term `JAVA` confusing for Scala
users. How about `FunctionLanguage.JVM` instead?


Actually we may have built-in Python functions in the future. Regarding

to the following expression: py_udf1(a, b) + py_udf2(c), if there is
built-in Python
funciton for '+' operator, then we don't need to mix using Java and Python
UDFs. In this way, we can improve the execution performance.
Regarding to removing FunctionLanguage.UNKNOWN and renaming
FunctionLanguage.Java to FunctionLanguage.JVM, it makes more sense to me.




5. Function characteristics: In the c

Flink SQL - Support Computed Columns in DDL?

2019-09-03 Thread Qi Luo
Hi folks,

Computed columns in Flink SQL DDL is currently disabled in both old planner
and Blink planner (throws "Computed columns for DDL is not supported yet!"
exception in SqlToOperationConverter).

I searched through the JIRA but found no relevant issues. Do we have any
plans to support this nice feature?

Thanks,
Qi


Re: [DISCUSS] FLIP-53: Fine Grained Resource Management

2019-09-03 Thread Xintong Song
 Thanks for the comments, Zhu & Kurt.

Andrey and I also had some discussions offline, and I would like to first
post a summary of our discussion:

   1. The motivation of the fraction based approach is to unify resource
   management for both operators with specified and unknown resource
   requirements.
   2. The fraction based approach proposed in this FLIP should only affect
   streaming jobs (both bounded and unbounded). For DataSet jobs, there are
   already some fraction based approach (in TaskConfig and ChainedDriver), and
   we do not make any change to the existing approach.
   3. The scope of this FLIP does not include discussion of how to set
   ResourceSpec for operators.
  1. For blink jobs, the optimizer can set operator resources for the
  users, according to their configurations (default: unknown)
  2. For DataStream jobs, there are no method / interface to set
  operator resources at the moment (1.10). We can have in the future.
  3. For DataSet jobs, there are existing user interfaces to set
  operator resources.
   4. The FLIP should explain more about how ResourceSpecs works
  1. PhysicalTransformations (deployed with operators into the
  StreamTasks) get ResourceSpec: unknown by default or known (e.g. from the
  Blink planner)
  2. While generating stream graph, calculate fractions and set to
  StreamConfig
  3. While scheduling, convert ResourceSpec to ResourceProfile
  (ResourceSpec + network memory), and deploy to slots / TMs matching the
  resources
  4. While starting Task in TM, each operator gets fraction converted
  back to the original absolute value requested by user or fair
unknown share
  of the slot
  5. We should not set `allSourcesInSamePipelinedRegion` to `false` for
   DataSet jobs. Behaviors of DataSet jobs should not be changed.
   6. The FLIP document should differentiate works planed in this FLIP and
   the future follow-ups more clearly, by put the follow-ups in a separate
   section
   7. Another limitation of the rejected alternative setting fractions at
   scheduling time is that, the scheduler implementation does not know which
   tasks will be deployed into the same slot in advance.

Andrey, Please bring it up if there is anything I missed.

Zhu, regarding your comments:

   1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for
   DataSet jobs (point 5 in the discussion summary above), then there
   shouldn't be any regression right?
   2. I think it makes sense to set the max possible network memory for the
   JobVertex. When you say parallel instances of the same JobVertex may have
   need different network memory, I guess you mean the rescale scenarios where
   parallelisms of upstream / downstream vertex cannot be exactly divided by
   parallelism of downstream / upstream vertex? I would say it's acceptable to
   have slight difference between actually needed and allocated network memory.
   3. Yes, by numOpsUseOnHeapManagedMemory I mean
   numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the doc.
   4. Yes, it should be StreamingJobGraphGenerator. Thanks for the
   correction.


Kurt, regarding your comments:

   1. I think we don't have network memory in ResourceSpec, which is the
   user facing API. We only have network memory in ResourceProfile, which is
   used internally for scheduling. The reason we do not expose network memory
   to the user is that, currently how many network buffers each task needs is
   decided by the topology of execution graph (how many input / output
   channels it has).
   2. In the section "Operator Resource Requirements": "For the first
   version, we do not support mixing operators with specified / unknown
   resource requirements in the same job. Either all or none of the operators
   of the same job should specify their resource requirements.
   StreamGraphGenerator should check this and throw an error when mixing of
   specified / unknown resource requirements is detected, during the
   compilation stage."
   3. If the user set a resource requirement, then it is guaranteed that
   the task should get at least the much resource, otherwise there should be
   an exception. That should be guaranteed by the "Dynamic Slot Allocation"
   approach (FLIP-56).


I'll update the FLIP document addressing the comments ASAP.


Thank you~

Xintong Song



On Tue, Sep 3, 2019 at 2:42 PM Kurt Young  wrote:

> Thanks Xingtong for driving this effort, I haven't finished the whole
> document yet,
> but have couple of questions:
>
> 1. Regarding to network memory, the document said it will be derived by
> framework
> automatically. I'm wondering whether we should delete this dimension from
> user-
> facing API?
>
> 2. Regarding to fraction based quota, I don't quite get the meaning of
> "slotSharingGroupOnHeapManagedMem" and "slotSharingGroupOffHeapManagedMem".
> What if the sharing group is mixed with specified resource and UNKNOWN
> reso

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-03 Thread Stephan Ewen
+1 to the proposal in general

A few things seems to be a bit put of sync with the latest discussions
though.

The section about JVM Parameters states that the
-XX:MaxDirectMemorySize value is set to Task Off-heap Memory, Shuffle
Memory and JVM Overhead.
The way I understand the last discussion conclusion is that it is only the
sum of shuffle memory and user-defined direct memory.

I am someone neutral but unsure about is the separation between
"taskmanager.memory.framework.heap" and "taskmanager.memory.task.heap".
Could that be simply combined under "taskmanager.memory.javaheap"?

It might be good to also expose these values somehow in the web UI so that
users see immediately what amount of memory TMs assume to use for what.

I assume config key names and default values might be adjusted over time as
we get feedback.
  - I would keep the network memory under the name
"taskmanager.memory.network". Because network memory is actually used for
more than shuffling. Also, the old config key seems good, so why change it?

One thing to be aware of is that often, the Java Heap is understood as
"managed memory" as a whole, because it is managed by the GC not explicitly
by the user.
So we need to make sure that we don't confuse users by speaking of managed
heap and unmanaged heap. All heap is managed in Java. Some memory is
explicitly managed by Flink.

Best,
Stephan


On Mon, Sep 2, 2019 at 3:08 PM Xintong Song  wrote:

> Hi everyone,
>
> I'm here to re-start the voting process for FLIP-49 [1], with respect to
> consensus reached in this thread [2] regarding some new comments and
> concerns.
>
> This voting will be open for at least 72 hours. I'll try to close it Sep.
> 5, 14:00 UTC, unless there is an objection or not enough votes.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> [2]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
>
> On Tue, Aug 27, 2019 at 9:29 PM Xintong Song 
> wrote:
>
> > Alright, then let's keep the discussion in the DISCUSS mailing thread,
> and
> > see whether we need to restart the vote.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Aug 27, 2019 at 8:12 PM Till Rohrmann 
> > wrote:
> >
> >> I had a couple of comments concerning the implementation plan. I've
> posted
> >> them to the original discussion thread. Depending on the outcome of this
> >> discussion we might need to restart the vote.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Aug 27, 2019 at 11:14 AM Xintong Song 
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > I would like to start the voting process for FLIP-49 [1], which is
> >> > discussed and reached consensus in this thread [2].
> >> >
> >> > This voting will be open for at least 72 hours. I'll try to close it
> >> Aug.
> >> > 30 10:00 UTC, unless there is an objection or not enough votes.
> >> >
> >> > Thank you~
> >> >
> >> > Xintong Song
> >> >
> >> >
> >> > [1]
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> >> > [2]
> >> >
> >> >
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
> >> >
> >>
> >
>


Re: [DISCUSS] FLIP-53: Fine Grained Resource Management

2019-09-03 Thread Zhu Zhu
Thanks Xintong for the explanation.

For question #1, I think it's good as long as DataSet job behaviors remains
the same.

For question #2, agreed that the resource difference is small enough(at
most 1 edge diff) in current supported point-wise execution edge connection
patterns.

Thanks,
Zhu Zhu

Xintong Song  于2019年9月3日周二 下午6:58写道:

>  Thanks for the comments, Zhu & Kurt.
>
> Andrey and I also had some discussions offline, and I would like to first
> post a summary of our discussion:
>
>1. The motivation of the fraction based approach is to unify resource
>management for both operators with specified and unknown resource
>requirements.
>2. The fraction based approach proposed in this FLIP should only affect
>streaming jobs (both bounded and unbounded). For DataSet jobs, there are
>already some fraction based approach (in TaskConfig and ChainedDriver),
> and
>we do not make any change to the existing approach.
>3. The scope of this FLIP does not include discussion of how to set
>ResourceSpec for operators.
>   1. For blink jobs, the optimizer can set operator resources for the
>   users, according to their configurations (default: unknown)
>   2. For DataStream jobs, there are no method / interface to set
>   operator resources at the moment (1.10). We can have in the future.
>   3. For DataSet jobs, there are existing user interfaces to set
>   operator resources.
>4. The FLIP should explain more about how ResourceSpecs works
>   1. PhysicalTransformations (deployed with operators into the
>   StreamTasks) get ResourceSpec: unknown by default or known (e.g.
> from the
>   Blink planner)
>   2. While generating stream graph, calculate fractions and set to
>   StreamConfig
>   3. While scheduling, convert ResourceSpec to ResourceProfile
>   (ResourceSpec + network memory), and deploy to slots / TMs matching
> the
>   resources
>   4. While starting Task in TM, each operator gets fraction converted
>   back to the original absolute value requested by user or fair
> unknown share
>   of the slot
>   5. We should not set `allSourcesInSamePipelinedRegion` to `false` for
>DataSet jobs. Behaviors of DataSet jobs should not be changed.
>6. The FLIP document should differentiate works planed in this FLIP and
>the future follow-ups more clearly, by put the follow-ups in a separate
>section
>7. Another limitation of the rejected alternative setting fractions at
>scheduling time is that, the scheduler implementation does not know
> which
>tasks will be deployed into the same slot in advance.
>
> Andrey, Please bring it up if there is anything I missed.
>
> Zhu, regarding your comments:
>
>1. If we do not set `allSourcesInSamePipelinedRegion` to `false` for
>DataSet jobs (point 5 in the discussion summary above), then there
>shouldn't be any regression right?
>2. I think it makes sense to set the max possible network memory for the
>JobVertex. When you say parallel instances of the same JobVertex may
> have
>need different network memory, I guess you mean the rescale scenarios
> where
>parallelisms of upstream / downstream vertex cannot be exactly divided
> by
>parallelism of downstream / upstream vertex? I would say it's
> acceptable to
>have slight difference between actually needed and allocated network
> memory.
>3. Yes, by numOpsUseOnHeapManagedMemory I mean
>numOpsUseOnHeapManagedMemoryInTheSameSharedGroup. I'll update the doc.
>4. Yes, it should be StreamingJobGraphGenerator. Thanks for the
>correction.
>
>
> Kurt, regarding your comments:
>
>1. I think we don't have network memory in ResourceSpec, which is the
>user facing API. We only have network memory in ResourceProfile, which
> is
>used internally for scheduling. The reason we do not expose network
> memory
>to the user is that, currently how many network buffers each task needs
> is
>decided by the topology of execution graph (how many input / output
>channels it has).
>2. In the section "Operator Resource Requirements": "For the first
>version, we do not support mixing operators with specified / unknown
>resource requirements in the same job. Either all or none of the
> operators
>of the same job should specify their resource requirements.
>StreamGraphGenerator should check this and throw an error when mixing of
>specified / unknown resource requirements is detected, during the
>compilation stage."
>3. If the user set a resource requirement, then it is guaranteed that
>the task should get at least the much resource, otherwise there should
> be
>an exception. That should be guaranteed by the "Dynamic Slot Allocation"
>approach (FLIP-56).
>
>
> I'll update the FLIP document addressing the comments ASAP.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Sep 3, 2019 at 2:42 PM Kurt Young  wrote:
>
> >

Re: Flink SQL - Support Computed Columns in DDL?

2019-09-03 Thread Danny Chan
Yeah, we are planning to implement this feature in release-1.10, wait for our 
good news !

Best,
Danny Chan
在 2019年9月3日 +0800 PM6:19,Qi Luo ,写道:
> Hi folks,
>
> Computed columns in Flink SQL DDL is currently disabled in both old planner
> and Blink planner (throws "Computed columns for DDL is not supported yet!"
> exception in SqlToOperationConverter).
>
> I searched through the JIRA but found no relevant issues. Do we have any
> plans to support this nice feature?
>
> Thanks,
> Qi


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Danny Chan
Thanks Bowen for bring up this topic, I think it’s a useful refactoring to make 
our function usage more user friendly.

For the topic of how to organize the builtin operators and operators of Hive, 
here is a solution from Apache Calcite, the Calcite way is to make every 
dialect operators a “Library”, user can specify which libraries they want to 
use for a sql query. The builtin operators always comes as the first class 
objects and the others are used from the order they appears. Maybe you can take 
a reference.

[1] 
https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28

Best,
Danny Chan
在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:
> Hi folks,
>
> I'd like to kick off a discussion on reworking Flink's FunctionCatalog.
> It's critically helpful to improve function usability in SQL.
>
> https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing
>
> In short, it:
> - adds support for precise function reference with fully/partially
> qualified name
> - redefines function resolution order for ambiguous function reference
> - adds support for Hive's rich built-in functions (support for Hive user
> defined functions was already added in 1.9.0)
> - clarifies the concept of temporary functions
>
> Would love to hear your thoughts.
>
> Bowen


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-03 Thread Danny Chan
> with the new SQL DDL
based on properties as well as more connectors and formats coming up,
unified configuration becomes more important

I Cann’t agree more, do you think we can unify the config options key format 
here for all the DDL properties ?

Best,
Danny Chan
在 2019年8月16日 +0800 PM10:12,dev@flink.apache.org,写道:
>
> with the new SQL DDL
> based on properties as well as more connectors and formats coming up,
> unified configuration becomes more important


Re: Flink SQL - Support Computed Columns in DDL?

2019-09-03 Thread Jark Wu
Hi Qi,

The computed column is not fully supported in 1.9. We will start a design
discussion in the dev mailing list soon. Please stay tuned!

Btw, could you share with us what's the case why do you want to use
computed column?

Best,
Jark

On Tue, 3 Sep 2019 at 19:25, Danny Chan  wrote:

> Yeah, we are planning to implement this feature in release-1.10, wait for
> our good news !
>
> Best,
> Danny Chan
> 在 2019年9月3日 +0800 PM6:19,Qi Luo ,写道:
> > Hi folks,
> >
> > Computed columns in Flink SQL DDL is currently disabled in both old
> planner
> > and Blink planner (throws "Computed columns for DDL is not supported
> yet!"
> > exception in SqlToOperationConverter).
> >
> > I searched through the JIRA but found no relevant issues. Do we have any
> > plans to support this nice feature?
> >
> > Thanks,
> > Qi
>


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Timo Walther

This sounds exactly as the module approach I mentioned, no?

Regards,
Timo

On 03.09.19 13:42, Danny Chan wrote:

Thanks Bowen for bring up this topic, I think it’s a useful refactoring to make 
our function usage more user friendly.

For the topic of how to organize the builtin operators and operators of Hive, 
here is a solution from Apache Calcite, the Calcite way is to make every 
dialect operators a “Library”, user can specify which libraries they want to 
use for a sql query. The builtin operators always comes as the first class 
objects and the others are used from the order they appears. Maybe you can take 
a reference.

[1] 
https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28

Best,
Danny Chan
在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:

Hi folks,

I'd like to kick off a discussion on reworking Flink's FunctionCatalog.
It's critically helpful to improve function usability in SQL.

https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing

In short, it:
- adds support for precise function reference with fully/partially
qualified name
- redefines function resolution order for ambiguous function reference
- adds support for Hive's rich built-in functions (support for Hive user
defined functions was already added in 1.9.0)
- clarifies the concept of temporary functions

Would love to hear your thoughts.

Bowen





Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread Till Rohrmann
Hi Jan,

I've talked with Aljoscha and Stephan offline and we concluded that we
would like to avoid the usage of context class loaders if possible. The
reason for this is that using the context class loader can easily mess up
an otherwise clear class loader hierarchy which makes it hard to reason
about and to debug class loader issues.

Given this, I think it would help to better understand the exact problem
you are trying to solve by using the context class loader. Usually the
usage of the context class loader points towards an API deficiency which we
might be able to address differently.

Cheers,
Till

On Mon, Sep 2, 2019 at 11:32 AM Aljoscha Krettek 
wrote:

> I’m not saying we can’t change that code to use the context class loader.
> I’m just not sure whether this might break other things.
>
> Best,
> Aljoscha
>
> > On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:
> >
> > Essentially, the class loader of Flink should be present in parent
> hierarchy of context class loader. If FlinkUserCodeClassLoader doesn't use
> context class loader, then it is actually impossible to use a hierarchy
> like this:
> >
> >  system class loader -> application class loader -> user-defined class
> loader (defines some UDFs to be used in flink program)
> >
> > Flink now uses the application class loader, and so the UDFs fail to
> deserialize on local flink, because the user-defined class loader is
> bypassed. Moreover, there is no way to add additional classpath elements
> for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
> this by calling addURL method on the application class loader (which is
> terribly hackish), but that works only on JDK <= 8. No sensible workaround
> is available for JDK >= 9.
> >
> > Alternative solution would be to enable adding jars to class loader when
> using LocalEnvironment, but that looks a little odd.
> >
> > Jan
> >
> > On 9/2/19 11:02 AM, Aljoscha Krettek wrote:
> >> Hi,
> >>
> >> I actually don’t know whether that change would be ok.
> FlinkUserCodeClassLoader has taken
> FlinkUserCodeClassLoader.class.getClassLoader() as the parent ClassLoader
> before my change. See:
> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
> <
> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
> >.
> >>
> >> I have the feeling that this might be on purpose because we want to
> have the ClassLoader of the Flink Framework components be the parent
> ClassLoader, but I could be wrong. Maybe Stephan would be most appropriate
> for answering this.
> >>
> >> Best,
> >> Aljoscha
> >>
> >>> On 30. Aug 2019, at 16:28, Till Rohrmann  wrote:
> >>>
> >>> Hi Jan,
> >>>
> >>> this looks to me like a bug for which you could create a JIRA and PR
> to fix it. Just to make sure, I've pulled in Aljoscha who is the author of
> this change to check with him whether we are forgetting something.
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Fri, Aug 30, 2019 at 3:44 PM Jan Lukavský  je...@seznam.cz>> wrote:
> >>> Hi,
> >>>
> >>> I have come across an issue with classloading in Flink's MiniCluster.
> >>> The issue is that when I run local flink job from a thread, that has a
> >>> non-default context classloader (for whatever reason), this classloader
> >>> is not taken into account when classloading user defined functions.
> This
> >>> is due to [1]. Is this behavior intentional, or can I file a JIRA and
> >>> use Thread.currentThread.getContextClassLoader() there? I have
> validated
> >>> that it fixes issues I'm facing.
> >>>
> >>> Jan
> >>>
> >>> [1]
> >>>
> https://github.com/apache/flink/blob/ce557839d762b5f1ec92aa1885fd3d2ae33d0d0b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L280
> <
> https://github.com/apache/flink/blob/ce557839d762b5f1ec92aa1885fd3d2ae33d0d0b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L280
> >
> >>>
> >>
>
>


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-03 Thread Timo Walther

Hi Danny,

yes, this FLIP covers all the building blocks we need also for 
unification of the DDL properties.


Regards,
Timo


On 03.09.19 13:45, Danny Chan wrote:

with the new SQL DDL

based on properties as well as more connectors and formats coming up,
unified configuration becomes more important

I Cann’t agree more, do you think we can unify the config options key format 
here for all the DDL properties ?

Best,
Danny Chan
在 2019年8月16日 +0800 PM10:12,dev@flink.apache.org,写道:

with the new SQL DDL
based on properties as well as more connectors and formats coming up,
unified configuration becomes more important





Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Danny Chan
The way you proposed are basically the same as what Calcite does, I think we 
are in the same line.

Best,
Danny Chan
在 2019年9月3日 +0800 PM7:57,Timo Walther ,写道:
> This sounds exactly as the module approach I mentioned, no?
>
> Regards,
> Timo
>
> On 03.09.19 13:42, Danny Chan wrote:
> > Thanks Bowen for bring up this topic, I think it’s a useful refactoring to 
> > make our function usage more user friendly.
> >
> > For the topic of how to organize the builtin operators and operators of 
> > Hive, here is a solution from Apache Calcite, the Calcite way is to make 
> > every dialect operators a “Library”, user can specify which libraries they 
> > want to use for a sql query. The builtin operators always comes as the 
> > first class objects and the others are used from the order they appears. 
> > Maybe you can take a reference.
> >
> > [1] 
> > https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28
> >
> > Best,
> > Danny Chan
> > 在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:
> > > Hi folks,
> > >
> > > I'd like to kick off a discussion on reworking Flink's FunctionCatalog.
> > > It's critically helpful to improve function usability in SQL.
> > >
> > > https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing
> > >
> > > In short, it:
> > > - adds support for precise function reference with fully/partially
> > > qualified name
> > > - redefines function resolution order for ambiguous function reference
> > > - adds support for Hive's rich built-in functions (support for Hive user
> > > defined functions was already added in 1.9.0)
> > > - clarifies the concept of temporary functions
> > >
> > > Would love to hear your thoughts.
> > >
> > > Bowen
>
>


Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread Jan Lukavský

Hi Till,

the use-case is pretty much simple - I have a REPL shell in groovy, 
which generates classes at runtime. The actual hierarchy is therefore


 system class loader -> application classloader -> repl classloader 
(GroovyClassLoader actually)


now, when a terminal (sink) operation in the shell occurs, I'm able to 
build a jar, which I can submit to remote cluster (in distributed case). 
But - during testing -  I run the code using local flink. There is no 
(legal) way of adding this new runtime generated jar to local flink. As 
I said, I have a hackish solution which works on JDK <= 8, because it 
uses reflection to call addURL on the application classloader (and 
thefore "pretends", that the generated jar was there all the time from 
the JVM startup). This breaks on JDK >= 9. It might be possible to work 
around this somehow, but I think that the reason why LocalEnvironment is 
not having a way to add jars (as in case of RemoteEnvironment) is that 
is assumes, that you actually have all of the on classpath when using 
local runner. I think that this implies that it either has to use 
context classloader (to be able to work on top of any classloading user 
might have), or is wrong and would need be fixed, so that 
LocalEnvironment would accept files to "stage" - which would mean adding 
them to a class loader (probably URLClassLoader with the application 
class loader as parent).


Or, would you see any other option?

Jan


On 9/3/19 2:00 PM, Till Rohrmann wrote:

Hi Jan,

I've talked with Aljoscha and Stephan offline and we concluded that we
would like to avoid the usage of context class loaders if possible. The
reason for this is that using the context class loader can easily mess up
an otherwise clear class loader hierarchy which makes it hard to reason
about and to debug class loader issues.

Given this, I think it would help to better understand the exact problem
you are trying to solve by using the context class loader. Usually the
usage of the context class loader points towards an API deficiency which we
might be able to address differently.

Cheers,
Till

On Mon, Sep 2, 2019 at 11:32 AM Aljoscha Krettek 
wrote:


I’m not saying we can’t change that code to use the context class loader.
I’m just not sure whether this might break other things.

Best,
Aljoscha


On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:

Essentially, the class loader of Flink should be present in parent

hierarchy of context class loader. If FlinkUserCodeClassLoader doesn't use
context class loader, then it is actually impossible to use a hierarchy
like this:

  system class loader -> application class loader -> user-defined class

loader (defines some UDFs to be used in flink program)

Flink now uses the application class loader, and so the UDFs fail to

deserialize on local flink, because the user-defined class loader is
bypassed. Moreover, there is no way to add additional classpath elements
for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
this by calling addURL method on the application class loader (which is
terribly hackish), but that works only on JDK <= 8. No sensible workaround
is available for JDK >= 9.

Alternative solution would be to enable adding jars to class loader when

using LocalEnvironment, but that looks a little odd.

Jan

On 9/2/19 11:02 AM, Aljoscha Krettek wrote:

Hi,

I actually don’t know whether that change would be ok.

FlinkUserCodeClassLoader has taken
FlinkUserCodeClassLoader.class.getClassLoader() as the parent ClassLoader
before my change. See:
https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
<
https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java

.

I have the feeling that this might be on purpose because we want to

have the ClassLoader of the Flink Framework components be the parent
ClassLoader, but I could be wrong. Maybe Stephan would be most appropriate
for answering this.

Best,
Aljoscha


On 30. Aug 2019, at 16:28, Till Rohrmann  wrote:

Hi Jan,

this looks to me like a bug for which you could create a JIRA and PR

to fix it. Just to make sure, I've pulled in Aljoscha who is the author of
this change to check with him whether we are forgetting something.

Cheers,
Till

On Fri, Aug 30, 2019 at 3:44 PM Jan Lukavský 
je...@seznam.cz>> wrote:

Hi,

I have come across an issue with classloading in Flink's MiniCluster.
The issue is that when I run local flink job from a thread, that has a
non-default context classloader (for whatever reason), this classloader
is not taken into account when classloading user defined functions.

This

is due to [1]. Is this behavior intentional, or can I file a JIRA and
use Thread.currentThread.getContextClassLoader() there? I have

validated

that it fixes issues I'm facing.

Jan

[1]


https://github.com/apache/flink/blob/ce

Re: Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread guaishushu1...@163.com




guaishushu1...@163.com
 
From: Jan Lukavský
Date: 2019-09-03 20:17
To: dev
Subject: Re: ClassLoader created by BlobLibraryCacheManager is not using 
context classloader
Hi Till,
 
the use-case is pretty much simple - I have a REPL shell in groovy, 
which generates classes at runtime. The actual hierarchy is therefore
 
 system class loader -> application classloader -> repl classloader 
(GroovyClassLoader actually)
 
now, when a terminal (sink) operation in the shell occurs, I'm able to 
build a jar, which I can submit to remote cluster (in distributed case). 
But - during testing -  I run the code using local flink. There is no 
(legal) way of adding this new runtime generated jar to local flink. As 
I said, I have a hackish solution which works on JDK <= 8, because it 
uses reflection to call addURL on the application classloader (and 
thefore "pretends", that the generated jar was there all the time from 
the JVM startup). This breaks on JDK >= 9. It might be possible to work 
around this somehow, but I think that the reason why LocalEnvironment is 
not having a way to add jars (as in case of RemoteEnvironment) is that 
is assumes, that you actually have all of the on classpath when using 
local runner. I think that this implies that it either has to use 
context classloader (to be able to work on top of any classloading user 
might have), or is wrong and would need be fixed, so that 
LocalEnvironment would accept files to "stage" - which would mean adding 
them to a class loader (probably URLClassLoader with the application 
class loader as parent).
 
Or, would you see any other option?
 
Jan
 
 
On 9/3/19 2:00 PM, Till Rohrmann wrote:
> Hi Jan,
>
> I've talked with Aljoscha and Stephan offline and we concluded that we
> would like to avoid the usage of context class loaders if possible. The
> reason for this is that using the context class loader can easily mess up
> an otherwise clear class loader hierarchy which makes it hard to reason
> about and to debug class loader issues.
>
> Given this, I think it would help to better understand the exact problem
> you are trying to solve by using the context class loader. Usually the
> usage of the context class loader points towards an API deficiency which we
> might be able to address differently.
>
> Cheers,
> Till
>
> On Mon, Sep 2, 2019 at 11:32 AM Aljoscha Krettek 
> wrote:
>
>> I’m not saying we can’t change that code to use the context class loader.
>> I’m just not sure whether this might break other things.
>>
>> Best,
>> Aljoscha
>>
>>> On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:
>>>
>>> Essentially, the class loader of Flink should be present in parent
>> hierarchy of context class loader. If FlinkUserCodeClassLoader doesn't use
>> context class loader, then it is actually impossible to use a hierarchy
>> like this:
>>>   system class loader -> application class loader -> user-defined class
>> loader (defines some UDFs to be used in flink program)
>>> Flink now uses the application class loader, and so the UDFs fail to
>> deserialize on local flink, because the user-defined class loader is
>> bypassed. Moreover, there is no way to add additional classpath elements
>> for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
>> this by calling addURL method on the application class loader (which is
>> terribly hackish), but that works only on JDK <= 8. No sensible workaround
>> is available for JDK >= 9.
>>> Alternative solution would be to enable adding jars to class loader when
>> using LocalEnvironment, but that looks a little odd.
>>> Jan
>>>
>>> On 9/2/19 11:02 AM, Aljoscha Krettek wrote:
 Hi,

 I actually don’t know whether that change would be ok.
>> FlinkUserCodeClassLoader has taken
>> FlinkUserCodeClassLoader.class.getClassLoader() as the parent ClassLoader
>> before my change. See:
>> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
>> <
>> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
>>> .
 I have the feeling that this might be on purpose because we want to
>> have the ClassLoader of the Flink Framework components be the parent
>> ClassLoader, but I could be wrong. Maybe Stephan would be most appropriate
>> for answering this.
 Best,
 Aljoscha

> On 30. Aug 2019, at 16:28, Till Rohrmann  wrote:
>
> Hi Jan,
>
> this looks to me like a bug for which you could create a JIRA and PR
>> to fix it. Just to make sure, I've pulled in Aljoscha who is the author of
>> this change to check with him whether we are forgetting something.
> Cheers,
> Till
>
> On Fri, Aug 30, 2019 at 3:44 PM Jan Lukavský > je...@seznam.cz>> wrote:
> Hi,
>
> I have come across an issue with classloading in Flink's MiniCluster.
> The issue is that when I run lo

Re: Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread guaishushu1...@163.com




guaishushu1...@163.com
 
From: guaishushu1...@163.com
Date: 2019-09-03 20:23
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not using 
context classloader
 
 
 
 
guaishushu1...@163.com
From: Jan Lukavský
Date: 2019-09-03 20:17
To: dev
Subject: Re: ClassLoader created by BlobLibraryCacheManager is not using 
context classloader
Hi Till,
the use-case is pretty much simple - I have a REPL shell in groovy, 
which generates classes at runtime. The actual hierarchy is therefore
system class loader -> application classloader -> repl classloader 
(GroovyClassLoader actually)
now, when a terminal (sink) operation in the shell occurs, I'm able to 
build a jar, which I can submit to remote cluster (in distributed case). 
But - during testing -  I run the code using local flink. There is no 
(legal) way of adding this new runtime generated jar to local flink. As 
I said, I have a hackish solution which works on JDK <= 8, because it 
uses reflection to call addURL on the application classloader (and 
thefore "pretends", that the generated jar was there all the time from 
the JVM startup). This breaks on JDK >= 9. It might be possible to work 
around this somehow, but I think that the reason why LocalEnvironment is 
not having a way to add jars (as in case of RemoteEnvironment) is that 
is assumes, that you actually have all of the on classpath when using 
local runner. I think that this implies that it either has to use 
context classloader (to be able to work on top of any classloading user 
might have), or is wrong and would need be fixed, so that 
LocalEnvironment would accept files to "stage" - which would mean adding 
them to a class loader (probably URLClassLoader with the application 
class loader as parent).
Or, would you see any other option?
Jan
On 9/3/19 2:00 PM, Till Rohrmann wrote:
> Hi Jan,
>
> I've talked with Aljoscha and Stephan offline and we concluded that we
> would like to avoid the usage of context class loaders if possible. The
> reason for this is that using the context class loader can easily mess up
> an otherwise clear class loader hierarchy which makes it hard to reason
> about and to debug class loader issues.
>
> Given this, I think it would help to better understand the exact problem
> you are trying to solve by using the context class loader. Usually the
> usage of the context class loader points towards an API deficiency which we
> might be able to address differently.
>
> Cheers,
> Till
>
> On Mon, Sep 2, 2019 at 11:32 AM Aljoscha Krettek 
> wrote:
>
>> I’m not saying we can’t change that code to use the context class loader.
>> I’m just not sure whether this might break other things.
>>
>> Best,
>> Aljoscha
>>
>>> On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:
>>>
>>> Essentially, the class loader of Flink should be present in parent
>> hierarchy of context class loader. If FlinkUserCodeClassLoader doesn't use
>> context class loader, then it is actually impossible to use a hierarchy
>> like this:
>>>   system class loader -> application class loader -> user-defined class
>> loader (defines some UDFs to be used in flink program)
>>> Flink now uses the application class loader, and so the UDFs fail to
>> deserialize on local flink, because the user-defined class loader is
>> bypassed. Moreover, there is no way to add additional classpath elements
>> for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
>> this by calling addURL method on the application class loader (which is
>> terribly hackish), but that works only on JDK <= 8. No sensible workaround
>> is available for JDK >= 9.
>>> Alternative solution would be to enable adding jars to class loader when
>> using LocalEnvironment, but that looks a little odd.
>>> Jan
>>>
>>> On 9/2/19 11:02 AM, Aljoscha Krettek wrote:
 Hi,

 I actually don’t know whether that change would be ok.
>> FlinkUserCodeClassLoader has taken
>> FlinkUserCodeClassLoader.class.getClassLoader() as the parent ClassLoader
>> before my change. See:
>> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
>> <
>> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
>>> .
 I have the feeling that this might be on purpose because we want to
>> have the ClassLoader of the Flink Framework components be the parent
>> ClassLoader, but I could be wrong. Maybe Stephan would be most appropriate
>> for answering this.
 Best,
 Aljoscha

> On 30. Aug 2019, at 16:28, Till Rohrmann  wrote:
>
> Hi Jan,
>
> this looks to me like a bug for which you could create a JIRA and PR
>> to fix it. Just to make sure, I've pulled in Aljoscha who is the author of
>> this change to check with him whether we are forgetting something.
> Cheers,
> Till
>
> On Fri, Aug 30, 2019 at 3:4

Re: Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread guaishushu1...@163.com




guaishushu1...@163.com
 
From: guaishushu1...@163.com
Date: 2019-09-03 20:25
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not using 
context classloader
 
 
 
 
guaishushu1...@163.com
From: guaishushu1...@163.com
Date: 2019-09-03 20:23
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not using 
context classloader
guaishushu1...@163.com
From: Jan Lukavský
Date: 2019-09-03 20:17
To: dev
Subject: Re: ClassLoader created by BlobLibraryCacheManager is not using 
context classloader
Hi Till,
the use-case is pretty much simple - I have a REPL shell in groovy, 
which generates classes at runtime. The actual hierarchy is therefore
system class loader -> application classloader -> repl classloader 
(GroovyClassLoader actually)
now, when a terminal (sink) operation in the shell occurs, I'm able to 
build a jar, which I can submit to remote cluster (in distributed case). 
But - during testing -  I run the code using local flink. There is no 
(legal) way of adding this new runtime generated jar to local flink. As 
I said, I have a hackish solution which works on JDK <= 8, because it 
uses reflection to call addURL on the application classloader (and 
thefore "pretends", that the generated jar was there all the time from 
the JVM startup). This breaks on JDK >= 9. It might be possible to work 
around this somehow, but I think that the reason why LocalEnvironment is 
not having a way to add jars (as in case of RemoteEnvironment) is that 
is assumes, that you actually have all of the on classpath when using 
local runner. I think that this implies that it either has to use 
context classloader (to be able to work on top of any classloading user 
might have), or is wrong and would need be fixed, so that 
LocalEnvironment would accept files to "stage" - which would mean adding 
them to a class loader (probably URLClassLoader with the application 
class loader as parent).
Or, would you see any other option?
Jan
On 9/3/19 2:00 PM, Till Rohrmann wrote:
> Hi Jan,
>
> I've talked with Aljoscha and Stephan offline and we concluded that we
> would like to avoid the usage of context class loaders if possible. The
> reason for this is that using the context class loader can easily mess up
> an otherwise clear class loader hierarchy which makes it hard to reason
> about and to debug class loader issues.
>
> Given this, I think it would help to better understand the exact problem
> you are trying to solve by using the context class loader. Usually the
> usage of the context class loader points towards an API deficiency which we
> might be able to address differently.
>
> Cheers,
> Till
>
> On Mon, Sep 2, 2019 at 11:32 AM Aljoscha Krettek 
> wrote:
>
>> I’m not saying we can’t change that code to use the context class loader.
>> I’m just not sure whether this might break other things.
>>
>> Best,
>> Aljoscha
>>
>>> On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:
>>>
>>> Essentially, the class loader of Flink should be present in parent
>> hierarchy of context class loader. If FlinkUserCodeClassLoader doesn't use
>> context class loader, then it is actually impossible to use a hierarchy
>> like this:
>>>   system class loader -> application class loader -> user-defined class
>> loader (defines some UDFs to be used in flink program)
>>> Flink now uses the application class loader, and so the UDFs fail to
>> deserialize on local flink, because the user-defined class loader is
>> bypassed. Moreover, there is no way to add additional classpath elements
>> for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
>> this by calling addURL method on the application class loader (which is
>> terribly hackish), but that works only on JDK <= 8. No sensible workaround
>> is available for JDK >= 9.
>>> Alternative solution would be to enable adding jars to class loader when
>> using LocalEnvironment, but that looks a little odd.
>>> Jan
>>>
>>> On 9/2/19 11:02 AM, Aljoscha Krettek wrote:
 Hi,

 I actually don’t know whether that change would be ok.
>> FlinkUserCodeClassLoader has taken
>> FlinkUserCodeClassLoader.class.getClassLoader() as the parent ClassLoader
>> before my change. See:
>> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
>> <
>> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
>>> .
 I have the feeling that this might be on purpose because we want to
>> have the ClassLoader of the Flink Framework components be the parent
>> ClassLoader, but I could be wrong. Maybe Stephan would be most appropriate
>> for answering this.
 Best,
 Aljoscha

> On 30. Aug 2019, at 16:28, Till Rohrmann  wrote:
>
> Hi Jan,
>
> this looks to me like a bug for which you could create a JIRA and PR
>> to fix it. Just to make sure

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-03 Thread Andrey Zagrebin
Thanks for starting the vote Xintong

Also +1 for the proposed FLIP-49.

@Stephan regarding namings: network vs shuffle.
My understanding so far was that the network memory is what we basically
give to Shuffle implementations and default netty implementation uses it in
particular mostly for networking.
Are the network pools used for something else outside of the shuffling
scope?

best,
Andrey

On Tue, Sep 3, 2019 at 1:01 PM Stephan Ewen  wrote:

> +1 to the proposal in general
>
> A few things seems to be a bit put of sync with the latest discussions
> though.
>
> The section about JVM Parameters states that the
> -XX:MaxDirectMemorySize value is set to Task Off-heap Memory, Shuffle
> Memory and JVM Overhead.
> The way I understand the last discussion conclusion is that it is only the
> sum of shuffle memory and user-defined direct memory.
>
> I am someone neutral but unsure about is the separation between
> "taskmanager.memory.framework.heap" and "taskmanager.memory.task.heap".
> Could that be simply combined under "taskmanager.memory.javaheap"?
>
> It might be good to also expose these values somehow in the web UI so that
> users see immediately what amount of memory TMs assume to use for what.
>
> I assume config key names and default values might be adjusted over time as
> we get feedback.
>   - I would keep the network memory under the name
> "taskmanager.memory.network". Because network memory is actually used for
> more than shuffling. Also, the old config key seems good, so why change it?
>
> One thing to be aware of is that often, the Java Heap is understood as
> "managed memory" as a whole, because it is managed by the GC not explicitly
> by the user.
> So we need to make sure that we don't confuse users by speaking of managed
> heap and unmanaged heap. All heap is managed in Java. Some memory is
> explicitly managed by Flink.
>
> Best,
> Stephan
>
>
> On Mon, Sep 2, 2019 at 3:08 PM Xintong Song  wrote:
>
> > Hi everyone,
> >
> > I'm here to re-start the voting process for FLIP-49 [1], with respect to
> > consensus reached in this thread [2] regarding some new comments and
> > concerns.
> >
> > This voting will be open for at least 72 hours. I'll try to close it Sep.
> > 5, 14:00 UTC, unless there is an objection or not enough votes.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > [2]
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
> >
> > On Tue, Aug 27, 2019 at 9:29 PM Xintong Song 
> > wrote:
> >
> > > Alright, then let's keep the discussion in the DISCUSS mailing thread,
> > and
> > > see whether we need to restart the vote.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Aug 27, 2019 at 8:12 PM Till Rohrmann 
> > > wrote:
> > >
> > >> I had a couple of comments concerning the implementation plan. I've
> > posted
> > >> them to the original discussion thread. Depending on the outcome of
> this
> > >> discussion we might need to restart the vote.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Tue, Aug 27, 2019 at 11:14 AM Xintong Song 
> > >> wrote:
> > >>
> > >> > Hi all,
> > >> >
> > >> > I would like to start the voting process for FLIP-49 [1], which is
> > >> > discussed and reached consensus in this thread [2].
> > >> >
> > >> > This voting will be open for at least 72 hours. I'll try to close it
> > >> Aug.
> > >> > 30 10:00 UTC, unless there is an objection or not enough votes.
> > >> >
> > >> > Thank you~
> > >> >
> > >> > Xintong Song
> > >> >
> > >> >
> > >> > [1]
> > >> >
> > >> >
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > >> > [2]
> > >> >
> > >> >
> > >>
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
> > >> >
> > >>
> > >
> >
>


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-03 Thread Aljoscha Krettek
Hi,

I think it’s important to keep in mind the original goals of this FLIP and not 
let the scope grow indefinitely. As I recall it, the goals are:

 - Extend the ConfigOption system enough to allow the Table API to configure 
options that are right now only available on CheckpointingOptions, 
ExecutionConfig, and StreamExecutionEnvironment. We also want to do this 
without manually having to “forward” all the available configuration options by 
introducing equivalent setters in the Table API

 - Do the above while keeping in mind that eventually we want to allow users to 
configure everything from either the flink-conf.yaml, vie command line 
parameters, or via a Configuration.

I think the FLIP achieves this, with the added side goals of making validation 
a part of ConfigOptions, making them type safe, and making the validation 
constraints documentable (via automatic doc generation.) All this without 
breaking backwards compatibility, if I’m not mistaken.

I think we should first agree what the basic goals are so that we can quickly 
converge to consensus on this FLIP because it blocks other people/work. Among 
other things FLIP-59 depends on this. What are other opinions that people have? 
I know Becket at least has some thoughts about immutability and loading objects 
via the configuration but maybe they could be put into a follow-up FLIP if they 
are needed.

Also, I had one thought on the interaction of this FLIP-54 and FLIP-59 when it 
comes to naming. I think eventually it makes sense to have a common interface 
for things that are configurable from a Configuration (FLIP-59 introduces the 
first batch of this). It seems natural to call this interface Configurable. 
That’s a problem for this FLIP-54 because we also introduce a Configurable. 
Maybe the thing that we introduce here should be called ConfigObject or 
ConfigStruct to highlight that it has a more narrow focus and is really only a 
POJO for holding a bunch of config options that have to go together. What do 
you think?

Best,
Aljoscha

> On 3. Sep 2019, at 14:08, Timo Walther  wrote:
> 
> Hi Danny,
> 
> yes, this FLIP covers all the building blocks we need also for unification of 
> the DDL properties.
> 
> Regards,
> Timo
> 
> 
> On 03.09.19 13:45, Danny Chan wrote:
>>> with the new SQL DDL
>> based on properties as well as more connectors and formats coming up,
>> unified configuration becomes more important
>> 
>> I Cann’t agree more, do you think we can unify the config options key format 
>> here for all the DDL properties ?
>> 
>> Best,
>> Danny Chan
>> 在 2019年8月16日 +0800 PM10:12,dev@flink.apache.org,写道:
>>> with the new SQL DDL
>>> based on properties as well as more connectors and formats coming up,
>>> unified configuration becomes more important
> 
> 



Re: Flink SQL - Support Computed Columns in DDL?

2019-09-03 Thread Qi Luo
Hi Jark and Danny,

Glad to hear your plan on this!

One of our use cases is to define some column as rowtime (which is not of
type timestamp). Computed column seems to be a natural fit for that.

Thanks,
Qi

On Tue, Sep 3, 2019 at 7:46 PM Jark Wu  wrote:

> Hi Qi,
>
> The computed column is not fully supported in 1.9. We will start a design
> discussion in the dev mailing list soon. Please stay tuned!
>
> Btw, could you share with us what's the case why do you want to use
> computed column?
>
> Best,
> Jark
>
> On Tue, 3 Sep 2019 at 19:25, Danny Chan  wrote:
>
> > Yeah, we are planning to implement this feature in release-1.10, wait for
> > our good news !
> >
> > Best,
> > Danny Chan
> > 在 2019年9月3日 +0800 PM6:19,Qi Luo ,写道:
> > > Hi folks,
> > >
> > > Computed columns in Flink SQL DDL is currently disabled in both old
> > planner
> > > and Blink planner (throws "Computed columns for DDL is not supported
> > yet!"
> > > exception in SqlToOperationConverter).
> > >
> > > I searched through the JIRA but found no relevant issues. Do we have
> any
> > > plans to support this nice feature?
> > >
> > > Thanks,
> > > Qi
> >
>


Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-03 Thread jincheng sun
Hi Timo,

Thanks for the quick reply ! :)
I have added more example for #3 and #5 to the FLIP. That are great
suggestions !

Regarding 2:

There are two kind Serialization for CloudPickle(Which is different from
Java):
 1) For class and function which can be imported, CloudPickle only
serialize the full path of the class and function (just like java class
name).
 2) For the class and function which can not be imported, CloudPickle will
serialize the full content of the class and function.
For #2, It means that we can not just store the full path of the class and
function.

The above serialization is recursive.

However, there is indeed an problem of backwards compatibility when the
module path of the parent class changed. But I think this is an rare case
and acceptable. i.e., For Flink framework we never change the user
interface module path if we want to keep backwards compatibility. For user
code, if they change the interface of UDF's parent, they should re-register
their functions.

If we do not want support #2, we can store the full path of class and
function, in that case we have no backwards compatibility problem. But I
think the #2 is very convenient for users.

What do you think?

Regarding 4:
As I mentioned earlier, there may be built-in Python functions and I think
language is a "function" concept. Function and Language are orthogonal
concepts.
We may have R, GO and other language functions in the future, not only
user-defined, but also built-in functions.

You are right that users will not set this method and for Python functions,
it will be set in the code-generated Java function by the framework. So, I
think we should declare the getLanguage() in FunctionDefinition for now.
(I'm not pretty sure what do you mean by saying that getKind() is final in
UserDefinedFunction?)

Best,
Jincheng

Timo Walther  于2019年9月3日周二 下午6:01写道:

> Hi Jincheng,
>
> thanks for your response.
>
> 2. Serializability of functions: Using some arbitrary serialization
> format for shipping a function to worker sounds fine to me. But once we
> store functions a the catalog we need to think about backwards
> compatibility and evolution of interfaces etc. I'm not sure if
> CloudPickle is the right long-term storage format for this. If we don't
> think about this in advance, we are basically violating our code quality
> guide [1] of never use Java Serialization but in the Python-way. We are
> using the RPC serialization for persistence.
>
> 3. TableEnvironment: Can you add some example to the FLIP? Because API
> code like the following is not covered there:
>
> self.t_env.register_function("add_one", udf(lambda i: i + 1,
> DataTypes.BIGINT(),
>  DataTypes.BIGINT()))
> self.t_env.register_function("subtract_one", udf(SubtractOne(),
> DataTypes.BIGINT(),
> DataTypes.BIGINT()))
> self.t_env.register_function("add", add)
>
> 4. FunctionDefinition: Your response still doesn't answer my question
> entirely. Why do we need FunctionDefinition.getLanguage() if this is a
> "user-defined function" concept and not a "function" concept. In any
> case, all users should not be able to set this method. So it must be
> final in UserDefinedFunction similar to getKind().
>
> 5. Function characteristics: If UserDefinedFunction is defined in
> Python, why is it not used in your example in FLIP-58. You could you
> extend the example to show how to specify these attributes in the FLIP?
>
> Regards,
> Timo
>
> [1] https://flink.apache.org/contributing/code-style-and-quality-java.html
>
> On 02.09.19 15:35, jincheng sun wrote:
> > Hi Timo,
> >
> > Great thanks for your feedback. I would like to share my thoughts with
> you
> > inline. :)
> >
> > Best,
> > Jincheng
> >
> > Timo Walther  于2019年9月2日周一 下午5:04写道:
> >
> >> Hi all,
> >>
> >> the FLIP looks awesome. However, I would like to discuss the changes to
> >> the user-facing parts again. Some feedback:
> >>
> >> 1. DataViews: With the current non-annotation design for DataViews, we
> >> cannot perform eager state declaration, right? At which point during
> >> execution do we know which state is required by the function? We need to
> >> instantiate the function first, right?
> >>
> >>> We will analysis the Python AggregateFunction and extract the DataViews
> > used in the Python AggregateFunction. This can be done
> > by instantiate a Python AggregateFunction, creating an accumulator by
> > calling method create_accumulator and then analysis the created
> > accumulator. This is actually similar to the way that Java
> > AggregateFunction processing codegen logic. The extracted DataViews can
> > then be used to construct the StateDescriptors in the operator, i.e., we
> > should have hold the state spec and the state descriptor id in Java
> > operator and Python worker can access the state by specifying the
> > corresponding state descriptor id.
> >
> >
> >
> >> 2. Serializability of functions: How do we ensure serializability of
> >> functions for catalog persi

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-03 Thread Till Rohrmann
Thanks for creating this FLIP and starting the vote Xintong.

+1 for the proposal from my side.

I agree with Stephan that we might wanna revisit some of the configuration
names.

If I understood it correctly, then Task Off-heap memory represents the
direct memory used by the user code, right? How would users configure
native memory requirements for the user code? If it is part of Task Off
heap memory, then we need to split it to set -XX:MaxDirectMemorySize
correctly or to introduce another configuration option.

Given all these configuration options, I can see that users will get
confused quite easily. Therefore, I would like to emphasise that we need a
very good documentation about how to properly configure Flink processes and
which knobs to turn in which cases.

Cheers,
Till

On Tue, Sep 3, 2019 at 2:34 PM Andrey Zagrebin  wrote:

> Thanks for starting the vote Xintong
>
> Also +1 for the proposed FLIP-49.
>
> @Stephan regarding namings: network vs shuffle.
> My understanding so far was that the network memory is what we basically
> give to Shuffle implementations and default netty implementation uses it in
> particular mostly for networking.
> Are the network pools used for something else outside of the shuffling
> scope?
>
> best,
> Andrey
>
> On Tue, Sep 3, 2019 at 1:01 PM Stephan Ewen  wrote:
>
> > +1 to the proposal in general
> >
> > A few things seems to be a bit put of sync with the latest discussions
> > though.
> >
> > The section about JVM Parameters states that the
> > -XX:MaxDirectMemorySize value is set to Task Off-heap Memory, Shuffle
> > Memory and JVM Overhead.
> > The way I understand the last discussion conclusion is that it is only
> the
> > sum of shuffle memory and user-defined direct memory.
> >
> > I am someone neutral but unsure about is the separation between
> > "taskmanager.memory.framework.heap" and "taskmanager.memory.task.heap".
> > Could that be simply combined under "taskmanager.memory.javaheap"?
> >
> > It might be good to also expose these values somehow in the web UI so
> that
> > users see immediately what amount of memory TMs assume to use for what.
> >
> > I assume config key names and default values might be adjusted over time
> as
> > we get feedback.
> >   - I would keep the network memory under the name
> > "taskmanager.memory.network". Because network memory is actually used for
> > more than shuffling. Also, the old config key seems good, so why change
> it?
> >
> > One thing to be aware of is that often, the Java Heap is understood as
> > "managed memory" as a whole, because it is managed by the GC not
> explicitly
> > by the user.
> > So we need to make sure that we don't confuse users by speaking of
> managed
> > heap and unmanaged heap. All heap is managed in Java. Some memory is
> > explicitly managed by Flink.
> >
> > Best,
> > Stephan
> >
> >
> > On Mon, Sep 2, 2019 at 3:08 PM Xintong Song 
> wrote:
> >
> > > Hi everyone,
> > >
> > > I'm here to re-start the voting process for FLIP-49 [1], with respect
> to
> > > consensus reached in this thread [2] regarding some new comments and
> > > concerns.
> > >
> > > This voting will be open for at least 72 hours. I'll try to close it
> Sep.
> > > 5, 14:00 UTC, unless there is an objection or not enough votes.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > > [2]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
> > >
> > > On Tue, Aug 27, 2019 at 9:29 PM Xintong Song 
> > > wrote:
> > >
> > > > Alright, then let's keep the discussion in the DISCUSS mailing
> thread,
> > > and
> > > > see whether we need to restart the vote.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Tue, Aug 27, 2019 at 8:12 PM Till Rohrmann 
> > > > wrote:
> > > >
> > > >> I had a couple of comments concerning the implementation plan. I've
> > > posted
> > > >> them to the original discussion thread. Depending on the outcome of
> > this
> > > >> discussion we might need to restart the vote.
> > > >>
> > > >> Cheers,
> > > >> Till
> > > >>
> > > >> On Tue, Aug 27, 2019 at 11:14 AM Xintong Song <
> tonysong...@gmail.com>
> > > >> wrote:
> > > >>
> > > >> > Hi all,
> > > >> >
> > > >> > I would like to start the voting process for FLIP-49 [1], which is
> > > >> > discussed and reached consensus in this thread [2].
> > > >> >
> > > >> > This voting will be open for at least 72 hours. I'll try to close
> it
> > > >> Aug.
> > > >> > 30 10:00 UTC, unless there is an objection or not enough votes.
> > > >> >
> > > >> > Thank you~
> > > >> >
> > > >> > Xintong Song
> > > >> >
> > > >> >
> > > >> > [1]
> > > >> >
> > > >> >
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Kurt Young
Does this only affect the functions and operations we currently have in SQL
and
have no effect on tables, right? Looks like this is an orthogonal concept
with Catalog?
If the answer are both yes, then the catalog function will be a weird
concept?

Best,
Kurt


On Tue, Sep 3, 2019 at 8:10 PM Danny Chan  wrote:

> The way you proposed are basically the same as what Calcite does, I think
> we are in the same line.
>
> Best,
> Danny Chan
> 在 2019年9月3日 +0800 PM7:57,Timo Walther ,写道:
> > This sounds exactly as the module approach I mentioned, no?
> >
> > Regards,
> > Timo
> >
> > On 03.09.19 13:42, Danny Chan wrote:
> > > Thanks Bowen for bring up this topic, I think it’s a useful
> refactoring to make our function usage more user friendly.
> > >
> > > For the topic of how to organize the builtin operators and operators
> of Hive, here is a solution from Apache Calcite, the Calcite way is to make
> every dialect operators a “Library”, user can specify which libraries they
> want to use for a sql query. The builtin operators always comes as the
> first class objects and the others are used from the order they appears.
> Maybe you can take a reference.
> > >
> > > [1]
> https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28
> > >
> > > Best,
> > > Danny Chan
> > > 在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:
> > > > Hi folks,
> > > >
> > > > I'd like to kick off a discussion on reworking Flink's
> FunctionCatalog.
> > > > It's critically helpful to improve function usability in SQL.
> > > >
> > > >
> https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing
> > > >
> > > > In short, it:
> > > > - adds support for precise function reference with fully/partially
> > > > qualified name
> > > > - redefines function resolution order for ambiguous function
> reference
> > > > - adds support for Hive's rich built-in functions (support for Hive
> user
> > > > defined functions was already added in 1.9.0)
> > > > - clarifies the concept of temporary functions
> > > >
> > > > Would love to hear your thoughts.
> > > >
> > > > Bowen
> >
> >
>


Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-03 Thread Kostas Kloudas
Thanks for waiting!

A fix for FLINK-13940 has been merged on 1.8, 1.9 and the master under
FLINK-13941.

Cheers,
Kostas

On Tue, Sep 3, 2019 at 11:25 AM jincheng sun  wrote:
>
> +1 FLINK-13940  is a
> blocker, due to loss data is very important bug, And great thanks for
> helping fix it  Kostas!
>
> Best, Jincheng
>
> Kostas Kloudas  于2019年9月2日周一 下午7:20写道:
>
> > Hi all,
> >
> > I think this should be also considered a blocker
> > https://issues.apache.org/jira/browse/FLINK-13940.
> > It is not a regression but it can result to data loss.
> >
> > I think I can have a quick fix by tomorrow.
> >
> > Cheers,
> > Kostas
> >
> > On Mon, Sep 2, 2019 at 12:01 PM jincheng sun 
> > wrote:
> > >
> > > Thanks for all of your feedback!
> > >
> > > Hi Jark, Glad to see that you are doing what RM should doing.
> > >
> > > Only one tips here is before the RC1 all the blocker should be fixed, but
> > > othrers is nice to have. So you can decide when to prepare RC1 after the
> > > blokcer is resolved.
> > >
> > > Feel free to tell me if you have any questions.
> > >
> > > Best,Jincheng
> > >
> > > Aljoscha Krettek  于2019年9月2日周一 下午5:03写道:
> > >
> > > > I cut a PR for FLINK-13586: https://github.com/apache/flink/pull/9595
> > <
> > > > https://github.com/apache/flink/pull/9595>
> > > >
> > > > > On 2. Sep 2019, at 05:03, Yu Li  wrote:
> > > > >
> > > > > +1 for a 1.8.2 release, thanks for bringing this up Jincheng!
> > > > >
> > > > > Best Regards,
> > > > > Yu
> > > > >
> > > > >
> > > > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise  wrote:
> > > > >
> > > > >> +1 for the 1.8.2 release
> > > > >>
> > > > >> I marked https://issues.apache.org/jira/browse/FLINK-13586 for this
> > > > >> release. It would be good to compensate for the backward
> > incompatible
> > > > >> change to ClosureCleaner that was introduced in 1.8.1, which affects
> > > > >> downstream dependencies.
> > > > >>
> > > > >> Thanks,
> > > > >> Thomas
> > > > >>
> > > > >>
> > > > >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun <
> > sunjincheng...@gmail.com>
> > > > >> wrote:
> > > > >>
> > > > >>> Hi Jark,
> > > > >>>
> > > > >>> Glad to hear that you want to be the Release Manager of flink
> > 1.8.2.
> > > > >>> I believe that you will be a great RM, and I am very willing to
> > help
> > > > you
> > > > >>> with the final release in the final stages. :)
> > > > >>>
> > > > >>> The release of Apache Flink involves a number of tasks. For
> > details,
> > > > you
> > > > >>> can consult the documentation [1]. If you have any questions,
> > please
> > > > let
> > > > >> me
> > > > >>> know and let us work together.
> > > > >>>
> > > > >>>
> > > > >>>
> > > > >>
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1
> > > > >>>
> > > > >>> Cheers,
> > > > >>> Jincheng
> > > > >>>
> > > > >>> Till Rohrmann  于2019年8月31日周六 上午12:59写道:
> > > > >>>
> > > >  +1 for a 1.8.2 bug fix release. Thanks for kicking this
> > discussion off
> > > >  Jincheng.
> > > > 
> > > >  Cheers,
> > > >  Till
> > > > 
> > > >  On Fri, Aug 30, 2019 at 6:45 PM Jark Wu  wrote:
> > > > 
> > > > > Thanks Jincheng for bringing this up.
> > > > >
> > > > > +1 to the 1.8.2 release, because it already contains a couple of
> > > >  important
> > > > > fixes and it has been a long time since 1.8.1 came out.
> > > > > I'm willing to help the community as much as possible. I'm
> > wondering
> > > > >>> if I
> > > > > can be the release manager of 1.8.2 or work with you together
> > > > >>> @Jincheng?
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Fri, 30 Aug 2019 at 18:58, Hequn Cheng 
> > > > >>> wrote:
> > > > >
> > > > >> Hi Jincheng,
> > > > >>
> > > > >> +1 for a 1.8.2 release.
> > > > >> Thanks a lot for raising the discussion. It would be nice to
> > have
> > > > >>> these
> > > > >> critical fixes.
> > > > >>
> > > > >> Best, Hequn
> > > > >>
> > > > >>
> > > > >> On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels <
> > m...@apache.org
> > > > >>>
> > > > > wrote:
> > > > >>
> > > > >>> Hi Jincheng,
> > > > >>>
> > > > >>> +1 I would be for a 1.8.2 release such that we can fix the
> > > > >> problems
> > > > > with
> > > > >>> the nested closure cleaner which currently block 1.8.1 users
> > with
> > > >  Beam:
> > > > >>> https://issues.apache.org/jira/browse/FLINK-13367
> > > > >>>
> > > > >>> Thanks,
> > > > >>> Max
> > > > >>>
> > > > >>> On 30.08.19 11:25, jincheng sun wrote:
> > > >  Hi Flink devs,
> > > > 
> > > >  It has been nearly 2 months since the 1.8.1 released. So, what
> > > > >> do
> > > >  you
> > > > >>> think
> > > >  about releasing Flink 1.8.2 soon?
> > > > 
> > > >  We already have some b

Re: Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread Till Rohrmann
I see the problem Jan. What about the following proposal: Instead of using
the LocalEnvironment for local tests you always use the RemoteEnvironment
but when testing it locally you spin up a MiniCluster in the same process
and initialize the RemoteEnvironment with `MiniCluster#getRestAddress`.
That way you would always submit a jar with the generated classes and,
hence, not having to set the context class loader.

The contract of the LocalEnvironment is indeed that all classes it is
supposed t execute must be present when being started.

Cheers,
Till

On Tue, Sep 3, 2019 at 2:27 PM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:

>
>
>
>
> guaishushu1...@163.com
>
> From: guaishushu1...@163.com
> Date: 2019-09-03 20:25
> To: dev
> Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
> using context classloader
>
>
>
>
> guaishushu1...@163.com
> From: guaishushu1...@163.com
> Date: 2019-09-03 20:23
> To: dev
> Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
> using context classloader
> guaishushu1...@163.com
> From: Jan Lukavský
> Date: 2019-09-03 20:17
> To: dev
> Subject: Re: ClassLoader created by BlobLibraryCacheManager is not using
> context classloader
> Hi Till,
> the use-case is pretty much simple - I have a REPL shell in groovy,
> which generates classes at runtime. The actual hierarchy is therefore
> system class loader -> application classloader -> repl classloader
> (GroovyClassLoader actually)
> now, when a terminal (sink) operation in the shell occurs, I'm able to
> build a jar, which I can submit to remote cluster (in distributed case).
> But - during testing -  I run the code using local flink. There is no
> (legal) way of adding this new runtime generated jar to local flink. As
> I said, I have a hackish solution which works on JDK <= 8, because it
> uses reflection to call addURL on the application classloader (and
> thefore "pretends", that the generated jar was there all the time from
> the JVM startup). This breaks on JDK >= 9. It might be possible to work
> around this somehow, but I think that the reason why LocalEnvironment is
> not having a way to add jars (as in case of RemoteEnvironment) is that
> is assumes, that you actually have all of the on classpath when using
> local runner. I think that this implies that it either has to use
> context classloader (to be able to work on top of any classloading user
> might have), or is wrong and would need be fixed, so that
> LocalEnvironment would accept files to "stage" - which would mean adding
> them to a class loader (probably URLClassLoader with the application
> class loader as parent).
> Or, would you see any other option?
> Jan
> On 9/3/19 2:00 PM, Till Rohrmann wrote:
> > Hi Jan,
> >
> > I've talked with Aljoscha and Stephan offline and we concluded that we
> > would like to avoid the usage of context class loaders if possible. The
> > reason for this is that using the context class loader can easily mess up
> > an otherwise clear class loader hierarchy which makes it hard to reason
> > about and to debug class loader issues.
> >
> > Given this, I think it would help to better understand the exact problem
> > you are trying to solve by using the context class loader. Usually the
> > usage of the context class loader points towards an API deficiency which
> we
> > might be able to address differently.
> >
> > Cheers,
> > Till
> >
> > On Mon, Sep 2, 2019 at 11:32 AM Aljoscha Krettek 
> > wrote:
> >
> >> I’m not saying we can’t change that code to use the context class
> loader.
> >> I’m just not sure whether this might break other things.
> >>
> >> Best,
> >> Aljoscha
> >>
> >>> On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:
> >>>
> >>> Essentially, the class loader of Flink should be present in parent
> >> hierarchy of context class loader. If FlinkUserCodeClassLoader doesn't
> use
> >> context class loader, then it is actually impossible to use a hierarchy
> >> like this:
> >>>   system class loader -> application class loader -> user-defined class
> >> loader (defines some UDFs to be used in flink program)
> >>> Flink now uses the application class loader, and so the UDFs fail to
> >> deserialize on local flink, because the user-defined class loader is
> >> bypassed. Moreover, there is no way to add additional classpath elements
> >> for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
> >> this by calling addURL method on the application class loader (which is
> >> terribly hackish), but that works only on JDK <= 8. No sensible
> workaround
> >> is available for JDK >= 9.
> >>> Alternative solution would be to enable adding jars to class loader
> when
> >> using LocalEnvironment, but that looks a little odd.
> >>> Jan
> >>>
> >>> On 9/2/19 11:02 AM, Aljoscha Krettek wrote:
>  Hi,
> 
>  I actually don’t know whether that change would be ok.
> >> FlinkUserCodeClassLoader has taken
> >> FlinkUserCodeClassLoader.class.getClassLoader() as the parent
> C

Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread Jan Lukavský

Hi Till,

hmm, that sounds it might work. I would have to incorporate this (either 
as default, or on demand) into Apache Beam. Would you see any 
disadvantages of this approach? Would you suggest to make this default 
behavior for local beam FlinkRunner? I can introduce a configuration 
option to turn this behavior on, but that would bring additional 
maintenance burden, etc., etc.


Jan

On 9/3/19 3:38 PM, Till Rohrmann wrote:

I see the problem Jan. What about the following proposal: Instead of using
the LocalEnvironment for local tests you always use the RemoteEnvironment
but when testing it locally you spin up a MiniCluster in the same process
and initialize the RemoteEnvironment with `MiniCluster#getRestAddress`.
That way you would always submit a jar with the generated classes and,
hence, not having to set the context class loader.

The contract of the LocalEnvironment is indeed that all classes it is
supposed t execute must be present when being started.

Cheers,
Till

On Tue, Sep 3, 2019 at 2:27 PM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:





guaishushu1...@163.com

From: guaishushu1...@163.com
Date: 2019-09-03 20:25
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
using context classloader




guaishushu1...@163.com
From: guaishushu1...@163.com
Date: 2019-09-03 20:23
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
using context classloader
guaishushu1...@163.com
From: Jan Lukavský
Date: 2019-09-03 20:17
To: dev
Subject: Re: ClassLoader created by BlobLibraryCacheManager is not using
context classloader
Hi Till,
the use-case is pretty much simple - I have a REPL shell in groovy,
which generates classes at runtime. The actual hierarchy is therefore
system class loader -> application classloader -> repl classloader
(GroovyClassLoader actually)
now, when a terminal (sink) operation in the shell occurs, I'm able to
build a jar, which I can submit to remote cluster (in distributed case).
But - during testing -  I run the code using local flink. There is no
(legal) way of adding this new runtime generated jar to local flink. As
I said, I have a hackish solution which works on JDK <= 8, because it
uses reflection to call addURL on the application classloader (and
thefore "pretends", that the generated jar was there all the time from
the JVM startup). This breaks on JDK >= 9. It might be possible to work
around this somehow, but I think that the reason why LocalEnvironment is
not having a way to add jars (as in case of RemoteEnvironment) is that
is assumes, that you actually have all of the on classpath when using
local runner. I think that this implies that it either has to use
context classloader (to be able to work on top of any classloading user
might have), or is wrong and would need be fixed, so that
LocalEnvironment would accept files to "stage" - which would mean adding
them to a class loader (probably URLClassLoader with the application
class loader as parent).
Or, would you see any other option?
Jan
On 9/3/19 2:00 PM, Till Rohrmann wrote:

Hi Jan,

I've talked with Aljoscha and Stephan offline and we concluded that we
would like to avoid the usage of context class loaders if possible. The
reason for this is that using the context class loader can easily mess up
an otherwise clear class loader hierarchy which makes it hard to reason
about and to debug class loader issues.

Given this, I think it would help to better understand the exact problem
you are trying to solve by using the context class loader. Usually the
usage of the context class loader points towards an API deficiency which

we

might be able to address differently.

Cheers,
Till

On Mon, Sep 2, 2019 at 11:32 AM Aljoscha Krettek 
wrote:


I’m not saying we can’t change that code to use the context class

loader.

I’m just not sure whether this might break other things.

Best,
Aljoscha


On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:

Essentially, the class loader of Flink should be present in parent

hierarchy of context class loader. If FlinkUserCodeClassLoader doesn't

use

context class loader, then it is actually impossible to use a hierarchy
like this:

   system class loader -> application class loader -> user-defined class

loader (defines some UDFs to be used in flink program)

Flink now uses the application class loader, and so the UDFs fail to

deserialize on local flink, because the user-defined class loader is
bypassed. Moreover, there is no way to add additional classpath elements
for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
this by calling addURL method on the application class loader (which is
terribly hackish), but that works only on JDK <= 8. No sensible

workaround

is available for JDK >= 9.

Alternative solution would be to enable adding jars to class loader

when

using LocalEnvironment, but that looks a little odd.

Jan

On 9/2/19 11:02 AM, Aljoscha Krettek wrote:

Hi,

I actually don’t know whether that chan

Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread Jan Lukavský
On the other hand, if you say, that the contract of LocalEnvironment is 
to execute as if it had all classes on its class loader, then it 
currently breaks this contract. :-)


Jan

On 9/3/19 3:45 PM, Jan Lukavský wrote:

Hi Till,

hmm, that sounds it might work. I would have to incorporate this 
(either as default, or on demand) into Apache Beam. Would you see any 
disadvantages of this approach? Would you suggest to make this default 
behavior for local beam FlinkRunner? I can introduce a configuration 
option to turn this behavior on, but that would bring additional 
maintenance burden, etc., etc.


Jan

On 9/3/19 3:38 PM, Till Rohrmann wrote:
I see the problem Jan. What about the following proposal: Instead of 
using
the LocalEnvironment for local tests you always use the 
RemoteEnvironment
but when testing it locally you spin up a MiniCluster in the same 
process

and initialize the RemoteEnvironment with `MiniCluster#getRestAddress`.
That way you would always submit a jar with the generated classes and,
hence, not having to set the context class loader.

The contract of the LocalEnvironment is indeed that all classes it is
supposed t execute must be present when being started.

Cheers,
Till

On Tue, Sep 3, 2019 at 2:27 PM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:





guaishushu1...@163.com

From: guaishushu1...@163.com
Date: 2019-09-03 20:25
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
using context classloader




guaishushu1...@163.com
From: guaishushu1...@163.com
Date: 2019-09-03 20:23
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
using context classloader
guaishushu1...@163.com
From: Jan Lukavský
Date: 2019-09-03 20:17
To: dev
Subject: Re: ClassLoader created by BlobLibraryCacheManager is not 
using

context classloader
Hi Till,
the use-case is pretty much simple - I have a REPL shell in groovy,
which generates classes at runtime. The actual hierarchy is therefore
system class loader -> application classloader -> repl classloader
(GroovyClassLoader actually)
now, when a terminal (sink) operation in the shell occurs, I'm able to
build a jar, which I can submit to remote cluster (in distributed 
case).

But - during testing -  I run the code using local flink. There is no
(legal) way of adding this new runtime generated jar to local flink. As
I said, I have a hackish solution which works on JDK <= 8, because it
uses reflection to call addURL on the application classloader (and
thefore "pretends", that the generated jar was there all the time from
the JVM startup). This breaks on JDK >= 9. It might be possible to work
around this somehow, but I think that the reason why 
LocalEnvironment is

not having a way to add jars (as in case of RemoteEnvironment) is that
is assumes, that you actually have all of the on classpath when using
local runner. I think that this implies that it either has to use
context classloader (to be able to work on top of any classloading user
might have), or is wrong and would need be fixed, so that
LocalEnvironment would accept files to "stage" - which would mean 
adding

them to a class loader (probably URLClassLoader with the application
class loader as parent).
Or, would you see any other option?
Jan
On 9/3/19 2:00 PM, Till Rohrmann wrote:

Hi Jan,

I've talked with Aljoscha and Stephan offline and we concluded that we
would like to avoid the usage of context class loaders if possible. 
The
reason for this is that using the context class loader can easily 
mess up
an otherwise clear class loader hierarchy which makes it hard to 
reason

about and to debug class loader issues.

Given this, I think it would help to better understand the exact 
problem

you are trying to solve by using the context class loader. Usually the
usage of the context class loader points towards an API deficiency 
which

we

might be able to address differently.

Cheers,
Till

On Mon, Sep 2, 2019 at 11:32 AM Aljoscha Krettek 
wrote:


I’m not saying we can’t change that code to use the context class

loader.

I’m just not sure whether this might break other things.

Best,
Aljoscha


On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:

Essentially, the class loader of Flink should be present in parent
hierarchy of context class loader. If FlinkUserCodeClassLoader 
doesn't

use
context class loader, then it is actually impossible to use a 
hierarchy

like this:
   system class loader -> application class loader -> 
user-defined class

loader (defines some UDFs to be used in flink program)

Flink now uses the application class loader, and so the UDFs fail to

deserialize on local flink, because the user-defined class loader is
bypassed. Moreover, there is no way to add additional classpath 
elements
for LocalEnvironment (as opposed to RemoteEnvironment). I'm able 
to hack
this by calling addURL method on the application class loader 
(which is

terribly hackish), but that works only on JDK <= 8. No sensible

workaround

is avai

Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread Till Rohrmann
How so? Does your REPL add the generated classes to the system class
loader? I assume the system class loader is used to load the Flink classes.

Ideally, what you would like to have is the option to provide the parent
class loader which is used load user code to the LocalEnvironment. This one
could then be forwarded to the TaskExecutor where it is used to generate
the user code class loader. But this is a bigger effort.

The downside to this approach is that it requires you to create a jar file
and to submit it via a REST call. The upside is that it is closer to the
production setting.

Cheers,
Till

On Tue, Sep 3, 2019 at 3:47 PM Jan Lukavský  wrote:

> On the other hand, if you say, that the contract of LocalEnvironment is
> to execute as if it had all classes on its class loader, then it
> currently breaks this contract. :-)
>
> Jan
>
> On 9/3/19 3:45 PM, Jan Lukavský wrote:
> > Hi Till,
> >
> > hmm, that sounds it might work. I would have to incorporate this
> > (either as default, or on demand) into Apache Beam. Would you see any
> > disadvantages of this approach? Would you suggest to make this default
> > behavior for local beam FlinkRunner? I can introduce a configuration
> > option to turn this behavior on, but that would bring additional
> > maintenance burden, etc., etc.
> >
> > Jan
> >
> > On 9/3/19 3:38 PM, Till Rohrmann wrote:
> >> I see the problem Jan. What about the following proposal: Instead of
> >> using
> >> the LocalEnvironment for local tests you always use the
> >> RemoteEnvironment
> >> but when testing it locally you spin up a MiniCluster in the same
> >> process
> >> and initialize the RemoteEnvironment with `MiniCluster#getRestAddress`.
> >> That way you would always submit a jar with the generated classes and,
> >> hence, not having to set the context class loader.
> >>
> >> The contract of the LocalEnvironment is indeed that all classes it is
> >> supposed t execute must be present when being started.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Sep 3, 2019 at 2:27 PM guaishushu1...@163.com <
> >> guaishushu1...@163.com> wrote:
> >>
> >>>
> >>>
> >>>
> >>> guaishushu1...@163.com
> >>>
> >>> From: guaishushu1...@163.com
> >>> Date: 2019-09-03 20:25
> >>> To: dev
> >>> Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
> >>> using context classloader
> >>>
> >>>
> >>>
> >>>
> >>> guaishushu1...@163.com
> >>> From: guaishushu1...@163.com
> >>> Date: 2019-09-03 20:23
> >>> To: dev
> >>> Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
> >>> using context classloader
> >>> guaishushu1...@163.com
> >>> From: Jan Lukavský
> >>> Date: 2019-09-03 20:17
> >>> To: dev
> >>> Subject: Re: ClassLoader created by BlobLibraryCacheManager is not
> >>> using
> >>> context classloader
> >>> Hi Till,
> >>> the use-case is pretty much simple - I have a REPL shell in groovy,
> >>> which generates classes at runtime. The actual hierarchy is therefore
> >>> system class loader -> application classloader -> repl classloader
> >>> (GroovyClassLoader actually)
> >>> now, when a terminal (sink) operation in the shell occurs, I'm able to
> >>> build a jar, which I can submit to remote cluster (in distributed
> >>> case).
> >>> But - during testing -  I run the code using local flink. There is no
> >>> (legal) way of adding this new runtime generated jar to local flink. As
> >>> I said, I have a hackish solution which works on JDK <= 8, because it
> >>> uses reflection to call addURL on the application classloader (and
> >>> thefore "pretends", that the generated jar was there all the time from
> >>> the JVM startup). This breaks on JDK >= 9. It might be possible to work
> >>> around this somehow, but I think that the reason why
> >>> LocalEnvironment is
> >>> not having a way to add jars (as in case of RemoteEnvironment) is that
> >>> is assumes, that you actually have all of the on classpath when using
> >>> local runner. I think that this implies that it either has to use
> >>> context classloader (to be able to work on top of any classloading user
> >>> might have), or is wrong and would need be fixed, so that
> >>> LocalEnvironment would accept files to "stage" - which would mean
> >>> adding
> >>> them to a class loader (probably URLClassLoader with the application
> >>> class loader as parent).
> >>> Or, would you see any other option?
> >>> Jan
> >>> On 9/3/19 2:00 PM, Till Rohrmann wrote:
>  Hi Jan,
> 
>  I've talked with Aljoscha and Stephan offline and we concluded that we
>  would like to avoid the usage of context class loaders if possible.
>  The
>  reason for this is that using the context class loader can easily
>  mess up
>  an otherwise clear class loader hierarchy which makes it hard to
>  reason
>  about and to debug class loader issues.
> 
>  Given this, I think it would help to better understand the exact
>  problem
>  you are trying to solve by using the context class loader. Usually t

Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-03 Thread Jark Wu
Thanks Kostas for the quick fixing.

However, I find that FLINK-13940 still target to 1.8.2 as a blocker. If I
understand correctly, FLINK-13940 is aiming for a nicer and better solution
in the future.
So should we update the fixVersion of FLINK-13940?

Best,
Jark

On Tue, 3 Sep 2019 at 21:33, Kostas Kloudas  wrote:

> Thanks for waiting!
>
> A fix for FLINK-13940 has been merged on 1.8, 1.9 and the master under
> FLINK-13941.
>
> Cheers,
> Kostas
>
> On Tue, Sep 3, 2019 at 11:25 AM jincheng sun 
> wrote:
> >
> > +1 FLINK-13940  is a
> > blocker, due to loss data is very important bug, And great thanks for
> > helping fix it  Kostas!
> >
> > Best, Jincheng
> >
> > Kostas Kloudas  于2019年9月2日周一 下午7:20写道:
> >
> > > Hi all,
> > >
> > > I think this should be also considered a blocker
> > > https://issues.apache.org/jira/browse/FLINK-13940.
> > > It is not a regression but it can result to data loss.
> > >
> > > I think I can have a quick fix by tomorrow.
> > >
> > > Cheers,
> > > Kostas
> > >
> > > On Mon, Sep 2, 2019 at 12:01 PM jincheng sun  >
> > > wrote:
> > > >
> > > > Thanks for all of your feedback!
> > > >
> > > > Hi Jark, Glad to see that you are doing what RM should doing.
> > > >
> > > > Only one tips here is before the RC1 all the blocker should be
> fixed, but
> > > > othrers is nice to have. So you can decide when to prepare RC1 after
> the
> > > > blokcer is resolved.
> > > >
> > > > Feel free to tell me if you have any questions.
> > > >
> > > > Best,Jincheng
> > > >
> > > > Aljoscha Krettek  于2019年9月2日周一 下午5:03写道:
> > > >
> > > > > I cut a PR for FLINK-13586:
> https://github.com/apache/flink/pull/9595
> > > <
> > > > > https://github.com/apache/flink/pull/9595>
> > > > >
> > > > > > On 2. Sep 2019, at 05:03, Yu Li  wrote:
> > > > > >
> > > > > > +1 for a 1.8.2 release, thanks for bringing this up Jincheng!
> > > > > >
> > > > > > Best Regards,
> > > > > > Yu
> > > > > >
> > > > > >
> > > > > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise 
> wrote:
> > > > > >
> > > > > >> +1 for the 1.8.2 release
> > > > > >>
> > > > > >> I marked https://issues.apache.org/jira/browse/FLINK-13586 for
> this
> > > > > >> release. It would be good to compensate for the backward
> > > incompatible
> > > > > >> change to ClosureCleaner that was introduced in 1.8.1, which
> affects
> > > > > >> downstream dependencies.
> > > > > >>
> > > > > >> Thanks,
> > > > > >> Thomas
> > > > > >>
> > > > > >>
> > > > > >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun <
> > > sunjincheng...@gmail.com>
> > > > > >> wrote:
> > > > > >>
> > > > > >>> Hi Jark,
> > > > > >>>
> > > > > >>> Glad to hear that you want to be the Release Manager of flink
> > > 1.8.2.
> > > > > >>> I believe that you will be a great RM, and I am very willing to
> > > help
> > > > > you
> > > > > >>> with the final release in the final stages. :)
> > > > > >>>
> > > > > >>> The release of Apache Flink involves a number of tasks. For
> > > details,
> > > > > you
> > > > > >>> can consult the documentation [1]. If you have any questions,
> > > please
> > > > > let
> > > > > >> me
> > > > > >>> know and let us work together.
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1
> > > > > >>>
> > > > > >>> Cheers,
> > > > > >>> Jincheng
> > > > > >>>
> > > > > >>> Till Rohrmann  于2019年8月31日周六 上午12:59写道:
> > > > > >>>
> > > > >  +1 for a 1.8.2 bug fix release. Thanks for kicking this
> > > discussion off
> > > > >  Jincheng.
> > > > > 
> > > > >  Cheers,
> > > > >  Till
> > > > > 
> > > > >  On Fri, Aug 30, 2019 at 6:45 PM Jark Wu 
> wrote:
> > > > > 
> > > > > > Thanks Jincheng for bringing this up.
> > > > > >
> > > > > > +1 to the 1.8.2 release, because it already contains a
> couple of
> > > > >  important
> > > > > > fixes and it has been a long time since 1.8.1 came out.
> > > > > > I'm willing to help the community as much as possible. I'm
> > > wondering
> > > > > >>> if I
> > > > > > can be the release manager of 1.8.2 or work with you together
> > > > > >>> @Jincheng?
> > > > > >
> > > > > > Best,
> > > > > > Jark
> > > > > >
> > > > > > On Fri, 30 Aug 2019 at 18:58, Hequn Cheng <
> chenghe...@gmail.com>
> > > > > >>> wrote:
> > > > > >
> > > > > >> Hi Jincheng,
> > > > > >>
> > > > > >> +1 for a 1.8.2 release.
> > > > > >> Thanks a lot for raising the discussion. It would be nice to
> > > have
> > > > > >>> these
> > > > > >> critical fixes.
> > > > > >>
> > > > > >> Best, Hequn
> > > > > >>
> > > > > >>
> > > > > >> On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels <
> > > m...@apache.org
> > > > > >>>
> > > > > > wrote:
> > > > > >>
> > > > > >>> Hi Jincheng,
> > > > > >>>
> > > > > >>> +1 I would be for a 1.8.2 

Re: State of FLIPs

2019-09-03 Thread vino yang
Hi Chesnay,

I created FLIP-44 two months ago.

Originally, the discussion of local aggregation feature happened in the ML
thread.[1]
After many people agree with this feature, I created an umbrella issue and
split into some subtasks and opened one PR.
However, @Aljoscha Krettek  thinks it's a big feature
and worth to create a FLIP, so I created FLIP-44 and closed the PR.
Then, I started a new ML thread about FLIP-44[2] in the dev mailing list.

Unfortunately, we did not come to an agreement on the design. I will
re-active it soon.

Best,
Vino

[1]:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Local-Aggregation-in-Flink-td29307.html#a29308
[2]: https://www.mail-archive.com/dev@flink.apache.org/msg26445.html

Yu Li  于2019年9月2日周一 上午5:16写道:

> Thanks for the reminder Chesnay. I've just moved FLIP-50 into accepted
> list since it has already passed the vote and is under development.
>
> Best Regards,
> Yu
>
>
> On Fri, 30 Aug 2019 at 22:29, Dian Fu  wrote:
>
>> Hi Chesnay,
>>
>> Thanks a lot for the remind. FLIP-38 has been released in 1.9 and I have
>> updated the status in the wiki page.
>>
>> Regards,
>> Dian
>>
>> On Fri, Aug 30, 2019 at 9:38 PM Becket Qin  wrote:
>>
>>> Hi Chesnay,
>>>
>>> You are right. FLIP-36 actually has not passed the vote yet. In fact some
>>> of the key designs may have to change due to the later code changes. I'll
>>> update the wiki and start a new vote.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Aug 30, 2019 at 8:44 PM Chesnay Schepler 
>>> wrote:
>>>
>>> > The following FLIPs are marked as "Under discussion" in the wiki
>>> > <
>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>>> >,
>>> > but actually seem to be in progress (i.e. have open pull requests) and
>>> some
>>> > even  have code merged to master:
>>> >
>>> >- FLIP-36 (Interactive Programming)
>>> >- FLIP-38 (Python Table API)
>>> >- FLIP-44 (Support Local Aggregation)
>>> >- FLIP-50 (Spill-able Heap Keyed State Backend)
>>> >
>>> > I would like to find out what the _actual_ state is, and then discuss
>>> how
>>> > we handle these FLIPs from now on (e.g., retcon history and mark them
>>> as
>>> > accepted, freeze further development until a vote, ...).
>>> >
>>> > I've cc'd all people who create the wiki pages for said FLIPs.
>>> >
>>> >
>>> >
>>>
>>


Re: [ANNOUNCE] Kinesis connector becomes part of Flink releases

2019-09-03 Thread vino yang
Good news! Thanks for your efforts, Bowen!

Best,
Vino

Yu Li  于2019年9月2日周一 上午6:04写道:

> Great to know, thanks for the efforts Bowen!
>
> And I believe it worth a release note in the original JIRA, wdyt? Thanks.
>
> Best Regards,
> Yu
>
>
> On Sat, 31 Aug 2019 at 11:01, Bowen Li  wrote:
>
>> Hi all,
>>
>> I'm glad to announce that, as #9494
>> was merged today,
>> flink-connector-kinesis is officially of Apache 2.0 license now in master
>> branch and its artifact will be deployed to Maven central as part of Flink
>> releases starting from Flink 1.10.0. Users can use the artifact out of
>> shelf then and no longer have to build and maintain it on their own.
>>
>> It brings a much better user experience to our large AWS customer base by
>> making their work simpler, smoother, and more productive!
>>
>> Thanks everyone who participated in coding and review to drive this
>> initiative forward.
>>
>> Cheers,
>> Bowen
>>
>


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Timo Walther

Hi Kurt,

it should not affect the functions and operations we currently have in 
SQL. It just categorizes the available built-in functions. It is kind of 
an orthogonal concept to the catalog API but built-in functions deserve 
this special kind of treatment. CatalogFunction still fits perfectly in 
there because the regular catalog object resolution logic is not 
affected. So tables and functions are resolved in the same way but with 
built-in functions that have priority as in the original design.


Regards,
Timo


On 03.09.19 15:26, Kurt Young wrote:

Does this only affect the functions and operations we currently have in SQL
and
have no effect on tables, right? Looks like this is an orthogonal concept
with Catalog?
If the answer are both yes, then the catalog function will be a weird
concept?

Best,
Kurt


On Tue, Sep 3, 2019 at 8:10 PM Danny Chan  wrote:


The way you proposed are basically the same as what Calcite does, I think
we are in the same line.

Best,
Danny Chan
在 2019年9月3日 +0800 PM7:57,Timo Walther ,写道:

This sounds exactly as the module approach I mentioned, no?

Regards,
Timo

On 03.09.19 13:42, Danny Chan wrote:

Thanks Bowen for bring up this topic, I think it’s a useful

refactoring to make our function usage more user friendly.

For the topic of how to organize the builtin operators and operators

of Hive, here is a solution from Apache Calcite, the Calcite way is to make
every dialect operators a “Library”, user can specify which libraries they
want to use for a sql query. The builtin operators always comes as the
first class objects and the others are used from the order they appears.
Maybe you can take a reference.

[1]

https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28

Best,
Danny Chan
在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:

Hi folks,

I'd like to kick off a discussion on reworking Flink's

FunctionCatalog.

It's critically helpful to improve function usability in SQL.



https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing

In short, it:
- adds support for precise function reference with fully/partially
qualified name
- redefines function resolution order for ambiguous function

reference

- adds support for Hive's rich built-in functions (support for Hive

user

defined functions was already added in 1.9.0)
- clarifies the concept of temporary functions

Would love to hear your thoughts.

Bowen






Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-03 Thread zhijiang
Thanks for proposing this FLIP and also +1 on my side.

@Andrey Zagrebin For the point of "network memory is actually used more than 
shuffling", I guess that the component of queryable state is also using 
network/netty stack atm, which is outside of shuffling.
In addition, if we only consider the shuffle memory provided by shuffle service 
interface, we should not only consider the memory used by local buffer pool, 
but also consider the netty internal memory 
usages as the overhead, especially we have not the zero-copy improvement on 
dowstream read side. This issue might be out of the vote scope, just think of 
we have this issue in [1]. :)

[1] https://issues.apache.org/jira/browse/FLINK-12110

Best,
Zhijiang
--
From:Till Rohrmann 
Send Time:2019年9月3日(星期二) 15:07
To:dev 
Subject:Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

Thanks for creating this FLIP and starting the vote Xintong.

+1 for the proposal from my side.

I agree with Stephan that we might wanna revisit some of the configuration
names.

If I understood it correctly, then Task Off-heap memory represents the
direct memory used by the user code, right? How would users configure
native memory requirements for the user code? If it is part of Task Off
heap memory, then we need to split it to set -XX:MaxDirectMemorySize
correctly or to introduce another configuration option.

Given all these configuration options, I can see that users will get
confused quite easily. Therefore, I would like to emphasise that we need a
very good documentation about how to properly configure Flink processes and
which knobs to turn in which cases.

Cheers,
Till

On Tue, Sep 3, 2019 at 2:34 PM Andrey Zagrebin  wrote:

> Thanks for starting the vote Xintong
>
> Also +1 for the proposed FLIP-49.
>
> @Stephan regarding namings: network vs shuffle.
> My understanding so far was that the network memory is what we basically
> give to Shuffle implementations and default netty implementation uses it in
> particular mostly for networking.
> Are the network pools used for something else outside of the shuffling
> scope?
>
> best,
> Andrey
>
> On Tue, Sep 3, 2019 at 1:01 PM Stephan Ewen  wrote:
>
> > +1 to the proposal in general
> >
> > A few things seems to be a bit put of sync with the latest discussions
> > though.
> >
> > The section about JVM Parameters states that the
> > -XX:MaxDirectMemorySize value is set to Task Off-heap Memory, Shuffle
> > Memory and JVM Overhead.
> > The way I understand the last discussion conclusion is that it is only
> the
> > sum of shuffle memory and user-defined direct memory.
> >
> > I am someone neutral but unsure about is the separation between
> > "taskmanager.memory.framework.heap" and "taskmanager.memory.task.heap".
> > Could that be simply combined under "taskmanager.memory.javaheap"?
> >
> > It might be good to also expose these values somehow in the web UI so
> that
> > users see immediately what amount of memory TMs assume to use for what.
> >
> > I assume config key names and default values might be adjusted over time
> as
> > we get feedback.
> >   - I would keep the network memory under the name
> > "taskmanager.memory.network". Because network memory is actually used for
> > more than shuffling. Also, the old config key seems good, so why change
> it?
> >
> > One thing to be aware of is that often, the Java Heap is understood as
> > "managed memory" as a whole, because it is managed by the GC not
> explicitly
> > by the user.
> > So we need to make sure that we don't confuse users by speaking of
> managed
> > heap and unmanaged heap. All heap is managed in Java. Some memory is
> > explicitly managed by Flink.
> >
> > Best,
> > Stephan
> >
> >
> > On Mon, Sep 2, 2019 at 3:08 PM Xintong Song 
> wrote:
> >
> > > Hi everyone,
> > >
> > > I'm here to re-start the voting process for FLIP-49 [1], with respect
> to
> > > consensus reached in this thread [2] regarding some new comments and
> > > concerns.
> > >
> > > This voting will be open for at least 72 hours. I'll try to close it
> Sep.
> > > 5, 14:00 UTC, unless there is an objection or not enough votes.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > > [1]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> > > [2]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
> > >
> > > On Tue, Aug 27, 2019 at 9:29 PM Xintong Song 
> > > wrote:
> > >
> > > > Alright, then let's keep the discussion in the DISCUSS mailing
> thread,
> > > and
> > > > see whether we need to restart the vote.
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Tue, Aug 27, 2019 at 8:12 PM Till Rohrmann 
> > > > wrote:
> > > >
> > > >> I had a couple of comments concerning the implementation plan

Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-03 Thread Jan Lukavský

Answers inline.

On 9/3/19 4:01 PM, Till Rohrmann wrote:

How so? Does your REPL add the generated classes to the system class
loader? I assume the system class loader is used to load the Flink classes.
No, it does not. It cannot on JDK >= 9 (or would have to hack into 
jdk.internal.loader.ClassLoaders, which I don't want to :)). It just 
creates another class loader, and is able to create a jar from generated 
files. The jar is used for remote execution.


Ideally, what you would like to have is the option to provide the parent
class loader which is used load user code to the LocalEnvironment. This one
could then be forwarded to the TaskExecutor where it is used to generate
the user code class loader. But this is a bigger effort.
I'm not sure how this differs from using context classloader? Maybe 
there is subtle difference in that this is a little more explicit. On 
the other hand, users normally do not modify class loaders, so the 
practical impact is IMHO negligible. But maybe this opens another 
possibility - we probably could add optional ClassLoader parameter to 
LocalEnvironment, with default value of 
FlinkRunner.class.getClassLoader()? That might be a good compromise.


The downside to this approach is that it requires you to create a jar file
and to submit it via a REST call. The upside is that it is closer to the
production setting.


Yes, a REPL has to do that anyway to support distributed computing, so 
this is not an issue.


Jan



Cheers,
Till

On Tue, Sep 3, 2019 at 3:47 PM Jan Lukavský  wrote:


On the other hand, if you say, that the contract of LocalEnvironment is
to execute as if it had all classes on its class loader, then it
currently breaks this contract. :-)

Jan

On 9/3/19 3:45 PM, Jan Lukavský wrote:

Hi Till,

hmm, that sounds it might work. I would have to incorporate this
(either as default, or on demand) into Apache Beam. Would you see any
disadvantages of this approach? Would you suggest to make this default
behavior for local beam FlinkRunner? I can introduce a configuration
option to turn this behavior on, but that would bring additional
maintenance burden, etc., etc.

Jan

On 9/3/19 3:38 PM, Till Rohrmann wrote:

I see the problem Jan. What about the following proposal: Instead of
using
the LocalEnvironment for local tests you always use the
RemoteEnvironment
but when testing it locally you spin up a MiniCluster in the same
process
and initialize the RemoteEnvironment with `MiniCluster#getRestAddress`.
That way you would always submit a jar with the generated classes and,
hence, not having to set the context class loader.

The contract of the LocalEnvironment is indeed that all classes it is
supposed t execute must be present when being started.

Cheers,
Till

On Tue, Sep 3, 2019 at 2:27 PM guaishushu1...@163.com <
guaishushu1...@163.com> wrote:




guaishushu1...@163.com

From: guaishushu1...@163.com
Date: 2019-09-03 20:25
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
using context classloader




guaishushu1...@163.com
From: guaishushu1...@163.com
Date: 2019-09-03 20:23
To: dev
Subject: Re: Re: ClassLoader created by BlobLibraryCacheManager is not
using context classloader
guaishushu1...@163.com
From: Jan Lukavský
Date: 2019-09-03 20:17
To: dev
Subject: Re: ClassLoader created by BlobLibraryCacheManager is not
using
context classloader
Hi Till,
the use-case is pretty much simple - I have a REPL shell in groovy,
which generates classes at runtime. The actual hierarchy is therefore
system class loader -> application classloader -> repl classloader
(GroovyClassLoader actually)
now, when a terminal (sink) operation in the shell occurs, I'm able to
build a jar, which I can submit to remote cluster (in distributed
case).
But - during testing -  I run the code using local flink. There is no
(legal) way of adding this new runtime generated jar to local flink. As
I said, I have a hackish solution which works on JDK <= 8, because it
uses reflection to call addURL on the application classloader (and
thefore "pretends", that the generated jar was there all the time from
the JVM startup). This breaks on JDK >= 9. It might be possible to work
around this somehow, but I think that the reason why
LocalEnvironment is
not having a way to add jars (as in case of RemoteEnvironment) is that
is assumes, that you actually have all of the on classpath when using
local runner. I think that this implies that it either has to use
context classloader (to be able to work on top of any classloading user
might have), or is wrong and would need be fixed, so that
LocalEnvironment would accept files to "stage" - which would mean
adding
them to a class loader (probably URLClassLoader with the application
class loader as parent).
Or, would you see any other option?
Jan
On 9/3/19 2:00 PM, Till Rohrmann wrote:

Hi Jan,

I've talked with Aljoscha and Stephan offline and we concluded that we
would like to avoid the usage of context class loaders if possible.
The
reason for t

[jira] [Created] (FLINK-13945) Vendor-repos Maven profile doesn't exist in flink-shaded

2019-09-03 Thread Jira
Elise Ramé created FLINK-13945:
--

 Summary: Vendor-repos Maven profile doesn't exist in flink-shaded
 Key: FLINK-13945
 URL: https://issues.apache.org/jira/browse/FLINK-13945
 Project: Flink
  Issue Type: Bug
  Components: BuildSystem / Shaded
Affects Versions: shaded-8.0, shaded-7.0, shaded-9.0
Reporter: Elise Ramé


According to 
[documentation|https://ci.apache.org/projects/flink/flink-docs-release-1.9/flinkDev/building.html#custom--vendor-specific-versions],
 to build Flink against a vendor specific Hadoop version it is necessary to 
build flink-shaded against this version first : 
{code:bash}
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=
{code}
vendor-repos profile has to be activated to include Hadoop vendors repositories.
 But Maven cannot find expected Hadoop dependencies and returns an error 
because vendor-repos profile isn't defined in flink-shaded.

Example using flink-shaded 8.0 and HDP 2.6.5 Hadoop version :
{code:bash}
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.7.3.2.6.5.0-292
{code}
{code:bash}
[INFO] ---< org.apache.flink:flink-shaded-hadoop-2 >---
[INFO] Building flink-shaded-hadoop-2 2.7.3.2.6.5.0-292-8.0 [10/11]
[INFO] [ jar ]-
[WARNING] The POM for org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292 is 
missing, no dependency information available
[WARNING] The POM for org.apache.hadoop:hadoop-hdfs:jar:2.7.3.2.6.5.0-292 is 
missing, no dependency information available
[WARNING] The POM for 
org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3.2.6.5.0-292 is 
missing, no dependency information available
[WARNING] The POM for 
org.apache.hadoop:hadoop-yarn-client:jar:2.7.3.2.6.5.0-292 is missing, no 
dependency information available
[WARNING] The POM for 
org.apache.hadoop:hadoop-yarn-common:jar:2.7.3.2.6.5.0-292 is missing, no 
dependency information available
[INFO] 
[INFO] Reactor Summary:
[INFO]
[INFO] flink-shaded 8.0 ... SUCCESS [  2.122 s]
[INFO] flink-shaded-force-shading 8.0 . SUCCESS [  0.607 s]
[INFO] flink-shaded-asm-7 7.1-8.0 . SUCCESS [  0.667 s]
[INFO] flink-shaded-guava-18 18.0-8.0 . SUCCESS [  1.452 s]
[INFO] flink-shaded-netty-4 4.1.39.Final-8.0 .. SUCCESS [  4.597 s]
[INFO] flink-shaded-netty-tcnative-dynamic 2.0.25.Final-8.0 SUCCESS [  0.620 s]
[INFO] flink-shaded-jackson-parent 2.9.8-8.0 .. SUCCESS [  0.018 s]
[INFO] flink-shaded-jackson-2 2.9.8-8.0 ... SUCCESS [  0.914 s]
[INFO] flink-shaded-jackson-module-jsonSchema-2 2.9.8-8.0 . SUCCESS [  0.627 s]
[INFO] flink-shaded-hadoop-2 2.7.3.2.6.5.0-292-8.0  FAILURE [  0.047 s]
[INFO] flink-shaded-hadoop-2-uber 2.7.3.2.6.5.0-292-8.0 ... SKIPPED
[INFO] 
[INFO] BUILD FAILURE
[INFO] 
[INFO] Total time:  11.947 s
[INFO] Finished at: 2019-09-03T16:52:59+02:00
[INFO] 
[WARNING] The requested profile "vendor-repos" could not be activated because 
it does not exist.
[ERROR] Failed to execute goal on project flink-shaded-hadoop-2: Could not 
resolve dependencies for project 
org.apache.flink:flink-shaded-hadoop-2:jar:2.7.3.2.6.5.0-292-8.0: The following 
artifacts could not be resolved: 
org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292, 
org.apache.hadoop:hadoop-hdfs:jar:2.7.3.2.6.5.0-292, 
org.apache.hadoop:hadoop-mapreduce-client-core:jar:2.7.3.2.6.5.0-292, 
org.apache.hadoop:hadoop-yarn-client:jar:2.7.3.2.6.5.0-292, 
org.apache.hadoop:hadoop-yarn-common:jar:2.7.3.2.6.5.0-292: Failure to find 
org.apache.hadoop:hadoop-common:jar:2.7.3.2.6.5.0-292 in 
https://repo.maven.apache.org/maven2 was cached in the local repository, 
resolution will not be reattempted until the update interval of central has 
elapsed or updates are forced -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e 
switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please 
read the following articles:
[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn  -rf :flink-shaded-hadoop-2
{code}
vendor-repos profile exists in Flink pom.xml file : 
[https://github.com/apache/flink/blob/3079d11913f153ec40c75afb5356fd3be1a1e550/pom.xml#L1037]



--
This message was sent by Atlassian Jira
(v8.3.2

Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-03 Thread Kostas Kloudas
Yes, I will do that Jark!

Kostas

On Tue, Sep 3, 2019 at 4:19 PM Jark Wu  wrote:
>
> Thanks Kostas for the quick fixing.
>
> However, I find that FLINK-13940 still target to 1.8.2 as a blocker. If I
> understand correctly, FLINK-13940 is aiming for a nicer and better solution
> in the future.
> So should we update the fixVersion of FLINK-13940?
>
> Best,
> Jark
>
> On Tue, 3 Sep 2019 at 21:33, Kostas Kloudas  wrote:
>
> > Thanks for waiting!
> >
> > A fix for FLINK-13940 has been merged on 1.8, 1.9 and the master under
> > FLINK-13941.
> >
> > Cheers,
> > Kostas
> >
> > On Tue, Sep 3, 2019 at 11:25 AM jincheng sun 
> > wrote:
> > >
> > > +1 FLINK-13940  is a
> > > blocker, due to loss data is very important bug, And great thanks for
> > > helping fix it  Kostas!
> > >
> > > Best, Jincheng
> > >
> > > Kostas Kloudas  于2019年9月2日周一 下午7:20写道:
> > >
> > > > Hi all,
> > > >
> > > > I think this should be also considered a blocker
> > > > https://issues.apache.org/jira/browse/FLINK-13940.
> > > > It is not a regression but it can result to data loss.
> > > >
> > > > I think I can have a quick fix by tomorrow.
> > > >
> > > > Cheers,
> > > > Kostas
> > > >
> > > > On Mon, Sep 2, 2019 at 12:01 PM jincheng sun  > >
> > > > wrote:
> > > > >
> > > > > Thanks for all of your feedback!
> > > > >
> > > > > Hi Jark, Glad to see that you are doing what RM should doing.
> > > > >
> > > > > Only one tips here is before the RC1 all the blocker should be
> > fixed, but
> > > > > othrers is nice to have. So you can decide when to prepare RC1 after
> > the
> > > > > blokcer is resolved.
> > > > >
> > > > > Feel free to tell me if you have any questions.
> > > > >
> > > > > Best,Jincheng
> > > > >
> > > > > Aljoscha Krettek  于2019年9月2日周一 下午5:03写道:
> > > > >
> > > > > > I cut a PR for FLINK-13586:
> > https://github.com/apache/flink/pull/9595
> > > > <
> > > > > > https://github.com/apache/flink/pull/9595>
> > > > > >
> > > > > > > On 2. Sep 2019, at 05:03, Yu Li  wrote:
> > > > > > >
> > > > > > > +1 for a 1.8.2 release, thanks for bringing this up Jincheng!
> > > > > > >
> > > > > > > Best Regards,
> > > > > > > Yu
> > > > > > >
> > > > > > >
> > > > > > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise 
> > wrote:
> > > > > > >
> > > > > > >> +1 for the 1.8.2 release
> > > > > > >>
> > > > > > >> I marked https://issues.apache.org/jira/browse/FLINK-13586 for
> > this
> > > > > > >> release. It would be good to compensate for the backward
> > > > incompatible
> > > > > > >> change to ClosureCleaner that was introduced in 1.8.1, which
> > affects
> > > > > > >> downstream dependencies.
> > > > > > >>
> > > > > > >> Thanks,
> > > > > > >> Thomas
> > > > > > >>
> > > > > > >>
> > > > > > >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun <
> > > > sunjincheng...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >>> Hi Jark,
> > > > > > >>>
> > > > > > >>> Glad to hear that you want to be the Release Manager of flink
> > > > 1.8.2.
> > > > > > >>> I believe that you will be a great RM, and I am very willing to
> > > > help
> > > > > > you
> > > > > > >>> with the final release in the final stages. :)
> > > > > > >>>
> > > > > > >>> The release of Apache Flink involves a number of tasks. For
> > > > details,
> > > > > > you
> > > > > > >>> can consult the documentation [1]. If you have any questions,
> > > > please
> > > > > > let
> > > > > > >> me
> > > > > > >>> know and let us work together.
> > > > > > >>>
> > > > > > >>>
> > > > > > >>>
> > > > > > >>
> > > > > >
> > > >
> > https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1
> > > > > > >>>
> > > > > > >>> Cheers,
> > > > > > >>> Jincheng
> > > > > > >>>
> > > > > > >>> Till Rohrmann  于2019年8月31日周六 上午12:59写道:
> > > > > > >>>
> > > > > >  +1 for a 1.8.2 bug fix release. Thanks for kicking this
> > > > discussion off
> > > > > >  Jincheng.
> > > > > > 
> > > > > >  Cheers,
> > > > > >  Till
> > > > > > 
> > > > > >  On Fri, Aug 30, 2019 at 6:45 PM Jark Wu 
> > wrote:
> > > > > > 
> > > > > > > Thanks Jincheng for bringing this up.
> > > > > > >
> > > > > > > +1 to the 1.8.2 release, because it already contains a
> > couple of
> > > > > >  important
> > > > > > > fixes and it has been a long time since 1.8.1 came out.
> > > > > > > I'm willing to help the community as much as possible. I'm
> > > > wondering
> > > > > > >>> if I
> > > > > > > can be the release manager of 1.8.2 or work with you together
> > > > > > >>> @Jincheng?
> > > > > > >
> > > > > > > Best,
> > > > > > > Jark
> > > > > > >
> > > > > > > On Fri, 30 Aug 2019 at 18:58, Hequn Cheng <
> > chenghe...@gmail.com>
> > > > > > >>> wrote:
> > > > > > >
> > > > > > >> Hi Jincheng,
> > > > > > >>
> > > > > > >> +1 for a 1.8.2 release.
> > > > > > >> Thanks a lot for raising the discussion. It wo

Re: [DISCUSS] Repository split

2019-09-03 Thread Robert Metzger
Thanks a lot for your summary Chesnay.
I agree with you that we have no consensus in the community for splitting
up the repository immediately, and I agree with you that we should have a
separate discussion about reducing the build time (which is already making
good progress).

Also, I will keep the thoughts about decentralising the Flink development
in the back of my head and bring it up again whenever I feel it's the right
time.


On Wed, Aug 14, 2019 at 2:26 PM Chesnay Schepler  wrote:

> Let's recap a bit:
>
> Several people have raised the argument that build times can be kept in
> check via other means (mostly differential builds via some means, be it
> custom scripts or switching to gradle). I will start a separate
> discussion thread on this topic, since it is a useful discussion in any
> case.
> I agree with this, and believe it is feasible to update the CI process
> to behave as if the repository was split.
>
>
> The suggestion of a "project split" within a single repository was
> brought up.
> This approach is a mixed bag; it avoids the downsides to the development
> process that multiple repositories would incur, but also only has few
> upsides. It seems primarily relevant for local development, where one
> might want to skip certain modules when running tests.
>
> There's no benefit from the CI side: since we're still limited to 1
> .travis.yml, whatever rules we want to set up (e.g., "do not test core
> if only connectors are modified") have to be handled by the CI scripts
> regardless of whether the project is split or not.
>
> Overall, I'd like to put this item on ice for the time being; the
> subsequent item is related, vastly more impactful and may also render
> this item obsolete.
>
>
> A major topic of discussion is that of the development process. It was
> pointed how that having a split repository makes the dev process more
> complicated, since certain changes turn into a 2 step process (merge to
> core, then merge to connectors). Others have pointed out that this may
> actually be an advantage, as it (to some extent) enforces that changes
> to core are also tested in core.
>
> I find myself more in the latter camp; it is all to easy for people to
> make a change to the core while making whatever adjustments to
> connectors to make things fit. A recent change to the ClosureCleaner in
> 1.8.0  comes to mind,
> which, with a split repo, may have resulted in build failures in the
> connectors project. (provided that the time-frame between the 2 merges
> is sufficiently large...) As Arvid pointed out, having to feel the pain
> that users have to go through may not be such a bad thing.
>
> This is a fundamental discussion as to whether we want to continue with
> a centralized development of all components.
>
> Robert also pointed out that such a split could result in us
> establishing entirely separate projects. We've had times in the past
> (like the first flink-ml library) where such a setup may have simplified
> things (back then we had lot's of contributors but no committer to
> shepherd the effort; a separate project could be more lenient when it
> comes to appointing new committers).
>
>
> @Robert We should have a SNAPSHOT dependency /somewhere/ in the
> connector repo, to detect issues (like the ClosureCleaner one) in a
> timely manner and to prepare for new features so that we can have a
> timely release after core, but not necessarily on the master branch.
>
> @Bowen I have implemented and deployed your suggestion to cancel Travis
> builds if the associated PR has been closed.
>
>
> On 07/08/2019 13:14, Chesnay Schepler wrote:
> > Hello everyone,
> >
> > The Flink project sees an ever-increasing amount of dev activity, both
> > in terms of reworked and new features.
> >
> > This is of course an excellent situation to be in, but we are getting
> > to a point where the associate downsides are becoming increasingly
> > troublesome.
> >
> > The ever increasing build times, in addition to unstable tests,
> > significantly slow down the develoment process.
> > Additionally, pull requests for smaller features frequently slip
> > through the crasks as they are being buried under a mountain of other
> > pull requests.
> >
> > As a result I'd like to start a discussion on splitting the Flink
> > repository.
> >
> > In this mail I will outline the core idea, and what problems I
> > currently envision.
> >
> > I'd specifically like to encourage those who were part of similar
> > initiatives in other projects to share the experiences and ideas.
> >
> >
> >General Idea
> >
> > For starters, the idea is to create a new repository for
> > "flink-connectors".
> > For the remainder of this mail, the current Flink repository is
> > referred to as "flink-main".
> >
> > There are also other candidates that we could discuss in the future,
> > like flink-libraries (the next top-priority repo to ease flink-ml
> > development), metric reporters, f

Re: [DISCUSS] Reducing build times

2019-09-03 Thread Robert Metzger
Hi all,

I wanted to give a short update on this:
- Arvid, Aljoscha and I have started working on a Gradle PoC, currently
working on making all modules compile and test with Gradle. We've also
identified some problematic areas (shading being the most obvious one)
which we will analyse as part of the PoC.
The goal is to see how much Gradle helps to parallelise our build, and to
avoid duplicate work (incremental builds).

- I am working on setting up a Flink testing infrastructure based on Azure
Pipelines, using more powerful hardware. Alibaba kindly provided me with
two 32 core machines (temporarily), and another company reached out to
privately, looking into options for cheap, fast machines :)
If nobody in the community disagrees, I am going to set up Azure Pipelines
with our apache/flink GitHub as a build infrastructure that exists next to
Flinkbot and flink-ci. I would like to make sure that Azure Pipelines is
equally or even more reliable than Travis, and I want to see what the
required maintenance work is.
On top of that, Azure Pipelines is a very feature-rich tool with a lot of
nice options for us to improve the build experience (statistics about tests
(flaky tests etc.), nice docker support, plenty of free build resources for
open source projects, ...)

Best,
Robert





On Mon, Aug 19, 2019 at 5:12 PM Robert Metzger  wrote:

> Hi all,
>
> I have summarized all arguments mentioned so far + some additional
> research into a Wiki page here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125309279
>
> I'm happy to hear further comments on my summary! I'm pretty sure we can
> find more pro's and con's for the different options.
>
> My opinion after looking at the options:
>
>- Flink relies on an outdated build tool (Maven), while a good
>alternative is well-established (gradle), and will likely provide a much
>better CI and local build experience through incremental build and cached
>intermediates.
>Scripting around Maven, or splitting modules / test execution /
>repositories won't solve this problem. We should rather spend the effort in
>migrating to a modern build tool which will provide us benefits in the long
>run.
>- Flink relies on a fairly slow build service (Travis CI), while
>simply putting more money onto the problem could cut the build time at
>least in half.
>We should consider using a build service that provides bigger machines
>to solve our build time problem.
>
> My opinion is based on many assumptions (gradle is actually as fast as
> promised (haven't used it before), we can build Flink with gradle, we find
> sponsors for bigger build machines) that we need to test first through PoCs.
>
> Best,
> Robert
>
>
>
>
> On Mon, Aug 19, 2019 at 10:26 AM Aljoscha Krettek 
> wrote:
>
>> I did a quick test: a normal "mvn clean install -DskipTests
>> -Drat.skip=true -Dmaven.javadoc.skip=true -Punsafe-mapr-repo” on my machine
>> takes about 14 minutes. After removing all mentions of maven-shade-plugin
>> the build time goes down to roughly 11.5 minutes. (Obviously the resulting
>> Flink won’t work, because some expected stuff is not packaged and most of
>> the end-to-end tests use the shade plugin to package the jars for testing.
>>
>> Aljoscha
>>
>> > On 18. Aug 2019, at 19:52, Robert Metzger  wrote:
>> >
>> > Hi all,
>> >
>> > I wanted to understand the impact of the hardware we are using for
>> running
>> > our tests. Each travis worker has 2 virtual cores, and 7.5 gb memory
>> [1].
>> > They are using Google Cloud Compute Engine *n1-standard-2* instances.
>> > Running a full "mvn clean verify" takes *03:32 h* on such a machine
>> type.
>> >
>> > Running the same workload on a 32 virtual cores, 64 gb machine, takes
>> *1:21
>> > h*.
>> >
>> > What is interesting are the per-module build time differences.
>> > Modules which are parallelizing tests well greatly benefit from the
>> > additional cores:
>> > "flink-tests" 36:51 min vs 4:33 min
>> > "flink-runtime" 23:41 min vs 3:47 min
>> > "flink-table-planner" 15:54 min vs 3:13 min
>> >
>> > On the other hand, we have modules which are not parallel at all:
>> > "flink-connector-kafka": 16:32 min vs 15:19 min
>> > "flink-connector-kafka-0.11": 9:52 min vs 7:46 min
>> > Also, the checkstyle plugin is not scaling at all.
>> >
>> > Chesnay reported some significant speedups by reusing forks.
>> > I don't know how much effort it would be to make the Kafka tests
>> > parallelizable. In total, they currently use 30 minutes on the big
>> machine
>> > (while 31 CPUs are idling :) )
>> >
>> > Let me know what you think about these results. If the community is
>> > generally interested in further investigating into that direction, I
>> could
>> > look into software to orchestrate this, as well as sponsors for such an
>> > infrastructure.
>> >
>> > [1] https://docs.travis-ci.com/user/reference/overview/
>> >
>> >
>> > On Fri, Aug 16, 2019 at 3:27 PM Chesnay Schepler 
>> wrote:
>> >
>> >> @Alj

Re: [DISCUSS] Reducing build times

2019-09-03 Thread Arvid Heise
+1 for Azure Pipelines, had very good experiences in the past with it and
the open source and payment models are much better.

The upcoming Github CI/CD seems also like a promising alternative, but from
the first looks, it seems like the small brother of Azure Pipeline. So, any
effort going into Azure Pipelines is probably also going into this
direction.

Best,

Arvid

On Tue, Sep 3, 2019 at 6:57 PM Robert Metzger  wrote:

> Hi all,
>
> I wanted to give a short update on this:
> - Arvid, Aljoscha and I have started working on a Gradle PoC, currently
> working on making all modules compile and test with Gradle. We've also
> identified some problematic areas (shading being the most obvious one)
> which we will analyse as part of the PoC.
> The goal is to see how much Gradle helps to parallelise our build, and to
> avoid duplicate work (incremental builds).
>
> - I am working on setting up a Flink testing infrastructure based on Azure
> Pipelines, using more powerful hardware. Alibaba kindly provided me with
> two 32 core machines (temporarily), and another company reached out to
> privately, looking into options for cheap, fast machines :)
> If nobody in the community disagrees, I am going to set up Azure Pipelines
> with our apache/flink GitHub as a build infrastructure that exists next to
> Flinkbot and flink-ci. I would like to make sure that Azure Pipelines is
> equally or even more reliable than Travis, and I want to see what the
> required maintenance work is.
> On top of that, Azure Pipelines is a very feature-rich tool with a lot of
> nice options for us to improve the build experience (statistics about tests
> (flaky tests etc.), nice docker support, plenty of free build resources for
> open source projects, ...)
>
> Best,
> Robert
>
>
>
>
>
> On Mon, Aug 19, 2019 at 5:12 PM Robert Metzger 
> wrote:
>
> > Hi all,
> >
> > I have summarized all arguments mentioned so far + some additional
> > research into a Wiki page here:
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=125309279
> >
> > I'm happy to hear further comments on my summary! I'm pretty sure we can
> > find more pro's and con's for the different options.
> >
> > My opinion after looking at the options:
> >
> >- Flink relies on an outdated build tool (Maven), while a good
> >alternative is well-established (gradle), and will likely provide a
> much
> >better CI and local build experience through incremental build and
> cached
> >intermediates.
> >Scripting around Maven, or splitting modules / test execution /
> >repositories won't solve this problem. We should rather spend the
> effort in
> >migrating to a modern build tool which will provide us benefits in
> the long
> >run.
> >- Flink relies on a fairly slow build service (Travis CI), while
> >simply putting more money onto the problem could cut the build time at
> >least in half.
> >We should consider using a build service that provides bigger machines
> >to solve our build time problem.
> >
> > My opinion is based on many assumptions (gradle is actually as fast as
> > promised (haven't used it before), we can build Flink with gradle, we
> find
> > sponsors for bigger build machines) that we need to test first through
> PoCs.
> >
> > Best,
> > Robert
> >
> >
> >
> >
> > On Mon, Aug 19, 2019 at 10:26 AM Aljoscha Krettek 
> > wrote:
> >
> >> I did a quick test: a normal "mvn clean install -DskipTests
> >> -Drat.skip=true -Dmaven.javadoc.skip=true -Punsafe-mapr-repo” on my
> machine
> >> takes about 14 minutes. After removing all mentions of
> maven-shade-plugin
> >> the build time goes down to roughly 11.5 minutes. (Obviously the
> resulting
> >> Flink won’t work, because some expected stuff is not packaged and most
> of
> >> the end-to-end tests use the shade plugin to package the jars for
> testing.
> >>
> >> Aljoscha
> >>
> >> > On 18. Aug 2019, at 19:52, Robert Metzger 
> wrote:
> >> >
> >> > Hi all,
> >> >
> >> > I wanted to understand the impact of the hardware we are using for
> >> running
> >> > our tests. Each travis worker has 2 virtual cores, and 7.5 gb memory
> >> [1].
> >> > They are using Google Cloud Compute Engine *n1-standard-2* instances.
> >> > Running a full "mvn clean verify" takes *03:32 h* on such a machine
> >> type.
> >> >
> >> > Running the same workload on a 32 virtual cores, 64 gb machine, takes
> >> *1:21
> >> > h*.
> >> >
> >> > What is interesting are the per-module build time differences.
> >> > Modules which are parallelizing tests well greatly benefit from the
> >> > additional cores:
> >> > "flink-tests" 36:51 min vs 4:33 min
> >> > "flink-runtime" 23:41 min vs 3:47 min
> >> > "flink-table-planner" 15:54 min vs 3:13 min
> >> >
> >> > On the other hand, we have modules which are not parallel at all:
> >> > "flink-connector-kafka": 16:32 min vs 15:19 min
> >> > "flink-connector-kafka-0.11": 9:52 min vs 7:46 min
> >> > Also, the checkstyle plugin is not scaling at all.
> >> >
> >> > C

[jira] [Created] (FLINK-13946) Remove deactivated JobSession-related code.

2019-09-03 Thread Kostas Kloudas (Jira)
Kostas Kloudas created FLINK-13946:
--

 Summary: Remove deactivated JobSession-related code.
 Key: FLINK-13946
 URL: https://issues.apache.org/jira/browse/FLINK-13946
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Affects Versions: 1.9.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas


This issue refers to removing the code related to job session as described in 
[FLINK-2097|https://issues.apache.org/jira/browse/FLINK-2097].  The feature is 
deactivated, as pointed by the comment 
[here|https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java#L285]
 and it complicates the code paths related to job submission, namely the 
lifecycle of the Remote and LocalExecutors.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: Potential block size issue with S3 binary files

2019-09-03 Thread Ken Krugler
Hi Arvid,

Thanks for following up…

> On Sep 2, 2019, at 3:09 AM, Arvid Heise  wrote:
> 
> Hi Ken,
> 
> that's indeed a very odd issue that you found. I had a hard time to connect
> block size with S3 in the beginning and had to dig into the code. I still
> cannot fully understand why you got two different block size values from
> the S3 FileSytem. Looking into Hadoop code, I found the following snippet
> 
> public long getDefaultBlockSize() {
>return this.getConf().getLong("fs.s3.block.size", 67108864L);
> }
> 
> I don't see a quick fix for that. Looks like mismatching configurations on
> different machines. We should probably have some sanity checks to detect
> mismatching block header information, but unfortunately, the block header
> is very primitive and doesn't allow for sophisticated checks.

Yes - and what made it harder to debug is that when the incorrect block size 
was set to 32MB, sometimes the first split that got processed was split[1] 
(second actual split). In that situation, the block info record was where the 
code expected it to be (since it was reading from 64MB - record size), so it 
all looked OK, but then the first record processed would be at an incorrect 
position.

> So let's focus on implementation solutions:
> 1. I gather that you need to have support for data that uses
> IOReadableWritable. So moving to more versatile solutions like Avro or
> Parquet is unfortunately not an option. I'd still recommend that for any
> new project.

See below - it’s not a requirement, but certainly easier.

> 2. Storing block size into the repeated headers in a file introduces a kind
> of hen-and-egg problem. You need the block size to read the header to get
> the block size.
> 3. Storing block size once in first block would require additional seeks
> and depending of the degree of parallelism would put a rather high load on
> the data node with the first block.
> 4. Storing block size in metadata would be ideal but with the wide range of
> possible filesystems most likely not doable.
> 5. Explicitly setting the block size would be the most reliable technique
> but quite user-unfriendly, especially, if multiple deployment environment
> use different block sizes.
> 6. Adding a periodic marker seems indeed as the most robust option and
> adding 20 bytes every 2k bytes doesn't seem too bad for me. The downside is
> that seeking can take a long time for larger records as it will linearly
> scan through the bytes at the block start. However, if you really want to
> support copying files across file systems with different block sizes, this
> would be the only option.
> 7. Deprecating sequence format is a good option in the long run. I simply
> don't see that for productive code the performance gain over using Avro or
> Parquet would be noticeable and getting a solid base concept for schema
> evolution will pay off quickly from my experience.
> 
> @Ken, could you please describe for what kind of data do you use the
> sequence format? I like to understand your requirements. How large are your
> records (OoM)? Are they POJOs? Do you craft them manually?

They are hand-crafted POJOs, typically about 1.2K/record.

It’s a mapping from words to feature vectors (and some additional data).

I then use them as backing store with a cache (in a downstream job) as 
side-input to a map function that creates word vectors from large collections 
of text. This is why the serialized format was appealing, as it’s then 
relatively straightforward to use the existing deserialization logic when 
reading from my custom Java code.

So yes, I could switch to Parquet with some additional work, I’ve used that 
format before in Hadoop jobs, but I’ve never tried directly reading from it.

Ditto for Avro. Note that in my use case I don’t have to worry about evolving 
the schema, as it’s just transient data used in the middle of a batch workflow 
(to avoid really, really big joins that take forever).

Regards,

— Ken



> On Sun, Sep 1, 2019 at 9:42 PM Stephan Ewen  wrote:
> 
>> Sounds reasonable.
>> 
>> I am adding Arvid to the thread - IIRC he authored that tool in his
>> Stratosphere days. And my a stroke of luck, he is now working on Flink
>> again.
>> 
>> @Arvid - what are your thoughts on Ken's suggestions?
>> 
>> On Fri, Aug 30, 2019 at 8:56 PM Ken Krugler 
>> wrote:
>> 
>>> Hi Stephan (switching to dev list),
>>> 
>>> On Aug 29, 2019, at 2:52 AM, Stephan Ewen  wrote:
>>> 
>>> That is a good point.
>>> 
>>> Which way would you suggest to go? Not relying on the FS block size at
>>> all, but using a fix (configurable) block size?
>>> 
>>> 
>>> There’s value to not requiring a fixed block size, as then a file that’s
>>> moved between different file systems can be read using whatever block
>> size
>>> is optimal for that system.
>>> 
>>> Hadoop handles this in sequence files by storing a unique “sync marker”
>>> value in the file header (essentially a 16 byte UUID), injecting one of
>>> these every 2K bytes or so (in between 

Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-03 Thread Yun Tang
Hi Yijie

I can see that Pulsar becomes more and more popular recently and very glad to 
see more people willing to contribute to Flink ecosystem.

Before any further discussion, would you please give some explanation of the 
relationship between this thread to current existing JIRAs of pulsar source [1] 
and sink [2] connector? Will the contribution contains part of those PRs or 
totally different implementation?

[1] https://issues.apache.org/jira/browse/FLINK-9641
[2] https://issues.apache.org/jira/browse/FLINK-9168

Best
Yun Tang

From: Yijie Shen 
Sent: Tuesday, September 3, 2019 13:57
To: dev@flink.apache.org 
Subject: [DISCUSS] Contribute Pulsar Flink connector back to Flink

Dear Flink Community!

I would like to open the discussion of contributing Pulsar Flink
connector [0] back to Flink.

## A brief introduction to Apache Pulsar

Apache Pulsar[1] is a multi-tenant, high-performance distributed
pub-sub messaging system. Pulsar includes multiple features such as
native support for multiple clusters in a Pulsar instance, with
seamless geo-replication of messages across clusters, very low publish
and end-to-end latency, seamless scalability to over a million topics,
and guaranteed message delivery with persistent message storage
provided by Apache BookKeeper. Nowadays, Pulsar has been adopted by
more and more companies[2].

## The status of Pulsar Flink connector

The Pulsar Flink connector we are planning to contribute is built upon
Flink 1.9.0 and Pulsar 2.4.0. The main features are:
- Pulsar as a streaming source with exactly-once guarantee.
- Sink streaming results to Pulsar with at-least-once semantics. (We
would update this to exactly-once as well when Pulsar gets all
transaction features ready in its 2.5.0 version)
- Build upon Flink new Table API Type system (FLIP-37[3]), and can
automatically (de)serialize messages with the help of Pulsar schema.
- Integrate with Flink new Catalog API (FLIP-30[4]), which enables the
use of Pulsar topics as tables in Table API as well as SQL client.

## Reference
[0] https://github.com/streamnative/pulsar-flink
[1] https://pulsar.apache.org/
[2] https://pulsar.apache.org/en/powered-by/
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs


Best,
Yijie Shen


[jira] [Created] (FLINK-13947) Check Hive shim serialization in Hive UDF wrapper classes and test coverage

2019-09-03 Thread Xuefu Zhang (Jira)
Xuefu Zhang created FLINK-13947:
---

 Summary: Check Hive shim serialization in Hive UDF wrapper classes 
and test coverage
 Key: FLINK-13947
 URL: https://issues.apache.org/jira/browse/FLINK-13947
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Reporter: Xuefu Zhang
Assignee: Xuefu Zhang
 Fix For: 1.10.0


Including 3.1.0, 3.1.1, and 3.1.2.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Bowen Li
Hi Kurt,

Re: > What I want to propose is we can merge #3 and #4, make them both under
>"catalog" concept, by extending catalog function to make it have ability to
>have built-in catalog functions. Some benefits I can see from this
approach:
>1. We don't have to introduce new concept like external built-in functions.
>Actually I don't see a full story about how to treat a built-in functions,
and it
>seems a little bit disrupt with catalog. As a result, you have to make
some restriction
>like "hive built-in functions can only be used when current catalog is
hive catalog".

Yes, I've unified #3 and #4 but it seems I didn't update some part of the
doc. I've modified those sections, and they are up to date now.

In short, now built-in function of external systems are defined as a
special kind of catalog function in Flink, and handled by Flink as
following:
- An external built-in function must be associated with a catalog for the
purpose of decoupling flink-table and external systems.
- It always resides in front of catalog functions in ambiguous function
reference order, just like in its own external system
- It is a special catalog function that doesn’t have a schema/database
namespace
- It goes thru the same instantiation logic as other user defined catalog
functions in the external system

Please take another look at the doc, and let me know if you have more
questions.


On Tue, Sep 3, 2019 at 7:28 AM Timo Walther  wrote:

> Hi Kurt,
>
> it should not affect the functions and operations we currently have in
> SQL. It just categorizes the available built-in functions. It is kind of
> an orthogonal concept to the catalog API but built-in functions deserve
> this special kind of treatment. CatalogFunction still fits perfectly in
> there because the regular catalog object resolution logic is not
> affected. So tables and functions are resolved in the same way but with
> built-in functions that have priority as in the original design.
>
> Regards,
> Timo
>
>
> On 03.09.19 15:26, Kurt Young wrote:
> > Does this only affect the functions and operations we currently have in
> SQL
> > and
> > have no effect on tables, right? Looks like this is an orthogonal concept
> > with Catalog?
> > If the answer are both yes, then the catalog function will be a weird
> > concept?
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Sep 3, 2019 at 8:10 PM Danny Chan  wrote:
> >
> >> The way you proposed are basically the same as what Calcite does, I
> think
> >> we are in the same line.
> >>
> >> Best,
> >> Danny Chan
> >> 在 2019年9月3日 +0800 PM7:57,Timo Walther ,写道:
> >>> This sounds exactly as the module approach I mentioned, no?
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> On 03.09.19 13:42, Danny Chan wrote:
>  Thanks Bowen for bring up this topic, I think it’s a useful
> >> refactoring to make our function usage more user friendly.
>  For the topic of how to organize the builtin operators and operators
> >> of Hive, here is a solution from Apache Calcite, the Calcite way is to
> make
> >> every dialect operators a “Library”, user can specify which libraries
> they
> >> want to use for a sql query. The builtin operators always comes as the
> >> first class objects and the others are used from the order they appears.
> >> Maybe you can take a reference.
>  [1]
> >>
> https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28
>  Best,
>  Danny Chan
>  在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:
> > Hi folks,
> >
> > I'd like to kick off a discussion on reworking Flink's
> >> FunctionCatalog.
> > It's critically helpful to improve function usability in SQL.
> >
> >
> >>
> https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing
> > In short, it:
> > - adds support for precise function reference with fully/partially
> > qualified name
> > - redefines function resolution order for ambiguous function
> >> reference
> > - adds support for Hive's rich built-in functions (support for Hive
> >> user
> > defined functions was already added in 1.9.0)
> > - clarifies the concept of temporary functions
> >
> > Would love to hear your thoughts.
> >
> > Bowen
> >>>
>
>


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Bowen Li
Hi Jingsong,

Re> 1.Hive built-in functions is an intermediate solution. So we should
> not introduce interfaces to influence the framework. To make
> Flink itself more powerful, we should implement the functions
> we need to add.

Yes, please see the doc.

Re> 2.Non-flink built-in functions are easy for users to change their
> behavior. If we support some flink built-in functions in the
> future but act differently from non-flink built-in, this will lead to
> changes in user behavior.

There's no such concept as "external built-in functions" any more. Built-in
functions of external systems will be treated as special catalog functions.

Re> Another question is, does this fallback include all
> hive built-in functions? As far as I know, some hive functions
> have some hacky. If possible, can we start with a white list?
> Once we implement some functions to flink built-in, we can
> also update the whitelist.

Yes, that's something we thought of too. I don't think it's super critical
to the scope of this FLIP, thus I'd like to leave it to future efforts as a
nice-to-have feature.


On Tue, Sep 3, 2019 at 1:37 PM Bowen Li  wrote:

> Hi Kurt,
>
> Re: > What I want to propose is we can merge #3 and #4, make them both
> under
> >"catalog" concept, by extending catalog function to make it have ability
> to
> >have built-in catalog functions. Some benefits I can see from this
> approach:
> >1. We don't have to introduce new concept like external built-in
> functions.
> >Actually I don't see a full story about how to treat a built-in
> functions, and it
> >seems a little bit disrupt with catalog. As a result, you have to make
> some restriction
> >like "hive built-in functions can only be used when current catalog is
> hive catalog".
>
> Yes, I've unified #3 and #4 but it seems I didn't update some part of the
> doc. I've modified those sections, and they are up to date now.
>
> In short, now built-in function of external systems are defined as a
> special kind of catalog function in Flink, and handled by Flink as
> following:
> - An external built-in function must be associated with a catalog for the
> purpose of decoupling flink-table and external systems.
> - It always resides in front of catalog functions in ambiguous function
> reference order, just like in its own external system
> - It is a special catalog function that doesn’t have a schema/database
> namespace
> - It goes thru the same instantiation logic as other user defined catalog
> functions in the external system
>
> Please take another look at the doc, and let me know if you have more
> questions.
>
>
> On Tue, Sep 3, 2019 at 7:28 AM Timo Walther  wrote:
>
>> Hi Kurt,
>>
>> it should not affect the functions and operations we currently have in
>> SQL. It just categorizes the available built-in functions. It is kind of
>> an orthogonal concept to the catalog API but built-in functions deserve
>> this special kind of treatment. CatalogFunction still fits perfectly in
>> there because the regular catalog object resolution logic is not
>> affected. So tables and functions are resolved in the same way but with
>> built-in functions that have priority as in the original design.
>>
>> Regards,
>> Timo
>>
>>
>> On 03.09.19 15:26, Kurt Young wrote:
>> > Does this only affect the functions and operations we currently have in
>> SQL
>> > and
>> > have no effect on tables, right? Looks like this is an orthogonal
>> concept
>> > with Catalog?
>> > If the answer are both yes, then the catalog function will be a weird
>> > concept?
>> >
>> > Best,
>> > Kurt
>> >
>> >
>> > On Tue, Sep 3, 2019 at 8:10 PM Danny Chan  wrote:
>> >
>> >> The way you proposed are basically the same as what Calcite does, I
>> think
>> >> we are in the same line.
>> >>
>> >> Best,
>> >> Danny Chan
>> >> 在 2019年9月3日 +0800 PM7:57,Timo Walther ,写道:
>> >>> This sounds exactly as the module approach I mentioned, no?
>> >>>
>> >>> Regards,
>> >>> Timo
>> >>>
>> >>> On 03.09.19 13:42, Danny Chan wrote:
>>  Thanks Bowen for bring up this topic, I think it’s a useful
>> >> refactoring to make our function usage more user friendly.
>>  For the topic of how to organize the builtin operators and operators
>> >> of Hive, here is a solution from Apache Calcite, the Calcite way is to
>> make
>> >> every dialect operators a “Library”, user can specify which libraries
>> they
>> >> want to use for a sql query. The builtin operators always comes as the
>> >> first class objects and the others are used from the order they
>> appears.
>> >> Maybe you can take a reference.
>>  [1]
>> >>
>> https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28
>>  Best,
>>  Danny Chan
>>  在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:
>> > Hi folks,
>> >
>> > I'd like to kick off a discussion on reworking Flink's
>> >> FunctionCatalog.
>> > It's critically helpful to improve function usability in SQL.
>> >
>> >
>> >>
>> https://docs.google.com/document/d/1w3HZG

[jira] [Created] (FLINK-13948) Fix loss of state for Identical Windows merging after initial merge

2019-09-03 Thread Scott Waller (Jira)
Scott Waller created FLINK-13948:


 Summary: Fix loss of state for Identical Windows merging after 
initial merge
 Key: FLINK-13948
 URL: https://issues.apache.org/jira/browse/FLINK-13948
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.8.0
Reporter: Scott Waller


In the situation where there is a merging window, if we've performed a merge 
into a new window, and another window comes into the set that is exactly 
identical to the window created by the merge, the state window is replaced by 
the incoming window, and we lose the previous state.

 

Example:

Window (1,2) comes in to an empty set. The mapping is (1,2) -> (1,2)

Window (1,3) comes into the set,  we merge. The mapping is (1,3) -> (1,2)

Window (1,3) comes into the set, we don't merge. The new mapping is (1,3) -> 
(1,3). This mapping will cause us to lose the previous state when its persisted.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Bowen Li
Hi Timo,

Re> 1) We should not have the restriction "hive built-in functions can only
> be used when current catalog is hive catalog". Switching a catalog
> should only have implications on the cat.db.object resolution but not
> functions. It would be quite convinient for users to use Hive built-ins
> even if they use a Confluent schema registry or just the in-memory
catalog.

There might be a misunderstanding here.

First of all, Hive built-in functions are not part of Flink built-in
functions, they are catalog functions, thus if the current catalog is not a
HiveCatalog but, say, a schema registry catalog, ambiguous functions
reference just shouldn't be resolved to a different catalog.

Second, Hive built-in functions can potentially be referenced across
catalog, but it doesn't have db namespace and we currently just don't have
a SQL syntax for it. It can be enabled when such a SQL syntax is defined,
e.g. "catalog::function", but it's out of scope of this FLIP.

2) I would propose to have separate concepts for catalog and built-in
functions. In particular it would be nice to modularize built-in
functions. Some built-in functions are very crucial (like AS, CAST,
MINUS), others are more optional but stable (MD5, CONCAT_WS), and maybe
we add more experimental functions in the future or function for some
special application area (Geo functions, ML functions). A data platform
team might not want to make every built-in function available. Or a
function module like ML functions is in a different Maven module.

I think this is orthogonal to this FLIP, especially we don't have the
"external built-in functions" anymore and currently the built-in function
category remains untouched.

But just to share some thoughts on the proposal, I'm not sure about it:
- I don't know if any other databases handle built-in functions like that.
Maybe you can give some examples? IMHO, built-in functions are system info
and should be deterministic, not depending on loaded libraries. Geo
functions should be either built-in already or just libraries functions,
and library functions can be adapted to catalog APIs or of some other
syntax to use
- I don't know if all use cases stand, and many can be achieved by other
approaches too. E.g. experimental functions can be taken good care of by
documentations, annotations, etc
- the proposal basically introduces some concept like a pluggable built-in
function catalog, despite the already existing catalog APIs
- it brings in even more complicated scenarios to the design. E.g. how do
you handle built-in functions in different modules but different names?

In short, I'm not sure if it really stands and it looks like an overkill to
me. I'd rather not go to that route. Related discussion can be on its own
thread.

3) Following the suggestion above, we can have a separate discovery
mechanism for built-in functions. Instead of just going through a static
list like in BuiltInFunctionDefinitions, a platform team should be able
to select function modules like
catalogManager.setFunctionModules(CoreFunctions, GeoFunctions,
HiveFunctions) or via service discovery;

Same as above. I'll leave it to its own thread.

re > 3) Dawid and I discussed the resulution order again. I agree with Kurt
> that we should unify built-in function (external or internal) under a
> common layer. However, the resolution order should be:
>   1. built-in functions
>   2. temporary functions
>   3. regular catalog resolution logic
> Otherwise a temporary function could cause clashes with Flink's built-in
> functions. If you take a look at other vendors, like SQL Server they
> also do not allow to overwrite built-in functions.

”I agree with Kurt that we should unify built-in function (external or
internal) under a common layer.“ <- I don't think this is what Kurt means.
Kurt and I are in favor of unifying built-in functions of external systems
and catalog functions. Did you type a mistake?

Besides, I'm not sure about the resolution order you proposed. Temporary
functions have a lifespan over a session and are only visible to the
session owner, they are unique to each user, and users create them on
purpose to be the highest priority in order to overwrite system info
(built-in functions in this case).

In your case, why would users name a temporary function the same as a
built-in function then? Since using that name in ambiguous function
reference will always be resolved to built-in functions, creating a
same-named temp function would be meaningless in the end.


On Tue, Sep 3, 2019 at 1:44 PM Bowen Li  wrote:

> Hi Jingsong,
>
> Re> 1.Hive built-in functions is an intermediate solution. So we should
> > not introduce interfaces to influence the framework. To make
> > Flink itself more powerful, we should implement the functions
> > we need to add.
>
> Yes, please see the doc.
>
> Re> 2.Non-flink built-in functions are easy for users to change their
> > behavior. If we support some flink built-in functions in the
> > future but act di

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Bowen Li
Hi all,

Thanks for the feedback. Just a kindly reminder that the [Proposal] section
in the google doc was updated, please take a look first and let me know if
you have more questions.

On Tue, Sep 3, 2019 at 4:57 PM Bowen Li  wrote:

> Hi Timo,
>
> Re> 1) We should not have the restriction "hive built-in functions can
> only
> > be used when current catalog is hive catalog". Switching a catalog
> > should only have implications on the cat.db.object resolution but not
> > functions. It would be quite convinient for users to use Hive built-ins
> > even if they use a Confluent schema registry or just the in-memory
> catalog.
>
> There might be a misunderstanding here.
>
> First of all, Hive built-in functions are not part of Flink built-in
> functions, they are catalog functions, thus if the current catalog is not a
> HiveCatalog but, say, a schema registry catalog, ambiguous functions
> reference just shouldn't be resolved to a different catalog.
>
> Second, Hive built-in functions can potentially be referenced across
> catalog, but it doesn't have db namespace and we currently just don't have
> a SQL syntax for it. It can be enabled when such a SQL syntax is defined,
> e.g. "catalog::function", but it's out of scope of this FLIP.
>
> 2) I would propose to have separate concepts for catalog and built-in
> functions. In particular it would be nice to modularize built-in
> functions. Some built-in functions are very crucial (like AS, CAST,
> MINUS), others are more optional but stable (MD5, CONCAT_WS), and maybe
> we add more experimental functions in the future or function for some
> special application area (Geo functions, ML functions). A data platform
> team might not want to make every built-in function available. Or a
> function module like ML functions is in a different Maven module.
>
> I think this is orthogonal to this FLIP, especially we don't have the
> "external built-in functions" anymore and currently the built-in function
> category remains untouched.
>
> But just to share some thoughts on the proposal, I'm not sure about it:
> - I don't know if any other databases handle built-in functions like that.
> Maybe you can give some examples? IMHO, built-in functions are system info
> and should be deterministic, not depending on loaded libraries. Geo
> functions should be either built-in already or just libraries functions,
> and library functions can be adapted to catalog APIs or of some other
> syntax to use
> - I don't know if all use cases stand, and many can be achieved by other
> approaches too. E.g. experimental functions can be taken good care of by
> documentations, annotations, etc
> - the proposal basically introduces some concept like a pluggable built-in
> function catalog, despite the already existing catalog APIs
> - it brings in even more complicated scenarios to the design. E.g. how do
> you handle built-in functions in different modules but different names?
>
> In short, I'm not sure if it really stands and it looks like an overkill
> to me. I'd rather not go to that route. Related discussion can be on its
> own thread.
>
> 3) Following the suggestion above, we can have a separate discovery
> mechanism for built-in functions. Instead of just going through a static
> list like in BuiltInFunctionDefinitions, a platform team should be able
> to select function modules like
> catalogManager.setFunctionModules(CoreFunctions, GeoFunctions,
> HiveFunctions) or via service discovery;
>
> Same as above. I'll leave it to its own thread.
>
> re > 3) Dawid and I discussed the resulution order again. I agree with
> Kurt
> > that we should unify built-in function (external or internal) under a
> > common layer. However, the resolution order should be:
> >   1. built-in functions
> >   2. temporary functions
> >   3. regular catalog resolution logic
> > Otherwise a temporary function could cause clashes with Flink's built-in
> > functions. If you take a look at other vendors, like SQL Server they
> > also do not allow to overwrite built-in functions.
>
> ”I agree with Kurt that we should unify built-in function (external or
> internal) under a common layer.“ <- I don't think this is what Kurt means.
> Kurt and I are in favor of unifying built-in functions of external systems
> and catalog functions. Did you type a mistake?
>
> Besides, I'm not sure about the resolution order you proposed. Temporary
> functions have a lifespan over a session and are only visible to the
> session owner, they are unique to each user, and users create them on
> purpose to be the highest priority in order to overwrite system info
> (built-in functions in this case).
>
> In your case, why would users name a temporary function the same as a
> built-in function then? Since using that name in ambiguous function
> reference will always be resolved to built-in functions, creating a
> same-named temp function would be meaningless in the end.
>
>
> On Tue, Sep 3, 2019 at 1:44 PM Bowen Li  wrote:
>
>> Hi Jingsong,
>>
>> Re> 1

Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-03 Thread Sijie Guo
Hi Yun,

Since I was the main driver behind FLINK-9641 and FLINK-9168, let me try to
add more context on this.

FLINK-9641 and FLINK-9168 was created for bringing Pulsar as source and
sink for Flink. The integration was done with Flink 1.6.0. We sent out pull
requests about a year ago and we ended up maintaining those connectors in
Pulsar for Pulsar users to use Flink to process event streams in Pulsar.
(See https://github.com/apache/pulsar/tree/master/pulsar-flink). The Flink
1.6 integration is pretty simple and there is no schema considerations.

In the past year, we have made a lot of changes in Pulsar and brought
Pulsar schema as the first-class citizen in Pulsar. We also integrated with
other computing engines for processing Pulsar event streams with Pulsar
schema.

It led us to rethink how to integrate with Flink in the best way. Then we
reimplement the pulsar-flink connectors from the ground up with schema and
bring table API and catalog API as the first-class citizen in the
integration. With that being said, in the new pulsar-flink implementation,
you can register pulsar as a flink catalog and query / process the event
streams using Flink SQL.

This is an example about how to use Pulsar as a Flink catalog:
https://github.com/streamnative/pulsar-flink/blob/3eeddec5625fc7dddc3f8a3ec69f72e1614ca9c9/README.md#use-pulsar-catalog

Yijie has also written a blog post explaining why we re-implement the flink
connector with Flink 1.9 and what are the changes we made in the new
connector:
https://medium.com/streamnative/use-apache-pulsar-as-streaming-table-with-8-lines-of-code-39033a93947f

We believe Pulsar is not just a simple data sink or source for Flink. It
actually can be a fully integrated streaming data storage for Flink in many
areas (sink, source, schema/catalog and state). The combination of Flink
and Pulsar can create a great streaming warehouse architecture for
streaming-first, unified data processing. Since we are talking to
contribute Pulsar integration to Flink here, we are also dedicated to
maintain, improve and evolve the integration with Flink to help the users
who use both Flink and Pulsar.

Hope this give you a bit more background about the pulsar flink
integration. Let me know what are your thoughts.

Thanks,
Sijie


On Tue, Sep 3, 2019 at 11:54 AM Yun Tang  wrote:

> Hi Yijie
>
> I can see that Pulsar becomes more and more popular recently and very glad
> to see more people willing to contribute to Flink ecosystem.
>
> Before any further discussion, would you please give some explanation of
> the relationship between this thread to current existing JIRAs of pulsar
> source [1] and sink [2] connector? Will the contribution contains part of
> those PRs or totally different implementation?
>
> [1] https://issues.apache.org/jira/browse/FLINK-9641
> [2] https://issues.apache.org/jira/browse/FLINK-9168
>
> Best
> Yun Tang
> 
> From: Yijie Shen 
> Sent: Tuesday, September 3, 2019 13:57
> To: dev@flink.apache.org 
> Subject: [DISCUSS] Contribute Pulsar Flink connector back to Flink
>
> Dear Flink Community!
>
> I would like to open the discussion of contributing Pulsar Flink
> connector [0] back to Flink.
>
> ## A brief introduction to Apache Pulsar
>
> Apache Pulsar[1] is a multi-tenant, high-performance distributed
> pub-sub messaging system. Pulsar includes multiple features such as
> native support for multiple clusters in a Pulsar instance, with
> seamless geo-replication of messages across clusters, very low publish
> and end-to-end latency, seamless scalability to over a million topics,
> and guaranteed message delivery with persistent message storage
> provided by Apache BookKeeper. Nowadays, Pulsar has been adopted by
> more and more companies[2].
>
> ## The status of Pulsar Flink connector
>
> The Pulsar Flink connector we are planning to contribute is built upon
> Flink 1.9.0 and Pulsar 2.4.0. The main features are:
> - Pulsar as a streaming source with exactly-once guarantee.
> - Sink streaming results to Pulsar with at-least-once semantics. (We
> would update this to exactly-once as well when Pulsar gets all
> transaction features ready in its 2.5.0 version)
> - Build upon Flink new Table API Type system (FLIP-37[3]), and can
> automatically (de)serialize messages with the help of Pulsar schema.
> - Integrate with Flink new Catalog API (FLIP-30[4]), which enables the
> use of Pulsar topics as tables in Table API as well as SQL client.
>
> ## Reference
> [0] https://github.com/streamnative/pulsar-flink
> [1] https://pulsar.apache.org/
> [2] https://pulsar.apache.org/en/powered-by/
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
> [4]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs
>
>
> Best,
> Yijie Shen
>


Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-09-03 Thread Rong Rong
Thanks for putting together the proposal @Timo and sorry for joining the
discussion thread late.

I also share the same thought with Fabian on the ease-of-use front. However
I was wondering if we need to start the expression design with them?
One thing I can think of is: is it possible to support "alias" later on in
the Expression once we collect enough feedback from the users?

IMO, It is always easier to expand the APIs later than reducing them.

Cheers,
Rong

On Mon, Sep 2, 2019 at 2:37 AM Timo Walther  wrote:

> Hi all,
>
> I see a majority votes for `lit(12)` so let's adopt that in the FLIP.
> The `$("field")` would consider Fabian's concerns so I would vote for
> keeping it like that.
>
> One more question for native English speakers, is it acceptable to have
> `isEqual` instead of `isEqualTo` and `isGreater` instead of
> `isGreaterThan`?
>
> If there are no more concerns, I will start a voting thread soon.
>
> Thanks,
> Timo
>
>
> On 29.08.19 12:24, Fabian Hueske wrote:
> > Hi,
> >
> > IMO, we should define what we would like to optimize for:
> > 1) easy-to-get-started experience or
> > 2) productivity and ease-of-use
> >
> > While 1) is certainly important, I think we should put more emphasis on
> > goal 2).
> > That's why I favor as short as possible names for commonly used methods
> > like column references and literals/values.
> > These are used *many* times in *every* query.
> > Every user who uses the API for more than 30 mins will know what $() or
> v()
> > (or whatever method names we come up with) are used for and everybody who
> > doesn't know can have a look at the JavaDocs or regular documentation.
> > Shorter method names are not only about increasing the speed to write a
> > query, but also reducing clutter that needs to be parsed to understand an
> > expression / query.
> >
> > I'm OK with descriptive names for other expressions like call(),
> > isEqualTo() (although these could be the commonly used eq(), gte(),
> etc.),
> > and so on but column references (and literals) should be as lightweight
> as
> > possible, IMO.
> >
> > Cheers,
> > Fabian
> >
> > Am Do., 29. Aug. 2019 um 12:15 Uhr schrieb Timo Walther <
> twal...@apache.org
> >> :
> >> I'm fine with `lit()`. Regarding `col()`, I initially suggested `ref()`
> >> but I think Fabian and Dawid liked single char methods for the most
> >> commonly used expressions.
> >>
> >> Btw, what is your opinion on the names of commonly used methods such as
> >> `isEqual`, `isGreaterOrEqual`? Are we fine with the current naming.
> >> In theory we could make them shorter like `equals(), greaterOrEqual()`
> >> or even shorter to `eq`, `gt`, `gte`?
> >>
> >> Thanks,
> >> Timo
> >>
> >>
> >> On 29.08.19 11:51, Aljoscha Krettek wrote:
> >>> Overall, this is a very nice development that should also simplify the
> >> code base once we deprecate the expression parser!
> >>> Regarding method names, I agree with Seth that values/literals should
> >> use something like “lit()”. I also think that for column references we
> >> could use “col()” to make it clear that it is a column reference. What
> do
> >> you think?
> >>> Aljoscha
> >>>
>  On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:
> 
>  I would prefer ‘lit()’ over  ‘val()’ since val is a keyword in Scala.
> >> Assuming the intention is to make the dsl ergonomic for Scala
> developers.
>  Seth
> 
> > On Aug 28, 2019, at 7:58 AM, Timo Walther 
> wrote:
> >
> > Hi David,
> >
> > thanks for your feedback. I was also skeptical about 1 char method
> >> names, I restored the `val()` method for now. If you read literature
> such
> >> as Wikipedia [1]: "literal is a notation for representing a fixed value
> in
> >> source code. Almost all programming languages have notations for atomic
> >> values". So they are also talking about "values".
> > Alteratively we could use `lit(12)` or `l(12)` but I'm not convinced
> >> that this is better.
> > Regards,
> > Timo
> >
> > [1] https://en.wikipedia.org/wiki/Literal_(computer_programming)
> >
> >> On 27.08.19 22:10, David Anderson wrote:
> >> TImo,
> >>
> >> While it's not exactly pretty, I don't mind the $("field")
> construct.
> >> It's not particularly surprising. The v() method troubles me more;
> it
> >> looks mysterious. I think we would do better to have something more
> >> explicit. val() isn't much better -- val("foo") could be interpreted
> >> to mean the value of the "foo" column, or a literal string.
> >>
> >> David
> >>
> >>> On Tue, Aug 27, 2019 at 5:45 PM Timo Walther 
> >> wrote:
> >>> Hi David,
> >>>
> >>> thanks for your feedback. With the current design, the DSL would be
> >> free
> >>> of any ambiguity but it is definitely more verbose esp. around
> >> defining
> >>> values.
> >>>
> >>> I would be happy about further suggestions that make the DSL more
> >>> readable. I'm also not sure if we go for `$()` and 

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-03 Thread Becket Qin
Hi Timo, Dawid and Aljoscha,

Thanks for clarifying the goals. It is very helpful to understand the
motivation here. It would be great to add them to the FLIP wiki.

I agree that the current FLIP design achieves the two goals it wants to
achieve. But I am trying to see is if the current approach is the most
reasonable approach.

Please let me check if I understand this correctly. From end users'
perspective, they will do the following when they want to configure their
Flink Jobs.
1. Create a Configuration instance, and call setters of Configuration with
the ConfigOptions defined in different components.
2. The Configuration created in step 1 will be passed around, and each
component will just exact their own options from it.
3. ExecutionConfig, CheckpointConfig (and other Config classes) will become
a Configurable, which is responsible for extracting the configuration
values from the Configuration set by users in step 1.

The confusion I had was that in step 1, how users are going to set the
configs for the ExecutionConfig / CheckpointConfig? There may be two ways:
a) Users will call setConfigurable(ExectionConfigConfigurableOption,
"config1:v1,config2:v2,config3:v3"), i.e. the entire ExecutionConfig is
exposed as a Configurable to the users.
b) Users will call setInteger(MAX_PARALLELISM, 1),
setInteger(LATENCY_TRACKING_INTERVAL, 1000), etc.. This means users will
set individual ConfigOptions for the ExecutionConfig. And they do not see
ExecutionConfig as a Configurable.

I assume we are following b), then do we need to expose Configurable to the
users in this FLIP? My concern is that the Configurable may be related to
other mechanism such as plugin which we have not really thought through in
this FLIP.

I know Becket at least has some thoughts about immutability and loading
> objects via the configuration but maybe they could be put into a follow-up
> FLIP if they are needed.

I am perfectly fine to leave something out of the scope of this FLIP to
later FLIPs. But I think it is important to avoid introducing something in
this FLIP that will be shortly changed by the follow-up FLIPs.

Thanks,

Jiangjie (Becket) Qin

On Tue, Sep 3, 2019 at 8:47 PM Aljoscha Krettek  wrote:

> Hi,
>
> I think it’s important to keep in mind the original goals of this FLIP and
> not let the scope grow indefinitely. As I recall it, the goals are:
>
>  - Extend the ConfigOption system enough to allow the Table API to
> configure options that are right now only available on
> CheckpointingOptions, ExecutionConfig, and StreamExecutionEnvironment. We
> also want to do this without manually having to “forward” all the available
> configuration options by introducing equivalent setters in the Table API
>
>  - Do the above while keeping in mind that eventually we want to allow
> users to configure everything from either the flink-conf.yaml, vie command
> line parameters, or via a Configuration.
>
> I think the FLIP achieves this, with the added side goals of making
> validation a part of ConfigOptions, making them type safe, and making the
> validation constraints documentable (via automatic doc generation.) All
> this without breaking backwards compatibility, if I’m not mistaken.
>
> I think we should first agree what the basic goals are so that we can
> quickly converge to consensus on this FLIP because it blocks other
> people/work. Among other things FLIP-59 depends on this. What are other
> opinions that people have? I know Becket at least has some thoughts about
> immutability and loading objects via the configuration but maybe they could
> be put into a follow-up FLIP if they are needed.
>
> Also, I had one thought on the interaction of this FLIP-54 and FLIP-59
> when it comes to naming. I think eventually it makes sense to have a common
> interface for things that are configurable from a Configuration (FLIP-59
> introduces the first batch of this). It seems natural to call this
> interface Configurable. That’s a problem for this FLIP-54 because we also
> introduce a Configurable. Maybe the thing that we introduce here should be
> called ConfigObject or ConfigStruct to highlight that it has a more narrow
> focus and is really only a POJO for holding a bunch of config options that
> have to go together. What do you think?
>
> Best,
> Aljoscha
>
> > On 3. Sep 2019, at 14:08, Timo Walther  wrote:
> >
> > Hi Danny,
> >
> > yes, this FLIP covers all the building blocks we need also for
> unification of the DDL properties.
> >
> > Regards,
> > Timo
> >
> >
> > On 03.09.19 13:45, Danny Chan wrote:
> >>> with the new SQL DDL
> >> based on properties as well as more connectors and formats coming up,
> >> unified configuration becomes more important
> >>
> >> I Cann’t agree more, do you think we can unify the config options key
> format here for all the DDL properties ?
> >>
> >> Best,
> >> Danny Chan
> >> 在 2019年8月16日 +0800 PM10:12,dev@flink.apache.org,写道:
> >>> with the new SQL DDL
> >>> based on properties as well as more connectors an

[jira] [Created] (FLINK-13949) Delete deduplicating JobVertexDetailsInfo.VertexTaskDetail

2019-09-03 Thread lining (Jira)
lining created FLINK-13949:
--

 Summary: Delete deduplicating JobVertexDetailsInfo.VertexTaskDetail
 Key: FLINK-13949
 URL: https://issues.apache.org/jira/browse/FLINK-13949
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST
Reporter: lining


As there is SubtaskExecutionAttemptDetailsInfo for subtask, so we can use it 
replace JobVertexDetailsInfo.VertexTaskDetail.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Kurt Young
Thanks Timo & Bowen for the feedback. Bowen was right, my proposal is the
same
as Bowen's. But after thinking about it, I'm currently lean to Timo's
suggestion.

The reason is backward compatibility. If we follow Bowen's approach, let's
say we
first find function in Flink's built-in functions, and then hive's
built-in. For example, `foo`
is not supported by Flink, but hive has such built-in function. So user
will have hive's
behavior for function `foo`. And in next release, Flink realize this is a
very popular function
and add it into Flink's built-in functions, but with different behavior as
hive's. So in next
release, the behavior changes.

With Timo's approach, IIUC user have to tell the framework explicitly what
kind of
built-in functions he would like to use. He can just tell framework to
abandon Flink's built-in
functions, and use hive's instead. User can only choose between them, but
not use
them at the same time. I think this approach is more predictable.

Best,
Kurt


On Wed, Sep 4, 2019 at 8:00 AM Bowen Li  wrote:

> Hi all,
>
> Thanks for the feedback. Just a kindly reminder that the [Proposal] section
> in the google doc was updated, please take a look first and let me know if
> you have more questions.
>
> On Tue, Sep 3, 2019 at 4:57 PM Bowen Li  wrote:
>
> > Hi Timo,
> >
> > Re> 1) We should not have the restriction "hive built-in functions can
> > only
> > > be used when current catalog is hive catalog". Switching a catalog
> > > should only have implications on the cat.db.object resolution but not
> > > functions. It would be quite convinient for users to use Hive built-ins
> > > even if they use a Confluent schema registry or just the in-memory
> > catalog.
> >
> > There might be a misunderstanding here.
> >
> > First of all, Hive built-in functions are not part of Flink built-in
> > functions, they are catalog functions, thus if the current catalog is
> not a
> > HiveCatalog but, say, a schema registry catalog, ambiguous functions
> > reference just shouldn't be resolved to a different catalog.
> >
> > Second, Hive built-in functions can potentially be referenced across
> > catalog, but it doesn't have db namespace and we currently just don't
> have
> > a SQL syntax for it. It can be enabled when such a SQL syntax is defined,
> > e.g. "catalog::function", but it's out of scope of this FLIP.
> >
> > 2) I would propose to have separate concepts for catalog and built-in
> > functions. In particular it would be nice to modularize built-in
> > functions. Some built-in functions are very crucial (like AS, CAST,
> > MINUS), others are more optional but stable (MD5, CONCAT_WS), and maybe
> > we add more experimental functions in the future or function for some
> > special application area (Geo functions, ML functions). A data platform
> > team might not want to make every built-in function available. Or a
> > function module like ML functions is in a different Maven module.
> >
> > I think this is orthogonal to this FLIP, especially we don't have the
> > "external built-in functions" anymore and currently the built-in function
> > category remains untouched.
> >
> > But just to share some thoughts on the proposal, I'm not sure about it:
> > - I don't know if any other databases handle built-in functions like
> that.
> > Maybe you can give some examples? IMHO, built-in functions are system
> info
> > and should be deterministic, not depending on loaded libraries. Geo
> > functions should be either built-in already or just libraries functions,
> > and library functions can be adapted to catalog APIs or of some other
> > syntax to use
> > - I don't know if all use cases stand, and many can be achieved by other
> > approaches too. E.g. experimental functions can be taken good care of by
> > documentations, annotations, etc
> > - the proposal basically introduces some concept like a pluggable
> built-in
> > function catalog, despite the already existing catalog APIs
> > - it brings in even more complicated scenarios to the design. E.g. how do
> > you handle built-in functions in different modules but different names?
> >
> > In short, I'm not sure if it really stands and it looks like an overkill
> > to me. I'd rather not go to that route. Related discussion can be on its
> > own thread.
> >
> > 3) Following the suggestion above, we can have a separate discovery
> > mechanism for built-in functions. Instead of just going through a static
> > list like in BuiltInFunctionDefinitions, a platform team should be able
> > to select function modules like
> > catalogManager.setFunctionModules(CoreFunctions, GeoFunctions,
> > HiveFunctions) or via service discovery;
> >
> > Same as above. I'll leave it to its own thread.
> >
> > re > 3) Dawid and I discussed the resulution order again. I agree with
> > Kurt
> > > that we should unify built-in function (external or internal) under a
> > > common layer. However, the resolution order should be:
> > >   1. built-in functions
> > >   2. temporary functions
> > >   

Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-03 Thread Jark Wu
Hi all,

I am very happy to say that all the blockers and critical issues for
release 1.8.2 have been resolved!

Great thanks to everyone who contribute to the release.

I hope to create the first RC on Sep 05, at 10:00 UTC+8.
If you find some other blocker issues for 1.8.2, please let me know before
that to account for it for the 1.8.2 release.

Before cutting the RC1, I think it has chance to merge the
ClosureCleaner.clean fix (FLINK-13586), because the review and travis are
both passed.

Cheers,
Jark

On Wed, 4 Sep 2019 at 00:45, Kostas Kloudas  wrote:

> Yes, I will do that Jark!
>
> Kostas
>
> On Tue, Sep 3, 2019 at 4:19 PM Jark Wu  wrote:
> >
> > Thanks Kostas for the quick fixing.
> >
> > However, I find that FLINK-13940 still target to 1.8.2 as a blocker. If I
> > understand correctly, FLINK-13940 is aiming for a nicer and better
> solution
> > in the future.
> > So should we update the fixVersion of FLINK-13940?
> >
> > Best,
> > Jark
> >
> > On Tue, 3 Sep 2019 at 21:33, Kostas Kloudas  wrote:
> >
> > > Thanks for waiting!
> > >
> > > A fix for FLINK-13940 has been merged on 1.8, 1.9 and the master under
> > > FLINK-13941.
> > >
> > > Cheers,
> > > Kostas
> > >
> > > On Tue, Sep 3, 2019 at 11:25 AM jincheng sun  >
> > > wrote:
> > > >
> > > > +1 FLINK-13940 
> is a
> > > > blocker, due to loss data is very important bug, And great thanks for
> > > > helping fix it  Kostas!
> > > >
> > > > Best, Jincheng
> > > >
> > > > Kostas Kloudas  于2019年9月2日周一 下午7:20写道:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I think this should be also considered a blocker
> > > > > https://issues.apache.org/jira/browse/FLINK-13940.
> > > > > It is not a regression but it can result to data loss.
> > > > >
> > > > > I think I can have a quick fix by tomorrow.
> > > > >
> > > > > Cheers,
> > > > > Kostas
> > > > >
> > > > > On Mon, Sep 2, 2019 at 12:01 PM jincheng sun <
> sunjincheng...@gmail.com
> > > >
> > > > > wrote:
> > > > > >
> > > > > > Thanks for all of your feedback!
> > > > > >
> > > > > > Hi Jark, Glad to see that you are doing what RM should doing.
> > > > > >
> > > > > > Only one tips here is before the RC1 all the blocker should be
> > > fixed, but
> > > > > > othrers is nice to have. So you can decide when to prepare RC1
> after
> > > the
> > > > > > blokcer is resolved.
> > > > > >
> > > > > > Feel free to tell me if you have any questions.
> > > > > >
> > > > > > Best,Jincheng
> > > > > >
> > > > > > Aljoscha Krettek  于2019年9月2日周一 下午5:03写道:
> > > > > >
> > > > > > > I cut a PR for FLINK-13586:
> > > https://github.com/apache/flink/pull/9595
> > > > > <
> > > > > > > https://github.com/apache/flink/pull/9595>
> > > > > > >
> > > > > > > > On 2. Sep 2019, at 05:03, Yu Li  wrote:
> > > > > > > >
> > > > > > > > +1 for a 1.8.2 release, thanks for bringing this up Jincheng!
> > > > > > > >
> > > > > > > > Best Regards,
> > > > > > > > Yu
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise 
> > > wrote:
> > > > > > > >
> > > > > > > >> +1 for the 1.8.2 release
> > > > > > > >>
> > > > > > > >> I marked https://issues.apache.org/jira/browse/FLINK-13586
> for
> > > this
> > > > > > > >> release. It would be good to compensate for the backward
> > > > > incompatible
> > > > > > > >> change to ClosureCleaner that was introduced in 1.8.1, which
> > > affects
> > > > > > > >> downstream dependencies.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >> Thomas
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun <
> > > > > sunjincheng...@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >>> Hi Jark,
> > > > > > > >>>
> > > > > > > >>> Glad to hear that you want to be the Release Manager of
> flink
> > > > > 1.8.2.
> > > > > > > >>> I believe that you will be a great RM, and I am very
> willing to
> > > > > help
> > > > > > > you
> > > > > > > >>> with the final release in the final stages. :)
> > > > > > > >>>
> > > > > > > >>> The release of Apache Flink involves a number of tasks. For
> > > > > details,
> > > > > > > you
> > > > > > > >>> can consult the documentation [1]. If you have any
> questions,
> > > > > please
> > > > > > > let
> > > > > > > >> me
> > > > > > > >>> know and let us work together.
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>>
> > > > > > > >>
> > > > > > >
> > > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1
> > > > > > > >>>
> > > > > > > >>> Cheers,
> > > > > > > >>> Jincheng
> > > > > > > >>>
> > > > > > > >>> Till Rohrmann  于2019年8月31日周六
> 上午12:59写道:
> > > > > > > >>>
> > > > > > >  +1 for a 1.8.2 bug fix release. Thanks for kicking this
> > > > > discussion off
> > > > > > >  Jincheng.
> > > > > > > 
> > > > > > >  Cheers,
> > > > > > >  Till
> > > > > > > 
> > > > > > >  On Fri, Aug 30, 2019 at 6:45 PM Ja

[jira] [Created] (FLINK-13950) should report an error if hive table partition keys are not in the last schema fields

2019-09-03 Thread Hongtao Zhang (Jira)
Hongtao Zhang created FLINK-13950:
-

 Summary: should report an error if hive table partition keys are 
not in the last schema fields
 Key: FLINK-13950
 URL: https://issues.apache.org/jira/browse/FLINK-13950
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.9.0, 1.10.0
Reporter: Hongtao Zhang


when creating a hive table via hive catalog API, we will build the table schema 
first and then optionally we can choose one or more fields as the partition 
keys when user want to create partition table. 

 

according to hive partition rule, the partition keys should be  last fields of 
the table. but now hive catalog create table API didn't report an error when 
user specify the first / middle fields of the table schema.

 

we should report an error for this scenario



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-03 Thread jincheng sun
Thanks for the udpate Jark!

I have add the new version 1.8.3 in JIRA, could you please remark the
JIRAs(Such as FLINK-13689) which we do not want merge into the 1.8.2
release :)

 You are right, I think FLINK-13586 is better to be contained in 1.8.2
release!

Thanks,
Jincheng


Jark Wu  于2019年9月4日周三 上午10:15写道:

> Hi all,
>
> I am very happy to say that all the blockers and critical issues for
> release 1.8.2 have been resolved!
>
> Great thanks to everyone who contribute to the release.
>
> I hope to create the first RC on Sep 05, at 10:00 UTC+8.
> If you find some other blocker issues for 1.8.2, please let me know before
> that to account for it for the 1.8.2 release.
>
> Before cutting the RC1, I think it has chance to merge the
> ClosureCleaner.clean fix (FLINK-13586), because the review and travis are
> both passed.
>
> Cheers,
> Jark
>
> On Wed, 4 Sep 2019 at 00:45, Kostas Kloudas  wrote:
>
> > Yes, I will do that Jark!
> >
> > Kostas
> >
> > On Tue, Sep 3, 2019 at 4:19 PM Jark Wu  wrote:
> > >
> > > Thanks Kostas for the quick fixing.
> > >
> > > However, I find that FLINK-13940 still target to 1.8.2 as a blocker.
> If I
> > > understand correctly, FLINK-13940 is aiming for a nicer and better
> > solution
> > > in the future.
> > > So should we update the fixVersion of FLINK-13940?
> > >
> > > Best,
> > > Jark
> > >
> > > On Tue, 3 Sep 2019 at 21:33, Kostas Kloudas 
> wrote:
> > >
> > > > Thanks for waiting!
> > > >
> > > > A fix for FLINK-13940 has been merged on 1.8, 1.9 and the master
> under
> > > > FLINK-13941.
> > > >
> > > > Cheers,
> > > > Kostas
> > > >
> > > > On Tue, Sep 3, 2019 at 11:25 AM jincheng sun <
> sunjincheng...@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > > +1 FLINK-13940 
> > is a
> > > > > blocker, due to loss data is very important bug, And great thanks
> for
> > > > > helping fix it  Kostas!
> > > > >
> > > > > Best, Jincheng
> > > > >
> > > > > Kostas Kloudas  于2019年9月2日周一 下午7:20写道:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I think this should be also considered a blocker
> > > > > > https://issues.apache.org/jira/browse/FLINK-13940.
> > > > > > It is not a regression but it can result to data loss.
> > > > > >
> > > > > > I think I can have a quick fix by tomorrow.
> > > > > >
> > > > > > Cheers,
> > > > > > Kostas
> > > > > >
> > > > > > On Mon, Sep 2, 2019 at 12:01 PM jincheng sun <
> > sunjincheng...@gmail.com
> > > > >
> > > > > > wrote:
> > > > > > >
> > > > > > > Thanks for all of your feedback!
> > > > > > >
> > > > > > > Hi Jark, Glad to see that you are doing what RM should doing.
> > > > > > >
> > > > > > > Only one tips here is before the RC1 all the blocker should be
> > > > fixed, but
> > > > > > > othrers is nice to have. So you can decide when to prepare RC1
> > after
> > > > the
> > > > > > > blokcer is resolved.
> > > > > > >
> > > > > > > Feel free to tell me if you have any questions.
> > > > > > >
> > > > > > > Best,Jincheng
> > > > > > >
> > > > > > > Aljoscha Krettek  于2019年9月2日周一 下午5:03写道:
> > > > > > >
> > > > > > > > I cut a PR for FLINK-13586:
> > > > https://github.com/apache/flink/pull/9595
> > > > > > <
> > > > > > > > https://github.com/apache/flink/pull/9595>
> > > > > > > >
> > > > > > > > > On 2. Sep 2019, at 05:03, Yu Li  wrote:
> > > > > > > > >
> > > > > > > > > +1 for a 1.8.2 release, thanks for bringing this up
> Jincheng!
> > > > > > > > >
> > > > > > > > > Best Regards,
> > > > > > > > > Yu
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise 
> > > > wrote:
> > > > > > > > >
> > > > > > > > >> +1 for the 1.8.2 release
> > > > > > > > >>
> > > > > > > > >> I marked
> https://issues.apache.org/jira/browse/FLINK-13586
> > for
> > > > this
> > > > > > > > >> release. It would be good to compensate for the backward
> > > > > > incompatible
> > > > > > > > >> change to ClosureCleaner that was introduced in 1.8.1,
> which
> > > > affects
> > > > > > > > >> downstream dependencies.
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >> Thomas
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun <
> > > > > > sunjincheng...@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >>> Hi Jark,
> > > > > > > > >>>
> > > > > > > > >>> Glad to hear that you want to be the Release Manager of
> > flink
> > > > > > 1.8.2.
> > > > > > > > >>> I believe that you will be a great RM, and I am very
> > willing to
> > > > > > help
> > > > > > > > you
> > > > > > > > >>> with the final release in the final stages. :)
> > > > > > > > >>>
> > > > > > > > >>> The release of Apache Flink involves a number of tasks.
> For
> > > > > > details,
> > > > > > > > you
> > > > > > > > >>> can consult the documentation [1]. If you have any
> > questions,
> > > > > > please
> > > > > > > > let
> > > > > > > > >> me
> > > > > > > > >>> know and let us work together.
> > > > > > > > >>>
> >

[jira] [Created] (FLINK-13951) Unable to call limit without sort for batch mode

2019-09-03 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-13951:
--

 Summary: Unable to call limit without sort for batch mode
 Key: FLINK-13951
 URL: https://issues.apache.org/jira/browse/FLINK-13951
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: Jeff Zhang


Here's the sample code:  tenv.sql(select * from a).fetch(n)

 
{code:java}

Fail to run sql command: select * from a
org.apache.flink.table.api.ValidationException: A limit operation must be 
preceded by a sort operation.
  at 
org.apache.flink.table.operations.utils.factories.SortOperationFactory.validateAndGetChildSort(SortOperationFactory.java:117)
  at 
org.apache.flink.table.operations.utils.factories.SortOperationFactory.createLimitWithFetch(SortOperationFactory.java:102)
  at 
org.apache.flink.table.operations.utils.OperationTreeBuilder.limitWithFetch(OperationTreeBuilder.java:388)
  at org.apache.flink.table.api.internal.TableImpl.fetch(TableImpl.java:406) 
{code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Xuefu Z
>From what I have seen, there are a couple of focal disagreements:

1. Resolution order: temp function --> flink built-in function --> catalog
function vs flink built-in function --> temp function -> catalog function.
2. "External" built-in functions: how to treat built-in functions in
external system and how users reference them

For #1, I agree with Bowen that temp function needs to be at the highest
priority because that's how a user might overwrite a built-in function
without referencing a persistent, overwriting catalog function with a fully
qualified name. Putting built-in functions at the highest priority
eliminates that usage.

For #2, I saw a general agreement on referencing "external" built-in
functions such as those in Hive needs to be explicit and deterministic even
though different approaches are proposed. To limit the scope and simply the
usage, it seems making sense to me to introduce special syntax for user  to
explicitly reference an external built-in function such as hive1::sqrt or
hive1._built_in.sqrt. This is a DML syntax matching nicely Catalog API call
hive1.getFunction(ObjectPath functionName) where the database name is
absent for bulit-in functions available in that catalog hive1. I understand
that Bowen's original proposal was trying to avoid this, but this could
turn out to be a clean and simple solution.

(Timo's modular approach is great way to "expand" Flink's built-in function
set, which seems orthogonal and complementary to this, which could be
tackled in further future work.)

I'd be happy to hear further thoughts on the two points.

Thanks,
Xuefu

On Tue, Sep 3, 2019 at 7:11 PM Kurt Young  wrote:

> Thanks Timo & Bowen for the feedback. Bowen was right, my proposal is the
> same
> as Bowen's. But after thinking about it, I'm currently lean to Timo's
> suggestion.
>
> The reason is backward compatibility. If we follow Bowen's approach, let's
> say we
> first find function in Flink's built-in functions, and then hive's
> built-in. For example, `foo`
> is not supported by Flink, but hive has such built-in function. So user
> will have hive's
> behavior for function `foo`. And in next release, Flink realize this is a
> very popular function
> and add it into Flink's built-in functions, but with different behavior as
> hive's. So in next
> release, the behavior changes.
>
> With Timo's approach, IIUC user have to tell the framework explicitly what
> kind of
> built-in functions he would like to use. He can just tell framework to
> abandon Flink's built-in
> functions, and use hive's instead. User can only choose between them, but
> not use
> them at the same time. I think this approach is more predictable.
>
> Best,
> Kurt
>
>
> On Wed, Sep 4, 2019 at 8:00 AM Bowen Li  wrote:
>
> > Hi all,
> >
> > Thanks for the feedback. Just a kindly reminder that the [Proposal]
> section
> > in the google doc was updated, please take a look first and let me know
> if
> > you have more questions.
> >
> > On Tue, Sep 3, 2019 at 4:57 PM Bowen Li  wrote:
> >
> > > Hi Timo,
> > >
> > > Re> 1) We should not have the restriction "hive built-in functions can
> > > only
> > > > be used when current catalog is hive catalog". Switching a catalog
> > > > should only have implications on the cat.db.object resolution but not
> > > > functions. It would be quite convinient for users to use Hive
> built-ins
> > > > even if they use a Confluent schema registry or just the in-memory
> > > catalog.
> > >
> > > There might be a misunderstanding here.
> > >
> > > First of all, Hive built-in functions are not part of Flink built-in
> > > functions, they are catalog functions, thus if the current catalog is
> > not a
> > > HiveCatalog but, say, a schema registry catalog, ambiguous functions
> > > reference just shouldn't be resolved to a different catalog.
> > >
> > > Second, Hive built-in functions can potentially be referenced across
> > > catalog, but it doesn't have db namespace and we currently just don't
> > have
> > > a SQL syntax for it. It can be enabled when such a SQL syntax is
> defined,
> > > e.g. "catalog::function", but it's out of scope of this FLIP.
> > >
> > > 2) I would propose to have separate concepts for catalog and built-in
> > > functions. In particular it would be nice to modularize built-in
> > > functions. Some built-in functions are very crucial (like AS, CAST,
> > > MINUS), others are more optional but stable (MD5, CONCAT_WS), and maybe
> > > we add more experimental functions in the future or function for some
> > > special application area (Geo functions, ML functions). A data platform
> > > team might not want to make every built-in function available. Or a
> > > function module like ML functions is in a different Maven module.
> > >
> > > I think this is orthogonal to this FLIP, especially we don't have the
> > > "external built-in functions" anymore and currently the built-in
> function
> > > category remains untouched.
> > >
> > > But just to share some thoughts on the proposal, I

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread JingsongLee
Thank you for your wonderful points.

I like timo's proposal to enrich built-in functions to flexible function
 modules (For example, the financial model is useful to bank system).
 But I agree with bowen, I don't think hive functions deserves be a
 function module. I think all function modules should be flink built-in
 functions. In this way, we can control their standardization, rather
 than some controversial functions like hive.

About Kurt's concern, yes, every addition of flink's function
 changes user behavior. But in the near future, we'll cover all
 of hive's functions (in the white list). So, if the final form does
 not have hive functions. sooner or later, this behavioral change will
 come. So do we need to let users choose?

Back to the goal of hive built-in, I always thought it was just an
 intermediate solution. Do we need to provide hive built-in
 functions mode to users in the future?

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2019年9月4日(星期三) 10:11
To:dev 
Subject:Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

Thanks Timo & Bowen for the feedback. Bowen was right, my proposal is the
same
as Bowen's. But after thinking about it, I'm currently lean to Timo's
suggestion.

The reason is backward compatibility. If we follow Bowen's approach, let's
say we
first find function in Flink's built-in functions, and then hive's
built-in. For example, `foo`
is not supported by Flink, but hive has such built-in function. So user
will have hive's
behavior for function `foo`. And in next release, Flink realize this is a
very popular function
and add it into Flink's built-in functions, but with different behavior as
hive's. So in next
release, the behavior changes.

With Timo's approach, IIUC user have to tell the framework explicitly what
kind of
built-in functions he would like to use. He can just tell framework to
abandon Flink's built-in
functions, and use hive's instead. User can only choose between them, but
not use
them at the same time. I think this approach is more predictable.

Best,
Kurt


On Wed, Sep 4, 2019 at 8:00 AM Bowen Li  wrote:

> Hi all,
>
> Thanks for the feedback. Just a kindly reminder that the [Proposal] section
> in the google doc was updated, please take a look first and let me know if
> you have more questions.
>
> On Tue, Sep 3, 2019 at 4:57 PM Bowen Li  wrote:
>
> > Hi Timo,
> >
> > Re> 1) We should not have the restriction "hive built-in functions can
> > only
> > > be used when current catalog is hive catalog". Switching a catalog
> > > should only have implications on the cat.db.object resolution but not
> > > functions. It would be quite convinient for users to use Hive built-ins
> > > even if they use a Confluent schema registry or just the in-memory
> > catalog.
> >
> > There might be a misunderstanding here.
> >
> > First of all, Hive built-in functions are not part of Flink built-in
> > functions, they are catalog functions, thus if the current catalog is
> not a
> > HiveCatalog but, say, a schema registry catalog, ambiguous functions
> > reference just shouldn't be resolved to a different catalog.
> >
> > Second, Hive built-in functions can potentially be referenced across
> > catalog, but it doesn't have db namespace and we currently just don't
> have
> > a SQL syntax for it. It can be enabled when such a SQL syntax is defined,
> > e.g. "catalog::function", but it's out of scope of this FLIP.
> >
> > 2) I would propose to have separate concepts for catalog and built-in
> > functions. In particular it would be nice to modularize built-in
> > functions. Some built-in functions are very crucial (like AS, CAST,
> > MINUS), others are more optional but stable (MD5, CONCAT_WS), and maybe
> > we add more experimental functions in the future or function for some
> > special application area (Geo functions, ML functions). A data platform
> > team might not want to make every built-in function available. Or a
> > function module like ML functions is in a different Maven module.
> >
> > I think this is orthogonal to this FLIP, especially we don't have the
> > "external built-in functions" anymore and currently the built-in function
> > category remains untouched.
> >
> > But just to share some thoughts on the proposal, I'm not sure about it:
> > - I don't know if any other databases handle built-in functions like
> that.
> > Maybe you can give some examples? IMHO, built-in functions are system
> info
> > and should be deterministic, not depending on loaded libraries. Geo
> > functions should be either built-in already or just libraries functions,
> > and library functions can be adapted to catalog APIs or of some other
> > syntax to use
> > - I don't know if all use cases stand, and many can be achieved by other
> > approaches too. E.g. experimental functions can be taken good care of by
> > documentations, annotations, etc
> > - the proposal basically introduces some concept like a pluggable
> built-in
> > 

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Bowen Li
Hi,

I agree with Xuefu that the main controversial points are mainly the two
places. My thoughts on them:

1) Determinism of referencing Hive built-in functions. We can either remove
Hive built-in functions from ambiguous function resolution and require
users to use special syntax for their qualified names, or add a config flag
to catalog constructor/yaml for turning on and off Hive built-in functions
with the flag set to 'false' by default and proper doc added to help users
make their decisions.

2) Flink temp functions v.s. Flink built-in functions in ambiguous function
resolution order. We believe Flink temp functions should precede Flink
built-in functions, and I have presented my reasons. Just in case if we
cannot reach an agreement, I propose forbid users registering temp
functions in the same name as a built-in function, like MySQL's approach,
for the moment. It won't have any performance concern, since built-in
functions are all in memory and thus cost of a name check will be really
trivial.


On Tue, Sep 3, 2019 at 8:01 PM Xuefu Z  wrote:

> From what I have seen, there are a couple of focal disagreements:
>
> 1. Resolution order: temp function --> flink built-in function --> catalog
> function vs flink built-in function --> temp function -> catalog function.
> 2. "External" built-in functions: how to treat built-in functions in
> external system and how users reference them
>
> For #1, I agree with Bowen that temp function needs to be at the highest
> priority because that's how a user might overwrite a built-in function
> without referencing a persistent, overwriting catalog function with a fully
> qualified name. Putting built-in functions at the highest priority
> eliminates that usage.
>
> For #2, I saw a general agreement on referencing "external" built-in
> functions such as those in Hive needs to be explicit and deterministic even
> though different approaches are proposed. To limit the scope and simply the
> usage, it seems making sense to me to introduce special syntax for user  to
> explicitly reference an external built-in function such as hive1::sqrt or
> hive1._built_in.sqrt. This is a DML syntax matching nicely Catalog API call
> hive1.getFunction(ObjectPath functionName) where the database name is
> absent for bulit-in functions available in that catalog hive1. I understand
> that Bowen's original proposal was trying to avoid this, but this could
> turn out to be a clean and simple solution.
>
> (Timo's modular approach is great way to "expand" Flink's built-in function
> set, which seems orthogonal and complementary to this, which could be
> tackled in further future work.)
>
> I'd be happy to hear further thoughts on the two points.
>
> Thanks,
> Xuefu
>
> On Tue, Sep 3, 2019 at 7:11 PM Kurt Young  wrote:
>
> > Thanks Timo & Bowen for the feedback. Bowen was right, my proposal is the
> > same
> > as Bowen's. But after thinking about it, I'm currently lean to Timo's
> > suggestion.
> >
> > The reason is backward compatibility. If we follow Bowen's approach,
> let's
> > say we
> > first find function in Flink's built-in functions, and then hive's
> > built-in. For example, `foo`
> > is not supported by Flink, but hive has such built-in function. So user
> > will have hive's
> > behavior for function `foo`. And in next release, Flink realize this is a
> > very popular function
> > and add it into Flink's built-in functions, but with different behavior
> as
> > hive's. So in next
> > release, the behavior changes.
> >
> > With Timo's approach, IIUC user have to tell the framework explicitly
> what
> > kind of
> > built-in functions he would like to use. He can just tell framework to
> > abandon Flink's built-in
> > functions, and use hive's instead. User can only choose between them, but
> > not use
> > them at the same time. I think this approach is more predictable.
> >
> > Best,
> > Kurt
> >
> >
> > On Wed, Sep 4, 2019 at 8:00 AM Bowen Li  wrote:
> >
> > > Hi all,
> > >
> > > Thanks for the feedback. Just a kindly reminder that the [Proposal]
> > section
> > > in the google doc was updated, please take a look first and let me know
> > if
> > > you have more questions.
> > >
> > > On Tue, Sep 3, 2019 at 4:57 PM Bowen Li  wrote:
> > >
> > > > Hi Timo,
> > > >
> > > > Re> 1) We should not have the restriction "hive built-in functions
> can
> > > > only
> > > > > be used when current catalog is hive catalog". Switching a catalog
> > > > > should only have implications on the cat.db.object resolution but
> not
> > > > > functions. It would be quite convinient for users to use Hive
> > built-ins
> > > > > even if they use a Confluent schema registry or just the in-memory
> > > > catalog.
> > > >
> > > > There might be a misunderstanding here.
> > > >
> > > > First of all, Hive built-in functions are not part of Flink built-in
> > > > functions, they are catalog functions, thus if the current catalog is
> > > not a
> > > > HiveCatalog but, say, a schema registry catalog, ambiguous funct

Re: [DISCUSS] META-FLIP: Sticking (or not) to a strict FLIP voting process

2019-09-03 Thread Becket Qin
Thanks for the summary, Yu.

Hi all,

Just want to loop back on this thread. The META-FLIP actually does not
change much in the current FLIP process. It did add a few more details that
are currently not explicitly written down in the FLIP wiki page. More
specifically:

1. In general FLIPs should be completed in a reasonable amount of time,
rather than lasting for many releases.
2. FLIPs should be immutable. Changes to FLIPs need a new vote processes.
3. FLIPs should be concrete in terms of interfaces, semantic and
behaviours, rather than conceptual.
4. At least one committer should shepherd a FLIP. By default that will be
the author him/herself if the author is a committer. Otherwise, that will
be one of the committer who voted +1 on the FLIP.

For the last point, I am not quite sure about how this Shepherding is
related to the FLIP voting. Or should shepherding be a necessary thing in
order to start the voting process. Any idea on how to make this work?

Thanks,

Jiangjie (Becket) Qin


On Thu, Jul 11, 2019 at 12:29 PM Yu Li  wrote:

> Thanks for the summary and bring the discussion to public again Becket!
>
> Looking through the whole thread I think later discussion has covered a
> broader scope than what Aljoscha initially proposed (strict voting process
> for FLIP), and please allow me to categorize the topics to help decide how
> to conclude the thread:
>
> The very initial one:
> * Stick voting process for FLIP
>- Definition of a passing vote: lazy majority (more than 3 binding +1s)
> rather than lazy consensus @Aljosha (all others agreed)
>- Whose votings are counted as "binding": from both PMCs and
> committers @Jark @Stephan @David
> (My 2 cents around this topic: I suggest to encourage more non-binding
> votes to supply more inputs to our PMCs/committers and prevent the binding
> votes only representing a small group, although non-binding votes don't
> count)
>
> The extended ones:
> * The standard of FLIP (definition, scope, etc.)
>- Definition: does the long-existing standard still stands or need some
> update? should we have a blacklist about what's NOT a FLIP? @Chesnay
> @jincheng @Gordon @Hequn
>- Timeline Scope: should the FLIP be scoped to fit into a single
> release? @David @Becket
>- Dos and Don'ts: FLIP should be concrete in terms of interfaces,
> semantic and behaviors, not conceptual. FLIP should become immutable after
> voting, and change it requiring a new iteration of the
> process @Becket @Aljoscha
>- We'd better explicitly write down our Flink bylaw @Robert @Becket
>
> * How to better follow up a FLIP
>- Should a committer be assigned to shepherd the FLIP if it's created by
> a contributor? @Kurt @Biao
>- There must be a wiki for a FLIP no matter it's accepted or not, for
> later tracking and future discussion @Aljoscha @Becket
>
> FWIW, there should be no doubt to conclude the initial proposal on stick
> voting process for FLIP, while I'm not sure whether we could also conclude
> the other two in this single thread or they worth new separate discuss
> threads? Thanks.
>
> Best Regards,
> Yu
>
>
> On Thu, 11 Jul 2019 at 09:33, Becket Qin  wrote:
>
> > Hi Robert,
> >
> > That is a great point.
> >
> > Completely agree that we should have our own bylaws. Technically speaking
> > it should come before FLIP process as FLIP refers to the voting process
> > definition. Kafka's bylaws is a good reference. It has been in place for
> > years and seems working well. Maybe we can use that as a base version and
> > make revisions if needed. I can create a bylaws wiki page as a base
> version
> > for discussion. We may highlight and link to it on the "How to
> contribute"
> > page after that is finalized. Every time when we grant permission to new
> > contributors, we should refer them to that page.
> >
> > The FLIP process itself seems clear enough to me. Once we have our
> bylaws,
> > we probably just need more attention on the execution side. Usually what
> > happens is that someone proposes an idea either in a Jira ticket /
> mailing
> > list. Those who are familiar with the process will simply ask for a FLIP
> if
> > needed.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Wed, Jul 10, 2019 at 11:55 PM Robert Metzger 
> > wrote:
> >
> > > Thanks for your summary Becket.
> > >
> > > Your list of items makes sense to me.
> > > I wonder if we should start working on some project Bylaws to write
> down
> > > how we want to work together. I really like your thoughts around
> > "sticking
> > > to the process" to make us more efficient and approachable for new
> > > contributors.
> > > But we should try to make the process (FLIP) and rules (bylaws) as
> > visible
> > > as possible. Future (and probably even current) contributors will not
> be
> > > aware of this discussion, so we need to make the results more
> prominent.
> > >
> > >
> > >
> > > On Tue, Jul 9, 2019 at 4:29 PM Becket Qin 
> wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Thanks f

Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-03 Thread Jark Wu
Thanks for the work Jincheng!

I have moved remaining major issues to 1.8.3 except FLINK-13586.

Hi @Aljoscha Krettek  , is that possible to merge
FLINK-13586 today?

Best,
Jark

On Wed, 4 Sep 2019 at 10:47, jincheng sun  wrote:

> Thanks for the udpate Jark!
>
> I have add the new version 1.8.3 in JIRA, could you please remark the
> JIRAs(Such as FLINK-13689) which we do not want merge into the 1.8.2
> release :)
>
>  You are right, I think FLINK-13586 is better to be contained in 1.8.2
> release!
>
> Thanks,
> Jincheng
>
>
> Jark Wu  于2019年9月4日周三 上午10:15写道:
>
> > Hi all,
> >
> > I am very happy to say that all the blockers and critical issues for
> > release 1.8.2 have been resolved!
> >
> > Great thanks to everyone who contribute to the release.
> >
> > I hope to create the first RC on Sep 05, at 10:00 UTC+8.
> > If you find some other blocker issues for 1.8.2, please let me know
> before
> > that to account for it for the 1.8.2 release.
> >
> > Before cutting the RC1, I think it has chance to merge the
> > ClosureCleaner.clean fix (FLINK-13586), because the review and travis are
> > both passed.
> >
> > Cheers,
> > Jark
> >
> > On Wed, 4 Sep 2019 at 00:45, Kostas Kloudas  wrote:
> >
> > > Yes, I will do that Jark!
> > >
> > > Kostas
> > >
> > > On Tue, Sep 3, 2019 at 4:19 PM Jark Wu  wrote:
> > > >
> > > > Thanks Kostas for the quick fixing.
> > > >
> > > > However, I find that FLINK-13940 still target to 1.8.2 as a blocker.
> > If I
> > > > understand correctly, FLINK-13940 is aiming for a nicer and better
> > > solution
> > > > in the future.
> > > > So should we update the fixVersion of FLINK-13940?
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Tue, 3 Sep 2019 at 21:33, Kostas Kloudas 
> > wrote:
> > > >
> > > > > Thanks for waiting!
> > > > >
> > > > > A fix for FLINK-13940 has been merged on 1.8, 1.9 and the master
> > under
> > > > > FLINK-13941.
> > > > >
> > > > > Cheers,
> > > > > Kostas
> > > > >
> > > > > On Tue, Sep 3, 2019 at 11:25 AM jincheng sun <
> > sunjincheng...@gmail.com
> > > >
> > > > > wrote:
> > > > > >
> > > > > > +1 FLINK-13940 <
> https://issues.apache.org/jira/browse/FLINK-13940>
> > > is a
> > > > > > blocker, due to loss data is very important bug, And great thanks
> > for
> > > > > > helping fix it  Kostas!
> > > > > >
> > > > > > Best, Jincheng
> > > > > >
> > > > > > Kostas Kloudas  于2019年9月2日周一 下午7:20写道:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > I think this should be also considered a blocker
> > > > > > > https://issues.apache.org/jira/browse/FLINK-13940.
> > > > > > > It is not a regression but it can result to data loss.
> > > > > > >
> > > > > > > I think I can have a quick fix by tomorrow.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Kostas
> > > > > > >
> > > > > > > On Mon, Sep 2, 2019 at 12:01 PM jincheng sun <
> > > sunjincheng...@gmail.com
> > > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Thanks for all of your feedback!
> > > > > > > >
> > > > > > > > Hi Jark, Glad to see that you are doing what RM should doing.
> > > > > > > >
> > > > > > > > Only one tips here is before the RC1 all the blocker should
> be
> > > > > fixed, but
> > > > > > > > othrers is nice to have. So you can decide when to prepare
> RC1
> > > after
> > > > > the
> > > > > > > > blokcer is resolved.
> > > > > > > >
> > > > > > > > Feel free to tell me if you have any questions.
> > > > > > > >
> > > > > > > > Best,Jincheng
> > > > > > > >
> > > > > > > > Aljoscha Krettek  于2019年9月2日周一
> 下午5:03写道:
> > > > > > > >
> > > > > > > > > I cut a PR for FLINK-13586:
> > > > > https://github.com/apache/flink/pull/9595
> > > > > > > <
> > > > > > > > > https://github.com/apache/flink/pull/9595>
> > > > > > > > >
> > > > > > > > > > On 2. Sep 2019, at 05:03, Yu Li 
> wrote:
> > > > > > > > > >
> > > > > > > > > > +1 for a 1.8.2 release, thanks for bringing this up
> > Jincheng!
> > > > > > > > > >
> > > > > > > > > > Best Regards,
> > > > > > > > > > Yu
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise <
> t...@apache.org>
> > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> +1 for the 1.8.2 release
> > > > > > > > > >>
> > > > > > > > > >> I marked
> > https://issues.apache.org/jira/browse/FLINK-13586
> > > for
> > > > > this
> > > > > > > > > >> release. It would be good to compensate for the backward
> > > > > > > incompatible
> > > > > > > > > >> change to ClosureCleaner that was introduced in 1.8.1,
> > which
> > > > > affects
> > > > > > > > > >> downstream dependencies.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >> Thomas
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun <
> > > > > > > sunjincheng...@gmail.com>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >>> Hi Jark,
> > > > > > > > > >>>
> > > > > > > > > >>> Glad to hear that you want to be the Release Manager of
> > > flink
> > > > 

[jira] [Created] (FLINK-13952) PartitionableTableSink can not work with OverwritableTableSink

2019-09-03 Thread Jingsong Lee (Jira)
Jingsong Lee created FLINK-13952:


 Summary: PartitionableTableSink can not work with 
OverwritableTableSink
 Key: FLINK-13952
 URL: https://issues.apache.org/jira/browse/FLINK-13952
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Reporter: Jingsong Lee
 Fix For: 1.10.0


{code:java}
tableSink match {
  case partitionableSink: PartitionableTableSink
if partitionableSink.getPartitionFieldNames != null
  && partitionableSink.getPartitionFieldNames.nonEmpty =>
partitionableSink.setStaticPartition(insertOptions.staticPartitions)
  case overwritableTableSink: OverwritableTableSink =>
overwritableTableSink.setOverwrite(insertOptions.overwrite)
{code}
Code in TableEnvImpl and PlannerBase

overwrite will not be invoked when there are static partition columns.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


How to calculate one day's uv every minute by SQL

2019-09-03 Thread 刘建刚
  We want to calculate one day's uv and show the result every minute .
We have implemented this by java code:

  dataStream.keyBy(dimension)
.incrementWindow(Time.days(1), Time.minutes(1))
.uv(userId)

  The input data is big. So we use ValueState to store all the
distinct userIds from 00:00:00 to last minute. For current minute, we union
the minute's data with ValueState to obtain a new
ValueState and output the current uv.
  The problem is how to translate the java code to sql? We expect the
sql to be like this:

   select incrementWindow_end, dimension, distinct(userId) from table
group by incrementWindow(Time.days(1), Time.minutes(1)), dimension

  Anyone can give me some suggestions? Thank you very much.


Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Dawid Wysakowicz
Hi all,

Just an opinion on the built-in <> temporary functions resolution and
NAMING issue. I think we should not allow overriding the built-in
functions, as this may pose serious issues and to be honest is rather
not feasible and would require major rework. What happens if a user
wants to override CAST? Calls to that function are generated at
different layers of the stack that unfortunately does not always go
through the Catalog API (at least yet). Moreover from what I've checked
no other systems allow overriding the built-in functions. All the
systems I've checked so far register temporary functions in a
database/schema (either special database for temporary functions, or
just current database). What I would suggest is to always register
temporary functions with a 3 part identifier. The same way as tables,
views etc. This effectively means you cannot override built-in
functions. With such approach it is natural that the temporary functions
end up a step lower in the resolution order:

1. built-in functions (1 part, maybe 2? - this is still under discussion)

2. temporary functions (always 3 part path)

3. catalog functions (always 3 part path)

Let me know what do you think.

Best,

Dawid

On 04/09/2019 06:13, Bowen Li wrote:
> Hi,
>
> I agree with Xuefu that the main controversial points are mainly the two
> places. My thoughts on them:
>
> 1) Determinism of referencing Hive built-in functions. We can either remove
> Hive built-in functions from ambiguous function resolution and require
> users to use special syntax for their qualified names, or add a config flag
> to catalog constructor/yaml for turning on and off Hive built-in functions
> with the flag set to 'false' by default and proper doc added to help users
> make their decisions.
>
> 2) Flink temp functions v.s. Flink built-in functions in ambiguous function
> resolution order. We believe Flink temp functions should precede Flink
> built-in functions, and I have presented my reasons. Just in case if we
> cannot reach an agreement, I propose forbid users registering temp
> functions in the same name as a built-in function, like MySQL's approach,
> for the moment. It won't have any performance concern, since built-in
> functions are all in memory and thus cost of a name check will be really
> trivial.
>
>
> On Tue, Sep 3, 2019 at 8:01 PM Xuefu Z  wrote:
>
>> From what I have seen, there are a couple of focal disagreements:
>>
>> 1. Resolution order: temp function --> flink built-in function --> catalog
>> function vs flink built-in function --> temp function -> catalog function.
>> 2. "External" built-in functions: how to treat built-in functions in
>> external system and how users reference them
>>
>> For #1, I agree with Bowen that temp function needs to be at the highest
>> priority because that's how a user might overwrite a built-in function
>> without referencing a persistent, overwriting catalog function with a fully
>> qualified name. Putting built-in functions at the highest priority
>> eliminates that usage.
>>
>> For #2, I saw a general agreement on referencing "external" built-in
>> functions such as those in Hive needs to be explicit and deterministic even
>> though different approaches are proposed. To limit the scope and simply the
>> usage, it seems making sense to me to introduce special syntax for user  to
>> explicitly reference an external built-in function such as hive1::sqrt or
>> hive1._built_in.sqrt. This is a DML syntax matching nicely Catalog API call
>> hive1.getFunction(ObjectPath functionName) where the database name is
>> absent for bulit-in functions available in that catalog hive1. I understand
>> that Bowen's original proposal was trying to avoid this, but this could
>> turn out to be a clean and simple solution.
>>
>> (Timo's modular approach is great way to "expand" Flink's built-in function
>> set, which seems orthogonal and complementary to this, which could be
>> tackled in further future work.)
>>
>> I'd be happy to hear further thoughts on the two points.
>>
>> Thanks,
>> Xuefu
>>
>> On Tue, Sep 3, 2019 at 7:11 PM Kurt Young  wrote:
>>
>>> Thanks Timo & Bowen for the feedback. Bowen was right, my proposal is the
>>> same
>>> as Bowen's. But after thinking about it, I'm currently lean to Timo's
>>> suggestion.
>>>
>>> The reason is backward compatibility. If we follow Bowen's approach,
>> let's
>>> say we
>>> first find function in Flink's built-in functions, and then hive's
>>> built-in. For example, `foo`
>>> is not supported by Flink, but hive has such built-in function. So user
>>> will have hive's
>>> behavior for function `foo`. And in next release, Flink realize this is a
>>> very popular function
>>> and add it into Flink's built-in functions, but with different behavior
>> as
>>> hive's. So in next
>>> release, the behavior changes.
>>>
>>> With Timo's approach, IIUC user have to tell the framework explicitly
>> what
>>> kind of
>>> built-in functions he would like to use. He can just tell framework