[jira] [Created] (FLINK-15290) Need a way to turn off vectorized orc reader for SQL CLI

2019-12-17 Thread Rui Li (Jira)
Rui Li created FLINK-15290:
--

 Summary: Need a way to turn off vectorized orc reader for SQL CLI
 Key: FLINK-15290
 URL: https://issues.apache.org/jira/browse/FLINK-15290
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive
Reporter: Rui Li
 Fix For: 1.10.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15291) Rename WatermarkSpec#getWatermarkExpressionString to getWatermarkExpr

2019-12-17 Thread Jark Wu (Jira)
Jark Wu created FLINK-15291:
---

 Summary: Rename WatermarkSpec#getWatermarkExpressionString to 
getWatermarkExpr
 Key: FLINK-15291
 URL: https://issues.apache.org/jira/browse/FLINK-15291
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Jark Wu
 Fix For: 1.10.0


Currently, the expression getter methods in 
{{org.apache.flink.table.api.WatermarkSpec}} and 
{{org.apache.flink.table.api.TableColumn}} are not aligned: one is 
{{getWatermarkExpressionString}}, while the other is {{getExpr}}. 

I would suggest renaming 
{{WatermarkSpec#getWatermarkExpressionString}} to 
{{WatermarkSpec#getWatermarkExpr}}.
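
For illustration, a minimal sketch of the aligned getters after the rename 
(signatures simplified, not the actual classes):

{code:java}
// Sketch only: both classes expose their SQL expression string under a short,
// consistent getter name.
class TableColumn {
    private final String expr;
    TableColumn(String expr) { this.expr = expr; }
    String getExpr() { return expr; }
}

class WatermarkSpec {
    private final String watermarkExpr; // was exposed via getWatermarkExpressionString()
    WatermarkSpec(String watermarkExpr) { this.watermarkExpr = watermarkExpr; }
    String getWatermarkExpr() { return watermarkExpr; }
}
{code}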





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15292) Rename Executor interface to PipelineExecutor

2019-12-17 Thread Kostas Kloudas (Jira)
Kostas Kloudas created FLINK-15292:
--

 Summary: Rename Executor interface to PipelineExecutor
 Key: FLINK-15292
 URL: https://issues.apache.org/jira/browse/FLINK-15292
 Project: Flink
  Issue Type: Improvement
  Components: Client / Job Submission
Affects Versions: 1.10.0
Reporter: Kostas Kloudas
Assignee: Kostas Kloudas
 Fix For: 1.10.0


This is a trivial renaming task which renames the newly introduced {{Executor}} 
interface to {{PipelineExecutor}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15293) Document new RocksDB memory configuration

2019-12-17 Thread Stephan Ewen (Jira)
Stephan Ewen created FLINK-15293:


 Summary: Document new RocksDB memory configuration
 Key: FLINK-15293
 URL: https://issues.apache.org/jira/browse/FLINK-15293
 Project: Flink
  Issue Type: Sub-task
Reporter: Stephan Ewen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15294) Deprecate CustomCommandLine

2019-12-17 Thread Zili Chen (Jira)
Zili Chen created FLINK-15294:
-

 Summary: Deprecate CustomCommandLine
 Key: FLINK-15294
 URL: https://issues.apache.org/jira/browse/FLINK-15294
 Project: Flink
  Issue Type: Task
  Components: Command Line Client
Reporter: Zili Chen


See also the discussion in FLINK-15179. We'd like to migrate the 
{{CustomCommandLine}} abstraction to the Executor (FLIP-73) abstraction.

cc [~fly_in_gis] [~kkl0u]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-17 Thread lining jing
Congratulations Zhu Zhu~

Andrey Zagrebin wrote on Monday, Dec 16, 2019 at 5:01 PM:

> Congrats Zhu Zhu!
>
> On Mon, Dec 16, 2019 at 8:10 AM Xintong Song 
> wrote:
>
> > Congratulations Zhu Zhu~
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Mon, Dec 16, 2019 at 12:34 PM Danny Chan 
> wrote:
> >
> > > Congrats Zhu Zhu!
> > >
> > > Best,
> > > Danny Chan
> > > On Dec 14, 2019, 12:51 AM +0800, dev@flink.apache.org wrote:
> > > >
> > > > Congrats Zhu Zhu and welcome on board!
> > >
> >
>


[jira] [Created] (FLINK-15295) Mark CustomCommandLine and its subclass as deprecated

2019-12-17 Thread Zili Chen (Jira)
Zili Chen created FLINK-15295:
-

 Summary: Mark CustomCommandLine and its subclass as deprecated
 Key: FLINK-15295
 URL: https://issues.apache.org/jira/browse/FLINK-15295
 Project: Flink
  Issue Type: Sub-task
Reporter: Zili Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15296) Support print executor-specific help information from CLI

2019-12-17 Thread Zili Chen (Jira)
Zili Chen created FLINK-15296:
-

 Summary: Support print executor-specific help information from CLI
 Key: FLINK-15296
 URL: https://issues.apache.org/jira/browse/FLINK-15296
 Project: Flink
  Issue Type: Sub-task
Reporter: Zili Chen






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Improve documentation / tooling around security of Flink

2019-12-17 Thread Robert Metzger
Hey,
changes to the network configuration often cause unforeseen trouble, in
particular with things like Kubernetes, Docker etc., and the "onboarding
experience" might suffer due to this.

Updated list:
a) Add a check-security.sh script, or a check into the frontend if the
JobManager can be reached on the public internet
b) Add a prominent warning to the download page and the production
readiness checklist
c) add an opt-out warning to the Flink logs / UI that can be disabled via
the config.
d) Bind the REST endpoint to localhost only, by default (see the config sketch below)
e) provide a script for generating an SSL certificate with the distribution.
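
For option d), as far as I can tell this boils down to changing the default of
the existing `rest.bind-address` option, i.e. shipping something like the
following in the default flink-conf.yaml (sketch only, exact option name to be
double-checked):

    # only accept REST / Web UI connections from the local machine by default
    rest.bind-address: localhost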

On Sun, Dec 15, 2019 at 4:01 PM Konstantin Knauf 
wrote:

> Hi Robert,
>
> we could also add a warning (or a general "security" section) to the
> "production readiness checklist" in the documentation.
>
> Generally, I like d) in combination with an informative log message. Do
> you think this would cause a lot of friction?
>
> Cheers,
>
> Konstantin
>
> On Fri, Dec 13, 2019 at 2:06 PM Chesnay Schepler 
> wrote:
>
>> Another proposal that was brought up was to provide a script for
>> generating an SSL certificate with the distribution.
>>
>> On 12/12/2019 17:45, Robert Metzger wrote:
>> > Hi all,
>> >
>> > There was recently a private report to the Flink PMC, as well as
>> publicly
>> > [1] about Flink's ability to execute arbitrary code. In scenarios where
>> > Flink is accessible by somebody unauthorized, this can lead to issues.
>> > The PMC received a similar report in November 2018.
>> >
>> > I believe it would be good to warn our users a bit more prominently
>> about
>> > the risks of accidentally opening up Flink to the public internet, or
>> other
>> > unauthorized entities.
>> >
>> > I have collected the following potential solutions discussed so far:
>> >
>> > a) Add a check-security.sh script, or a check into the frontend if the
>> > JobManager can be reached on the public internet
>> > b) Add a prominent warning to the download page
>> > c) add an opt-out warning to the Flink logs / UI that can be disabled
>> via
>> > the config.
>> > d) Bind the REST endpoint to localhost only, by default
>> >
>> >
>> > I'm curious to hear if others have other ideas what to do.
>> > I personally like to kick things off with b).
>> >
>> >
>> > Best,
>> > Robert
>> >
>> >
>> > [1] https://twitter.com/pyn3rd/status/1197397475897692160
>> >
>>
>>
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Follow us @VervericaData Ververica 
>
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
>


Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-17 Thread Gary Yao
Congratulations, well deserved.

On Tue, Dec 17, 2019 at 10:09 AM lining jing  wrote:

> Congratulations Zhu Zhu~
>
> Andrey Zagrebin wrote on Monday, Dec 16, 2019 at 5:01 PM:
>
> > Congrats Zhu Zhu!
> >
> > On Mon, Dec 16, 2019 at 8:10 AM Xintong Song 
> > wrote:
> >
> > > Congratulations Zhu Zhu~
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Mon, Dec 16, 2019 at 12:34 PM Danny Chan 
> > wrote:
> > >
> > > > Congrats Zhu Zhu!
> > > >
> > > > Best,
> > > > Danny Chan
> > > > On Dec 14, 2019, 12:51 AM +0800, dev@flink.apache.org wrote:
> > > > >
> > > > > Congrats Zhu Zhu and welcome on board!
> > > >
> > >
> >
>


Re: [ANNOUNCE] Weekly Community Update 2019/50

2019-12-17 Thread Konstantin Knauf
Hi Hequn,

thanks, and thanks for the offer. Of course, you can cover the holiday
break, i.e. the next three weeks. Looking forward to your updates!

Cheers,

Konstantin

On Mon, Dec 16, 2019 at 5:53 AM Hequn Cheng  wrote:

> Hi Konstantin,
>
> Happy holidays and thanks a lot for your great job on the updates
> continuously.
> With the updates, it is easier for us to catch up with what's going on in
> the community, which I think is quite helpful.
>
> I'm wondering if I can do some help and cover this during your vacation. :)
>
> Best,
> Hequn
>
> On Sun, Dec 15, 2019 at 11:36 PM Konstantin Knauf <
> konstan...@ververica.com> wrote:
>
>> Dear community,
>>
>> happy to share this week's brief community digest with updates on Flink
>> 1.8.3 and Flink 1.10, a discussion on how to facilitate easier Flink/Hive
>> setups, a couple of blog posts and a bit more.
>>
>> *Personal Note:* Thank you for reading these updates since I started
>> them early this year. I will take a three week Christmas break and will be
>> back with a Holiday season community update on the 12th of January.
>>
>> Flink Development
>> ==
>>
>> * [releases] Apache Flink 1.8.3 was released on Wednesday. [1,2]
>>
>> * [releases] The feature freeze for Apache Flink took place on Monday.
>> The community is now working on testing, bug fixes and improving the
>> documentation in order to create a first release candidate soon. [3]
>>
>> * [development process] Seth has revived the discussion on a past PR by
>> Marta, which added a documentation style guide to the contributor guide.
>> Please check it [4] out, if you are contributing documentation to Apache
>> Flink. [5]
>>
>> * [security] Following a recent report to the Flink PMC of "exploiting"
>> the Flink Web UI for remote code execution, Robert has started a discussion
>> on how to improve the tooling/documentation to make users aware of this
>> possibility and recommend securing this interface in production setups. [6]
>>
>> * [sql] Bowen has started a discussion on how to simplify the Flink-Hive
>> setup for new users as currently users need to add some additional
>> dependencies to the classpath manually. The discussion seems to conclude
>> towards providing a single additional hive-uber jar, which contains all the
>> required dependencies. [7]
>>
>> [1] https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-8-3-released-tp35868.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Apache-Flink-1-10-0-release-tp35139.html
>> [4] https://github.com/apache/flink-web/pull/240
>> [5]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Docs-Style-Guide-Review-tp35758.html
>> [6]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-documentation-tooling-around-security-of-Flink-tp35898.html
>> [7]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-have-separate-Flink-distributions-with-built-in-Hive-dependencies-tp35918.html
>>
>> Notable Bugs
>> ==
>>
>> [FLINK-15152] [1.9.1] When a "stop" action on a job fails because not
>> all tasks are in "RUNNING" state, the job is not checkpointing afterwards.
>> [8]
>>
>> [8] https://issues.apache.org/jira/browse/FLINK-15152
>>
>> Events, Blog Posts, Misc
>> ===
>>
>> * Zhu Zhu is now an Apache Flink Committer. Congratulations! [9]
>>
>> * Gerred Dillon has published a blog post on the Apache Flink blog on how
>> to run Flink on Kubernetes with a KUDO Flink operator. [10]
>>
>> * In this blog post Apache Flink PMC Sun Jincheng outlines the reasons
>> and motivation for his and his colleague's work to provide a world-class
>> Python support for Apache Flink's Table API. [11]
>>
>> * Upcoming Meetups
>> * On December 17th there will be the second Apache Flink meetup in
>> Seoul. [12] *Dongwon* has shared a detailed agenda in last weeks
>> community update. [13]
>> * On December 18th Alexander Fedulov will talk about Stateful Stream
>> Processing with Apache Flink at the Java Professionals Meetup in Minsk. [14]
>>
>> [9]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Zhu-Zhu-becomes-a-Flink-committer-tp35944.html
>> [10] https://flink.apache.org/news/2019/12/09/flink-kubernetes-kudo.html
>> [11]
>> https://developpaper.com/why-will-apache-flink-1-9-0-support-the-python-api/
>> [12] https://www.meetup.com/Seoul-Apache-Flink-Meetup/events/266824815/
>> [13]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Weekly-Community-Update-2019-48-td35423.html
>> [14] https://www.meetup.com/Apache-Flink-Meetup-Minsk/events/267134296/
>>
>> Cheers,
>>
>> Konstantin (@snntrable)
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>>
>> Follow us @VervericaData Ververica 
>>
>>
>> --
>>
>> Joi

Re: [ANNOUNCE] Weekly Community Update 2019/50

2019-12-17 Thread Hequn Cheng
Cool. I will do it in the next three weeks.
Thanks a lot for your continued great work!

Best, Hequn

On Tue, Dec 17, 2019 at 6:16 PM Konstantin Knauf 
wrote:

> Hi Hequn,
>
> thanks, and thanks for the offer. Of course, you can cover the holiday
> break, i.e. the next three weeks. Looking forward to your updates!
>
> Cheers,
>
> Konstantin
>
> On Mon, Dec 16, 2019 at 5:53 AM Hequn Cheng  wrote:
>
>> Hi Konstantin,
>>
>> Happy holidays and thanks a lot for your great job on the updates
>> continuously.
>> With the updates, it is easier for us to catch up with what's going on in
>> the community, which I think is quite helpful.
>>
>> I'm wondering if I can do some help and cover this during your vacation.
>> :)
>>
>> Best,
>> Hequn
>>
>> On Sun, Dec 15, 2019 at 11:36 PM Konstantin Knauf <
>> konstan...@ververica.com> wrote:
>>
>>> Dear community,
>>>
>>> happy to share this week's brief community digest with updates on Flink
>>> 1.8.3 and Flink 1.10, a discussion on how to facilitate easier Flink/Hive
>>> setups, a couple of blog posts and a bit more.
>>>
>>> *Personal Note:* Thank you for reading these updates since I started
>>> them early this year. I will take a three week Christmas break and will be
>>> back with a Holiday season community update on the 12th of January.
>>>
>>> Flink Development
>>> ==
>>>
>>> * [releases] Apache Flink 1.8.3 was released on Wednesday. [1,2]
>>>
>>> * [releases] The feature freeze for Apache Flink took place on Monday.
>>> The community is now working on testing, bug fixes and improving the
>>> documentation in order to create a first release candidate soon. [3]
>>>
>>> * [development process] Seth has revived the discussion on a past PR by
>>> Marta, which added a documentation style guide to the contributor guide.
>>> Please check it [4] out, if you are contributing documentation to Apache
>>> Flink. [5]
>>>
>>> * [security] Following a recent report to the Flink PMC of "exploiting"
>>> the Flink Web UI for remote code execution, Robert has started a discussion
>>> on how to improve the tooling/documentation to make users aware of this
>>> possibility and recommend securing this interface in production setups. [6]
>>>
>>> * [sql] Bowen has started a discussion on how to simplify the Flink-Hive
>>> setup for new users as currently users need to add some additional
>>> dependencies to the classpath manually. The discussion seems to conclude
>>> towards providing a single additional hive-uber jar, which contains all the
>>> required dependencies. [7]
>>>
>>> [1] https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-1-8-3-released-tp35868.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Apache-Flink-1-10-0-release-tp35139.html
>>> [4] https://github.com/apache/flink-web/pull/240
>>> [5]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Docs-Style-Guide-Review-tp35758.html
>>> [6]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-documentation-tooling-around-security-of-Flink-tp35898.html
>>> [7]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-have-separate-Flink-distributions-with-built-in-Hive-dependencies-tp35918.html
>>>
>>> Notable Bugs
>>> ==
>>>
>>> [FLINK-15152] [1.9.1] When a "stop" action on a job fails, because not
>>> all tasks are in "RUNNING" state the job is not checkpointing afterwards.
>>> [8]
>>>
>>> [8] https://issues.apache.org/jira/browse/FLINK-15152
>>>
>>> Events, Blog Posts, Misc
>>> ===
>>>
>>> * Zhu Zhu is now an Apache Flink Committer. Congratulations! [9]
>>>
>>> * Gerred Dillon has published a blog post on the Apache Flink blog on
>>> how to run Flink on Kubernetes with a KUDO Flink operator. [10]
>>>
>>> * In this blog post Apache Flink PMC Sun Jincheng outlines the reasons
>>> and motivation for his and his colleague's work to provide a world-class
>>> Python support for Apache Flink's Table API. [11]
>>>
>>> * Upcoming Meetups
>>> * On December 17th there will be the second Apache Flink meetup in
>>> Seoul. [12] *Dongwon* has shared a detailed agenda in last weeks
>>> community update. [13]
>>> * On December 18th Alexander Fedulov will talk about Stateful Stream
>>> Processing with Apache Flink at the Java Professionals Meetup in Minsk. [14]
>>>
>>> [9]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Zhu-Zhu-becomes-a-Flink-committer-tp35944.html
>>> [10] https://flink.apache.org/news/2019/12/09/flink-kubernetes-kudo.html
>>> [11]
>>> https://developpaper.com/why-will-apache-flink-1-9-0-support-the-python-api/
>>> [12] https://www.meetup.com/Seoul-Apache-Flink-Meetup/events/266824815/
>>> [13]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Weekly-Community-Update-2019-48-td35423.html
>>> [14] ht

Re: Potential side-effect of connector code to JM/TM

2019-12-17 Thread Till Rohrmann
Hi Yingjie,

thanks for reporting this issue and starting this discussion. If we are
dealing with third party libraries I believe there is always the risk that
one overlooks closing resources. Ideally we make it as hard from Flink's
perspective as possible but realistically it is hard to completely avoid.
Hence, I believe that it would be beneficial to have some tooling (e.g.
stress tests) which could help to surface these kinds of problems. Maybe one
could automate it so that a dev only needs to provide a user jar; this jar
is then executed several times and the cluster is checked for
anomalies.
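
Just to make the "checked for anomalies" part concrete: one such check could
simply verify that the user code ClassLoader becomes unreachable after the job
has finished. A rough sketch (illustrative only, not existing Flink code):

import java.lang.ref.WeakReference;

final class ClassLoaderLeakCheck {

    // Returns true if the class loader behind the weak reference could be
    // garbage collected after the caller dropped all strong references to it.
    static boolean isCollected(WeakReference<ClassLoader> userClassLoaderRef)
            throws InterruptedException {
        for (int i = 0; i < 10 && userClassLoaderRef.get() != null; i++) {
            System.gc(); // only a hint, hence the retry loop
            Thread.sleep(100);
        }
        return userClassLoaderRef.get() == null;
    }
}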

Cheers,
Till

On Tue, Dec 17, 2019 at 8:43 AM Yingjie Cao  wrote:

> Hi community,
>
>   After running tpc-ds test suite for several days on a session cluster, we
> found a resource leak problem of OrcInputFormat which was reported in
> FLINK-15239. The problem comes from the dependent third party library which
> creates a new internal thread (pool) and never releases it. As a result, the
> user class loader which is referenced by these threads will never be
> garbage collected, nor will other classes loaded by the user class loader,
> which finally leads to continual growth of the metaspace size for the JM (AM),
> whose metaspace size is currently not limited. And for the TM, whose metaspace
> size is limited, it will eventually result in a metaspace OOM. I am not sure
> if any other connectors/input formats incur a similar problem.
>   In general, it is hard for Flink to restrict the behavior of the third
> party dependencies, especially the dependencies of connectors. However, it
> will be better if we can supply some mechanism like stronger isolation or
> some test facilities to find potential problems, for example, we can run
> jobs on a cluster and automatically check something like whether user class
> loader can be garbage collected, whether there is thread leak, whether some
> shutdown hooks have been registered and so on.
>   What do you think? Or should we treat it as a problem?
>
> Best,
> Yingjie
>


[jira] [Created] (FLINK-15297) Do not throw exception if YARN Application switched to FINISHED immediately after deployed in YarnClusterDescriptor#startAppMaster

2019-12-17 Thread Zili Chen (Jira)
Zili Chen created FLINK-15297:
-

 Summary: Do not throw exception if YARN Application switched to 
FINISHED immediately after deployed in YarnClusterDescriptor#startAppMaster
 Key: FLINK-15297
 URL: https://issues.apache.org/jira/browse/FLINK-15297
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Reporter: Zili Chen
Assignee: Zili Chen


Currently we throw an exception in {{YarnClusterDescriptor#startAppMaster}} if 
we detect {{FINISHED}} before ever seeing {{RUNNING}}. However, this can be a 
legal state: the application may finish normally right after being deployed.

Right now we always try to connect to the Dispatcher, so it may be fine to throw 
the exception a bit earlier (otherwise an exception is thrown anyway when 
connecting to an already closed cluster), but it is semantically wrong. 
Internally we have a code path that only needs to report the ApplicationReport, 
and there this behavior causes trouble.
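
A rough sketch of the tolerant handling (illustrative only, not the actual code 
in {{YarnClusterDescriptor}}):

{code:java}
// While polling the application state after deployment, treat FINISHED as a
// valid terminal state instead of throwing; only FAILED/KILLED are errors.
switch (report.getYarnApplicationState()) {
    case FAILED:
    case KILLED:
        throw new RuntimeException(
            "YARN application unexpectedly switched to state "
                + report.getYarnApplicationState());
    case RUNNING:
    case FINISHED: // may legally happen right after deployment
        return report;
    default:
        break; // keep polling
}
{code}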

cc [~trohrmann] what do you think?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Potential side-effect of connector code to JM/TM

2019-12-17 Thread Biao Liu
Hi Yingjie,

Thanks for figuring out the impressive bug and bringing this discussion.

I'm afraid there is no silver bullet for isolation from third-party
libraries. However, I agree that resource checking utils might help.
It seems that you and Till have already raised some feasible ideas.
Resource leak issues look quite common. It would be great if
someone could share some experience. I will keep an eye on this discussion.

Thanks,
Biao /'bɪ.aʊ/



On Tue, 17 Dec 2019 at 20:27, Till Rohrmann  wrote:

> Hi Yingjie,
>
> thanks for reporting this issue and starting this discussion. If we are
> dealing with third party libraries I believe there is always the risk that
> one overlooks closing resources. Ideally we make it as hard from Flink's
> perspective as possible but realistically it is hard to completely avoid.
> Hence, I believe that it would be beneficial to have some tooling (e.g.
> stress tests) which could help to surface these kind of problems. Maybe one
> could automate it so that a dev only needs to provide a user jar and then
> this jar is being executed several times and the cluster is checked for
> anomalies.
>
> Cheers,
> Till
>
> On Tue, Dec 17, 2019 at 8:43 AM Yingjie Cao 
> wrote:
>
> > Hi community,
> >
> >   After running tpc-ds test suite for several days on a session cluster,
> we
> > found a resource leak problem of OrcInputFormat which was reported in
> > FLINK-15239. The problem comes from the dependent third party library
> which
> > creates new internal thread (pool) and never release it. As a result, the
> > user class loader which is referenced by these threads will never be
> > garbage collected as well as other classes loaded by the user class
> loader,
> > which finally lead to the continually grow of meta space size for JM (AM)
> > whose meta space size is not limited currently. And for TM whose meta
> space
> > size is limited, it will result in meta space oom eventually. I am not
> sure
> > if any other connectors/input formats incurs the similar problem.
> >   In general, it is hard for Flink to restrict the behavior of the third
> > party dependencies, especially the dependencies of connectors. However,
> it
> > will be better if we can supply some mechanism like stronger isolation or
> > some test facilities to find potential problems, for example, we can run
> > jobs on a cluster and automatically check something like whether user
> class
> > loader can be garbage collected, whether there is thread leak, whether
> some
> > shutdown hooks have been registered and so on.
> >   What do you think? Or should we treat it as a problem?
> >
> > Best,
> > Yingjie
> >
>


[jira] [Created] (FLINK-15298) Wrong dependencies in the DataStream API tutorial (the wiki-edits example)

2019-12-17 Thread Jun Qin (Jira)
Jun Qin created FLINK-15298:
---

 Summary: Wrong dependencies in the DataStream API tutorial (the 
wiki-edits example)
 Key: FLINK-15298
 URL: https://issues.apache.org/jira/browse/FLINK-15298
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.9.1, 1.9.0
Reporter: Jun Qin


[The DataStream API Tutorial in Flink 1.9 | 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/getting-started/tutorials/datastream_api.html]
 mentions the following dependencies:

{code:xml}
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
{code}

There are two issues here:
# {{flink-java}} and {{flink-streaming-java}} should be set to *provided* scope
# {{flink-clients}} is not needed. If {{flink-clients}} is added into *compile* 
scope, {{flink-runtime}} will be pulled in implicitly
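
For illustration, the dependency section with both points applied could look 
like this (a sketch based on the artifacts above):

{code:xml}
<dependencies>
    <!-- provided: these are already part of the Flink runtime / lib directory -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <!-- flink-clients is dropped; the connector stays in the default compile scope -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-wikiedits_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
{code}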



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15299) Move ClientUtils#submitJob & ClientUtils#submitJobAndWaitForResult to flink-test-utils

2019-12-17 Thread Zili Chen (Jira)
Zili Chen created FLINK-15299:
-

 Summary: Move ClientUtils#submitJob & 
ClientUtils#submitJobAndWaitForResult to flink-test-utils
 Key: FLINK-15299
 URL: https://issues.apache.org/jira/browse/FLINK-15299
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Reporter: Zili Chen
Assignee: Zili Chen
 Fix For: 1.11.0


These methods are no longer used in production code and are not intended to 
serve as production utilities. Move them to flink-test-utils.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-17 Thread Becket Qin
Hi folks,

Thanks for the comments. I am convinced that the Source API should not take
boundedness as a parameter after it is constructed. What Timo and Dawid
suggested sounds a reasonable solution to me. So the Source API would
become:

Source {
Boundedness getBoundedness();
}

Assuming the above Source API, in addition to the two options mentioned in
earlier emails, I am thinking of another option:

*Option 3:*
// MySource must be unbounded, otherwise throws exception.
DataStream dataStream = env.source(mySource);

// MySource must be bounded, otherwise throws exception.
BoundedDataStream boundedDataStream = env.boundedSource(mySource);
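
To make the contract explicit, env.boundedSource() would roughly enforce
something like the following (a sketch only; generics and names are
simplified, and I assume a BOUNDED constant on the Boundedness enum):

public <T> BoundedDataStream<T> boundedSource(Source<T> source) {
    if (source.getBoundedness() != Boundedness.BOUNDED) {
        throw new IllegalArgumentException(
            "boundedSource() requires a BOUNDED source, but got "
                + source.getBoundedness());
    }
    return addBoundedSource(source); // hypothetical internal factory method
}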

The pros of this API are:
   a) It fits the requirements from Table / SQL well.
   b) DataStream users still have type safety (option 2 only has partial
type safety).
   c) Crystal-clear boundedness from the API, which makes DataStream join /
connect easy to reason about.
The caveats I see,
   a) It is inconsistent with Table since Table has one unified interface.
   b) No streaming mode for bounded source.

@Stephan Ewen  @Aljoscha Krettek
 what do you think of the approach?


Orthogonal to the above API, I am wondering whether boundedness is the only
dimension needed to describe the characteristic of the Source behavior. We
may also need to have another dimension of *record order*.

For example, when a file source is reading from a directory with bounded
records, it may have two ways to read.
1. Read files in parallel.
2. Read files in the chronological order.
In both cases, the file source is a Bounded Source. However, the processing
requirement for downstream may be different. In the first case, the
record processing and result emitting order does not matter, e.g. word
count. In the second case, the records may have to be processed in the
order they were read, e.g. change log processing.

If the Source only has a getBoundedness() method, the downstream processors
would not know whether the records emitted from the Source should be
processed in order or not. So combining the boundedness and record order,
we will have four scenarios:

*Bounded-StrictOrder*: A segment of change log.
*Bounded-Random*:  Batch Word Count.
*Unbounded-StrictOrder*: An infinite change log.
*Unbounded-Random*: Streaming Word Count.

Option 2 mentioned in the previous email was kind of trying to handle the
Bounded-StrictOrder case by creating a DataStream from a bounded source,
which actually does not work.
It looks like we do not have strict order support in some operators at this
point, e.g. join. But we may still want to add the semantic to the Source
first so later on we don't need to change all the source implementations,
especially given that many of them will be implemented by 3rd party.

Given that, we need another dimension of *Record Order* in the Source. More
specifically, the API would become:

Source {
Boundedness getBoundedness();
RecordOrder getRecordOrder();
}

public enum RecordOrder {
/** The record in the DataStream must be processed in its strict order
for correctness. */
STRICT,
/** The record in the DataStream can be processed in arbitrary order. */
RANDOM;
}

Any thoughts?

Thanks,

Jiangjie (Becket) Qin

On Tue, Dec 17, 2019 at 3:44 PM Timo Walther  wrote:

> Hi Becket,
>
> I completely agree with Dawid's suggestion. The information about the
> boundedness should come out of the source. Because most of the streaming
> sources can be made bounded based on some connector specific criterion.
> In Kafka, it would be an end offset or end timestamp but in any case
> having just a env.boundedSource() is not enough because parameters for
> making the source bounded are missing.
>
> I suggest to have a simple `isBounded(): Boolean` flag in every source
> that might be influenced by a connector builder as Dawid mentioned.
>
> For type safety during programming, we can still go with *Final state
> 1*. By having a env.source() vs env.boundedSource(). The latter would
> just enforce that the boolean flag is set to `true` and could make
> bounded operations available (if we need that actually).
>
> However, I don't think that we should start making a unified Table API
> ununified again. Boundedness is an optimization property. Every bounded
> operation can also be executed in an unbounded way using updates/retraction
> or watermarks.
>
> Regards,
> Timo
>
>
> On 15.12.19 14:22, Becket Qin wrote:
> > Hi Dawid and Jark,
> >
> > I think the discussion ultimately boils down to the question that which
> one
> > of the following two final states do we want? Once we make this decision,
> > everything else can be naturally derived.
> >
> > *Final state 1*: Separate API for bounded / unbounded DataStream & Table.
> > That means any code users write will be valid at the point when they
> write
> > the code. This is similar to having type safety check at programming
> time.
> > For example,
> >
> > BoundedDataStream extends DataStream {
> > // Operations o

Re: [DISCUSS] Flink docs vendor table

2019-12-17 Thread Till Rohrmann
Thanks for continuing this discussion Seth. I like the mockup and I think
this is a good improvement. Modulo the completeness check, +1 for offering
links to 3rd party integrations.

Cheers,
Till

On Mon, Dec 16, 2019 at 6:04 PM Seth Wiesman  wrote:

> This discussion is a follow up to the previous thread on dropping
> vendor-specific documentation[1].
>
> The conversation ended unresolved on the question of what we should provide
> on the Apache Flink docs. The consensus seemed to be moving towards
> offering a table with links to 3rd parties. After an offline conversation
> with Robert, I have drafted a mock-up of what that might look like[2].
> Please note that I included a few vendors that I could think of off the top
> of my head, the list in this picture is not complete but that is not the
> conversation we are having here.
>
> There are three competing goals that we are trying to achieve here.
>
> 1) Provide information to users that vendor support is available as it can
> be important in growing adoption within enterprises
> 2) Be maintainable by the open-source Flink community
> 3) Remain neutral
>
> Please let me know what you think
>
> Seth
>
> [1]
>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-vendor-specific-deployment-documentation-td35457.html
> [2]
>
> https://gist.githubusercontent.com/sjwiesman/bb90f0765148c15051bcc91092367851/raw/42c0a1e9240f1c5808a053f8ff5965828cca96d5/mockup.png
>


[jira] [Created] (FLINK-15300) Shuffle memory fraction sanity check does not account for its min/max limit

2019-12-17 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-15300:
---

 Summary: Shuffle memory fraction sanity check does not account for 
its min/max limit
 Key: FLINK-15300
 URL: https://issues.apache.org/jira/browse/FLINK-15300
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Reporter: Andrey Zagrebin
Assignee: Andrey Zagrebin
 Fix For: 1.10.0


If the configuration results in the shuffle memory size being set to its min or 
max limit rather than derived from the fraction, then during TM startup the 
starting TM parses the generated dynamic properties and fails the sanity check 
(TaskExecutorResourceUtils#sanityCheckShuffleMemory), because the check expects 
the size to match the exact fraction even when the min/max limit applies.

Example, start TM with the following Flink config:
{code:java}
taskmanager.memory.total-flink.size: 350m
taskmanager.memory.framework.heap.size: 16m
taskmanager.memory.shuffle.fraction: 0.1{code}
It will result in the following extra program args:
{code:java}
taskmanager.memory.shuffle.max: 67108864b
 taskmanager.memory.framework.off-heap.size: 134217728b
 taskmanager.memory.managed.size: 146800642b
 taskmanager.cpu.cores: 1.0
 taskmanager.memory.task.heap.size: 2097150b
 taskmanager.memory.task.off-heap.size: 0b
 taskmanager.memory.shuffle.min: 67108864b{code}
where the size derived from the fraction was less than the shuffle memory min 
size (64 MB), so it was set to the min value: 64 MB.

While the TM starts, TaskExecutorResourceUtils#sanityCheckShuffleMemory throws 
the following exception:
{code:java}
org.apache.flink.configuration.IllegalConfigurationException: Derived Shuffle 
Memory size(64 Mb (67108864 bytes)) does not match configured Shuffle Memory 
fraction 
(0.1000149011612).org.apache.flink.configuration.IllegalConfigurationException:
 Derived Shuffle Memory size(64 Mb (67108864 bytes)) does not match configured 
Shuffle Memory fraction (0.1000149011612). at 
org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.sanityCheckShuffleMemory(TaskExecutorResourceUtils.java:552)
 at 
org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.deriveResourceSpecWithExplicitTaskAndManagedMemory(TaskExecutorResourceUtils.java:183)
 at 
org.apache.flink.runtime.clusterframework.TaskExecutorResourceUtils.resourceSpecFromConfig(TaskExecutorResourceUtils.java:135)
{code}
This can be fixed by checking whether the fraction to assert is within the 
min/max range.
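
A minimal sketch of the adjusted check (illustrative, reduced to plain byte 
values; not the actual TaskExecutorResourceUtils code):

{code:java}
// Accept the derived shuffle memory size if it matches the configured fraction
// of the total Flink memory, or if it was clamped to the configured min/max.
static void sanityCheckShuffleMemory(
        long derivedBytes, double fraction, long totalFlinkBytes,
        long minBytes, long maxBytes) {
    long fractionBytes = (long) (totalFlinkBytes * fraction);
    boolean matchesFraction = derivedBytes == fractionBytes;
    boolean clampedToMin = derivedBytes == minBytes && fractionBytes <= minBytes;
    boolean clampedToMax = derivedBytes == maxBytes && fractionBytes >= maxBytes;
    if (!(matchesFraction || clampedToMin || clampedToMax)) {
        throw new IllegalArgumentException(
            "Derived Shuffle Memory size (" + derivedBytes + " bytes) matches "
                + "neither the configured fraction " + fraction
                + " nor the configured min/max limits.");
    }
}
{code}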



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Flink docs vendor table

2019-12-17 Thread Stephan Ewen
+1 for your proposed solution, Seth!

On Tue, Dec 17, 2019 at 3:05 PM Till Rohrmann  wrote:

> Thanks for continuing this discussion Seth. I like the mockup and I think
> this is a good improvement. Modulo the completeness check, +1 for offering
> links to 3rd party integrations.
>
> Cheers,
> Till
>
> On Mon, Dec 16, 2019 at 6:04 PM Seth Wiesman  wrote:
>
> > This discussion is a follow up to the previous thread on dropping
> > vendor-specific documentation[1].
> >
> > The conversation ended unresolved on the question of what we should
> provide
> > on the Apache Flink docs. The consensus seemed to be moving towards
> > offering a table with links to 3rd parties. After an offline conversation
> > with Robert, I have drafted a mock-up of what that might look like[2].
> > Please note that I included a few vendors that I could think of off the
> top
> > of my head, the list in this picture is not complete but that is not the
> > conversation we are having here.
> >
> > There are three competing goals that we are trying to achieve here.
> >
> > 1) Provide information to users that vendor support is available as it
> can
> > be important in growing adoption within enterprises
> > 2) Be maintainable by the open-source Flink community
> > 3) Remain neutral
> >
> > Please let me know what you think
> >
> > Seth
> >
> > [1]
> >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-vendor-specific-deployment-documentation-td35457.html
> > [2]
> >
> >
> https://gist.githubusercontent.com/sjwiesman/bb90f0765148c15051bcc91092367851/raw/42c0a1e9240f1c5808a053f8ff5965828cca96d5/mockup.png
> >
>


[jira] [Created] (FLINK-15301) Flink Kinesis AsyncRecordEmitter needs to handle unchecked exception gracefully

2019-12-17 Thread Ying Xu (Jira)
Ying Xu created FLINK-15301:
---

 Summary: Flink Kinesis AsyncRecordEmitter needs to handle 
unchecked exception gracefully
 Key: FLINK-15301
 URL: https://issues.apache.org/jira/browse/FLINK-15301
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Kinesis
Reporter: Ying Xu


Currently, any runtime exception encountered inside the 
`AsyncRecordEmitter.emitRecordAndUpdateState()` function can cause the thread 
to exit silently. The Flink job would continue to run, but the stopped record 
emitter would subsequently cause Kinesis data consumption to stall.

 

The AsyncRecordEmitter needs to catch unchecked exceptions, log errors, and 
perhaps trigger a job restart subsequently.
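
A rough sketch of the intended guarding (illustrative only; the names below are 
hypothetical and do not match the actual connector code):

{code:java}
// Inside the emitter thread's run() method: catch unchecked exceptions, log
// them and hand them to an error handler that fails the source task, instead
// of letting the emitter thread die silently while the job keeps running.
try {
    while (running) {
        emitRecordAndUpdateState(queue.take());
    }
} catch (Throwable t) {
    LOG.error("Async record emitter terminated unexpectedly", t);
    errorHandler.accept(t); // e.g. fails the source task and triggers a restart
}
{code}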



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] have separate Flink distributions with built-in Hive dependencies

2019-12-17 Thread Bowen Li
I'm not sure providing an uber jar would be possible.

Different from the kafka and elasticsearch connectors, which have dependencies
for a specific kafka/elastic version, or the kafka universal connector, which
provides good compatibility, the hive connector needs to deal with hive jars
of all 1.x, 2.x, 3.x versions (let alone all the HDP/CDH distributions),
with incompatibilities even between minor versions, plus differently versioned
hadoop jars and other extra dependency jars for each hive version.

Besides, users usually need to be able to easily see which individual jars
are required, which is invisible with an uber jar. Hive users already have
their hive deployments. They usually have to use their own hive jars
because, unlike the hive jars on mvn, their own jars contain changes made
in-house or by vendors. They need to easily tell which jars Flink requires for
the open source hive version corresponding to their own hive deployment, and
copy the in-house jars over from their hive deployments as replacements.

Providing a script to download all the individual jars for a specified hive
version can be an alternative.

The goal is that we need to provide a *product*, not a technology, to make
things less of a hassle for Hive users. After all, it's Flink embracing the Hive
community and ecosystem, not the other way around. I'd argue the Hive connector
can be treated differently because its community/ecosystem/user base is much
larger than that of the other connectors, and it's far more important than the
other connectors to Flink's mission of becoming a unified batch/streaming
engine and getting Flink more widely adopted.


On Sun, Dec 15, 2019 at 10:03 PM Danny Chan  wrote:

> Also -1 on separate builds.
>
> After referencing some other BigData engines for distribution[1], i didn't
> find strong needs to publish a separate build
> for just a separate Hive version, indeed there are builds for different
> Hadoop version.
>
> Just like Seth and Aljoscha said, we could push a
> flink-hive-version-uber.jar to use as a lib of SQL-CLI or other use cases.
>
> [1] https://spark.apache.org/downloads.html
> [2] https://www.elastic.co/guide/en/elasticsearch/hadoop/current/hive.html
>
> Best,
> Danny Chan
> On Dec 14, 2019, 3:03 AM +0800, dev@flink.apache.org wrote:
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#dependencies
>


Re: [DISCUSS] Flink docs vendor table

2019-12-17 Thread Robert Metzger
+1 to the general idea

Maybe we could add "Deployment Model" in addition to "Supported
Environments" as properties for the vendors.
I'd say Cloudera, Eventador and Huawei [1] are missing from this page

[1]https://www.huaweicloud.com/en-us/product/cs.html

On Tue, Dec 17, 2019 at 5:05 PM Stephan Ewen  wrote:

> +1 for your proposed solution, Seth!
>
> On Tue, Dec 17, 2019 at 3:05 PM Till Rohrmann 
> wrote:
>
> > Thanks for continuing this discussion Seth. I like the mockup and I think
> > this is a good improvement. Modulo the completeness check, +1 for
> offering
> > links to 3rd party integrations.
> >
> > Cheers,
> > Till
> >
> > On Mon, Dec 16, 2019 at 6:04 PM Seth Wiesman 
> wrote:
> >
> > > This discussion is a follow up to the previous thread on dropping
> > > vendor-specific documentation[1].
> > >
> > > The conversation ended unresolved on the question of what we should
> > provide
> > > on the Apache Flink docs. The consensus seemed to be moving towards
> > > offering a table with links to 3rd parties. After an offline
> conversation
> > > with Robert, I have drafted a mock-up of what that might look like[2].
> > > Please note that I included a few vendors that I could think of off the
> > top
> > > of my head, the list in this picture is not complete but that is not
> the
> > > conversation we are having here.
> > >
> > > There are three competing goals that we are trying to achieve here.
> > >
> > > 1) Provide information to users that vendor support is available as it
> > can
> > > be important in growing adoption within enterprises
> > > 2) Be maintainable by the open-source Flink community
> > > 3) Remain neutral
> > >
> > > Please let me know what you think
> > >
> > > Seth
> > >
> > > [1]
> > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-vendor-specific-deployment-documentation-td35457.html
> > > [2]
> > >
> > >
> >
> https://gist.githubusercontent.com/sjwiesman/bb90f0765148c15051bcc91092367851/raw/42c0a1e9240f1c5808a053f8ff5965828cca96d5/mockup.png
> > >
> >
>


[jira] [Created] (FLINK-15302) properties in create table DDL need to be backward compatible

2019-12-17 Thread Bowen Li (Jira)
Bowen Li created FLINK-15302:


 Summary: properties in create table DDL need to be backward 
compatible 
 Key: FLINK-15302
 URL: https://issues.apache.org/jira/browse/FLINK-15302
 Project: Flink
  Issue Type: Improvement
  Components: Table SQL / API
Reporter: Bowen Li
Assignee: Jark Wu


Since we have a persistent HiveCatalog now, properties in create table DDL need 
to be backward compatible for at least 2 major versions, like state. Otherwise, 
e.g., a table created via DDL in 1.10 may not be readable in 1.11.





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15303) support predicate pushdown for sources in hive connector

2019-12-17 Thread Bowen Li (Jira)
Bowen Li created FLINK-15303:


 Summary: support predicate pushdown for sources in hive connector 
 Key: FLINK-15303
 URL: https://issues.apache.org/jira/browse/FLINK-15303
 Project: Flink
  Issue Type: New Feature
  Components: Connectors / Hive
Reporter: Bowen Li
Assignee: Jingsong Lee
 Fix For: 1.11.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Flink docs vendor table

2019-12-17 Thread Seth Wiesman
Happy to see there seems to be a consensus.

Robert, can you elaborate on what you mean by "deployment model"?

Seth

On Tue, Dec 17, 2019 at 12:19 PM Robert Metzger  wrote:

> +1 to the general idea
>
> Maybe we could add "Deployment Model" in addition to "Supported
> Environments" as properties for the vendors.
> I'd say Cloudera, Eventador and Huawei [1] are missing from this page
>
> [1]https://www.huaweicloud.com/en-us/product/cs.html
>
> On Tue, Dec 17, 2019 at 5:05 PM Stephan Ewen  wrote:
>
> > +1 for your proposed solution, Seth!
> >
> > On Tue, Dec 17, 2019 at 3:05 PM Till Rohrmann 
> > wrote:
> >
> > > Thanks for continuing this discussion Seth. I like the mockup and I
> think
> > > this is a good improvement. Modulo the completeness check, +1 for
> > offering
> > > links to 3rd party integrations.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Mon, Dec 16, 2019 at 6:04 PM Seth Wiesman 
> > wrote:
> > >
> > > > This discussion is a follow up to the previous thread on dropping
> > > > vendor-specific documentation[1].
> > > >
> > > > The conversation ended unresolved on the question of what we should
> > > provide
> > > > on the Apache Flink docs. The consensus seemed to be moving towards
> > > > offering a table with links to 3rd parties. After an offline
> > conversation
> > > > with Robert, I have drafted a mock-up of what that might look
> like[2].
> > > > Please note that I included a few vendors that I could think of off
> the
> > > top
> > > > of my head, the list in this picture is not complete but that is not
> > the
> > > > conversation we are having here.
> > > >
> > > > There are three competing goals that we are trying to achieve here.
> > > >
> > > > 1) Provide information to users that vendor support is available as
> it
> > > can
> > > > be important in growing adoption within enterprises
> > > > 2) Be maintainable by the open-source Flink community
> > > > 3) Remain neutral
> > > >
> > > > Please let me know what you think
> > > >
> > > > Seth
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Drop-vendor-specific-deployment-documentation-td35457.html
> > > > [2]
> > > >
> > > >
> > >
> >
> https://gist.githubusercontent.com/sjwiesman/bb90f0765148c15051bcc91092367851/raw/42c0a1e9240f1c5808a053f8ff5965828cca96d5/mockup.png
> > > >
> > >
> >
>


Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-17 Thread zhenya Sun
Congrats zhuzhu!




zhenya Sun
Email: toke...@126.com

Signature is customized by Netease Mail Master

On 12/17/2019 17:32, Gary Yao wrote:
Congratulations, well deserved.

On Tue, Dec 17, 2019 at 10:09 AM lining jing  wrote:

> Congratulations Zhu Zhu~
>
> Andrey Zagrebin wrote on Monday, Dec 16, 2019 at 5:01 PM:
>
> > Congrats Zhu Zhu!
> >
> > On Mon, Dec 16, 2019 at 8:10 AM Xintong Song 
> > wrote:
> >
> > > Congratulations Zhu Zhu~
> > >
> > > Thank you~
> > >
> > > Xintong Song
> > >
> > >
> > >
> > > On Mon, Dec 16, 2019 at 12:34 PM Danny Chan 
> > wrote:
> > >
> > > > Congrats Zhu Zhu!
> > > >
> > > > Best,
> > > > Danny Chan
> > > > On Dec 14, 2019, 12:51 AM +0800, dev@flink.apache.org wrote:
> > > > >
> > > > > Congrats Zhu Zhu and welcome on board!
> > > >
> > >
> >
>


[jira] [Created] (FLINK-15304) Remove unexpected Hadoop dependency from Flink's Mesos integration

2019-12-17 Thread Yangze Guo (Jira)
Yangze Guo created FLINK-15304:
--

 Summary: Remove unexpected Hadoop dependency from Flink's Mesos 
integration
 Key: FLINK-15304
 URL: https://issues.apache.org/jira/browse/FLINK-15304
 Project: Flink
  Issue Type: Bug
  Components: Deployment / Mesos
Reporter: Yangze Guo
 Fix For: 1.11.0


As 
[discussed|https://github.com/apache/flink/pull/10538#discussion_r358807219], 
Flink's Mesos integration should be Hadoop free.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-27: Refactor Source Interface

2019-12-17 Thread Jark Wu
Hi Becket,

That's great we have reached a consensus on Source#getBoundedness().

Regarding option #3, my concern is that if we don't support streaming
mode for bounded sources,
how could we create a testing source for streaming mode? Currently, all the
testing sources for streaming
are bounded, so that the integration tests can eventually finish.

Regarding Source#getRecordOrder(), could we have an implicit contract
that an unbounded source should
already read in order (i.e. reading partitions in parallel), while for a bounded
source the order is not mandatory?
This is also the behavior of the current sources.
1) a source can't guarantee it reads in strict order, because the producer
may produce data not in order.
2) *Bounded-StrictOrder* is not necessary, because batch can reorder data.

Best,
Jark



On Tue, 17 Dec 2019 at 22:03, Becket Qin  wrote:

> Hi folks,
>
> Thanks for the comments. I am convinced that the Source API should not take
> boundedness as a parameter after it is constructed. What Timo and Dawid
> suggested sounds a reasonable solution to me. So the Source API would
> become:
>
> Source {
> Boundedness getBoundedness();
> }
>
> Assuming the above Source API, in addition to the two options mentioned in
> earlier emails, I am thinking of another option:
>
> *Option 3:*
> // MySource must be unbounded, otherwise throws exception.
> DataStream dataStream = env.source(mySource);
>
> // MySource must be bounded, otherwise throws exception.
> BoundedDataStream boundedDataStream = env.boundedSource(mySource);
>
> The pros of this API are:
>a) It fits the requirements from Table / SQL well.
>b) DataStream users still have type safety (option 2 only has partial
> type safety).
>c) Cristal clear boundedness from the API which makes DataStream join /
> connect easy to reason about.
> The caveats I see,
>a) It is inconsistent with Table since Table has one unified interface.
>b) No streaming mode for bounded source.
>
> @Stephan Ewen  @Aljoscha Krettek
>  what do you think of the approach?
>
>
> Orthogonal to the above API, I am wondering whether boundedness is the only
> dimension needed to describe the characteristic of the Source behavior. We
> may also need to have another dimension of *record order*.
>
> For example, when a file source is reading from a directory with bounded
> records, it may have two ways to read.
> 1. Read files in parallel.
> 2. Read files in the chronological order.
> In both cases, the file source is a Bounded Source. However, the processing
> requirement for downstream may be different. In the first case, the
> record processing and result emitting order does not matter, e.g. word
> count. In the second case, the records may have to be processed in the
> order they were read, e.g. change log processing.
>
> If the Source only has a getBoundedness() method, the downstream processors
> would not know whether the records emitted from the Source should be
> processed in order or not. So combining the boundedness and record order,
> we will have four scenarios:
>
> *Bounded-StrictOrder*: A segment of change log.
> *Bounded-Random*:  Batch Word Count.
> *Unbounded-StrictOrder*: An infinite change log.
> *Unbounded-Random*: Streaming Word Count.
>
> Option 2 mentioned in the previous email was kind of trying to handle the
> Bounded-StrictOrder case by creating a DataStream from a bounded source,
> which actually does not work.
> It looks that we do not have strict order support in some operators at this
> point, e.g. join. But we may still want to add the semantic to the Source
> first so later on we don't need to change all the source implementations,
> especially given that many of them will be implemented by 3rd party.
>
> Given that, we need another dimension of *Record Order* in the Source. More
> specifically, the API would become:
>
> Source {
> Boundedness getBoundedness();
> RecordOrder getRecordOrder();
> }
>
> public enum RecordOrder {
> /** The record in the DataStream must be processed in its strict order
> for correctness. */
> STRICT,
> /** The record in the DataStream can be processed in arbitrary order.
> */
> RANDOM;
> }
>
> Any thoughts?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Dec 17, 2019 at 3:44 PM Timo Walther  wrote:
>
> > Hi Becket,
> >
> > I completely agree with Dawid's suggestion. The information about the
> > boundedness should come out of the source. Because most of the streaming
> > sources can be made bounded based on some connector specific criterion.
> > In Kafka, it would be an end offset or end timestamp but in any case
> > having just a env.boundedSource() is not enough because parameters for
> > making the source bounded are missing.
> >
> > I suggest to have a simple `isBounded(): Boolean` flag in every source
> > that might be influenced by a connector builder as Dawid mentioned.
> >
> > For type safety during programming, we can still go with *Final state
> > 1*. By having a e

[jira] [Created] (FLINK-15305) MemoryMappedBoundedDataTest fails with IOException on ppc64le

2019-12-17 Thread Siddhesh Ghadi (Jira)
Siddhesh Ghadi created FLINK-15305:
--

 Summary: MemoryMappedBoundedDataTest fails with IOException on 
ppc64le
 Key: FLINK-15305
 URL: https://issues.apache.org/jira/browse/FLINK-15305
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
 Environment: arch: ppc64le
os: rhel 7.6
jdk: 8
mvn: 3.3.9
Reporter: Siddhesh Ghadi
 Attachments: surefire-report.txt

By reducing the buffer size from 76_687 to 60_787 in 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java:164,
 the test passes. Any thoughts on this approach?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-15306) Adjust the default netty transport option from nio to auto

2019-12-17 Thread zhijiang (Jira)
zhijiang created FLINK-15306:


 Summary: Adjust the default netty transport option from nio to auto
 Key: FLINK-15306
 URL: https://issues.apache.org/jira/browse/FLINK-15306
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: zhijiang
Assignee: zhijiang
 Fix For: 1.11.0


The default value of `taskmanager.network.netty.transport` in 
NettyShuffleEnvironmentOptions is currently `nio`. As we know, the `epoll` mode 
achieves better performance, produces less GC pressure and offers more advanced 
features, but it is only available on Linux.

Therefore it is better to change the default to `auto`, so that the framework 
automatically chooses the proper mode based on the platform.
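
Until the default is changed, users can already opt in explicitly (sketch of the 
flink-conf.yaml entry):

{code}
taskmanager.network.netty.transport: auto
{code}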

We would further verify the performance effect via micro benchmarks if possible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Processing events based on weights

2019-12-17 Thread Vijay Srinivasaraghavan
 Resending email again...
Hello,
I would like to understand the options available to design an ingestion 
pipeline to support the following requirements.
1) Events are coming from various sources and depending on the type of the 
events it will be stored in specific Kafka topics (say we have 4 topics)
2) The events that are part of topics are weighted (Topic1: 0.6, Topic2: 0.1: 
Topic3: 0.2 and Topic4: 0.1)
3) The events are to be processed (consumed and enriched) based on the weights. 
For example, if I am reading 10 events in total, then I should consider 
processing 6 events from Topic1, 1 event from Topic2, 2 events from Topic3 and 
1 event from Topic4. Basically I am trying to do something similar to this 
implementation: https://github.com/flipkart-incubator/priority-kafka-client
Question:

1) Should I handle the weighted distribution at the source (custom) connector 
or use a window after we read the data?
2) When reading from multiple Kafka topics, how does the source connector enforce 
the batch read? If the batch size is 100, will it try to read 100 messages from 
each topic at once or go round-robin (try to get 100 from Topic1 first, 
and move on to the next topics till the batch size is reached)?
Appreciate your inputs.

Thanks,
Vijay

Re: [ANNOUNCE] Zhu Zhu becomes a Flink committer

2019-12-17 Thread Yu Li
Congrats, Zhu Zhu!

Best Regards,
Yu


On Wed, 18 Dec 2019 at 08:40, zhenya Sun  wrote:

> Congrats zhuzhu!
>
>
>
>
> | |
> zhenya Sun
> Email: toke...@126.com
> |
>
> Signature is customized by Netease Mail Master
>
> On 12/17/2019 17:32, Gary Yao wrote:
> Congratulations, well deserved.
>
> On Tue, Dec 17, 2019 at 10:09 AM lining jing 
> wrote:
>
> > Congratulations Zhu Zhu~
> >
> Andrey Zagrebin  wrote on Mon, Dec 16, 2019 at 5:01 PM:
> >
> > > Congrats Zhu Zhu!
> > >
> > > On Mon, Dec 16, 2019 at 8:10 AM Xintong Song 
> > > wrote:
> > >
> > > > Congratulations Zhu Zhu~
> > > >
> > > > Thank you~
> > > >
> > > > Xintong Song
> > > >
> > > >
> > > >
> > > > On Mon, Dec 16, 2019 at 12:34 PM Danny Chan 
> > > wrote:
> > > >
> > > > > Congrats Zhu Zhu!
> > > > >
> > > > > Best,
> > > > > Danny Chan
> > > > > On Dec 14, 2019, 12:51 AM +0800, dev@flink.apache.org wrote:
> > > > > >
> > > > > > Congrats Zhu Zhu and welcome on board!
> > > > >
> > > >
> > >
> >
>


[jira] [Created] (FLINK-15307) Subclasses of FailoverStrategy are easily confused with implementation classes of RestartStrategy

2019-12-17 Thread andrew.D.lin (Jira)
andrew.D.lin created FLINK-15307:


 Summary: Subclasses of FailoverStrategy are easily confused with 
implementation classes of RestartStrategy
 Key: FLINK-15307
 URL: https://issues.apache.org/jira/browse/FLINK-15307
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Configuration
Affects Versions: 1.9.1, 1.9.0, 1.10.0
Reporter: andrew.D.lin
 Attachments: image-2019-12-18-14-59-03-181.png

Subclasses of RestartStrategy
 * FailingRestartStrategy
 * FailureRateRestartStrategy
 * FixedDelayRestartStrategy
 * InfiniteDelayRestartStrategy

Implementation classes of FailoverStrategy
 * AdaptedRestartPipelinedRegionStrategyNG
 * RestartAllStrategy
 * RestartIndividualStrategy
 * RestartPipelinedRegionStrategy

 

FailoverStrategy describes how the job computation recovers from task failures.

I think the following names may be easier to understand and easier to 
distinguish:

Proposed names for the implementation classes of FailoverStrategy
 * AdaptedPipelinedRegionFailoverStrategyNG
 * FailoverAllStrategy
 * FailoverIndividualStrategy
 * FailoverPipelinedRegionStrategy

The FailoverStrategy is currently instantiated based on a configuration value (see 
the snippet below), so renaming the implementation classes will not affect 
compatibility.
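
As a minimal illustration of what "instantiated based on a configuration value" 
means (the key `jobmanager.execution.failover-strategy` is the documented one; the 
exact set of accepted values should be treated as an assumption here), the strategy 
is selected by a string value rather than by its class name, which is why renaming 
the classes is safe:
{code:java}
import org.apache.flink.configuration.Configuration;

public class FailoverStrategyConfigExample {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The failover strategy is chosen via a config value, e.g. "region" or "full",
        // not via the name of the implementation class.
        conf.setString("jobmanager.execution.failover-strategy", "region");
        System.out.println(conf.getString("jobmanager.execution.failover-strategy", "full"));
    }
}
{code}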

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Potential side-effect of connector code to JM/TM

2019-12-17 Thread Yingjie Cao
Hi Till & Biao,

Thanks for the reply.

I agree that supplying some stress or stability tests can really help. Besides the 
JVM resource leak mentioned above, there may be other types of resource leaks, like 
slot or network buffer leaks. In addition, other tests, like triggering failover in 
various different ways, stressing the system with high-parallelism and heavy-load 
jobs, or running jobs and triggering failover over and over again, can also help. I 
think stress/stability testing is a big topic, and resource leak checking can be a 
good start.

As a start for resource leak checking, we may need to collect a checklist, which 
can also help to troubleshoot resource leak problems manually. From my previous 
experience, I can think of the following ones:
1. The File#deleteOnExit hook leaks the strings of file paths. The Flink REST 
server once suffered from this problem, and it has already been fixed.
2. Thread leaks. OrcInputFormat suffers from this.
3. Application shutdown hooks referencing user classes.
4. ClassLoader#parallelLockMap may leak because of too many generated classes. 
Flink also suffers from this problem; the issue is reported in FLINK-15024 and 
still needs to be resolved.
5. Some other static fields (like caches implemented by maps) of classes loaded by 
the system class loader are also potential sources of resource leaks.

Any other additions to this checklist are welcome. Even with this checklist, the 
check itself may not be trivial; dumping and analysing the heap may be an option, 
and some checks can be automated (see the sketch below). I will do some further 
investigation on that.
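
As one concrete example of such an automated check, here is a minimal sketch (not 
Flink code; the class and method names are made up for illustration) of the "can 
the user class loader be garbage collected" check mentioned in my first mail:
{code:java}
import java.lang.ref.WeakReference;
import java.net.URLClassLoader;

public final class ClassLoaderLeakCheck {

    // Returns true if the given user class loader can be garbage collected once all
    // strong references to it are dropped; false hints at a leak, e.g. a leftover
    // thread that still holds it as its context class loader.
    static boolean canBeCollected(URLClassLoader userClassLoader) throws Exception {
        WeakReference<ClassLoader> ref = new WeakReference<>(userClassLoader);
        userClassLoader.close();
        userClassLoader = null; // drop the local strong reference
        for (int i = 0; i < 10 && ref.get() != null; i++) {
            System.gc();
            Thread.sleep(100);
        }
        return ref.get() == null;
    }
}
{code}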

Best,
Yingjie

Biao Liu  wrote on Tue, Dec 17, 2019 at 9:02 PM:

> Hi Yingjie,
>
> Thanks for figuring out the impressive bug and bringing this discussion.
>
> I'm afraid there is no such a silver bullet for isolation from third-party
> library. However I agree that resource checking utils might help.
> It seems that you and Till have already raised some feasible ideas.
> Resource leaking issue looks like quite common. It would be great If
> someone could share some experience. Will keep an eye on this discussion.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 17 Dec 2019 at 20:27, Till Rohrmann  wrote:
>
> > Hi Yingjie,
> >
> > thanks for reporting this issue and starting this discussion. If we are
> > dealing with third party libraries I believe there is always the risk
> that
> > one overlooks closing resources. Ideally we make it as hard from Flink's
> > perspective as possible but realistically it is hard to completely avoid.
> > Hence, I believe that it would be beneficial to have some tooling (e.g.
> > stress tests) which could help to surface these kind of problems. Maybe
> one
> > could automate it so that a dev only needs to provide a user jar and then
> > this jar is being executed several times and the cluster is checked for
> > anomalies.
> >
> > Cheers,
> > Till
> >
> > On Tue, Dec 17, 2019 at 8:43 AM Yingjie Cao 
> > wrote:
> >
> > > Hi community,
> > >
> > >   After running tpc-ds test suite for several days on a session
> cluster,
> > we
> > > found a resource leak problem of OrcInputFormat which was reported in
> > > FLINK-15239. The problem comes from the dependent third party library
> > which
> > > creates new internal thread (pool) and never release it. As a result,
> the
> > > user class loader which is referenced by these threads will never be
> > > garbage collected as well as other classes loaded by the user class
> > loader,
> > > which finally lead to the continually grow of meta space size for JM
> (AM)
> > > whose meta space size is not limited currently. And for TM whose meta
> > space
> > > size is limited, it will result in meta space oom eventually. I am not
> > sure
> > > if any other connectors/input formats incurs the similar problem.
> > >   In general, it is hard for Flink to restrict the behavior of the
> third
> > > party dependencies, especially the dependencies of connectors. However,
> > it
> > > will be better if we can supply some mechanism like stronger isolation
> or
> > > some test facilities to find potential problems, for example, we can
> run
> > > jobs on a cluster and automatically check something like whether user
> > class
> > > loader can be garbage collected, whether there is thread leak, whether
> > some
> > > shutdown hooks have been registered and so on.
> > >   What do you think? Or should we treat it as a problem?
> > >
> > > Best,
> > > Yingjie
> > >
> >
>


[jira] [Created] (FLINK-15308) Job fails when pipelined-shuffle.compression is enabled and numberOfTaskSlots > 1

2019-12-17 Thread Feng Jiajie (Jira)
Feng Jiajie created FLINK-15308:
---

 Summary: Job fails when pipelined-shuffle.compression is enabled and 
numberOfTaskSlots > 1
 Key: FLINK-15308
 URL: https://issues.apache.org/jira/browse/FLINK-15308
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.10.0
 Environment: $ git log
commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae
Author: bowen.li 
Date: Tue Dec 17 17:37:03 2019 -0800
Reporter: Feng Jiajie


The job worked well with the default flink-conf.yaml plus pipelined-shuffle.compression enabled:
{code:java}
taskmanager.numberOfTaskSlots: 1
taskmanager.network.pipelined-shuffle.compression.enabled: true
{code}
But when I set taskmanager.numberOfTaskSlots to 4 or 6:
{code:java}
taskmanager.numberOfTaskSlots: 6
taskmanager.network.pipelined-shuffle.compression.enabled: true
{code}
job failed:
{code:java}
$ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m 
~/flink-example-1.0-SNAPSHOT.jar
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- The configuration directory 
('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
 already contains a LOG4J config file.If you want to use logback, then please 
delete or rename the log configuration file.
2019-12-18 15:04:40,514 WARN  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- The configuration directory 
('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf')
 already contains a LOG4J config file.If you want to use logback, then please 
delete or rename the log configuration file.
2019-12-18 15:04:40,907 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2019-12-18 15:04:41,084 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Cluster specification: ClusterSpecification{masterMemoryMB=1024, 
taskManagerMemoryMB=12288, numberTaskManagers=1, slotsPerTaskManager=6}
2019-12-18 15:04:42,344 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Submitting application master application_1576573857638_0026
2019-12-18 15:04:42,370 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted 
application application_1576573857638_0026
2019-12-18 15:04:42,371 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Waiting for the cluster to be allocated
2019-12-18 15:04:42,372 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Deploying cluster, current state ACCEPTED
2019-12-18 15:04:45,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- YARN application has been deployed successfully.
2019-12-18 15:04:45,390 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
- Found Web Interface debugboxcreate431x3.sa:36162 of application 
'application_1576573857638_0026'.
Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: org.apache.flink.client.program.ProgramInvocationException: 
Job failed (JobID: 9140c70769f4271cc22ea8becaa26272)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.run

[DISCUSS] Releasing Flink 1.9.2

2019-12-17 Thread Hequn Cheng
Hi everyone,

It has already been two months since we released Flink 1.9.1.
We already have many important bug fixes from which our users can benefit
in the release-1.9 branch (85 resolved issues).
Therefore, I propose to create the next bug fix release for Flink 1.9.

Most notable fixes are:

- [FLINK-14074] MesosResourceManager can't create new taskmanagers in
Session Cluster Mode.
- [FLINK-14995] Kinesis NOTICE is incorrect
- [FLINK-15013] Flink (on YARN) sometimes needs too many slots
- [FLINK-15036] Container startup error will be handled outside of the
YarnResourceManager's main thread
- [FLINK-14315] NPE with JobMaster.disconnectTaskManager

Furthermore, there is one issue marked as a blocker for 1.9.2 which should be
merged before the 1.9.2 release:

- [FLINK-15266] NPE in blink planner code gen (reviewing)

If there are any other blocker issues that need to be fixed in 1.9.2, please let
me know.
I will kick off the release process once blocker issues have been merged.

It would be very much appreciated if a PMC member could help with the final
steps of the release process.

Best,
Hequn


Re: [DISCUSS] Releasing Flink 1.9.2

2019-12-17 Thread jincheng sun
Thanks for bringing up the discussion, Hequn. I would like to give you a hand
at the last stage when the RC is finished (if you need it). :)

Best,
Jincheng

@sunjincheng121 


Hequn Cheng  wrote on Wed, Dec 18, 2019 at 3:44 PM:

> Hi everyone,
>
> It has already been two months since we released the Flink 1.9.1.
> We already have many important bug fixes from which our users can benefit
> in the release-1.9 branch (85 resolved issues).
> Therefore, I propose to create the next bug fix release for Flink 1.9.
>
> Most notable fixes are:
>
> - [FLINK-14074] MesosResourceManager can't create new taskmanagers in
> Session Cluster Mode.
> - [FLINK-14995] Kinesis NOTICE is incorrect
> - [FLINK-15013] Flink (on YARN) sometimes needs too many slots
> - [FLINK-15036] Container startup error will be handled out side of the
> YarnResourceManager's main thread
> - [FLINK-14315] NPE with JobMaster.disconnectTaskManager
>
> Furthermore, there is one issue marked as blocker for 1.9.2 which should be
> merged before 1.9.2 release:
>
> - [FLINK-15266] NPE in blink planner code gen (reviewing)
>
> If there are any other blocker issues need to be fixed in 1.9.2, please let
> me know.
> I will kick off the release process once blocker issues have been merged.
>
> It would be very appreciated if there is any PMC can help with the final
> steps of the release process.
>
> Best,
> Hequn
>


[jira] [Created] (FLINK-15309) Executing SQL results in "NumberFormatException: Zero length BigInteger"

2019-12-17 Thread xiaojin.wy (Jira)
xiaojin.wy created FLINK-15309:
--

 Summary: Executing SQL results in "NumberFormatException: Zero length 
BigInteger"
 Key: FLINK-15309
 URL: https://issues.apache.org/jira/browse/FLINK-15309
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Client
Affects Versions: 1.10.0
Reporter: xiaojin.wy
 Attachments: image-2019-12-18-15-53-24-501.png

*The SQL is:*
CREATE TABLE `src` (
key bigint,
v varchar
) WITH (
'format.field-delimiter'='|',
'connector.type'='filesystem',
'format.derive-schema'='true',

'connector.path'='/defender_test_data/daily_regression_batch_hive_1.10/test_cast/sources/src.csv',
'format.type'='csv'
);

select
cast(key as decimal(10,2)) as c1,
cast(key as char(10)) as c2,
cast(key as varchar(10)) as c3
from src
order by c1, c2, c3
limit 1;

*The result schema obtained in the code is:*
sinkSchema:root
 |-- c1: DECIMAL(10, 2)
 |-- c2: CHAR(10)
 |-- c3: VARCHAR(10)

*The detail:*
If you use the SQL above in a SQL Client environment, you get a result like this:
 !image-2019-12-18-15-53-24-501.png! 

But if you write the result directly to a csv sink in the code, there will be 
an exception:

Caused by: java.lang.NumberFormatException: Zero length BigInteger
at java.math.BigInteger.<init>(BigInteger.java:302)
at 
org.apache.flink.table.dataformat.Decimal.fromUnscaledBytes(Decimal.java:214)
at 
org.apache.flink.table.dataformat.Decimal.readDecimalFieldFromSegments(Decimal.java:487)
at 
org.apache.flink.table.dataformat.BinaryRow.getDecimal(BinaryRow.java:334)
at 
org.apache.flink.table.dataformat.DataFormatConverters$BigDecimalConverter.toExternalImpl(DataFormatConverters.java:642)
at 
org.apache.flink.table.dataformat.DataFormatConverters$BigDecimalConverter.toExternalImpl(DataFormatConverters.java:618)
at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:358)
at 
org.apache.flink.table.dataformat.DataFormatConverters$RowConverter.toExternalImpl(DataFormatConverters.java:1370)
at 
org.apache.flink.table.dataformat.DataFormatConverters$RowConverter.toExternalImpl(DataFormatConverters.java:1349)
at 
org.apache.flink.table.dataformat.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:340)
at SinkConversion$43.processElement(Unknown Source)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:550)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:527)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:487)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
at 
org.apache.flink.table.runtime.operators.sort.SortLimitOperator.endInput(SortLimitOperator.java:98)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.endOperatorInput(OperatorChain.java:265)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.endHeadOperatorInput(OperatorChain.java:249)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:73)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
at java.lang.Thread.run(Thread.java:834)
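
For reference, the exception itself is easy to reproduce outside of Flink (this only 
illustrates the JDK behaviour, not the Flink bug): BigInteger rejects an empty 
unscaled-bytes array, which is apparently what the Decimal deserialization path 
hands it here.
{code:java}
import java.math.BigInteger;

public class ZeroLengthBigIntegerDemo {

    public static void main(String[] args) {
        try {
            new BigInteger(new byte[0]); // empty unscaled-bytes array
        } catch (NumberFormatException e) {
            System.out.println(e.getMessage()); // prints "Zero length BigInteger"
        }
    }
}
{code}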


*The input data is:*
193|val_193
338|val_338
446|val_446
459|val_459
394|val_394
237|val_237
482|val_482
174|val_174
413|val_413
494|val_494
207|val_207
199|val_199
466|val_466
208|val_208
174|val_174
399|val_399
396|val_396
247|val_247
417|val_417
489|val_489
162|val_162
377|val_377
397|val_397
309|val_309
365|val_365
266|val_266
439|val_439
342|val_342
367|val_367
325|val_325
167|val_167
195|val_195
475|val_475
17|val_17
113|val_113
155|val_155
203|val_203
339|val_339
0|val_0
455|val_455
128|val_128
311|val_311
316|val_316
57|val_57
302|val_302