[jira] [Created] (FLINK-14162) Unify SchedulerOperations#allocateSlotsAndDeploy implementation for all scheduling strategies

2019-09-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14162:
---

 Summary: Unify SchedulerOperations#allocateSlotsAndDeploy 
implementation for all scheduling strategies
 Key: FLINK-14162
 URL: https://issues.apache.org/jira/browse/FLINK-14162
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


In scheduler NG, scheduling strategies invoke 
{{SchedulerOperations#allocateSlotsAndDeploy(Collection)}}
 to trigger the scheduling of tasks.

However, {{EagerSchedulingStrategy}} and {{LazyFromSourcesSchedulingStrategy}} 
both invoke it by passing a batch of tasks, but require the scheduling 
process to be conducted in two different ways:
 * {{EagerSchedulingStrategy}} requires the batch of tasks to be deployed only 
after all of them have acquired slots. This is essential to avoid partition 
update RPCs in streaming job scheduling.
 * {{LazyFromSourcesSchedulingStrategy}} requires tasks in the batch to 
allocate slots and get deployed individually, so that it can deploy some tasks 
even if there are not enough slots for all tasks in the batch. This is helpful 
for batch job scheduling.

The scheduler then has to decide the scheduling pattern based on whether the 
scheduling strategy is a {{LazyFromSourcesSchedulingStrategy}}. This is not 
good, as there can be more strategies in the future, including customized 
scheduling strategies.

I think it's better to define 
{{SchedulerOperations#allocateSlotsAndDeploy(Collection)}}
 so that all tasks in the batch are assigned slots and deployed together, as is 
done for {{EagerSchedulingStrategy}}.
All scheduling strategies need to follow this rule. If tasks should be 
scheduled individually, the strategy should invoke {{allocateSlotsAndDeploy}} 
multiple times, once for each task. As a result, 
{{LazyFromSourcesSchedulingStrategy}} needs to be adjusted accordingly.
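
For illustration, a minimal sketch of the per-task invocation pattern a 
strategy could use under this contract (the interfaces below are illustrative 
stand-ins, not the real Flink signatures):

import java.util.Collection;
import java.util.Collections;

class PerTaskSchedulingSketch {

    // stand-in types, named only for readability of the sketch
    interface SchedulingTask {}

    interface SchedulerOperations {
        void allocateSlotsAndDeploy(Collection<SchedulingTask> tasks);
    }

    // one allocateSlotsAndDeploy call per task, so each task is assigned a
    // slot and deployed as soon as its own slot is available
    void scheduleIndividually(SchedulerOperations operations,
                              Collection<SchedulingTask> tasks) {
        for (SchedulingTask task : tasks) {
            operations.allocateSlotsAndDeploy(Collections.singletonList(task));
        }
    }
}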



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-14163) A Task should be deployed only after all its partitions have completed the registration

2019-09-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14163:
---

 Summary: A Task should be deployed only after all its partitions 
have completed the registration
 Key: FLINK-14163
 URL: https://issues.apache.org/jira/browse/FLINK-14163
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


Currently {{Execution#producedPartitions}} is assigned after the partitions 
have completed their registration with the shuffle master in 
{{Execution#registerProducedPartitions(...)}}.
But the task deployment process (in {{Execution#deploy()}}) will create the 
{{ResultPartitionDeploymentDescriptor}}s directly from 
{{Execution#producedPartitions}} without checking whether it has been assigned.
This may lead to a task being deployed without its result partitions, which 
will eventually cause the job to hang.

It is not problematic at the moment when using Flink's default shuffle master 
{{NettyShuffleMaster}}, since it returns a completed future on registration. 
However, if that behavior changes or users use a customized 
{{ShuffleMaster}}, it may cause problems.

Besides that, {{Execution#producedPartitions}} is also used for 
 * generating the downstream task input descriptors
 * retrieving the {{ResultPartitionID}} for partition releasing

To avoid such issues, we may need to change all usages of 
{{Execution#producedPartitions}} to a callback style, e.g. change 
{{Execution#producedPartitions}} from a {{Map}} to a {{CompletableFuture}} of 
that map and adjust all its usages.
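
For illustration, a hedged sketch of the proposed callback style (the map's 
type parameters and method names below are illustrative, not the actual Flink 
code): deployment is chained onto the registration future, so descriptors are 
only created after all produced partitions have completed registration.

import java.util.Map;
import java.util.concurrent.CompletableFuture;

class ProducedPartitionsSketch {

    // stand-in for Execution#producedPartitions after the proposed change
    private final CompletableFuture<Map<String, String>> producedPartitions =
            new CompletableFuture<>();

    void deploy() {
        producedPartitions.thenAccept(partitions ->
                // the ResultPartitionDeploymentDescriptors would be built here
                System.out.println("deploying " + partitions.size() + " partitions"));
    }
}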



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Re: [DISCUSS] Support JSON functions in Flink SQL

2019-09-22 Thread Forward Xu
Hi Jark,
Thank you very much for your reply. I have updated the Google doc and replied
to some of your questions.
 In addition, I would like to apply for FLIP permissions for this purpose.

Best,
Forward

Jark Wu  于2019年9月20日周五 下午9:53写道:

> Hi Forward,
>
> Sorry for the late reply. I have gone through the design doc and I think it
> is very nice.
>
> Here are my thoughts and suggestions:
>
> 0) I think supporting JSON functions in SQL is not complicated, because
> Calcite already supports the parser part and the runtime part.
> We only need to integrate it in Flink and add good test coverage.
> 1) However, I think we should also design the corresponding JSON Functions
> API for Table API which is very important.
> I don't have a clear idea about how to support all the JSON Function
> syntax in Table API. And this may need more discussions.
> 2) IMO, it deserves a FLIP (especially for the Table API part). You can
> follow the FLIP process [1] to start a FLIP proposal.
> 3) I think we only need to implement it in the blink planner as we are going to
> deprecate the old planner.
> So could you update the implementation section in the doc, because the
> implementation in the blink planner should be different.
> 4) It would be better to have an implementation plan to prioritize the
> sub-tasks.
> From my point of view, JSON_VALUE is the most important and JSON_TABLE
> gets the least priority.
>
> I also left some comments in the google doc.
>
> Hi @JingsongLee  ,
>
> I think we don't need to wait for FLIP-51, as we don't have clear
> progress on FLIP-51.
> And as far as I know, it will only add a few PlannerExpressions which can be
> refactored easily during FLIP-51.
>
>
> Cheers,
> Jark
>
> [1]:
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>
>
>
> On Thu, 5 Sep 2019 at 19:29, vino yang  wrote:
>
> > +1 to have JSON functions in Flink SQL
> >
> > JingsongLee  于2019年9月5日周四 下午4:46写道:
> >
> > > +1
> > > Nice document. I think it is easier to do after expression
> reworking[1].
> > > By the way, which planner do you want to start?
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > >
> > > --
> > > From:TANG Wen-hui 
> > > Send Time:2019年9月5日(星期四) 14:36
> > > To:dev 
> > > Subject:Re: Re: [DISCUSS] Support JSON functions in Flink SQL
> > >
> > > +1
> > > I have done similar work before.
> > > Looking forward to discussing this feature.
> > >
> > > Best
> > > wenhui
> > >
> > >
> > >
> > > winifred.wenhui.t...@gmail.com
> > >
> > > From: Kurt Young
> > > Date: 2019-09-05 14:00
> > > To: dev
> > > CC: Anyang Hu
> > > Subject: Re: [DISCUSS] Support JSON functions in Flink SQL
> > > +1 to add JSON support to Flink. We also see lots of requirements for
> > JSON
> > > related functions in our internal platform. Since these are already SQL
> > > standard, I think it's a good time to add them to Flink.
> > >
> > > Best,
> > > Kurt
> > >
> > >
> > > On Thu, Sep 5, 2019 at 10:37 AM Qi Luo  wrote:
> > >
> > > > We also see strong demands from our SQL users for JSON/Date related
> > > > functions.
> > > >
> > > > Also +Anyang Hu 
> > > >
> > > > On Wed, Sep 4, 2019 at 9:51 PM Jark Wu  wrote:
> > > >
> > > > > Hi Forward,
> > > > >
> > > > > Thanks for bringing this discussion and preparing the nice design.
> > > > > I think it's nice to have the JSON functions in the next release.
> > > > > We have received some requirements for this feature.
> > > > >
> > > > > I can help to shepherd this JSON functions effort and will leave
> > > comments
> > > > >  in the design doc in the next days.
> > > > >
> > > > > Hi Danny,
> > > > >
> > > > > The newly introduced JSON functions are from SQL:2016, not from
> > > > > MySQL.
> > > > > So no JSON type is needed. According to SQL:2016, the
> > > > > representation of JSON data can be a "character string", which is also
> > > > > the current implementation in Calcite [1].
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > >
> > > > > [1]: https://calcite.apache.org/docs/reference.html#json-functions
> > > > >
> > > > >
> > > > > On Wed, 4 Sep 2019 at 21:22, Xu Forward 
> > > wrote:
> > > > >
> > > > > > hi Danny Chan ,Thank you very much for your reply, your help can
> > help
> > > > me
> > > > > > further improve this discussion.
> > > > > > Best
> > > > > > forward
> > > > > >
> > > > > > Danny Chan  于2019年9月4日周三 下午8:50写道:
> > > > > >
> > > > > > > Thanks Xu Forward for bring up this topic, I think the JSON
> > > functions
> > > > > are
> > > > > > > very useful especially for those MySQL users.
> > > > > > >
> > > > > > > I saw that you have done some work within the Apache Calcite,
> > > that’s
> > > > a
> > > > > > > good start, but this is one concern from me, Flink doesn’t
> > support
> > > > JSON
> > > > > > > type internal, so how to represent a JSON object in Flink
> maybe a

Confluence permission for FLIP creation

2019-09-22 Thread Forward Xu
Hi devs,

I'd like to create a page for the "Support SQL 2016-2017 JSON functions in
Flink SQL" FLIP. Could you grant me Confluence permission for FLIP creation?

My Confluence ID is forwardxu.

Best,
Forward.


Re: non-deserializable root cause in DeclineCheckpoint

2019-09-22 Thread Terry Wang
Hi, Jeffrey~

I think the two fixes you mentioned may not work in your case. 
This problem (https://issues.apache.org/jira/browse/FLINK-14076) is caused by 
the TM and JM jar package environments being inconsistent, or by inconsistent 
jar loading behavior.
Maybe the behavior of the standalone cluster’s dynamic class loader changed in 
Flink 1.9, since you mentioned that your program ran normally in Flink 1.8. 
Just a thought from me.
Hope it is useful~

Best,
Terry Wang



> 在 2019年9月21日,上午2:58,Jeffrey Martin  写道:
> 
> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> 
> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> 
> If FlinkKafkaProducer fails while checkpointing, it throws a KafkaException
> which gets wrapped in a CheckpointException which is sent to the JM as a
> DeclineCheckpoint. KafkaException isn't on the JM default classpath, so the
> JM throws a fairly cryptic ClassNotFoundException. The details of the
> KafkaException wind up suppressed so it's impossible to figure out what
> actually went wrong.
> 
> I can think of two fixes that would prevent this from occurring in the
> Kafka or other connectors in the future:
> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
> rather than allowing CheckpointExceptions with non-deserializable root
> causes to slip through
> 2. CheckpointException should always capture its wrapped exception as a
> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
> rather than 'super(cause)').
> 
> Thoughts?



Re: Best coding practises guide while programming using flink apis

2019-09-22 Thread Terry Wang
Hi, Deepak~

I appreciate your idea and have cc'd the dev mailing list too. 

Best,
Terry Wang



> 在 2019年9月22日,下午2:12,Deepak Sharma  写道:
> 
> Hi All
> I guess we need to put some examples in the documentation around best coding 
> practises , concurrency , non blocking IO and design patterns while writing 
> Apache Flink pipelines.
> Is there any such guide available ?
> E.g. when and how to use the GOF design patterns . Any code snippet can be 
> put as well explaining it.
> 
> This guide can come from people already running beam in production and 
> written it with all best practices in mind.
> It will help in greater and wider adoption.
> 
> Just a thought.
> Please let me know if anyone wants to contribute and i can lead this 
> initiative by documenting in flink wiki.
> 
> Thanks
> -- 
> Thanks
> Deepak
> www.bigdatabig.com 
> www.keosha.net 


Re: [DISCUSS] FLIP-48: Pluggable Intermediate Result Storage

2019-09-22 Thread Stephan Ewen
## About the improvements you mentioned in (1)

  - I am not sure that this helps to improve performance by avoiding
breaking the pipeline.
Attaching an additional sink would in virtually any case add even more
overhead than breaking the pipeline.
What is your reasoning for why it would be faster, all in all?

  - About reading only a subset of the records:
 - If this is about reading the data once or twice, then
columnarizing/indexing/compressing the data is more expensive than just
reading it twice more.
 - This means turning the mechanism into something like materialized
view matching, rather than result caching. That should happen in different
parts of the stack (view matching needs schema, semantics, etc.). I am not
sure mixing both is even a good idea.


## The way I see the trade-offs are:

Pro in core Flink:
  - Small improvement to API experience, compared to a library

Contra in core Flink:
  - added API complexity, maintenance and evolution overhead
  - not clear what impacts mixing materialized view matching and result
caching has on the system architecture
  - Not yet a frequent use case, possibly a frequent use case in the future.
  - Starting as a library allows for merging into the core later when this
use case becomes major and experience improvement proves big.

Unclear
  - is breaking the pipeline by introducing a blocking intermediate result
really worse than duplicating the data into an additional sink?


==> Especially because we can still make it part of Flink later once the
use case and approach are a bit more fleshed out, this looks like a strong
case for starting with a library approach here.

Best,
Stephan



On Thu, Sep 19, 2019 at 2:41 AM Becket Qin  wrote:

> Hi Stephan,
>
> Sorry for the belated reply. You are right that the functionality proposed
> in this FLIP can be implemented out of the Flink core as an ecosystem
> project.
>
> The main motivation of this FLIP is two folds:
>
> 1. Improve the performance of intermediate result sharing in the same
> session.
> Using the internal shuffle service to store cached result has two potential
> performance problems.
>   a) the cached intermediate results may break the operator chaining due to
> the addition of BLOCKING_PERSISTENT edge.
>   b) the downstream processor must read all the records in intermediate
> results to process.
>
> A pluggable intermediate result storage will help address both of these
> problems. Adding a sink will not break chaining, but just ensures the cached
> logical node will not be optimized away. The pluggable storage can help
> improve the performance by making the intermediate results filterable /
> projectable, etc. Alternatively we could make the shuffle service more
> sophisticated, but that may complicate things and is not necessary for
> normal shuffles.
>
> This motivation seems difficult to be addressed as an external library on
> top of Flink core, mainly because the in-session intermediate result
> cleanup may need participation of RM to achieve fault tolerance. Also,
> having an external library essentially introduces another way to cache the
> in-session intermediate results.
>
> 2. Cross session intermediate result sharing.
> As you said, this can be implemented as an external library. The only
> difference is that users may need to deal with another set of API, but that
> seems OK.
>
>
> So for this FLIP, it would be good to see whether we think motivation 1 is
> worth addressing or not.
>
> What do you think?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Aug 15, 2019 at 11:42 PM Stephan Ewen  wrote:
>
> > Sorry for the late response. So many FLIPs these days.
> >
> > I am a bit unsure about the motivation here, and that this need to be a
> > part of Flink. It sounds like this can be perfectly built around Flink
> as a
> > minimal library on top of it, without any change in the core APIs or
> > runtime.
> >
> > The proposal to handle "caching intermediate results" (to make them
> > reusable across jobs in a session), and "writing them in different
> formats
> > / indexing them" doesn't sound like it should be the same mechanism.
> >
> >   - The caching part is a transparent low-level primitive. It avoid
> > re-executing a part of the job graph, but otherwise is completely
> > transparent to the consumer job.
> >
> >   - Writing data out in a sink, compressing/indexing it and then reading
> it
> > in another job is also a way of reusing a previous result, but on a
> > completely different abstraction level. It is not the same intermediate
> > result any more. When the consumer reads from it and applies predicate
> > pushdown, etc. then the consumer job looks completely different from a
> job
> > that consumed the original result. It hence needs to be solved on the API
> > level via a sink and a source.
> >
> > I would suggest to keep these concepts separate: Caching (possibly
> > automatically) for jobs in a session, and long term writing/sharing of
> data
> > sets.
> >
> > So

Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-22 Thread Stephan Ewen
My assumption is the same as Sijie's: the connector will either be part of
Flink, or part of the streamnative repo. No double maintenance.

I feel this discussion is very much caught up in problems that are all
solvable if we want to solve them.
Maybe we can think about what our goal for users and the communities is?

  - Do we want to help build a relationship between the Pulsar and Flink
open source communities?
  - Will users find a connector in the streamnative repository?
  - Will users trust a connector that is not part of Flink as much?

And then decide what is best according to the overall goals there.
As mentioned before, I believe that we can handle connectors more
pragmatically and less strict than the core of Flink, if it helps unlocking
users.

Best,
Stephan



On Fri, Sep 20, 2019 at 2:10 PM Sijie Guo  wrote:

> Thanks Becket.
>
> I think it is better for the Flink community to judge the benefits of doing
> this. I was trying to provide some views from outsiders.
>
> Thanks,
> Sijie
>
> On Fri, Sep 20, 2019 at 10:25 AM Becket Qin  wrote:
>
> > Hi Sijie,
> >
> > Yes, we will have to support existing old connectors and new connectors
> in
> > parallel for a while. We have to take that maintenance overhead because
> > existing connectors have been used by the users for a long time. I guess
> It
> > may take at least a year for us to fully remove the old connectors.
> >
> > Process wise, we can do the same for Pulsar connector. But I am not sure
> if
> > we want to have the same burden on Pulsar connector, and I would like to
> > understand the benefit of doing that.
> >
> > For users, the benefit of having the old Pulsar connector checked in
> seems
> > limited because 1) that code base will be immediately deprecated in the
> > next release in 3-4 months; 2) users can always use it even if it is not
> in
> > the Flink code base. Admittedly it is not as convenient as having it in
> > Flink code base, but doesn't seem super either. And after 3-4 months,
> users
> > can just use the new connector in Flink repo.
> >
> > For Flink developers, the old connector code base is not something that
> we
> > want to evolve later. Instead, these code will be deprecated and
> > removed. So why do we want to get a beta version out to attract people to
> > use something we don't want to maintain?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Fri, Sep 20, 2019 at 10:12 AM Sijie Guo  wrote:
> >
> > > Thanks everyone here. Sorry for jumping into the discussion here.
> > >
> > > I am not very familiar about the deprecation process in Flink. If I
> > > misunderstood the process, please fix me.
> > >
> > > As far as I understand, FLIP-27 is introducing a new unified API for
> > > connectors. After it introduces the new API
> > > and before moving all the existing connectors from old API to new API,
> > both
> > > old ApI and new API will co-exist
> > > for a while until Flink moves all existing connectors to new API. So
> the
> > > Pulsar connector (using old API) can
> > > follow the deprecation process with other connector using old API and
> the
> > > deprecation of old API, no?
> > >
> > > If that's the case, I think contributing the current connector back to
> > > Flink rather than maintaining it outside Flink
> > > would provide a bit more benefits. We can deprecate the existing
> > > streamnative/pulsar-flink repo and point the users
> > > to use the connector in Flink repo. So all the review processes will
> > happen
> > > within Flink for both old connector and
> > > new connector. It also reduces the confusions for the users as the
> > > documentation and code base happen in one place.
> > >
> > > Thoughts?
> > >
> > > - Sijie
> > >
> > >
> > >
> > >
> > > On Fri, Sep 20, 2019 at 12:53 AM Becket Qin 
> > wrote:
> > >
> > > > Thanks for the explanation, Stephan. I have a few questions /
> thoughts.
> > > >
> > > > So that means we will remove the old connector without a major
> version
> > > > bump, is that correct?
> > > >
> > > > I am not 100% sure if mixing 1.10 connectors with 1.11 connectors
> will
> > > > always work because we saw some dependency class collisions in the
> > past.
> > > To
> > > > make it safe we may have to maintain the old code for one more
> release.
> > > >
> > > > To be honest I am still wondering if we have to put the old connector
> > in
> > > > Flink repo. if we check in the old connector to Flink. We will end up
> > in
> > > > the following situation:
> > > > 1. Old connector in streamnative/pulsar-flink repo.
> > > > 2. Old connector in Flink Repo, which may be different from the one
> in
> > > > Pulsar repo. (Added in 1.10, deprecated in 1.11, removed in 1.12)
> > > > 3. New connector in Flink Repo.
> > > >
> > > > We need to think about how to make the users in each case happy.
> > > > - For users of (1), I assume Sijie and Yijie will have to maintain
> the
> > > code
> > > > a bit longer for its own compatibility even after we have (2). In
> that
> > > > case, bugs fo

[DISCUSS] Add Bucket File System Connector

2019-09-22 Thread Jun Zhang
Hi everyone:
In the current Flink system, using Flink SQL to read data and then write it to 
a file system in various formats is not supported; the current File System 
Connector is only experimental [1], so I have developed a new File System 
Connector.
Thanks to the suggestions of Kurt and Fabian, I carefully studied the design 
documentation of FLIP-63, redesigned this feature, enriched the functionality 
of the existing File System Connector, and added partition support. Users can 
add this File System Connector by using code or DDL, and then use Flink SQL to 
write data to the file system.
We can treat it as a sub-task of FLIP-63. I wrote a design document and put it 
in Google Docs [2].
I hope everyone will give me some more suggestions, thank you very much.
[1].https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#file-system-connector
[2].https://docs.google.com/document/d/1R5K_tKgy1MhqhQmolGD_hKnEAKglfeHRDa2f4tB-xew/edit?usp=sharing

use of org.apache.flink.yarn.cli.FlinkYarnSessionCli in Flink Sql client

2019-09-22 Thread Dipanjan Mazumder
Hi,
  Thanks again for responding to my earlier queries.
    I was going through the Flink SQL client code again and came across the 
default custom command line. A few days back I came to know that the Flink SQL 
client is not supported in a full-fledged cluster with different resource 
managers like YARN, but this "org.apache.flink.yarn.cli.FlinkYarnSessionCli" 
class seems to be used by the SQL client to establish a session with a 
YARN-managed cluster.

Am I wrong in thinking this, or is there some other use for this class? Please 
kindly help on the same.

Regards,
Dipanjan

[ANNOUNCE] Weekly Community Update 2019/38

2019-09-22 Thread Konstantin Knauf
Dear community,

happy to share this week's community update with a FLIP for the Pulsar
Connector contribution, three FLIPs for the SQL Ecosystem (plugin system,
computed columns, extended support for views), and a bit more. Enjoy!

Flink Development
==

* [connectors]  After some discussion on the mailing list over the last
weeks, Sijie has opened a FLIP to add an exactly-once Pulsar Connector
(DataStream API, Table API, Catalog API) to Flink. [1]

* [sql] The discussion on supporting Hive built-in functions in Flink SQL
led to FLIP-68 to extend the core table system with modular plugins. [2]
While focusing on function modules as a first step, the FLIP proposes a
more general plugin system also covering user defined types, operators,
rules, etc. As part of this FLIP the existing functions in Flink SQL would
also be migrated into a "CorePlugin". [3]

* [sql] Danny proposes to add support for computed columns in Flink SQL (as
FLIP-70). [4]

* [sql] Zhenghua has started a discussion on extending support for VIEWs in
Flink SQL (as FLIP-71).  He proposes to add support to store views in a
catalog and to add support for "SHOW VIEWS" and "DESCRIBE VIEW". [5]


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-72%3A+Introduce+Pulsar+Connector
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-68%3A+Extend+Core+Table+System+with+Modular+Plugins
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-68-Extend-Core-Table-System-with-Modular-Plugins-tp33161.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-70-Support-Computed-Column-for-Flink-SQL-tp33126.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-71-E2E-View-support-in-Flink-SQL-tp33131.html

Notable Bugs
==

* [FLINK-14010] [1.9.0] [1.8.2] [1.7.2] [yarn] When the Flink Yarn
Application Manager receives a shut down request from the YARN Resource
Manager, the Flink cluster can get into an inconsistent state, where
leadership for the JobManager, ResourceManager and Dispatcher components is
split between two master processes. Tison is working on a fix. [6]

* [FLINK-14107] [1.9.0] [1.8.2] [kinesis] When using event time alignment
with the Kinesis Consumer, the consumer might deadlock in one corner case.
Fixed for 1.9.1 and 1.8.3. [7]

[6] https://issues.apache.org/jira/browse/FLINK-14010
[7] https://issues.apache.org/jira/browse/FLINK-14107

Events, Blog Posts, Misc
===

* Upcoming Meetups
* *Enrico Canzonieri* of Yelp and *David Massart* of Tentative will
share their Apache Flink user stories of Yelp and BNP Paribas at the next *Bay
Area Apache Flink Meetup* 24th of September.  [8]

* *Ana Esguerra* has published a blog post on how to run Flink on YARN with
Kerberos for Kafka & YARN. [9]

[8] https://www.meetup.com/Bay-Area-Apache-Flink-Meetup/events/262680261/
[9]
https://medium.com/@minyodev/apache-flink-on-yarn-with-kerberos-authentication-adeb62ef47d2

Cheers,

Konstantin

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525


Follow us @VervericaData Ververica 


--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng


Re: non-deserializable root cause in DeclineCheckpoint

2019-09-22 Thread Jeffrey Martin
Thanks for the suggestion, Terry. I've investigated a bit further.

DeclineCheckpoint specifically checks for the possibility of an exception
that the JM won't be able to deserialize (i.e. anything other than a
CheckpointException). It just doesn't check for the possibility of a
CheckpointException that can't be deserialized because its root cause can't
be deserialized.

I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
1.9 broke the FlinkKafkaProducer API so I wound up having to set the
Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
checkpoints to fail sometimes. That caused the KafkaException to be
propagated to the JM as the root cause of a CheckpointException.

On Sun, Sep 22, 2019 at 5:03 AM Terry Wang  wrote:

> Hi, Jeffrey~
>
> I think two fixes you mentioned may not work in your case.
> This problem https://issues.apache.org/jira/browse/FLINK-14076 <
> https://issues.apache.org/jira/browse/FLINK-14076> is caused by TM and JM
> jar package environment inconsistent or jar loaded behavior inconsistent in
> nature.
> Maybe the behavior  of standalone cluster’s dynamic class loader changed
> in flink 1.9 since you mentioned that your program run normally in flink
> 1.8.
> Just a thought from me.
> Hope to be useful~
>
> Best,
> Terry Wang
>
>
>
> > 在 2019年9月21日,上午2:58,Jeffrey Martin  写道:
> >
> > JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> >
> > I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> >
> > If FlinkKafkaProducer fails while checkpointing, it throws a
> KafkaException
> > which gets wrapped in a CheckpointException which is sent to the JM as a
> > DeclineCheckpoint. KafkaException isn't on the JM default classpath, so
> the
> > JM throws a fairly cryptic ClassNotFoundException. The details of the
> > KafkaException wind up suppressed so it's impossible to figure out what
> > actually went wrong.
> >
> > I can think of two fixes that would prevent this from occurring in the
> > Kafka or other connectors in the future:
> > 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
> > rather than allowing CheckpointExceptions with non-deserializable root
> > causes to slip through
> > 2. CheckpointException should always capture its wrapped exception as a
> > SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
> > rather than 'super(cause)').
> >
> > Thoughts?
>
>


Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-22 Thread Becket Qin
Hi Stephan,

I have no doubt about the value of adding Pulsar connector to Flink repo.
My concern is about how exactly we are going to do it.

As mentioned before, I believe that we can handle connectors more
> pragmatically and less strict than the core of Flink, if it helps unlocking
> users.

I can see the benefit of being less strict for the initial connector code
adoption. However, I don't think we should be less strict about the
maintenance commitment once the code is in the Flink repo. It only makes sense
to check in something and ask users to use it if we plan to maintain it.

If I understand correctly, the current plan so far is the following:
1. release 1.10
   - Check in Pulsar connector on old interface and label it as beta
version.
   - encourage users to try it and report bugs.
2. release 1.11
   - Check in Pulsar connector on new interface (a.k.a new Pulsar
connector) and label it as beta version
   - Deprecate the old Pulsar connector
   - Fix bugs reported on old Pulsar connector from release 1.10
   - Ask users to migrate from old Pulsar connector to new Pulsar connector
3. release 1.12
   - Announce end of support for old Pulsar connector and remove the code
   - Fix bugs reported on new Pulsar connector.

If this is the plan, it seems neither Flink nor the users trying the old
Pulsar connector will benefit from this experimental old Pulsar connector,
because whatever feedback we get or bugs we fix on the old Pulsar
connector will be thrown away within one or two releases.

If we check in the old Pulsar connector right now, the only option I see is
to maintain it for a while (e.g. a year or more). IMO, the immediate
deprecation and code removal hurts the users much more than asking them to
wait for another release. I personally think that we can avoid this
maintenance burden by going directly to the new Pulsar connector,
especially given that users can still use the connector even if they are
not in Flink repo. That said, I am OK with maintaining both old and new
Pulsar connector if we believe that having the Pulsar connector available
right now in Flink repo is more important.

Thanks,

Jiangjie (Becket) Qin

On Sun, Sep 22, 2019 at 9:10 PM Stephan Ewen  wrote:

> My assumption is as Sijie's, that once the connector is either part of
> Flink, or part of the streamnative repo. No double maintenance.
>
> I feel this discussion is very much caught in problems that are all
> solvable if we want to solve them.
> Maybe we can think what our goal for users and the communities is?
>
>   - Do we want to help build a relationship between the Pulsar and Flink
> open source communities?
>   - Will users find a connector in the streamnative repository?
>   - Will users trust a connector that is not part of Flink as much?
>
> And then decide what is best according to the overall goals there.
> As mentioned before, I believe that we can handle connectors more
> pragmatically and less strict than the core of Flink, if it helps unlocking
> users.
>
> Best,
> Stephan
>
>
>
> On Fri, Sep 20, 2019 at 2:10 PM Sijie Guo  wrote:
>
> > Thanks Becket.
> >
> > I think it is better for the Flink community to judge the benefits of
> doing
> > this. I was trying to provide some views from outsiders.
> >
> > Thanks,
> > Sijie
> >
> > On Fri, Sep 20, 2019 at 10:25 AM Becket Qin 
> wrote:
> >
> > > Hi Sijie,
> > >
> > > Yes, we will have to support existing old connectors and new connectors
> > in
> > > parallel for a while. We have to take that maintenance overhead because
> > > existing connectors have been used by the users for a long time. I
> guess
> > It
> > > may take at least a year for us to fully remove the old connectors.
> > >
> > > Process wise, we can do the same for Pulsar connector. But I am not
> sure
> > if
> > > we want to have the same burden on Pulsar connector, and I would like
> to
> > > understand the benefit of doing that.
> > >
> > > For users, the benefit of having the old Pulsar connector checked in
> > seems
> > > limited because 1) that code base will be immediately deprecated in the
> > > next release in 3-4 months; 2) users can always use it even if it is
> not
> > in
> > > the Flink code base. Admittedly it is not as convenient as having it in
> > > Flink code base, but doesn't seem super either. And after 3-4 months,
> > users
> > > can just use the new connector in Flink repo.
> > >
> > > For Flink developers, the old connector code base is not something that
> > we
> > > want to evolve later. Instead, these code will be deprecated and
> > > removed. So why do we want to get a beta version out to attract people
> to
> > > use something we don't want to maintain?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > >
> > > On Fri, Sep 20, 2019 at 10:12 AM Sijie Guo  wrote:
> > >
> > > > Thanks everyone here. Sorry for jumping into the discussion here.
> > > >
> > > > I am not very familiar about the deprecation process in Flink. If I
> > > > misunderstood the process, plea

Re: non-deserializable root cause in DeclineCheckpoint

2019-09-22 Thread Terry Wang
Hi, Jeffrey~

Thanks for your detailed explanation; I now understand why the job failed with 
Flink 1.9.

But the two fixes you mentioned may still not work well. The KafkaException can 
be serialized in the TM because the necessary jar is on its classpath, but not 
in the JM, so maybe it's impossible to check the possibility of serialization 
in advance.
Do I understand that right?



Best,
Terry Wang



> 在 2019年9月23日,上午5:17,Jeffrey Martin  写道:
> 
> Thanks for suggestion, Terry. I've investigated a bit further.
> 
> DeclineCheckpoint specifically checks for the possibility of an exception
> that the JM won't be able to deserialize (i.e. anything other than a
> Checkpoint exception). It just doesn't check for the possibility of a
> CheckpointException that can't be deserialize because its root cause can't
> be deserialize.
> 
> I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
> 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
> Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
> checkpoints to fail sometimes. That caused the KafkaException to be
> propagated to the JM as the root cause of a CheckpointException.
> 
> On Sun, Sep 22, 2019 at 5:03 AM Terry Wang  wrote:
> 
>> Hi, Jeffrey~
>> 
>> I think two fixes you mentioned may not work in your case.
>> This problem https://issues.apache.org/jira/browse/FLINK-14076 <
>> https://issues.apache.org/jira/browse/FLINK-14076> is caused by TM and JM
>> jar package environment inconsistent or jar loaded behavior inconsistent in
>> nature.
>> Maybe the behavior  of standalone cluster’s dynamic class loader changed
>> in flink 1.9 since you mentioned that your program run normally in flink
>> 1.8.
>> Just a thought from me.
>> Hope to be useful~
>> 
>> Best,
>> Terry Wang
>> 
>> 
>> 
>>> 在 2019年9月21日,上午2:58,Jeffrey Martin  写道:
>>> 
>>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
>>> 
>>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
>>> 
>>> If FlinkKafkaProducer fails while checkpointing, it throws a
>> KafkaException
>>> which gets wrapped in a CheckpointException which is sent to the JM as a
>>> DeclineCheckpoint. KafkaException isn't on the JM default classpath, so
>> the
>>> JM throws a fairly cryptic ClassNotFoundException. The details of the
>>> KafkaException wind up suppressed so it's impossible to figure out what
>>> actually went wrong.
>>> 
>>> I can think of two fixes that would prevent this from occurring in the
>>> Kafka or other connectors in the future:
>>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
>>> rather than allowing CheckpointExceptions with non-deserializable root
>>> causes to slip through
>>> 2. CheckpointException should always capture its wrapped exception as a
>>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
>>> rather than 'super(cause)').
>>> 
>>> Thoughts?
>> 
>> 



Re: use of org.apache.flink.yarn.cli.FlinkYarnSessionCli in Flink Sql client

2019-09-22 Thread Terry Wang
Hi Dipanjan:

I just looked through the Flink SQL client code and came to the same 
conclusion as you.
Looking forward to receiving other comments.

Best,
Terry Wang



> 在 2019年9月22日,下午11:53,Dipanjan Mazumder  写道:
> 
> Hi ,
>   Thanks again for responding on my earlier queries..
> I was again going through the Flink SQL client code and came across the 
> default custom command-line , few days back i came to know that Flink sql 
> client is not supported in a full fledged cluster with different resource 
> managers like yarn but this "org.apache.flink.yarn.cli.FlinkYarnSessionCli" 
> class seems like is used by the SQL client to establish session with yarn 
> managed cluster.
> 
> Am i wrong in thinking this or is there some other use for this class. Please 
> kindly help on the same.
> RegardsDipanjan



Re: [SURVEY] How many people are using customized RestartStrategy(s)

2019-09-22 Thread Zhu Zhu
Steven,

Thanks for the information. If we can determine this is a common issue, we can
solve it in Flink core.
To get to that state, I have two questions which need your help:
1. Why is a gauge not good for alerting? The metric "fullRestart" is a
Gauge. Does the metric reporter you use report Counter and
Gauge to external services in different ways? Or can anything else be
different due to the metric type?
2. Is the "number of restarts" what you actually need, rather than
the "fullRestart" count? If so, I believe we will have such a counter
metric in 1.10, since the previous "fullRestart" metric value is not the
number of restarts when fine grained recovery (a feature added in 1.9.0) is
enabled. "fullRestart" reveals how many times the entire job graph has been
restarted. If fine grained recovery is enabled, the graph
would not be restarted when task failures happen, and the "fullRestart"
value will not increment in such cases.

I'd appreciate it if you could help with these questions so we can make better
decisions for Flink.

Thanks,
Zhu Zhu

Steven Wu  于2019年9月22日周日 上午3:31写道:

> Zhu Zhu,
>
> Flink fullRestart metric is a Gauge, which is not good for alerting on. We
> publish an equivalent Counter metric for alerting purpose.
>
> Thanks,
> Steven
>
> On Thu, Sep 19, 2019 at 7:45 PM Zhu Zhu  wrote:
>
>> Thanks Steven for the feedback!
>> Could you share more information about the metrics you add in you
>> customized restart strategy?
>>
>> Thanks,
>> Zhu Zhu
>>
>> Steven Wu  于2019年9月20日周五 上午7:11写道:
>>
>>> We do use config like "restart-strategy:
>>> org.foobar.MyRestartStrategyFactoryFactory". Mainly to add additional
>>> metrics than the Flink provided ones.
>>>
>>> On Thu, Sep 19, 2019 at 4:50 AM Zhu Zhu  wrote:
>>>
 Thanks everyone for the input.

 The RestartStrategy customization is not recognized as a public
 interface as it is not explicitly documented.
 As it is not used from the feedbacks of this survey, I'll conclude that
 we do not need to support customized RestartStrategy for the new scheduler
 in Flink 1.10

 Other usages are still supported, including all the strategies and
 configuring ways described in
 https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/task_failure_recovery.html#restart-strategies
 .

 Feel free to share in this thread if you has any concern for it.

 Thanks,
 Zhu Zhu

 Zhu Zhu  于2019年9月12日周四 下午10:33写道:

> Thanks Oytun for the reply!
>
> Sorry for not have stated it clearly. When saying "customized
> RestartStrategy", we mean that users implement an
> *org.apache.flink.runtime.executiongraph.restart.RestartStrategy* by
> themselves and use it by configuring like "restart-strategy:
> org.foobar.MyRestartStrategyFactoryFactory".
>
> The usage of restart strategies you mentioned will keep working with
> the new scheduler.
>
> Thanks,
> Zhu Zhu
>
> Oytun Tez  于2019年9月12日周四 下午10:05写道:
>
>> Hi Zhu,
>>
>> We are using custom restart strategy like this:
>>
>> environment.setRestartStrategy(failureRateRestart(2, Time.minutes(1),
>> Time.minutes(10)));
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Thu, Sep 12, 2019 at 7:11 AM Zhu Zhu  wrote:
>>
>>> Hi everyone,
>>>
>>> I wanted to reach out to you and ask how many of you are using a
>>> customized RestartStrategy[1] in production jobs.
>>>
>>> We are currently developing the new Flink scheduler[2] which
>>> interacts with restart strategies in a different way. We have to 
>>> re-design
>>> the interfaces for the new restart strategies (so called
>>> RestartBackoffTimeStrategy). Existing customized RestartStrategy will 
>>> not
>>> work any more with the new scheduler.
>>>
>>> We want to know whether we should keep the way
>>> to customized RestartBackoffTimeStrategy so that existing customized
>>> RestartStrategy can be migrated.
>>>
>>> I'd appreciate if you can share the status if you are
>>> using customized RestartStrategy. That will be valuable for use to make
>>> decisions.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/dev/task_failure_recovery.html#restart-strategies
>>> [2] https://issues.apache.org/jira/browse/FLINK-10429
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>


Re: How to prevent from launching 2 jobs at the same time

2019-09-22 Thread Zili Chen
The situation is as Dian said. Flink identifies jobs by job id instead of
job name.

However, I think it is still a valid question whether it would be an
alternative for Flink to identify jobs by job name and
leave the work of distinguishing jobs by name to users. The advantages of
this approach include a more readable
display and interaction, as well as reducing some hardcoded work on job ids,
such as always setting the
job id to new JobID(0, 0) in standalone per-job mode to get the same
ZK path.

Best,
tison.


Dian Fu  于2019年9月23日周一 上午10:55写道:

> Hi David,
>
> The jobs are identified by job id, not by job name, internally in Flink, and
> so it will only check if there are two jobs with the same job id.
>
> If you submit the job via CLI[1], I'm afraid there are still no built-in
> ways provided as currently the job id is generated randomly when submitting
> a job via CLI and the generated job id has nothing to do with the job name.
> However, if you submit the job via REST API [2], it did provide an option
> to specify the job id when submitting a job. You can generate the job id by
> yourself.
>
> Regards,
> Dian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/monitoring/rest_api.html#jars-jarid-run
>
> 在 2019年9月23日,上午4:57,David Morin  写道:
>
> Hi,
>
> What is the best way to prevent from launching 2 jobs with the same name
> concurrently ?
> Instead of doing a check in the script that starts the Flink job, I would
> prefer to stop a job if another one with the same name is in progress
> (Exception or something like that).
>
> David
>
>
>


[jira] [Created] (FLINK-14164) Add a metric to show failover count regarding fine grained recovery

2019-09-22 Thread Zhu Zhu (Jira)
Zhu Zhu created FLINK-14164:
---

 Summary: Add a metric to show failover count regarding fine 
grained recovery
 Key: FLINK-14164
 URL: https://issues.apache.org/jira/browse/FLINK-14164
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination, Runtime / Metrics
Affects Versions: 1.9.0, 1.10.0
Reporter: Zhu Zhu
 Fix For: 1.10.0


Previously, Flink used the restart-all strategy to recover jobs from failures, 
and the metric "fullRestart" was used to show the count of failovers.

However, with fine grained recovery introduced in 1.9.0, the "fullRestart" 
metric only reveals how many times the entire graph has been restarted, not 
including the number of fine grained failure recoveries.

As many users want to build their job alerting based on failovers, I'd propose 
to add a new metric {{numberOfFailures}}/{{numberOfRestarts}} which also 
counts fine grained recoveries.
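
For illustration, a rough sketch (not the actual change) of such a counter: it 
is registered on a metric group and bumped on every restart, whether full or 
fine grained, so alerting can be built on a monotonically increasing value.

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;

class RestartMetricsSketch {

    private final Counter numberOfRestarts;

    RestartMetricsSketch(MetricGroup jobMetricGroup) {
        this.numberOfRestarts = jobMetricGroup.counter("numberOfRestarts");
    }

    // called for every restart, full or fine grained
    void onRestart() {
        numberOfRestarts.inc();
    }
}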



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: non-deserializable root cause in DeclineCheckpoint

2019-09-22 Thread Jeffrey Martin
Hi Terry,

KafkaException comes in through the job's dependencies (it's defined in the
kafka-clients jar packed into the fat job jar) and is on neither the TM nor the
JM default classpath. The job running in the TM includes the job
dependencies and so can throw a KafkaException, but the JM can't deserialize
it because it's not available on the default classpath.

I'm suggesting defensively wrapping all causes of a CheckpointException in
a SerializedThrowable (in addition to defensively wrapping everything
except a CheckpointException). I believe SerializedThrowable is there
specifically for this case, i.e. where a job in the TM sends the JM an
exception that's defined only in the job itself.
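
For illustration, a sketch of fix no. 2 (the class below is a stand-in, not
the actual CheckpointException code): the constructor defensively wraps its
cause in a SerializedThrowable so the JM can always deserialize the decline
message, even if the cause class is only on the job classpath.

import org.apache.flink.util.SerializedThrowable;

public class CheckpointExceptionSketch extends Exception {

    public CheckpointExceptionSketch(String message, Throwable cause) {
        // wrap the cause so it stays readable on the JM side
        super(message, cause == null ? null : new SerializedThrowable(cause));
    }
}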

It might be clearer if I just put up a PR :) I'd be happy to and it'll be
very short.

Best,

Jeff

On Sun, Sep 22, 2019 at 7:45 PM Terry Wang  wrote:

> Hi, Jeffrey~
>
> Thanks for your detailed explanation and I understood why job failed with
> flink 1.9.
>
> But the two fixes you mentioned may still not work well. As KafkaException
> can be serialized
> in TM for there is necessary jar in its classpath but not in JM, so maybe
> it’s impossible to check
> the possibility of serialization in advance.
> Do I understand right?
>
>
>
> Best,
> Terry Wang
>
>
>
> > 在 2019年9月23日,上午5:17,Jeffrey Martin  写道:
> >
> > Thanks for suggestion, Terry. I've investigated a bit further.
> >
> > DeclineCheckpoint specifically checks for the possibility of an exception
> > that the JM won't be able to deserialize (i.e. anything other than a
> > Checkpoint exception). It just doesn't check for the possibility of a
> > CheckpointException that can't be deserialize because its root cause
> can't
> > be deserialize.
> >
> > I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
> > 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
> > Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
> > checkpoints to fail sometimes. That caused the KafkaException to be
> > propagated to the JM as the root cause of a CheckpointException.
> >
> > On Sun, Sep 22, 2019 at 5:03 AM Terry Wang  wrote:
> >
> >> Hi, Jeffrey~
> >>
> >> I think two fixes you mentioned may not work in your case.
> >> This problem https://issues.apache.org/jira/browse/FLINK-14076 <
> >> https://issues.apache.org/jira/browse/FLINK-14076> is caused by TM and
> JM
> >> jar package environment inconsistent or jar loaded behavior
> inconsistent in
> >> nature.
> >> Maybe the behavior  of standalone cluster’s dynamic class loader changed
> >> in flink 1.9 since you mentioned that your program run normally in flink
> >> 1.8.
> >> Just a thought from me.
> >> Hope to be useful~
> >>
> >> Best,
> >> Terry Wang
> >>
> >>
> >>
> >>> 在 2019年9月21日,上午2:58,Jeffrey Martin  写道:
> >>>
> >>> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> >>>
> >>> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> >>>
> >>> If FlinkKafkaProducer fails while checkpointing, it throws a
> >> KafkaException
> >>> which gets wrapped in a CheckpointException which is sent to the JM as
> a
> >>> DeclineCheckpoint. KafkaException isn't on the JM default classpath, so
> >> the
> >>> JM throws a fairly cryptic ClassNotFoundException. The details of the
> >>> KafkaException wind up suppressed so it's impossible to figure out what
> >>> actually went wrong.
> >>>
> >>> I can think of two fixes that would prevent this from occurring in the
> >>> Kafka or other connectors in the future:
> >>> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
> >>> rather than allowing CheckpointExceptions with non-deserializable root
> >>> causes to slip through
> >>> 2. CheckpointException should always capture its wrapped exception as a
> >>> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
> >>> rather than 'super(cause)').
> >>>
> >>> Thoughts?
> >>
> >>
>
>


Re: [DISCUSS] Add Bucket File System Connector

2019-09-22 Thread JingsongLee
Hi Jun,

Sorry for the late reply. I shared my thoughts on StreamingFileSink in FLIP-63 
[1], and I don't recommend using StreamingFileSink to support partitioning in 
the Table API:
1. The bucket concept and SQL's bucket concept are in serious conflict. [2]
2. In Table, we need to support single-partition writing, grouped 
multi-partition writing, and non-grouped multi-partition writing.
3. We need a global role to commit files to the metastore.
4. We need an abstraction that supports both streaming and batch mode.
5. Table partitioning is simpler than StreamingFileSink: we only support 
partition references on fields, rather than being as flexible as the runtime.

The DDL can like this:
CREATE TABLE USER_T (
  a INT,
  b STRING,
  c DOUBLE
) PARTITIONED BY (date STRING, country STRING)
WITH (
  'connector.type' = ‘filesystem’,
  'connector.path' = 'hdfs:///tmp/xxx',
  'format.type' = 'csv',
  'update-mode' = 'append',
'partition-support' = 'true'
 )
In SQL world, we can only support row inputs. 
The only difference from the previous FileSystem is that the partition-support 
attribute is required. We can use this identifier to represent the new 
connector support partition without changing the previous connector.
Other attributes can be completely consistent. We can add parquet, Orc and 
other formats incrementally later.

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-63-Rework-table-partition-support-td32770.html
[2] 
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL+BucketedTables


--
From:Jun Zhang <825875...@qq.com>
Send Time:2019年9月22日(星期日) 23:18
To:dev 
Cc:Kurt Young ; fhue...@gmail.com 
Subject:[DISCUSS] Add Bucket File System Connector

Hi,everyone:
      In the current flink system, use flink sql to read data 
and then write it to File System with kind of formats is not supported, the 
current File System Connector is only experimental [1], so I have developed a 
new File System Connector.
       Thanks to the suggestion of Kurt and Fabian, I 
carefully studied the design documentation of FLIP-63, redesigned this feature, 
enriched the functionality of the existing File System Connector, and add 
partition support. Users can add this File System Connector by using code or 
DDL, and then use flink sql to write data to the file system.
       We can treat it as a sub-task of FLIP-63. I wrote a 
design document and put it in google docs [2].
       I hope everyone will give me some more suggestion, 
thank you very much.。
      
[1].https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#file-system-connector
[2].https://docs.google.com/document/d/1R5K_tKgy1MhqhQmolGD_hKnEAKglfeHRDa2f4tB-xew/edit?usp=sharing



Re: non-deserializable root cause in DeclineCheckpoint

2019-09-22 Thread Terry Wang
Hi Jeffrey,

You are right, and I understood what you said after studying the 
class org.apache.flink.util.SerializedThrowable.
I prefer fix no. 2 that you mentioned:
CheckpointException should always capture its wrapped exception as a 
SerializedThrowable.
Looking forward to seeing your PR soon :)

Best,
Terry Wang



> 在 2019年9月23日,上午11:48,Jeffrey Martin  写道:
> 
> Hi Terry,
> 
> KafkaException comes in through the job's dependencies (it's defined in the
> kafka-clients jar packed up in the fat job jar) and is on either the TM nor
> JM default classpath. The job running in the TM includes the job
> dependencies and so can throw a KafkaException but the JM can't deserialize
> it because it's not available on the default classpath.
> 
> I'm suggesting defensively wrapping all causes of a CheckpointException in
> a SerializedThrowable (in addition to defensively wrapping everything
> except a CheckpointException). I believe SerializedThrowable is there
> specifically for this case, i.e. where a job in the TM sends the JM an
> exception that's defined only in the job itself.
> 
> It might be clearer if I just put up a PR :) I'd be happy to and it'll be
> very short.
> 
> Best,
> 
> Jeff
> 
> On Sun, Sep 22, 2019 at 7:45 PM Terry Wang  wrote:
> 
>> Hi, Jeffrey~
>> 
>> Thanks for your detailed explanation and I understood why job failed with
>> flink 1.9.
>> 
>> But the two fixes you mentioned may still not work well. As KafkaException
>> can be serialized
>> in TM for there is necessary jar in its classpath but not in JM, so maybe
>> it’s impossible to check
>> the possibility of serialization in advance.
>> Do I understand right?
>> 
>> 
>> 
>> Best,
>> Terry Wang
>> 
>> 
>> 
>>> 在 2019年9月23日,上午5:17,Jeffrey Martin  写道:
>>> 
>>> Thanks for suggestion, Terry. I've investigated a bit further.
>>> 
>>> DeclineCheckpoint specifically checks for the possibility of an exception
>>> that the JM won't be able to deserialize (i.e. anything other than a
>>> Checkpoint exception). It just doesn't check for the possibility of a
>>> CheckpointException that can't be deserialize because its root cause
>> can't
>>> be deserialize.
>>> 
>>> I think the job succeeding on 1.8 and failing on 1.9 was a red herring --
>>> 1.9 broke the FlinkKafkaProducer API so I wound up having to set the
>>> Semantic explicitly on 1.9. I set it to EXACTLY_ONCE, which caused
>>> checkpoints to fail sometimes. That caused the KafkaException to be
>>> propagated to the JM as the root cause of a CheckpointException.
>>> 
>>> On Sun, Sep 22, 2019 at 5:03 AM Terry Wang  wrote:
>>> 
 Hi, Jeffrey~
 
 I think two fixes you mentioned may not work in your case.
 This problem https://issues.apache.org/jira/browse/FLINK-14076 <
 https://issues.apache.org/jira/browse/FLINK-14076> is caused by TM and
>> JM
 jar package environment inconsistent or jar loaded behavior
>> inconsistent in
 nature.
 Maybe the behavior  of standalone cluster’s dynamic class loader changed
 in flink 1.9 since you mentioned that your program run normally in flink
 1.8.
 Just a thought from me.
 Hope to be useful~
 
 Best,
 Terry Wang
 
 
 
> 在 2019年9月21日,上午2:58,Jeffrey Martin  写道:
> 
> JIRA ticket: https://issues.apache.org/jira/browse/FLINK-14076
> 
> I'm on Flink v1.9 with the Kafka connector and a standalone JM.
> 
> If FlinkKafkaProducer fails while checkpointing, it throws a
 KafkaException
> which gets wrapped in a CheckpointException which is sent to the JM as
>> a
> DeclineCheckpoint. KafkaException isn't on the JM default classpath, so
 the
> JM throws a fairly cryptic ClassNotFoundException. The details of the
> KafkaException wind up suppressed so it's impossible to figure out what
> actually went wrong.
> 
> I can think of two fixes that would prevent this from occurring in the
> Kafka or other connectors in the future:
> 1. DeclineCheckpoint should always send a SerializedThrowable to the JM
> rather than allowing CheckpointExceptions with non-deserializable root
> causes to slip through
> 2. CheckpointException should always capture its wrapped exception as a
> SerializedThrowable (i.e., use 'super(new SerializedThrowable(cause))'
> rather than 'super(cause)').
> 
> Thoughts?
 
 
>> 
>> 



Per Key Grained Watermark Support

2019-09-22 Thread 廖嘉逸
Hi all,

Currently, watermarks are only supported at the task level (or partition level),
which means that data belonging to a faster key has to share the same watermark
with data belonging to a slower key in the same key group of a KeyedStream. This
leads to two problems:

1. Latency. For example, every key has its own window state, but a window can
only fire after its end time is exceeded by the watermark, which is usually
determined by the data belonging to the slowest key. (The same applies to
CepOperator and other operators that use the watermark to fire results.)

2. State size. Because the faster key delays firing its results, it has to keep
redundant state that could have been pruned earlier.

However, since the watermark has been around for a long time and was not
designed to be fine-grained in the first place, I find it very hard to solve
this problem without a big change. I wonder whether anyone in the community has
successful experience with this, or whether there is a shortcut? If not, I can
try to draft a design if the community needs it.
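To make the setting concrete, here is a minimal, made-up pipeline (my own
illustration, not taken from any ticket) where the keyed windows can only fire
on the shared, subtask-level watermark:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SharedWatermarkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        env.fromElements(
                Tuple2.of("fast", 10_000L),    // "fast" key is already far ahead ...
                Tuple2.of("fast", 120_000L),
                Tuple2.of("slow", 1_000L))     // ... but "slow" holds the shared watermark back
            .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(5)) {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                })
            .keyBy(0)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .sum(1)
            .print();

        // The watermark is tracked per subtask, not per key: in an unbounded job the
        // first window of "fast" (ending at 60 000 ms) stays open until the shared
        // watermark, held back by "slow", passes that point. This bounded example
        // only fires everything because the source emits a final watermark when it finishes.
        env.execute("shared watermark per subtask");
    }
}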







Best Regards,

Jiayi Liao

Re: use of org.apache.flink.yarn.cli.FlinkYarnSessionCli in Flink Sql client

2019-09-22 Thread Dian Fu
Hi Dipanjan,

I think you are right that submitting a job to a cluster via the SQL client is
already supported; this was added in [1]. Besides, it is not configured in the
YAML, it's specified via the CLI options when you start up the SQL client CLI.

[1] https://issues.apache.org/jira/browse/FLINK-8852 

> On Sep 23, 2019, at 11:00 AM, Terry Wang wrote:
> 
> Hi Dipanjan:
> 
> I just looked through the Flink SQL client code and came to the same
> conclusion as you.
> Looking forward to other comments.
> 
> Best,
> Terry Wang
> 
> 
> 
>> On Sep 22, 2019, at 11:53 PM, Dipanjan Mazumder wrote:
>> 
>> Hi,
>> Thanks again for responding to my earlier queries.
>> I was again going through the Flink SQL client code and came across the
>> default custom command line. A few days back I came to know that the Flink
>> SQL client is not supported on a full-fledged cluster with different
>> resource managers like YARN, but this
>> "org.apache.flink.yarn.cli.FlinkYarnSessionCli" class seems to be used by
>> the SQL client to establish a session with a YARN-managed cluster.
>> 
>> Am I wrong in thinking this, or is there some other use for this class?
>> Please kindly help with the same.
>> 
>> Regards,
>> Dipanjan
> 



Re: Per Key Grained Watermark Support

2019-09-22 Thread Lasse Nedergaard
Hi Jiayi

We have faced the same challenge, as we deal with IoT units that do not
necessarily share the same timestamps. A watermark per key would be a perfect
match here. We tried to work around it by handling late events as a special
case with side outputs, but that isn't a perfect solution.
My conclusion is to skip watermarks, create a KeyedProcessFunction and handle
the time for each key myself, roughly along the lines of the sketch below.
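
A minimal sketch of that approach (the class name, state names and the
tumbling-window logic are my own illustrative assumptions, not production code):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Counts events per key in 1-minute event-time windows, tracking event time
 *  per key instead of relying on the (shared) watermark. */
public class PerKeyTimeFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private static final long WINDOW_MS = 60_000L;

    private transient ValueState<Long> openWindowStart; // start of this key's open window
    private transient ValueState<Long> count;           // aggregate for the open window

    @Override
    public void open(Configuration parameters) {
        openWindowStart = getRuntimeContext().getState(
                new ValueStateDescriptor<>("openWindowStart", Long.class));
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out)
            throws Exception {
        long windowStart = value.f1 - (value.f1 % WINDOW_MS);
        Long open = openWindowStart.value();

        if (open != null && windowStart > open) {
            // This key's own event time has moved past its open window: emit and
            // prune right away, without waiting for the slowest key on the subtask.
            out.collect(ctx.getCurrentKey() + " @ " + open + " -> " + count.value());
            count.clear();
        }
        // Late data for this key is simply folded into the open window here; a real
        // implementation would need an explicit lateness policy.
        openWindowStart.update(open == null ? windowStart : Math.max(open, windowStart));
        count.update(count.value() == null ? 1L : count.value() + 1);
    }
}

The trade-off is that you give up the built-in window/trigger machinery, so
lateness and state cleanup have to be handled explicitly per key.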

Med venlig hilsen / Best regards
Lasse Nedergaard




[DISCUSS] Releasing Flink 1.9.1

2019-09-22 Thread Jark Wu
Hi everyone,

It has already been a month since we released Flink 1.9.0.
We already have many important bug fixes from which our users can benefit
in the release-1.9 branch (83 resolved issues).
Therefore, I propose to create the next bug fix release for Flink 1.9.

Most notable fixes are:

- [FLINK-13526] When switching to a non-existing catalog or database in the
SQL Client, the client crashes.
- [FLINK-13568] It is not possible to create a table with a "STRING" data
type via the SQL DDL.
- [FLINK-13941] Prevent data loss by not cleaning up small part files from
S3.
- [FLINK-13490][jdbc] If one column value is null when reading JDBC, the
following values will all be null.
- [FLINK-14107][kinesis] When using event time alignment with the Kinesis
Consumer, the consumer might deadlock in one corner case.

Furthermore, I would like the following critical issues to be merged before
1.9.1 release:

- [FLINK-14118] Reduce the unnecessary flushing when there is no data
available for flush, which can save 20% ~ 40% CPU. (reviewing)
- [FLINK-13386] A couple of issues with the new dashboard have already
been filed. (PR is created, needs review)
- [FLINK-14010][yarn] The Flink YARN cluster can get into an inconsistent
state in some cases, where leadership for the JobManager, ResourceManager
and Dispatcher components is split between two master processes. (PR is
created, needs review)

I would volunteer as release manager and kick off the release process once
the blocker issues have been merged. What do you think?

If there are any other blocker issues that need to be fixed in 1.9.1, please
let me know.

Cheers,
Jark


Re: [DISCUSS] Releasing Flink 1.9.1

2019-09-22 Thread Jeff Zhang
FLINK-13708 is also very critical IMO. This would cause an invalid Flink job
(doubled output).

https://issues.apache.org/jira/browse/FLINK-13708



-- 
Best Regards

Jeff Zhang


Re: [DISCUSS] Releasing Flink 1.9.1

2019-09-22 Thread Debasish Ghosh
I hope https://issues.apache.org/jira/browse/FLINK-12501 will also be part
of 1.9.1.

regards.

On Mon, Sep 23, 2019 at 11:39 AM Jeff Zhang  wrote:

> FLINK-13708 is also very critical IMO. This would cause an invalid Flink job
> (doubled output).
>
> https://issues.apache.org/jira/browse/FLINK-13708
>
>
> --
> Best Regards
>
> Jeff Zhang
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Re: non-deserializable root cause in DeclineCheckpoint

2019-09-22 Thread Jeffrey Martin
Draft PR here: https://github.com/apache/flink/pull/9742
There might be some failing tests (still waiting on Travis), but I think
the diff is small enough for you to evaluate the approach for acceptability.
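
For anyone skimming the thread, the rough shape of the idea is below. This is
only a sketch with made-up names, not the actual diff in the PR:

import org.apache.flink.util.SerializedThrowable;

/** Sketch only: make sure whatever reaches the JM can be deserialized on the
 *  JM's default classpath. */
public final class JmSafeThrowables {

    private JmSafeThrowables() {
    }

    /**
     * SerializedThrowable keeps the original exception's class name, message and
     * stack trace, so nothing from the user/job classpath (e.g. KafkaException)
     * has to be present on the JM in order to deserialize it.
     */
    public static Throwable wrap(Throwable cause) {
        return cause instanceof SerializedThrowable
                ? cause
                : new SerializedThrowable(cause);
    }
}

Applied as fix no. 2, this amounts to calling 'super(new SerializedThrowable(cause))'
instead of 'super(cause)' in CheckpointException; applied as fix no. 1, it means
wrapping the reason before DeclineCheckpoint ships it to the JM.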

On Sun, Sep 22, 2019 at 9:10 PM Terry Wang  wrote:

> Hi Jeffrey,
>
> You are right, and I understood what you said after I studied the class
> org.apache.flink.util.SerializedThrowable.
> I prefer fix no. 2 that you mentioned:
> CheckpointException should always capture its wrapped exception as
> a SerializedThrowable.
> Looking forward to seeing your PR soon :)
>
> Best,
> Terry Wang