Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

2019-09-09 Thread shimin yang
Hi Jingsong,

Although it would be nice if the accumulators in the GlobalAggregateManager were
fault-tolerant, we could still take advantage of managed state to guarantee
the semantics and use the accumulators to implement a distributed barrier or
lock to solve the distributed access problem.
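Roughly, a minimal sketch of that barrier idea could look like the following,
assuming the GlobalAggregateManager#updateGlobalAggregate(name, value,
aggregateFunction) signature; the aggregate name and the commitToMetaStore()
helper are hypothetical placeholders:

```java
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

public class MetaStoreCommitBarrier {

    // Counts how many subtasks have acknowledged a given checkpoint.
    static class CountAcks implements AggregateFunction<Long, Long, Long> {
        @Override public Long createAccumulator() { return 0L; }
        @Override public Long add(Long value, Long acc) { return acc + 1; }
        @Override public Long getResult(Long acc) { return acc; }
        @Override public Long merge(Long a, Long b) { return a + b; }
    }

    // Called from notifyCheckpointComplete on every subtask.
    void onCheckpointComplete(StreamingRuntimeContext ctx, long checkpointId) throws Exception {
        GlobalAggregateManager aggregateManager = ctx.getGlobalAggregateManager();
        long acks = aggregateManager.updateGlobalAggregate(
                "metastore-barrier-" + checkpointId, checkpointId, new CountAcks());
        // Only the last subtask to arrive performs the metaStore update, exactly once.
        if (acks == ctx.getNumberOfParallelSubtasks()) {
            commitToMetaStore(checkpointId);
        }
    }

    // Hypothetical helper: the actual metaStore update, e.g. adding a new partition.
    void commitToMetaStore(long checkpointId) {
        // user-specific metaStore logic goes here
    }
}
```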

Best,
Shimin

JingsongLee  于2019年9月9日周一 下午1:33写道:

> Thanks Jark and Dian:
> 1. Jark's approach: do the work in task-0. A simple way.
> 2. Dian's approach: use StreamingRuntimeContext#getGlobalAggregateManager,
> which can do more operations. But are these accumulators fault-tolerant?
>
> Best,
> Jingsong Lee
>
>
> --
> From:shimin yang 
> Send Time:2019年9月6日(星期五) 15:21
> To:dev 
> Subject:Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete
>
> Hi Fu,
>
> That'll be nice.
>
> Thanks.
>
> Best,
> Shimin
>
> Dian Fu  于2019年9月6日周五 下午3:17写道:
>
> > Hi Shimin,
> >
> > It can be guaranteed to be an atomic operation. This is ensured by the
> RPC
> > framework. You could take a look at RpcEndpoint for more details.
> >
> > Regards,
> > Dian
> >
> > > 在 2019年9月6日,下午2:35,shimin yang  写道:
> > >
> > > Hi Fu,
> > >
> > > Thank you for the reminder. I think it would work in my case as long as
> > > it's an atomic operation.
> > >
> > > Dian Fu  于2019年9月6日周五 下午2:22写道:
> > >
> > >> Hi Jingsong,
> > >>
> > >> Thanks for bringing up this discussion. You can take a look at the
> > >> GlobalAggregateManager to see if it meets your requirements. It can be
> > >> obtained via StreamingRuntimeContext#getGlobalAggregateManager().
> > >>
> > >> Regards,
> > >> Dian
> > >>
> > >>> 在 2019年9月6日,下午1:39,shimin yang  写道:
> > >>>
> > >>> Hi Jingsong,
> > >>>
> > >>> Big fan of this idea. We faced the same problem and resolved it by
> > >>> adding a distributed lock. It would be nice to have this feature in
> > >>> JobMaster, which could replace the lock.
> > >>>
> > >>> Best,
> > >>> Shimin
> > >>>
> > >>> JingsongLee  于2019年9月6日周五
> 下午12:20写道:
> > >>>
> >  Hi devs:
> > 
> >  I am trying to implement a streaming file sink for the Table API [1],
> >  similar to StreamingFileSink. If the underlying format is a HiveFormat,
> >  or a format that updates visibility through a metaStore, I have to
> >  update the metaStore in notifyCheckpointComplete. But this operation
> >  occurs on the task side, which leads to distributed access to the
> >  metaStore and thus to a bottleneck.
> > 
> >  So I'm curious if we can support notifyOnMaster for
> >  notifyCheckpointComplete like FinalizeOnMaster.
> > 
> >  What do you think?
> > 
> >  [1]
> > 
> > >>
> >
> https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing
> > 
> >  Best,
> >  Jingsong Lee
> > >>
> > >>
> >
> >
>
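For illustration, the kind of hook being asked for could look roughly like the
following, mirroring FinalizeOnMaster; this interface does not exist in Flink,
and the name and signature are made up:

```java
// Hypothetical, purely illustrative: a master-side checkpoint hook mirroring FinalizeOnMaster,
// so that e.g. a metaStore commit runs once on the JobManager instead of on every task.
public interface NotifyCheckpointOnMaster {
    void notifyCheckpointCompleteOnMaster(long checkpointId) throws Exception;
}
```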


[DISCUSS] Retrieval services in non-high-availability scenario

2019-09-09 Thread Zili Chen
Hi devs,

I'd like to start a discussion thread on the topic how we provide
retrieval services in non-high-availability scenario. To clarify
terminology, non-high-availability scenario refers to
StandaloneHaServices and EmbeddedHaServices.

***The problem***

We notice that the retrieval services of the current StandaloneHaServices
(pre-configured) and EmbeddedHaServices (in-memory) have their respective
problems.

For the pre-configured scenario, we now have a
getJobManagerLeaderRetriever(JobID, defaultJMAddress) method to work around
the problem that it is impossible to configure the JM address in advance.
The parameter defaultJMAddress is not used with any other
high-availability mode. Also, in the MiniCluster scenario and anywhere
else where pre-configuring the leader address is impossible,
StandaloneHaServices cannot be used.

For the in-memory case, it clearly doesn't fit any distributed
scenario.

***The proposal***

In order to address the inconsistency between pre-configured retrieval
services and zookeeper-based retrieval services, we reconsider the
promises provided by "non-high-availability" and regard it as a service
similar to the zookeeper-based one, except that it doesn't tolerate node
failure. Thus, we implement a service that acts like a standalone
zookeeper cluster, named LeaderServer.

A leader server is an actor that runs on the jobmanager actor system and
reacts to leader contender registrations and leader retriever requests. If
the jobmanager fails, the associated leader server fails too, which is
where "non-high-availability" stands.

In order to communicate with the leader server, we start a leader client
per high-availability service (JM, TM, ClusterClient). When the leader
election service starts, it registers the contender with the leader server
via the leader client (by akka communication); when the leader retriever
starts, it registers itself with the leader server via the leader client.

The leader server handles leader election internally just like the
Embedded implementation, and notifies retrievers with the new leader
information when a new leader is elected.
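A rough sketch of the intended protocol, with purely illustrative interface
names (none of these types exist in Flink today), could be:

```java
import java.util.UUID;

// Purely illustrative interfaces for the proposed protocol.
interface LeaderServerGateway {
    // a leader election service registers its contender for a component (e.g. "dispatcher")
    void registerContender(String componentId, LeaderContenderGateway contender);

    // a leader retrieval service subscribes to leader changes of a component
    void registerRetriever(String componentId, LeaderRetrieverGateway retriever);
}

// contender-side callback: invoked by the leader server when leadership is granted
interface LeaderContenderGateway {
    void grantLeadership(UUID leaderSessionId);
}

// retriever-side callback: invoked by the leader server whenever a new leader is elected
interface LeaderRetrieverGateway {
    void notifyNewLeader(String leaderAddress, UUID leaderSessionId);
}
```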

In this way, we unify the view of retrieval services in all scenarios:

1. Configure a name service to communicate with. In zookeeper mode it is
zookeeper, and in non-high-availability mode it is the leader server.
2. Any retrieval request is sent to the name service and is handled by
that service.

Apart from a unified view, there are other advantages:

+ We no longer need the special method
getJobManagerLeaderRetriever(JobID, defaultJMAddress); instead, we use
getJobManagerLeaderRetriever(JobID). Consequently, we need not include the
JobManager address in the slot request, where it might become stale during
transmission.

+ Separated configuration concerns for launch and retrieval. The JobManager
address & port and REST address & port are only configured when launching
a cluster (in the YARN scenario there is even no need to configure them).
And when retrieval is requested, only the connection info of the name
service (zk or leader server) needs to be configured.

+ The Embedded implementation could also be included in this abstraction
without any regression on multiple-leader simulation for test purposes.
Actually, the leader server acts as a limited standalone zookeeper
cluster. Thus, and this is where this proposal comes from, when we
refactor the metadata storage with the transaction store proposed in
FLINK-10333, we only need to take care of the zookeeper implementation and
a unified non-high-availability implementation.

***Clean up***

It is also noticed that there are several stale & unimplemented
high-availability services implementations which I'd like to remove to keep
the codebase clean for the work on this thread and FLINK-10333. They are:

- YarnHighAvailabilityServices
- AbstractYarnNonHaServices
- YarnIntraNonHaMasterServices
- YarnPreConfiguredMasterNonHaServices
- SingleLeaderElectionService
- FsNegativeRunningJobsRegistry

Any feedback is appreciated.

Best,
tison.


[jira] [Created] (FLINK-14008) Flink Scala 2.12 binary distribution contains incorrect NOTICE-binary file

2019-09-09 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-14008:
-

 Summary: Flink Scala 2.12 binary distribution contains incorrect 
NOTICE-binary file
 Key: FLINK-14008
 URL: https://issues.apache.org/jira/browse/FLINK-14008
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.9.0, 1.8.1, 1.10.0
Reporter: Till Rohrmann
 Fix For: 1.10.0


The Flink Scala {{2.12}} binary distribution contains an incorrect 
NOTICE-binary file. The problem is that we don't update the Scala version of 
the Scala dependencies listed in the {{NOTICE-binary}} file.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14009) Cron jobs broken due to verifying incorrect NOTICE-binary file

2019-09-09 Thread Till Rohrmann (Jira)
Till Rohrmann created FLINK-14009:
-

 Summary: Cron jobs broken due to verifying incorrect NOTICE-binary 
file
 Key: FLINK-14009
 URL: https://issues.apache.org/jira/browse/FLINK-14009
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.10.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.8.2, 1.9.1


With FLINK-13968 we introduced an automatic {{NOTICE-binary}} file check. 
However, since we don't use the correct {{NOTICE-binary}} file (FLINK-14008) 
for Scala 2.12, it currently fails our cron jobs.

I suggest to only enable the automatic {{NOTICE-binary}} file check for Scala 
2.11 until FLINK-14008 has been fixed.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-09 Thread Kostas Kloudas
Thanks a lot everyone for the warm welcome!

Cheers,
Kostas

On Mon, Sep 9, 2019 at 4:54 AM Yun Gao  wrote:
>
>   Congratulations, Kostas!
>
>  Best,
>  Yun‍‍‍
>
>
>
>
> --
> From:Becket Qin 
> Send Time:2019 Sep. 9 (Mon.) 10:47
> To:dev 
> Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC
>
> Congrats, Kostas!
>
> On Sun, Sep 8, 2019 at 11:48 PM myasuka  wrote:
>
> > Congratulations Kostas!
> >
> > Best
> > Yun Tang
> >
> >
> >
> > --
> > Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> >
>


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-09 Thread JingsongLee
Congrats, Kostas! Well deserved.

Best,
Jingsong Lee


--
From:Kostas Kloudas 
Send Time:2019年9月9日(星期一) 15:50
To:dev ; Yun Gao 
Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

Thanks a lot everyone for the warm welcome!

Cheers,
Kostas

On Mon, Sep 9, 2019 at 4:54 AM Yun Gao  wrote:
>
>   Congratulations, Kostas!
>
>  Best,
>  Yun‍‍‍
>
>
>
>
> --
> From:Becket Qin 
> Send Time:2019 Sep. 9 (Mon.) 10:47
> To:dev 
> Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC
>
> Congrats, Kostas!
>
> On Sun, Sep 8, 2019 at 11:48 PM myasuka  wrote:
>
> > Congratulations Kostas!
> >
> > Best
> > Yun Tang
> >
> >
> >
> > --
> > Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> >
>



Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-09 Thread Sijie Guo
Thank you Bowen and Becket.

What's the take of the Flink community? Shall we wait for FLIP-27 or shall we
proceed to the next steps? And what would the next steps be? :-)

Thanks,
Sijie

On Thu, Sep 5, 2019 at 2:43 PM Bowen Li  wrote:

> Hi,
>
> I think having a Pulsar connector in Flink can be a good mutual benefit to
> both communities.
>
> Another perspective is that the Pulsar connector is the first streaming
> connector that integrates with Flink's metadata management system and
> Catalog APIs. It'll be cool to see how the integration turns out and
> whether we need to improve the Flink Catalog stack, which is currently in
> Beta, to cater to streaming sources/sinks. Thus I'm in favor of merging the
> Pulsar connector into Flink 1.10.
>
> I'd suggest submitting smaller-sized PRs, e.g. maybe one for basic
> source/sink functionality and another for schema and catalog integration,
> just to make them easier to review.
>
> It doesn't seem to hurt to wait for FLIP-27. But I don't think FLIP-27
> should be a blocker in cases where it cannot make its way into 1.10 or
> doesn't leave a reasonable amount of time for committers to review or for
> the Pulsar connector to fully adapt to the new interfaces.
>
> Bowen
>
>
>
> On Thu, Sep 5, 2019 at 3:21 AM Becket Qin  wrote:
>
> > Hi Till,
> >
> > You are right. It all depends on when the new source interface is going
> > to be ready. Personally I think it will be there in about a month or so,
> > but I could be too optimistic. It would also be good to hear what
> > Aljoscha and Stephan think, as they are also involved in FLIP-27.
> >
> > In general I think we should have the Pulsar connector in Flink 1.10,
> > preferably with the new source interface. We could also check it in right
> > now with the old source interface, but I suspect few users will use it
> > before the next official release. Therefore, it seems reasonable to wait
> > a little bit to see whether we can jump to the new source interface. As
> > long as we make sure Flink 1.10 has it, waiting a little bit doesn't seem
> > to hurt much.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Sep 5, 2019 at 3:59 PM Till Rohrmann 
> wrote:
> >
> > > Hi everyone,
> > >
> > > I'm wondering what the problem would be if we committed the Pulsar
> > > connector before the new source interface is ready. If I understood it
> > > correctly, then we need to support the old source interface anyway for
> > the
> > > existing connectors. By checking it in early I could see the benefit
> that
> > > our users could start using the connector earlier. Moreover, it would
> > > prevent that the Pulsar integration is being delayed in case that the
> > > source interface should be delayed. The only downside I see is the
> extra
> > > review effort and potential fixes which might be irrelevant for the new
> > > source interface implementation. I guess it mainly depends on how
> certain
> > > we are when the new source interface will be ready.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Sep 5, 2019 at 8:56 AM Becket Qin 
> wrote:
> > >
> > > > Hi Sijie and Yijie,
> > > >
> > > > Thanks for sharing your thoughts.
> > > >
> > > > Just want to give some update on FLIP-27. Although the FLIP wiki and
> > > > discussion thread have been quiet for some time, a few committers /
> > > > contributors in the Flink community were actually prototyping the
> > > > entire thing. We have made some good progress there but want to update
> > > > the FLIP wiki after the entire thing is verified to work, in case
> > > > there are some last-minute surprises in the implementation. I don't
> > > > have an exact ETA yet, but I guess it is going to be within a month or
> > > > so.
> > > >
> > > > I am happy to review the current Flink Pulsar connector and see if it
> > > > would fit in FLIP-27. It would be good to avoid the case where we check
> > > > in the Pulsar connector with some review effort and shortly after that
> > > > the new Source interface becomes ready.
> > > >
> > > > Thanks,
> > > >
> > > > Jiangjie (Becket) Qin
> > > >
> > > > On Thu, Sep 5, 2019 at 8:39 AM Yijie Shen  >
> > > > wrote:
> > > >
> > > > > Thanks for all the feedback and suggestions!
> > > > >
> > > > > As Sijie said, the goal of the connector has always been to provide
> > > > > users with the latest features of both systems as soon as possible.
> > We
> > > > > propose to contribute the connector to Flink and hope to get more
> > > > > suggestions and feedback from Flink experts to ensure the high
> > quality
> > > > > of the connector.
> > > > >
> > > > > For FLIP-27, we noticed its existence at the beginning of reworking
> > > > > the connector implementation based on Flink 1.9; we also wanted to
> > > > > build a connector that supports both batch and stream computing
> based
> > > > > on it.
> > > > > However, it has been inactive for some time, so we decided to
> provide
> > > > > a connector with most of the new features, such as the new type
> > system
> > > > > and th

Re: Checkpointing clarification

2019-09-09 Thread Till Rohrmann
Yes you are correct Dominik. The committed Kafka offsets tell you what the
program has read as input from the Kafka topic. But depending on the actual
program logic this does not mean that you have output the results of
processing these input events up to this point. As you have said, there are
Flink operations such as window calculations which need to buffer events
for a certain period before they can emit the results.
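As a minimal sketch of this mechanism (simplified, not the actual Kafka
consumer code; commitToKafka() is a hypothetical helper), the offsets are
captured during the synchronous snapshot but only committed once
notifyCheckpointComplete is called:

```java
import org.apache.flink.runtime.state.CheckpointListener;

// Simplified sketch: offsets are captured during the synchronous snapshot and only
// committed to Kafka once the whole checkpoint has completed.
class OffsetCommittingSource implements CheckpointListener {

    private long currentOffset;             // offset of the last record read from Kafka
    private long offsetOfPendingCheckpoint; // offset captured during the sync phase

    // called while the snapshot is taken (sync phase)
    void snapshotState(long checkpointId) {
        offsetOfPendingCheckpoint = currentOffset;
    }

    // called only after ALL operators have acknowledged the checkpoint
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        commitToKafka(offsetOfPendingCheckpoint);
    }

    // hypothetical helper: commits the consumer group offset to the broker
    private void commitToKafka(long offset) {
        // an asynchronous commit would happen here
    }
}
```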

Cheers,
Till

On Mon, Sep 9, 2019 at 4:54 AM Paul Lam  wrote:

> Hi Dom,
>
> There are a sync phase and an async phase in checkpointing. When an operator
> receives a barrier, it performs a snapshot, aka the sync phase. And when the
> barriers have passed through all the operators including the sinks, the
> operators will get a notification, after which they do the async part, like
> committing the Kafka offsets. WRT your question, the offsets would only be
> committed when the whole checkpoint has successfully finished. For more
> information, you can refer to this post[1].
>
> [1]
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
> <
> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
> >
>
> Best,
> Paul Lam
>
> > 在 2019年9月9日,07:06,Dominik Wosiński  写道:
> >
> > Okay, thanks for clarifying. I have a follow-up question here. If we
> > consider Kafka offset commits, this basically means that the offsets
> > committed during the checkpoint are not necessarily the offsets that were
> > really processed by the pipeline and written to the sink? I mean, if there
> > is a window in the pipeline, then the records are saved in the window
> > state if the window was not emitted yet, but they are considered as
> > processed, thus will not be replayed in case of a restart, because Flink
> > considers them as processed when committing offsets. Am I correct?
> >
> > Best,
> > Dom.
>
>


[jira] [Created] (FLINK-14010) Dispatcher & JobManagers don't give up leadership when AM is shut down

2019-09-09 Thread TisonKun (Jira)
TisonKun created FLINK-14010:


 Summary: Dispatcher & JobManagers don't give up leadership when AM 
is shut down
 Key: FLINK-14010
 URL: https://issues.apache.org/jira/browse/FLINK-14010
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN, Runtime / Coordination
Affects Versions: 1.9.0, 1.8.1, 1.7.2, 1.10.0
Reporter: TisonKun


In the YARN deployment scenario, the YARN RM possibly launches a new AM for the 
job even if the previous AM has not terminated, for example, when the AMRM 
heartbeat times out. In this common case the RM sends a shutdown request to the 
previous AM and expects the AM to shut down properly.

However, currently in {{YARNResourceManager}}, we handle this request in 
{{onShutdownRequest}}, which simply closes the {{YARNResourceManager}} *but not 
the Dispatcher and JobManagers*. Thus, the Dispatcher and JobManager launched in 
the new AM cannot be granted leadership properly. Visually,

on previous AM: Dispatcher leader, JM leaders
on new AM: ResourceManager leader

Since on the client side, or in per-job mode, the JobManager address and port 
are configured as the new AM, the whole cluster goes into an unrecoverable, 
inconsistent status: clients all query the dispatcher on the new AM, which is 
not the leader. Briefly, the Dispatcher and JobManagers on the previous AM do 
not give up their leadership properly.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Call for approving Elasticsearch 7.x connector

2019-09-09 Thread vino yang
Hi guys,

There is an issue about supporting Elasticsearch 7.x [1].
Based on our validation and discussion, we found that Elasticsearch 7.x
does not guarantee API compatibility. Therefore, it is not possible to
provide a universal connector like the Kafka one. It seems that we have
to provide a new connector to support Elasticsearch 7.x.

Considering that Elasticsearch is a widely used system, there have been
multiple user comments hoping for Elasticsearch 7.x support as soon as
possible. Therefore, I hope this new connector will be approved as soon as
possible, so that this work can be started.

Best,
Vino

[1]: https://issues.apache.org/jira/browse/FLINK-13025


[jira] [Created] (FLINK-14011) Make some fields final and initialize them during construction in AsyncWaitOperator

2019-09-09 Thread Alex (Jira)
Alex created FLINK-14011:


 Summary: Make some fields final and initialize them during 
construction in AsyncWaitOperator
 Key: FLINK-14011
 URL: https://issues.apache.org/jira/browse/FLINK-14011
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Reporter: Alex


This is a small follow-up ticket after FLINK-12958.
With the changes introduced there, the {{AsyncWaitOperator}} is created by 
{{AsyncWaitOperatorFactory}}, so some fields that are initialized in the 
{{setup}} method can be set up in the constructor instead.
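Purely as an illustration of the refactoring idea (these are not the actual 
AsyncWaitOperator fields; the field name is made up):

{code:java}
// Before: the field is assigned in setup(), so it cannot be final.
class OperatorBeforeRefactoring {
    private Object mailboxExecutor;            // hypothetical field name

    void setup(Object mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }
}

// After: the factory passes the dependency into the constructor, so the field can be final.
class OperatorAfterRefactoring {
    private final Object mailboxExecutor;      // final, initialized during construction

    OperatorAfterRefactoring(Object mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }
}
{code}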



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] StreamingFile with ParquetBulkWriter bucketing limitations

2019-09-09 Thread Kostas Kloudas
Hi Enrico,

Sorry for the late reply. I think your understanding is correct.
The best way to do it is to write your own ParquetBulkWriter and the
corresponding factory.
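As a rough sketch, assuming a small delegating writer is acceptable for your
format (UnwrappingBulkWriter and the extractor function are illustrative names;
the inner writer would come from the usual Avro Parquet writer factory), it
could look like:

```java
import java.io.IOException;
import java.util.function.Function;
import org.apache.flink.api.common.serialization.BulkWriter;

// Sketch: accept the wrapper type (e.g. myClass) so the BucketAssigner can still see
// "country", but delegate the actual Parquet/Avro writing to a writer for the nested
// record (e.g. cityClass). W = wrapper type, R = nested record type.
class UnwrappingBulkWriter<W, R> implements BulkWriter<W> {

    private final BulkWriter<R> delegate;          // e.g. built by the Avro Parquet writer factory
    private final Function<W, R> extractRecord;    // e.g. myClass -> myClass.getCity()

    UnwrappingBulkWriter(BulkWriter<R> delegate, Function<W, R> extractRecord) {
        this.delegate = delegate;
        this.extractRecord = extractRecord;
    }

    @Override
    public void addElement(W element) throws IOException {
        delegate.addElement(extractRecord.apply(element));
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void finish() throws IOException {
        delegate.finish();
    }
}
```

The corresponding BulkWriter.Factory would wrap the Avro Parquet factory in the
same way, so the BucketAssigner still receives the full wrapper object and can
use the country field for the bucket id.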

Out of curiosity, I guess that in the BucketingSink you were using the
AvroKeyValueSinkWriter, right?

Cheers,
Kostas

On Fri, Aug 30, 2019 at 10:23 AM Enrico Agnoli
 wrote:
>
> StreamingFile limitations
>
> Hi community,
>
> I'm working toward the porting of our code from `BucketingSink<>` to 
> `StreamingFileSink`.
> In this case we use the sink to write AVRO via Parquet and the suggested 
> implementation of the Sink should be something like:
>
> ```
> val parquetWriterFactory = ParquetAvroWriters.forSpecificRecord(mySchemaClass)
> StreamingFileSink.forBulkFormat(basePath, 
> parquetWriterFactory).withBucketAssigner(dataLakeBucketAssigner)
> ```
>
> In this design the BucketAssigner is concatenated after the bulkFormat step. 
> The problem that I'm having with this design is that I have an object that 
> contains information that should be used to construct the path and a 
> sub-object that contains the data to serialize. A simple example
>
> myClass
> |- country
> |- cityClass extends SpecificRecordBase)
>
> Let's say I receive myClass as a stream and I want to serialize the cityClass 
> data via the logic above. The problem is that the `forBulkFormat(..)` needs 
> to run on a subType of `SpecificRecordBase`, so myClass doesn't work.
> If I extract cityClass from myClass then I will not have country available in 
> the `withBucketAssigner(..)` to be able to store the data in the right 
> folder...
>
>
> Am I missing something or I do have to write my own version of the 
> `ParquetBulkWriter` class so to be able to handle `myClass`?
>
> Thanks for any idea and suggestion.
> Enrico


Re: [DISCUSS] Retrieval services in non-high-availability scenario

2019-09-09 Thread Till Rohrmann
Hi Tison,

thanks for starting this discussion. I think your mail includes multiple
points which are worth being treated separately (might even make sense to
have separate discussion threads). Please correct me if I understood things
wrongly:

1. Adding new non-ha HAServices:

Based on your description I could see the "ZooKeeper-light" non-ha
HAServices implementation work. Would any changes to the existing
interfaces be needed? How would the LeaderServer integrate in the lifecycle
of the cluster entrypoint?

2. Replacing existing non-ha HAServices with LeaderServer implementation:

I'm not sure whether we need to enforce that every non-ha HAServices
implementation works as you've described. I think it is pretty much an
implementation detail whether the services talk to a LeaderServer or are
being started with a pre-configured address. I also think that it is fair
to have different implementations with different characteristics and usage
scenarios. As you've said the EmbeddedHaServices are targeted for single
process cluster setups and they are only used by the MiniCluster.

What I like about the StandaloneHaServices is that they are dead simple
(apart from the configuration). With a new implementation based on the
LeaderServer, the client side implementation becomes much more complex
because now one needs to handle all kind of network issues properly.
Moreover, it adds more complexity to the system because it starts a new
distributed component which needs to be managed. I could see that once the
new implementation has matured enough that it might replace the
EmbeddedHaServices. But I wouldn't start with removing them.

You are right that, because we don't know the JM address before it has been
started, we need to send the address with every slot request. Moreover, we
have the method #getJobManagerLeaderRetriever(JobID, defaultJMAddress) on the
HAServices. While this is not super nice, I don't think that this is a
fundamental problem at the moment. What we pay is a couple of extra bytes we
need to send over the network.

Configuration-wise, I'm not so sure whether we gain too much by replacing
the StandaloneHaServices with the LeaderServer based implementation. For
the new implementation one needs to configure a static address as well at
cluster start-up time. The only benefit I can see is that we don't need to
send the JM address to the RM and TMs. But as I've said, I don't think that
this is a big problem for which we need to introduce new HAServices.
Instead I could see that we might be able to remove it once the
LeaderServer HAServices implementation has proven to be stable.

3. Configuration of HAServices:

I agree that Flink's address and port configuration is not done
consistently. It might make sense to group the address and port
configuration under the ha service configuration section. Maybe it also
makes sense to rename ha services into ServiceDiscovery because it also
works in the non-ha case. It could be possible to only configure address
and port if one is using the non-ha services, for example. However, this
definitely deserves a separate discussion and design because one needs to
check where exactly the respective configuration options are being used.

I think improving the configuration of HAServices is actually orthogonal to
introducing the LeaderServer HAServices implementation and could also be
done for the existing HAServices.

4. Clean up of HAServices implementations:

You are right that some of the existing HAServices implementations are
"dead code" at the moment. They are the result of some implementation ideas
which haven't been completed. I would suggest to start a separate
discussion to discuss what to do with them.

Cheers,
Till

On Mon, Sep 9, 2019 at 9:16 AM Zili Chen  wrote:

> Hi devs,
>
> I'd like to start a discussion thread on the topic how we provide
> retrieval services in non-high-availability scenario. To clarify
> terminology, non-high-availability scenario refers to
> StandaloneHaServices and EmbeddedHaServices.
>
> ***The problem***
>
> We notice that the retrieval services of the current StandaloneHaServices
> (pre-configured) and EmbeddedHaServices (in-memory) have their respective
> problems.
>
> For the pre-configured scenario, we now have a
> getJobManagerLeaderRetriever(JobID, defaultJMAddress) method to work around
> the problem that it is impossible to configure the JM address in advance.
> The parameter defaultJMAddress is not used with any other
> high-availability mode. Also, in the MiniCluster scenario and anywhere
> else where pre-configuring the leader address is impossible,
> StandaloneHaServices cannot be used.
>
> For the in-memory case, it clearly doesn't fit any distributed
> scenario.
>
> ***The proposal***
>
> In order to address the inconsistency between pre-configured retrieval
> services and zookeeper based retrieval services, we reconsider the
> promises provided by "non-high-availability" and regard it as
> similar ser

[jira] [Created] (FLINK-14012) Failed to start job for consuming Secure Kafka after the job cancel

2019-09-09 Thread Daebeom Lee (Jira)
Daebeom Lee created FLINK-14012:
---

 Summary: Failed to start job for consuming Secure Kafka after the 
job cancel
 Key: FLINK-14012
 URL: https://issues.apache.org/jira/browse/FLINK-14012
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.9.0
 Environment: * Kubernetes 1.13.2
 * Flink 1.9.0
 * Kafka client libary 2.2.0
Reporter: Daebeom Lee


Hello, this is Daebeom Lee.
h2. Background

I installed Flink 1.9.0 on our Kubernetes cluster.

We use a Flink session cluster: we build a fat JAR file, upload it via the UI, 
and run several jobs.

At first, our jobs start fine.

But when we cancel some jobs and start them again, the jobs fail.

This is the error:


{code:java}
// code placeholder
java.lang.NoClassDefFoundError: 
org/apache/kafka/common/security/scram/internals/ScramSaslClient
at 
org.apache.kafka.common.security.scram.internals.ScramSaslClient$ScramSaslClientFactory.createSaslClient(ScramSaslClient.java:235)
at javax.security.sasl.Sasl.createSaslClient(Sasl.java:384)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.lambda$createSaslClient$0(SaslClientAuthenticator.java:180)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslClient(SaslClientAuthenticator.java:176)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.<init>(SaslClientAuthenticator.java:168)
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildClientAuthenticator(SaslChannelBuilder.java:254)
at 
org.apache.kafka.common.network.SaslChannelBuilder.lambda$buildChannel$1(SaslChannelBuilder.java:202)
at 
org.apache.kafka.common.network.KafkaChannel.<init>(KafkaChannel.java:140)
at 
org.apache.kafka.common.network.SaslChannelBuilder.buildChannel(SaslChannelBuilder.java:210)
at 
org.apache.kafka.common.network.Selector.buildAndAttachKafkaChannel(Selector.java:334)
at 
org.apache.kafka.common.network.Selector.registerChannel(Selector.java:325)
at org.apache.kafka.common.network.Selector.connect(Selector.java:257)
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:920)
at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:474)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1803)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1771)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.getAllPartitionsForTopics(KafkaPartitionDiscoverer.java:77)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:131)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:508)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
{code}
h2. Our workaround
 * I think this is a Flink JVM classloader issue.
 * The classloader is unloaded when the job is cancelled; the Kafka client library is 
included in the fat JAR.
 * So, I placed the Kafka client library in /opt/flink/lib 
 ** /opt/flink/lib/kafka-clients-2.2.0.jar
 * And then the issue was solved.
 * But there are weird points:
 ** Flink 1.8.1 had no problem two weeks ago.
 ** One week ago I rolled back from 1.9.0 to 1.8.1, and the same errors occurred.
 ** Maybe the docker image changed in the docker repository 
([https://github.com/docker-flink/docker-flink])

 
h2. Suggestion
 * I'd like to know the exact reason why this error occurs after upgrading to 1.9.0.
 * Does anybody know a better solution in this case?

 

Thank you in advance.

 



--
Thi

Re: [DISCUSS] StreamingFile with ParquetBulkWriter bucketing limitations

2019-09-09 Thread Enrico Agnoli
Thanks for confirming.
We have a 
```
public class ParquetSinkWriter implements Writer
```
that handles the serialization of the data.
We implemented it starting from:
https://medium.com/hadoop-noob/flink-parquet-writer-d127f745b519
https://stackoverflow.com/questions/48098011/how-to-use-apache-flink-write-parquet-file-on-hdfs-by-datetime-partition


On 2019/09/09 09:31:03, Kostas Kloudas  wrote: 
> Hi Enrico,
> 
> Sorry for the late reply. I think your understanding is correct.
> The best way to do it is to write your own ParquetBulkWriter and the
> corresponding factory.
> 
> Out of curiosity, I guess that in the BucketingSink you were using the
> AvroKeyValueSinkWriter, right?
> 
> Cheers,
> Kostas
> 
> On Fri, Aug 30, 2019 at 10:23 AM Enrico Agnoli
>  wrote:
> >
> > StreamingFile limitations
> >
> > Hi community,
> >
> > I'm working toward the porting of our code from `BucketingSink<>` to 
> > `StreamingFileSink`.
> > In this case we use the sink to write AVRO via Parquet and the suggested 
> > implementation of the Sink should be something like:
> >
> > ```
> > val parquetWriterFactory = 
> > ParquetAvroWriters.forSpecificRecord(mySchemaClass)
> > StreamingFileSink.forBulkFormat(basePath, 
> > parquetWriterFactory).withBucketAssigner(dataLakeBucketAssigner)
> > ```
> >
> > In this design the BucketAssigner is concatenated after the bulkFormat 
> > step. The problem that I'm having with this design is that I have an object 
> > that contains information that should be used to construct the path and a 
> > sub-object that contains the data to serialize. A simple example
> >
> > myClass
> > |- country
> > |- cityClass extends SpecificRecordBase)
> >
> > Let's say I receive myClass as a stream and I want to serialize the 
> > cityClass data via the logic above. The problem is that the 
> > `forBulkFormat(..)` needs to run on a subType of `SpecificRecordBase`, so 
> > myClass doesn't work.
> > If I extract cityClass from myClass then I will not have country available 
> > in the `withBucketAssigner(..)` to be able to store the data in the right 
> > folder...
> >
> >
> > Am I missing something or I do have to write my own version of the 
> > `ParquetBulkWriter` class so to be able to handle `myClass`?
> >
> > Thanks for any idea and suggestion.
> > Enrico
> 


Re: [DISCUSS] FLIP-66: Support time attribute in SQL DDL

2019-09-09 Thread Jark Wu
Hi all,

Thanks everyone for all the feedback received in the doc so far.
I saw general agreement on using computed columns to support the proctime
attribute and to extract timestamps.
So we will prepare a computed column FLIP and share it on the dev ML soon.

Feel free to leave more comments!

Best,
Jark



On Fri, 6 Sep 2019 at 13:50, Dian Fu  wrote:

> Hi Jark,
>
> Thanks for bringing up this discussion and the detailed design doc. This
> is definitely a critical feature for streaming SQL jobs. I have left a few
> comments in the design doc.
>
> Thanks,
> Dian
>
> > 在 2019年9月6日,上午11:48,Forward Xu  写道:
> >
> > Thanks Jark for this topic, This will be very useful.
> >
> >
> > Best,
> >
> > ForwardXu
> >
> > Danny Chan  于2019年9月6日周五 上午11:26写道:
> >
> >> Thanks Jark for bringing up this topic, this is definitely an important
> >> feature for SQL, especially for DDL users.
> >>
> >> I will spend some time reviewing this design doc, really thanks.
> >>
> >> Best,
> >> Danny Chan
> >> 在 2019年9月6日 +0800 AM11:19,Jark Wu ,写道:
> >>> Hi everyone,
> >>>
> >>> I would like to start a discussion about how to support time attributes
> >>> in SQL DDL.
> >>> In Flink 1.9, we already introduced a basic SQL DDL to create a table.
> >>> However, it doesn't support defining time attributes. This means users
> >>> can't apply window operations on the tables created by DDL, which is a
> >>> bad experience.
> >>>
> >>> In FLIP-66, we propose a syntax for watermark to define rowtime
> attribute
> >>> and propose to use computed column syntax to define proctime attribute.
> >>> But computed column is another big topic and should deserve a separate
> >>> FLIP.
> >>> If we have a consensus on the computed column approach, we will start
> >>> computed column FLIP soon.
> >>>
> >>> FLIP-66:
> >>>
> >>
> https://docs.google.com/document/d/1-SecocBqzUh7zY6HBYcfMlG_0z-JAcuZkCvsmN3LrOw/edit#
> >>>
> >>> Thanks for any feedback!
> >>>
> >>> Best,
> >>> Jark
> >>
>
>


Re: [VOTE] Release 1.8.2, release candidate #1

2019-09-09 Thread Dian Fu
+1 (non-binding)

- built from source successfully (mvn clean install -DskipTests)
- checked gpg signature and hashes of the source release and binary release 
packages
- All artifacts have been deployed to the maven central repository
- no new dependencies were added since 1.8.1
- ran a couple of tests in the IDE successfully

Regards,
Dian

> 在 2019年9月9日,下午2:28,jincheng sun  写道:
> 
> +1 (binding)
> 
> - checked signatures [SUCCESS]
> - built from source without tests [SUCCESS]
> - ran some tests in IDE [SUCCESS]
> - start local cluster and submit word count example [SUCCESS]
> - announcement PR for website looks good! (I have left a few comments)
> 
> Best,
> Jincheng
> 
> Jark Wu  于2019年9月6日周五 下午8:47写道:
> 
>> Hi everyone,
>> 
>> Please review and vote on the release candidate #1 for the version 1.8.2,
>> as follows:
>> [ ] +1, Approve the release
>> [ ] -1, Do not approve the release (please provide specific comments)
>> 
>> 
>> The complete staging area is available for your review, which includes:
>> * JIRA release notes [1],
>> * the official Apache source release and binary convenience releases to be
>> deployed to dist.apache.org [2], which are signed with the key with
>> fingerprint E2C45417BED5C104154F341085BACB5AEFAE3202 [3],
>> * all artifacts to be deployed to the Maven Central Repository [4],
>> * source code tag "release-1.8.2-rc1" [5],
>> * website pull request listing the new release and adding announcement blog
>> post [6].
>> 
>> The vote will be open for at least 72 hours.
>> Please cast your votes before *Sep. 11th 2019, 13:00 UTC*.
>> 
>> It is adopted by majority approval, with at least 3 PMC affirmative votes.
>> 
>> Thanks,
>> Jark
>> 
>> [1]
>> 
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345670
>> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.8.2-rc1/
>> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
>> [4] https://repository.apache.org/content/repositories/orgapacheflink-1262
>> [5]
>> 
>> https://github.com/apache/flink/commit/6322618bb0f1b7942d86cb1b2b7bc55290d9e330
>> [6] https://github.com/apache/flink-web/pull/262
>> 



Re: [VOTE] FLIP-58: Flink Python User-Defined Function for Table API

2019-09-09 Thread Dian Fu
Thanks a lot Jincheng for the reminder and thanks everyone for voting. I'm 
closing the vote now.
So far, the vote has received:
  - 5 binding +1 votes (Jincheng, Hequn, Jark, Shaoxuan, Becket)
  - 5 non-binding +1 votes (Wei, Xingbo, Terry, Yu, Jeff)
  - No 0/-1 votes

There are more than 3 binding +1 votes, no -1 votes, and the voting time has 
passed. According to the new bylaws, I'm glad to announce that FLIP-58 is 
approved. I'll update the FLIP wiki page accordingly.

Thanks,
Dian


> 在 2019年9月9日,上午10:38,jincheng sun  写道:
> 
> Hi all,
> 
> It looks like everyone in this VOTE agrees with the current FLIP.
> 
> Hi Time & Aljoscha, do you have any other comments after the ML discussion
> [1]?
> 
> Hi Dian, could you announce the VOTE result and create a JIRA for the FLIP
> later today, if there is no other feedback?
> 
> Cheers,
> Jincheng
> 
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673i20.html#a32669
> 
> Becket Qin  于2019年9月2日周一 下午1:32写道:
> 
>> +1
>> 
>> It is extremely useful for ML users.
>> 
>> On Mon, Sep 2, 2019 at 9:46 AM Shaoxuan Wang  wrote:
>> 
>>> +1 (binding)
>>> 
>>> This will be a great feature for Flink users, especially for the data
>>> science and AI engineers.
>>> 
>>> Regards,
>>> Shaoxuan
>>> 
>>> 
>>> On Fri, Aug 30, 2019 at 1:35 PM Jeff Zhang  wrote:
>>> 
 +1, very looking forward this feature in flink 1.10
 
 
 Yu Li  于2019年8月30日周五 上午11:08写道:
 
> +1 (non-binding)
> 
> Thanks for driving this!
> 
> Best Regards,
> Yu
> 
> 
> On Fri, 30 Aug 2019 at 11:01, Terry Wang  wrote:
> 
>> +1. That would be very helpful.
>> Best,
>> Terry Wang
>> 
>> 
>> 
>>> 在 2019年8月30日,上午10:18,Jark Wu  写道:
>>> 
>>> +1
>>> 
>>> Thanks for the great work!
>>> 
>>> On Fri, 30 Aug 2019 at 10:04, Xingbo Huang 
 wrote:
>>> 
 Hi Dian,
 
 +1,
 Thanks a lot for driving this.
 
 Best,
 Xingbo
> 在 2019年8月30日,上午9:39,Wei Zhong  写道:
> 
> Hi Dian,
> 
> +1 non-binding
> Thanks for driving this!
> 
> Best, Wei
> 
>> 在 2019年8月29日,09:25,Hequn Cheng  写道:
>> 
>> Hi Dian,
>> 
>> +1
>> Thanks a lot for driving this.
>> 
>> Best, Hequn
>> 
>> On Wed, Aug 28, 2019 at 2:01 PM jincheng sun <
>> sunjincheng...@gmail.com>
>> wrote:
>> 
>>> Hi Dian,
>>> 
>>> +1, Thanks for your great job!
>>> 
>>> Best,
>>> Jincheng
>>> 
>>> Dian Fu  于2019年8月28日周三 上午11:04写道:
>>> 
 Hi all,
 
 I'd like to start a voting thread for FLIP-58 [1] since that
>>> we
> have
 reached an agreement on the design in the discussion thread
>>> [2],
 
 This vote will be open for at least 72 hours. Unless there
>> is
>>> an
 objection, I will try to close it by Sept 2, 2019 00:00 UTC
>> if
 we
>> have
 received sufficient votes.
 
 PS: This doesn't mean that we cannot further improve the
>>> design.
> We
 can
 still discuss the implementation details case by case in the
 JIRA
> as
 long
 as it doesn't affect the overall design.
 
 [1]
 
>>> 
 
>> 
> 
 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Function+for+Table+API
 <
 
>>> 
 
>> 
> 
 
>>> 
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-58:+Flink+Python+User-Defined+Function+for+Table+API
> 
 [2]
 
>>> 
 
>> 
> 
 
>>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html
 <
 
>>> 
 
>> 
> 
 
>>> 
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html
> 
 
 Thanks,
 Dian
>>> 
> 
 
 
>> 
>> 
> 
 
 
 --
 Best Regards
 
 Jeff Zhang
 
>>> 
>> 



[jira] [Created] (FLINK-14013) Support Flink Python User-Defined Stateless Function for Table

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14013:
---

 Summary: Support Flink Python User-Defined Stateless Function for 
Table
 Key: FLINK-14013
 URL: https://issues.apache.org/jira/browse/FLINK-14013
 Project: Flink
  Issue Type: New Feature
  Components: API / Python, Table SQL / API
Affects Versions: 1.10.0
Reporter: Dian Fu
 Fix For: 1.10.0


The Python Table API has been supported since release 1.9.0. See 
[FLIP-38|https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API]
 and FLINK-12308 for details. However, Python user-defined functions are 
currently still not supported. In this FLIP, we want to support stateless Python 
user-defined functions in the Python Table API.

A more detailed description can be found in 
[FLIP-58|https://cwiki.apache.org/confluence/display/FLINK/FLIP-58%3A+Flink+Python+User-Defined+Stateless+Function+for+Table].

The discussion can be found in the [mailing 
thread|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Python-User-Defined-Function-for-Table-API-td31673.html].



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14014) Introduce PythonScalarFunctionRunner to handle the communication with Python worker for Python ScalarFunction execution

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14014:
---

 Summary: Introduce PythonScalarFunctionRunner to handle the 
communication with Python worker for Python ScalarFunction execution
 Key: FLINK-14014
 URL: https://issues.apache.org/jira/browse/FLINK-14014
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


PythonScalarFunctionRunner is responsible for Python ScalarFunction execution 
and nothing else. Its logic should therefore be very simple: forward an input 
element to the Python worker and fetch the execution results back (see the 
sketch after this list):
# Internally, it uses Apache Beam's portability framework for Python UDF 
execution and this is transparent to the caller of PythonScalarFunctionRunner
# By default, each runner will start up a separate Python worker
# The Python worker can run in a docker container, a separate process or even a 
non-managed external service.
# It has the ability to execute multiple Python ScalarFunctions
# It also supports chained Python ScalarFunctions
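
As a purely hypothetical illustration of this contract (the interface does not 
exist yet; the method names are placeholders), the runner could expose roughly:

{code:java}
// Hypothetical sketch only: the contract of such a runner, not an existing Flink class.
public interface PythonScalarFunctionRunner<IN, OUT> {

    // starts the Python worker, e.g. through Beam's portability layer
    void open() throws Exception;

    // forwards one input element to the Python worker
    void processElement(IN element) throws Exception;

    // fetches one execution result back from the Python worker
    OUT takeResult() throws Exception;

    // shuts down the Python worker
    void close() throws Exception;
}
{code}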



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14015) Introduce PythonScalarFunctionOperator as a standalone StreamOperator for Python ScalarFunction execution

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14015:
---

 Summary: Introduce PythonScalarFunctionOperator as a standalone 
StreamOperator for Python ScalarFunction execution
 Key: FLINK-14015
 URL: https://issues.apache.org/jira/browse/FLINK-14015
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


PythonScalarFunctionOperator is a standalone StreamOperator and it doesn't need 
to know how the Python ScalarFunctions are executed, which is the responsibility 
of PythonScalarFunctionRunner:
 # It is a StreamOperator which employs PythonScalarFunctionRunner for Python 
ScalarFunction execution
 # It sends input elements to PythonScalarFunctionRunner, fetches the execution 
results, constructs the result rows and sends them to the downstream operator
 # It should handle checkpoints and watermarks properly



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14016) Introduce RelNodes FlinkLogicalPythonScalarFunctionExec and DataStreamPythonScalarFunctionExec which are containers for Python PythonScalarFunctions

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14016:
---

 Summary: Introduce RelNodes FlinkLogicalPythonScalarFunctionExec 
and DataStreamPythonScalarFunctionExec which are containers for Python 
PythonScalarFunctions
 Key: FLINK-14016
 URL: https://issues.apache.org/jira/browse/FLINK-14016
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


Dedicated RelNodes such as FlinkLogicalPythonScalarFunctionExec and 
DataStreamPythonScalarFunctionExec should be introduced for Python 
ScalarFunction execution. These nodes exist as containers for Python 
ScalarFunctions which could be executed in a batch, and then we can employ 
PythonScalarFunctionOperator for the Python ScalarFunction execution.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14017) Support to start up Python worker in process mode

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14017:
---

 Summary: Support to start up Python worker in process mode
 Key: FLINK-14017
 URL: https://issues.apache.org/jira/browse/FLINK-14017
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


We employ Apache Beam's portability framework for the Python UDF execution. 
However, in Beam there is only a golang implementation of the boot script to 
start up the SDK harness. It's used by both the Python SDK harness and the Go 
SDK harness. This is not a problem for Beam. However, it is indeed a problem for 
Flink, as it means that the whole stack of Beam's Go SDK harness would be 
depended on if we used the golang implementation of the boot script. We want to 
avoid this by adding a Python boot script.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14018) Add Python building blocks to make sure the basic functionality of Python ScalarFunction could work

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14018:
---

 Summary: Add Python building blocks to make sure the basic 
functionality of Python ScalarFunction could work
 Key: FLINK-14018
 URL: https://issues.apache.org/jira/browse/FLINK-14018
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


We need to add a few Python building blocks such as ScalarFunctionOperation, 
BigIntCoder, VarcharCoder, etc. for Python ScalarFunction execution. 
ScalarFunctionOperation is a subclass of Operation in Beam, and BigIntCoder, 
VarcharCoder, etc. are subclasses of Coder in Beam. These classes will be 
registered into Beam's portability framework to make sure they take effect.

This PR makes sure that a basic end-to-end Python UDF can be executed.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14019) Python environment and dependency management

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14019:
---

 Summary: Python environment and dependency management
 Key: FLINK-14019
 URL: https://issues.apache.org/jira/browse/FLINK-14019
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


A Python user-defined function may depend on third-party dependencies. We 
should provide a proper way to handle them:
 # Provide a way to let users specify the dependencies
 # Provide a way to let users specify the Python environment used



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14020) Use Apache Arrow as the serializer for data transmission between Java operator and Python harness

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14020:
---

 Summary: Use Apache Arrow as the serializer for data transmission 
between Java operator and Python harness
 Key: FLINK-14020
 URL: https://issues.apache.org/jira/browse/FLINK-14020
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


Apache Arrow is "a cross-language development platform for in-memory data. It 
specifies a standardized language-independent columnar memory format for flat 
and hierarchical data, organized for efficient analytic operations on modern 
hardware". It has been widely used in many notable projects, such as Spark, 
Parquet, Pandas, etc. We could make use of Arrow as the data serializer between 
Java operator and Python harness.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14021) Add rules to push down the Python ScalarFunctions contained in the join condition of Correlate node

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14021:
---

 Summary: Add rules to push down the Python ScalarFunctions 
contained in the join condition of Correlate node
 Key: FLINK-14021
 URL: https://issues.apache.org/jira/browse/FLINK-14021
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


The Python ScalarFunctions contained in the join condition of Correlate node 
should be extracted to make sure the TableFunction works well.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14022) Add validation check for places where Python ScalarFunction cannot be used

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14022:
---

 Summary: Add validation check for places where Python 
ScalarFunction cannot be used
 Key: FLINK-14022
 URL: https://issues.apache.org/jira/browse/FLINK-14022
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


Currently, there are places where Python ScalarFunctions cannot be used, for 
example:
 # Python UDFs cannot be used in MatchRecognize
 # Python UDFs cannot be used in a join condition which takes columns from 
both the left table and the right table as inputs

We should add validation checks for the places where they are not supported.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14023) Support accessing job parameters in Python user-defined functions

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14023:
---

 Summary: Support accessing job parameters in Python user-defined 
functions
 Key: FLINK-14023
 URL: https://issues.apache.org/jira/browse/FLINK-14023
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


Currently, it’s possible to access job parameters in the Java user-defined 
functions. It could be used to define the behavior according to job parameters. 
It should also be supported for Python user-defined functions.
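
For reference, a minimal sketch of the existing Java-side pattern mentioned 
above; the "suffix" key and the SuffixFunction class are made-up examples, and 
the goal of this ticket is to offer an equivalent for Python user-defined 
functions:

{code:java}
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

// Illustrative example: appends a suffix that is configured via a job parameter.
public class SuffixFunction extends ScalarFunction {

    private String suffix;

    @Override
    public void open(FunctionContext context) throws Exception {
        // read a job parameter with a default value
        suffix = context.getJobParameter("suffix", "-default");
    }

    public String eval(String s) {
        return s + suffix;
    }
}
{code}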



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14024) Support user-defined metrics in Python user-defined functions

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14024:
---

 Summary: Support user-defined metrics in Python user-defined 
functions
 Key: FLINK-14024
 URL: https://issues.apache.org/jira/browse/FLINK-14024
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


We should allow users to define metrics in their Python user-defined 
functions. Beam's portability framework already provides a mechanism for 
metrics reporting. We could make use of it.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14025) Support to run the Python worker in docker mode

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14025:
---

 Summary: Support to run the Python worker in docker mode
 Key: FLINK-14025
 URL: https://issues.apache.org/jira/browse/FLINK-14025
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


The Python worker runs in “Process” mode by default. Docker mode should be 
supported as well.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14026) Manage the resource of Python worker properly

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14026:
---

 Summary: Manage the resource of Python worker properly
 Key: FLINK-14026
 URL: https://issues.apache.org/jira/browse/FLINK-14026
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


For a Flink Table API & SQL job, if it uses Python user-defined functions, the 
Java operator will launch a separate Python process for Python user-defined 
function execution. We should make sure that the resources used by the Python 
process are managed by Flink’s resource management framework.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14027) Add documentation for Python user-defined functions

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14027:
---

 Summary: Add documentation for Python user-defined functions
 Key: FLINK-14027
 URL: https://issues.apache.org/jira/browse/FLINK-14027
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python, Documentation
Reporter: Dian Fu
 Fix For: 1.10.0


We should add documentation about how to use Python user-defined functions.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14028) Support logging aggregation in Python user-defined functions

2019-09-09 Thread Dian Fu (Jira)
Dian Fu created FLINK-14028:
---

 Summary: Support logging aggregation in Python user-defined 
functions
 Key: FLINK-14028
 URL: https://issues.apache.org/jira/browse/FLINK-14028
 Project: Flink
  Issue Type: Sub-task
  Components: API / Python
Reporter: Dian Fu
 Fix For: 1.10.0


Beam's portability framework provides the ability to collect logs from the 
Python workers to the operator. We should make use of this functionality to 
collect the logs of Python user-defined functions and write them into the log 
file of the Java operator process. Then users can easily access the logs 
generated by the Python user-defined functions.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14029) Update Flink's Mesos scheduling behavior to reject all expired offers

2019-09-09 Thread Piyush Narang (Jira)
Piyush Narang created FLINK-14029:
-

 Summary: Update Flink's Mesos scheduling behavior to reject all 
expired offers
 Key: FLINK-14029
 URL: https://issues.apache.org/jira/browse/FLINK-14029
 Project: Flink
  Issue Type: Bug
Reporter: Piyush Narang


While digging into why our Flink jobs weren't being scheduled on our internal 
Mesos setup, we noticed that we were hitting Mesos quota limits tied to the way 
the Fenzo (https://github.com/Netflix/Fenzo/) library defaults are set up in 
the Flink project. 

The behavior we observed: we got a bunch of offers from our Mesos master (50+), 
of which 1 or 2 were heavily skewed and took up a huge chunk of our disk 
resource quota. Because of this we were not sent any new / different offers (our 
usage at the time plus the outstanding resource offers reached our Mesos disk 
quota). As the Flink / Fenzo Mesos scheduling code was not using the 1-2 skewed 
disk offers, they ended up expiring. The Fenzo scheduler is set up with the 
default values for when to expire unused offers (120s) and for the maximum 
number of unused offer leases to reject at a time (4). Unfortunately, since we 
had a considerable number of outstanding expired offers (50+), we ended up 
rejecting only 4 or so every 2 minutes and never got around to rejecting the 
heavily skewed disk offers that were stopping us from scheduling our Flink job. 
As a result, our job waited to be scheduled for more than an hour. 

An option to work around this is to reject all expired offers at the 2-minute 
expiry time rather than hold on to them. This will allow Mesos to send 
alternate offers that might be scheduled by Fenzo. 
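
For illustration, a rough sketch of how the proposed behavior could be 
configured on Fenzo's {{TaskScheduler.Builder}}. Method names such as 
{{withRejectAllExpiredOffers()}} are assumed from recent Fenzo versions; the 
exact wiring inside Flink's Mesos integration may differ.

{code:java}
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.VirtualMachineLease;
import com.netflix.fenzo.functions.Action1;

public class RejectAllExpiredOffersExample {

    public static TaskScheduler buildScheduler(Action1<VirtualMachineLease> declineOffer) {
        return new TaskScheduler.Builder()
                // Keep unused offers for at most 2 minutes (the current default) ...
                .withLeaseOfferExpirySecs(120)
                // ... but reject *all* expired offers instead of at most 4 per evaluation,
                // so skewed offers cannot hold on to quota indefinitely.
                .withRejectAllExpiredOffers()
                // Callback that actually declines the offer with the Mesos driver.
                .withLeaseRejectAction(declineOffer)
                .build();
    }
}
{code}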




--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-09 Thread Biao Liu
Congrats, Kostas!

Thanks,
Biao /'bɪ.aʊ/



On Mon, 9 Sep 2019 at 16:07, JingsongLee 
wrote:

> Congrats, Kostas! Well deserved.
>
> Best,
> Jingsong Lee
>
>
> --
> From:Kostas Kloudas 
> Send Time:2019年9月9日(星期一) 15:50
> To:dev ; Yun Gao 
> Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC
>
> Thanks a lot everyone for the warm welcome!
>
> Cheers,
> Kostas
>
> On Mon, Sep 9, 2019 at 4:54 AM Yun Gao 
> wrote:
> >
> >   Congratulations, Kostas!
> >
> >  Best,
> >  Yun‍‍‍
> >
> >
> >
> >
> > --
> > From:Becket Qin 
> > Send Time:2019 Sep. 9 (Mon.) 10:47
> > To:dev 
> > Subject:Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC
> >
> > Congrats, Kostas!
> >
> > On Sun, Sep 8, 2019 at 11:48 PM myasuka  wrote:
> >
> > > Congratulations Kostas!
> > >
> > > Best
> > > Yun Tang
> > >
> > >
> > >
> > > --
> > > Sent from:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
> > >
> >
>
>


[jira] [Created] (FLINK-14030) Nonequivalent conversion happens in Table planner

2019-09-09 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-14030:
--

 Summary: Nonequivalent conversion happens in Table planner 
 Key: FLINK-14030
 URL: https://issues.apache.org/jira/browse/FLINK-14030
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.9.0
Reporter: Leonard Xu
 Fix For: 2.0.0


{{testAllApis()}} unit tests will fail because the planner makes a conversion
from {{[ifThenElse(isNull(plus(f0, f1)), 'null', 'not null')]}}
to {{[CASE(OR(IS NULL($0), IS NULL($1)), _UTF-16LE'null', _UTF-16LE'not null')]}},
which is not an equivalent conversion. The result of the expression 'f0 + 'f1
should be null when the result overflows, even if both of its operands are not null.

It's easy to reproduce as follows:
{code:java}
testAllApis(
  'f0 + 'f1,
  "f1 + f1",
  "f1 + f1",
  "null") // the result should be null because of overflow

override def testData: Row = {
  val testData = new Row(2)
  testData.setField(0, BigDecimal("1e10").bigDecimal)
  testData.setField(1, BigDecimal("0").bigDecimal)
  testData
}

override def typeInfo: RowTypeInfo = {
  new RowTypeInfo(
    /* 0 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 10)),
    /* 1 */ fromLogicalTypeToTypeInfo(DECIMAL(38, 28))
  )
}
{code}





--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14031) flink-examples should add blink dependency on flink-examples-table

2019-09-09 Thread Jimmy Wong (Jira)
Jimmy Wong created FLINK-14031:
--

 Summary: flink-examples should add blink dependency on 
flink-examples-table
 Key: FLINK-14031
 URL: https://issues.apache.org/jira/browse/FLINK-14031
 Project: Flink
  Issue Type: Bug
  Components: Examples
Affects Versions: 1.9.0
Reporter: Jimmy Wong
 Fix For: 1.9.0


The flink-examples-table module is missing the Blink planner dependency. If I 
run a Blink example with IntelliJ IDEA, I get the following error:

 
{code:java}
Exception in thread "main" org.apache.flink.table.api.TableException: Could not 
instantiate the executor. Make sure a planner module is on the 
classpathException in thread "main" org.apache.flink.table.api.TableException: 
Could not instantiate the executor. Make sure a planner module is on the 
classpath at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.lookupExecutor(StreamTableEnvironmentImpl.java:140)
 at 
org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:110)
 at 
org.apache.flink.table.api.java.StreamTableEnvironment.create(StreamTableEnvironment.java:112)
 at 
org.apache.flink.table.examples.java.BlinkStreamSQL.main(BlinkStreamSQL.java:19)Caused
 by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find 
a suitable table factory for 
'org.apache.flink.table.delegation.ExecutorFactory' inthe classpath.
{code}
*But* once I add the Blink dependency, it works well.
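
For reference, a minimal sketch (with a hypothetical class name) of the Blink 
planner bootstrap that triggers the above error unless 
{{flink-table-planner-blink_2.11}} (1.9.0) is on the classpath:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class BlinkStreamSQLExample {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Selecting the Blink planner requires the flink-table-planner-blink dependency
        // on the classpath; otherwise create() fails with "Could not instantiate the executor".
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
        // ... register tables and run SQL with tEnv ...
    }
}
{code}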

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [VOTE] FLIP-49: Unified Memory Configuration for TaskExecutors

2019-09-09 Thread Stephan Ewen
One thing that I just came across: Some of these options should also have a
corresponding value for the JobManager, like JVM overhead, metaspace,
direct memory.

On Fri, Sep 6, 2019 at 4:34 AM Xintong Song  wrote:

> Thanks all for the votes.
> So far, we have
>
>- 4 binding +1 votes (Stephan, Andrey, Till, Zhijiang)
>- 2 un-binding +1 votes (Xintong, Yu)
>- No -1 votes
>
> There are more than 3 binding +1 votes and no -1 votes, and the voting time
> has past. According to the new bylaws, I'm happily to announce that FLIP-49
> is approved to be adopted by Apache Flink.
>
> Regarding the minors mentioned during the voting, if there's no objection,
> I would like to update the FLIP document with the followings
>
>- Exclude JVM Overhead from '-XX:MaxDirectMemorySize'
>- Add a 'Follow-Up' section, with the follow-ups of web ui and
>documentation issues
>- Add a 'Limitation' section, with the Unsafe Java 12 support issue
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Fri, Sep 6, 2019 at 10:28 AM Xintong Song 
> wrote:
>
> > +1 (non-binding) from my side.
> >
> > @Yu, thanks for the vote.
> > - The current FLIP document already mentioned the issue that Unsafe is
> not
> > supported in Java 12, in the section 'Unifying Explicit and Implicit
> Memory
> > Allocation'. It makes sense to me to emphasize this by adding a separate
> > limitation section.
> > - I think we should also update the FLIP document if we change the config
> > names later in PRs. But I would not consider this as a major change to
> the
> > FLIP that requires another vote, especially when we already agreed during
> > this vote to revisit the config names at implementation stage.
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Fri, Sep 6, 2019 at 2:43 AM Yu Li  wrote:
> >
> >> +1 (non-binding)
> >>
> >> Minor:
> >> 1. Is it worth a separate "Limitations" section to contain all relative
> >> information like the Unsafe support issue in Java 12, just like many
> other
> >> FLIPs?
> >> 2. About the config names, if we change them later in PR, does it mean
> we
> >> will need to update the FLIP document? If so, it seems we need another
> >> vote
> >> after the modification according to current bylaw? Or maybe we could
> just
> >> create a subpage under the FLIP and only re-vote on that part later?
> >>
> >> Thanks.
> >>
> >> Best Regards,
> >> Yu
> >>
> >>
> >> On Thu, 5 Sep 2019 at 00:52, Stephan Ewen  wrote:
> >>
> >> > Let's not block on config key names, just go ahead and we figure this
> >> out
> >> > concurrently or on the PR later.
> >> >
> >> >
> >> > On Wed, Sep 4, 2019 at 3:48 PM Stephan Ewen  wrote:
> >> >
> >> > > Maybe to clear up confusion about my suggestion:
> >> > >
> >> > > I would vote to keep the name of the config parameter
> >> > > "taskmanager.memory.network" because it is the same key as currently
> >> > (good
> >> > > to not break things unless good reason) and there currently is no
> >> case or
> >> > > even a concrete follow-up where we would actually differentiate
> >> between
> >> > > different types of network memory.
> >> > >
> >> > > I would suggest to not prematurely rename this because of something
> >> that
> >> > > might happen in the future. Experience shows that its better to do
> >> these
> >> > > things when the actual change comes.
> >> > >
> >> > > Side note: I am not sure "shuffle" is a good term in this context. I
> >> have
> >> > > so far only heard that in batch contexts, which is not what we do
> >> here.
> >> > One
> >> > > more reason for me to not pre-maturely change names.
> >> > >
> >> > > On Wed, Sep 4, 2019 at 10:56 AM Xintong Song  >
> >> > > wrote:
> >> > >
> >> > >> @till
> >> > >>
> >> > >> > Just to clarify Xintong, you suggest that Task off-heap memory
> >> > >> represents
> >> > >> > direct and native memory. Since we don't know how the user will
> >> > allocate
> >> > >> > the memory we will add this value to -XX:MaxDirectMemorySize so
> >> that
> >> > the
> >> > >> > process won't fail if the user allocates only direct memory and
> no
> >> > >> native
> >> > >> > memory. Is that correct?
> >> > >> >
> >> > >> Yes, this is what I mean.
> >> > >>
> >> > >>
> >> > >> Thank you~
> >> > >>
> >> > >> Xintong Song
> >> > >>
> >> > >>
> >> > >>
> >> > >> On Wed, Sep 4, 2019 at 4:25 PM Till Rohrmann  >
> >> > >> wrote:
> >> > >>
> >> > >> > Just to clarify Xintong, you suggest that Task off-heap memory
> >> > >> represents
> >> > >> > direct and native memory. Since we don't know how the user will
> >> > allocate
> >> > >> > the memory we will add this value to -XX:MaxDirectMemorySize so
> >> that
> >> > the
> >> > >> > process won't fail if the user allocates only direct memory and
> no
> >> > >> native
> >> > >> > memory. Is that correct?
> >> > >> >
> >> > >> > Cheers,
> >> > >> > Till
> >> > >> >
> >> > >> > On Wed, Sep 4, 2019 at 10:18 AM Xintong Song <
> >> tonysong...@gmail.com>
> >> > >> > wrote:
> >> > >> >
> >> > >> > > @Stephan
> >> > >> > > Not sure

[jira] [Created] (FLINK-14032) Make the cache size of RocksDBPriorityQueueSetFactory configurable

2019-09-09 Thread Yun Tang (Jira)
Yun Tang created FLINK-14032:


 Summary: Make the cache size of RocksDBPriorityQueueSetFactory 
configurable
 Key: FLINK-14032
 URL: https://issues.apache.org/jira/browse/FLINK-14032
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / State Backends
Reporter: Yun Tang
 Fix For: 1.10.0


Currently, the cache size of {{RocksDBPriorityQueueSetFactory}} is hard-coded to 
128 and there is no way to configure it to another value. (We could increase it 
to obtain better performance if necessary.) Actually, this has also been a TODO 
for quite a long time.
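
A minimal sketch of what such a config option could look like. The option key 
{{state.backend.rocksdb.priority-queue.cache-size}} and the holder class are 
only illustrative assumptions, not existing Flink options:

{code:java}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class RocksDBPriorityQueueOptions {

    /**
     * Hypothetical option for the cache size used by RocksDBPriorityQueueSetFactory.
     * The default mirrors the currently hard-coded value of 128.
     */
    public static final ConfigOption<Integer> PRIORITY_QUEUE_CACHE_SIZE =
            ConfigOptions.key("state.backend.rocksdb.priority-queue.cache-size")
                    .defaultValue(128)
                    .withDescription(
                            "The per-key-group cache size of the RocksDB-based "
                                    + "priority queue (timer) state.");
}
{code}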



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14033) Distributed caches are not registered in Yarn Per Job Cluster Mode

2019-09-09 Thread Zhenqiu Huang (Jira)
Zhenqiu Huang created FLINK-14033:
-

 Summary: Distributed caches are not registered in Yarn Per Job 
Cluster Mode
 Key: FLINK-14033
 URL: https://issues.apache.org/jira/browse/FLINK-14033
 Project: Flink
  Issue Type: Bug
  Components: Deployment / YARN
Affects Versions: 1.9.0
Reporter: Zhenqiu Huang


The cache files in StreamExecutionEnvironment are not used during job submission 
in the YARN per-job cluster mode. In contrast to job submission in session 
cluster mode, which uploads distributed cache files to the HTTP server in the 
application master, we should get the cache files from the job graph and 
register them in the blob store in YarnJobClusterEntrypoint.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14034) In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke should be made final

2019-09-09 Thread Niels van Kaam (Jira)
Niels van Kaam created FLINK-14034:
--

 Summary: In FlinkKafkaProducer, KafkaTransactionState should be 
made public or invoke should be made final
 Key: FLINK-14034
 URL: https://issues.apache.org/jira/browse/FLINK-14034
 Project: Flink
  Issue Type: Wish
  Components: Connectors / Kafka
Affects Versions: 1.9.0
Reporter: Niels van Kaam


It is not possible to override the invoke method of FlinkKafkaProducer, because 
its first parameter, KafkaTransactionState, is a private inner class. It is also 
not possible to override the original invoke of SinkFunction, because 
TwoPhaseCommitSinkFunction (which FlinkKafkaProducer extends) overrides the 
original invoke method and declares it final.

[https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java]

If there is a particular reason for this, I think it would be better to make 
the invoke method in FlinkKafkaProducer final as well, and document the reason 
so that it is clear this is by design (I don't see any overrides in the same 
package).

Otherwise, I would make KafkaTransactionState publicly visible. I would like to 
override the invoke method to create a custom Kafka producer which performs some 
additional generic validations and transformations. (This can also be done in a 
process function, but a custom sink would simplify the code of jobs.)
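
As a reference for the alternative mentioned above, a minimal sketch that 
performs the generic validation/transformation in a simple MapFunction (rather 
than a full ProcessFunction) before the unmodified FlinkKafkaProducer. The 
broker address, topic and validation logic are illustrative assumptions:

{code:java}
import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class ValidateBeforeKafkaSink {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker

        DataStream<String> records = env.fromElements("a ", " b", "c");

        records
                .map(new MapFunction<String, String>() {
                    @Override
                    public String map(String value) {
                        // hypothetical generic validation / transformation step
                        if (value == null || value.trim().isEmpty()) {
                            throw new IllegalArgumentException("invalid record");
                        }
                        return value.trim();
                    }
                })
                .addSink(new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props));

        env.execute("validate-before-kafka-sink");
    }
}
{code}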

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[DISCUSS] modular built-in functions

2019-09-09 Thread Bowen Li
Hi all,

During the discussion of how to support Hive built-in functions in Flink in
FLIP-57 [1], an idea of "modular built-in functions" was brought up with
examples of "Extension" in Postgres [2] and "Plugin" in Presto [3]. Thus
I'd like to kick off a discussion to see if we should adopt such an
approach.

I try to summarize basics of the idea:
- functions from modules (e.g. Geo, ML) can be loaded into Flink as
built-in functions
- modules can be configured with order, discovered using SPI or set via
code like "catalogManager.setFunctionModules(CoreFunctions, GeoFunctions,
HiveFunctions)"
- built-in functions from external systems, like Hive, can be packaged
into such a module

I took time and researched Presto Plugin and Postgres Extension, and here
are some of my findings.

Presto:
- "Presto's Catalog associated with a connector, and a catalog only
contains schemas and references a data source via a connector." [4] A
Presto catalog doesn't have the concept of catalog functions, thus all
Presto functions don't have namespaces. Neither does Presto have function
DDL [5].
- Plugins are not specific to functions - "Plugins can provide
additional Connectors, Types, Functions, and System Access Control" [6]
- Thus, I feel a Plugin in Presto acts more as a "catalog" which is
similar to catalogs in Flink. Since all Presto functions don't have
namespaces, it probably can be seen as a built-in function module.

Postgres:
- Postgres extension is always installed to a schema, not the entire
cluster. There's a "schema_name" param in extension creation DDL - "The
name of the schema in which to install the extension's objects, given that
the extension allows its contents to be relocated. The named schema must
already exist. If not specified, and the extension's control file does not
specify a schema either, the current default object creation schema is
used." [7]  Thus it also acts as "catalog" for schema, and thus functions
in extension are not built-in functions to Postgres.

Therefore, I feel the examples are not exactly the "built-in function
modules" that were brought up, but feel free to correct me if I'm wrong.

Going back to the idea itself, besides it seems to be a simpler concept and
design in some ways, I have two concerns:
1. The major one is still on name resolution - how to deal with name
collisions?
- Not allowing duplicated names won't work for Hive built-in functions,
as many of them share names with Flink's, so we must allow modules
containing same-named functions to be registered
- One assumption of this approach seems to be that, given modules are
specified in order, functions from modules can be overridden according to that
order?
- If so, how can users reference a function that is overridden in the
above case (e.g. I may want to switch KMEANS between modules ML1 and ML2
with different implementations)?
 - If it's supported, it seems we still need some new syntax?
 - If it's not supported, that seems to be a major limitation for
users
2. The minor one is that allowing built-in functions from external systems to be
accessed within Flink so widely can bring performance issues to users' jobs
- Unlike the potential native Flink Geo or ML functions, built-in
functions from external systems come with a pretty big performance penalty
in Flink due to data conversions and different invocation mechanism.
Supporting Hive built-in functions is mainly for simplifying migration from
Hive. I'm not sure if it makes sense when a user job has nothing to do with
Hive data but unintentionally ends up using Hive built-in functions without
knowing it's penalized on performance. Though doc can help to some extent,
not all users really read docs in detail.

An alternative is to treat "function modules" as catalog.
- For Flink native function modules like Geo or ML, they can be discovered
and registered automatically at runtime with a predefined catalog name of their
own, like "ml" or "ml1", which should be unique. Their functions are
considered built-in functions of that catalog, and can be referenced, in
some new syntax like "catalog::func", as "ml::kmeans" and "ml1::kmeans".
- For built-in functions from external systems (e.g. Hive), they have to be
referenced either as "catalog::func" to make sure users are explicitly
expecting those external functions, or as complementary built-in functions
to Flink if a config "enable_hive_built_in_functions" in HiveCatalog is
turned on.

Either approach seems to have its own benefits, and I'm open for discussion
and would like to hear others' opinions and use cases where a specific
solution is required.

Thanks,
Bowen


[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-57-Rework-FunctionCatalog-td32291.html
[2] https://www.postgresql.org/docs/10/extend-extensions.html
[3] https://prestodb.github.io/docs/current/develop/functions.html
[4]
https://prestodb.github.io/docs/current/overview/concepts.html#data-sources

[jira] [Created] (FLINK-14035) Introduce/Change some log for snapshot to better analysis checkpoint problem

2019-09-09 Thread Congxian Qiu(klion26) (Jira)
Congxian Qiu(klion26) created FLINK-14035:
-

 Summary: Introduce/Change some log for snapshot to better analysis 
checkpoint problem
 Key: FLINK-14035
 URL: https://issues.apache.org/jira/browse/FLINK-14035
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Checkpointing
Affects Versions: 1.10.0
Reporter: Congxian Qiu(klion26)


Currently, the information about checkpoints is mostly logged at debug level 
(especially on the TM side). If we want to track which step a checkpoint is in 
and how much time each step consumes when a checkpoint fails or takes too long, 
we need to restart the job with debug logging enabled. This issue wants to 
improve the situation by changing some existing debug logs to info level and 
adding some more debug logs. We have changed these log levels in our production 
at Alibaba, and there has been no problem so far.

 

Details

Change the logs below from debug level to info:
 * log about {{Starting checkpoint xxx}} on the TM side
 * log about sync complete on the TM side
 * log about async complete on the TM side

Add debug logs:
 * log about receiving the barrier for exactly-once mode - align with 
at-least-once mode

 

If this issue is valid, then I'm happy to contribute it.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] Contribute Pulsar Flink connector back to Flink

2019-09-09 Thread Becket Qin
Hi Sijie,

If we agree that the goal is to have Pulsar connector in 1.10, how about we
do the following:

0. Start a FLIP for adding the Pulsar connector to the Flink main repo, as it
introduces a new public interface there.
1. Start to review the Pulsar sink right away, as there is no change to the
sink interface so far.
2. Wait a little bit on FLIP-27. Flink 1.10's code freeze is in late Nov, and
if we give a month to the development and review of the Pulsar connector, we
need to have FLIP-27 by late Oct. There are still 7 weeks. Personally I think
it is doable. If FLIP-27 is not ready by late Oct, we can review and check in
the Pulsar connector with the existing source interface. This means we will
have the Pulsar connector in Flink 1.10, either with or without FLIP-27.

Because we are going to have Pulsar sink and source checked in separately,
it might make sense to have two FLIPs, one for Pulsar sink and another for
Pulsar source. And we can start the work on Pulsar sink right away.

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 9, 2019 at 4:13 PM Sijie Guo  wrote:

> Thank you Bowen and Becket.
>
> What's the take from Flink community? Shall we wait for FLIP-27 or shall we
> proceed to next steps? And what the next steps are? :-)
>
> Thanks,
> Sijie
>
> On Thu, Sep 5, 2019 at 2:43 PM Bowen Li  wrote:
>
> > Hi,
> >
> > I think having a Pulsar connector in Flink can be a good mutual benefit
> to
> > both communities.
> >
> > Another perspective is that Pulsar connector is the 1st streaming
> connector
> > that integrates with Flink's metadata management system and Catalog APIs.
> > It'll be cool to see how the integration turns out and whether we need to
> > improve Flink Catalog stack, which are currently in Beta, to cater to
> > streaming source/sink. Thus I'm in favor of merging Pulsar connector into
> > Flink 1.10.
> >
> > I'd suggest to submit smaller sized PRs, e.g. maybe one for basic
> > source/sink functionalities and another for schema and catalog
> integration,
> > just to make them easier to review.
> >
> > It doesn't seem to hurt to wait for FLIP-27. But I don't think FLIP-27
> > should be a blocker in cases where it cannot make its way into 1.10 or
> > doesn't leave reasonable amount of time for committers to review or for
> > Pulsar connector to fully adapt to new interfaces.
> >
> > Bowen
> >
> >
> >
> > On Thu, Sep 5, 2019 at 3:21 AM Becket Qin  wrote:
> >
> > > Hi Till,
> > >
> > > You are right. It all depends on when the new source interface is going
> > to
> > > be ready. Personally I think it would be there in about a month or so.
> > But
> > > I could be too optimistic. It would also be good to hear what do
> Aljoscha
> > > and Stephan think as they are also involved in FLIP-27.
> > >
> > > In general I think we should have Pulsar connector in Flink 1.10,
> > > preferably with the new source interface. We can also check it in right
> > now
> > > with old source interface, but I suspect few users will use it before
> the
> > > next official release. Therefore, it seems reasonable to wait a little
> > bit
> > > to see whether we can jump to the new source interface. As long as we
> > make
> > > sure Flink 1.10 has it, waiting a little bit doesn't seem to hurt much.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Thu, Sep 5, 2019 at 3:59 PM Till Rohrmann 
> > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I'm wondering what the problem would be if we committed the Pulsar
> > > > connector before the new source interface is ready. If I understood
> it
> > > > correctly, then we need to support the old source interface anyway
> for
> > > the
> > > > existing connectors. By checking it in early I could see the benefit
> > that
> > > > our users could start using the connector earlier. Moreover, it would
> > > > prevent that the Pulsar integration is being delayed in case that the
> > > > source interface should be delayed. The only downside I see is the
> > extra
> > > > review effort and potential fixes which might be irrelevant for the
> new
> > > > source interface implementation. I guess it mainly depends on how
> > certain
> > > > we are when the new source interface will be ready.
> > > >
> > > > Cheers,
> > > > Till
> > > >
> > > > On Thu, Sep 5, 2019 at 8:56 AM Becket Qin 
> > wrote:
> > > >
> > > > > Hi Sijie and Yijie,
> > > > >
> > > > > Thanks for sharing your thoughts.
> > > > >
> > > > > Just want to have some update on FLIP-27. Although the FLIP wiki
> and
> > > > > discussion thread has been quiet for some time, a few committer /
> > > > > contributors in Flink community were actually prototyping the
> entire
> > > > thing.
> > > > > We have made some good progress there but want to update the FLIP
> > wiki
> > > > > after the entire thing is verified to work in case there are some
> > last
> > > > > minute surprise in the implementation. I don't have an exact ETA
> yet,
> > > > but I
> > > > > guess it is going to be within a month or so.
> > > > >
> >

[jira] [Created] (FLINK-14036) function log(f0,f1) in Table API do not support decimal type

2019-09-09 Thread Leonard Xu (Jira)
Leonard Xu created FLINK-14036:
--

 Summary:  function log(f0,f1) in Table API  do not support decimal 
type 
 Key: FLINK-14036
 URL: https://issues.apache.org/jira/browse/FLINK-14036
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.9.0
Reporter: Leonard Xu
 Fix For: 2.0.0


The function log(f0,f1) in the Table API module does not support decimal type, 
but it works in the Table SQL module. The following code will fail:

 
{code:java}
testTableApi(
 'f0.log(f1),
 "log(f0,f1)",
 "2.0")

 override def testData: Row = {
val testData = new Row(2)
testData.setField(0, BigDecimal("3").bigDecimal)
testData.setField(1, 9)
testData
  }
  
  override def typeInfo: RowTypeInfo = {
new RowTypeInfo(
  /* 0 */ fromLogicalTypeToTypeInfo(DECIMAL(1, 0)),
  /* 1 */ Types.INT 
)
  }{code}
 

The real cause is that the return type of the *log()* function must be Double 
type; the planner will cast all operands' types to Double type before function 
execution. However, *org.apache.flink.table.planner.typeutils.TypeCoercion* 
cannot yet cast Decimal type to Double type.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-14037) Deserializing the input/output formats failed: unread block data

2019-09-09 Thread liupengcheng (Jira)
liupengcheng created FLINK-14037:


 Summary: Deserializing the input/output formats failed: unread 
block data
 Key: FLINK-14037
 URL: https://issues.apache.org/jira/browse/FLINK-14037
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.9.0
 Environment: flink 1.9.0

The app jar uses `flink-shaded-hadoop-2` dependencies to avoid some conflicts

 
Reporter: liupengcheng


Recently, we encountered the following issue when testing Flink 1.9.0:
{code:java}
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: 8ffbc071dda81d6f8005c02be8adde6b)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at 
org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:539)
at 
com.github.ehiggs.spark.terasort.FlinkTeraSort$.main(FlinkTeraSort.scala:89)
at 
com.github.ehiggs.spark.terasort.FlinkTeraSort.main(FlinkTeraSort.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1886)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:382)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
server error., (JobManagerRunner.java:152)
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:83)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:375)
at 
org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot 
initialize task 'Dat

[jira] [Created] (FLINK-14038) ExecutionGraph deploy failed due to akka timeout

2019-09-09 Thread liupengcheng (Jira)
liupengcheng created FLINK-14038:


 Summary: ExecutionGraph deploy failed due to akka timeout
 Key: FLINK-14038
 URL: https://issues.apache.org/jira/browse/FLINK-14038
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.9.0
 Environment: Flink on yarn

Flink 1.9.0
Reporter: liupengcheng


When launching the Flink application, the following error was reported. I 
downloaded the operator logs but still have no clue; they provided no useful 
information and the operator was cancelled directly.

JobManager logs:
{code:java}
java.lang.IllegalStateException: Update task on TaskManager 
container_e860_1567429198842_571077_01_06 @ zjy-hadoop-prc-st320.bj 
(dataPort=50990) failed due to:
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$sendUpdatePartitionInfoRpcCall$14(Execution.java:1395)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: 
akka.pattern.AskTimeoutException: Ask timed out on 
[Actor[akka.tcp://fl...@zjy-hadoop-prc-st320.bj:62051/user/taskmanager_0#-171547157]]
 after [1 ms]. Message of type 
[org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation]. A typical reason 
for `AskTimeoutException` is that the recipient actor didn't send a reply.
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:871)
at akka.dispatch.OnComplete.internal(Future.scala:263)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at 
scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at 
akka.pattern.PromiseActorRef$$anonfun$1.apply$mcV$sp(AskSupport.scala:644)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
at 
scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
at 
scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:

Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

2019-09-09 Thread Dian Fu
Hi Jingsong,

Good point!

1. If it doesn't matter which task performs the finalize work, then I think 
task-0 suggested by Jark is a very good solution.
2. If it requires the last finished task to perform the finalize work, then we 
have to consider other solutions. 
  WRT the fault tolerance of StreamingRuntimeContext#getGlobalAggregateManager, 
AFAIK there is no built-in support.
1) Regarding TM failover, I think it's not a problem. We can use an 
accumulator, e.g. finish_count, which is increased by 1 when a sub-task 
finishes (i.e. its close() method is called); see the sketch below.
   When finish_count == RuntimeContext.getNumberOfParallelSubtasks() for 
some sub-task, we know that it's the last finished sub-task. This holds true 
even in case of TM failover.
2) Regarding JM failover, I have no idea how to work around it so far. 
Maybe @Jamie Grier, who is the author of this feature, could share more thoughts. 
Not sure if there is already a solution/plan to support JM failover or whether 
this feature is not designed for this kind of use case?
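
A minimal sketch of the finish_count idea in 1), assuming the counting is done 
in a RichSinkFunction#close() via GlobalAggregateManager#updateGlobalAggregate; 
the aggregate name and the metastore-commit placeholder are only illustrative:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

public class LastTaskFinalizingSink<T> extends RichSinkFunction<T> {

    @Override
    public void invoke(T value, Context context) {
        // write the value somewhere ...
    }

    @Override
    public void close() throws Exception {
        StreamingRuntimeContext ctx = (StreamingRuntimeContext) getRuntimeContext();
        GlobalAggregateManager aggregateManager = ctx.getGlobalAggregateManager();

        // Each finishing sub-task adds 1 to the shared "finish_count" aggregate on the JM.
        Integer finishCount = aggregateManager.updateGlobalAggregate(
                "finish_count", 1, new CountAggregate());

        if (finishCount == ctx.getNumberOfParallelSubtasks()) {
            // Last sub-task to finish: do the finalize work exactly once here,
            // e.g. update visibility in the metastore (hypothetical helper).
            // commitToMetaStore();
        }
    }

    /** Simple counting aggregate kept on the JobMaster. */
    private static class CountAggregate implements AggregateFunction<Integer, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(Integer value, Integer accumulator) {
            return accumulator + value;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            return a + b;
        }
    }
}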

Regards,
Dian

> 在 2019年9月9日,下午3:08,shimin yang  写道:
> 
> Hi Jingsong,
> 
> Although it would be nice if the accumulators in GlobalAggregateManager is
> fault-tolerant, we could still take advantage of managed state to guarantee
> the semantic and use the accumulators to implement distributed barrier or
> lock to solve the distributed access problem.
> 
> Best,
> Shimin
> 
> JingsongLee  于2019年9月9日周一 下午1:33写道:
> 
>> Thanks jark and dian:
>> 1.jark's approach: do the work in task-0. Simple way.
>> 2.dian's approach: use StreamingRuntimeContext#getGlobalAggregateManager
>> Can do more operation. But these accumulators are not fault-tolerant?
>> 
>> Best,
>> Jingsong Lee
>> 
>> 
>> --
>> From:shimin yang 
>> Send Time:2019年9月6日(星期五) 15:21
>> To:dev 
>> Subject:Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete
>> 
>> Hi Fu,
>> 
>> That'll be nice.
>> 
>> Thanks.
>> 
>> Best,
>> Shimin
>> 
>> Dian Fu  于2019年9月6日周五 下午3:17写道:
>> 
>>> Hi Shimin,
>>> 
>>> It can be guaranteed to be an atomic operation. This is ensured by the
>> RPC
>>> framework. You could take a look at RpcEndpoint for more details.
>>> 
>>> Regards,
>>> Dian
>>> 
 在 2019年9月6日,下午2:35,shimin yang  写道:
 
 Hi Fu,
 
 Thank you for the remind. I think it would work in my case as long as
>>> it's
 an atomic operation.
 
 Dian Fu  于2019年9月6日周五 下午2:22写道:
 
> Hi Jingsong,
> 
> Thanks for bring up this discussion. You can try to look at the
> GlobalAggregateManager to see if it can meet your requirements. It can
>>> be
> got via StreamingRuntimeContext#getGlobalAggregateManager().
> 
> Regards,
> Dian
> 
>> 在 2019年9月6日,下午1:39,shimin yang  写道:
>> 
>> Hi Jingsong,
>> 
>> Big fan of this idea. We faced the same problem and resolved by
>> adding
>>> a
>> distributed lock. It would be nice to have this feature in JobMaster,
> which
>> can replace the lock.
>> 
>> Best,
>> Shimin
>> 
>> JingsongLee  于2019年9月6日周五
>> 下午12:20写道:
>> 
>>> Hi devs:
>>> 
>>> I try to implement streaming file sink for table[1] like
> StreamingFileSink.
>>> If the underlying is a HiveFormat, or a format that updates
>> visibility
>>> through a metaStore, I have to update the metaStore in the
>>> notifyCheckpointComplete, but this operation occurs on the task
>> side,
>>> which will lead to distributed access to the metaStore, which will
>>> lead to bottleneck.
>>> 
>>> So I'm curious if we can support notifyOnMaster for
>>> notifyCheckpointComplete like FinalizeOnMaster.
>>> 
>>> What do you think?
>>> 
>>> [1]
>>> 
> 
>>> 
>> https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing
>>> 
>>> Best,
>>> Jingsong Lee
> 
> 
>>> 
>>> 
>> 



Re: [VOTE] Release 1.8.2, release candidate #1

2019-09-09 Thread Kurt Young
+1 (binding)

- build from source and passed all tests locally
- checked the difference between 1.8.1 and 1.8.2, no legal risk found
- went through all commits checked in between 1.8.1 and 1.8.2, making
sure all the issues have the proper "fixVersion" property set

Best,
Kurt


On Mon, Sep 9, 2019 at 8:45 PM Dian Fu  wrote:

> +1 (non-binding)
>
> - built from source successfully (mvn clean install -DskipTests)
> - checked gpg signature and hashes of the source release and binary
> release packages
> - All artifacts have been deployed to the maven central repository
> - no new dependencies were added since 1.8.1
> - run a couple of tests in IDE success
>
> Regards,
> Dian
>
> > 在 2019年9月9日,下午2:28,jincheng sun  写道:
> >
> > +1 (binding)
> >
> > - checked signatures [SUCCESS]
> > - built from source without tests [SUCCESS]
> > - ran some tests in IDE [SUCCESS]
> > - start local cluster and submit word count example [SUCCESS]
> > - announcement PR for website looks good! (I have left a few comments)
> >
> > Best,
> > Jincheng
> >
> > Jark Wu  于2019年9月6日周五 下午8:47写道:
> >
> >> Hi everyone,
> >>
> >> Please review and vote on the release candidate #1 for the version
> 1.8.2,
> >> as follows:
> >> [ ] +1, Approve the release
> >> [ ] -1, Do not approve the release (please provide specific comments)
> >>
> >>
> >> The complete staging area is available for your review, which includes:
> >> * JIRA release notes [1],
> >> * the official Apache source release and binary convenience releases to
> be
> >> deployed to dist.apache.org [2], which are signed with the key with
> >> fingerprint E2C45417BED5C104154F341085BACB5AEFAE3202 [3],
> >> * all artifacts to be deployed to the Maven Central Repository [4],
> >> * source code tag "release-1.8.2-rc1" [5],
> >> * website pull request listing the new release and adding announcement
> blog
> >> post [6].
> >>
> >> The vote will be open for at least 72 hours.
> >> Please cast your votes before *Sep. 11th 2019, 13:00 UTC*.
> >>
> >> It is adopted by majority approval, with at least 3 PMC affirmative
> votes.
> >>
> >> Thanks,
> >> Jark
> >>
> >> [1]
> >>
> >>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345670
> >> [2] https://dist.apache.org/repos/dist/dev/flink/flink-1.8.2-rc1/
> >> [3] https://dist.apache.org/repos/dist/release/flink/KEYS
> >> [4]
> https://repository.apache.org/content/repositories/orgapacheflink-1262
> >> [5]
> >>
> >>
> https://github.com/apache/flink/commit/6322618bb0f1b7942d86cb1b2b7bc55290d9e330
> >> [6] https://github.com/apache/flink-web/pull/262
> >>
>
>


Re: [DISCUSS] FLIP-63: Rework table partition support

2019-09-09 Thread Biao Liu
Hi Jingsong,

Thank you for bringing up this discussion. Since I don't have much experience
with Flink table/SQL, I'll ask some questions from a runtime or engine
perspective.

> ... where we describe how to partition support in flink and how to
integrate to hive partition.

FLIP-27 [1] introduces the "partition" concept officially. The changes of
FLIP-27 are not only about the source interface but also about the whole
infrastructure.
Have you thought about how to integrate your proposal with these changes? Or
do you just want to support "partition" in the table layer, with no
requirements on the underlying infrastructure?

I have seen a discussion [2] that seems to imply a requirement on the
infrastructure to support your proposal, so I have some concerns that there
might be conflicts between this proposal and FLIP-27.

1.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
2.
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-notifyOnMaster-for-notifyCheckpointComplete-td32769.html

Thanks,
Biao /'bɪ.aʊ/



On Fri, 6 Sep 2019 at 13:22, JingsongLee 
wrote:

> Hi everyone, thank you for your comments. Mail name was updated
> and streaming-related concepts were added.
>
> We would like to start a discussion thread on "FLIP-63: Rework table
> partition support"(Design doc: [1]), where we describe how to partition
> support in flink and how to integrate to hive partition.
>
> This FLIP addresses:
>- Introduce whole story about partition support.
>- Introduce and discuss DDL of partition support.
>- Introduce static and dynamic partition insert.
>- Introduce partition pruning
>- Introduce dynamic partition implementation
>- Introduce FileFormatSink to deal with streaming exactly-once and
>  partition-related logic.
>
> Details can be seen in the design document.
> Looking forward to your feedbacks. Thank you.
>
> [1]
> https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing
>
> Best,
> Jingsong Lee