Re: [DISCUSS] FLIP-59: Enable execution configuration from Configuration object

2019-09-02 Thread Dawid Wysakowicz
Hi Gyula,

Yes, you are right, we were also considering the external configurer. The
reason we suggest the built-in method is that it is more tightly coupled
with the place where the options are actually set. Our hope is therefore
that whenever somebody e.g. adds new fields to the ExecutionConfig, he/she
also updates the configure method. I am not entirely against your
suggestion though, if this is the preferred way in the community.
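
For illustration, a rough sketch of what such a built-in configure method
could look like (option and class names here are only illustrative, and I
assume a FLIP-54 style reader with a getOptional(ConfigOption<T>) method):

    // illustrative sketch only, not the final interface
    public void configure(ConfigurationReader reader) {
        // whoever adds a new ExecutionConfig field would add a line here,
        // so the option-to-setter mapping lives next to the setters it drives
        reader.getOptional(ExecutionConfigOptions.AUTO_WATERMARK_INTERVAL)
              .ifPresent(this::setAutoWatermarkInterval);
        reader.getOptional(ExecutionConfigOptions.MAX_PARALLELISM)
              .ifPresent(this::setMaxParallelism);
    }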

Does anyone have any comments regarding the option keys?

Best,

Dawid

On 30/08/2019 14:57, Gyula Fóra wrote:
> Hi Dawid,
>
> Sorry I misread one of the interfaces a little (Configuration instead of
> ConfigurationReader), you are right.
> I was referring to:
>
>
>    - void StreamExecutionEnvironment.configure(ConfigurationReader)
>
> This might be slightly orthogonal to the changes that you made here but
> what I meant is that instead of adding methods to the
> StreamExecutionEnvironment we could make this an external interface:
>
> EnvironmentConfigurer {
>   void configure(StreamExecutionEnvironment, ConfigurationReader)
> }
>
> We could then have a default implementation of the EnvironmentConfigurer
> that would understand the built-in options. We could also allow users to
> pass custom implementations of this, which could configure the
> StreamExecutionEnvironment based on user-defined config options. This is
> just a rough idea for extensibility and probably out of scope at first.
>
> Cheers,
> Gyula
>
> On Fri, Aug 30, 2019 at 12:13 PM Dawid Wysakowicz 
> wrote:
>
>> Hi Gyula,
>>
>> Thank you for the support on those changes.
>>
>> I am not sure if I understood your idea for the "reconfiguration" logic.
>>
>> The configure method on those objects would take a ConfigurationReader. So
>> a user can provide a thin wrapper around Configuration for e.g. filtering
>> certain options, changing values based on other parameters, etc. Is that
>> what you had in mind?
>>
>> Best,
>>
>> Dawid
>>
>> On 29/08/2019 19:21, Gyula Fóra wrote:
>>> Hi!
>>>
>>> Huuuge +1 from me, this has been an operational pain for years.
>>> This would also introduce a nice and simple way to extend it in the
>>> future if we need to.
>>>
>>> Ship it!
>>>
>>> Gyula
>>>
>>> On Thu, Aug 29, 2019 at 5:05 PM Dawid Wysakowicz
>>> wrote:
>>>
 Hi,

 I wanted to propose a new, additional way of configuring execution
 parameters that can currently be set only on such objects like
 ExecutionConfig, CheckpointConfig and StreamExecutionEnvironment. This
 poses problems such as:

- no easy way to configure those from a file
- there is no easy way to pass a configuration from layers built on
top of StreamExecutionEnvironment (e.g. when we want to configure those
options from TableEnvironment)
- they are not automatically documented

Note that there are a few concepts from FLIP-54[1] that this FLIP is
based on.

I would be really grateful to hear whether you think this would be a
valuable addition, and any other feedback.

 Best,

 Dawid

Wiki page:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-59%3A+Enable+execution+configuration+from+Configuration+object

Google doc:
https://docs.google.com/document/d/1l8jW2NjhwHH1mVPbLvFolnL2vNvf4buUMDZWMfN_hFM/edit?usp=sharing

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration


>>





[jira] [Created] (FLINK-13938) Use yarn public distributed cache to speed up containers launch

2019-09-02 Thread Yang Wang (Jira)
Yang Wang created FLINK-13938:
-

 Summary: Use yarn public distributed cache to speed up containers 
launch
 Key: FLINK-13938
 URL: https://issues.apache.org/jira/browse/FLINK-13938
 Project: Flink
  Issue Type: New Feature
Reporter: Yang Wang


By default, the LocalResourceVisibility is APPLICATION, so the jars will be
downloaded only once and shared by all TaskManager containers of the same
application on the same node. However, different applications still have to
download all jars every time, including flink-dist.jar. I think we could
use the YARN public cache to eliminate these unnecessary jar downloads and
make launching containers faster.
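
As an illustrative sketch of the client side (the exact wiring in Flink is
to be worked out in the PR; {{fs}} is a Hadoop FileSystem for the HDFS path):
{code:java}
// sketch only: register the pre-uploaded flink-dist jar as a PUBLIC local
// resource, so the NodeManager shares it across applications; note that the
// file must be world-readable on HDFS for PUBLIC visibility to take effect
Path remoteJar = new Path("hdfs:///flink/release/flink-1.9.0/lib/flink-dist_2.11-1.9.0.jar");
FileStatus status = fs.getFileStatus(remoteJar);

LocalResource dist = Records.newRecord(LocalResource.class);
dist.setResource(ConverterUtils.getYarnUrlFromPath(remoteJar));
dist.setSize(status.getLen());
dist.setTimestamp(status.getModificationTime());
dist.setType(LocalResourceType.FILE);
dist.setVisibility(LocalResourceVisibility.PUBLIC); // instead of APPLICATION
{code}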

 

How to use the shared lib feature?
 # Upload a copy of flink release binary to hdfs.
 # Use the -ysl argument to specify the shared lib

{code:java}
./bin/flink run -d -m yarn-cluster -p 20 -ysl 
hdfs:///flink/release/flink-1.9.0/lib examples/streaming/WindowJoin.jar{code}
 

-ysl,--yarnsharedLib    Upload a copy of flink lib beforehand and specify
                        the path to use public visibility feature of YARN
                        NodeManager localizing resources.





Re: How to handle Flink Job with 400MB+ Uberjar with 800+ containers ?

2019-09-02 Thread Yang Wang
Hi Dadashov,


Regarding your questions.


> Q1 Do all those 800 nodes download a batch of 3 at a time?

The 800+ containers will be allocated on different yarn nodes. By default,
the LocalResourceVisibility is APPLICATION, so the jars will be downloaded
only once and shared by all taskmanager containers of the same application
on the same node. And the batch is not 3. Even if the replication factor of
your jars is 3 (hdfs blocks located on 3 different datanodes), a datanode
can serve multiple downloads; the limit is the bandwidth of the datanode. I
guess the bandwidth of your hdfs datanodes is not very good, so increasing
the replication factor of the fat jar will help to reduce the downloading
time. A JIRA ticket has been created for this.[1]


> Q2 What is the recommended way of handling a 400MB+ Uberjar with 800+
containers?

From our online production experience, there are at least 3 optimization
ways.

   1. Increase the replication factor of the jars in the yarn distributed
   cache.[1]
   2. Increase the container launch parallelism or use NMClientAsync so that
   the allocated containers can be started asap (see the sketch after this
   list). Even though startContainer in the yarn nodemanager is asynchronous,
   launching a container in FlinkYarnResourceManager is a blocking call, so
   we have to start containers one by one.[2]
   3. Use the yarn public cache to eliminate unnecessary jar downloading. For
   example, flink-dist.jar will not have to be uploaded and then localized
   for each application.[3]
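
As an illustration of the second point, the async client in yarn looks
roughly like this (the callback handler class is hypothetical):

    // sketch: start containers asynchronously instead of blocking per container
    NMClientAsync nmClientAsync =
        NMClientAsync.createNMClientAsync(new LaunchCallbackHandler());
    nmClientAsync.init(yarnConfiguration);
    nmClientAsync.start();
    // returns immediately; success or failure is reported to the callback handler
    nmClientAsync.startContainerAsync(container, containerLaunchContext);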


Unfortunately, the three features above are still under development. As a
workaround, you could set dfs.replication=10 in the hdfs-site.xml of
HADOOP_CONF_DIR on the flink client machine.
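
For reference, the corresponding snippet in hdfs-site.xml (a standard HDFS
property; 10 is just the value suggested above):

    <property>
      <name>dfs.replication</name>
      <value>10</value>
    </property>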



[1].https://issues.apache.org/jira/browse/FLINK-12343

[2].https://issues.apache.org/jira/browse/FLINK-13184

[3].https://issues.apache.org/jira/browse/FLINK-13938



Best,

Yang

On Mon, Sep 2, 2019 at 10:42 AM, Zhu Zhu wrote:

> Hi Elkhan,
>
> >>Regarding "One optimization that we take is letting yarn to reuse the
> flink-dist jar which was localized when running previous jobs."
> >>We are intending to use Flink Real-time pipeline for Replay from
> Hive/HDFS (from offline source), to have 1 single pipeline for both batch
> and real-time. So for batch Flink jobs, the containers will be released
> once the job is done.
> >>I guess your job is real-time flink, so you can share the jars from
> already long-running jobs.
>
> This optimization is done by making the flink-dist jar a public
> distributed cache entry of YARN.
> In this way, the localized dist jar can be shared by different YARN
> applications and it will not be removed when the YARN application which
> localized it terminates.
> This requires some changes in Flink though.
> We will open an issue to contribute this optimization to the community.
>
> Thanks,
> Zhu Zhu
>
> On Sat, Aug 31, 2019 at 12:57 PM, SHI Xiaogang wrote:
>
>> Hi Dadashov,
>>
>> You may have a look at method YarnResourceManager#onContainersAllocated
>> which will launch containers (via NMClient#startContainer) after containers
>> are allocated.
>> The launching is performed in the main thread of YarnResourceManager and
>> the launching is synchronous/blocking. Consequently, the containers will be
>> launched one by one.
>>
>> Regards,
>> Xiaogang
>>
>> On Sat, Aug 31, 2019 at 2:37 AM, Elkhan Dadashov wrote:
>>
>>> Thanks everyone for the valuable input and for sharing your experience
>>> tackling the issue.
>>>
>>> Regarding suggestions:
>>> - We provision some common jars on all cluster nodes *-->* but this
>>> requires depending on the Infra Team's schedule for handling/updating
>>> the common jars
>>> - Making the Uberjar slimmer *-->* tried even with a 200 MB Uberjar (half
>>> size), did not improve much. Only 100 containers could be started in
>>> time, but then receiving:
>>>
>>> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to 
>>> start container.
>>> This token is expired. current time is 1566422713305 found 1566422560552
>>> Note: System times on machines may be out of sync. Check system time and 
>>> time zones.
>>>
>>>
>>> - It would be nice to see FLINK-13184, but the expected version it will
>>> get into is 1.10
>>> - Increase replication factor --> It would be nice to have a Flink conf
>>> for setting the replication factor for only the Flink job jars, but not
>>> the output. It is also challenging to set a replication factor for a yet
>>> non-existing directory; the new files will have the default replication
>>> factor. Will explore the HDFS cache option.
>>>
>>> Maybe another option can be:
>>> - Letting yet-to-be-started TaskManagers (or NodeManagers) download the
>>> jars from already started TaskManagers in a P2P fashion, so as not to be
>>> blocked on HDFS replication.
>>>
>>> A Spark job with the exact same size jar and 800 executors, without any
>>> tuning, can start without any issue on the same cluster in less than a
>>> minute.
>>>
>>> *Further questions:*
>>>
>>> *@SHI Xiaogang:*
>>>
>>> I see that all 800 requests are sent concurrently:
>>>
>>> 2019-08-30 00:28:28.516 [flink-akka.actor.default-

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread Timo Walther

Hi Becket,

Re 1 & 3: "values in configurations should actually be immutable"

I would also prefer immutability, but most of our configuration is
mutable due to serialization/deserialization. Also, maps and lists could
be mutable in theory. It is difficult to really enforce that for nested
structures. I see two options:


a) For the original design: How about we force implementers to add a 
duplicate() method in a Configurable object? This would mean that the 
object is still mutable but by duplicating the object both during 
reading and writing we would avoid the problem you described.


b) For the current design: We still use the factory approach but let a 
Configurable object implement a getFactory() method such that we know 
how to serialize the object. With the help of a factory we can also 
duplicate the object easily during reading and writing and ensure 
immutability.


I would personally go for approach a) to not over-engineer this topic,
but I'm open to option b).
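
To make option a) concrete, here is a small sketch (class and field names
are made up; duplicate() would be the new method on the Configurable
interface):

    // illustrative sketch of option a), not the final interface
    public class CacheConfig implements Configurable {

        private int maxSize = 100;

        public void setMaxSize(int maxSize) {
            this.maxSize = maxSize;
        }

        // Configuration would call this on every read and write, so callers
        // never share a mutable instance with the internal map
        @Override
        public CacheConfig duplicate() {
            CacheConfig copy = new CacheConfig();
            copy.maxSize = this.maxSize;
            return copy;
        }
    }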


Regards,
Timo


On 31.08.19 04:09, Becket Qin wrote:

Hi Timo,

Thanks for the reply. I am still a little concerned over the mutability of
the Configurable which could be the value in Configuration.

Re: 1


But in general, people should not use any internal fields.
Configurable objects are meant for simple little helper POJOs, not
complex arbitrary nested data structures.

This seems difficult to enforce... Ideally the values in configurations
should actually be immutable. The value should only be changeable by
explicitly calling setters on Configuration. Otherwise we may have a weird
situation where the Configurable in the same configuration is different in
two places, because the configurable is modified in one place and not
modified in another place. So I am a little concerned about putting a
Configurable type in the Configuration map, because the value could be
modified without explicitly setting the configuration. For example, can
users do the following?

Configurable configurable =
configuration.getConfigurable(myConfigurableOption);
configurable.setConfigA(123); // this already changes the configurable
object in the configuration.

Re: 2
Thanks for confirming. As long as users will not have a situation where
they need to set two configurations with the same key but different
descriptions, I think it is OK.

Re: 3
This is actually kind of related to 1. i.e. Whether toConfiguration()
guarantees the exact same object can be rebuilt from the configuration or
not. I am still not quite sure about the use case of toConfiguration()
though. It seems indicating the Configurable is mutable, which might be
dangerous.

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 30, 2019 at 10:04 PM Timo Walther  wrote:


Hi Becket,

1. First of all, you are totally right. The FLIP contains a bug due to
the last minute changes that Dawid suggested: by having immutable
objects created by a factory we lose the serializability of the
Configuration because the factory itself is not stored in the
Configuration. I would propose to revert the last change and stick to
the original design, which means that an object must implement the
Configurable interface and also implement serialization/deserialization
methods such that internal fields can also be persisted, as you
suggested. But in general, people should not use any internal fields.
Configurable objects are meant for simple little helper POJOs, not
complex arbitrary nested data structures.

It is a Map<String, Object> because Configuration stores the raw objects.
If you put a Boolean option into it, it remains a Boolean. This makes the
map very efficient for shipping to the cluster and accessing options
multiple times. The same for configurable objects. We put the pure
objects into the map without any serialization/deserialization. The
provided factory allows converting the Object into a Configuration, and
we know how to serialize/deserialize a configuration because it is
just a key/value map.

2. Yes, this is what we had in mind. It should still be the same
configuration option. We would like to avoid specialized option keys
across components (exec.max-para and table.exec.max-para) if they are
describing basically the same thing. But adding some more description
like "TableOptions.MAX_PARALLELISM with description_1 + description_2"
does not hurt.

3. They should restore the original object given that the
toConfiguration/fromConfiguration methods have been implemented
correctly. I will extend the example to make the logic clearer while
fixing the bug.

Thanks for the healthy discussion,
Timo


On 30.08.19 15:29, Becket Qin wrote:

Hi Timo,

Thanks again for the clarification. Please see a few more questions

below.

Re: 1


Please also keep in mind that Configuration must not consist of only
strings; it manages a Map<String, Object> for efficient access. Every
map entry can have a string representation for persistence, but in most
cases consists of unserialized objects.

I'd like to understand this a bit more. The reason we have a Map<String, Object> in Configuration 

Re: [DISCUSS] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-02 Thread Yang Wang
I also agree that all the configuration should be calculated outside of the
TaskManager.

So a full configuration should be generated before the TaskManager starts.

Overriding the calculated configurations through -D now seems better.



Best,

Yang

On Mon, Sep 2, 2019 at 11:39 AM, Xintong Song wrote:

> I just updated the FLIP wiki page [1], with the following changes:
>
>- Network memory uses JVM direct memory, and is accounted for when setting
>JVM max direct memory size parameter.
>- Use dynamic configurations (`-Dkey=value`) to pass calculated memory
>configs into TaskExecutors, instead of ENV variables.
>- Remove 'supporting memory reservation' from the scope of this FLIP.
>
> @till @stephan, please take another look to see if there are any other
> concerns.
>
> Thank you~
>
> Xintong Song
>
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>
> On Mon, Sep 2, 2019 at 11:13 AM Xintong Song 
> wrote:
>
> > Sorry for the late response.
> >
> > - Regarding the `TaskExecutorSpecifics` naming, let's discuss the detail
> > in PR.
> > - Regarding passing parameters into the `TaskExecutor`, +1 for using
> > dynamic configuration at the moment, given that there are more
> > questions to be discussed to have a general framework for overwriting
> > configurations with ENV variables.
> > - Regarding memory reservation, I double checked with Yu and he will take
> > care of it.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Aug 29, 2019 at 7:35 PM Till Rohrmann 
> > wrote:
> >
> >> What I forgot to add is that we could tackle specifying the
> >> configuration fully in an incremental way and that the full
> >> specification should be the desired end state.
> >>
> >> On Thu, Aug 29, 2019 at 1:33 PM Till Rohrmann 
> >> wrote:
> >>
> >> > I think our goal should be that the configuration is fully specified
> >> > when the process is started. By considering the internal calculation
> >> > step to be rather validating existing values and calculating missing
> >> > ones, these two proposals shouldn't even conflict (given determinism).
> >> >
> >> > Since we don't want to change an existing flink-conf.yaml, specifying
> >> > the full configuration would require passing in the options differently.
> >> >
> >> > One way could be the ENV variables approach. The reason why I'm trying
> >> > to exclude this feature from the FLIP is that I believe it needs a bit
> >> > more discussion. Just some questions which come to my mind: What would
> >> > be the exact format (FLINK_KEY_NAME)? Would we support a dot separator
> >> > which is supported by some systems (FLINK.KEY.NAME)? If we accept the
> >> > dot separator, what would be the order of precedence if there are two
> >> > ENV variables defined (FLINK_KEY_NAME and FLINK.KEY.NAME)? What is the
> >> > precedence of env variables vs. dynamic configuration values specified
> >> > via -D?
> >> >
> >> > Another approach could be to pass in the dynamic configuration values
> >> > via `-Dkey=value` to the Flink process. For that we don't have to
> >> > change anything because the functionality already exists.
> >> >
> >> > Cheers,
> >> > Till
> >> >
> >> > On Thu, Aug 29, 2019 at 12:50 PM Stephan Ewen 
> wrote:
> >> >
> >> >> I see. Under the assumption of strict determinism that should work.
> >> >>
> >> >> The original proposal had this point "don't compute inside the TM,
> >> compute
> >> >> outside and supply a full config", because that sounded more
> intuitive.
> >> >>
> >> >> On Thu, Aug 29, 2019 at 12:15 PM Till Rohrmann  >
> >> >> wrote:
> >> >>
> >> >> > My understanding was that before starting the Flink process we call
> >> >> > a utility which calculates these values. I assume that this utility
> >> >> > will do the calculation based on a set of configured values (process
> >> >> > memory, flink memory, network memory etc.). Assuming that these
> >> >> > values don't differ from the values with which the JVM is started,
> >> >> > it should be possible to recompute them in the Flink process in
> >> >> > order to set the values.
> >> >> >
> >> >> >
> >> >> >
> >> >> > On Thu, Aug 29, 2019 at 11:29 AM Stephan Ewen 
> >> wrote:
> >> >> >
> >> >> > > When computing the values in the JVM process after it started,
> >> >> > > how would you deal with values like Max Direct Memory, Metaspace
> >> >> > > size, native memory reservation (reduced heap size), etc.? All the
> >> >> > > values that are parameters to the JVM process and that need to be
> >> >> > > supplied at process startup?
> >> >> > >
> >> >> > > On Wed, Aug 28, 2019 at 4:46 PM Till Rohrmann <
> >> trohrm...@apache.org>
> >> >> > > wrote:
> >> >> > >
> >> >> > > > Thanks for the clarification. I have some more comments:
> >> >> > > >
> >> >> > > > - I would actually split the logic to compute the process
> >> >> > > > memory requirements and storing the values into t

Re: [VOTE] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread vino yang
+1

On Fri, Aug 30, 2019 at 7:34 PM, Dawid Wysakowicz wrote:

> +1 to the design
>
> On 29/08/2019 15:53, Timo Walther wrote:
> > I converted the mentioned Google doc into a wiki page:
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >
> >
> > The core semantics have not changed.
> >
> > Happy voting,
> > Timo
> >
> > On 29.08.19 04:30, Zili Chen wrote:
> >> The design looks good to me.
> >>
> >> +1 go ahead!
> >>
> >> Best,
> >> tison.
> >>
> >>
> >>> On Wed, Aug 28, 2019 at 6:08 PM, Jark Wu wrote:
> >>
> >>> Hi Timo,
> >>>
> >>> The new changes look good to me.
> >>>
> >>> +1 to the FLIP.
> >>>
> >>>
> >>> Cheers,
> >>> Jark
> >>>
> >>> On Wed, 28 Aug 2019 at 16:02, Timo Walther  wrote:
> >>>
>  Hi everyone,
> 
>  after some last minute changes yesterday, I would like to start a new
>  vote on FLIP-54. The discussion seems to have reached an agreement. Of
>  course this doesn't mean that we can't propose further improvements on
>  ConfigOption's and Flink configuration in general in the future. It is
>  just one step towards having a better unified configuration for the
>  project.
> 
>  Please vote for the following design document:
> 
> 
> 
> >>>
> https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit#
> >>>
>  The discussion can be found at:
> 
> 
> 
> >>>
> https://lists.apache.org/thread.html/a56c6b52e5f828d4a737602b031e36b5dd6eaa97557306696a8063a9@%3Cdev.flink.apache.org%3E
> >>>
>  This voting will be open for at least 72 hours. I'll try to close
>  it on
>  2019-09-02 8:00 UTC, unless there is an objection or not enough votes.
> 
>  I will convert it to a Wiki page afterwards.
> 
>  Thanks,
> 
>  Timo
> 
> 
> >
>
>


Re: [DISCUSS] FLIP-60: Restructure the Table API & SQL documentation

2019-09-02 Thread vino yang
Agree with Dawid's suggestion about functions.

Having a Functions section to unify the built-in functions and UDFs would
be better.

On Fri, Aug 30, 2019 at 7:43 PM, Dawid Wysakowicz wrote:

> +1 to the idea of restructuring the docs.
>
> My only suggestion to consider is how about moving the
> User-Defined-Extensions subpages to corresponding broader topics?
>
> Sources & Sinks >> Connect to external systems
>
> Catalogs >> Connect to external systems
>
> and then have a Functions section with subsections:
>
> functions
>
> |- built in functions
>
> |- user defined functions
>
>
> Best,
>
> Dawid
>
> On 30/08/2019 10:59, Timo Walther wrote:
> > Hi everyone,
> >
> > the Table API & SQL documentation was already in a very good shape in
> > Flink 1.8. However, in the past it was mostly presented as an addition
> > to the DataStream API. As the Table and SQL world is growing quickly,
> > stabilizing in its concepts, and is considered another top-level API
> > and closed ecosystem, it is time to restructure the docs a little bit
> > to represent the vision of FLIP-32.
> >
> > Current state:
> > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/
> >
> > We would like to propose the following FLIP-60 for a new structure:
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=127405685
> >
> >
> > Looking forward to feedback.
> >
> > Thanks,
> >
> > Timo
> >
> >
> >
>
>


Build failure on flink-python

2019-09-02 Thread Biao Liu
Hi guys,

I just found I can't pass the Travis build due to some errors in the
flink-python module [1].
I'm sure my PR has nothing related to flink-python. And there are also a
lot of other builds failing on these errors.

I have rebased onto the master branch and tried several times, but it
doesn't work.

Could somebody who is familiar with the flink-python module take a look at
this failure?

1. https://travis-ci.com/flink-ci/flink/jobs/229989235

Thanks,
Biao /'bɪ.aʊ/


Re: Build failure on flink-python

2019-09-02 Thread Biao Liu
There are already some Jira tickets opened for this failure [1] [2].
Sorry I didn't notice them earlier.

1. https://issues.apache.org/jira/browse/FLINK-13906
2. https://issues.apache.org/jira/browse/FLINK-13932

Thanks,
Biao /'bɪ.aʊ/



On Mon, 2 Sep 2019 at 16:24, Biao Liu  wrote:

> Hi guys,
>
> I just found I can't pass the Travis build due to some errors in the
> flink-python module [1].
> I'm sure my PR has nothing related to flink-python. And there are also a
> lot of other builds failing on these errors.
>
> I have rebased onto the master branch and tried several times, but it
> doesn't work.
>
> Could somebody who is familiar with the flink-python module take a look
> at this failure?
>
> 1. https://travis-ci.com/flink-ci/flink/jobs/229989235
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread Dawid Wysakowicz
Hi Timo, Becket

From the options that Timo suggested for improving the mutability
situation, I would prefer option a), as it is the more explicit and
simpler option. Just as a remark, I think Configurable types for options
will in general be very rare and reserved for special use-cases, as the
majority of options are rather simple parameters of primitive types
(plus duration, memory).

@Becket, regarding toConfiguration: this is required for shipping the
Configuration to the TaskManager, so that we do not have to use Java
serializability.

Best,

Dawid


On 02/09/2019 10:05, Timo Walther wrote:
> Hi Becket,
>
> Re 1 & 3: "values in configurations should actually be immutable"
>
> I would also prefer immutability, but most of our configuration is
> mutable due to serialization/deserialization. Also, maps and lists could
> be mutable in theory. It is difficult to really enforce that for
> nested structures. I see two options:
>
> a) For the original design: How about we force implementers to add a
> duplicate() method in a Configurable object? This would mean that the
> object is still mutable but by duplicating the object both during
> reading and writing we would avoid the problem you described.
>
> b) For the current design: We still use the factory approach but let a
> Configurable object implement a getFactory() method such that we know
> how to serialize the object. With the help of a factory we can also
> duplicate the object easily during reading and writing and ensure
> immutability.
>
> I would personally go for approach a) to not over-engineer this topic,
> but I'm open to option b).
>
> Regards,
> Timo
>
>
> On 31.08.19 04:09, Becket Qin wrote:
>> Hi Timo,
>>
>> Thanks for the reply. I am still a little concerned over the
>> mutability of
>> the Configurable which could be the value in Configuration.
>>
>> Re: 1
>>
>>> But in general, people should not use any internal fields.
>>> Configurable objects are meant for simple little helper POJOs, not
>>> complex arbitrary nested data structures.
>> This seems difficult to enforce... Ideally the values in configurations
>> should actually be immutable. The value should only be changeable by
>> explicitly calling setters on Configuration. Otherwise we may have a
>> weird situation where the Configurable in the same configuration is
>> different in two places, because the configurable is modified in one
>> place and not modified in another place. So I am a little concerned
>> about putting a Configurable type in the Configuration map, because the
>> value could be modified without explicitly setting the configuration.
>> For example, can users do the following?
>>
>> Configurable configurable =
>> configuration.getConfigurable(myConfigurableOption);
>> configurable.setConfigA(123); // this already changes the configurable
>> object in the configuration.
>>
>> Re: 2
>> Thanks for confirming. As long as users will not have a situation where
>> they need to set two configurations with the same key but different
>> descriptions, I think it is OK.
>>
>> Re: 3
>> This is actually kind of related to 1. i.e. Whether toConfiguration()
>> guarantees the exact same object can be rebuilt from the
>> configuration or
>> not. I am still not quite sure about the use case of toConfiguration()
>> though. It seems indicating the Configurable is mutable, which might be
>> dangerous.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Aug 30, 2019 at 10:04 PM Timo Walther 
>> wrote:
>>
>>> Hi Becket,
>>>
>>> 1. First of all, you are totally right. The FLIP contains a bug due to
>>> the last minute changes that Dawid suggested: by having immutable
>>> objects created by a factory we lose the serializability of the
>>> Configuration because the factory itself is not stored in the
>>> Configuration. I would propose to revert the last change and stick to
>>> the original design, which means that an object must implement the
>>> Configurable interface and also implement
>>> serialization/deserialization
>>> methods such that internal fields can also be persisted, as you
>>> suggested. But in general, people should not use any internal fields.
>>> Configurable objects are meant for simple little helper POJOs, not
>>> complex arbitrary nested data structures.
>>>
>>> It is a Map<String, Object> because Configuration stores the raw
>>> objects. If you put a Boolean option into it, it remains a Boolean.
>>> This makes the map very efficient for shipping to the cluster and
>>> accessing options multiple times. The same for configurable objects. We
>>> put the pure objects into the map without any
>>> serialization/deserialization. The provided factory allows converting
>>> the Object into a Configuration, and we know how to
>>> serialize/deserialize a configuration because it is just a key/value
>>> map.
>>>
>>> 2. Yes, this is what we had in mind. It should still be the same
>>> configuration option. We would like to avoid specialized option keys
>>> across components (exec.max-para and table.exec.max-para) 

Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-02 Thread Aljoscha Krettek
Hi,

I actually don’t know whether that change would be ok. FlinkUserCodeClassLoader 
has taken FlinkUserCodeClassLoader.class.getClassLoader() as the parent 
ClassLoader before my change. See: 
https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java

I have the feeling that this might be on purpose because we want to have the 
ClassLoader of the Flink Framework components be the parent ClassLoader, but I 
could be wrong. Maybe Stephan would be most appropriate for answering this.

Best,
Aljoscha

> On 30. Aug 2019, at 16:28, Till Rohrmann  wrote:
> 
> Hi Jan,
> 
> this looks to me like a bug for which you could create a JIRA and PR to fix 
> it. Just to make sure, I've pulled in Aljoscha who is the author of this 
> change to check with him whether we are forgetting something.
> 
> Cheers,
> Till
> 
> On Fri, Aug 30, 2019 at 3:44 PM Jan Lukavský wrote:
> Hi,
> 
> I have come across an issue with classloading in Flink's MiniCluster. 
> The issue is that when I run a local flink job from a thread that has a
> non-default context classloader (for whatever reason), this classloader
> is not taken into account when classloading user defined functions. This 
> is due to [1]. Is this behavior intentional, or can I file a JIRA and 
> use Thread.currentThread.getContextClassLoader() there? I have validated 
> that it fixes issues I'm facing.
> 
> Jan
> 
> [1] 
> https://github.com/apache/flink/blob/ce557839d762b5f1ec92aa1885fd3d2ae33d0d0b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L280
>  
> 
> 



Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-02 Thread Aljoscha Krettek
I cut a PR for FLINK-13586: https://github.com/apache/flink/pull/9595 


> On 2. Sep 2019, at 05:03, Yu Li  wrote:
> 
> +1 for a 1.8.2 release, thanks for bringing this up Jincheng!
> 
> Best Regards,
> Yu
> 
> 
> On Mon, 2 Sep 2019 at 09:19, Thomas Weise  wrote:
> 
>> +1 for the 1.8.2 release
>> 
>> I marked https://issues.apache.org/jira/browse/FLINK-13586 for this
>> release. It would be good to compensate for the backward incompatible
>> change to ClosureCleaner that was introduced in 1.8.1, which affects
>> downstream dependencies.
>> 
>> Thanks,
>> Thomas
>> 
>> 
>> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun 
>> wrote:
>> 
>>> Hi Jark,
>>> 
>>> Glad to hear that you want to be the Release Manager of flink 1.8.2.
>>> I believe that you will be a great RM, and I am very willing to help you
>>> in the final stages of the release. :)
>>>
>>> The release of Apache Flink involves a number of tasks. For details, you
>>> can consult the documentation [1]. If you have any questions, please let
>>> me know and let us work together.
>>> 
>>> 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1
>>> 
>>> Cheers,
>>> Jincheng
>>> 
>>> On Sat, Aug 31, 2019 at 12:59 AM, Till Rohrmann wrote:
>>> 
 +1 for a 1.8.2 bug fix release. Thanks for kicking this discussion off
 Jincheng.
 
 Cheers,
 Till
 
 On Fri, Aug 30, 2019 at 6:45 PM Jark Wu  wrote:
 
> Thanks Jincheng for bringing this up.
> 
> +1 to the 1.8.2 release, because it already contains a couple of
> important fixes and it has been a long time since 1.8.1 came out.
> I'm willing to help the community as much as possible. I'm wondering if I
> can be the release manager of 1.8.2 or work with you together @Jincheng?
> 
> Best,
> Jark
> 
> On Fri, 30 Aug 2019 at 18:58, Hequn Cheng 
>>> wrote:
> 
>> Hi Jincheng,
>> 
>> +1 for a 1.8.2 release.
>> Thanks a lot for raising the discussion. It would be nice to have
>>> these
>> critical fixes.
>> 
>> Best, Hequn
>> 
>> 
>>> On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels wrote:
>> 
>>> Hi Jincheng,
>>> 
>>> +1 I would be for a 1.8.2 release such that we can fix the problems
>>> with the nested closure cleaner which currently block 1.8.1 users with
>>> Beam:
>>> https://issues.apache.org/jira/browse/FLINK-13367
>>> 
>>> Thanks,
>>> Max
>>> 
>>> On 30.08.19 11:25, jincheng sun wrote:
Hi Flink devs,

It has been nearly 2 months since 1.8.1 was released. So, what do you
think about releasing Flink 1.8.2 soon?

We already have some blocker and critical fixes in the release-1.8
branch:

[Blocker]
- FLINK-13159 java.lang.ClassNotFoundException when restore job
- FLINK-10368 'Kerberized YARN on Docker test' unstable
- FLINK-12578 Use secure URLs for Maven repositories

[Critical]
- FLINK-12736 ResourceManager may release TM with allocated slots
- FLINK-12889 Job keeps in FAILING state
- FLINK-13484 ConnectedComponents end-to-end test instable with
NoResourceAvailableException
- FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt to sleep
with negative time
- FLINK-13806 Metric Fetcher floods the JM log with errors when TM is lost

Furthermore, I think the following blocker issue should be merged
before the 1.8.2 release:

- FLINK-13897: OSS FS NOTICE file is placed in wrong directory

It would also be great if we can have the fix for the Elasticsearch 6.x
connector threads leaking (FLINK-13689) in the 1.8.2 release, which is
identified as major.

Please let me know what you think?

Cheers,
Jincheng
 
>>> 
>> 
> 
 
>>> 
>> 



Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-02 Thread Timo Walther

Hi all,

the FLIP looks awesome. However, I would like to discuss the changes to 
the user-facing parts again. Some feedback:


1. DataViews: With the current non-annotation design for DataViews, we 
cannot perform eager state declaration, right? At which point during 
execution do we know which state is required by the function? We need to 
instantiate the function first, right?


2. Serializability of functions: How do we ensure serializability of 
functions for catalog persistence? In the Scala/Java API, we would like 
to register classes instead of instances soon. This is the only way to 
store a function properly in a catalog or we need some 
serialization/deserialization logic in the function interfaces to 
convert an instance to string properties.


3. TableEnvironment: What is the signature of `register_function(self, 
name, function)`? Does it accept both a class and function? Like `class 
Sum` and `def split()`? Could you add some examples for registering both 
kinds of functions?


4. FunctionDefinition: Function definition is not a user-defined 
function definition. It is the highest interface for both user-defined 
and built-in functions. I'm not sure if getLanguage() should be part of 
this interface or one-level down which would be `UserDefinedFunction`. 
Built-in functions will never be implemented in a different language. In 
any case, I would vote for removing the UNKNOWN language, because it 
does not solve anything. Why should a user declare a function that the 
runtime can not handle? I also find the term `JAVA` confusing for Scala 
users. How about `FunctionLanguage.JVM` instead?
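
Something along these lines, just to make the suggestion concrete (a
sketch, not an agreed design):

    public enum FunctionLanguage {
        JVM,    // covers both Java and Scala implementations
        PYTHON
        // no UNKNOWN: the runtime could not handle such a function anyway
    }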


5. Function characteristics: In the current design, function classes do 
not extend from any upper class. How can users declare characteristics 
that are present in `FunctionDefinition` like determinism, requirements, 
or soon also monotonism.


Thanks,
Timo


On 02.09.19 03:38, Shaoxuan Wang wrote:

Hi Jincheng, Fudian, and Aljoscha,
I am assuming the proposed python UDX can also be applied to Flink SQL.
Is this correct? If yes, I would suggest titling the FLIP "Flink Python
User-Defined Function" or "Flink Python User-Defined Function for Table".

Regards,
Shaoxuan


On Wed, Aug 28, 2019 at 12:22 PM jincheng sun 
wrote:


Thanks for the feedback Bowen!

Great thanks for create the FLIP and bring up the VOTE Dian!

Best, Jincheng

On Wed, Aug 28, 2019 at 11:32 AM, Dian Fu wrote:


Hi all,

I have started a voting thread [1]. Thanks a lot for your help during
creating the FLIP @Jincheng.


Hi Bowen,

Very much appreciated for your comments. I have replied to you in the
design doc. As it seems that the comments don't affect the overall design,
I'll not cancel the vote for now and we can continue the discussion in the
design doc.

[1]


http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html

Regards,
Dian


On Aug 28, 2019, at 11:05 AM, Bowen Li wrote:

Hi Jincheng and Dian,

Sorry for being late to the party. I took a glance at the proposal, LGTM
in general, and I left only a couple of comments.

Thanks,
Bowen


On Mon, Aug 26, 2019 at 8:05 PM Dian Fu  wrote:


Hi Jincheng,

Thanks! It works.

Thanks,
Dian


On Aug 27, 2019, at 10:55 AM, jincheng sun wrote:

Hi Dian, can you check if you have edit access? :)


On Mon, Aug 26, 2019 at 10:52 AM, Dian Fu wrote:


Hi Jincheng,

Appreciated for the kind tips and offer of help. Definitely need it!

Could you grant me write permission for confluence? My Id: Dian Fu

Thanks,
Dian


On Aug 26, 2019, at 9:53 AM, jincheng sun wrote:

Thanks for your feedback Hequn & Dian.

Dian, I am glad to see that you want to help create the FLIP!
Everyone has a first time, and I am very willing to help you complete
your first FLIP creation. Here are some tips:

- First, I'll give your account write permission for confluence.
- Before creating the FLIP, please have a look at the FLIP Template [1].
(It's better to learn more about FLIPs by reading [2].)
- Create the Flink Python UDF related JIRAs after completing the VOTE of
the FLIP. (I think you can also bring up the VOTE thread, if you want!)

If you encounter any problems during this period, feel free to tell me so
that we can solve them together. :)

Best,
Jincheng




[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template

[2]


https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals


On Fri, Aug 23, 2019 at 11:54 AM, Hequn Cheng wrote:


+1 for starting the vote.

Thanks Jincheng a lot for the discussion.

Best, Hequn

On Fri, Aug 23, 2019 at 10:06 AM Dian Fu 

wrote:

Hi Jincheng,

+1 to starting the FLIP creation and VOTE on this feature. I'm willing to
help on the FLIP creation if you don't mind. As I haven't created a FLIP
before, it would be great if you could help on this. :)

Regards,
Dian


On Aug 22, 2019, at 11:41 PM, jincheng sun wrote:

Hi all,

Thanks a 

Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-02 Thread Jan Lukavský
Essentially, the class loader of Flink should be present in the parent
hierarchy of the context class loader. If FlinkUserCodeClassLoader doesn't
use the context class loader, then it is actually impossible to use a
hierarchy like this:

 system class loader -> application class loader -> user-defined class
loader (defines some UDFs to be used in the flink program)

Flink now uses the application class loader, and so the UDFs fail to
deserialize on local flink, because the user-defined class loader is
bypassed. Moreover, there is no way to add additional classpath elements
for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
this by calling the addURL method on the application class loader (which
is terribly hackish), but that works only on JDK <= 8. No sensible
workaround is available for JDK >= 9.
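
Concretely, the change I have in mind is just the parent passed to the
user-code class loader (a sketch of the idea, not a final patch;
libraryUrls stands for the blob jar URLs):

    // current behaviour (sketch): the parent is the loader that loaded Flink
    ClassLoader current = FlinkUserCodeClassLoader.class.getClassLoader();

    // proposed: use the context class loader, so a custom loader set by the
    // calling thread stays in the parent hierarchy of the user code loader
    ClassLoader proposed = Thread.currentThread().getContextClassLoader();
    URLClassLoader userCodeClassLoader = new URLClassLoader(libraryUrls, proposed);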


Alternative solution would be to enable adding jars to class loader when 
using LocalEnvironment, but that looks a little odd.


Jan

On 9/2/19 11:02 AM, Aljoscha Krettek wrote:

Hi,

I actually don’t know whether that change would be ok. FlinkUserCodeClassLoader has 
taken FlinkUserCodeClassLoader.class.getClassLoader() as the parent ClassLoader 
before my change. See: 
https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java

I have the feeling that this might be on purpose because we want to have the 
ClassLoader of the Flink Framework components be the parent ClassLoader, but I 
could be wrong. Maybe Stephan would be most appropriate for answering this.

Best,
Aljoscha


On 30. Aug 2019, at 16:28, Till Rohrmann  wrote:

Hi Jan,

this looks to me like a bug for which you could create a JIRA and PR to fix it. 
Just to make sure, I've pulled in Aljoscha who is the author of this change to 
check with him whether we are forgetting something.

Cheers,
Till

On Fri, Aug 30, 2019 at 3:44 PM Jan Lukavský wrote:
Hi,

I have come across an issue with classloading in Flink's MiniCluster.
The issue is that when I run a local flink job from a thread that has a
non-default context classloader (for whatever reason), this classloader
is not taken into account when classloading user defined functions. This
is due to [1]. Is this behavior intentional, or can I file a JIRA and
use Thread.currentThread.getContextClassLoader() there? I have validated
that it fixes issues I'm facing.

Jan

[1]
https://github.com/apache/flink/blob/ce557839d762b5f1ec92aa1885fd3d2ae33d0d0b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L280
 






Re: Build failure on flink-python

2019-09-02 Thread Hequn Cheng
Hi Biao,

Thanks a lot for reporting the problem. The fix has been merged into the
master just now. You can rebase onto the master and try again.

Thanks to @Wei Zhong for the fix.

Best, Hequn

On Mon, Sep 2, 2019 at 4:41 PM Biao Liu  wrote:

> There are already some Jira tickets opened for this failure [1] [2].
> Sorry I didn't notice them earlier.
>
> 1. https://issues.apache.org/jira/browse/FLINK-13906
> 2. https://issues.apache.org/jira/browse/FLINK-13932
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Mon, 2 Sep 2019 at 16:24, Biao Liu  wrote:
>
> > Hi guys,
> >
> > I just found I can't pass the Travis build due to some errors in the
> > flink-python module [1].
> > I'm sure my PR has nothing related to flink-python. And there are also
> > a lot of other builds failing on these errors.
> >
> > I have rebased onto the master branch and tried several times, but it
> > doesn't work.
> >
> > Could somebody who is familiar with the flink-python module take a look
> > at this failure?
> >
> > 1. https://travis-ci.com/flink-ci/flink/jobs/229989235
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
>


Re: [DISCUSS] FLIP-60: Restructure the Table API & SQL documentation

2019-09-02 Thread Kurt Young
+1 to the general idea and thanks for driving this. I think the new
structure is clearer than the old one, and I have some suggestions:

1. How about adding an "Architecture & Internals" chapter? This can help
developers or users who want to contribute more to have a better
understanding of Table. Especially with the blink planner, we merged a lot
of code and features but lack proper user and design documents.

2. Add a dedicated "Hive Integration" chapter. We spent a lot of effort on
integrating Hive, and Hive integration happens in different areas, like
catalogs, functions, and maybe DDL in the future. I think a dedicated
chapter can make it easier for users who are interested in this topic to
find the information they need.

3. Add a chapter about how to manage, monitor, or tune Table & SQL jobs,
and maybe, in the future, add something like how to migrate jobs from old
versions to new versions.

Best,
Kurt


On Mon, Sep 2, 2019 at 4:17 PM vino yang  wrote:

> Agree with Dawid's suggestion about functions.
>
> Having a Functions section to unify the built-in functions and UDFs would
> be better.
>
> On Fri, Aug 30, 2019 at 7:43 PM, Dawid Wysakowicz wrote:
>
> > +1 to the idea of restructuring the docs.
> >
> > My only suggestion to consider is how about moving the
> > User-Defined-Extensions subpages to corresponding broader topics?
> >
> > Sources & Sinks >> Connect to external systems
> >
> > Catalogs >> Connect to external systems
> >
> > and then have a Functions section with subsections:
> >
> > functions
> >
> > |- built in functions
> >
> > |- user defined functions
> >
> >
> > Best,
> >
> > Dawid
> >
> > On 30/08/2019 10:59, Timo Walther wrote:
> > > Hi everyone,
> > >
> > > the Table API & SQL documentation was already in a very good shape in
> > > Flink 1.8. However, in the past it was mostly presented as an addition
> > > to the DataStream API. As the Table and SQL world is growing quickly,
> > > stabilizing in its concepts, and is considered another top-level API
> > > and closed ecosystem, it is time to restructure the docs a little bit
> > > to represent the vision of FLIP-32.
> > >
> > > Current state:
> > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/
> > >
> > > We would like to propose the following FLIP-60 for a new structure:
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=127405685
> > >
> > >
> > > Looking forward to feedback.
> > >
> > > Thanks,
> > >
> > > Timo
> > >
> > >
> > >
> >
> >
>


Re: [DISCUSS] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-02 Thread Andrey Zagrebin
 Hi All,

@Xitong thanks a lot for driving the discussion.

I also reviewed the FLIP and it looks quite good to me.
Here are some comments:


   - One thing I wanted to discuss is the backwards-compatibility with the
   previous user setups. We could list which options we plan to deprecate.
   At first glance, it looks possible to provide the same/similar behaviour
   for setups relying on the deprecated options. E.g. setting
   taskmanager.memory.preallocate to true could override the new
   taskmanager.memory.managed.offheap-fraction to 1, etc. At the moment the
   FLIP just states that in some cases it may require re-configuring the
   cluster if migrating from prior versions. My suggestion is that we try to
   keep it backwards-compatible unless there is a good reason, like some
   major complication for the implementation.


Also couple of smaller things:

   - I suggest we remove TaskExecutorSpecifics from the FLIP and leave some
   general wording atm, like 'data structure to store' or 'utility classes'.
   When the classes are implemented, we put the concrete class names. This way
   we can avoid confusion and stale documents.


   - As I understand, if a user task uses native memory (not direct memory,
   but e.g. unsafe.allocate or memory from an external lib), there will be no
   explicit guard against exceeding 'task off heap memory'. The user should
   then still explicitly make sure that her/his direct buffer allocations
   plus any other memory usage do not exceed the value announced as 'task off
   heap'. I guess there is not much that can be done about it except
   mentioning it in the docs, similar to controlling the heap state backend.
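
For example (an illustrative snippet; imports and exception handling
omitted, and Unsafe is obtained reflectively only for the demonstration):

    // counted against -XX:MaxDirectMemorySize, so the JVM enforces the limit
    ByteBuffer direct = ByteBuffer.allocateDirect(64 * 1024 * 1024);

    // NOT counted: this bypasses the JVM guard entirely, so the user has to
    // budget it into 'task off heap memory' herself/himself
    Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    sun.misc.Unsafe unsafe = (sun.misc.Unsafe) f.get(null);
    long address = unsafe.allocateMemory(64 * 1024 * 1024);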


Thanks,
Andrey

On Mon, Sep 2, 2019 at 10:07 AM Yang Wang  wrote:

> I also agree that all the configuration should be calculated outside of
> the TaskManager.
>
> So a full configuration should be generated before the TaskManager starts.
>
> Overriding the calculated configurations through -D now seems better.
>
>
>
> Best,
>
> Yang
>
> On Mon, Sep 2, 2019 at 11:39 AM, Xintong Song wrote:
>
> > I just updated the FLIP wiki page [1], with the following changes:
> >
> >- Network memory uses JVM direct memory, and is accounted for when setting
> >JVM max direct memory size parameter.
> >- Use dynamic configurations (`-Dkey=value`) to pass calculated memory
> >configs into TaskExecutors, instead of ENV variables.
> >- Remove 'supporting memory reservation' from the scope of this FLIP.
> >
> > @till @stephan, please take another look to see if there are any other
> > concerns.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> > [1]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> >
> > On Mon, Sep 2, 2019 at 11:13 AM Xintong Song 
> > wrote:
> >
> > > Sorry for the late response.
> > >
> > > - Regarding the `TaskExecutorSpecifics` naming, let's discuss the
> detail
> > > in PR.
> > > - Regarding passing parameters into the `TaskExecutor`, +1 for using
> > > dynamic configuration at the moment, given that there are more
> > > questions to be discussed to have a general framework for overwriting
> > > configurations with ENV variables.
> > > - Regarding memory reservation, I double checked with Yu and he will
> > > take care of it.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Thu, Aug 29, 2019 at 7:35 PM Till Rohrmann 
> > > wrote:
> > >
> > >> What I forgot to add is that we could tackle specifying the
> > >> configuration fully in an incremental way and that the full
> > >> specification should be the desired end state.
> > >>
> > >> On Thu, Aug 29, 2019 at 1:33 PM Till Rohrmann 
> > >> wrote:
> > >>
> > >> > I think our goal should be that the configuration is fully
> > >> > specified when the process is started. By considering the internal
> > >> > calculation step to be rather validating existing values and
> > >> > calculating missing ones, these two proposals shouldn't even
> > >> > conflict (given determinism).
> > >> >
> > >> > Since we don't want to change an existing flink-conf.yaml,
> > >> > specifying the full configuration would require passing in the
> > >> > options differently.
> > >> >
> > >> > One way could be the ENV variables approach. The reason why I'm
> > >> > trying to exclude this feature from the FLIP is that I believe it
> > >> > needs a bit more discussion. Just some questions which come to my
> > >> > mind: What would be the exact format (FLINK_KEY_NAME)? Would we
> > >> > support a dot separator which is supported by some systems
> > >> > (FLINK.KEY.NAME)? If we accept the dot separator, what would be the
> > >> > order of precedence if there are two ENV variables defined
> > >> > (FLINK_KEY_NAME and FLINK.KEY.NAME)? What is the precedence of env
> > >> > variables vs. dynamic configuration values specified via -D?
> > >> >
> > >> > Another approach could be to pass in the dynamic configuration
> > >> > values via

Re: ClassLoader created by BlobLibraryCacheManager is not using context classloader

2019-09-02 Thread Aljoscha Krettek
I’m not saying we can’t change that code to use the context class loader. I’m 
just not sure whether this might break other things.

Best,
Aljoscha

> On 2. Sep 2019, at 11:24, Jan Lukavský  wrote:
> 
> Essentially, the class loader of Flink should be present in the parent
> hierarchy of the context class loader. If FlinkUserCodeClassLoader doesn't
> use the context class loader, then it is actually impossible to use a
> hierarchy like this:
> 
>  system class loader -> application class loader -> user-defined class
> loader (defines some UDFs to be used in the flink program)
> 
> Flink now uses the application class loader, and so the UDFs fail to
> deserialize on local flink, because the user-defined class loader is
> bypassed. Moreover, there is no way to add additional classpath elements
> for LocalEnvironment (as opposed to RemoteEnvironment). I'm able to hack
> this by calling the addURL method on the application class loader (which
> is terribly hackish), but that works only on JDK <= 8. No sensible
> workaround is available for JDK >= 9.
> 
> An alternative solution would be to enable adding jars to the class loader
> when using LocalEnvironment, but that looks a little odd.
> 
> Jan
> 
> On 9/2/19 11:02 AM, Aljoscha Krettek wrote:
>> Hi,
>> 
>> I actually don’t know whether that change would be ok. 
>> FlinkUserCodeClassLoader has taken 
>> FlinkUserCodeClassLoader.class.getClassLoader() as the parent ClassLoader 
>> before my change. See: 
>> https://github.com/apache/flink/blob/release-1.2/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader.java
>> 
>> I have the feeling that this might be on purpose because we want to have the 
>> ClassLoader of the Flink Framework components be the parent ClassLoader, but 
>> I could be wrong. Maybe Stephan would be most appropriate for answering this.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 30. Aug 2019, at 16:28, Till Rohrmann  wrote:
>>> 
>>> Hi Jan,
>>> 
>>> this looks to me like a bug for which you could create a JIRA and PR to fix 
>>> it. Just to make sure, I've pulled in Aljoscha who is the author of this 
>>> change to check with him whether we are forgetting something.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> On Fri, Aug 30, 2019 at 3:44 PM Jan Lukavský wrote:
>>> Hi,
>>> 
>>> I have come across an issue with classloading in Flink's MiniCluster.
>>> The issue is that when I run a local flink job from a thread that has a
>>> non-default context classloader (for whatever reason), this classloader
>>> is not taken into account when classloading user defined functions. This
>>> is due to [1]. Is this behavior intentional, or can I file a JIRA and
>>> use Thread.currentThread.getContextClassLoader() there? I have validated
>>> that it fixes issues I'm facing.
>>> 
>>> Jan
>>> 
>>> [1]
>>> https://github.com/apache/flink/blob/ce557839d762b5f1ec92aa1885fd3d2ae33d0d0b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheManager.java#L280
>>>  
>>> 
>>> 
>> 



Re: Build failure on flink-python

2019-09-02 Thread Biao Liu
Hi Hequn,

Glad to hear that! Thanks a lot.

Thanks,
Biao /'bɪ.aʊ/



On Mon, 2 Sep 2019 at 17:28, Hequn Cheng  wrote:

> Hi Biao,
>
> Thanks a lot for reporting the problem. The fix has been merged into the
> master just now. You can rebase onto the master and try again.
>
> Thanks to @Wei Zhong for the fix.
>
> Best, Hequn
>
> On Mon, Sep 2, 2019 at 4:41 PM Biao Liu  wrote:
>
> > There are already some Jira tickets opened for this failure [1] [2].
> > Sorry I didn't notice them earlier.
> >
> > 1. https://issues.apache.org/jira/browse/FLINK-13906
> > 2. https://issues.apache.org/jira/browse/FLINK-13932
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
> >
> > On Mon, 2 Sep 2019 at 16:24, Biao Liu  wrote:
> >
> > > Hi guys,
> > >
> > > I just found I can't pass the Travis build due to some errors in the
> > > flink-python module [1].
> > > I'm sure my PR has nothing related to flink-python. And there are
> > > also a lot of other builds failing on these errors.
> > >
> > > I have rebased onto the master branch and tried several times, but it
> > > doesn't work.
> > >
> > > Could somebody who is familiar with the flink-python module take a
> > > look at this failure?
> > >
> > > 1. https://travis-ci.com/flink-ci/flink/jobs/229989235
> > >
> > > Thanks,
> > > Biao /'bɪ.aʊ/
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-09-02 Thread Timo Walther

Hi all,

I see a majority voting for `lit(12)`, so let's adopt that in the FLIP.
The `$("field")` approach would address Fabian's concerns, so I would vote
for keeping it like that.


One more question for native English speakers, is it acceptable to have 
`isEqual` instead of `isEqualTo` and `isGreater` instead of `isGreaterThan`?


If there are no more concerns, I will start a voting thread soon.

Thanks,
Timo
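
For reference, a query written with the spellings adopted above would read roughly as follows (a sketch; `orders` is an assumed Table, and `$` and `lit` are the statically imported factory methods proposed in the FLIP):

// Sketch of the proposed Java expression DSL; method names are not final.
Table result = orders
    .select($("name"), $("amount").plus(lit(12)))
    .where($("amount").isGreater(lit(100)));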


On 29.08.19 12:24, Fabian Hueske wrote:

Hi,

IMO, we should define what we would like to optimize for:
1) easy-to-get-started experience or
2) productivity and ease-of-use

While 1) is certainly important, I think we should put more emphasis on
goal 2).
That's why I favor as short as possible names for commonly used methods
like column references and literals/values.
These are used *many* times in *every* query.
Every user who uses the API for more than 30 mins will know what $() or v()
(or whatever method names we come up with) are used for and everybody who
doesn't know can have a look at the JavaDocs or regular documentation.
Shorter method names are not only about increasing the speed to write a
query, but also reducing clutter that needs to be parsed to understand an
expression / query.

I'm OK with descriptive names for other expressions like call(),
isEqualTo() (although these could be the commonly used eq(), gte(), etc.),
and so on but column references (and literals) should be as lightweight as
possible, IMO.

Cheers,
Fabian

Am Do., 29. Aug. 2019 um 12:15 Uhr schrieb Timo Walther 
:
I'm fine with `lit()`. Regarding `col()`, I initially suggested `ref()`
but I think Fabian and Dawid liked single char methods for the most
commonly used expressions.

Btw, what is your opinion on the names of commonly used methods such as
`isEqual`, `isGreaterOrEqual`? Are we fine with the current naming?
In theory we could make them shorter, like `equals(), greaterOrEqual()`,
or even shorter, like `eq`, `gt`, `gte`?

Thanks,
Timo


On 29.08.19 11:51, Aljoscha Krettek wrote:

Overall, this is a very nice development that should also simplify the

code base once we deprecate the expression parser!

Regarding method names, I agree with Seth that values/literals should

use something like “lit()”. I also think that for column references we
could use “col()” to make it clear that it is a column reference. What do
you think?

Aljoscha


On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:

I would prefer ‘lit()’ over  ‘val()’ since val is a keyword in Scala.

Assuming the intention is to make the dsl ergonomic for Scala developers.

Seth


On Aug 28, 2019, at 7:58 AM, Timo Walther  wrote:

Hi David,

thanks for your feedback. I was also skeptical about 1 char method

names, I restored the `val()` method for now. If you read literature such
as Wikipedia [1]: "literal is a notation for representing a fixed value in
source code. Almost all programming languages have notations for atomic
values". So they are also talking about "values".

Alternatively, we could use `lit(12)` or `l(12)` but I'm not convinced

that this is better.

Regards,
Timo

[1] https://en.wikipedia.org/wiki/Literal_(computer_programming)


On 27.08.19 22:10, David Anderson wrote:
TImo,

While it's not exactly pretty, I don't mind the $("field") construct.
It's not particularly surprising. The v() method troubles me more; it
looks mysterious. I think we would do better to have something more
explicit. val() isn't much better -- val("foo") could be interpreted
to mean the value of the "foo" column, or a literal string.

David


On Tue, Aug 27, 2019 at 5:45 PM Timo Walther 

wrote:

Hi David,

thanks for your feedback. With the current design, the DSL would be

free

of any ambiguity but it is definitely more verbose esp. around

defining

values.

I would be happy about further suggestions that make the DSL more
readable. I'm also not sure if we go for `$()` and `v()` instead of

more

readable `ref()` and `val()`. This could maybe make it look less
"alien", what do you think?

Some people mentioned to overload certain methods for accepting

values

or column names. E.g. `$("field").isEqual("str")` but then string

values

could be confused with column names.

Thanks,
Timo


On 27.08.19 17:34, David Anderson wrote:
In general I'm in favor of anything that is going to make the Table
API easier to learn and more predictable in its behavior. This
proposal kind of falls in the middle. As someone who has spent hours
in the crevices between the various flavors of the current
implementations, I certainly view keeping the various APIs and DSLs
more in sync, and making them less buggy, as highly desirable.

On the other hand, some of the details in the proposal do make the
resulting user code less pretty and less approachable than the

current

Java DSL. In a training context it will be easy to teach, but I

wonder

if we can find a way to make it look less alien at first glance.

David


On Wed, Aug 21, 2019 at 1:33 PM Timo Walther 

wrote:

Hi everyone,

some of you might 

Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-02 Thread jincheng sun
Thanks for all of your feedback!

Hi Jark, glad to see that you are taking on what an RM should do.

Just one tip: before RC1, all blockers should be fixed, while the others are
nice to have. So you can decide when to prepare RC1 after the blockers are
resolved.

Feel free to tell me if you have any questions.

Best,
Jincheng

Aljoscha Krettek wrote on Mon, Sep 2, 2019 at 5:03 PM:

> I cut a PR for FLINK-13586: https://github.com/apache/flink/pull/9595 <
> https://github.com/apache/flink/pull/9595>
>
> > On 2. Sep 2019, at 05:03, Yu Li  wrote:
> >
> > +1 for a 1.8.2 release, thanks for bringing this up Jincheng!
> >
> > Best Regards,
> > Yu
> >
> >
> > On Mon, 2 Sep 2019 at 09:19, Thomas Weise  wrote:
> >
> >> +1 for the 1.8.2 release
> >>
> >> I marked https://issues.apache.org/jira/browse/FLINK-13586 for this
> >> release. It would be good to compensate for the backward incompatible
> >> change to ClosureCleaner that was introduced in 1.8.1, which affects
> >> downstream dependencies.
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun 
> >> wrote:
> >>
> >>> Hi Jark,
> >>>
> >>> Glad to hear that you want to be the Release Manager of flink 1.8.2.
> >>> I believe that you will be a great RM, and I am very willing to help
> you
> >>> with the final release in the final stages. :)
> >>>
> >>> The release of Apache Flink involves a number of tasks. For details,
> you
> >>> can consult the documentation [1]. If you have any questions, please
> let
> >> me
> >>> know and let us work together.
> >>>
> >>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1
> >>>
> >>> Cheers,
> >>> Jincheng
> >>>
> >>> Till Rohrmann wrote on Sat, Aug 31, 2019 at 12:59 AM:
> >>>
>  +1 for a 1.8.2 bug fix release. Thanks for kicking this discussion off
>  Jincheng.
> 
>  Cheers,
>  Till
> 
>  On Fri, Aug 30, 2019 at 6:45 PM Jark Wu  wrote:
> 
> > Thanks Jincheng for bringing this up.
> >
> > +1 to the 1.8.2 release, because it already contains a couple of
>  important
> > fixes and it has been a long time since 1.8.1 came out.
> > I'm willing to help the community as much as possible. I'm wondering
> >>> if I
> > can be the release manager of 1.8.2 or work with you together
> >>> @Jincheng?
> >
> > Best,
> > Jark
> >
> > On Fri, 30 Aug 2019 at 18:58, Hequn Cheng 
> >>> wrote:
> >
> >> Hi Jincheng,
> >>
> >> +1 for a 1.8.2 release.
> >> Thanks a lot for raising the discussion. It would be nice to have
> >>> these
> >> critical fixes.
> >>
> >> Best, Hequn
> >>
> >>
> >> On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels
> > wrote:
> >>
> >>> Hi Jincheng,
> >>>
> >>> +1 I would be for a 1.8.2 release such that we can fix the
> >> problems
> > with
> >>> the nested closure cleaner which currently block 1.8.1 users with
>  Beam:
> >>> https://issues.apache.org/jira/browse/FLINK-13367
> >>>
> >>> Thanks,
> >>> Max
> >>>
> >>> On 30.08.19 11:25, jincheng sun wrote:
>  Hi Flink devs,
> 
>  It has been nearly 2 months since the 1.8.1 released. So, what
> >> do
>  you
> >>> think
>  about releasing Flink 1.8.2 soon?
> 
>  We already have some blocker and critical fixes in the
> >>> release-1.8
> >>> branch:
> 
>  [Blocker]
>  - FLINK-13159 java.lang.ClassNotFoundException when restore job
>  - FLINK-10368 'Kerberized YARN on Docker test' unstable
>  - FLINK-12578 Use secure URLs for Maven repositories
> 
>  [Critical]
>  - FLINK-12736 ResourceManager may release TM with allocated
> >> slots
>  - FLINK-12889 Job keeps in FAILING state
>  - FLINK-13484 ConnectedComponents end-to-end test instable with
>  NoResourceAvailableException
>  - FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt
> >> to
> > sleep
>  with negative time
>  - FLINK-13806 Metric Fetcher floods the JM log with errors when
> >>> TM
>  is
> >>> lost
> 
>  Furthermore, I think the following one blocker issue should be
>  merged
>  before 1.8.2 release.
> 
>  - FLINK-13897: OSS FS NOTICE file is placed in wrong directory
> 
>  It would also be great if we can have the fix of
> >> Elasticsearch6.x
> >>> connector
>  threads leaking (FLINK-13689) in 1.8.2 release which is
> >>> identified
>  as
> >>> major.
> 
>  Please let me know what you think?
> 
>  Cheers,
>  Jincheng
> 
> >>>
> >>
> >
> 
> >>>
> >>
>
>


Re: Potential block size issue with S3 binary files

2019-09-02 Thread Arvid Heise
Hi Ken,

that's indeed a very odd issue that you found. I had a hard time connecting
block size with S3 in the beginning and had to dig into the code. I still
cannot fully understand why you got two different block size values from
the S3 FileSystem. Looking into the Hadoop code, I found the following snippet:

public long getDefaultBlockSize() {
    return this.getConf().getLong("fs.s3.block.size", 67108864L);
}

I don't see a quick fix for that. Looks like mismatching configurations on
different machines. We should probably have some sanity checks to detect
mismatching block header information, but unfortunately, the block header
is very primitive and doesn't allow for sophisticated checks.

So let's focus on implementation solutions:
1. I gather that you need to have support for data that uses
IOReadableWritable. So moving to more versatile solutions like Avro or
Parquet is unfortunately not an option. I'd still recommend that for any
new project.
2. Storing the block size in the repeated headers in a file introduces a kind
of chicken-and-egg problem: you need the block size to read the header that
contains the block size.
3. Storing the block size once in the first block would require additional
seeks and, depending on the degree of parallelism, would put a rather high
load on the data node holding the first block.
4. Storing the block size in metadata would be ideal, but with the wide range
of possible filesystems it is most likely not doable.
5. Explicitly setting the block size would be the most reliable technique but
quite user-unfriendly, especially if multiple deployment environments use
different block sizes.
6. Adding a periodic marker indeed seems like the most robust option, and
adding 20 bytes every 2k bytes doesn't seem too bad to me. The downside is
that seeking can take a long time for larger records, as it will linearly
scan through the bytes at the block start. However, if you really want to
support copying files across file systems with different block sizes, this
would be the only option (see the sketch after this list).
7. Deprecating the sequence format is a good option in the long run. I simply
don't see that, for production code, the performance gain over Avro or
Parquet would be noticeable, and in my experience getting a solid base
concept for schema evolution pays off quickly.
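
To make option 6 concrete, here is a rough sketch of the sync-marker scheme, assuming (as in Hadoop sequence files) a random 16-byte marker that is written once to the file header and then repeated between records roughly every 2k bytes. All names are made up for the example:

import java.io.DataOutputStream;
import java.io.IOException;
import java.security.SecureRandom;

// Sketch: inject a 16-byte sync marker between records roughly every 2k bytes.
// A reader that seeks into the middle of a file scans byte-by-byte for the
// marker (read back from the header) to find the next record boundary,
// without relying on any block size.
class SyncMarkerWriter {
    private static final int SYNC_INTERVAL = 2000; // ~2k bytes between markers

    private final byte[] syncMarker = new byte[16];
    private long bytesSinceSync;

    SyncMarkerWriter(DataOutputStream out) throws IOException {
        new SecureRandom().nextBytes(syncMarker);
        out.write(syncMarker); // store the marker once in the file header
    }

    void writeRecord(DataOutputStream out, byte[] record) throws IOException {
        if (bytesSinceSync >= SYNC_INTERVAL) {
            out.write(syncMarker); // markers only ever appear between records
            bytesSinceSync = 0;
        }
        out.writeInt(record.length);
        out.write(record);
        bytesSinceSync += 4 + record.length;
    }
}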

@Ken, could you please describe what kind of data you use the sequence
format for? I'd like to understand your requirements. How large are your
records (order of magnitude)? Are they POJOs? Do you craft them manually?

Best,

Arvid

On Sun, Sep 1, 2019 at 9:42 PM Stephan Ewen  wrote:

> Sounds reasonable.
>
> I am adding Arvid to the thread - IIRC he authored that tool in his
> Stratosphere days. And by a stroke of luck, he is now working on Flink
> again.
>
> @Arvid - what are your thoughts on Ken's suggestions?
>
> On Fri, Aug 30, 2019 at 8:56 PM Ken Krugler 
> wrote:
>
> > Hi Stephan (switching to dev list),
> >
> > On Aug 29, 2019, at 2:52 AM, Stephan Ewen  wrote:
> >
> > That is a good point.
> >
> > Which way would you suggest to go? Not relying on the FS block size at
> > all, but using a fix (configurable) block size?
> >
> >
> > There’s value to not requiring a fixed block size, as then a file that’s
> > moved between different file systems can be read using whatever block
> size
> > is optimal for that system.
> >
> > Hadoop handles this in sequence files by storing a unique “sync marker”
> > value in the file header (essentially a 16 byte UUID), injecting one of
> > these every 2K bytes or so (in between records), and then code can scan
> for
> > this to find record boundaries without relying on a block size. The idea
> is
> > that 2^128 is a Big Number, so the odds of finding a false-positive sync
> > marker in data is low enough to be ignorable.
> >
> > But that’s a bigger change. Simpler would be to put a header in each part
> > file being written, with some signature bytes to aid in detecting
> > old-format files.
> >
> > Or maybe deprecate SerializedOutputFormat/SerializedInputFormat, and
> > provide some wrapper glue to make it easier to write/read Hadoop
> > SequenceFiles that have a null key value, and store the POJO as the data
> > value. Then you could also leverage Hadoop support for compression at
> > either record or block level.
> >
> > — Ken
> >
> >
> > On Thu, Aug 29, 2019 at 4:49 AM Ken Krugler
> > wrote:
> >
> >> Hi all,
> >>
> >> Wondering if anyone else has run into this.
> >>
> >> We write files to S3 using the SerializedOutputFormat.
> >> When we read them back, sometimes we get deserialization errors where
> the
> >> data seems to be corrupt.
> >>
> >> After a lot of logging, the weathervane of blame pointed towards the
> >> block size somehow not being the same between the write (where it’s
> 64MB)
> >> and the read (unknown).
> >>
> >> When I added a call to SerializedInputFormat.setBlockSize(64MB), the
> >> problems went away.
> >>
> >> It looks like both input and output formats use fs.getDefaultBlockSize()
> >> to set this value by default, so maybe 

Kafka Checkpointing weird behavior.

2019-09-02 Thread Dominik Wosiński
Hey,
I just want to understand something, because I am observing weird behavior
of the Kafka consumer for versions > 0.8.

So the idea is: if we enable checkpointing and enable committing offsets on
checkpoints, which AFAIK is enabled by default, then for Kafka versions > 0.8
we should see the changes in the topics where the broker keeps the offsets
for the particular groups. Am I correct here?

And do I understand correctly that we can keep the offsets for Flink
between job restarts in two ways:
1) Cancel with a savepoint and start the job from the savepoint again.
2) When using group offsets, Flink should be able to determine the offset
for a particular group by using the offsets committed to Kafka?

Is my understanding correct?

Thanks in advance,
Best Regards.
Dom.
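
For reference, the settings involved look roughly like this (a sketch against the FlinkKafkaConsumer API; the topic name, group id, and broker address are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaOffsetSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offsets are committed to Kafka on completed checkpoints

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-group");                // placeholder

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        consumer.setCommitOffsetsOnCheckpoints(true); // the default when checkpointing is enabled
        consumer.setStartFromGroupOffsets();          // way 2): resume from Kafka-committed offsets

        env.addSource(consumer).print();
        env.execute("kafka-offset-sketch");
    }
}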


[jira] [Created] (FLINK-13939) pyflink ExecutionConfigTests#test_equals_and_hash test failed

2019-09-02 Thread vinoyang (Jira)
vinoyang created FLINK-13939:


 Summary: pyflink ExecutionConfigTests#test_equals_and_hash test 
failed
 Key: FLINK-13939
 URL: https://issues.apache.org/jira/browse/FLINK-13939
 Project: Flink
  Issue Type: Test
  Components: API / Python, Tests
Affects Versions: 1.9.0
Reporter: vinoyang


 
=== FAILURES ===
__ ExecutionConfigTests.test_equals_and_hash __

self = 

    def test_equals_and_hash(self):
        config1 = ExecutionEnvironment.get_execution_environment().get_config()
        config2 = ExecutionEnvironment.get_execution_environment().get_config()
        self.assertEqual(config1, config2)
>       self.assertEqual(hash(config1), hash(config2))
E       AssertionError: 897378335 != 1596606912

pyflink/common/tests/test_execution_config.py:277: AssertionError

=== warnings summary ===
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13
  /home/travis/build/flink-ci/flink/flink-python/.tox/py37/lib/python3.7/site-packages/py4j/java_collections.py:13:
  DeprecationWarning: Using or importing the ABCs from 'collections' instead of
  from 'collections.abc' is deprecated, and in 3.8 it will stop working
    from collections import (

-- Docs: https://docs.pytest.org/en/latest/warnings.html
== 1 failed, 373 passed, 5 warnings in 34.10s ==
ERROR: InvocationError for command
/home/travis/build/flink-ci/flink/flink-python/.tox/py37/bin/pytest (exited with code 1)

more details: https://api.travis-ci.com/v3/job/229979817/log.txt



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13940) S3RecoverableWriter causes job to get stuck in recovery

2019-09-02 Thread Jimmy Weibel Rasmussen (Jira)
Jimmy Weibel Rasmussen created FLINK-13940:
--

 Summary: S3RecoverableWriter causes job to get stuck in recovery
 Key: FLINK-13940
 URL: https://issues.apache.org/jira/browse/FLINK-13940
 Project: Flink
  Issue Type: Bug
  Components: Connectors / FileSystem
Affects Versions: 1.9.0, 1.8.1, 1.8.0
Reporter: Jimmy Weibel Rasmussen


 
 
 
The cleaning up of tmp files in S3 introduced by this ticket/PR:
https://issues.apache.org/jira/browse/FLINK-10963
 
is preventing the Flink job from recovering under some circumstances.
 
 
This is what seems to be happening:
When the job tries to recover, it will call initializeState() on all
operators, which results in the Bucket.restoreInProgressFile method being
called.
This will download the part_tmp file mentioned in the checkpoint that we're
restoring from, and finally it will call fsWriter.cleanupRecoverableState,
which deletes the part_tmp file in S3.
 
Now the open() method is called on all operators. If the open() call fails for 
one of the operators (this might happen if the issue that caused the job to 
fail and restart is still unresolved), the job will fail again and try to 
restart from the same checkpoint as before.
 
 
This time, however, downloading the part_tmp file mentioned in the checkpoint
fails because it was deleted during the last recovery attempt.
 
The bug is critical because it results in data loss.
 
 
 
I discovered the bug because I have a Flink job with a RabbitMQ source and a
StreamingFileSink that writes to S3 (and therefore uses the
S3RecoverableWriter).
Occasionally I have some RabbitMQ connection issues which cause the job to
fail and restart; sometimes the first few restart attempts fail because
RabbitMQ is unreachable when Flink tries to reconnect.
 
This is what I was seeing:
RabbitMQ goes down
Job fails because of a RabbitMQ ConsumerCancelledException
Job attempts to restart but fails with a Rabbitmq connection exception (x 
number of times)
RabbitMQ is back up
Job attempts to restart but fails with a FileNotFoundException due to some 
_part_tmp file missing in S3.
 
The job will be unable to restart, and the only option is to cancel and
resubmit the job (and lose all state).
 
 
 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Releasing Flink 1.8.2

2019-09-02 Thread Kostas Kloudas
Hi all,

I think this should also be considered a blocker:
https://issues.apache.org/jira/browse/FLINK-13940.
It is not a regression, but it can result in data loss.

I think I can have a quick fix by tomorrow.

Cheers,
Kostas

On Mon, Sep 2, 2019 at 12:01 PM jincheng sun  wrote:
>
> Thanks for all of your feedback!
>
> Hi Jark, glad to see that you are taking on what an RM should do.
>
> Just one tip: before RC1, all blockers should be fixed, while the others are
> nice to have. So you can decide when to prepare RC1 after the blockers are
> resolved.
>
> Feel free to tell me if you have any questions.
>
> Best,
> Jincheng
>
> Aljoscha Krettek wrote on Mon, Sep 2, 2019 at 5:03 PM:
>
> > I cut a PR for FLINK-13586: https://github.com/apache/flink/pull/9595 <
> > https://github.com/apache/flink/pull/9595>
> >
> > > On 2. Sep 2019, at 05:03, Yu Li  wrote:
> > >
> > > +1 for a 1.8.2 release, thanks for bringing this up Jincheng!
> > >
> > > Best Regards,
> > > Yu
> > >
> > >
> > > On Mon, 2 Sep 2019 at 09:19, Thomas Weise  wrote:
> > >
> > >> +1 for the 1.8.2 release
> > >>
> > >> I marked https://issues.apache.org/jira/browse/FLINK-13586 for this
> > >> release. It would be good to compensate for the backward incompatible
> > >> change to ClosureCleaner that was introduced in 1.8.1, which affects
> > >> downstream dependencies.
> > >>
> > >> Thanks,
> > >> Thomas
> > >>
> > >>
> > >> On Sun, Sep 1, 2019 at 5:10 PM jincheng sun 
> > >> wrote:
> > >>
> > >>> Hi Jark,
> > >>>
> > >>> Glad to hear that you want to be the Release Manager of flink 1.8.2.
> > >>> I believe that you will be a great RM, and I am very willing to help
> > you
> > >>> with the final release in the final stages. :)
> > >>>
> > >>> The release of Apache Flink involves a number of tasks. For details,
> > you
> > >>> can consult the documentation [1]. If you have any questions, please
> > let
> > >> me
> > >>> know and let us work together.
> > >>>
> > >>>
> > >>>
> > >>
> > https://cwiki.apache.org/confluence/display/FLINK/Creating+a+Flink+Release#CreatingaFlinkRelease-Checklisttoproceedtothenextstep.1
> > >>>
> > >>> Cheers,
> > >>> Jincheng
> > >>>
> > >>> Till Rohrmann wrote on Sat, Aug 31, 2019 at 12:59 AM:
> > >>>
> >  +1 for a 1.8.2 bug fix release. Thanks for kicking this discussion off
> >  Jincheng.
> > 
> >  Cheers,
> >  Till
> > 
> >  On Fri, Aug 30, 2019 at 6:45 PM Jark Wu  wrote:
> > 
> > > Thanks Jincheng for bringing this up.
> > >
> > > +1 to the 1.8.2 release, because it already contains a couple of
> >  important
> > > fixes and it has been a long time since 1.8.1 came out.
> > > I'm willing to help the community as much as possible. I'm wondering
> > >>> if I
> > > can be the release manager of 1.8.2 or work with you together
> > >>> @Jincheng?
> > >
> > > Best,
> > > Jark
> > >
> > > On Fri, 30 Aug 2019 at 18:58, Hequn Cheng 
> > >>> wrote:
> > >
> > >> Hi Jincheng,
> > >>
> > >> +1 for a 1.8.2 release.
> > >> Thanks a lot for raising the discussion. It would be nice to have
> > >>> these
> > >> critical fixes.
> > >>
> > >> Best, Hequn
> > >>
> > >>
> > >> On Fri, Aug 30, 2019 at 6:31 PM Maximilian Michels
> > > wrote:
> > >>
> > >>> Hi Jincheng,
> > >>>
> > >>> +1 I would be for a 1.8.2 release such that we can fix the
> > >> problems
> > > with
> > >>> the nested closure cleaner which currently block 1.8.1 users with
> >  Beam:
> > >>> https://issues.apache.org/jira/browse/FLINK-13367
> > >>>
> > >>> Thanks,
> > >>> Max
> > >>>
> > >>> On 30.08.19 11:25, jincheng sun wrote:
> >  Hi Flink devs,
> > 
> >  It has been nearly 2 months since the 1.8.1 released. So, what
> > >> do
> >  you
> > >>> think
> >  about releasing Flink 1.8.2 soon?
> > 
> >  We already have some blocker and critical fixes in the
> > >>> release-1.8
> > >>> branch:
> > 
> >  [Blocker]
> >  - FLINK-13159 java.lang.ClassNotFoundException when restore job
> >  - FLINK-10368 'Kerberized YARN on Docker test' unstable
> >  - FLINK-12578 Use secure URLs for Maven repositories
> > 
> >  [Critical]
> >  - FLINK-12736 ResourceManager may release TM with allocated
> > >> slots
> >  - FLINK-12889 Job keeps in FAILING state
> >  - FLINK-13484 ConnectedComponents end-to-end test instable with
> >  NoResourceAvailableException
> >  - FLINK-13508 CommonTestUtils#waitUntilCondition() may attempt
> > >> to
> > > sleep
> >  with negative time
> >  - FLINK-13806 Metric Fetcher floods the JM log with errors when
> > >>> TM
> >  is
> > >>> lost
> > 
> >  Furthermore, I think the following one blocker issue should be
> >  merged
> >  before 1.8.2 release.
> > 
> >  - FLINK-13897: OSS FS NOTICE 

[jira] [Created] (FLINK-13941) Prevent data-loss by not cleaning up small part files from S3.

2019-09-02 Thread Kostas Kloudas (Jira)
Kostas Kloudas created FLINK-13941:
--

 Summary: Prevent data-loss by not cleaning up small part files 
from S3.
 Key: FLINK-13941
 URL: https://issues.apache.org/jira/browse/FLINK-13941
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / FileSystem
Affects Versions: 1.9.0, 1.8.1, 1.8.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.8.2






--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-13942) Add Overview page for Getting Started section

2019-09-02 Thread Fabian Hueske (Jira)
Fabian Hueske created FLINK-13942:
-

 Summary: Add Overview page for Getting Started section
 Key: FLINK-13942
 URL: https://issues.apache.org/jira/browse/FLINK-13942
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Affects Versions: 1.9.0, 1.10.0
Reporter: Fabian Hueske
Assignee: Fabian Hueske


The Getting Started section provides different types of tutorials that target
users with different interests and backgrounds.

We should add a brief overview page that describes the different tutorials, so
that users can easily find the material they need to get started with Flink.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] FLIP-53: Fine Grained Resource Management

2019-09-02 Thread Zhu Zhu
Thanks Xintong for proposing this improvement. Fine-grained resources can
be very helpful when users plan their resources well.

I have a few questions:
1. Currently in a batch job, vertices from different regions can run at the
same time in slots from the same shared group, as long as they do not have
data dependencies on each other and the available slot count is not smaller
than the *max* parallelism over all tasks.
With the changes in this FLIP, however, tasks from different regions cannot
share slots anymore.
Once the available slot count is smaller than the *sum* of the parallelism of
tasks across all regions, tasks may need to be executed sequentially, which
might result in a performance regression.
Is this (a performance regression for existing DataSet jobs) considered a
necessary and accepted trade-off in this FLIP?

2. The network memory depends on the input/output ExecutionEdge count and
thus can differ even between parallel instances of the same JobVertex.
Does this mean that when adding task resources to calculate the slot
resource for a shared group, the max possible network memory of the vertex
instances shall be used?
This might result in more resources being requested than actually needed.

And some minor comments:
1. Regarding "fracManagedMemOnHeap = 1 / numOpsUseOnHeapManagedMemory", I
guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup?
2. I think the *StreamGraphGenerator* in the #Slot Sharing section and
implementation step 4 should be *StreamingJobGraphGenerator*, as
*StreamGraphGenerator* is not aware of JobGraph and pipelined region.


Thanks,
Zhu Zhu

Xintong Song wrote on Mon, Sep 2, 2019 at 11:59 AM:

> Updated the FLIP wiki page [1], with the following changes.
>
>- Remove the step of converting pipelined edges between different slot
>sharing groups into blocking edges.
>- Set `allSourcesInSamePipelinedRegion` to true by default.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Sep 2, 2019 at 11:50 AM Xintong Song 
> wrote:
>
> > Regarding changing the edge type, I think we actually don't need to do this
> > for batch jobs either, because we don't have public interfaces for users
> > to explicitly set slot sharing groups in DataSet API and SQL/Table API.
> We
> > have such interfaces in DataStream API only.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Tue, Aug 27, 2019 at 10:16 PM Xintong Song 
> > wrote:
> >
> >> Thanks for the correction, Till.
> >>
> >> Regarding your comments:
> >> - You are right, we should not change the edge type for streaming jobs.
> >> Then I think we can change the option 'allSourcesInSamePipelinedRegion'
> in
> >> step 2 to 'isStreamingJob', and implement the current step 2 before the
> >> current step 1, so we can use this option to decide whether we should change
> >> the edge type. What do you think?
> >> - Agree. It should be easier to make the default value of
> >> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and set
> it
> >> to 'false' when using DataSet API or blink planner.
> >>
> >> Thank you~
> >>
> >> Xintong Song
> >>
> >>
> >>
> >> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann 
> >> wrote:
> >>
> >>> Thanks for creating the implementation plan Xintong. Overall, the
> >>> implementation plan looks good. I had a couple of comments:
> >>>
> >>> - What will happen if a user has defined a streaming job with two slot
> >>> sharing groups? Would the code insert a blocking data exchange between
> >>> these two groups? If yes, then this breaks existing Flink streaming
> jobs.
> >>> - How do we detect unbounded streaming jobs to set
> >>> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier to
> >>> set
> >>> it false if we are using the DataSet API or the Blink planner with a
> >>> bounded job?
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann 
> >>> wrote:
> >>>
> >>> > I guess there is a typo since the link to the FLIP-53 is
> >>> >
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A+Fine+Grained+Operator+Resource+Management
> >>> >
> >>> > Cheers,
> >>> > Till
> >>> >
> >>> > On Tue, Aug 27, 2019 at 1:42 PM Xintong Song 
> >>> > wrote:
> >>> >
> >>> >> Added implementation steps for this FLIP on the wiki page [1].
> >>> >>
> >>> >>
> >>> >> Thank you~
> >>> >>
> >>> >> Xintong Song
> >>> >>
> >>> >>
> >>> >> [1]
> >>> >>
> >>> >>
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
> >>> >>
> >>> >> On Mon, Aug 19, 2019 at 10:29 PM Xintong Song <
> tonysong...@gmail.com>
> >>> >> wrote:
> >>> >>
> >>> >> > Hi everyone,
> >>> >> >
> >>> >> > As Till suggested, the original "FLIP-53: Fine Grained Resource
> >>> >> > Management" splits into two separate FLIPs,
> >>> >> >
> >>> >> >- FLIP-53: Fine Grained Operator Resource Management [1]
> >>> >> >- FLIP-56: Dynamic Slot Allocation [2]
> >>> >> >
> >>> >> > We'll continue using this discussion thread for FLIP-53. For
> >>> FLIP-56, I
> 

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread Becket Qin
Hi Timo and Dawid,

Thanks for the patient reply. I agree that both option a) and option b) can
solve the mutability problem.

For option a), isn't it a little intrusive to add a duplicate() method to a
Configurable? It would be great if we could avoid putting this burden on
users.

For option b), I actually feel it is slightly better than option a) from
API perspective as getFactory() seems a more understandable method of a
Configurable compared with duplicate(). And users do not need to implement
much more logic.

I am curious: what is the downside of keeping the Configuration simple, with
only primitive types, and always creating the Configurable using a util
method? If Configurables are rare, do we need to put the instantiation /
bookkeeping logic in Configuration?

 @Becket for the toConfiguration this is required for shipping the
> Configuration to TaskManager, so that we do not have to use java
> serializability.

Do you mean a Configurable may be created and configured directly without
reading settings from a Configuration instance? I thought a Configurable
will always be created via a ConfigurableFactory by extracting required
configs from a Configuration. Am I missing something?

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 2, 2019 at 4:47 PM Dawid Wysakowicz 
wrote:

> Hi Timo, Becket
>
> From the options that Timo suggested for improving the mutability
> situation I would prefer option a) as this is the more explicit option
> and simpler option. Just as a remark, I think in general Configurable
> types for options will be rather very rare for some special use-cases,
> as the majority of options are rather simpler parameters of primitive
> types (+duration, memory)
>
> @Becket for the toConfiguration this is required for shipping the
> Configuration to TaskManager, so that we do not have to use java
> serializability.
>
> Best,
>
> Dawid
>
>
> On 02/09/2019 10:05, Timo Walther wrote:
> > Hi Becket,
> >
> > Re 1 & 3: "values in configurations should actually be immutable"
> >
> > I would also prefer immutability but most of our configuration is
> > mutable due to serialization/deserialization. Also maps and list could
> > be mutable in theory. It is difficult to really enforce that for
> > nested structures. I see two options:
> >
> > a) For the original design: How about we force implementers to add a
> > duplicate() method in a Configurable object? This would mean that the
> > object is still mutable but by duplicating the object both during
> > reading and writing we would avoid the problem you described.
> >
> > b) For the current design: We still use the factory approach but let a
> > Configurable object implement a getFactory() method such that we know
> > how to serialize the object. With the help of a factory we can also
> > duplicate the object easily during reading and writing and ensure
> > immutability.
> >
> > I would personally go for approach a) to not over-engineer this topic.
> > But I'm open for option b).
> >
> > Regards,
> > Timo
> >
> >
> > On 31.08.19 04:09, Becket Qin wrote:
> >> Hi Timo,
> >>
> >> Thanks for the reply. I am still a little concerned over the
> >> mutability of
> >> the Configurable which could be the value in Configuration.
> >>
> >> Re: 1
> >>
> >>> But in general, people should not use any internal fields.
> >>> Configurable objects are meant for simple little helper POJOs, not
> >>> complex arbitrary nested data structures.
> >> This seems difficult to enforce... Ideally the values in configurations
> >> should actually be immutable. The value can only be changed by
> >> explicitly
> >> calling setters in Configuration. Otherwise we may have weird situation
> >> where the Configurable in the same configuration are different in two
> >> places because the configurable is modified in one places and not
> >> modified
> >> in another place. So I am a little concerned on putting a
> >> Configurable type
> >> in the Configuration map, because the value could be modified without
> >> explicitly setting the configuration. For example, can users do the
> >> following?
> >>
> >> Configurable configurable =
> >> configuration.getConfigurable(myConfigurableOption);
> >> configurable.setConfigA(123); // this already changes the configurable
> >> object in the configuration.
> >>
> >> Re: 2
> >> Thanks for confirming. As long as users will not have a situation where
> >> they need to set two configurations with the same key but different
> >> descriptions, I think it is OK.
> >>
> >> Re: 3
> >> This is actually kind of related to 1. i.e. Whether toConfiguration()
> >> guarantees the exact same object can be rebuilt from the
> >> configuration or
> >> not. I am still not quite sure about the use case of toConfiguration()
> >> though. It seems indicating the Configurable is mutable, which might be
> >> dangerous.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >> On Fri, Aug 30, 2019 at 10:04 PM Timo Walther 
> >> wrote:
> >>
> >>> Hi Becket,
> >>>
> >>> 1. First

Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-02 Thread Xintong Song
Hi everyone,

I'm here to restart the voting process for FLIP-49 [1], following the
consensus reached in this thread [2] regarding some new comments and
concerns.

This vote will be open for at least 72 hours. I'll try to close it on Sep.
5, 14:00 UTC, unless there is an objection or not enough votes.

Thank you~

Xintong Song


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html

On Tue, Aug 27, 2019 at 9:29 PM Xintong Song  wrote:

> Alright, then let's keep the discussion in the DISCUSS mailing thread, and
> see whether we need to restart the vote.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Tue, Aug 27, 2019 at 8:12 PM Till Rohrmann 
> wrote:
>
>> I had a couple of comments concerning the implementation plan. I've posted
>> them to the original discussion thread. Depending on the outcome of this
>> discussion we might need to restart the vote.
>>
>> Cheers,
>> Till
>>
>> On Tue, Aug 27, 2019 at 11:14 AM Xintong Song 
>> wrote:
>>
>> > Hi all,
>> >
>> > I would like to start the voting process for FLIP-49 [1], which is
>> > discussed and reached consensus in this thread [2].
>> >
>> > This voting will be open for at least 72 hours. I'll try to close it
>> Aug.
>> > 30 10:00 UTC, unless there is an objection or not enough votes.
>> >
>> > Thank you~
>> >
>> > Xintong Song
>> >
>> >
>> > [1]
>> >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-49%3A+Unified+Memory+Configuration+for+TaskExecutors
>> > [2]
>> >
>> >
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
>> >
>>
>


Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-02 Thread jincheng sun
Hi Shaoxuan,

Thanks for the reminder. I think "Flink Python User-Defined Function for
Table" makes sense to me.

Best,
Jincheng

Timo Walther wrote on Mon, Sep 2, 2019 at 5:04 PM:

> Hi all,
>
> the FLIP looks awesome. However, I would like to discuss the changes to
> the user-facing parts again. Some feedback:
>
> 1. DataViews: With the current non-annotation design for DataViews, we
> cannot perform eager state declaration, right? At which point during
> execution do we know which state is required by the function? We need to
> instantiate the function first, right?
>
> 2. Serializability of functions: How do we ensure serializability of
> functions for catalog persistence? In the Scala/Java API, we would like
> to register classes instead of instances soon. This is the only way to
> store a function properly in a catalog or we need some
> serialization/deserialization logic in the function interfaces to
> convert an instance to string properties.
>
> 3. TableEnvironment: What is the signature of `register_function(self,
> name, function)`? Does it accept both a class and function? Like `class
> Sum` and `def split()`? Could you add some examples for registering both
> kinds of functions?
>
> 4. FunctionDefinition: Function definition is not a user-defined
> function definition. It is the highest interface for both user-defined
> and built-in functions. I'm not sure if getLanguage() should be part of
> this interface or one-level down which would be `UserDefinedFunction`.
> Built-in functions will never be implemented in a different language. In
> any case, I would vote for removing the UNKNOWN language, because it
> does not solve anything. Why should a user declare a function that the
> runtime can not handle? I also find the term `JAVA` confusing for Scala
> users. How about `FunctionLanguage.JVM` instead?
>
> 5. Function characteristics: In the current design, function classes do
> not extend from any upper class. How can users declare characteristics
> that are present in `FunctionDefinition` like determinism, requirements,
> or soon also monotonism.
>
> Thanks,
> Timo
>
>
> On 02.09.19 03:38, Shaoxuan Wang wrote:
> > Hi Jincheng, Fudian, and Aljoscha,
> > I am assuming the proposed python UDX can also be applied to Flink SQL.
> > Is this correct? If yes, I would suggest to title the FLIP as "Flink
> Python
> > User-Defined Function" or "Flink Python User-Defined Function for Table".
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Wed, Aug 28, 2019 at 12:22 PM jincheng sun 
> > wrote:
> >
> >> Thanks for the feedback Bowen!
> >>
> >> Great thanks for create the FLIP and bring up the VOTE Dian!
> >>
> >> Best, Jincheng
> >>
> >> Dian Fu wrote on Wed, Aug 28, 2019 at 11:32 AM:
> >>
> >>> Hi all,
> >>>
> >>> I have started a voting thread [1]. Thanks a lot for your help during
> >>> creating the FLIP @Jincheng.
> >>>
> >>>
> >>> Hi Bowen,
> >>>
> >>> Very appreciated for your comments. I have replied you in the design
> doc.
> >>> As it seems that the comments doesn't affect the overall design, I'll
> not
> >>> cancel the vote for now and we can continue the discussion in the
> design
> >>> doc.
> >>>
> >>> [1]
> >>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
> >>> <
> >>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html
> >>> Regards,
> >>> Dian
> >>>
>  On Aug 28, 2019, at 11:05 AM, Bowen Li wrote:
> 
>  Hi Jincheng and Dian,
> 
>  Sorry for being late to the party. I took a glance at the proposal,
> >> LGTM
> >>> in
>  general, and I left only a couple comments.
> 
>  Thanks,
>  Bowen
> 
> 
>  On Mon, Aug 26, 2019 at 8:05 PM Dian Fu 
> wrote:
> 
> > Hi Jincheng,
> >
> > Thanks! It works.
> >
> > Thanks,
> > Dian
> >
> >> On Aug 27, 2019, at 10:55 AM, jincheng sun wrote:
> >>
> >> Hi Dian, can you check if you have edit access? :)
> >>
> >>
> >> Dian Fu wrote on Mon, Aug 26, 2019 at 10:52 AM:
> >>
> >>> Hi Jincheng,
> >>>
> >>> Appreciated for the kind tips and offering of help. Definitely need
> >>> it!
> >>> Could you grant me write permission for confluence? My Id: Dian Fu
> >>>
> >>> Thanks,
> >>> Dian
> >>>
>  On Aug 26, 2019, at 9:53 AM, jincheng sun wrote:
> 
>  Thanks for your feedback Hequn & Dian.
> 
>  Dian, I am glad to see that you want help to create the FLIP!
>  Everyone will have first time, and I am very willing to help you
> > complete
>  your first FLIP creation. Here some tips:
> 
>  - First I'll give your account write permission for confluence.
>  - Before create the FLIP, please have look at the FLIP Template
> >> [1],
> >>> (It's
>  better to know more about FLIP by reading [2])
>  - Create Flink Python UDFs related 

Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-02 Thread jincheng sun
Hi Timo,

Great thanks for your feedback. I would like to share my thoughts with you
inline. :)

Best,
Jincheng

Timo Walther wrote on Mon, Sep 2, 2019 at 5:04 PM:

> Hi all,
>
> the FLIP looks awesome. However, I would like to discuss the changes to
> the user-facing parts again. Some feedback:
>
> 1. DataViews: With the current non-annotation design for DataViews, we
> cannot perform eager state declaration, right? At which point during
> execution do we know which state is required by the function? We need to
> instantiate the function first, right?
>

>> We will analyze the Python AggregateFunction and extract the DataViews
used in it. This can be done
by instantiating the Python AggregateFunction, creating an accumulator by
calling the create_accumulator method, and then analyzing the created
accumulator. This is actually similar to the way the codegen logic
processes a Java AggregateFunction. The extracted DataViews can
then be used to construct the StateDescriptors in the operator, i.e., we
hold the state spec and the state descriptor id in the Java
operator, and the Python worker can access the state by specifying the
corresponding state descriptor id.



> 2. Serializability of functions: How do we ensure serializability of
> functions for catalog persistence? In the Scala/Java API, we would like
> to register classes instead of instances soon. This is the only way to
> store a function properly in a catalog or we need some
> serialization/deserialization logic in the function interfaces to
> convert an instance to string properties.
>

>> The Python function will be serialized with CloudPickle anyway in the
Python API, as we need to transfer it to the Python worker, which can then
deserialize it for execution. The serialized Python function can be stored
in the catalog.



> 3. TableEnvironment: What is the signature of `register_function(self,
> name, function)`? Does it accept both a class and function? Like `class
> Sum` and `def split()`? Could you add some examples for registering both
> kinds of functions?
>

>> Both kinds that you mentioned are already supported. You can find an
example in the POC code:
https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26



> 4. FunctionDefinition: Function definition is not a user-defined
> function definition. It is the highest interface for both user-defined
> and built-in functions. I'm not sure if getLanguage() should be part of
> this interface or one-level down which would be `UserDefinedFunction`.
> Built-in functions will never be implemented in a different language. In
> any case, I would vote for removing the UNKNOWN language, because it
> does not solve anything. Why should a user declare a function that the
> runtime can not handle? I also find the term `JAVA` confusing for Scala
> users. How about `FunctionLanguage.JVM` instead?
>

>> Actually, we may have built-in Python functions in the future. Regarding
the expression py_udf1(a, b) + py_udf2(c): if there were a built-in Python
function for the '+' operator, then we would not need to mix Java and Python
UDFs. In this way, we can improve the execution performance.
Regarding removing FunctionLanguage.UNKNOWN and renaming
FunctionLanguage.JAVA to FunctionLanguage.JVM, that makes sense to me.
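
A sketch of what that could look like (the names follow the discussion above and are not final API):

// Sketch only: the enum values and accessor as discussed, not the final interfaces.
enum FunctionLanguage {
    JVM,    // functions implemented in Java/Scala (instead of the confusing "JAVA")
    PYTHON
}

interface FunctionDefinition {
    // defaulting to JVM keeps existing built-in function definitions unchanged
    default FunctionLanguage getLanguage() {
        return FunctionLanguage.JVM;
    }
}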



> 5. Function characteristics: In the current design, function classes do
> not extend from any upper class. How can users declare characteristics
> that are present in `FunctionDefinition` like determinism, requirements,
> or soon also monotonism.
>

>> Actually, we have defined 'UserDefinedFunction', which is the base class
for all user-defined functions.
We can define determinism, requirements, etc. in this class.
Currently, defining determinism is already supported.



>
> Thanks,
> Timo
>
>
> On 02.09.19 03:38, Shaoxuan Wang wrote:
> > Hi Jincheng, Fudian, and Aljoscha,
> > I am assuming the proposed python UDX can also be applied to Flink SQL.
> > Is this correct? If yes, I would suggest to title the FLIP as "Flink
> Python
> > User-Defined Function" or "Flink Python User-Defined Function for Table".
> >
> > Regards,
> > Shaoxuan
> >
> >
> > On Wed, Aug 28, 2019 at 12:22 PM jincheng sun 
> > wrote:
> >
> >> Thanks for the feedback Bowen!
> >>
> >> Great thanks for create the FLIP and bring up the VOTE Dian!
> >>
> >> Best, Jincheng
> >>
> >> Dian Fu wrote on Wed, Aug 28, 2019 at 11:32 AM:
> >>
> >>> Hi all,
> >>>
> >>> I have started a voting thread [1]. Thanks a lot for your help during
> >>> creating the FLIP @Jincheng.
> >>>
> >>>
> >>> Hi Bowen,
> >>>
> >>> Very appreciated for your comments. I have replied you in the design
> doc.
> >>> As it seems that the comments doesn't affect the overall design, I'll
> not
> >>> cancel the vote for now and we can continue the discussion in the
> design
> >>> doc.
> >>>
> >>> [1]
> >>>
> >>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread Aljoscha Krettek
Hi,

Regarding the factory and duplicate() and whatnot, wouldn’t it work to have a 
factory like this:

/**
 * Allows to read and write an instance from and to {@link Configuration}. A
 * configurable instance operates in its own key space in {@link Configuration}
 * and will be (de)prefixed by the framework. It cannot access keys from other
 * options. A factory must have a default constructor.
 */
public interface ConfigurableFactory<T> {

    /**
     * Creates an instance from the given configuration.
     */
    T fromConfiguration(ConfigurationReader configuration);
}

with Configurable being:

/**
 * Allows to read and write an instance from and to {@link Configuration}. A
 * configurable instance operates in its own key space in {@link Configuration}
 * and will be (de)prefixed by the framework. It cannot access keys from other
 * options. A factory must have a default constructor.
 */
public interface Configurable {

    /**
     * Writes this instance to the given configuration.
     */
    void writeToConfiguration(ConfigurationWriter configuration); // method name TBD
}

This would make the Configurable immutable and we wouldn’t need a duplicate() 
method.

Best,
Aljoscha
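
To illustrate how the two interfaces would be used together, here is a hypothetical implementation (CachedFile, its path field, and the get/set accessors on ConfigurationReader/Writer are assumptions for the example, not proposed API):

// Hypothetical sketch: an immutable Configurable plus its factory, written
// against the interfaces above. The reader/writer accessor names are assumed.
public final class CachedFile implements Configurable {

    private final String path; // immutable: no setters exposed

    public CachedFile(String path) {
        this.path = path;
    }

    @Override
    public void writeToConfiguration(ConfigurationWriter writer) {
        writer.set("path", path); // the key is (de)prefixed by the framework
    }

    public static final class Factory implements ConfigurableFactory<CachedFile> {
        @Override
        public CachedFile fromConfiguration(ConfigurationReader reader) {
            return new CachedFile(reader.get("path"));
        }
    }
}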

> On 2. Sep 2019, at 14:40, Becket Qin  wrote:
> 
> Hi Timo and Dawid,
> 
> Thanks for the patient reply. I agree that both option a) and option b) can
> solve the mutability problem.
> 
> For option a), is it a little intrusive to add a duplicate() method for a
> Configurable? It would be great if we don't put this burden on users if
> possible.
> 
> For option b), I actually feel it is slightly better than option a) from
> API perspective as getFactory() seems a more understandable method of a
> Configurable compared with duplicate(). And users do not need to implement
> much more logic.
> 
> I am curious what is the downside of keeping the Configuration simple to
> only have primitive types, and always create the Configurable using a util
> method? If Configurables are rare, do we need to put the instantiation /
> bookkeeping logic in Configuration?
> 
> @Becket for the toConfiguration this is required for shipping the
>> Configuration to TaskManager, so that we do not have to use java
>> serializability.
> 
> Do you mean a Configurable may be created and configured directly without
> reading settings from a Configuration instance? I thought a Configurable
> will always be created via a ConfigurableFactory by extracting required
> configs from a Configuration. Am I missing something?
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Mon, Sep 2, 2019 at 4:47 PM Dawid Wysakowicz 
> wrote:
> 
>> Hi Timo, Becket
>> 
>> From the options that Timo suggested for improving the mutability
>> situation I would prefer option a) as this is the more explicit option
>> and simpler option. Just as a remark, I think in general Configurable
>> types for options will be rather very rare for some special use-cases,
>> as the majority of options are rather simpler parameters of primitive
>> types (+duration, memory)
>> 
>> @Becket for the toConfiguration this is required for shipping the
>> Configuration to TaskManager, so that we do not have to use java
>> serializability.
>> 
>> Best,
>> 
>> Dawid
>> 
>> 
>> On 02/09/2019 10:05, Timo Walther wrote:
>>> Hi Becket,
>>> 
>>> Re 1 & 3: "values in configurations should actually be immutable"
>>> 
>>> I would also prefer immutability but most of our configuration is
>>> mutable due to serialization/deserialization. Also maps and list could
>>> be mutable in theory. It is difficult to really enforce that for
>>> nested structures. I see two options:
>>> 
>>> a) For the original design: How about we force implementers to add a
>>> duplicate() method in a Configurable object? This would mean that the
>>> object is still mutable but by duplicating the object both during
>>> reading and writing we would avoid the problem you described.
>>> 
>>> b) For the current design: We still use the factory approach but let a
>>> Configurable object implement a getFactory() method such that we know
>>> how to serialize the object. With the help of a factory we can also
>>> duplicate the object easily during reading and writing and ensure
>>> immutability.
>>> 
>>> I would personally go for approach a) to not over-engineer this topic.
>>> But I'm open for option b).
>>> 
>>> Regards,
>>> Timo
>>> 
>>> 
>>> On 31.08.19 04:09, Becket Qin wrote:
 Hi Timo,
 
 Thanks for the reply. I am still a little concerned over the
 mutability of
 the Configurable which could be the value in Configuration.
 
 Re: 1
 
> But in general, people should not use any internal fields.
> Configurable objects are meant for simple little helper POJOs, not
> complex arbitrary nested data structures.
 This seems difficult to enforce... Ideally the values in configurations
 should actually be immutable. The value can only be changed by
 explicitly
 calling setters in Configuration. Otherwi

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread Timo Walther

@Becket:

Regarding "great if we don't put this burden on users", we should 
consider who is actually using this API. It is not first-level API but 
mostly API for Flink contributors. Most of the users will use API 
classes like ExecutionConfig or TableConfig or other builders for 
performing configuration. They will never use the ConfigOptions classes 
directly. So enforcing a duplicate method does not sound like a burden 
to me.


What would the util that you are suggesting look like? It would always
need to serialize/deserialize an object into an immutable string. This
is not very efficient, given that the instance already exists and can be
made immutable by the implementer by not exposing setters. Furthermore,
we would lose the declarative approach and could not generate
documentation. The current approach specifies the static final
sub-ConfigOptions either in the Configurable object (initial design) or in
the ConfigurableFactory (current design) such that the docs generator 
can read them.


Regarding "Configurable may be created and configured directly without 
reading settings from a Configuration instance", there seems to be a 
misunderstanding. This is a very common case if not the most common. As 
mentioned before, take ExecutionConfig. This configuration is currently 
only used in a programmatic way and needs a way to be expressed as
ConfigOptions. CachedFile, for instance, will be a Configurable object
that will be binary-serialized most of the time when sent to the
cluster, but due to the Configurable design it is possible to store it in
a string representation as well.


@Aljoscha:

Yes, this approach would also work. We still would need to call 
writeToConf/readFromConf for duplicate() and ensure immutable semantics, 
if this is really an important use case. But IMHO all configuration is 
currently mutable (all API classes like ExecutionConfig, 
CheckpointConfig, Configuration itself), I don't understand why 
immutability needs to be discussed here.


Regards,
Timo


On 02.09.19 16:22, Aljoscha Krettek wrote:

Hi,

Regarding the factory and duplicate() and whatnot, wouldn’t it work to have a 
factory like this:

/**
  * Allows to read and write an instance from and to {@link Configuration}. A 
configurable instance
  * operates in its own key space in {@link Configuration} and will be 
(de)prefixed by the framework. It cannot access keys from other options. A 
factory must have a default constructor.
  *
  */
public interface ConfigurableFactory<T> {

 /**
  * Creates an instance from the given configuration.
  */
 T fromConfiguration(ConfigurationReader configuration);
}

with Configurable being:

/**
  * Allows to read and write an instance from and to {@link Configuration}. A 
configurable instance
  * operates in its own key space in {@link Configuration} and will be 
(de)prefixed by the framework. It cannot access keys from other options. A 
factory must have a default constructor.
  *
  */
public interface Configurable {

 /**
  * Writes this instance to the given configuration.
  */
 void writeToConfiguration(ConfigurationWriter configuration); // method 
name TBD
}

This would make the Configurable immutable and we wouldn’t need a duplicate() 
method.

Best,
Aljoscha


On 2. Sep 2019, at 14:40, Becket Qin  wrote:

Hi Timo and Dawid,

Thanks for the patient reply. I agree that both option a) and option b) can
solve the mutability problem.

For option a), is it a little intrusive to add a duplicate() method for a
Configurable? It would be great if we don't put this burden on users if
possible.

For option b), I actually feel it is slightly better than option a) from
API perspective as getFactory() seems a more understandable method of a
Configurable compared with duplicate(). And users do not need to implement
much more logic.

I am curious what is the downside of keeping the Configuration simple to
only have primitive types, and always create the Configurable using a util
method? If Configurables are rare, do we need to put the instantiation /
bookkeeping logic in Configuration?

@Becket: for the toConfiguration, this is required for shipping the
Configuration to the TaskManager, so that we do not have to use Java
serializability.

Do you mean a Configurable may be created and configured directly without
reading settings from a Configuration instance? I thought a Configurable
would always be created via a ConfigurableFactory by extracting the required
configs from a Configuration. Am I missing something?

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 2, 2019 at 4:47 PM Dawid Wysakowicz 
wrote:


Hi Timo, Becket

 From the options that Timo suggested for improving the mutability
situation, I would prefer option a), as it is the more explicit and
simpler option. Just as a remark, I think in general Configurable
types for options will be rather rare and reserved for special use cases,
as the majority of options are rather simple parameters of primitive
types.

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Till Rohrmann
Thanks a lot for all your feedback. I see there is a slight tendency
towards having a non-zero default delay so far.

However, Yu has brought up some valid points. Maybe I can shed some light
on a).

Before FLINK-9158 we set the default delay to 10s because Flink did not
support queued scheduling which meant that if one slot was missing/still
being occupied, then Flink would fail right away with
a NoResourceAvailableException. In order to prevent this we added the
delay. This also covered the case when the job was failing because of an
overloaded external system.

When we finished FLIP-6, we thought that we could improve the user
experience by decreasing the default delay to 0s because all Flink related
problems (slot still occupied, slot missing because of reconnecting TM)
could be handled by the default slot request time out which allowed the
slots to become ready after the scheduling was kicked off. However, we did
not properly take the case of overloaded external systems into account.

For b) I agree that any default value should be properly documented. This
was clearly an oversight when FLINK-9158 was merged. Moreover, I
believe that there won't be a solve-it-all default value. There are
always cases where one needs to adapt it to one's needs. But this is OK. The
goal should be to find the default value which works for most cases.

So maybe the middle ground between 10s and 0s could be a solution. Setting
the default restart delay to 1s should prevent restart storms caused by
overloaded external systems and still be fast enough to not slow down
recoveries noticeably in most cases. If one needs a super fast recovery,
then one should set the delay value to 0s. If one requires a longer delay
because of a particular infrastructure, then one needs to change the value
too. What do you think?
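
For reference, overriding the delay discussed here only requires setting the
fixed-delay strategy explicitly, e.g. in flink-conf.yaml; the values below are
examples, not recommendations:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647   # effectively unbounded
restart-strategy.fixed-delay.delay: 10 s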

Cheers,
Till

On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:

> -1 on increasing the default delay to non-zero, for the reasons below:
>
> a) I could see some concerns about setting the delay to zero in the very
> original JIRA (FLINK-2993
> ) but later on in
> FLINK-9158  we still
> decided to make the change, so I'm wondering whether the decision also came
> from any customer requirement? If so, how could we judge whether one
> requirement overrides the other?
>
> b) There could be valid reasons for both default values depending on
> different use cases, as well as related workarounds (like, based on the latest
> policy, setting the config manually to 10s could resolve the problem
> mentioned), and from former replies to this thread we could see users have
> already taken actions. Changing it back to non-zero again won't affect such
> users but might cause surprises to those depending on 0 as default.
>
> Last but not least, no matter what decision we make this time, I'd suggest
> making it final and documenting it in our release notes explicitly. Checking the
> 1.5.0 release notes [1][2], it seems we didn't mention the change of the
> default restart delay, and we'd better learn from that this time. Thanks.
>
> [1]
> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>
> Best Regards,
> Yu
>
>
> On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:
>
>> +1 on what Zhu Zhu said.
>>
>> We also override the default to 10 s.
>>
>> On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:
>>
>>> In our production, we usually override the restart delay to be 10 s.
>>> We once encountered cases where external services were overwhelmed by
>>> reconnections from frequently restarted tasks.
>>> As a safer though not optimized option, a default delay larger than 0 s
>>> is better in my opinion.
>>>
>>>
>>> On Fri, Aug 30, 2019 at 10:23 PM 未来阳光 <2217232...@qq.com> wrote:
>>>
 Hi,


 I think it's better to increase the default value. +1


 Best.




 -- Original Message --
 From: "Till Rohrmann";
 Sent: Friday, Aug 30, 2019, 10:07 PM
 To: "dev"; "user";
 Subject: [SURVEY] Is the default restart delay of 0s causing problems?



 Hi everyone,

 I wanted to reach out to you and ask whether decreasing the default
 delay
 to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
 user reported that he would like to increase the default value because
 it
 can cause restart storms in case of systematic faults [2].

 The downside of increasing the default delay would be a slightly
 increased
 restart time if this config option is not explicitly set.

 [1] https://issues.apache.org/jira/browse/FLINK-9158
 [2] https://issues.apache.org/jira/browse/FLINK-11218

 Cheers,
 Till
>>>
>>>


Re: [DISCUSS] Simplify Flink's cluster level RestartStrategy configuration

2019-09-02 Thread Till Rohrmann
Thanks a lot for the positive feedback. I think you are right Becket that
this needs a FLIP since it changes Flink's behaviour. I'll create one and
post it to the dev ML.

@Zhu Zhu   I agree that the restart delay is related to
the RestartStrategy configuration. However, I would like to exclude it from
this improvement proposal since it would broaden the scope unnecessarily.
I'd suggest to continue the discussion about this on the SURVEY thread and
then based on the outcome start a DISCUSS thread about the concrete changes
to the default restart delay value.

Since I will create a FLIP for the simplification logic, I'll conclude this
thread and would encourage everyone to move all further discussion to the
FLIP DISCUSS thread. I'll post the link to it shortly.

Cheers,
Till

On Mon, Sep 2, 2019 at 6:22 AM zhijiang 
wrote:

> +1 for this proposal.
>
> IMO, it not only simplifies the cluster configuration, but also seems more
> logical not to rely on some low-level specific parameters to judge the
> upper-level strategy.
> It is also reasonable to push forward the restart strategy configuration
> step by step for batch later.
>
> Best,
> Zhijiang
> --
> From:Zhu Zhu 
> Send Time: Monday, Sep 2, 2019, 05:18
> To:dev 
> Subject:Re: [DISCUSS] Simplify Flink's cluster level RestartStrategy
> configuration
>
> +1 to simplify the RestartStrategy configuration
>
> One thing to confirm is whether the default delay should be "0 s" in the
> case of
> "If the config option `restart-strategy` is not configured" and "If
> checkpointing is enabled".
> I see a related discussion([SURVEY] Is the default restart delay of 0s
> causing problems) is ongoing and we may need to take the result from that.
>
> Thanks,
> Zhu Zhu
>
> On Mon, Sep 2, 2019 at 9:06 AM Becket Qin wrote:
>
> > +1. The new behavior makes sense to me.
> >
> > BTW, we need a FLIP for this :)
> >
> > On Fri, Aug 30, 2019 at 10:17 PM Till Rohrmann 
> > wrote:
> >
> > > After an offline discussion with Stephan, we concluded that changing
> the
> > > default restart strategy for batch jobs is not that easy because the
> > > cluster level restart configuration does not necessarily know about the
> > > type of job which is submitted. We concluded that we would like to keep
> > the
> > > batch behaviour as is (NoRestartStrategy) and revisit this issue at a
> > later
> > > point in time.
> > >
> > > On Fri, Aug 30, 2019 at 3:24 PM Till Rohrmann 
> > > wrote:
> > >
> > > > The current default behaviour for batch is `NoRestartStrategy` if
> > nothing
> > > > is configured. We could say that we set the default value of
> > > > `restart-strategy` to `FixedDelayRestartStrategy(Integer.MAX_VALUE,
> "0
> > > s")`
> > > > independent of the checkpointing. The only downside I could see is
> that
> > > > some faulty batch jobs might get stuck in a restart loop without
> > > reaching a
> > > > terminal state.
> > > >
> > > > @Dawid, I don't intend to touch the ExecutionConfig. This change only
> > > > targets the cluster level configuration of the RestartStrategy.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Fri, Aug 30, 2019 at 3:14 PM Dawid Wysakowicz <
> > dwysakow...@apache.org
> > > >
> > > > wrote:
> > > >
> > > >> Also +1 in general.
> > > >>
> > > >> I have a few questions though:
> > > >>
> > > >> - does it only apply to the logic in
> > > >>
> > > >>
> > >
> >
> org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory#createRestartStrategyFactory,
> > > >> which is only the cluster side configuration? Or do you want to
> change
> > > >> the logic also on the job side in ExecutionConfig?
> > > >>
> > > >> - if the latter, does that mean deprecated methods in
> ExecutionConfig
> > > >> like: setNumberOfExecutionRetries, setExecutionRetryDelay will have
> no
> > > >> effect? I think this would be a good idea, but would suggest to
> remove
> > > >> the corresponding fields and methods. This is not that simple
> though.
> > I
> > > >> tried to do that for other parameters that have no effect already
> like
> > > >> codeAnalysisMode & failTaskOnCheckpointError. There are two problems:
> > > >>
> > > >> 1) setNumberOfExecutionRetries are effectively marked with
> @Public
> > > >> annotation (the codeAnalysisMode & failTaskOnCheckpointError don't
> > have
> > > >> this problem). Therefore this would be a binary incompatible change.
> > > >>
> > > >> 2) ExecutionConfig is stored in state as part of PojoSerializer
> in
> > > >> pre flink 1.7. It should not be a problem for
> > numberOfExecutionRetries &
> > > >> executionRetryDelays as they are of primitive types. It is a problem
> > for
> > > >> codeAnalysisMode (we cannot remove the class, as this breaks
> > > >> serialization). I wanted to mention that anyway, just to be aware of
> > > that.
> > > >>
> > > >> Best,
> > > >>
> > > >> Dawid
> > > >>
> > > >> On 30/08/2019 14:48, Stephan Ewen wrote:
> > > >> > +1 in general
> > > >> >
> > > >> > What is

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Steven Wu
1s sounds like a good tradeoff to me.

On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann  wrote:

> Thanks a lot for all your feedback. I see there is a slight tendency
> towards having a non-zero default delay so far.
>
> However, Yu has brought up some valid points. Maybe I can shed some light
> on a).
>
> Before FLINK-9158 we set the default delay to 10s because Flink did not
> support queued scheduling which meant that if one slot was missing/still
> being occupied, then Flink would fail right away with
> a NoResourceAvailableException. In order to prevent this we added the
> delay. This also covered the case when the job was failing because of an
> overloaded external system.
>
> When we finished FLIP-6, we thought that we could improve the user
> experience by decreasing the default delay to 0s because all Flink related
> problems (slot still occupied, slot missing because of reconnecting TM)
> could be handled by the default slot request time out which allowed the
> slots to become ready after the scheduling was kicked off. However, we did
> not properly take the case of overloaded external systems into account.
>
> For b) I agree that any default value should be properly documented. This
> was clearly an oversight when FLINK-9158 was merged. Moreover, I
> believe that there won't be a solve-it-all default value. There are
> always cases where one needs to adapt it to one's needs. But this is OK. The
> goal should be to find the default value which works for most cases.
>
> So maybe the middle ground between 10s and 0s could be a solution. Setting
> the default restart delay to 1s should prevent restart storms caused by
> overloaded external systems and still be fast enough to not slow down
> recoveries noticeably in most cases. If one needs a super fast recovery,
> then one should set the delay value to 0s. If one requires a longer delay
> because of a particular infrastructure, then one needs to change the value
> too. What do you think?
>
> Cheers,
> Till
>
> On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:
>
>> -1 on increasing the default delay to non-zero, for the reasons below:
>>
>> a) I could see some concerns about setting the delay to zero in the very
>> original JIRA (FLINK-2993
>> ) but later on in
>> FLINK-9158  we still
>> decided to make the change, so I'm wondering whether the decision also came
>> from any customer requirement? If so, how could we judge whether one
>> requirement overrides the other?
>>
>> b) There could be valid reasons for both default values depending on
>> different use cases, as well as related workarounds (like, based on the latest
>> policy, setting the config manually to 10s could resolve the problem
>> mentioned), and from former replies to this thread we could see users have
>> already taken actions. Changing it back to non-zero again won't affect such
>> users but might cause surprises to those depending on 0 as default.
>>
>> Last but not least, no matter what decision we make this time, I'd
>> suggest making it final and documenting it in our release notes explicitly.
>> Checking the 1.5.0 release notes [1][2], it seems we didn't mention
>> the change of the default restart delay, and we'd better learn from that this
>> time. Thanks.
>>
>> [1]
>> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>>
>> Best Regards,
>> Yu
>>
>>
>> On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:
>>
>>> +1 on what Zhu Zhu said.
>>>
>>> We also override the default to 10 s.
>>>
>>> On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:
>>>
 In our production, we usually override the restart delay to be 10 s.
 We once encountered cases where external services were overwhelmed by
 reconnections from frequently restarted tasks.
 As a safer though not optimized option, a default delay larger than 0 s
 is better in my opinion.


 On Fri, Aug 30, 2019 at 10:23 PM 未来阳光 <2217232...@qq.com> wrote:

> Hi,
>
>
> I think it's better to increase the default value. +1
>
>
> Best.
>
>
>
>
> -- Original Message --
> From: "Till Rohrmann";
> Sent: Friday, Aug 30, 2019, 10:07 PM
> To: "dev"; "user";
> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>
>
>
> Hi everyone,
>
> I wanted to reach out to you and ask whether decreasing the default
> delay
> to `0 s` for the fixed delay restart strategy [1] is causing trouble. A
> user reported that he would like to increase the default value because
> it
> can cause restart storms in case of systematic faults [2].
>
> The downside of increasing the default delay would be a slightly
> increased
> restart time if this config option is not explicitly set.
>
> [1] https://issues.apache

[DISCUSS] FLIP-61 Simplify Flink's cluster level RestartStrategy configuration

2019-09-02 Thread Till Rohrmann
Hi everyone,

I'd like to discuss FLIP-61 [1] which tries to simplify Flink's cluster
lever RestartStrategy configuration.

Currently, Flink's behaviour with respect to configuring the
`RestartStrategies` is quite complicated and convoluted. The reason for
this is that we evolved the way it has been configured and wanted to keep
it backwards compatible. Due to this, we have currently the following
behaviour:

* If the config option `restart-strategy` is configured, then Flink uses
this `RestartStrategy` (so far so simple)
* If the config option `restart-strategy` is not configured, then
** If `restart-strategy.fixed-delay.attempts` or
`restart-strategy.fixed-delay.delay` are defined, then instantiate
`FixedDelayRestartStrategy(restart-strategy.fixed-delay.attempts,
restart-strategy.fixed-delay.delay)`
** If `restart-strategy.fixed-delay.attempts` and
`restart-strategy.fixed-delay.delay` are not defined, then
*** If checkpointing is disabled, then choose `NoRestartStrategy`
*** If checkpointing is enabled, then choose
`FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`

I would like to simplify the configuration by removing the "If
`restart-strategy.fixed-delay.attempts` or
`restart-strategy.fixed-delay.delay`, then" condition. That way, the logic
would be the following:

* If the config option `restart-strategy` is configured, then Flink uses
this `RestartStrategy`
* If the config option `restart-strategy` is not configured, then
** If checkpointing is disabled, then choose `NoRestartStrategy`
** If checkpointing is enabled, then choose
`FixedDelayRestartStrategy(Integer.MAX_VALUE, "0 s")`

That way we retain the user friendliness that jobs restart if the user
enabled checkpointing and we make it clear that any `
restart-strategy.fixed-delay.xyz` setting will only be respected if
`restart-strategy` has been set to `fixed-delay`.
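
Expressed as code, the simplified resolution would look roughly like the sketch
below; the helper and class names are illustrative, not the actual
implementation:

// Sketch of the simplified cluster-level resolution described above.
RestartStrategy resolveRestartStrategy(Configuration conf, boolean checkpointingEnabled) {
    if (conf.contains(RESTART_STRATEGY_OPTION)) {
        // `restart-strategy` is set: respect it, and only then consult
        // strategy-specific options such as `restart-strategy.fixed-delay.delay`.
        return createFromConfiguration(conf);
    }
    // `restart-strategy` is not set:
    return checkpointingEnabled
            ? new FixedDelayRestartStrategy(Integer.MAX_VALUE, Time.seconds(0))
            : new NoRestartStrategy();
}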

This simplification would, however, change Flink's behaviour and might
break existing setups. Since we introduced `RestartStrategies` with Flink
1.0.0 and deprecated the prior configuration mechanism which enables
restarting if either the `attempts` or the `delay` has been set, I think
that the number of broken jobs should be minimal if not non-existent.

There has been a previous DISCUSS thread, which is now abandoned [2].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-61+Simplify+Flink%27s+cluster+level+RestartStrategy+configuration
[2]
https://lists.apache.org/thread.html/80bef7146f9696f35b1e50ff4acdd1cc3e87ae6f212d205aa7a72182@%3Cdev.flink.apache.org%3E

Cheers,
Till


Re: [DISCUSS] Simplify Flink's cluster level RestartStrategy configuration

2019-09-02 Thread Till Rohrmann
The link to the dev ML discussion about FLIP-61 is
https://lists.apache.org/thread.html/e206390127bcbd9b24d9c41a838faa75157e468e01552ad241e3e24b@%3Cdev.flink.apache.org%3E

Cheers,
Till

On Mon, Sep 2, 2019 at 10:37 PM Till Rohrmann  wrote:

> Thanks a lot for the positive feedback. I think you are right Becket that
> this needs a FLIP since it changes Flink's behaviour. I'll create one and
> post it to the dev ML.
>
> @Zhu Zhu   I agree that the restart delay is related
> to the RestartStrategy configuration. However, I would like to exclude it
> from this improvement proposal since it would broaden the scope
> unnecessarily. I'd suggest to continue the discussion about this on the
> SURVEY thread and then based on the outcome start a DISCUSS thread about
> the concrete changes to the default restart delay value.
>
> Since I will create a FLIP for the simplification logic, I'll conclude
> this thread and would encourage everyone to move all further discussion to
> the FLIP DISCUSS thread. I'll post the link to it shortly.
>
> Cheers,
> Till
>
> On Mon, Sep 2, 2019 at 6:22 AM zhijiang 
> wrote:
>
>> +1 for this proposal.
>>
>> IMO, it not only simplifies the cluster configuration, but also seems
>> more logical not to rely on some low-level specific parameters to judge
>> the upper-level strategy.
>> It is also reasonable to push forward the restart strategy configuration
>> step by step for batch later.
>>
>> Best,
>> Zhijiang
>> --
>> From:Zhu Zhu 
>> Send Time: Monday, Sep 2, 2019, 05:18
>> To:dev 
>> Subject:Re: [DISCUSS] Simplify Flink's cluster level RestartStrategy
>> configuration
>>
>> +1 to simplify the RestartStrategy configuration
>>
>> One thing to confirm is whether the default delay should be "0 s" in the
>> case of
>> "If the config option `restart-strategy` is not configured" and "If
>> checkpointing is enabled".
>> I see a related discussion([SURVEY] Is the default restart delay of 0s
>> causing problems) is ongoing and we may need to take the result from that.
>>
>> Thanks,
>> Zhu Zhu
>>
>> On Mon, Sep 2, 2019 at 9:06 AM Becket Qin wrote:
>>
>> > +1. The new behavior makes sense to me.
>> >
>> > BTW, we need a FLIP for this :)
>> >
>> > On Fri, Aug 30, 2019 at 10:17 PM Till Rohrmann 
>> > wrote:
>> >
>> > > After an offline discussion with Stephan, we concluded that changing
>> the
>> > > default restart strategy for batch jobs is not that easy because the
>> > > cluster level restart configuration does not necessarily know about
>> the
>> > > type of job which is submitted. We concluded that we would like to
>> keep
>> > the
>> > > batch behaviour as is (NoRestartStrategy) and revisit this issue at a
>> > later
>> > > point in time.
>> > >
>> > > On Fri, Aug 30, 2019 at 3:24 PM Till Rohrmann 
>> > > wrote:
>> > >
>> > > > The current default behaviour for batch is `NoRestartStrategy` if
>> > nothing
>> > > > is configured. We could say that we set the default value of
>> > > > `restart-strategy` to `FixedDelayRestartStrategy(Integer.MAX_VALUE,
>> "0
>> > > s")`
>> > > > independent of the checkpointing. The only downside I could see is
>> that
>> > > > some faulty batch jobs might get stuck in a restart loop without
>> > > reaching a
>> > > > terminal state.
>> > > >
>> > > > @Dawid, I don't intend to touch the ExecutionConfig. This change
>> only
>> > > > targets the cluster level configuration of the RestartStrategy.
>> > > >
>> > > > Cheers,
>> > > > Till
>> > > >
>> > > > On Fri, Aug 30, 2019 at 3:14 PM Dawid Wysakowicz <
>> > dwysakow...@apache.org
>> > > >
>> > > > wrote:
>> > > >
>> > > >> Also +1 in general.
>> > > >>
>> > > >> I have a few questions though:
>> > > >>
>> > > >> - does it only apply to the logic in
>> > > >>
>> > > >>
>> > >
>> >
>> org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory#createRestartStrategyFactory,
>> > > >> which is only the cluster side configuration? Or do you want to
>> change
>> > > >> the logic also on the job side in ExecutionConfig?
>> > > >>
>> > > >> - if the latter, does that mean deprecated methods in
>> ExecutionConfig
>> > > >> like: setNumberOfExecutionRetries, setExecutionRetryDelay will
>> have no
>> > > >> effect? I think this would be a good idea, but would suggest to
>> remove
>> > > >> the corresponding fields and methods. This is not that simple
>> though.
>> > I
>> > > >> tried to do that for other parameters that have no effect already
>> like
>> > > >> codeAnalysisMode & failTaskOnCheckpointError. There are two problems:
>> > > >>
>> > > >> 1) setNumberOfExecutionRetries are effectively marked with
>> @Public
>> > > >> annotation (the codeAnalysisMode & failTaskOnCheckpointError don't
>> > have
>> > > >> this problem). Therefore this would be a binary incompatible
>> change.
>> > > >>
>> > > >> 2) ExecutionConfig is stored in state as part of
>> PojoSerializer in
>> > > >> pre flink 1.7. It should not be a problem for
>> > numberOfExecutionRetries &
>> > 

[jira] [Created] (FLINK-13943) Provide api to convert flink table to java List

2019-09-02 Thread Jeff Zhang (Jira)
Jeff Zhang created FLINK-13943:
--

 Summary: Provide api to convert flink table to java List
 Key: FLINK-13943
 URL: https://issues.apache.org/jira/browse/FLINK-13943
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Affects Versions: 1.9.0
Reporter: Jeff Zhang


It would be nice to convert a Flink Table to a java List so that I can do other 
data manipulation on the client side after executing the Flink job. For the Flink 
planner, I can convert a Table to a DataSet and use DataSet#collect, but for the 
blink planner, there's no such API.
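
For context, the workaround described above for the (old) Flink planner looks
roughly like this (the table name and query are illustrative):

import java.util.List;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

// Collect a Table to the client by going through DataSet (Flink planner only).
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

Table table = tEnv.sqlQuery("SELECT * FROM some_table");
List<Row> rows = tEnv.toDataSet(table, Row.class).collect();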



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Add ARM CI build to Flink (information-only)

2019-09-02 Thread Xiyuan Wang
The ARM CI trigger has been changed to the `github comment` way only. It means
that a PR won't start the ARM test unless a `check_arm` comment is added,
like what I did in the PR[1].

A POC for the Flink nightly end-to-end test job has been created as well[2]. I'll
keep improving it.

Any feedback or questions?


[1]: https://github.com/apache/flink/pull/9416
 https://github.com/apache/flink/pull/9416#issuecomment-527268203
[2]: https://github.com/theopenlab/openlab-zuul-jobs/pull/631
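
For reference, a Zuul periodic pipeline for such a nightly job could look
roughly like the snippet below; the pipeline name and cron expression are
illustrative assumptions, not the actual OpenLab configuration:

- pipeline:
    name: periodic-flink-arm
    manager: independent
    trigger:
      timer:
        - time: '0 2 * * *'   # run once per night at 02:00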


Thanks

On Mon, Aug 26, 2019 at 7:41 PM Xiyuan Wang wrote:

> Before ARM CI is ready, I can close the CI test for each PR and let it
> only be triggered by PR comment.  It's quite easy for OpenLab to do this.
>
> OpenLab has many job pipelines[1].  Now I use the `check` pipeline in
> https://github.com/apache/flink/pull/9416. The job trigger contains
> github_action and github_comment[2]. I can create a new pipeline for Flink;
> the new trigger can contain only github_comment, like:
>
> trigger:
>   github:
>  - event: pull_request
>action: comment
>comment: (?i)^\s*recheck_arm_build\s*$
>
> So the ARM job will not be run for every PR. It'll just be run for
> the PRs which have a `recheck_arm_build` comment.
>
> Then once ARM CI is ready, I can add it back.
>
>
> Nightly tests can be added as well, of course. There is a kind of job in
> OpenLab called a `periodic job`. We can use it for Flink's daily nightly tests.
> If any error occurs, the report can be sent to bui...@flink.apache.org  as
> well.
>
> [1]:
> https://github.com/theopenlab/openlab-zuul-jobs/blob/master/zuul.d/pipelines.yaml
> [2]:
> https://github.com/theopenlab/openlab-zuul-jobs/blob/master/zuul.d/pipelines.yaml#L10-L19
>
>> On Mon, Aug 26, 2019 at 6:13 PM Stephan Ewen wrote:
>
>> Adding CI builds for ARM only makes sense when we actually take them into
>> account as "blocking a merge"; otherwise there is no point in having them.
>> So we would need to be prepared to do that.
>>
>> The cases where something runs on UNIX/x64 but fails on ARM are few
>> and so far seem to have been related to libraries or some magic that tries
>> to do system-dependent actions outside Java.
>>
>> One worthwhile discussion could be whether to run the ARM CI builds as
>> part
>> of the nightly tests, not on every commit.
>> There are a lot of nightly tests, for example for different Java / Scala /
>> Hadoop versions.
>>
>> On Mon, Aug 26, 2019 at 10:46 AM Xiyuan Wang 
>> wrote:
>>
>> > Sorry, maybe my words were misleading.
>> >
>> > We are just starting to add ARM support. So the CI is non-voting at this
>> > moment to avoid blocking normal Flink development.
>> >
>> > But once the ARM CI works well and is stable enough, we should mark it as
>> > voting. It means that in the future, if the ARM test fails in a PR, the
>> > PR cannot be merged. The test log may tell developers what error is
>> > coming up. If a developer needs to debug the details on an ARM VM, OpenLab
>> > can provide one.
>> >
>> > Adding ARM CI can make sure Flink supports ARM natively.
>> >
>> > I left a workflow in the PR, I'd like to print it here:
>> >
>> >1. Add the basic build script to ensure the CI system and build job
>> >work as expected. The job should be marked as non-voting first, which
>> >means a CI test failure won't block a Flink PR from being merged.
>> >2. Add the test script to run unit/integration tests. At this step the
>> >--fn parameter will be added to mvn test. It will run the full set of
>> >test cases in Flink, so that we can find which tests fail on ARM.
>> >3. Fix the test failures one by one.
>> >4. Once all the tests pass, remove the --fn parameter and keep
>> >watching the CI's status for some days. If some bugs show up then, fix
>> >them as we usually do for travis-ci.
>> >5. Once the CI is stable enough, remove the non-voting tag, so that
>> >the ARM CI will be the same as travis-ci, i.e. one of the gates for
>> >Flink PRs.
>> >6. Finally, the Flink community can announce and release the Flink ARM
>> >version.
>> >
>> >
>> > On Mon, Aug 26, 2019 at 2:25 PM Chesnay Schepler wrote:
>> >
>> >> I'm sorry, but if these issues are only fixed later anyway I see no
>> >> reason to run these tests on each PR. We're just adding noise to each
>> PR
>> >> that everyone will just ignore.
>> >>
>> >> I'm curious as to the benefit of having this directly in Flink; why
>> >> aren't the ARM builds run outside of the Flink project, and fixes for
>> it
>> >> provided?
>> >>
>> >> It seems to me like nothing about these arm builds is actually handled
>> >> by the Flink project.
>> >>
>> >> On 26/08/2019 03:43, Xiyuan Wang wrote:
>> >> > Thanks for Stephan to bring up this topic.
>> >> >
>> >> > The package build jobs work well now. I have a simple online demo
>> which
>> >> is
>> >> > built and run on an ARM VM. Feel free to give it a try[1].
>> >> >
>> >> > As the first step for ARM support, maybe it's good to add them now.
>> >> >
>> >> > While for the next step, the test part is still broken. It rela

Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Zhu Zhu
1s looks good to me.
And I think the conclusion about when a user should override the delay is
worth documenting.

Thanks,
Zhu Zhu

On Tue, Sep 3, 2019 at 4:42 AM Steven Wu wrote:

> 1s sounds like a good tradeoff to me.
>
> On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann  wrote:
>
>> Thanks a lot for all your feedback. I see there is a slight tendency
>> towards having a non-zero default delay so far.
>>
>> However, Yu has brought up some valid points. Maybe I can shed some light
>> on a).
>>
>> Before FLINK-9158 we set the default delay to 10s because Flink did not
>> support queued scheduling which meant that if one slot was missing/still
>> being occupied, then Flink would fail right away with
>> a NoResourceAvailableException. In order to prevent this we added the
>> delay. This also covered the case when the job was failing because of an
>> overloaded external system.
>>
>> When we finished FLIP-6, we thought that we could improve the user
>> experience by decreasing the default delay to 0s because all Flink related
>> problems (slot still occupied, slot missing because of reconnecting TM)
>> could be handled by the default slot request time out which allowed the
>> slots to become ready after the scheduling was kicked off. However, we did
>> not properly take the case of overloaded external systems into account.
>>
>> For b) I agree that any default value should be properly documented. This
>> was clearly an oversight when FLINK-9158 was merged. Moreover, I
>> believe that there won't be a solve-it-all default value. There are
>> always cases where one needs to adapt it to one's needs. But this is OK. The
>> goal should be to find the default value which works for most cases.
>>
>> So maybe the middle ground between 10s and 0s could be a solution.
>> Setting the default restart delay to 1s should prevent restart storms
>> caused by overloaded external systems and still be fast enough to not slow
>> down recoveries noticeably in most cases. If one needs a super fast
>> recovery, then one should set the delay value to 0s. If one requires a
>> longer delay because of a particular infrastructure, then one needs to
>> change the value too. What do you think?
>>
>> Cheers,
>> Till
>>
>> On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:
>>
>>> -1 on increasing the default delay to non-zero, for the reasons below:
>>>
>>> a) I could see some concerns about setting the delay to zero in the very
>>> original JIRA (FLINK-2993
>>> ) but later on in
>>> FLINK-9158  we still
>>> decided to make the change, so I'm wondering whether the decision also came
>>> from any customer requirement? If so, how could we judge whether one
>>> requirement overrides the other?
>>>
>>> b) There could be valid reasons for both default values depending on
>>> different use cases, as well as related workarounds (like, based on the latest
>>> policy, setting the config manually to 10s could resolve the problem
>>> mentioned), and from former replies to this thread we could see users have
>>> already taken actions. Changing it back to non-zero again won't affect such
>>> users but might cause surprises to those depending on 0 as default.
>>>
>>> Last but not least, no matter what decision we make this time, I'd
>>> suggest making it final and documenting it in our release notes explicitly.
>>> Checking the 1.5.0 release notes [1][2], it seems we didn't mention
>>> the change of the default restart delay, and we'd better learn from that this
>>> time. Thanks.
>>>
>>> [1]
>>> https://flink.apache.org/news/2018/05/25/release-1.5.0.html#release-notes
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-notes/flink-1.5.html
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Sun, 1 Sep 2019 at 04:33, Steven Wu  wrote:
>>>
 +1 on what Zhu Zhu said.

 We also override the default to 10 s.

 On Fri, Aug 30, 2019 at 8:58 PM Zhu Zhu  wrote:

> In our production, we usually override the restart delay to be 10 s.
> We once encountered cases where external services were overwhelmed by
> reconnections from frequently restarted tasks.
> As a safer though not optimized option, a default delay larger than 0
> s is better in my opinion.
>
>
> On Fri, Aug 30, 2019 at 10:23 PM 未来阳光 <2217232...@qq.com> wrote:
>
>> Hi,
>>
>>
>> I think it's better to increase the default value. +1
>>
>>
>> Best.
>>
>>
>>
>>
>> -- Original Message --
>> From: "Till Rohrmann";
>> Sent: Friday, Aug 30, 2019, 10:07 PM
>> To: "dev"; "user";
>> Subject: [SURVEY] Is the default restart delay of 0s causing problems?
>>
>>
>>
>> Hi everyone,
>>
>> I wanted to reach out to you and ask whether decreasing the default
>> delay
>> to `0 s` for the fixed delay restart strategy [1] is causing trouble.
>> A
>> user reported that he would like to increa

Re: [DISCUSS] FLIP-60: Restructure the Table API & SQL documentation

2019-09-02 Thread Jark Wu
Big +1 to the idea of restructuring the docs. We got a lot of complaints
from users about the Table & SQL docs.

In general, I think the new structure is very nice.

Regarding moving "User-defined Extensions" to corresponding broader
topics, I would prefer the current "User-defined Extensions",
because it is a more advanced topic than "Connect to external systems" and
"Builtin Functions", and we can mention the common points (e.g. pom
dependency) in the overview of the Extensions section.
Besides that, I would like to keep Built-in Functions as a top-level section
to give it more exposure, and we may further split the page.

I have some other suggestions:

1) Having subpages under "Built-in Functions". For example:

Built-in Functions
 - Mathematical Functions
 - Bit Functions
 - Date and Time Functions
 - Conditional Functions
 - String Functions
 - Aggregate Functions
 - ...

Currently, all the functions are squeezed into one page, which makes the
page bloated.
Meanwhile, I think it would be great to enrich the built-in functions with
argument explanations and clearer examples like MySQL[1] and other
database docs.

2) +1 to the "Architecture & Internals" chapter.
We already have a pull request[2] to add "Streaming Aggregation Performance
Tuning" page which talks about the performance tuning tips around streaming
aggregation and the internals.
Maybe we can put it under the internal chapter or a "Performance Tuning"
chapter.

3) How about restructure SQL chapter a bit like this?

SQL
 - Overview
 - Data Manipulation Statements (all operations available in SQL)
 - Data Definition Statements (DDL syntaxes)
 - Pattern Matching

It renames "Full Reference" to "Data Manipulation Statements" which is more
align with "Data Definition Statements".


Regards,
Jark

[1]:
https://dev.mysql.com/doc/refman/8.0/en/date-and-time-functions.html#function_adddate
[2]: https://github.com/apache/flink/pull/9525





On Mon, 2 Sep 2019 at 17:29, Kurt Young  wrote:

> +1 to the general idea and thanks for driving this. I think the new
> structure is
> clearer than the old one, and I have some suggestions:
>
> 1. How about adding an "Architecture & Internals" chapter? This can help
> developers
> or users who want to contribute more to have a better understanding of
> Table.
> Essentially, with the blink planner we merged a lot of code and features
> but lack
> proper user and design documents.
>
> 2. Add a dedicated "Hive Integration" chapter. We spent lots of effort on
> integrating
> Hive, and Hive integration happens in different areas, like catalogs,
> functions and
> maybe DDL in the future. I think a dedicated chapter can make it easier
> for users who are interested
> in this topic to find the information they need.
>
> 3. Add a chapter about how to manage, monitor or tune Table & SQL jobs,
> and
> maybe add something like how to migrate jobs from old versions to new versions
> in the future.
>
> Best,
> Kurt
>
>
> On Mon, Sep 2, 2019 at 4:17 PM vino yang  wrote:
>
> > Agree with Dawid's suggestion about function.
> >
> > Having a Functions section to unify the built-in function and UDF would
> be
> > better.
> >
> > On Fri, Aug 30, 2019 at 7:43 PM Dawid Wysakowicz wrote:
> >
> > > +1 to the idea of restructuring the docs.
> > >
> > > My only suggestion to consider is how about moving the
> > > User-Defined-Extensions subpages to corresponding broader topics?
> > >
> > > Sources & Sinks >> Connect to external systems
> > >
> > > Catalogs >> Connect to external systems
> > >
> > > and then have a Functions sections with subsections:
> > >
> > > functions
> > >
> > > |- built in functions
> > >
> > > |- user defined functions
> > >
> > >
> > > Best,
> > >
> > > Dawid
> > >
> > > On 30/08/2019 10:59, Timo Walther wrote:
> > > > Hi everyone,
> > > >
> > > > the Table API & SQL documentation was already in a very good shape in
> > > > Flink 1.8. However, in the past it was mostly presented as an
> addition
> > > > to DataStream API. As the Table and SQL world is growing quickly,
> > > > stabilizes in its concepts, and is considered as another top-level
> API
> > > > and closed ecosystem, it is time to restructure the docs a little bit
> > > > to represent the vision of FLIP-32.
> > > >
> > > > Current state:
> > > > https://ci.apache.org/projects/flink/flink-docs-master/dev/table/
> > > >
> > > > We would like to propose the following FLIP-60 for a new structure:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=127405685
> > > >
> > > >
> > > > Looking forward to feedback.
> > > >
> > > > Thanks,
> > > >
> > > > Timo
> > > >
> > > >
> > > >
> > >
> > >
> >
>


Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread Becket Qin
Hi Timo,

I think I might have misunderstood the scope or motivation of the FLIP a
little bit. Please let me clarify.

Regarding "great if we don't put this burden on users", we should
> consider who is actually using this API. It is not first-level API but
> mostly API for Flink contributors. Most of the users will use API
> classes ike ExecutionConfig or TableConfig or other builders for
> performing configuration. They will never use the ConfigOptions classes
> directly. So enforcing a duplicate method does not sound like a burden
> to me.

I thought the configuration would be used by all the plugins and components,
such as connectors. If that is the case, the Configuration sounds like a public
API, just like UDFs. So one may implement a plugin and set that plugin class
in the configuration. Flink will do something like the following:

// Define the Plugin ConfigOption. SomePluginInterface extends Configurable.
ConfigOption<SomePluginInterface> option =
    ConfigOptions.key("pluginConfigKey")
        .configurableType(org.apache.flink.SomePluginInterface.class);

// Instantiate the user configured plugin
SomePluginInterface plugin =
configuration.getConfigurableInstance(pluginConfigKey);

// Programmatically, users will do the following to set the plugin.
// Set the plugin class; alternatively, we can always require a
// ConfigurableFactory class as the value.
configurations.setConfigurable("pluginConfigKey", MyPluginClass.class);
configurations.setInt("configKey1ForMyPluginClass", 123); // set the
configurations required by my plugin class.

// Non-programmatically, users can do the following to set the plugin in a
config file, e.g. yaml
pluginConfigKey: PATH_TO_MyPluginClass // Or PATH_TO_MyPluginClassFactory
configKey1ForMyPluginClass: 123

Internally, the configuration may discover the MyPluginClassFactory, call
MyPluginClassFactory.create(Configuration) and pass in itself as the
configuration argument.
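
A rough sketch of that discovery step could look like the following; the method
name and the factory contract are hypothetical, mirroring the pseudo-code
above:

// Hypothetical sketch: resolve a Configurable from a factory class name that
// is stored in the Configuration itself.
@SuppressWarnings("unchecked")
public <T> T getConfigurableInstance(String key) throws Exception {
    // The value under `key` is expected to be a factory class name.
    Class<?> factoryClass = Class.forName(getString(key, null));
    ConfigurableFactory<T> factory =
            (ConfigurableFactory<T>) factoryClass.getDeclaredConstructor().newInstance();
    // The factory re-creates the plugin, reading its own sub-options
    // (e.g. configKey1ForMyPluginClass) from this Configuration.
    return factory.fromConfiguration(this);
}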

From the user's perspective, the way to use Configurable is the following:
1. Set a class type of the Plugin in the configuration via Configuration
interface.
2. Provide a factory class for the Plugin, either by config value or by
service provider mechanism.
3. Set the configurations consumed by the plugin, via something like a yaml
file, or programmatically via Configuration interface.

How would the util that you are suggesting look like? It would always
> need to serialize/deserialize an object into an immutable string. This
> is not very efficient, given that the instance already exists and can be
> made immutable by the implementer by not exposing setters. Furthermore,
> we would lose the declarative approach and could not generate
> documentation. The current approach specifies the static final
> sub-ConfigOptions either in Configurable object (initial design) or in
> the ConfigurableFactory (current design) such that the docs generator
> can read them.

I'd imagine that in most cases, after a concrete Configurable (say
ExecutionConfig) instance is created from the Configuration instance, we
will just pass around the ExecutionConfig instead of the Configuration
object. If so, the serialization to / deserialization from String will only
happen once per JVM, which seems fine. I am not sure why the doc generation
would be impacted. As long as the ConfigOptions go into the Configurable or
ConfigurableFactory, the docs generator can still read them, right?

Regarding "Configurable may be created and configured directly without
> reading settings from a Configuration instance", there seems to be a
> misunderstanding. This is a very common case if not the most common. As
> mentioned before, take ExecutionConfig. This configuration is currently
> only used in a programmatic-way and needs a way to be expressed as
> ConfigOptions. CachedFile for instance will be a Configurable object
> that will binary serialized most of the time when sending it to the
> cluster but due to the Configurable design it is possible to store it in
> a string representation as well.

Thanks for the explanation. I feel that creating an object and then
serializing/deserializing it via the configuration is more of an internal use
case. We are essentially using the configuration to pass some arbitrary string
around. Technically speaking, we can use this approach to send and receive any
object, but I am not sure if this is something that we want to generalize and
have it impact more public use cases.
Personally I feel that Configurable
As for CachedFile, it seems we do not plan to use the configuration to pass
that around? It would be good to avoid letting the Configuration host
arbitrary objects beyond the primitive types.

To summarize, I am wondering if we should consider the following:
1. Design the Config mechanism as an across-the-board API, not only for
internal usage but for broader use cases.
2. If writeToConfiguration is only for internal use cases, maybe we can
avoid adding it to the Configurable interface; we could add another interface
such as ExtractableConfigurable for internal usage (see the sketch below).
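
Point 2 could be sketched like this (hypothetical names, for illustration
only):

// The user-facing contract stays minimal...
public interface Configurable {
    // read-side / marker contract only
}

// ...while the write-back capability needed internally (e.g. for shipping
// a Configuration to the TaskManager) lives in a separate interface.
public interface ExtractableConfigurable extends Configurable {
    void writeToConfiguration(ConfigurationWriter writer);
}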

Wha

[DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-02 Thread Yijie Shen
Dear Flink Community!

I would like to open the discussion of contributing the Pulsar Flink
connector [0] back to Flink.

## A brief introduction to Apache Pulsar

Apache Pulsar[1] is a multi-tenant, high-performance distributed
pub-sub messaging system. Pulsar includes multiple features such as
native support for multiple clusters in a Pulsar instance, with
seamless geo-replication of messages across clusters, very low publish
and end-to-end latency, seamless scalability to over a million topics,
and guaranteed message delivery with persistent message storage
provided by Apache BookKeeper. Nowadays, Pulsar has been adopted by
more and more companies[2].

## The status of Pulsar Flink connector

The Pulsar Flink connector we are planning to contribute is built upon
Flink 1.9.0 and Pulsar 2.4.0. The main features are:
- Pulsar as a streaming source with an exactly-once guarantee.
- Sink streaming results to Pulsar with at-least-once semantics. (We
would update this to exactly-once as well when Pulsar gets all
transaction features ready in its 2.5.0 version.)
- Built upon Flink's new Table API type system (FLIP-37[3]); it can
automatically (de)serialize messages with the help of the Pulsar schema.
- Integrates with Flink's new Catalog API (FLIP-30[4]), which enables the
use of Pulsar topics as tables in the Table API as well as the SQL client.

## Reference
[0] https://github.com/streamnative/pulsar-flink
[1] https://pulsar.apache.org/
[2] https://pulsar.apache.org/en/powered-by/
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-30%3A+Unified+Catalog+APIs


Best,
Yijie Shen


Re: [SURVEY] Is the default restart delay of 0s causing problems?

2019-09-02 Thread Arvid Heise
Hi all,

just wanted to share my experience with configurations with you. For
non-expert users, configuring Flink can be very daunting. The list of
common properties is already helping a lot [1], but it's not clear how they
depend on each other, and settings common for specific use cases are not
listed.

If we can give somewhat clear recommendations to start with for the most
common use cases (batch on a small/large cluster, streaming with high
throughput/low latency), I think users would be able to start much more
quickly with a somewhat well-configured system and fine-tune the settings
later. For example, Kafka Streams has a section on how to set the parameters
for maximum resilience [2].

I'd propose to leave the current configuration page as a reference page,
but also have a recommended configuration settings page that's directly
linked in the first section, such that new users are not overwhelmed.

Sorry if this response is hijacking the discussion.
Btw, is the restart-strategy configuration missing from the main configuration
page? Is this a conscious decision?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#common-options
[2]
https://docs.confluent.io/current/streams/developer-guide/config-streams.html#recommended-configuration-parameters-for-resiliency

On Tue, Sep 3, 2019 at 5:10 AM Zhu Zhu  wrote:

> 1s looks good to me.
> And I think the conclusion about when a user should override the delay is
> worth documenting.
>
> Thanks,
> Zhu Zhu
>
> On Tue, Sep 3, 2019 at 4:42 AM Steven Wu wrote:
>
>> 1s sounds like a good tradeoff to me.
>>
>> On Mon, Sep 2, 2019 at 1:30 PM Till Rohrmann 
>> wrote:
>>
>>> Thanks a lot for all your feedback. I see there is a slight tendency
>>> towards having a non-zero default delay so far.
>>>
>>> However, Yu has brought up some valid points. Maybe I can shed some
>>> light on a).
>>>
>>> Before FLINK-9158 we set the default delay to 10s because Flink did not
>>> support queued scheduling which meant that if one slot was missing/still
>>> being occupied, then Flink would fail right away with
>>> a NoResourceAvailableException. In order to prevent this we added the
>>> delay. This also covered the case when the job was failing because of an
>>> overloaded external system.
>>>
>>> When we finished FLIP-6, we thought that we could improve the user
>>> experience by decreasing the default delay to 0s because all Flink related
>>> problems (slot still occupied, slot missing because of reconnecting TM)
>>> could be handled by the default slot request time out which allowed the
>>> slots to become ready after the scheduling was kicked off. However, we did
>>> not properly take the case of overloaded external systems into account.
>>>
>>> For b) I agree that any default value should be properly documented.
>>> This was clearly an oversight when FLINK-9158 was merged. Moreover, I
>>> believe that there won't be a solve-it-all default value. There are
>>> always cases where one needs to adapt it to one's needs. But this is OK. The
>>> goal should be to find the default value which works for most cases.
>>>
>>> So maybe the middle ground between 10s and 0s could be a solution.
>>> Setting the default restart delay to 1s should prevent restart storms
>>> caused by overloaded external systems and still be fast enough to not slow
>>> down recoveries noticeably in most cases. If one needs a super fast
>>> recovery, then one should set the delay value to 0s. If one requires a
>>> longer delay because of a particular infrastructure, then one needs to
>>> change the value too. What do you think?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Sun, Sep 1, 2019 at 11:56 PM Yu Li  wrote:
>>>
 -1 on increasing the default delay to non-zero, for the reasons below:

 a) I could see some concerns about setting the delay to zero in the
 very original JIRA (FLINK-2993
 ) but later on in
 FLINK-9158  we still
 decided to make the change, so I'm wondering whether the decision also came
 from any customer requirement? If so, how could we judge whether one
 requirement overrides the other?

 b) There could be valid reasons for both default values depending on
 different use cases, as well as related workarounds (like, based on the latest
 policy, setting the config manually to 10s could resolve the problem
 mentioned), and from former replies to this thread we could see users have
 already taken actions. Changing it back to non-zero again won't affect such
 users but might cause surprises to those depending on 0 as default.

 Last but not least, no matter what decision we make this time, I'd
 suggest making it final and documenting it in our release notes explicitly.
 Checking the 1.5.0 release notes [1][2], it seems we didn't mention
 the change of the default restart delay, and we'd better learn from that this
 time. Thanks.

 

Re: [DISCUSS] FLIP-53: Fine Grained Resource Management

2019-09-02 Thread Kurt Young
Thanks Xintong for driving this effort. I haven't finished the whole
document yet,
but I have a couple of questions:

1. Regarding network memory, the document says it will be derived by the
framework
automatically. I'm wondering whether we should delete this dimension from
the user-
facing API?

2. Regarding fraction-based quotas, I don't quite get the meaning of
"slotSharingGroupOnHeapManagedMem" and "slotSharingGroupOffHeapManagedMem".
What if the sharing group mixes specified and UNKNOWN resource
requirements?

3. IIUC, even if the user has set resource requirements, let's say 500MB of
off-heap managed
memory, during execution the operator may or may not have 500MB of off-heap
managed
memory, right?

Best,
Kurt


On Mon, Sep 2, 2019 at 8:36 PM Zhu Zhu  wrote:

> Thanks Xintong for proposing this improvement. Fine-grained resources can
> be very helpful when users plan their resources well.
>
> I have a few questions:
> 1. Currently in a batch job, vertices from different regions can run at the
> same time in slots from the same shared group, as long as they do not have
> a data dependency on each other and the available slot count is not smaller
> than the *max* parallelism of all tasks.
> With the changes in this FLIP, however, tasks from different regions cannot
> share slots anymore.
> Once the available slot count is smaller than the *sum* of the parallelisms
> of tasks from all regions, tasks may need to be executed sequentially, which
> might result in a performance regression.
> Is this (a performance regression for existing DataSet jobs) considered a
> necessary and accepted trade-off in this FLIP?
>
> 2. The network memory depends on the input/output ExecutionEdge count and
> thus can be different even for parallel instances of the same JobVertex.
> Does this mean that when adding task resources to calculate the slot
> resource for a shared group, the max possible network memory of the vertex
> instances shall be used?
> This might result in larger resources being required than actually needed.
>
> And some minor comments:
> 1. Regarding "fracManagedMemOnHeap = 1 / numOpsUseOnHeapManagedMemory", I
> guess you mean numOpsUseOnHeapManagedMemoryInTheSameSharedGroup ?
> 2. I think the *StreamGraphGenerator* in the #Slot Sharing section and
> implementation step 4 should be *StreamingJobGraphGenerator*, as
> *StreamGraphGenerator* is not aware of JobGraph and pipelined region.
>
>
> Thanks,
> Zhu Zhu
>
> On Mon, Sep 2, 2019 at 11:59 AM Xintong Song wrote:
>
> > Updated the FLIP wiki page [1], with the following changes.
> >
> >- Remove the step of converting pipelined edges between different slot
> >sharing groups into blocking edges.
> >- Set `allSourcesInSamePipelinedRegion` to true by default.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Sep 2, 2019 at 11:50 AM Xintong Song 
> > wrote:
> >
> > > Regarding changing edge type, I think actually we don't need to do this
> > > for batch jobs either, because we don't have public interfaces for
> users
> > > to explicitly set slot sharing groups in DataSet API and SQL/Table API.
> > We
> > > have such interfaces in DataStream API only.
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Tue, Aug 27, 2019 at 10:16 PM Xintong Song 
> > > wrote:
> > >
> > >> Thanks for the correction, Till.
> > >>
> > >> Regarding your comments:
> > >> - You are right, we should not change the edge type for streaming
> jobs.
> > >> Then I think we can change the option
> 'allSourcesInSamePipelinedRegion'
> > in
> > >> step 2 to 'isStreamingJob', and implement the current step 2 before
> the
> > >> current step 1 so we can use this option to decide whether should
> change
> > >> the edge type. What do you think?
> > >> - Agree. It should be easier to make the default value of
> > >> 'allSourcesInSamePipelinedRegion' (or 'isStreamingJob') 'true', and
> set
> > it
> > >> to 'false' when using DataSet API or blink planner.
> > >>
> > >> Thank you~
> > >>
> > >> Xintong Song
> > >>
> > >>
> > >>
> > >> On Tue, Aug 27, 2019 at 8:59 PM Till Rohrmann 
> > >> wrote:
> > >>
> > >>> Thanks for creating the implementation plan Xintong. Overall, the
> > >>> implementation plan looks good. I had a couple of comments:
> > >>>
> > >>> - What will happen if a user has defined a streaming job with two
> slot
> > >>> sharing groups? Would the code insert a blocking data exchange
> between
> > >>> these two groups? If yes, then this breaks existing Flink streaming
> > jobs.
> > >>> - How do we detect unbounded streaming jobs to set
> > >>> the allSourcesInSamePipelinedRegion to `true`? Wouldn't it be easier
> to
> > >>> set
> > >>> it false if we are using the DataSet API or the Blink planner with a
> > >>> bounded job?
> > >>>
> > >>> Cheers,
> > >>> Till
> > >>>
> > >>> On Tue, Aug 27, 2019 at 2:16 PM Till Rohrmann 
> > >>> wrote:
> > >>>
> > >>> > I guess there is a typo since the link to the FLIP-53 is
> > >>> >
> > >>>
> >
> https://cwiki.apache.org/confluence/displ